This commit is contained in:
jack ning
2025-11-20 17:31:25 +08:00
parent cdc382b3c0
commit 7e5feb66a5
10 changed files with 95 additions and 90 deletions

View File

@@ -52,6 +52,7 @@ import com.bytedesk.service.queue.exception.QueueMemberAlreadyExistsException;
import com.bytedesk.service.queue_member.QueueMemberEntity;
import com.bytedesk.service.queue_member.QueueMemberRestService;
import com.bytedesk.service.queue_member.QueueMemberResponse;
import com.bytedesk.service.queue_member.QueueMemberStatusEnum;
import com.bytedesk.service.utils.ServiceConvertUtils;
import com.bytedesk.service.workgroup.WorkgroupEntity;
import com.bytedesk.service.workgroup.WorkgroupRepository;
@@ -261,9 +262,11 @@ public class QueueRestService extends BaseRestServiceWithExport<QueueEntity, Que
ThreadEntity thread = requireThread(request.getThreadUid());
ensureSameOrg(agent, thread);
queueMemberRestService.findActiveByThreadUid(thread.getUid()).ifPresent(existing -> {
throw new QueueMemberAlreadyExistsException("Thread " + thread.getUid() + " already queued");
});
queueMemberRestService.findByThreadUid(thread.getUid())
.filter(existing -> QueueMemberStatusEnum.QUEUING.name().equals(existing.getStatus()))
.ifPresent(existing -> {
throw new QueueMemberAlreadyExistsException("Thread " + thread.getUid() + " already queued");
});
QueueMemberEntity queueMemberEntity = queueService.enqueueAgent(thread, agent, request.getVisitor());
return queueMemberRestService.convertToResponse(queueMemberEntity);

View File

