This commit is contained in:
jack ning
2025-11-20 13:35:19 +08:00
parent 7bbdc65956
commit 61e05082da
7 changed files with 223 additions and 19 deletions

View File

@@ -62,6 +62,7 @@ Saving debug log to /var/log/letsencrypt/letsencrypt.log
Please enter the domain name(s) you would like on your certificate (comma and/or
<!-- 注意添加多个域名支持2级、3级通配符域名 -->
space separated) (Enter 'c' to cancel): weiyuai.cn,*.weiyuai.cn
bytedesk.com,*.bytedesk.com
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
<!-- 添加域名解析TXT记录 -->

View File

@@ -18,10 +18,11 @@ import com.bytedesk.core.base.BaseEntity;
import com.bytedesk.core.constant.AvatarConsts;
import com.bytedesk.core.constant.BytedeskConsts;
import com.bytedesk.core.constant.I18Consts;
import com.bytedesk.core.member.MemberEntity;
import com.bytedesk.core.rbac.user.UserProtobuf;
import com.bytedesk.core.rbac.user.UserTypeEnum;
import com.bytedesk.service.agent_settings.AgentSettingsEntity;
import com.bytedesk.core.member.MemberEntity;
import com.bytedesk.service.queue_settings.QueueSettingsEntity;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
@@ -164,5 +165,15 @@ public class AgentEntity extends BaseEntity {
return I18Consts.I18N_AGENT_TIMEOUT_TIP;
}
/**
* Resolve the per-agent queue notice batch window with a sane default when the agent lacks explicit queue settings.
*/
public int resolveQueueNoticeBatchWindowMs() {
if (this.settings != null) {
return this.settings.resolveQueueNoticeBatchWindowMs();
}
return QueueSettingsEntity.DEFAULT_QUEUE_NOTICE_BATCH_WINDOW_MS;
}
}

View File

@@ -397,6 +397,10 @@ public class AgentRestService extends BaseRestService<AgentEntity, AgentRequest,
if (agent.getMember() != null) {
agent.getMember().getUser(); // 触发加载
}
if (agent.getSettings() != null) {
agent.getSettings().getQueueSettings();
agent.getSettings().getDraftQueueSettings();
}
});
return agentOptional;
}

View File

