This commit is contained in:
jack ning
2025-11-21 07:33:57 +08:00
parent 39bc949cb3
commit 4e84029c16
4 changed files with 61 additions and 0 deletions

View File

@@ -27,6 +27,7 @@ public class QueueNotificationBuilder {
private final Clock clock;
/** Build a QUEUE_NOTICE event emitted when a visitor newly joins the queue. */
public QueueNotificationPayload buildJoinNotice(QueueMemberEntity queueMember, String fallbackAgentUid) {
return basePayload(queueMember, fallbackAgentUid)
.messageType(QUEUE_NOTICE)
@@ -34,6 +35,7 @@ public class QueueNotificationBuilder {
.build();
}
/** Build a QUEUE_NOTICE event for visitors leaving voluntarily or via backend cleanup. */
public QueueNotificationPayload buildLeaveNotice(QueueMemberEntity queueMember, String fallbackAgentUid) {
return basePayload(queueMember, fallbackAgentUid)
.messageType(QUEUE_NOTICE)
@@ -41,6 +43,7 @@ public class QueueNotificationBuilder {
.build();
}
/** Build a QUEUE_TIMEOUT notification when a visitor waits longer than SLA and gets dropped. */
public QueueNotificationPayload buildTimeoutNotice(QueueMemberEntity queueMember, String fallbackAgentUid) {
return basePayload(queueMember, fallbackAgentUid)
.messageType(QUEUE_TIMEOUT)
@@ -48,6 +51,7 @@ public class QueueNotificationBuilder {
.build();
}
/** Build a QUEUE_ACCEPT message once the queue member gets assigned to an agent. */
public QueueNotificationPayload buildAssignmentNotice(QueueMemberEntity queueMember, String agentUid) {
return basePayload(queueMember, agentUid)
.messageType(QUEUE_ACCEPT)
@@ -57,6 +61,10 @@ public class QueueNotificationBuilder {
.build();
}
/**
* Bundle several queue events into a single QUEUE_UPDATE payload that carries optional
* member snapshots so the client can reconcile local state efficiently.
*/
public QueueNotificationPayload buildBatchUpdate(String agentUid,
List<QueueNotificationPayload> events,
List<QueueNotificationPayload.QueueNotificationSnapshot> snapshots) {
@@ -80,6 +88,7 @@ public class QueueNotificationBuilder {
.build();
}
/** Create a lightweight snapshot entry for the visitor name + queue position. */
public QueueNotificationPayload.QueueNotificationSnapshot buildSnapshot(QueueMemberEntity queueMember) {
if (queueMember == null) {
return null;
@@ -99,6 +108,7 @@ public class QueueNotificationBuilder {
.build();
}
/** Shared builder that pre-populates common queue metadata before tweaking per event. */
private QueueNotificationPayload.QueueNotificationPayloadBuilder basePayload(QueueMemberEntity queueMember,
String fallbackAgentUid) {
ThreadEntity thread = queueMember.getThread();
@@ -117,6 +127,7 @@ public class QueueNotificationBuilder {
.serverTimestamp(clock.millis());
}
/** Try to read the assigned agent uid from the thread JSON blob. */
private String resolveAgentUid(ThreadEntity thread) {
if (thread == null) {
return null;
@@ -129,6 +140,7 @@ public class QueueNotificationBuilder {
return agent != null ? agent.getUid() : null;
}
/** Very rough wait estimation (2 minutes per queued visitor) until SLA stats are available. */
private long estimateWaitMillis(QueueMemberEntity member) {
int queueSize = member.getAgentQueue() != null ? member.getAgentQueue().getQueuingCount() : 0;
// 当前缺少 SLA 统计,先使用固定 2min/人估算,后续可替换为 rolling window 平均

View File

@@ -8,22 +8,46 @@ import lombok.Value;
@Value
@Builder
public class QueueNotificationPayload {
/** High level channel that consumers use to route the message (notice/update/timeout/etc). */
QueueNotificationType messageType;
/** Fine grained delta describing what changed for this queue member. */
QueueNotificationDelta delta;
/** Queue member that triggered the update (null only for legacy fallbacks). */
String queueMemberUid;
/** Associated conversation thread uid so clients can correlate with chat sessions. */
String threadUid;
/** Agent currently responsible for the visitor, falls back to provided agent when unknown. */
String agentUid;
/** Member position in the agent queue; null when no longer waiting (e.g. assigned). */
Integer position;
/** Current queue size exposed for realtime dashboards, may be null when queue missing. */
Integer queueSize;
/** Estimated milliseconds until assignment using the same heuristic shown in Builder. */
Long estimatedWaitMs;
/** Populated only for QUEUE_UPDATE payloads so clients can refresh multiple members at once. */
List<QueueNotificationSnapshot> snapshot;
/** Server side epoch millis when the payload was produced. */
long serverTimestamp;
@Value
@Builder
public static class QueueNotificationSnapshot {
/** Queue member represented by this snapshot entry. */
String queueMemberUid;
/** Human friendly visitor name rendered on the queue list. */
String displayName;
/** Position for this member at the time the snapshot was generated. */
Integer position;
}
}

View File

@@ -51,6 +51,7 @@ public class QueueNotificationService {
return thread;
});
/** Enqueue a QUEUE_NOTICE event when a visitor joins the agent's waiting list. */
public void publishQueueJoinNotice(AgentEntity agent, QueueMemberEntity queueMemberEntity) {
if (agent == null || queueMemberEntity == null) {
return;
@@ -67,6 +68,7 @@ public class QueueNotificationService {
}
}
/** Send the assignment notification immediately once the visitor is matched to the agent. */
public void publishQueueAssignmentNotice(AgentEntity agent, QueueMemberEntity queueMemberEntity) {
if (agent == null || queueMemberEntity == null) {
return;
@@ -82,14 +84,20 @@ public class QueueNotificationService {
}
}
/**
* Publish a QUEUE_NOTICE/LEFT delta for members removed from the queue. Uses agent inferred from
* the member or queue.
*/
public void publishQueueLeaveNotice(QueueMemberEntity queueMemberEntity) {
publishMemberDelta(queueMemberEntity, queueNotificationBuilder::buildLeaveNotice, "leave");
}
/** Publish a QUEUE_TIMEOUT delta when a member exceeded SLA wait time. */
public void publishQueueTimeoutNotice(QueueMemberEntity queueMemberEntity) {
publishMemberDelta(queueMemberEntity, queueNotificationBuilder::buildTimeoutNotice, "timeout");
}
/** Allow external callers to provide a fully built payload and deliver it to the agent topic. */
public void publishAgentNotification(String agentUid, QueueNotificationRequest request) {
Assert.hasText(agentUid, "agentUid must not be blank");
AgentEntity agent = agentRestService.findByUid(agentUid)
@@ -99,17 +107,20 @@ public class QueueNotificationService {
dispatch(agentQueueThread, payload);
}
/** Resolve the backing agent queue thread, ensuring it exists before dispatching messages. */
private ThreadEntity resolveAgentQueueThread(AgentEntity agent) {
String threadUid = queueMemberRestService().ensureAgentQueueThreadUid(agent);
return threadRestService.findByUid(threadUid)
.orElseThrow(() -> new NotFoundException("Agent queue thread not found: " + threadUid));
}
/** Build a queue-notice protobuf for the thread and hand it off to the messaging layer. */
private void dispatch(ThreadEntity thread, QueueNotificationPayload payload) {
MessageProtobuf message = ThreadMessageUtil.getAgentQueueNoticeMessage(payload, thread);
messageSendService.sendProtobufMessage(message);
}
/** Buffer single-member deltas so we can batch them later, unless they must be sent immediately. */
private void bufferAndSchedule(AgentEntity agent, QueueNotificationPayload payload,
QueueNotificationPayload.QueueNotificationSnapshot snapshot) {
if (agent == null || payload == null) {
@@ -121,6 +132,7 @@ public class QueueNotificationService {
scheduleFlush(agentUid, agent.resolveQueueNoticeBatchWindowMs());
}
/** Schedule the batch flush task based on the agent-configured batching window. */
private void scheduleFlush(String agentUid, int batchWindowMs) {
PendingBatch batch = pendingBatches.get(agentUid);
if (batch == null) {
@@ -148,6 +160,7 @@ public class QueueNotificationService {
}
}
/** Drain buffered events for an agent and deliver either a single event or bulk update. */
private void flushBatch(String agentUid) {
PendingBatch batch = pendingBatches.remove(agentUid);
if (batch == null) {
@@ -174,11 +187,13 @@ public class QueueNotificationService {
}
}
/** Ensure we do not leak pending tasks when the bean is destroyed. */
@PreDestroy
public void shutdownBatchingExecutor() {
batchingExecutor.shutdownNow();
}
/** Shared helper to emit queue deltas (leave/timeout) with snapshot batching. */
private void publishMemberDelta(QueueMemberEntity member,
BiFunction<QueueMemberEntity, String, QueueNotificationPayload> payloadBuilder,
String context) {
@@ -202,6 +217,7 @@ public class QueueNotificationService {
}
}
/** Find an agent uid associated with the member from thread, topic, or queue metadata. */
private String resolveAgentUidFromMember(QueueMemberEntity member) {
ThreadEntity thread = member.getThread();
if (thread != null) {
@@ -223,6 +239,7 @@ public class QueueNotificationService {
return null;
}
/** Extract the trailing segment from a topic string which encodes agent uid. */
private String extractUidFromQueueTopic(String topic) {
if (!StringUtils.hasText(topic)) {
return null;
@@ -231,6 +248,7 @@ public class QueueNotificationService {
return idx >= 0 ? topic.substring(idx + 1) : topic;
}
/** Lazily fetch the queue member rest service to avoid circular dependencies. */
private com.bytedesk.service.queue_member.QueueMemberRestService queueMemberRestService() {
return queueMemberRestServiceProvider.getObject();
}
@@ -240,10 +258,12 @@ public class QueueNotificationService {
private final Map<String, QueueNotificationPayload.QueueNotificationSnapshot> snapshotByMember = new LinkedHashMap<>();
private ScheduledFuture<?> scheduledTask;
/** Remember the agent identity for potential future logging hooks. */
private PendingBatch(String agentUid) {
// agentUid reserved for logging/debug hooks if needed later
}
/** Add a queue event and latest snapshot override for the corresponding member. */
private synchronized void addEvent(QueueNotificationPayload payload,
QueueNotificationPayload.QueueNotificationSnapshot snapshot) {
events.add(payload);
@@ -252,10 +272,12 @@ public class QueueNotificationService {
}
}
/** Return snapshot list preserving insertion order for consistent UI rendering. */
private synchronized List<QueueNotificationPayload.QueueNotificationSnapshot> snapshotValues() {
return new ArrayList<>(snapshotByMember.values());
}
/** Copy buffered events then clear internal state for the next batch cycle. */
private synchronized List<QueueNotificationPayload> drainEvents() {
List<QueueNotificationPayload> copy = new ArrayList<>(events);
events.clear();
@@ -264,10 +286,12 @@ public class QueueNotificationService {
return copy;
}
/** Indicates whether a flush runnable is already scheduled. */
private synchronized boolean hasScheduledTask() {
return scheduledTask != null && !scheduledTask.isDone();
}
/** Track scheduled flush so we do not enqueue duplicate timers. */
private synchronized void setScheduledTask(ScheduledFuture<?> future) {
this.scheduledTask = future;
}

View File

@@ -46,6 +46,7 @@ public abstract class AbstractThreadRoutingStrategy {
// ==================== 常量定义 ====================
// TODO: 后台可配置
/** 每人预估等待时间(分钟) */
protected static final int ESTIMATED_WAIT_TIME_PER_PERSON = 2;