@@ -58,6 +58,11 @@ public class QueueService {
private final QueueNotificationService queueNotificationService;
private Optional<QueueMemberEntity> findQueueingMember(String threadUid) {
return queueMemberRestService.findByThreadUid(threadUid)
.filter(member -> QueueMemberStatusEnum.QUEUING.name().equals(member.getStatus()));
}
@Transactional
public QueueMemberEntity enqueueRobot(ThreadEntity threadEntity, UserProtobuf agent, VisitorRequest visitorRequest) {
return enqueueToQueue(threadEntity, agent, null, QueueTypeEnum.ROBOT);
@@ -73,7 +78,7 @@ public class QueueService {
public QueueEnqueueResult enqueueAgentWithResult(ThreadEntity threadEntity, AgentEntity agentEntity,
VisitorRequest visitorRequest) {
UserProtobuf agent = agentEntity.toUserProtobuf();
boolean alreadyQueued = queueMemberRestService.findActiveByThreadUid(threadEntity.getUid()).isPresent();
boolean alreadyQueued = findQueueingMember(threadEntity.getUid()).isPresent();
QueueMemberEntity queueMemberEntity = enqueueToQueue(threadEntity, agent, null, QueueTypeEnum.AGENT);
if (!alreadyQueued) {
queueNotificationService.publishQueueJoinNotice(agentEntity, queueMemberEntity);
@@ -96,7 +101,7 @@ public class QueueService {
@Transactional
public QueueEnqueueResult enqueueWorkgroupWithResult(ThreadEntity threadEntity, UserProtobuf agent,
WorkgroupEntity workgroupEntity, VisitorRequest visitorRequest) {
boolean alreadyQueued = queueMemberRestService.findActiveByThreadUid(threadEntity.getUid()).isPresent();
boolean alreadyQueued = findQueueingMember(threadEntity.getUid()).isPresent();
QueueMemberEntity queueMemberEntity = enqueueToQueue(threadEntity, agent, workgroupEntity, QueueTypeEnum.WORKGROUP);
return new QueueEnqueueResult(queueMemberEntity, alreadyQueued);
}
@@ -210,7 +215,7 @@ public class QueueService {
WorkgroupEntity workgroupEntity, QueueTypeEnum queueType) {
// 1. 检查是否已存在队列成员
Optional<QueueMemberEntity> memberOptional = queueMemberRestService.findActiveByThreadUidForUpdate(threadEntity.getUid());
Optional<QueueMemberEntity> memberOptional = findQueueingMember(threadEntity.getUid());
if (memberOptional.isPresent()) {
return handleExistingMember(memberOptional.get(), agent, threadEntity, queueType);
}
@@ -272,7 +277,7 @@ public class QueueService {
.uid(uidUtils.getUid())
.thread(threadEntity)
.queueNumber(primaryQueue.getNextNumber())
.joinedAt(BdDateUtils.now())
.visitorEnqueueAt(BdDateUtils.now())
.orgUid(threadEntity.getOrgUid());
// 根据队列类型设置相应的队列
@@ -315,11 +320,23 @@ public class QueueService {
* 保存队列成员并验证结果
*/
private QueueMemberEntity saveQueueMember(QueueMemberEntity member) {
QueueMemberEntity updatedMember = queueMemberRestService.save(member);
if (updatedMember == null) {
throw new RuntimeException("Failed to save queue member");
try {
QueueMemberEntity updatedMember = queueMemberRestService.save(member);
if (updatedMember == null) {
throw new RuntimeException("Failed to save queue member");
}
return updatedMember;
} catch (DataIntegrityViolationException ex) {
String threadUid = member.getThread() != null ? member.getThread().getUid() : null;
if (threadUid != null) {
Optional<QueueMemberEntity> existingMember = queueMemberRestService.findByThreadUid(threadUid);
if (existingMember.isPresent()) {
log.debug("Queue member already exists for thread {}, returning existing record", threadUid);
return existingMember.get();
}
}
throw ex;
}
return updatedMember;
}
/**

View File

@@ -85,8 +85,7 @@ public class QueueMemberEntity extends BaseEntity {
private Integer queueNumber = 0; // 排队号码
@Builder.Default
@Column(name = "visitor_enqueue_at")
private ZonedDateTime joinedAt = BdDateUtils.now(); // 加入时间
private ZonedDateTime visitorEnqueueAt = BdDateUtils.now(); // 加入时间
private ZonedDateTime lastNotifiedAt; // 最近一次通知时间
@@ -296,14 +295,14 @@ public class QueueMemberEntity extends BaseEntity {
* 计算等待时间(秒)
*/
public long getWaitLength() {
if (joinedAt == null) return 0;
if (visitorEnqueueAt == null) return 0;
if (thread.isOffline() || agentOffline) return 0;
// 首先判断robotAcceptTime是否为空如果不为空则使用robotAcceptTime作为结束时间
if (robotAcceptedAt != null) {
return Duration.between(joinedAt, robotAcceptedAt).getSeconds();
return Duration.between(visitorEnqueueAt, robotAcceptedAt).getSeconds();
}
ZonedDateTime endWaitLength = agentAcceptedAt != null ? agentAcceptedAt : BdDateUtils.now();
return Duration.between(joinedAt, endWaitLength).getSeconds();
return Duration.between(visitorEnqueueAt, endWaitLength).getSeconds();
}
public void manualAcceptThread() {

View File

@@ -55,7 +55,7 @@ public class QueueMemberExcel extends BaseExcel {
@ExcelProperty(value = "入队时间", converter = com.bytedesk.core.converter.ZonedDateTimeConverter.class)
@DateTimeFormat("yyyy-MM-dd HH:mm:ss")
@ColumnWidth(25)
private ZonedDateTime joinedAt;
private ZonedDateTime visitorEnqueueAt;
@ExcelProperty(value = "状态")
@ColumnWidth(20)

View File

@@ -30,73 +30,69 @@ import com.bytedesk.service.queue.QueueEntity;
import jakarta.persistence.LockModeType;
public interface QueueMemberRepository
extends JpaRepository<QueueMemberEntity, Long>, JpaSpecificationExecutor<QueueMemberEntity> {
extends JpaRepository<QueueMemberEntity, Long>, JpaSpecificationExecutor<QueueMemberEntity> {
Optional<QueueMemberEntity> findByUid(String uid);
Optional<QueueMemberEntity> findByUid(String uid);
List<QueueMemberEntity> findByOrgUidAndCreatedAtBetweenAndResolved(String orgUid, ZonedDateTime startTime,
ZonedDateTime endTime, boolean resolved);
List<QueueMemberEntity> findByOrgUidAndCreatedAtBetweenAndResolved(String orgUid, ZonedDateTime startTime,
ZonedDateTime endTime, boolean resolved);
List<QueueMemberEntity> findByOrgUidAndCreatedAtBetweenAndAgentAcceptType(String orgUid, ZonedDateTime startTime,
ZonedDateTime endTime, String acceptType);
List<QueueMemberEntity> findByOrgUidAndCreatedAtBetweenAndAgentAcceptType(String orgUid,
ZonedDateTime startTime,
ZonedDateTime endTime, String acceptType);
// 修改查询方法,使用 JPQL 通过关联的 Thread 实体的 uid 字段查询
@Query("SELECT qm FROM QueueMemberEntity qm WHERE qm.thread.uid = :threadUid AND qm.deleted = false")
Optional<QueueMemberEntity> findByThreadUid(@Param("threadUid") String threadUid);
// 修改查询方法,使用 JPQL 通过关联的 Thread 实体的 uid 字段查询
@Query("SELECT qm FROM QueueMemberEntity qm WHERE qm.thread.uid = :threadUid AND qm.deleted = false")
Optional<QueueMemberEntity> findByThreadUid(@Param("threadUid") String threadUid);
// 统计指定组织在指定日期范围内的会话总数
@Query("SELECT COUNT(qm) FROM QueueMemberEntity qm WHERE qm.orgUid = :orgUid AND qm.createdAt >= :startDate AND qm.createdAt <= :endDate")
Long countByOrgUidAndDateBetween(@Param("orgUid") String orgUid, @Param("startDate") ZonedDateTime startDate,
@Param("endDate") ZonedDateTime endDate);
// 统计指定组织在指定日期范围内的会话总数
@Query("SELECT COUNT(qm) FROM QueueMemberEntity qm WHERE qm.orgUid = :orgUid AND qm.createdAt >= :startDate AND qm.createdAt <= :endDate")
Long countByOrgUidAndDateBetween(@Param("orgUid") String orgUid, @Param("startDate") ZonedDateTime startDate,
@Param("endDate") ZonedDateTime endDate);
// 统计指定工作组在指定日期范围内的会话总数
@Query("SELECT COUNT(qm) FROM QueueMemberEntity qm WHERE qm.orgUid = :orgUid AND qm.workgroupQueue IS NOT NULL AND qm.createdAt >= :startDate AND qm.createdAt <= :endDate")
Long countByWorkgroupUidAndDateBetween(@Param("orgUid") String orgUid, @Param("workgroupUid") String workgroupUid,
@Param("startDate") ZonedDateTime startDate, @Param("endDate") ZonedDateTime endDate);
// 统计指定工作组在指定日期范围内的会话总数
@Query("SELECT COUNT(qm) FROM QueueMemberEntity qm WHERE qm.orgUid = :orgUid AND qm.workgroupQueue IS NOT NULL AND qm.createdAt >= :startDate AND qm.createdAt <= :endDate")
Long countByWorkgroupUidAndDateBetween(@Param("orgUid") String orgUid,
@Param("workgroupUid") String workgroupUid,
@Param("startDate") ZonedDateTime startDate, @Param("endDate") ZonedDateTime endDate);
// 统计指定客服在指定日期范围内的会话总数
@Query("SELECT COUNT(qm) FROM QueueMemberEntity qm WHERE qm.thread.agent LIKE CONCAT('%', :agentUid, '%') AND qm.createdAt >= :startDate AND qm.createdAt <= :endDate")
Long countByAgentUidAndDateBetween(@Param("agentUid") String agentUid, @Param("startDate") ZonedDateTime startDate,
@Param("endDate") ZonedDateTime endDate);
// 统计指定客服在指定日期范围内的会话总数
@Query("SELECT COUNT(qm) FROM QueueMemberEntity qm WHERE qm.thread.agent LIKE CONCAT('%', :agentUid, '%') AND qm.createdAt >= :startDate AND qm.createdAt <= :endDate")
Long countByAgentUidAndDateBetween(@Param("agentUid") String agentUid,
@Param("startDate") ZonedDateTime startDate,
@Param("endDate") ZonedDateTime endDate);
// 统计指定机器人在指定日期范围内的会话总数
@Query("SELECT COUNT(qm) FROM QueueMemberEntity qm WHERE qm.orgUid = :orgUid AND qm.robotQueue IS NOT NULL AND qm.createdAt >= :startDate AND qm.createdAt <= :endDate")
Long countByRobotUidAndDateBetween(@Param("orgUid") String orgUid, @Param("robotUid") String robotUid,
@Param("startDate") ZonedDateTime startDate, @Param("endDate") ZonedDateTime endDate);
// 统计指定机器人在指定日期范围内的会话总数
@Query("SELECT COUNT(qm) FROM QueueMemberEntity qm WHERE qm.orgUid = :orgUid AND qm.robotQueue IS NOT NULL AND qm.createdAt >= :startDate AND qm.createdAt <= :endDate")
Long countByRobotUidAndDateBetween(@Param("orgUid") String orgUid, @Param("robotUid") String robotUid,
@Param("startDate") ZonedDateTime startDate, @Param("endDate") ZonedDateTime endDate);
/**
* 查找在指定时间之前仍未发送任何访客消息(visitorMessageCount=0)的排队成员
*/
@Query("SELECT qm FROM QueueMemberEntity qm WHERE qm.visitorMessageCount = 0 AND qm.deleted = false AND qm.joinedAt < :threshold")
List<QueueMemberEntity> findIdleBefore(@Param("threshold") ZonedDateTime threshold);
/**
* 查找在指定时间之前仍未发送任何访客消息(visitorMessageCount=0)的排队成员
*/
@Query("SELECT qm FROM QueueMemberEntity qm WHERE qm.visitorMessageCount = 0 AND qm.deleted = false AND qm.visitorEnqueueAt < :threshold")
List<QueueMemberEntity> findIdleBefore(@Param("threshold") ZonedDateTime threshold);
Optional<QueueMemberEntity> findFirstByAgentQueue_UidAndDeletedFalseAndStatusOrderByQueueNumberAsc(
String agentQueueUid, String status);
Optional<QueueMemberEntity> findFirstByAgentQueue_UidAndDeletedFalseAndStatusOrderByQueueNumberAsc(
String agentQueueUid, String status);
@Lock(LockModeType.PESSIMISTIC_WRITE)
@Query("SELECT qm FROM QueueMemberEntity qm WHERE qm.agentQueue.uid = :agentQueueUid AND qm.deleted = false AND qm.status = :status ORDER BY qm.queueNumber ASC")
List<QueueMemberEntity> findAgentQueueHeadForUpdate(@Param("agentQueueUid") String agentQueueUid,
@Param("status") String status,
Pageable pageable);
@Lock(LockModeType.PESSIMISTIC_WRITE)
@Query("SELECT qm FROM QueueMemberEntity qm WHERE qm.agentQueue.uid = :agentQueueUid AND qm.deleted = false AND qm.status = :status ORDER BY qm.queueNumber ASC")
List<QueueMemberEntity> findAgentQueueHeadForUpdate(@Param("agentQueueUid") String agentQueueUid,
@Param("status") String status,
Pageable pageable);
Optional<QueueMemberEntity> findFirstByWorkgroupQueue_UidAndDeletedFalseAndStatusOrderByQueueNumberAsc(
String workgroupQueueUid, String status);
Optional<QueueMemberEntity> findFirstByWorkgroupQueue_UidAndDeletedFalseAndStatusOrderByQueueNumberAsc(
String workgroupQueueUid, String status);
Optional<QueueMemberEntity> findFirstByRobotQueue_UidAndDeletedFalseAndStatusOrderByQueueNumberAsc(
String robotQueueUid, String status);
Optional<QueueMemberEntity> findFirstByRobotQueue_UidAndDeletedFalseAndStatusOrderByQueueNumberAsc(
String robotQueueUid, String status);
Page<QueueMemberEntity> findByAgentQueue_UidAndDeletedFalseAndStatusOrderByQueueNumberAsc(String agentQueueUid,
String status, Pageable pageable);
Page<QueueMemberEntity> findByAgentQueue_UidAndDeletedFalseAndStatusOrderByQueueNumberAsc(String agentQueueUid,
String status, Pageable pageable);
Optional<QueueMemberEntity> findFirstByThreadUidAndDeletedFalseAndStatusIn(String threadUid, List<String> statuses);
@Lock(LockModeType.PESSIMISTIC_WRITE)
@Query("SELECT qm FROM QueueMemberEntity qm WHERE qm.thread.uid = :threadUid AND qm.deleted = false AND qm.status IN :statuses")
Optional<QueueMemberEntity> findFirstByThreadUidAndDeletedFalseAndStatusInForUpdate(
@Param("threadUid") String threadUid, @Param("statuses") List<String> statuses);
@Lock(LockModeType.PESSIMISTIC_WRITE)
@Query("SELECT COALESCE(MAX(qm.queueNumber), 0) FROM QueueMemberEntity qm WHERE qm.deleted = false AND ((:queueType = 'AGENT' AND qm.agentQueue = :queue) OR (:queueType = 'WORKGROUP' AND qm.workgroupQueue = :queue) OR (:queueType = 'ROBOT' AND qm.robotQueue = :queue))")
Integer findMaxQueueNumberForQueue(@Param("queue") QueueEntity queue, @Param("queueType") String queueType);
@Lock(LockModeType.PESSIMISTIC_WRITE)
@Query("SELECT COALESCE(MAX(qm.queueNumber), 0) FROM QueueMemberEntity qm WHERE qm.deleted = false AND ((:queueType = 'AGENT' AND qm.agentQueue = :queue) OR (:queueType = 'WORKGROUP' AND qm.workgroupQueue = :queue) OR (:queueType = 'ROBOT' AND qm.robotQueue = :queue))")
Integer findMaxQueueNumberForQueue(@Param("queue") QueueEntity queue, @Param("queueType") String queueType);
}

View File

@@ -62,7 +62,7 @@ public class QueueMemberRequest extends BaseRequest {
private Integer queueNumber = 0; // 排队号码
@Builder.Default
private ZonedDateTime joinedAt = BdDateUtils.now(); // 加入时间
private ZonedDateTime visitorEnqueueAt = BdDateUtils.now(); // 加入时间
private ZonedDateTime lastNotifiedAt; // 上次通知时间
@@ -193,18 +193,18 @@ public class QueueMemberRequest extends BaseRequest {
private String endDate;
// /**
// * @deprecated 请改用 joinedAt
// * @deprecated 请改用 visitorEnqueueAt
// */
// @Deprecated
// public ZonedDateTime getVisitorEnqueueAt() {
// return joinedAt;
// return visitorEnqueueAt;
// }
// /**
// * @deprecated 请改用 joinedAt
// * @deprecated 请改用 visitorEnqueueAt
// */
// @Deprecated
// public void setVisitorEnqueueAt(ZonedDateTime visitorEnqueueAt) {
// this.joinedAt = visitorEnqueueAt;
// this.visitorEnqueueAt = visitorEnqueueAt;
// }
}

View File

@@ -59,7 +59,7 @@ public class QueueMemberResponse extends BaseResponse {
* 统计访客消息总数
*/
@Builder.Default
private ZonedDateTime joinedAt = BdDateUtils.now(); // 加入时间
private ZonedDateTime visitorEnqueueAt = BdDateUtils.now(); // 加入时间
private ZonedDateTime lastNotifiedAt; // 最近一次通知时间
@@ -242,7 +242,7 @@ public class QueueMemberResponse extends BaseResponse {
// ZonedDateTime 字段的格式化 getter 方法
public String getJoinedAt() {
return BdDateUtils.formatDatetimeToString(joinedAt);
return BdDateUtils.formatDatetimeToString(visitorEnqueueAt);
}
// /**

View File

@@ -13,7 +13,6 @@
*/
package com.bytedesk.service.queue_member;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@@ -65,7 +64,6 @@ public class QueueMemberRestService extends BaseRestServiceWithExport<QueueMembe
private static final int IDLE_QUEUE_TIMEOUT_MINUTES = 5; // 超过5分钟未发首条消息视为过期
private static final String AGENT_QUEUE_THREAD_CACHE = "agent_queue_thread_uid";
private final ThreadRestService threadRestService;
private static final List<String> ACTIVE_STATUSES = Collections.singletonList(QueueMemberStatusEnum.QUEUING.name());
// private static final int MAX_ENQUEUE_RETRIES = 20;
// private static final long COLLISION_BACKOFF_MILLIS = 25L;
// private static final String QUEUE_MEMBER_TABLE_NAME = resolveQueueMemberTableName();
@@ -74,14 +72,6 @@ public class QueueMemberRestService extends BaseRestServiceWithExport<QueueMembe
@Lazy
private final QueueNotificationService queueNotificationService;
public Optional<QueueMemberEntity> findActiveByThreadUid(String threadUid) {
return queueMemberRepository.findFirstByThreadUidAndDeletedFalseAndStatusIn(threadUid, ACTIVE_STATUSES);
}
public Optional<QueueMemberEntity> findActiveByThreadUidForUpdate(String threadUid) {
return queueMemberRepository.findFirstByThreadUidAndDeletedFalseAndStatusInForUpdate(threadUid, ACTIVE_STATUSES);
}
public Optional<QueueMemberEntity> findEarliestAgentQueueMember(String agentQueueUid) {
return queueMemberRepository.findFirstByAgentQueue_UidAndDeletedFalseAndStatusOrderByQueueNumberAsc(agentQueueUid, QueueMemberStatusEnum.QUEUING.name());

View File

@@ -15,7 +15,7 @@
<include file="db/changelog/migration/251111_alter_thread_content_to_text.xml" />
<include file="db/changelog/migration/251117_rename_ticket_title_to_summary.xml" />
<include file="db/changelog/migration/251120_queue_member_core_fields.xml" />
<!-- <include file="db/changelog/migration/251120_queue_member_core_fields.xml" /> -->
<include file="db/changelog/migration/251120_queue_settings_notice_batch_window.xml" />
</databaseChangeLog>

View File

@@ -1,4 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- <?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog
xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
@@ -167,4 +167,4 @@
</rollback>
</changeSet>
</databaseChangeLog>
</databaseChangeLog> -->