From 9f83fa08f1c6836a5809ed03c7ecfa37045e251b Mon Sep 17 00:00:00 2001 From: jack ning Date: Tue, 11 Nov 2025 14:10:23 +0800 Subject: [PATCH] update --- .../ConnectionHeartbeatFlushTask.java | 79 +++++++++++++++++++ .../socket/connection/ConnectionMetrics.java | 47 +++++++++++ .../connection/ConnectionRestService.java | 64 ++++++++++++++- 3 files changed, 188 insertions(+), 2 deletions(-) create mode 100644 modules/core/src/main/java/com/bytedesk/core/socket/connection/ConnectionHeartbeatFlushTask.java create mode 100644 modules/core/src/main/java/com/bytedesk/core/socket/connection/ConnectionMetrics.java diff --git a/modules/core/src/main/java/com/bytedesk/core/socket/connection/ConnectionHeartbeatFlushTask.java b/modules/core/src/main/java/com/bytedesk/core/socket/connection/ConnectionHeartbeatFlushTask.java new file mode 100644 index 0000000000..6f1a92a54d --- /dev/null +++ b/modules/core/src/main/java/com/bytedesk/core/socket/connection/ConnectionHeartbeatFlushTask.java @@ -0,0 +1,79 @@ +/* + * Scheduled task to flush cached heartbeats from Redis to DB in batches. + */ +package com.bytedesk.core.socket.connection; + +import java.util.HashMap; +import java.util.Map; + +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import com.bytedesk.core.constant.RedisConsts; + +import org.springframework.data.redis.core.StringRedisTemplate; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +@RequiredArgsConstructor +public class ConnectionHeartbeatFlushTask { + + private final StringRedisTemplate stringRedisTemplate; + private final ConnectionRestService connectionRestService; + + private static final String REDIS_HEARTBEAT_HASH_KEY = RedisConsts.BYTEDESK_REDIS_PREFIX + "core:conn:hb"; + private static final String REDIS_LAST_DB_WRITE_HASH_KEY = RedisConsts.BYTEDESK_REDIS_PREFIX + "core:conn:hb:lastdb"; + + // 每 10 秒批量刷新一次 + @Scheduled(fixedDelay = 10_000) + @Transactional + public void flush() { + if (stringRedisTemplate == null) { + return; + } + try { + long start = System.nanoTime(); + Map entries = stringRedisTemplate.opsForHash().entries(REDIS_HEARTBEAT_HASH_KEY); + if (entries == null || entries.isEmpty()) { + return; + } + Map heartbeats = new HashMap<>(); + for (Map.Entry e : entries.entrySet()) { + String clientId = String.valueOf(e.getKey()); + Object v = e.getValue(); + try { + long ts = Long.parseLong(String.valueOf(v)); + heartbeats.put(clientId, ts); + } catch (NumberFormatException ignore) { + } + } + if (heartbeats.isEmpty()) return; + + int updated = connectionRestService.flushHeartbeatCacheBatch(heartbeats); + long elapsed = System.nanoTime() - start; + log.debug("flushHeartbeatCacheBatch updated={} size={} elapsedMs={}", updated, heartbeats.size(), elapsed/1_000_000); + + // 清理已持久化的数据,避免哈希无限增长: + // 原则:若 lastdb >= hbTs,则可以安全移除该 clientId 的缓存心跳 + for (Map.Entry e : heartbeats.entrySet()) { + String clientId = e.getKey(); + Long hbTs = e.getValue(); + Object lastDbStr = stringRedisTemplate.opsForHash().get(REDIS_LAST_DB_WRITE_HASH_KEY, clientId); + if (lastDbStr != null) { + try { + long lastDb = Long.parseLong(String.valueOf(lastDbStr)); + if (lastDb >= hbTs) { + stringRedisTemplate.opsForHash().delete(REDIS_HEARTBEAT_HASH_KEY, clientId); + } + } catch (NumberFormatException ignore) {} + } + } + } catch (Exception e) { + log.warn("flushHeartbeatCacheBatch error: {}", e.getMessage()); + } + } +} diff --git a/modules/core/src/main/java/com/bytedesk/core/socket/connection/ConnectionMetrics.java b/modules/core/src/main/java/com/bytedesk/core/socket/connection/ConnectionMetrics.java new file mode 100644 index 0000000000..5fb5c7b5aa --- /dev/null +++ b/modules/core/src/main/java/com/bytedesk/core/socket/connection/ConnectionMetrics.java @@ -0,0 +1,47 @@ +package com.bytedesk.core.socket.connection; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.Timer; +import jakarta.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.stereotype.Component; + +/** + * Metrics for connection heartbeat & presence performance. + */ +@Component +@RequiredArgsConstructor +@ConditionalOnClass(MeterRegistry.class) +public class ConnectionMetrics { + + private final MeterRegistry meterRegistry; + + private Counter heartbeatCalls; + private Counter heartbeatDbWrites; + private Counter heartbeatSkipped; + private Counter heartbeatCreated; + private Timer flushTimer; + private DistributionSummary batchSizeSummary; + + @PostConstruct + public void init() { + heartbeatCalls = meterRegistry.counter("conn.hb.calls"); + heartbeatDbWrites = meterRegistry.counter("conn.hb.dbwrites"); + heartbeatSkipped = meterRegistry.counter("conn.hb.skipped"); + heartbeatCreated = meterRegistry.counter("conn.hb.created"); + flushTimer = meterRegistry.timer("conn.hb.flush.timer"); + batchSizeSummary = DistributionSummary.builder("conn.hb.flush.batch.size").register(meterRegistry); + } + + public void incHeartbeatCall() { heartbeatCalls.increment(); } + public void incDbWrite() { heartbeatDbWrites.increment(); } + public void incSkipped() { heartbeatSkipped.increment(); } + public void incCreated() { heartbeatCreated.increment(); } + public void recordFlush(long nanos, int batchSize) { + flushTimer.record(nanos, java.util.concurrent.TimeUnit.NANOSECONDS); + batchSizeSummary.record(batchSize); + } +} diff --git a/modules/core/src/main/java/com/bytedesk/core/socket/connection/ConnectionRestService.java b/modules/core/src/main/java/com/bytedesk/core/socket/connection/ConnectionRestService.java index a4cf0f383d..0bf36ac081 100644 --- a/modules/core/src/main/java/com/bytedesk/core/socket/connection/ConnectionRestService.java +++ b/modules/core/src/main/java/com/bytedesk/core/socket/connection/ConnectionRestService.java @@ -26,6 +26,7 @@ import org.springframework.orm.ObjectOptimisticLockingFailureException; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.StringUtils; +import org.springframework.data.redis.core.StringRedisTemplate; import com.bytedesk.core.base.BaseRestServiceWithExport; import com.bytedesk.core.rbac.auth.AuthService; import com.bytedesk.core.rbac.user.UserEntity; @@ -51,6 +52,16 @@ public class ConnectionRestService extends BaseRestServiceWithExport 0) { + cacheLastDbWrite(clientId, now); + connectionMetrics.incDbWrite(); + } else { // 未更新,可能记录不存在或刚刚已写入;尝试兜底创建(只在不存在时) if (!connectionRepository.existsByClientId(clientId) && clientId.contains("/")) { try { @@ -103,13 +119,57 @@ public class ConnectionRestService extends BaseRestServiceWithExport 2 ? parts[2] : null; if (userUid != null) { markConnected(userUid, null, clientId, deviceUid, ConnectionProtocalEnum.MQTT.name(), null, null, null, null); + cacheLastDbWrite(clientId, now); + connectionMetrics.incCreated(); } } catch (Exception ignore) { } + } else { + // 间隔不足,跳过数据库写 + connectionMetrics.incSkipped(); } } } + private void tryWriteHeartbeatToCache(String clientId, long ts) { + try { + if (stringRedisTemplate != null) { + stringRedisTemplate.opsForHash().put(REDIS_HEARTBEAT_HASH_KEY, clientId, String.valueOf(ts)); + } + } catch (Exception ignore) {} + } + + private void cacheLastDbWrite(String clientId, long ts) { + try { + if (stringRedisTemplate != null) { + stringRedisTemplate.opsForHash().put(REDIS_LAST_DB_WRITE_HASH_KEY, clientId, String.valueOf(ts)); + } + } catch (Exception ignore) {} + } + + /** + * 批量刷新心跳缓存到数据库,返回成功更新条数。 + * 该方法供定时任务调用。 + */ + @Transactional + public int flushHeartbeatCacheBatch(java.util.Map heartbeats) { + if (heartbeats == null || heartbeats.isEmpty()) return 0; + int updatedCount = 0; + long now = System.currentTimeMillis(); + long threshold = now - MIN_INTERVAL_MS; + for (java.util.Map.Entry e : heartbeats.entrySet()) { + String clientId = e.getKey(); + Long hbTs = e.getValue(); + if (clientId == null || hbTs == null) continue; + int u = connectionRepository.updateHeartbeatIfOlder(clientId, hbTs, threshold); + if (u > 0) { + updatedCount += u; + cacheLastDbWrite(clientId, hbTs); + } + } + return updatedCount; + } + /** Mark a client connection as disconnected */ @Transactional public void markDisconnected(String clientId) {