@@ -1,14 +1,17 @@
package com.bytedesk.service.queue.notification;
import static com.bytedesk.service.queue.notification.QueueNotificationDelta.ASSIGNED;
import static com.bytedesk.service.queue.notification.QueueNotificationDelta.BULK_CLEANUP;
import static com.bytedesk.service.queue.notification.QueueNotificationDelta.JOINED;
import static com.bytedesk.service.queue.notification.QueueNotificationDelta.LEFT;
import static com.bytedesk.service.queue.notification.QueueNotificationDelta.TIMEOUT;
import static com.bytedesk.service.queue.notification.QueueNotificationType.QUEUE_ACCEPT;
import static com.bytedesk.service.queue.notification.QueueNotificationType.QUEUE_NOTICE;
import static com.bytedesk.service.queue.notification.QueueNotificationType.QUEUE_TIMEOUT;
import static com.bytedesk.service.queue.notification.QueueNotificationType.QUEUE_UPDATE;
import java.time.Clock;
import java.util.List;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@@ -54,6 +57,48 @@ public class QueueNotificationBuilder {
.build();
}
public QueueNotificationPayload buildBatchUpdate(String agentUid,
List<QueueNotificationPayload> events,
List<QueueNotificationPayload.QueueNotificationSnapshot> snapshots) {
if (events == null || events.isEmpty()) {
throw new IllegalArgumentException("events must not be empty");
}
QueueNotificationPayload anchor = events.get(events.size() - 1);
List<QueueNotificationPayload.QueueNotificationSnapshot> safeSnapshots =
(snapshots == null || snapshots.isEmpty()) ? null : snapshots;
return QueueNotificationPayload.builder()
.messageType(QUEUE_UPDATE)
.delta(BULK_CLEANUP)
.queueMemberUid(anchor.getQueueMemberUid())
.threadUid(anchor.getThreadUid())
.agentUid(StringUtils.hasText(anchor.getAgentUid()) ? anchor.getAgentUid() : agentUid)
.position(anchor.getPosition())
.queueSize(anchor.getQueueSize())
.estimatedWaitMs(anchor.getEstimatedWaitMs())
.snapshot(safeSnapshots)
.serverTimestamp(clock.millis())
.build();
}
public QueueNotificationPayload.QueueNotificationSnapshot buildSnapshot(QueueMemberEntity queueMember) {
if (queueMember == null) {
return null;
}
ThreadEntity thread = queueMember.getThread();
String displayName = null;
if (thread != null && StringUtils.hasText(thread.getUser())) {
UserProtobuf visitor = UserProtobuf.fromJson(thread.getUser());
if (visitor != null) {
displayName = visitor.getNickname();
}
}
return QueueNotificationPayload.QueueNotificationSnapshot.builder()
.queueMemberUid(queueMember.getUid())
.displayName(displayName)
.position(queueMember.getQueueNumber())
.build();
}
private QueueNotificationPayload.QueueNotificationPayloadBuilder basePayload(QueueMemberEntity queueMember,
String fallbackAgentUid) {
ThreadEntity thread = queueMember.getThread();

View File

@@ -1,10 +1,21 @@
package com.bytedesk.service.queue.notification;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.beans.factory.ObjectProvider;
import com.bytedesk.core.exception.NotFoundException;
import com.bytedesk.core.message.MessageProtobuf;
@@ -15,10 +26,10 @@ import com.bytedesk.core.topic.TopicUtils;
import com.bytedesk.service.agent.AgentEntity;
import com.bytedesk.service.agent.AgentRestService;
import com.bytedesk.service.queue_member.QueueMemberEntity;
import com.bytedesk.service.queue_member.QueueMemberRestService;
import com.bytedesk.service.utils.ThreadMessageUtil;
import com.bytedesk.core.rbac.user.UserProtobuf;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -28,20 +39,28 @@ import lombok.extern.slf4j.Slf4j;
public class QueueNotificationService {
private final QueueNotificationBuilder queueNotificationBuilder;
private final QueueMemberRestService queueMemberRestService;
private final ObjectProvider<com.bytedesk.service.queue_member.QueueMemberRestService> queueMemberRestServiceProvider;
private final ThreadRestService threadRestService;
private final AgentRestService agentRestService;
private final IMessageSendService messageSendService;
private final Map<String, PendingBatch> pendingBatches = new ConcurrentHashMap<>();
private final ScheduledExecutorService batchingExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
Thread thread = new Thread(r, "queue-notice-batcher");
thread.setDaemon(true);
return thread;
});
public void publishQueueJoinNotice(AgentEntity agent, QueueMemberEntity queueMemberEntity) {
if (agent == null || queueMemberEntity == null) {
return;
}
try {
ThreadEntity agentQueueThread = resolveAgentQueueThread(agent);
QueueNotificationPayload payload = queueNotificationBuilder
.buildJoinNotice(queueMemberEntity, agent.getUid());
dispatch(agentQueueThread, payload);
QueueNotificationPayload.QueueNotificationSnapshot snapshot = queueNotificationBuilder
.buildSnapshot(queueMemberEntity);
bufferAndSchedule(agent, payload, snapshot);
} catch (Exception ex) {
log.warn("Failed to publish queue notice for agent {} member {}: {}", agent.getUid(),
queueMemberEntity.getUid(), ex.getMessage(), ex);
@@ -81,7 +100,7 @@ public class QueueNotificationService {
}
private ThreadEntity resolveAgentQueueThread(AgentEntity agent) {
String threadUid = queueMemberRestService.ensureAgentQueueThreadUid(agent);
String threadUid = queueMemberRestService().ensureAgentQueueThreadUid(agent);
return threadRestService.findByUid(threadUid)
.orElseThrow(() -> new NotFoundException("Agent queue thread not found: " + threadUid));
}
@@ -91,6 +110,75 @@ public class QueueNotificationService {
messageSendService.sendProtobufMessage(message);
}
private void bufferAndSchedule(AgentEntity agent, QueueNotificationPayload payload,
QueueNotificationPayload.QueueNotificationSnapshot snapshot) {
if (agent == null || payload == null) {
return;
}
String agentUid = agent.getUid();
PendingBatch batch = pendingBatches.computeIfAbsent(agentUid, PendingBatch::new);
batch.addEvent(payload, snapshot);
scheduleFlush(agentUid, agent.resolveQueueNoticeBatchWindowMs());
}
private void scheduleFlush(String agentUid, int batchWindowMs) {
PendingBatch batch = pendingBatches.get(agentUid);
if (batch == null) {
return;
}
if (batchWindowMs <= 0) {
try {
batchingExecutor.execute(() -> flushBatch(agentUid));
} catch (RejectedExecutionException ex) {
log.warn("Skip scheduling immediate queue batch for agent {}: {}", agentUid, ex.getMessage());
}
return;
}
synchronized (batch) {
if (batch.hasScheduledTask()) {
return;
}
try {
ScheduledFuture<?> future = batchingExecutor.schedule(() -> flushBatch(agentUid), batchWindowMs,
TimeUnit.MILLISECONDS);
batch.setScheduledTask(future);
} catch (RejectedExecutionException ex) {
log.warn("Skip scheduling queue batch for agent {}: {}", agentUid, ex.getMessage());
}
}
}
private void flushBatch(String agentUid) {
PendingBatch batch = pendingBatches.remove(agentUid);
if (batch == null) {
return;
}
List<QueueNotificationPayload.QueueNotificationSnapshot> snapshots = batch.snapshotValues();
List<QueueNotificationPayload> events = batch.drainEvents();
if (events.isEmpty()) {
return;
}
try {
AgentEntity agent = agentRestService.findByUid(agentUid)
.orElseThrow(() -> new NotFoundException("Agent " + agentUid + " not found"));
ThreadEntity agentQueueThread = resolveAgentQueueThread(agent);
QueueNotificationPayload payloadToSend;
if (events.size() == 1) {
payloadToSend = events.get(0);
} else {
payloadToSend = queueNotificationBuilder.buildBatchUpdate(agentUid, events, snapshots);
}
dispatch(agentQueueThread, payloadToSend);
} catch (Exception ex) {
log.warn("Failed to flush queue notice batch for agent {}: {}", agentUid, ex.getMessage(), ex);
}
}
@PreDestroy
public void shutdownBatchingExecutor() {
batchingExecutor.shutdownNow();
}
private void publishMemberDelta(QueueMemberEntity member,
BiFunction<QueueMemberEntity, String, QueueNotificationPayload> payloadBuilder,
String context) {
@@ -105,9 +193,9 @@ public class QueueNotificationService {
try {
AgentEntity agent = agentRestService.findByUid(agentUid)
.orElseThrow(() -> new NotFoundException("Agent " + agentUid + " not found"));
ThreadEntity agentQueueThread = resolveAgentQueueThread(agent);
QueueNotificationPayload payload = payloadBuilder.apply(member, agentUid);
dispatch(agentQueueThread, payload);
QueueNotificationPayload.QueueNotificationSnapshot snapshot = queueNotificationBuilder.buildSnapshot(member);
bufferAndSchedule(agent, payload, snapshot);
} catch (Exception ex) {
log.warn("Failed to publish queue {} notice for member {} (agent {}): {}", context,
member.getUid(), agentUid, ex.getMessage(), ex);
@@ -142,4 +230,46 @@ public class QueueNotificationService {
int idx = topic.lastIndexOf('/');
return idx >= 0 ? topic.substring(idx + 1) : topic;
}
private com.bytedesk.service.queue_member.QueueMemberRestService queueMemberRestService() {
return queueMemberRestServiceProvider.getObject();
}
private static final class PendingBatch {
private final List<QueueNotificationPayload> events = new ArrayList<>();
private final Map<String, QueueNotificationPayload.QueueNotificationSnapshot> snapshotByMember = new LinkedHashMap<>();
private ScheduledFuture<?> scheduledTask;
private PendingBatch(String agentUid) {
// agentUid reserved for logging/debug hooks if needed later
}
private synchronized void addEvent(QueueNotificationPayload payload,
QueueNotificationPayload.QueueNotificationSnapshot snapshot) {
events.add(payload);
if (snapshot != null && StringUtils.hasText(snapshot.getQueueMemberUid())) {
snapshotByMember.put(snapshot.getQueueMemberUid(), snapshot);
}
}
private synchronized List<QueueNotificationPayload.QueueNotificationSnapshot> snapshotValues() {
return new ArrayList<>(snapshotByMember.values());
}
private synchronized List<QueueNotificationPayload> drainEvents() {
List<QueueNotificationPayload> copy = new ArrayList<>(events);
events.clear();
snapshotByMember.clear();
scheduledTask = null;
return copy;
}
private synchronized boolean hasScheduledTask() {
return scheduledTask != null && !scheduledTask.isDone();
}
private synchronized void setScheduledTask(ScheduledFuture<?> future) {
this.scheduledTask = future;
}
}
}

View File

@@ -41,6 +41,7 @@ import com.bytedesk.service.queue_member.QueueMemberResponse;
import com.bytedesk.service.queue_member.QueueMemberRestService;
import com.bytedesk.service.queue_member.QueueMemberStatusEnum;
import com.bytedesk.service.utils.ServiceConvertUtils;
import com.bytedesk.service.queue.notification.QueueNotificationService;
@ExtendWith(MockitoExtension.class)
class QueueMemberRestServiceTest {
@@ -57,6 +58,9 @@ class QueueMemberRestServiceTest {
@Mock
private QueueAuditLogger queueAuditLogger;
@Mock
private QueueNotificationService queueNotificationService;
private ModelMapper modelMapper;
private QueueMemberRestService queueMemberRestService;
@@ -69,7 +73,8 @@ class QueueMemberRestServiceTest {
modelMapper,
uidUtils,
threadRestService,
queueAuditLogger);
queueAuditLogger,
queueNotificationService);
}
@Test

View File

@@ -36,6 +36,7 @@ import com.bytedesk.service.queue_member.QueueMemberRestService;
import com.bytedesk.service.queue_member.QueueMemberStatusEnum;
import com.bytedesk.service.queue_member.mq.QueueMemberMessageService;
import com.bytedesk.core.uid.UidUtils;
import com.bytedesk.service.queue.notification.QueueNotificationService;
@ExtendWith(MockitoExtension.class)
class QueueServiceTest {
@@ -55,11 +56,14 @@ class QueueServiceTest {
@Mock
private BytedeskEventPublisher bytedeskEventPublisher;
@Mock
private QueueRepository queueRepository;
@Mock
private QueueRepository queueRepository;
@Mock
private UidUtils uidUtils;
@Mock
private UidUtils uidUtils;
@Mock
private QueueNotificationService queueNotificationService;
private QueueService queueService;
@@ -70,9 +74,10 @@ class QueueServiceTest {
agentRestService,
threadRestService,
queueMemberMessageService,
bytedeskEventPublisher,
queueRepository,
uidUtils);
bytedeskEventPublisher,
queueRepository,
uidUtils,
queueNotificationService);
}
@Test
@@ -108,14 +113,17 @@ class QueueServiceTest {
member.setOrgUid("org-1");
when(agentRestService.findByUid("agent-1")).thenReturn(Optional.of(agent));
when(queueRepository.findFirstByTopicAndDayAndDeletedFalseOrderByCreatedAtDesc(eq("org/queue/agent-1"), anyString()))
when(queueRepository.findFirstByTopicAndDayAndDeletedFalseOrderByCreatedAtDesc(eq("org/queue/agent-1"),
anyString()))
.thenReturn(Optional.of(agentQueue));
when(queueMemberRestService.findEarliestAgentQueueMemberForUpdate("queue-agent-1"))
.thenReturn(Optional.of(member));
when(threadRestService.save(any(ThreadEntity.class))).thenAnswer(invocation -> invocation.getArgument(0));
when(queueMemberRestService.save(any(QueueMemberEntity.class))).thenAnswer(invocation -> invocation.getArgument(0));
when(queueMemberRestService.save(any(QueueMemberEntity.class)))
.thenAnswer(invocation -> invocation.getArgument(0));
Optional<QueueService.QueueAssignmentResult> resultOptional = queueService.assignNextAgentQueueMember("agent-1");
Optional<QueueService.QueueAssignmentResult> resultOptional = queueService
.assignNextAgentQueueMember("agent-1");
assertThat(resultOptional).isPresent();
QueueService.QueueAssignmentResult result = resultOptional.get();