This commit is contained in:
jack ning
2025-11-11 13:37:22 +08:00
parent fb01fca93e
commit 7f35755bf9
5 changed files with 64 additions and 9 deletions

View File

@@ -35,7 +35,7 @@ public class ConnectionEventListener {
String clientId = event.getClientId();
// 用户clientId格式: uid/client/deviceUid
final String uid = clientId.split("/")[0];
// log.info("agent onMqttConnectedEvent uid {}, clientId {}", uid, clientId);
log.info("user onMqttConnectedEvent uid {}, clientId {}", uid, clientId);
// 标记连接(使用 ConnectionEntity 支持多端在线)
// 无法从事件中获取更多上下文,使用协议 MQTT其它信息置空/默认
connectionRestService.markConnected(uid, null, clientId,
@@ -47,13 +47,10 @@ public class ConnectionEventListener {
public void onMqttDisconnectedEvent(MqttDisconnectedEvent event) {
String clientId = event.getClientId();
// 用户clientId格式: uid/client/deviceUid
// final String uid = clientId.split("/")[0];
// log.info("agent onMqttDisconnectedEvent uid {}, clientId {}", uid, clientId);
final String uid = clientId.split("/")[0];
log.info("user onMqttDisconnectedEvent uid {}, clientId {}", uid, clientId);
// 先标记该 client 断开
connectionRestService.markDisconnected(clientId);
// 根据 ConnectionEntity 汇总判断是否仍在线(多端)
// boolean online = connectionRestService.isUserOnline(uid);
// agentRestService.updateConnect(uid, online);
}
/**

View File

@@ -13,6 +13,8 @@
*/
package com.bytedesk.core.socket.connection;
import java.util.List;
import org.springframework.data.domain.Page;
import org.springframework.http.ResponseEntity;
// import org.springframework.security.access.prepost.PreAuthorize;
@@ -127,6 +129,14 @@ public class ConnectionRestController extends BaseRestController<ConnectionReque
return ResponseEntity.ok(JsonResult.success(presence));
}
@ActionAnnotation(title = "在线", action = "查询", description = "active connections by user")
@Operation(summary = "List Active Connections", description = "List user's active (non-expired) connections")
@GetMapping("/presence/{userUid}/list")
public ResponseEntity<?> listActiveConnections(@PathVariable("userUid") String userUid) {
List<ConnectionResponse> list = connectionRestService.listActiveConnections(userUid);
return ResponseEntity.ok(JsonResult.success(list));
}
}

View File

@@ -15,6 +15,7 @@ package com.bytedesk.core.socket.connection;
import java.util.Optional;
import java.util.List;
import java.util.ArrayList;
import org.modelmapper.ModelMapper;
import org.springframework.cache.annotation.Cacheable;
@@ -86,10 +87,26 @@ public class ConnectionRestService extends BaseRestServiceWithExport<ConnectionE
@Transactional
public void heartbeat(String clientId) {
Optional<ConnectionEntity> optional = connectionRepository.findByClientId(clientId);
optional.ifPresent(entity -> {
if (optional.isPresent()) {
ConnectionEntity entity = optional.get();
entity.setLastHeartbeatAt(System.currentTimeMillis());
save(entity);
});
} else {
// 兜底:若不存在记录(可能未触发连接事件),尝试按 clientId 格式 uid/client/deviceUid 创建
if (clientId != null && clientId.contains("/")) {
try {
String[] parts = clientId.split("/");
String userUid = parts.length > 0 ? parts[0] : null;
String deviceUid = parts.length > 2 ? parts[2] : null;
if (userUid != null) {
// 使用 MQTT 协议默认 TTL其他上下文信息留空
markConnected(userUid, null, clientId, deviceUid, "MQTT", null, null, null, null);
}
} catch (Exception ignore) {
// 保守处理,避免异常中断心跳
}
}
}
}
/** Mark a client connection as disconnected */
@@ -174,6 +191,23 @@ public class ConnectionRestService extends BaseRestServiceWithExport<ConnectionE
return PresenceResponse.builder().online(active > 0).activeCount(active).build();
}
@Transactional(readOnly = true)
public List<ConnectionResponse> listActiveConnections(String userUid) {
List<ConnectionEntity> list = connectionRepository.findByUserUidAndDeletedFalse(userUid);
long now = System.currentTimeMillis();
List<ConnectionResponse> result = new ArrayList<>();
for (ConnectionEntity c : list) {
if (!CONNECTED.name().equals(c.getStatus())) continue;
Long last = c.getLastHeartbeatAt();
Integer ttl = c.getTtlSeconds();
if (last == null || ttl == null) continue;
if (last + ttl * 1000L >= now) {
result.add(convertToResponse(c));
}
}
return result;
}
@Override
protected Specification<ConnectionEntity> createSpecification(ConnectionRequest request) {
return ConnectionSpecification.search(request, authService);

View File

@@ -16,6 +16,7 @@ package com.bytedesk.core.socket.mqtt.protocol;
import com.bytedesk.core.socket.mqtt.MqttChannelUtils;
import com.bytedesk.core.socket.mqtt.service.MqttConnectionService;
import com.bytedesk.core.socket.connection.ConnectionRestService;
import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.*;
@@ -27,6 +28,7 @@ import lombok.AllArgsConstructor;
public class PingReq {
private final MqttConnectionService mqttConnectionService;
private final ConnectionRestService connectionRestService;
// client send ping every 30 seconds
public void processPingReq(Channel channel, MqttMessage message) {
@@ -34,6 +36,14 @@ public class PingReq {
String clientId = MqttChannelUtils.getClientId(channel);
// log.info("PINGREQ - clientId: {}", clientId);
mqttConnectionService.addConnected(clientId);
// 同步刷新数据库心跳,保障 Presence TTL 不会过期
try {
if (clientId != null) {
connectionRestService.heartbeat(clientId);
}
} catch (Exception ignore) {
// 避免心跳异常影响协议响应
}
// send ping response PINGRESP
MqttMessage pingRespMessage = MqttMessageFactory.newMessage(

View File

@@ -6,6 +6,7 @@ import com.bytedesk.core.socket.mqtt.service.MqttAuthService;
import com.bytedesk.core.socket.mqtt.service.MqttConnectionService;
import com.bytedesk.core.socket.mqtt.service.MqttMessageIdService;
import com.bytedesk.core.socket.mqtt.service.MqttSessionService;
import com.bytedesk.core.socket.connection.ConnectionRestService;
import lombok.Data;
@@ -34,6 +35,9 @@ public class ProtocolProcess {
@Autowired
private MqttConnectionService mqttConnectionService;
@Autowired
private ConnectionRestService connectionRestService;
private Connect connect;
private Subscribe subscribe;
@@ -91,7 +95,7 @@ public class ProtocolProcess {
public PingReq pingReq() {
if (pingReq == null) {
pingReq = new PingReq(mqttConnectionService);
pingReq = new PingReq(mqttConnectionService, connectionRestService);
}
return pingReq;
}