This commit is contained in:
jack ning
2025-11-24 15:43:56 +08:00
parent d53b5ab095
commit 4ddef78f0a
4 changed files with 92 additions and 71 deletions

View File

@@ -50,4 +50,9 @@ public class RedisConsts {
// 验证码相关常量 // 验证码相关常量
public static final String KAPTCHA_PREFIX = BYTEDESK_REDIS_PREFIX + "kaptcha:"; public static final String KAPTCHA_PREFIX = BYTEDESK_REDIS_PREFIX + "kaptcha:";
// Redis 缓存心跳Key
public static final String REDIS_HEARTBEAT_HASH_KEY = RedisConsts.BYTEDESK_REDIS_PREFIX + "core:conn:hb";
// Redis 最近一次数据库写入时间Key
public static final String REDIS_LAST_DB_WRITE_HASH_KEY = RedisConsts.BYTEDESK_REDIS_PREFIX + "core:conn:hb:lastdb";
} }

View File

@@ -25,9 +25,7 @@ public class ConnectionHeartbeatFlushTask {
private final StringRedisTemplate stringRedisTemplate; private final StringRedisTemplate stringRedisTemplate;
private final ConnectionRestService connectionRestService; private final ConnectionRestService connectionRestService;
private static final String REDIS_HEARTBEAT_HASH_KEY = RedisConsts.BYTEDESK_REDIS_PREFIX + "core:conn:hb";
private static final String REDIS_LAST_DB_WRITE_HASH_KEY = RedisConsts.BYTEDESK_REDIS_PREFIX + "core:conn:hb:lastdb";
// 每 10 秒批量刷新一次 // 每 10 秒批量刷新一次
@Scheduled(fixedDelay = 10_000) @Scheduled(fixedDelay = 10_000)
@Transactional @Transactional
@@ -37,7 +35,7 @@ public class ConnectionHeartbeatFlushTask {
} }
try { try {
long start = System.nanoTime(); long start = System.nanoTime();
Map<Object, Object> entries = stringRedisTemplate.opsForHash().entries(REDIS_HEARTBEAT_HASH_KEY); Map<Object, Object> entries = stringRedisTemplate.opsForHash().entries(RedisConsts.REDIS_HEARTBEAT_HASH_KEY);
if (entries == null || entries.isEmpty()) { if (entries == null || entries.isEmpty()) {
return; return;
} }
@@ -62,12 +60,12 @@ public class ConnectionHeartbeatFlushTask {
for (Map.Entry<String, Long> e : heartbeats.entrySet()) { for (Map.Entry<String, Long> e : heartbeats.entrySet()) {
String clientId = e.getKey(); String clientId = e.getKey();
Long hbTs = e.getValue(); Long hbTs = e.getValue();
Object lastDbStr = stringRedisTemplate.opsForHash().get(REDIS_LAST_DB_WRITE_HASH_KEY, clientId); Object lastDbStr = stringRedisTemplate.opsForHash().get(RedisConsts.REDIS_LAST_DB_WRITE_HASH_KEY, clientId);
if (lastDbStr != null) { if (lastDbStr != null) {
try { try {
long lastDb = Long.parseLong(String.valueOf(lastDbStr)); long lastDb = Long.parseLong(String.valueOf(lastDbStr));
if (lastDb >= hbTs) { if (lastDb >= hbTs) {
stringRedisTemplate.opsForHash().delete(REDIS_HEARTBEAT_HASH_KEY, clientId); stringRedisTemplate.opsForHash().delete(RedisConsts.REDIS_HEARTBEAT_HASH_KEY, clientId);
} }
} catch (NumberFormatException ignore) {} } catch (NumberFormatException ignore) {}
} }

View File

@@ -28,6 +28,7 @@ import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate;
import com.bytedesk.core.base.BaseRestServiceWithExport; import com.bytedesk.core.base.BaseRestServiceWithExport;
import com.bytedesk.core.constant.RedisConsts;
import com.bytedesk.core.rbac.auth.AuthService; import com.bytedesk.core.rbac.auth.AuthService;
import com.bytedesk.core.rbac.user.UserEntity; import com.bytedesk.core.rbac.user.UserEntity;
import com.bytedesk.core.uid.UidUtils; import com.bytedesk.core.uid.UidUtils;
@@ -56,10 +57,7 @@ public class ConnectionRestService extends BaseRestServiceWithExport<ConnectionE
private final StringRedisTemplate stringRedisTemplate; private final StringRedisTemplate stringRedisTemplate;
// Redis 缓存心跳Key
private static final String REDIS_HEARTBEAT_HASH_KEY = "core:conn:hb";
// Redis 最近一次数据库写入时间Key
private static final String REDIS_LAST_DB_WRITE_HASH_KEY = "core:conn:hb:lastdb";
// 最小数据库写入间隔(毫秒) // 最小数据库写入间隔(毫秒)
private static final long MIN_INTERVAL_MS = 5000L; private static final long MIN_INTERVAL_MS = 5000L;
@@ -134,7 +132,7 @@ public class ConnectionRestService extends BaseRestServiceWithExport<ConnectionE
private void tryWriteHeartbeatToCache(String clientId, long ts) { private void tryWriteHeartbeatToCache(String clientId, long ts) {
try { try {
if (stringRedisTemplate != null) { if (stringRedisTemplate != null) {
stringRedisTemplate.opsForHash().put(REDIS_HEARTBEAT_HASH_KEY, clientId, String.valueOf(ts)); stringRedisTemplate.opsForHash().put(RedisConsts.REDIS_HEARTBEAT_HASH_KEY, clientId, String.valueOf(ts));
} }
} catch (Exception ignore) {} } catch (Exception ignore) {}
} }
@@ -142,7 +140,7 @@ public class ConnectionRestService extends BaseRestServiceWithExport<ConnectionE
private void cacheLastDbWrite(String clientId, long ts) { private void cacheLastDbWrite(String clientId, long ts) {
try { try {
if (stringRedisTemplate != null) { if (stringRedisTemplate != null) {
stringRedisTemplate.opsForHash().put(REDIS_LAST_DB_WRITE_HASH_KEY, clientId, String.valueOf(ts)); stringRedisTemplate.opsForHash().put(RedisConsts.REDIS_LAST_DB_WRITE_HASH_KEY, clientId, String.valueOf(ts));
} }
} catch (Exception ignore) {} } catch (Exception ignore) {}
} }

View File

@@ -387,66 +387,109 @@ public class WorkgroupThreadRoutingStrategy extends AbstractThreadRoutingStrateg
if (!isInServiceTime) { if (!isInServiceTime) {
log.info("不在服务时间内,路由到离线留言"); log.info("不在服务时间内,路由到离线留言");
// 选择离线留言接待客服 return routeToOfflineMessage(visitorRequest, thread, workgroup);
AgentEntity messageLeaveAgent = workgroup.getMessageLeaveAgent();
if (messageLeaveAgent == null) {
log.error("离线留言接待客服不存在,请配置工作组留言接待客服 - workgroupUid: {}",
workgroup.getUid());
throw new IllegalStateException("Workgroup message leave agent not found");
}
log.debug("使用离线留言接待客服 - agentUid: {}", messageLeaveAgent.getUid());
QueueMemberEntity queueMemberEntity = queueService
.enqueueWorkgroupWithResult(thread, messageLeaveAgent, workgroup, visitorRequest)
.queueMember();
// 直接返回离线留言消息
return getOfflineMessage(visitorRequest, thread, messageLeaveAgent, workgroup, queueMemberEntity);
} }
// 选择客服 return routeToAgentDuringServiceTime(visitorRequest, thread, workgroup);
}
/**
* 在服务时间内分配客服并根据在线/负载状态进行路由
*/
private MessageProtobuf routeToAgentDuringServiceTime(VisitorRequest visitorRequest, ThreadEntity thread,
WorkgroupEntity workgroup) {
log.debug("在服务时间内,开始选择客服"); log.debug("在服务时间内,开始选择客服");
List<AgentEntity> availableAgents = presenceFacadeService.getAvailableAgents(workgroup);
if (availableAgents.isEmpty()) {
log.info("无在线客服可用,降级为离线留言");
return routeToOfflineMessage(visitorRequest, thread, workgroup);
}
long selectStartTime = System.currentTimeMillis(); long selectStartTime = System.currentTimeMillis();
AgentEntity agentEntity = selectAgent(workgroup, thread); AgentEntity agentEntity = resolvePreferredAgent(workgroup, thread, availableAgents);
log.info("客服选择完成 - agentUid: {}, 选择耗时: {}ms", agentEntity.getUid(), System.currentTimeMillis() - selectStartTime); if (agentEntity == null) {
log.warn("未能根据路由策略选出可用客服,降级为离线留言 - workgroupUid: {}", workgroup.getUid());
return routeToOfflineMessage(visitorRequest, thread, workgroup);
}
log.info("客服选择完成 - agentUid: {}, 选择耗时: {}ms", agentEntity.getUid(),
System.currentTimeMillis() - selectStartTime);
// 加入队列 // 加入队列
log.debug("开始将线程加入工作组队列"); log.debug("开始将线程加入工作组队列");
long enqueueStartTime = System.currentTimeMillis(); long enqueueStartTime = System.currentTimeMillis();
QueueService.QueueEnqueueResult enqueueResult = queueService QueueService.QueueEnqueueResult enqueueResult = queueService
.enqueueWorkgroupWithResult(thread, agentEntity, workgroup, visitorRequest); .enqueueWorkgroupWithResult(thread, agentEntity, workgroup, visitorRequest);
QueueMemberEntity queueMemberEntity = enqueueResult.queueMember(); QueueMemberEntity queueMemberEntity = enqueueResult.queueMember();
log.info("工作组队列加入完成 - queueMemberUid: {}, 耗时: {}ms", queueMemberEntity.getUid(), System.currentTimeMillis() - enqueueStartTime); log.info("工作组队列加入完成 - queueMemberUid: {}, 耗时: {}ms", queueMemberEntity.getUid(),
System.currentTimeMillis() - enqueueStartTime);
// 处理强制转人工
if (visitorRequest.getForceAgent()) { if (visitorRequest.getForceAgent()) {
log.debug("处理强制转人工标记"); log.debug("处理强制转人工标记");
handleForceAgentTransfer(visitorRequest, thread, queueMemberEntity); handleForceAgentTransfer(visitorRequest, thread, queueMemberEntity);
} }
// 根据客服状态进行路由 boolean onlineAndAvailable = presenceFacadeService.isAgentOnlineAndAvailable(agentEntity);
log.debug("开始根据客服状态进行最终路由"); if (onlineAndAvailable) {
return routeByAgentStatus(agentEntity, thread, queueMemberEntity, workgroup, visitorRequest); log.debug("客服在线且可接待,检查接待名额");
boolean hasCapacity = queueMemberEntity.getWorkgroupQueue().getChattingCount() < agentEntity.getMaxThreadCount();
if (hasCapacity) {
log.info("客服有接待名额,进入接待流程 - agentUid: {}, agentName {}", agentEntity.getUid(),
agentEntity.getNickname());
return handleAvailableWorkgroup(thread, agentEntity, queueMemberEntity);
}
log.info("客服接待名额已满,进入排队 - agentUid: {}, agentName {}", agentEntity.getUid(),
agentEntity.getNickname());
return handleQueuedWorkgroup(thread, agentEntity, queueMemberEntity);
}
log.info("客服离线或不可接待,降级为离线留言 - agentUid: {}, agentName {}", agentEntity.getUid(),
agentEntity.getNickname());
return getOfflineMessage(visitorRequest, thread, agentEntity, workgroup, queueMemberEntity);
} }
/** /**
* 选择客服 * 选择优先客服:优先使用路由策略结果,其次选择第一个在线可用客服
* TODO: 存在选择逻辑bug当某个客服agent在线却选择了另外不在线的客服agent导致无法正确路由请首先排除离线客服agent
* 请结合在线状态和接待负载进行优化
* 合并 selectAgent 和 routeByAgentStatus 方法
*/ */
private AgentEntity selectAgent(WorkgroupEntity workgroup, ThreadEntity thread) { private AgentEntity resolvePreferredAgent(WorkgroupEntity workgroup, ThreadEntity thread,
AgentEntity agentEntity = workgroupRoutingService.selectAgent(workgroup, thread); List<AgentEntity> candidates) {
if (candidates == null || candidates.isEmpty()) {
return null;
}
log.debug("选择优先客服:优先使用路由策略结果,其次选择第一个在线可用客服");
if (agentEntity == null) { AgentEntity routedAgent = workgroupRoutingService.selectAgent(workgroup, thread);
// 离线留言接待客服 if (routedAgent != null && presenceFacadeService.isAgentOnlineAndAvailable(routedAgent)) {
agentEntity = workgroup.getMessageLeaveAgent(); boolean existsInCandidate = candidates.stream()
if (agentEntity == null) { .anyMatch(agent -> StringUtils.hasText(agent.getUid())
log.error("离线留言接待客服不存在,请配置工作组留言接待客服"); && agent.getUid().equals(routedAgent.getUid()));
throw new IllegalStateException("Workgroup message leave agent not found"); if (existsInCandidate) {
return routedAgent;
} }
} }
log.debug("未能使用路由策略结果,选择第一个在线可用客服");
return agentEntity; return candidates.stream()
.filter(presenceFacadeService::isAgentOnlineAndAvailable)
.findFirst()
.orElse(null);
}
/**
* 不满足服务时间或无在线客服时,统一进入离线留言流程
*/
private MessageProtobuf routeToOfflineMessage(VisitorRequest visitorRequest, ThreadEntity thread,
WorkgroupEntity workgroup) {
AgentEntity messageLeaveAgent = workgroup.getMessageLeaveAgent();
if (messageLeaveAgent == null) {
log.error("离线留言接待客服不存在,请配置工作组留言接待客服 - workgroupUid: {}", workgroup.getUid());
throw new IllegalStateException("Workgroup message leave agent not found");
}
log.debug("使用离线留言接待客服 - agentUid: {}", messageLeaveAgent.getUid());
QueueMemberEntity queueMemberEntity = queueService
.enqueueWorkgroupWithResult(thread, messageLeaveAgent, workgroup, visitorRequest)
.queueMember();
return getOfflineMessage(visitorRequest, thread, messageLeaveAgent, workgroup, queueMemberEntity);
} }
/** /**
@@ -470,29 +513,6 @@ public class WorkgroupThreadRoutingStrategy extends AbstractThreadRoutingStrateg
} }
} }
/**
* 根据客服状态进行路由
*/
private MessageProtobuf routeByAgentStatus(AgentEntity agentEntity, ThreadEntity thread,
QueueMemberEntity queueMemberEntity, WorkgroupEntity workgroup, VisitorRequest visitorRequest) {
if (presenceFacadeService.isAgentOnlineAndAvailable(agentEntity)) {
log.info("客服在线且可接待 - agentUid: {}, agentName {}", agentEntity.getUid(), agentEntity.getNickname());
// 客服在线且可接待
if (queueMemberEntity.getWorkgroupQueue().getChattingCount() < agentEntity.getMaxThreadCount()) {
log.info("客服有可用接待名额 - agentUid: {}, agentName {}", agentEntity.getUid(), agentEntity.getNickname());
// 有可用接待名额
return handleAvailableWorkgroup(thread, agentEntity, queueMemberEntity);
} else {
log.info("客服接待名额已满,进入排队 - agentUid: {}, agentName {}", agentEntity.getUid(), agentEntity.getNickname());
return handleQueuedWorkgroup(thread, agentEntity, queueMemberEntity);
}
} else {
log.info("客服离线或不可接待 - agentUid: {}, agentName {}", agentEntity.getUid(), agentEntity.getNickname());
// 客服离线或不可接待
return getOfflineMessage(visitorRequest, thread, agentEntity, workgroup, queueMemberEntity);
}
}
/** /**
* 处理可用工作组客服 * 处理可用工作组客服