diff --git a/modules/core/src/main/java/com/bytedesk/core/socket/connection/ConnectionEventListener.java b/modules/core/src/main/java/com/bytedesk/core/socket/connection/ConnectionEventListener.java index 684752d9c8..2dbfd6899c 100644 --- a/modules/core/src/main/java/com/bytedesk/core/socket/connection/ConnectionEventListener.java +++ b/modules/core/src/main/java/com/bytedesk/core/socket/connection/ConnectionEventListener.java @@ -35,7 +35,7 @@ public class ConnectionEventListener { String clientId = event.getClientId(); // 用户clientId格式: uid/client/deviceUid final String uid = clientId.split("/")[0]; - // log.info("agent onMqttConnectedEvent uid {}, clientId {}", uid, clientId); + log.info("user onMqttConnectedEvent uid {}, clientId {}", uid, clientId); // 标记连接(使用 ConnectionEntity 支持多端在线) // 无法从事件中获取更多上下文,使用协议 MQTT,其它信息置空/默认 connectionRestService.markConnected(uid, null, clientId, @@ -47,13 +47,10 @@ public class ConnectionEventListener { public void onMqttDisconnectedEvent(MqttDisconnectedEvent event) { String clientId = event.getClientId(); // 用户clientId格式: uid/client/deviceUid - // final String uid = clientId.split("/")[0]; - // log.info("agent onMqttDisconnectedEvent uid {}, clientId {}", uid, clientId); + final String uid = clientId.split("/")[0]; + log.info("user onMqttDisconnectedEvent uid {}, clientId {}", uid, clientId); // 先标记该 client 断开 connectionRestService.markDisconnected(clientId); - // 根据 ConnectionEntity 汇总判断是否仍在线(多端) - // boolean online = connectionRestService.isUserOnline(uid); - // agentRestService.updateConnect(uid, online); } /** diff --git a/modules/core/src/main/java/com/bytedesk/core/socket/connection/ConnectionRestController.java b/modules/core/src/main/java/com/bytedesk/core/socket/connection/ConnectionRestController.java index 5310f3fe2c..865e2343d2 100644 --- a/modules/core/src/main/java/com/bytedesk/core/socket/connection/ConnectionRestController.java +++ b/modules/core/src/main/java/com/bytedesk/core/socket/connection/ConnectionRestController.java @@ -13,6 +13,8 @@ */ package com.bytedesk.core.socket.connection; +import java.util.List; + import org.springframework.data.domain.Page; import org.springframework.http.ResponseEntity; // import org.springframework.security.access.prepost.PreAuthorize; @@ -127,6 +129,14 @@ public class ConnectionRestController extends BaseRestController listActiveConnections(@PathVariable("userUid") String userUid) { + List list = connectionRestService.listActiveConnections(userUid); + return ResponseEntity.ok(JsonResult.success(list)); + } + } \ No newline at end of file diff --git a/modules/core/src/main/java/com/bytedesk/core/socket/connection/ConnectionRestService.java b/modules/core/src/main/java/com/bytedesk/core/socket/connection/ConnectionRestService.java index f09b0d9a8f..84f8fb34f2 100644 --- a/modules/core/src/main/java/com/bytedesk/core/socket/connection/ConnectionRestService.java +++ b/modules/core/src/main/java/com/bytedesk/core/socket/connection/ConnectionRestService.java @@ -15,6 +15,7 @@ package com.bytedesk.core.socket.connection; import java.util.Optional; import java.util.List; +import java.util.ArrayList; import org.modelmapper.ModelMapper; import org.springframework.cache.annotation.Cacheable; @@ -86,10 +87,26 @@ public class ConnectionRestService extends BaseRestServiceWithExport optional = connectionRepository.findByClientId(clientId); - optional.ifPresent(entity -> { + if (optional.isPresent()) { + ConnectionEntity entity = optional.get(); entity.setLastHeartbeatAt(System.currentTimeMillis()); save(entity); - }); + } else { + // 兜底:若不存在记录(可能未触发连接事件),尝试按 clientId 格式 uid/client/deviceUid 创建 + if (clientId != null && clientId.contains("/")) { + try { + String[] parts = clientId.split("/"); + String userUid = parts.length > 0 ? parts[0] : null; + String deviceUid = parts.length > 2 ? parts[2] : null; + if (userUid != null) { + // 使用 MQTT 协议默认 TTL,其他上下文信息留空 + markConnected(userUid, null, clientId, deviceUid, "MQTT", null, null, null, null); + } + } catch (Exception ignore) { + // 保守处理,避免异常中断心跳 + } + } + } } /** Mark a client connection as disconnected */ @@ -174,6 +191,23 @@ public class ConnectionRestService extends BaseRestServiceWithExport 0).activeCount(active).build(); } + @Transactional(readOnly = true) + public List listActiveConnections(String userUid) { + List list = connectionRepository.findByUserUidAndDeletedFalse(userUid); + long now = System.currentTimeMillis(); + List result = new ArrayList<>(); + for (ConnectionEntity c : list) { + if (!CONNECTED.name().equals(c.getStatus())) continue; + Long last = c.getLastHeartbeatAt(); + Integer ttl = c.getTtlSeconds(); + if (last == null || ttl == null) continue; + if (last + ttl * 1000L >= now) { + result.add(convertToResponse(c)); + } + } + return result; + } + @Override protected Specification createSpecification(ConnectionRequest request) { return ConnectionSpecification.search(request, authService); diff --git a/modules/core/src/main/java/com/bytedesk/core/socket/mqtt/protocol/PingReq.java b/modules/core/src/main/java/com/bytedesk/core/socket/mqtt/protocol/PingReq.java index df46e8d37a..ba59920287 100755 --- a/modules/core/src/main/java/com/bytedesk/core/socket/mqtt/protocol/PingReq.java +++ b/modules/core/src/main/java/com/bytedesk/core/socket/mqtt/protocol/PingReq.java @@ -16,6 +16,7 @@ package com.bytedesk.core.socket.mqtt.protocol; import com.bytedesk.core.socket.mqtt.MqttChannelUtils; import com.bytedesk.core.socket.mqtt.service.MqttConnectionService; +import com.bytedesk.core.socket.connection.ConnectionRestService; import io.netty.channel.Channel; import io.netty.handler.codec.mqtt.*; @@ -27,6 +28,7 @@ import lombok.AllArgsConstructor; public class PingReq { private final MqttConnectionService mqttConnectionService; + private final ConnectionRestService connectionRestService; // client send ping every 30 seconds public void processPingReq(Channel channel, MqttMessage message) { @@ -34,6 +36,14 @@ public class PingReq { String clientId = MqttChannelUtils.getClientId(channel); // log.info("PINGREQ - clientId: {}", clientId); mqttConnectionService.addConnected(clientId); + // 同步刷新数据库心跳,保障 Presence TTL 不会过期 + try { + if (clientId != null) { + connectionRestService.heartbeat(clientId); + } + } catch (Exception ignore) { + // 避免心跳异常影响协议响应 + } // send ping response PINGRESP MqttMessage pingRespMessage = MqttMessageFactory.newMessage( diff --git a/modules/core/src/main/java/com/bytedesk/core/socket/mqtt/protocol/ProtocolProcess.java b/modules/core/src/main/java/com/bytedesk/core/socket/mqtt/protocol/ProtocolProcess.java index 99d2de4015..cc65496e20 100755 --- a/modules/core/src/main/java/com/bytedesk/core/socket/mqtt/protocol/ProtocolProcess.java +++ b/modules/core/src/main/java/com/bytedesk/core/socket/mqtt/protocol/ProtocolProcess.java @@ -6,6 +6,7 @@ import com.bytedesk.core.socket.mqtt.service.MqttAuthService; import com.bytedesk.core.socket.mqtt.service.MqttConnectionService; import com.bytedesk.core.socket.mqtt.service.MqttMessageIdService; import com.bytedesk.core.socket.mqtt.service.MqttSessionService; +import com.bytedesk.core.socket.connection.ConnectionRestService; import lombok.Data; @@ -34,6 +35,9 @@ public class ProtocolProcess { @Autowired private MqttConnectionService mqttConnectionService; + @Autowired + private ConnectionRestService connectionRestService; + private Connect connect; private Subscribe subscribe; @@ -91,7 +95,7 @@ public class ProtocolProcess { public PingReq pingReq() { if (pingReq == null) { - pingReq = new PingReq(mqttConnectionService); + pingReq = new PingReq(mqttConnectionService, connectionRestService); } return pingReq; }