This commit is contained in:
jack ning
2025-11-11 14:10:23 +08:00
parent 32fe897766
commit 9f83fa08f1
3 changed files with 188 additions and 2 deletions

View File

@@ -0,0 +1,79 @@
/*
* Scheduled task to flush cached heartbeats from Redis to DB in batches.
*/
package com.bytedesk.core.socket.connection;
import java.util.HashMap;
import java.util.Map;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import com.bytedesk.core.constant.RedisConsts;
import org.springframework.data.redis.core.StringRedisTemplate;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Component
@RequiredArgsConstructor
public class ConnectionHeartbeatFlushTask {
private final StringRedisTemplate stringRedisTemplate;
private final ConnectionRestService connectionRestService;
private static final String REDIS_HEARTBEAT_HASH_KEY = RedisConsts.BYTEDESK_REDIS_PREFIX + "core:conn:hb";
private static final String REDIS_LAST_DB_WRITE_HASH_KEY = RedisConsts.BYTEDESK_REDIS_PREFIX + "core:conn:hb:lastdb";
// 每 10 秒批量刷新一次
@Scheduled(fixedDelay = 10_000)
@Transactional
public void flush() {
if (stringRedisTemplate == null) {
return;
}
try {
long start = System.nanoTime();
Map<Object, Object> entries = stringRedisTemplate.opsForHash().entries(REDIS_HEARTBEAT_HASH_KEY);
if (entries == null || entries.isEmpty()) {
return;
}
Map<String, Long> heartbeats = new HashMap<>();
for (Map.Entry<Object, Object> e : entries.entrySet()) {
String clientId = String.valueOf(e.getKey());
Object v = e.getValue();
try {
long ts = Long.parseLong(String.valueOf(v));
heartbeats.put(clientId, ts);
} catch (NumberFormatException ignore) {
}
}
if (heartbeats.isEmpty()) return;
int updated = connectionRestService.flushHeartbeatCacheBatch(heartbeats);
long elapsed = System.nanoTime() - start;
log.debug("flushHeartbeatCacheBatch updated={} size={} elapsedMs={}", updated, heartbeats.size(), elapsed/1_000_000);
// 清理已持久化的数据,避免哈希无限增长:
// 原则:若 lastdb >= hbTs则可以安全移除该 clientId 的缓存心跳
for (Map.Entry<String, Long> e : heartbeats.entrySet()) {
String clientId = e.getKey();
Long hbTs = e.getValue();
Object lastDbStr = stringRedisTemplate.opsForHash().get(REDIS_LAST_DB_WRITE_HASH_KEY, clientId);
if (lastDbStr != null) {
try {
long lastDb = Long.parseLong(String.valueOf(lastDbStr));
if (lastDb >= hbTs) {
stringRedisTemplate.opsForHash().delete(REDIS_HEARTBEAT_HASH_KEY, clientId);
}
} catch (NumberFormatException ignore) {}
}
}
} catch (Exception e) {
log.warn("flushHeartbeatCacheBatch error: {}", e.getMessage());
}
}
}

View File

@@ -0,0 +1,47 @@
package com.bytedesk.core.socket.connection;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.stereotype.Component;
/**
* Metrics for connection heartbeat & presence performance.
*/
@Component
@RequiredArgsConstructor
@ConditionalOnClass(MeterRegistry.class)
public class ConnectionMetrics {
private final MeterRegistry meterRegistry;
private Counter heartbeatCalls;
private Counter heartbeatDbWrites;
private Counter heartbeatSkipped;
private Counter heartbeatCreated;
private Timer flushTimer;
private DistributionSummary batchSizeSummary;
@PostConstruct
public void init() {
heartbeatCalls = meterRegistry.counter("conn.hb.calls");
heartbeatDbWrites = meterRegistry.counter("conn.hb.dbwrites");
heartbeatSkipped = meterRegistry.counter("conn.hb.skipped");
heartbeatCreated = meterRegistry.counter("conn.hb.created");
flushTimer = meterRegistry.timer("conn.hb.flush.timer");
batchSizeSummary = DistributionSummary.builder("conn.hb.flush.batch.size").register(meterRegistry);
}
public void incHeartbeatCall() { heartbeatCalls.increment(); }
public void incDbWrite() { heartbeatDbWrites.increment(); }
public void incSkipped() { heartbeatSkipped.increment(); }
public void incCreated() { heartbeatCreated.increment(); }
public void recordFlush(long nanos, int batchSize) {
flushTimer.record(nanos, java.util.concurrent.TimeUnit.NANOSECONDS);
batchSizeSummary.record(batchSize);
}
}

