This commit is contained in:
jack ning
2025-11-20 16:11:29 +08:00
parent 34abcf2e30
commit 2bdff0b40c
3 changed files with 82 additions and 526 deletions

View File

@@ -248,7 +248,7 @@ public class QueueService {
QueueEntity primaryQueue;
String nickname;
switch (queueType) {
case ROBOT:
case AGENT:
@@ -264,32 +264,42 @@ public class QueueService {
default:
throw new IllegalArgumentException("Unsupported queue type: " + queueType);
}
validateQueue(primaryQueue, "Queue is full or not active");
return queueMemberRestService.enqueue(threadEntity, primaryQueue, queueType, member -> {
member.setJoinedAt(BdDateUtils.now());
switch (queueType) {
case ROBOT:
case WORKFLOW:
case UNIFIED:
member.setRobotQueue(primaryQueue);
break;
case AGENT:
member.setAgentQueue(primaryQueue);
break;
case WORKGROUP:
member.setWorkgroupQueue(primaryQueue);
QueueEntity agentOrRobotQueue = getAgentOrRobotQueue(agent, threadEntity.getOrgUid());
validateQueue(agentOrRobotQueue, "Agent queue is full or not active");
if (agent.getType().equals(ThreadTypeEnum.AGENT.name())) {
member.setAgentQueue(agentOrRobotQueue);
} else {
member.setRobotQueue(agentOrRobotQueue);
}
break;
}
});
// 创建队列成员
var memberBuilder = QueueMemberEntity.builder()
.uid(uidUtils.getUid())
.thread(threadEntity)
.queueNumber(primaryQueue.getNextNumber())
.joinedAt(BdDateUtils.now())
.orgUid(threadEntity.getOrgUid());
// 根据队列类型设置相应的队列
switch (queueType) {
case ROBOT:
case WORKFLOW:
case UNIFIED:
memberBuilder.robotQueue(primaryQueue);
break;
case AGENT:
memberBuilder.agentQueue(primaryQueue);
break;
case WORKGROUP:
memberBuilder.workgroupQueue(primaryQueue);
// 同时设置 agent/robot 队列
QueueEntity agentOrRobotQueue = getAgentOrRobotQueue(agent, threadEntity.getOrgUid());
validateQueue(agentOrRobotQueue, "Agent queue is full or not active");
if (agent.getType().equals(ThreadTypeEnum.AGENT.name())) {
memberBuilder.agentQueue(agentOrRobotQueue);
} else {
memberBuilder.robotQueue(agentOrRobotQueue);
}
break;
}
return saveQueueMember(memberBuilder.build());
}
/**

View File

@@ -13,14 +13,11 @@
*/
package com.bytedesk.service.queue_member;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import org.modelmapper.ModelMapper;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.CachePut;
import org.springframework.cache.annotation.Cacheable;
@@ -48,15 +45,9 @@ import com.bytedesk.core.topic.TopicUtils;
import com.bytedesk.core.uid.UidUtils;
import com.bytedesk.core.utils.BdDateUtils;
import com.bytedesk.service.agent.AgentEntity;
// import com.bytedesk.service.queue.QueueAuditLogger;
import com.bytedesk.service.queue.QueueEntity;
import com.bytedesk.service.queue.QueueTypeEnum;
import com.bytedesk.service.queue.notification.QueueNotificationService;
import com.bytedesk.service.utils.ServiceConvertUtils;
import jakarta.persistence.EntityManager;
import jakarta.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -68,56 +59,20 @@ public class QueueMemberRestService extends BaseRestServiceWithExport<QueueMembe
private final QueueMemberRepository queueMemberRepository;
private final ModelMapper modelMapper;
private final UidUtils uidUtils;
private final EntityManager entityManager;
// private final EntityManager entityManager;
private static final int IDLE_QUEUE_TIMEOUT_MINUTES = 5; // 超过5分钟未发首条消息视为过期
private static final String AGENT_QUEUE_THREAD_CACHE = "agent_queue_thread_uid";
private final ThreadRestService threadRestService;
private static final List<String> ACTIVE_STATUSES = Collections.singletonList(QueueMemberStatusEnum.QUEUING.name());
private static final int MAX_ENQUEUE_RETRIES = 20;
private static final long COLLISION_BACKOFF_MILLIS = 25L;
private static final String QUEUE_MEMBER_TABLE_NAME = resolveQueueMemberTableName();
// private static final int MAX_ENQUEUE_RETRIES = 20;
// private static final long COLLISION_BACKOFF_MILLIS = 25L;
// private static final String QUEUE_MEMBER_TABLE_NAME = resolveQueueMemberTableName();
// private static final String QUEUE_NUMBER_UNIQUE_CONSTRAINT = "uk7aviqofcxw7ae3fped747qrl7";
// private final QueueAuditLogger queueAuditLogger;
@Lazy
private final QueueNotificationService queueNotificationService;
/**
* 访客主动退出排队:标记离开时间并软删除队列成员记录
*/
public void visitorExitQueue(String threadUid) {
Optional<QueueMemberEntity> optional = findByThreadUid(threadUid);
if (!optional.isPresent()) {
return;
}
QueueMemberEntity entity = optional.get();
entity.setVisitorLeavedAt(BdDateUtils.now());
entity.setDeleted(true); // 不要删除仅修改status状态
entity.setStatus(QueueMemberStatusEnum.CANCELLED.name());
QueueMemberEntity saved = save(entity);
queueNotificationService.publishQueueLeaveNotice(saved);
}
/**
* 扫描超时(未发送首条消息)的排队成员并标记删除
*/
public int cleanupIdleQueueMembers() {
java.time.ZonedDateTime threshold = BdDateUtils.now().minusMinutes(IDLE_QUEUE_TIMEOUT_MINUTES);
java.util.List<QueueMemberEntity> idleList = queueMemberRepository.findIdleBefore(threshold);
int removed = 0;
for (QueueMemberEntity qm : idleList) {
// 只处理仍处于排队状态的线程
if (qm.getThread() != null && qm.getThread().isQueuing()
&& QueueMemberStatusEnum.QUEUING.name().equals(qm.getStatus())) {
// qm.setDeleted(true); // 不要删除仅修改status状态
qm.setVisitorLeavedAt(BdDateUtils.now());
qm.setStatus(QueueMemberStatusEnum.TIMEOUT.name());
QueueMemberEntity saved = save(qm);
queueNotificationService.publishQueueTimeoutNotice(saved);
removed++;
}
}
return removed;
}
public Optional<QueueMemberEntity> findActiveByThreadUid(String threadUid) {
return queueMemberRepository.findFirstByThreadUidAndDeletedFalseAndStatusIn(threadUid, ACTIVE_STATUSES);
}
@@ -167,181 +122,6 @@ public class QueueMemberRestService extends BaseRestServiceWithExport<QueueMembe
return cleanupIdleQueueMembers();
}
/**
* Centralized FIFO enqueue to guarantee tail placement and duplicate guard.
*/
@Transactional
public QueueMemberEntity enqueue(ThreadEntity threadEntity, QueueEntity targetQueue, QueueTypeEnum queueType, java.util.function.Consumer<QueueMemberEntity> enrich) {
cleanupBeforeEnqueue();
Optional<QueueMemberEntity> existing = findActiveByThreadUidForUpdate(threadEntity.getUid());
if (existing.isPresent()) {
return existing.get();
}
int attempt = 0;
Integer nextNumber = null;
while (attempt < MAX_ENQUEUE_RETRIES) {
QueueMemberEntity member = QueueMemberEntity.builder()
.uid(uidUtils.getUid())
.thread(threadEntity)
.orgUid(threadEntity.getOrgUid())
.build();
enrich.accept(member);
if (nextNumber == null) {
nextNumber = resolveInitialQueueNumber(member, targetQueue, queueType);
}
member.setQueueNumber(nextNumber);
try {
QueueMemberEntity savedMember = save(member);
// queueAuditLogger.logQueueJoin(savedMember, threadEntity, targetQueue, queueType);
return savedMember;
} catch (DataIntegrityViolationException ex) {
detachQueueMember(member);
clearPersistenceContext();
if (!isQueueNumberCollision(ex)) {
throw ex;
}
attempt++;
if (attempt >= MAX_ENQUEUE_RETRIES) {
log.error("Queue number collision threshold exceeded for queue {} after {} attempts", targetQueue != null ? targetQueue.getUid() : "unknown", attempt);
throw ex;
}
log.warn("Queue number collision detected for queue {} (attempt {}/{}), backing off", targetQueue != null ? targetQueue.getUid() : "unknown", attempt, MAX_ENQUEUE_RETRIES);
nextNumber++;
backoffAfterCollision(attempt);
}
}
throw new IllegalStateException("Unable to enqueue visitor after retries");
}
private int resolveInitialQueueNumber(QueueMemberEntity member, QueueEntity targetQueue, QueueTypeEnum queueType) {
int seed = nextQueueNumber(targetQueue, queueType);
if (queueType == QueueTypeEnum.WORKGROUP) {
seed = Math.max(seed, nextQueueNumberForAssociation(member.getAgentQueue(), QueueTypeEnum.AGENT));
seed = Math.max(seed, nextQueueNumberForAssociation(member.getRobotQueue(), QueueTypeEnum.ROBOT));
}
return seed;
}
private int nextQueueNumberForAssociation(QueueEntity queue, QueueTypeEnum associationType) {
if (queue == null) {
return 1;
}
return nextQueueNumber(queue, associationType);
}
private void detachQueueMember(QueueMemberEntity member) {
if (member == null) {
return;
}
try {
if (entityManager.contains(member)) {
entityManager.detach(member);
}
} catch (Exception e) {
log.debug("Failed to detach queue member {} after persistence error: {}", member.getUid(), e.getMessage());
}
}
private void clearPersistenceContext() {
try {
entityManager.clear();
} catch (Exception e) {
log.debug("Failed to clear persistence context after persistence error: {}", e.getMessage());
}
}
private boolean isQueueNumberCollision(DataIntegrityViolationException exception) {
if (exception == null) {
return false;
}
org.hibernate.exception.ConstraintViolationException constraintViolation =
findCause(exception, org.hibernate.exception.ConstraintViolationException.class);
if (constraintViolation != null && looksLikeQueueNumberConstraint(constraintViolation.getConstraintName())) {
return true;
}
SQLException sqlException = findCause(exception, SQLException.class);
if (isDuplicateKeyViolation(sqlException) && looksLikeQueueMemberMessage(sqlException.getMessage())) {
return true;
}
String fallbackMessage = exception.getMostSpecificCause() != null
? exception.getMostSpecificCause().getMessage()
: exception.getMessage();
return looksLikeQueueMemberMessage(fallbackMessage);
}
private static String resolveQueueMemberTableName() {
Table table = QueueMemberEntity.class.getAnnotation(Table.class);
String tableName = table != null ? table.name() : "bytedesk_service_queue_member";
return tableName.toLowerCase(Locale.ROOT);
}
private boolean looksLikeQueueNumberConstraint(String identifier) {
if (!StringUtils.hasText(identifier)) {
return false;
}
String lower = identifier.toLowerCase(Locale.ROOT);
return lower.contains("queue_number") || lower.contains(QUEUE_MEMBER_TABLE_NAME);
}
private boolean looksLikeQueueMemberMessage(String message) {
if (!StringUtils.hasText(message)) {
return false;
}
String lower = message.toLowerCase(Locale.ROOT);
return lower.contains("queue_number") || lower.contains(QUEUE_MEMBER_TABLE_NAME);
}
private boolean isDuplicateKeyViolation(SQLException sqlException) {
if (sqlException == null) {
return false;
}
String sqlState = sqlException.getSQLState();
if ("23000".equals(sqlState) || "23505".equals(sqlState)) {
return true;
}
return false;
}
private <T extends Throwable> T findCause(Throwable throwable, Class<T> type) {
Throwable current = throwable;
while (current != null) {
if (type.isInstance(current)) {
return type.cast(current);
}
current = current.getCause();
}
return null;
}
private void backoffAfterCollision(int attempt) {
long delay = Math.min(COLLISION_BACKOFF_MILLIS * attempt, COLLISION_BACKOFF_MILLIS * 4);
try {
Thread.sleep(delay);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
@Transactional
public int nextQueueNumber(QueueEntity queue, QueueTypeEnum queueType) {
if (queue == null) {
return 1;
}
String queueTypeKey = switch (queueType) {
case WORKGROUP -> "WORKGROUP";
case AGENT -> "AGENT";
default -> "ROBOT";
};
Integer currentMax = queueMemberRepository.findMaxQueueNumberForQueue(queue, queueTypeKey);
return (currentMax == null ? 0 : currentMax) + 1;
}
@Cacheable(value = "queue_member", key = "#uid", unless = "#result == null")
@Override
public Optional<QueueMemberEntity> findByUid(String uid) {
@@ -358,6 +138,7 @@ public class QueueMemberRestService extends BaseRestServiceWithExport<QueueMembe
protected QueueMemberEntity doSave(QueueMemberEntity entity) {
return queueMemberRepository.save(entity);
}
@Override
public QueueMemberEntity handleOptimisticLockingFailureException(ObjectOptimisticLockingFailureException e,
@@ -546,6 +327,45 @@ public class QueueMemberRestService extends BaseRestServiceWithExport<QueueMembe
return response != null ? response.getUid() : null;
}
/**
* 访客主动退出排队:标记离开时间并软删除队列成员记录
*/
public void visitorExitQueue(String threadUid) {
Optional<QueueMemberEntity> optional = findByThreadUid(threadUid);
if (!optional.isPresent()) {
return;
}
QueueMemberEntity entity = optional.get();
entity.setVisitorLeavedAt(BdDateUtils.now());
entity.setDeleted(true); // 不要删除仅修改status状态
entity.setStatus(QueueMemberStatusEnum.CANCELLED.name());
QueueMemberEntity saved = save(entity);
queueNotificationService.publishQueueLeaveNotice(saved);
}
/**
* 扫描超时(未发送首条消息)的排队成员并标记删除
*/
public int cleanupIdleQueueMembers() {
java.time.ZonedDateTime threshold = BdDateUtils.now().minusMinutes(IDLE_QUEUE_TIMEOUT_MINUTES);
java.util.List<QueueMemberEntity> idleList = queueMemberRepository.findIdleBefore(threshold);
int removed = 0;
for (QueueMemberEntity qm : idleList) {
// 只处理仍处于排队状态的线程
if (qm.getThread() != null && qm.getThread().isQueuing()
&& QueueMemberStatusEnum.QUEUING.name().equals(qm.getStatus())) {
// qm.setDeleted(true); // 不要删除仅修改status状态
qm.setVisitorLeavedAt(BdDateUtils.now());
qm.setStatus(QueueMemberStatusEnum.TIMEOUT.name());
QueueMemberEntity saved = save(qm);
queueNotificationService.publishQueueTimeoutNotice(saved);
removed++;
}
}
return removed;
}
/** 客服排队会话org/queue/{agent_uid} */
public ThreadResponse createAgentQueueThread(AgentEntity agent) {
//

View File

@@ -1,274 +0,0 @@
package com.bytedesk.service.queue;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.modelmapper.ModelMapper;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import com.bytedesk.core.thread.ThreadEntity;
import com.bytedesk.core.thread.ThreadRestService;
import com.bytedesk.core.thread.enums.ThreadProcessStatusEnum;
import com.bytedesk.core.thread.enums.ThreadTypeEnum;
import com.bytedesk.core.uid.UidUtils;
import com.bytedesk.service.queue_member.QueueMemberEntity;
import com.bytedesk.service.queue_member.QueueMemberRepository;
import com.bytedesk.service.queue_member.QueueMemberResponse;
import com.bytedesk.service.queue_member.QueueMemberRestService;
import com.bytedesk.service.queue_member.QueueMemberStatusEnum;
import com.bytedesk.service.queue.notification.QueueNotificationService;
import com.bytedesk.service.utils.ServiceConvertUtils;
import jakarta.persistence.EntityManager;
@ExtendWith(MockitoExtension.class)
class QueueMemberRestServiceTest {
@Mock
private QueueMemberRepository queueMemberRepository;
@Mock
private UidUtils uidUtils;
@Mock
private ThreadRestService threadRestService;
@Mock
private QueueNotificationService queueNotificationService;
@Mock
private EntityManager entityManager;
private QueueMemberRestService queueMemberRestService;
@BeforeEach
void setUp() {
queueMemberRestService = new QueueMemberRestService(
queueMemberRepository,
new ModelMapper(),
uidUtils,
entityManager,
threadRestService,
queueNotificationService);
}
@Test
void enqueueAssignsSequentialQueueNumbers() {
QueueEntity agentQueue = buildQueue("queue-1");
ThreadEntity threadOne = buildThread("thread-1");
ThreadEntity threadTwo = buildThread("thread-2");
when(queueMemberRepository.findIdleBefore(any())).thenReturn(Collections.emptyList());
when(queueMemberRepository.findFirstByThreadUidAndDeletedFalseAndStatusInForUpdate(anyString(), anyList()))
.thenReturn(Optional.empty());
when(queueMemberRepository.findMaxQueueNumberForQueue(eq(agentQueue), anyString())).thenReturn(0, 1);
when(queueMemberRepository.save(any())).thenAnswer(invocation -> invocation.getArgument(0));
when(uidUtils.getUid()).thenReturn("member-1", "member-2");
QueueMemberEntity first = queueMemberRestService.enqueue(threadOne, agentQueue, QueueTypeEnum.AGENT,
member -> member.setAgentQueue(agentQueue));
QueueMemberEntity second = queueMemberRestService.enqueue(threadTwo, agentQueue, QueueTypeEnum.AGENT,
member -> member.setAgentQueue(agentQueue));
assertEquals(1, first.getQueueNumber());
assertEquals(2, second.getQueueNumber());
verify(queueMemberRepository, times(2)).save(any());
verify(queueMemberRepository, atLeastOnce()).findIdleBefore(any());
}
@Test
void enqueueReturnsExistingMemberWhenDuplicateThread() {
QueueEntity agentQueue = buildQueue("queue-dup");
ThreadEntity thread = buildThread("thread-dup");
QueueMemberEntity existing = QueueMemberEntity.builder()
.uid("existing")
.queueNumber(9)
.status(QueueMemberStatusEnum.QUEUING.name())
.thread(thread)
.agentQueue(agentQueue)
.build();
when(queueMemberRepository.findIdleBefore(any())).thenReturn(Collections.emptyList());
when(queueMemberRepository.findFirstByThreadUidAndDeletedFalseAndStatusInForUpdate(eq("thread-dup"),
anyList())).thenReturn(Optional.of(existing));
QueueMemberEntity result = queueMemberRestService.enqueue(thread, agentQueue, QueueTypeEnum.AGENT,
member -> member.setAgentQueue(agentQueue));
assertThat(result).isSameAs(existing);
verify(queueMemberRepository, never()).save(any());
verify(queueMemberRepository, never()).findMaxQueueNumberForQueue(any(), anyString());
}
@Test
void findAgentQueueMembersReturnsPagedSnapshot() {
Pageable pageable = PageRequest.of(0, 10);
QueueMemberEntity entityOne = buildQueueMemberEntity("member-1", 1);
QueueMemberEntity entityTwo = buildQueueMemberEntity("member-2", 2);
Page<QueueMemberEntity> entityPage = new PageImpl<>(Arrays.asList(entityOne, entityTwo), pageable, 2);
when(queueMemberRepository.findByAgentQueue_UidAndDeletedFalseAndStatusOrderByQueueNumberAsc(
eq("queue-uid"), eq(QueueMemberStatusEnum.QUEUING.name()), eq(pageable)))
.thenReturn(entityPage);
try (MockedStatic<ServiceConvertUtils> mocked = Mockito.mockStatic(ServiceConvertUtils.class)) {
mocked.when(() -> ServiceConvertUtils.convertToQueueMemberResponse(entityOne))
.thenReturn(QueueMemberResponse.builder().uid("member-1").queueNumber(1).build());
mocked.when(() -> ServiceConvertUtils.convertToQueueMemberResponse(entityTwo))
.thenReturn(QueueMemberResponse.builder().uid("member-2").queueNumber(2).build());
Page<QueueMemberResponse> result = queueMemberRestService.findAgentQueueMembers("queue-uid",
pageable);
assertThat(result.getTotalElements()).isEqualTo(2);
}
}
@Test
void enqueueClearsPersistenceContextAfterCollision() {
QueueEntity agentQueue = buildQueue("queue-collision");
ThreadEntity thread = buildThread("thread-collision");
DataIntegrityViolationException collision = new DataIntegrityViolationException(
"duplicate key value violates unique constraint UK7aviqofcxw7ae3fped747qrl7");
when(queueMemberRepository.findIdleBefore(any())).thenReturn(Collections.emptyList());
when(queueMemberRepository.findFirstByThreadUidAndDeletedFalseAndStatusInForUpdate(eq("thread-collision"),
anyList())).thenReturn(Optional.empty());
when(queueMemberRepository.findMaxQueueNumberForQueue(eq(agentQueue), anyString())).thenReturn(0);
when(queueMemberRepository.save(any())).thenThrow(collision)
.thenAnswer(invocation -> invocation.getArgument(0));
when(uidUtils.getUid()).thenReturn("member-collision-1", "member-collision-2");
when(entityManager.contains(any())).thenReturn(true);
QueueMemberEntity result = queueMemberRestService.enqueue(thread, agentQueue, QueueTypeEnum.AGENT,
member -> member.setAgentQueue(agentQueue));
assertThat(result.getUid()).isEqualTo("member-collision-2");
assertThat(result.getQueueNumber()).isEqualTo(2);
verify(entityManager, atLeastOnce()).detach(any());
verify(entityManager).clear();
verify(queueMemberRepository, times(2)).save(any());
verify(queueMemberRepository, times(1)).findMaxQueueNumberForQueue(eq(agentQueue), anyString());
}
@Test
void enqueueMonotonicallyIncrementsQueueNumberAfterCollisions() {
QueueEntity workgroupQueue = buildQueue("queue-wg");
QueueEntity robotQueue = buildQueue("queue-robot");
ThreadEntity thread = buildThread("thread-wg");
DataIntegrityViolationException collision = new DataIntegrityViolationException(
"duplicate key value violates unique constraint UK7aviqofcxw7ae3fped747qrl7");
when(queueMemberRepository.findIdleBefore(any())).thenReturn(Collections.emptyList());
when(queueMemberRepository.findFirstByThreadUidAndDeletedFalseAndStatusInForUpdate(eq("thread-wg"),
anyList())).thenReturn(Optional.empty());
when(queueMemberRepository.findMaxQueueNumberForQueue(eq(workgroupQueue), anyString())).thenReturn(30);
when(queueMemberRepository.findMaxQueueNumberForQueue(eq(robotQueue), anyString())).thenReturn(30);
AtomicInteger attempts = new AtomicInteger();
when(queueMemberRepository.save(any())).thenAnswer(invocation -> {
if (attempts.getAndIncrement() < 2) {
throw collision;
}
return invocation.getArgument(0);
});
when(uidUtils.getUid()).thenReturn("member-31", "member-32", "member-33");
QueueMemberEntity result = queueMemberRestService.enqueue(thread, workgroupQueue, QueueTypeEnum.WORKGROUP,
member -> {
member.setWorkgroupQueue(workgroupQueue);
member.setRobotQueue(robotQueue);
});
assertThat(result.getQueueNumber()).isEqualTo(33);
ArgumentCaptor<QueueMemberEntity> captor = ArgumentCaptor.forClass(QueueMemberEntity.class);
verify(queueMemberRepository, times(3)).save(captor.capture());
assertThat(captor.getAllValues().stream().map(QueueMemberEntity::getQueueNumber).toList())
.containsExactly(31, 32, 33);
verify(queueMemberRepository, times(1)).findMaxQueueNumberForQueue(eq(workgroupQueue), anyString());
}
@Test
void enqueueWorkgroupSeedsFromHighestAssociatedQueue() {
QueueEntity workgroupQueue = buildQueue("queue-wg-high");
QueueEntity robotQueue = buildQueue("queue-robot-high");
ThreadEntity thread = buildThread("thread-wg-high");
when(queueMemberRepository.findIdleBefore(any())).thenReturn(Collections.emptyList());
when(queueMemberRepository.findFirstByThreadUidAndDeletedFalseAndStatusInForUpdate(eq("thread-wg-high"),
anyList())).thenReturn(Optional.empty());
when(queueMemberRepository.findMaxQueueNumberForQueue(eq(workgroupQueue), eq("WORKGROUP"))).thenReturn(30);
when(queueMemberRepository.findMaxQueueNumberForQueue(eq(robotQueue), eq("ROBOT"))).thenReturn(75);
when(queueMemberRepository.save(any())).thenAnswer(invocation -> invocation.getArgument(0));
when(uidUtils.getUid()).thenReturn("member-seed");
QueueMemberEntity result = queueMemberRestService.enqueue(thread, workgroupQueue, QueueTypeEnum.WORKGROUP,
member -> {
member.setWorkgroupQueue(workgroupQueue);
member.setRobotQueue(robotQueue);
});
assertThat(result.getQueueNumber()).isEqualTo(76);
}
private QueueEntity buildQueue(String uid) {
return QueueEntity.builder()
.uid(uid)
.nickname("Queue " + uid)
.topic("queue/topic/" + uid)
.day("2025-11-20")
.status(QueueStatusEnum.ACTIVE.name())
.build();
}
private ThreadEntity buildThread(String uid) {
ThreadEntity thread = new ThreadEntity();
thread.setUid(uid);
thread.setTopic("thread/topic/" + uid);
thread.setOrgUid("org-1");
thread.setStatus(ThreadProcessStatusEnum.QUEUING.name());
thread.setType(ThreadTypeEnum.AGENT.name());
thread.setCreatedAt(ZonedDateTime.now());
return thread;
}
private QueueMemberEntity buildQueueMemberEntity(String uid, int queueNumber) {
QueueMemberEntity entity = QueueMemberEntity.builder()
.uid(uid)
.queueNumber(queueNumber)
.status(QueueMemberStatusEnum.QUEUING.name())
.build();
entity.setAgentQueue(buildQueue("queue-uid"));
entity.setThread(buildThread("thread-" + uid));
return entity;
}
}