View File

@@ -26,6 +26,7 @@ import org.springframework.orm.ObjectOptimisticLockingFailureException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils;
import org.springframework.data.redis.core.StringRedisTemplate;
import com.bytedesk.core.base.BaseRestServiceWithExport;
import com.bytedesk.core.rbac.auth.AuthService;
import com.bytedesk.core.rbac.user.UserEntity;
@@ -51,6 +52,16 @@ public class ConnectionRestService extends BaseRestServiceWithExport<ConnectionE
private final AuthService authService;
private final PresenceTtlResolver presenceTtlResolver;
private final ConnectionMetrics connectionMetrics;
private final StringRedisTemplate stringRedisTemplate;
// Redis 缓存心跳Key
private static final String REDIS_HEARTBEAT_HASH_KEY = "core:conn:hb";
// Redis 最近一次数据库写入时间Key
private static final String REDIS_LAST_DB_WRITE_HASH_KEY = "core:conn:hb:lastdb";
// 最小数据库写入间隔(毫秒)
private static final long MIN_INTERVAL_MS = 5000L;
/* ================= Presence APIs (multi-client) ================= */
@@ -89,12 +100,17 @@ public class ConnectionRestService extends BaseRestServiceWithExport<ConnectionE
if (clientId == null || clientId.isEmpty()) {
return;
}
connectionMetrics.incHeartbeatCall();
long now = System.currentTimeMillis();
final long MIN_INTERVAL_MS = 5000L; // 基础节流策略5 秒内重复心跳不落库
long threshold = now - MIN_INTERVAL_MS;
// 先写入Redis缓存仅记录最新心跳时间减少数据库频繁写
tryWriteHeartbeatToCache(clientId, now);
// 直接使用轻量条件更新,避免读取+save 整实体开销
int updated = connectionRepository.updateHeartbeatIfOlder(clientId, now, threshold);
if (updated == 0) {
if (updated > 0) {
cacheLastDbWrite(clientId, now);
connectionMetrics.incDbWrite();
} else {
// 未更新,可能记录不存在或刚刚已写入;尝试兜底创建(只在不存在时)
if (!connectionRepository.existsByClientId(clientId) && clientId.contains("/")) {
try {
@@ -103,13 +119,57 @@ public class ConnectionRestService extends BaseRestServiceWithExport<ConnectionE
String deviceUid = parts.length > 2 ? parts[2] : null;
if (userUid != null) {
markConnected(userUid, null, clientId, deviceUid, ConnectionProtocalEnum.MQTT.name(), null, null, null, null);
cacheLastDbWrite(clientId, now);
connectionMetrics.incCreated();
}
} catch (Exception ignore) {
}
} else {
// 间隔不足,跳过数据库写
connectionMetrics.incSkipped();
}
}
}
private void tryWriteHeartbeatToCache(String clientId, long ts) {
try {
if (stringRedisTemplate != null) {
stringRedisTemplate.opsForHash().put(REDIS_HEARTBEAT_HASH_KEY, clientId, String.valueOf(ts));
}
} catch (Exception ignore) {}
}
private void cacheLastDbWrite(String clientId, long ts) {
try {
if (stringRedisTemplate != null) {
stringRedisTemplate.opsForHash().put(REDIS_LAST_DB_WRITE_HASH_KEY, clientId, String.valueOf(ts));
}
} catch (Exception ignore) {}
}
/**
* 批量刷新心跳缓存到数据库,返回成功更新条数。
* 该方法供定时任务调用。
*/
@Transactional
public int flushHeartbeatCacheBatch(java.util.Map<String, Long> heartbeats) {
if (heartbeats == null || heartbeats.isEmpty()) return 0;
int updatedCount = 0;
long now = System.currentTimeMillis();
long threshold = now - MIN_INTERVAL_MS;
for (java.util.Map.Entry<String, Long> e : heartbeats.entrySet()) {
String clientId = e.getKey();
Long hbTs = e.getValue();
if (clientId == null || hbTs == null) continue;
int u = connectionRepository.updateHeartbeatIfOlder(clientId, hbTs, threshold);
if (u > 0) {
updatedCount += u;
cacheLastDbWrite(clientId, hbTs);
}
}
return updatedCount;
}
/** Mark a client connection as disconnected */
@Transactional
public void markDisconnected(String clientId) {