diff --git a/modules/core/src/main/java/com/bytedesk/core/jms/JmsArtemisConfig.java b/modules/core/src/main/java/com/bytedesk/core/jms/JmsArtemisConfig.java index ff7eb611b1..bd27a32f9a 100644 --- a/modules/core/src/main/java/com/bytedesk/core/jms/JmsArtemisConfig.java +++ b/modules/core/src/main/java/com/bytedesk/core/jms/JmsArtemisConfig.java @@ -41,7 +41,7 @@ public class JmsArtemisConfig { private final JmsErrorHandler jmsErrorHandler; - @Bean + @Bean public JmsListenerContainerFactory jmsArtemisQueueFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); @@ -49,7 +49,8 @@ public class JmsArtemisConfig { configurer.configure(factory, connectionFactory); // You could still override some settings if necessary. factory.setPubSubDomain(false); - // factory.setConcurrency("3-10"); + // 设置确认模式为客户端确认,确保消息处理成功后才被确认 + factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); // 设置错误处理器 factory.setErrorHandler(jmsErrorHandler); return factory; diff --git a/modules/kbase/src/main/java/com/bytedesk/kbase/config/ArtemisConfig.java b/modules/kbase/src/main/java/com/bytedesk/kbase/config/ArtemisConfig.java deleted file mode 100644 index f4f2cb3060..0000000000 --- a/modules/kbase/src/main/java/com/bytedesk/kbase/config/ArtemisConfig.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * @Author: jackning 270580156@qq.com - * @Date: 2025-05-17 10:15:00 - * @LastEditors: jackning 270580156@qq.com - * @LastEditTime: 2025-05-17 10:15:00 - * @Description: bytedesk.com https://github.com/Bytedesk/bytedesk - * Please be aware of the BSL license restrictions before installing Bytedesk IM – - * selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license. - * Business Source License 1.1: https://github.com/Bytedesk/bytedesk/blob/main/LICENSE - * contact: 270580156@qq.com - * 联系:270580156@qq.com - * Copyright (c) 2024 by bytedesk.com, All Rights Reserved. - */ -package com.bytedesk.kbase.config; - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.jms.annotation.EnableJms; -import org.springframework.jms.config.DefaultJmsListenerContainerFactory; -import org.springframework.jms.config.JmsListenerContainerFactory; -import org.springframework.jms.core.JmsTemplate; -import org.springframework.jms.support.converter.MappingJackson2MessageConverter; -import org.springframework.jms.support.converter.MessageConverter; -import org.springframework.jms.support.converter.MessageType; - -import jakarta.jms.ConnectionFactory; - -/** - * Artemis配置类 - * 用于配置JMS相关的Bean - */ -@Configuration -@EnableJms -public class ArtemisConfig { - - /** - * 配置消息转换器,使用Jackson将Java对象转换为JSON格式 - */ - @Bean - public MessageConverter jacksonJmsMessageConverter() { - MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); - converter.setTargetType(MessageType.TEXT); - converter.setTypeIdPropertyName("_type"); - return converter; - } - - /** - * 配置JMS监听器容器工厂 - */ - @Bean - public JmsListenerContainerFactory faqJmsListenerContainerFactory(ConnectionFactory connectionFactory) { - DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); - factory.setConnectionFactory(connectionFactory); - factory.setMessageConverter(jacksonJmsMessageConverter()); - // 配置并发消费者数量,动态调整 - factory.setConcurrency("3-10"); - // 使用事务管理 - factory.setSessionTransacted(true); - // 设置消息确认模式 - factory.setSessionAcknowledgeMode(javax.jms.Session.CLIENT_ACKNOWLEDGE); - // 设置恢复间隔,默认5000ms - factory.setRecoveryInterval(5000L); - return factory; - } - - /** - * 配置JMS模板 - */ - @Bean - public JmsTemplate faqJmsTemplate(ConnectionFactory connectionFactory) { - JmsTemplate template = new JmsTemplate(connectionFactory); - template.setMessageConverter(jacksonJmsMessageConverter()); - // 设置消息确认模式为客户端确认 - template.setSessionAcknowledgeMode(javax.jms.Session.CLIENT_ACKNOWLEDGE); - // 设置默认目标名称 - template.setDefaultDestinationName("bytedesk.faq.index"); - return template; - } -} diff --git a/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqEventListener.java b/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqEventListener.java index 5a03f57b9a..745a5a7224 100644 --- a/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqEventListener.java +++ b/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqEventListener.java @@ -2,7 +2,7 @@ * @Author: jackning 270580156@qq.com * @Date: 2024-09-07 15:42:23 * @LastEditors: jackning 270580156@qq.com - * @LastEditTime: 2025-05-17 10:20:00 + * @LastEditTime: 2025-05-17 09:14:03 * @Description: bytedesk.com https://github.com/Bytedesk/bytedesk * Please be aware of the BSL license restrictions before installing Bytedesk IM – * selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license. @@ -36,9 +36,9 @@ import lombok.extern.slf4j.Slf4j; @AllArgsConstructor public class FaqEventListener { - private final FaqElasticService faqService; + // private final FaqElasticService faqService; - private final FaqVectorService faqVectorService; + // private final FaqVectorService faqVectorService; private final FaqRestService faqRestService; diff --git a/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqIndexConsumer.java b/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqIndexConsumer.java index fc3c7ffd08..df7ed94f59 100644 --- a/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqIndexConsumer.java +++ b/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqIndexConsumer.java @@ -40,11 +40,14 @@ public class FaqIndexConsumer { /** * 处理FAQ索引队列中的消息 + * 使用客户端确认模式,只有成功处理后才确认消息 * + * @param jmsMessage JMS原始消息 * @param message FAQ索引消息 */ - @JmsListener(destination = "bytedesk.faq.index", containerFactory = "jmsArtemisQueueFactory", concurrency = "3-10") - public void processIndexMessage(FaqIndexMessage message) { + @JmsListener(destination = JmsArtemisConstants.QUEUE_FAQ_INDEX, containerFactory = "jmsArtemisQueueFactory", concurrency = "3-10") + public void processIndexMessage(jakarta.jms.Message jmsMessage, FaqIndexMessage message) { + boolean success = false; try { log.debug("接收到FAQ索引请求: {}", message.getFaqUid()); @@ -56,6 +59,8 @@ public class FaqIndexConsumer { Optional optionalFaq = faqRestService.findByUid(message.getFaqUid()); if (!optionalFaq.isPresent()) { log.warn("无法找到要索引的FAQ: {}", message.getFaqUid()); + // 消息处理完成,但没有找到实体,也认为是成功的(避免重复处理已删除的实体) + success = true; return; } @@ -68,15 +73,31 @@ public class FaqIndexConsumer { handleIndexOperation(faq, message); } + // 成功处理消息 + success = true; + } catch (Exception e) { log.error("处理FAQ索引消息时出错: {}", e.getMessage(), e); - } + success = false; // 发生异常,处理失败 + } finally { + // 只有成功处理消息后才确认 + if (success) { + acknowledgeMessage(jmsMessage); + } else { + log.warn("FAQ索引消息处理失败,不确认消息,等待重新处理: {}", message.getFaqUid()); + } } /** * 处理索引操作 + * 如果索引操作失败,将抛出异常以便于消息重试 */ private void handleIndexOperation(FaqEntity faq, FaqIndexMessage message) { + boolean elasticSuccess = true; + boolean vectorSuccess = true; + Exception elasticException = null; + Exception vectorException = null; + // 执行全文索引 if (message.isUpdateElasticIndex()) { try { @@ -84,6 +105,8 @@ public class FaqIndexConsumer { faqElasticService.indexFaq(faq); } catch (Exception e) { log.error("FAQ全文索引创建失败: {}, 错误: {}", faq.getUid(), e.getMessage(), e); + elasticSuccess = false; + elasticException = e; } } @@ -97,8 +120,28 @@ public class FaqIndexConsumer { faqVectorService.indexFaqVector(faq); } catch (Exception e) { log.error("FAQ向量索引创建失败: {}, 错误: {}", faq.getUid(), e.getMessage(), e); + vectorSuccess = false; + vectorException = e; } } + + // 如果有任何索引失败,抛出异常以便于消息重试 + if (!elasticSuccess || !vectorSuccess) { + StringBuilder errorMessage = new StringBuilder("FAQ索引失败: "); + if (!elasticSuccess) { + errorMessage.append("全文索引错误"); + } + if (!vectorSuccess) { + if (!elasticSuccess) { + errorMessage.append(" 和 "); + } + errorMessage.append("向量索引错误"); + } + + // 抛出异常,阻止消息确认,系统将重新投递消息 + throw new RuntimeException(errorMessage.toString(), + !elasticSuccess ? elasticException : vectorException); + } } /** @@ -115,6 +158,7 @@ public class FaqIndexConsumer { } } catch (Exception e) { log.error("从全文索引中删除FAQ失败: {}, 错误: {}", faq.getUid(), e.getMessage(), e); + throw new RuntimeException("全文索引删除失败", e); // 抛出异常以便于消息重试 } } @@ -128,7 +172,25 @@ public class FaqIndexConsumer { } } catch (Exception e) { log.error("从向量索引中删除FAQ失败: {}, 错误: {}", faq.getUid(), e.getMessage(), e); + throw new RuntimeException("向量索引删除失败", e); // 抛出异常以便于消息重试 } } } + + /** + * 安全地确认消息 + * 只有在消息处理成功后才调用此方法 + * + * @param message JMS消息 + */ + private void acknowledgeMessage(jakarta.jms.Message message) { + try { + if (message != null) { + message.acknowledge(); + log.debug("消息已确认"); + } + } catch (jakarta.jms.JMSException e) { + log.error("确认消息失败: {}", e.getMessage(), e); + } + } } diff --git a/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqIndexTestController.java b/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqIndexTestController.java new file mode 100644 index 0000000000..6958cc3929 --- /dev/null +++ b/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqIndexTestController.java @@ -0,0 +1,176 @@ +/* + * @Author: jackning 270580156@qq.com + * @Date: 2025-05-17 16:30:00 + * @LastEditors: jackning 270580156@qq.com + * @LastEditTime: 2025-05-17 16:30:00 + * @Description: bytedesk.com https://github.com/Bytedesk/bytedesk + * Please be aware of the BSL license restrictions before installing Bytedesk IM – + * selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license. + * Business Source License 1.1: https://github.com/Bytedesk/bytedesk/blob/main/LICENSE + * contact: 270580156@qq.com + * 联系:270580156@qq.com + * Copyright (c) 2024 by bytedesk.com, All Rights Reserved. + */ +package com.bytedesk.kbase.faq; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import com.bytedesk.core.base.BaseResultPageJsonController; +import com.bytedesk.core.rbac.auth.AuthService; + +import lombok.extern.slf4j.Slf4j; + +/** + * FAQ索引测试控制器 + * 用于手动触发并发索引测试 + */ +@Slf4j +@RestController +@RequestMapping("/api/faq/test") +public class FaqIndexTestController extends BaseResultPageJsonController { + + @Autowired + private FaqRestService faqRestService; + + @Autowired + private FaqMessageService faqMessageService; + + @Autowired + private AuthService authService; + + /** + * 批量索引FAQ + * + * @param count 索引的FAQ数量 + * @param threads 并发线程数 + * @return 测试结果信息 + */ + @GetMapping("/batch-index") + public ResponseEntity batchIndex( + @RequestParam(required = false, defaultValue = "50") int count, + @RequestParam(required = false, defaultValue = "5") int threads) { + + // 验证管理员权限 + if (!authService.isCurrentUserAdmin()) { + return ResponseEntity.status(403).body("权限不足,仅管理员可执行此操作"); + } + + log.info("启动批量索引测试 - 数量: {}, 线程数: {}", count, threads); + + ExecutorService executorService = Executors.newFixedThreadPool(threads); + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger failCount = new AtomicInteger(0); + List processedFaqs = new ArrayList<>(); + + int perThread = count / threads; + if (perThread < 1) perThread = 1; + + // 启动所有线程 + for (int i = 0; i < threads; i++) { + executorService.submit(() -> { + try { + for (int j = 0; j < perThread; j++) { + try { + Optional optFaq = faqRestService.findRandomFaq(); + if (optFaq.isPresent()) { + FaqEntity faq = optFaq.get(); + String faqUid = faq.getUid(); + + // 发送到索引队列 + faqMessageService.sendToIndexQueue(faqUid); + + synchronized (processedFaqs) { + processedFaqs.add(faqUid); + } + + successCount.incrementAndGet(); + + // 小暂停避免过载 + Thread.sleep(100); + } + } catch (Exception e) { + log.error("批量索引FAQ时出错: {}", e.getMessage(), e); + failCount.incrementAndGet(); + } + } + } catch (Exception e) { + log.error("执行批量索引线程时出错: {}", e.getMessage(), e); + } + }); + } + + // 关闭线程池并等待完成 + executorService.shutdown(); + try { + executorService.awaitTermination(5, TimeUnit.MINUTES); + } catch (InterruptedException e) { + log.warn("批量索引等待中断: {}", e.getMessage()); + Thread.currentThread().interrupt(); + } + + // 返回结果 + Map result = new HashMap<>(); + result.put("success", successCount.get()); + result.put("fail", failCount.get()); + result.put("total", count); + result.put("threads", threads); + result.put("processedCount", processedFaqs.size()); + + return ResponseEntity.ok(result); + } + + /** + * 测试单个FAQ的索引处理 + * + * @param faqUid FAQ的唯一ID + * @return 处理结果 + */ + @GetMapping("/index-single") + public ResponseEntity indexSingle( + @RequestParam String faqUid) { + + try { + // 验证管理员权限 + if (!authService.isCurrentUserAdmin()) { + return ResponseEntity.status(403).body("权限不足,仅管理员可执行此操作"); + } + + log.info("测试单个FAQ索引: {}", faqUid); + + // 检查FAQ是否存在 + Optional optFaq = faqRestService.findByUid(faqUid); + if (!optFaq.isPresent()) { + return ResponseEntity.notFound().build(); + } + + // 发送到索引队列 + faqMessageService.sendToIndexQueue(faqUid); + + Map result = new HashMap<>(); + result.put("status", "success"); + result.put("faqUid", faqUid); + result.put("message", "FAQ索引消息已发送到队列"); + + return ResponseEntity.ok(result); + + } catch (Exception e) { + log.error("索引单个FAQ时出错: {}", e.getMessage(), e); + return ResponseEntity.status(500).body("处理失败: " + e.getMessage()); + } + } +} diff --git a/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqMessageService.java b/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqMessageService.java index e69de29bb2..f0c27ed830 100644 --- a/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqMessageService.java +++ b/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqMessageService.java @@ -0,0 +1,97 @@ +/* + * @Author: jackning 270580156@qq.com + * @Date: 2025-05-17 10:05:00 + * @LastEditors: jackning 270580156@qq.com + * @LastEditTime: 2025-05-17 10:30:00 + * @Description: bytedesk.com https://github.com/Bytedesk/bytedesk + * Please be aware of the BSL license restrictions before installing Bytedesk IM – + * selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license. + * Business Source License 1.1: https://github.com/Bytedesk/bytedesk/blob/main/LICENSE + * contact: 270580156@qq.com + * 联系:270580156@qq.com + * Copyright (c) 2024 by bytedesk.com, All Rights Reserved. + */ +package com.bytedesk.kbase.faq; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.stereotype.Service; + +import com.bytedesk.core.jms.JmsArtemisConstants; + +import lombok.extern.slf4j.Slf4j; + +/** + * FAQ消息服务 + * 使用核心模块中的JmsTemplate发送FAQ索引请求 + */ +@Slf4j +@Service +public class FaqMessageService { + + @Autowired + private JmsTemplate jmsTemplate; + + /** + * 发送FAQ到索引队列,用于创建或更新索引 + * + * @param faqUid FAQ的唯一标识 + */ + public void sendToIndexQueue(String faqUid) { + try { + log.debug("发送FAQ到索引队列: {}", faqUid); + + FaqIndexMessage message = FaqIndexMessage.builder() + .faqUid(faqUid) + .operationType("index") + .updateElasticIndex(true) + .updateVectorIndex(true) + .build(); + + // 使用JmsArtemisConstants中定义的常量 + jmsTemplate.convertAndSend(JmsArtemisConstants.QUEUE_FAQ_INDEX, message); + log.debug("消息已发送到队列: {}", JmsArtemisConstants.QUEUE_FAQ_INDEX); + } catch (Exception e) { + log.error("发送FAQ索引消息失败: {}", e.getMessage(), e); + } + } + + /** + * 发送FAQ删除请求到索引队列 + * + * @param faqUid FAQ的唯一标识 + */ + public void sendToDeleteQueue(String faqUid) { + try { + log.debug("发送FAQ删除请求到索引队列: {}", faqUid); + + FaqIndexMessage message = FaqIndexMessage.builder() + .faqUid(faqUid) + .operationType("delete") + .updateElasticIndex(true) + .updateVectorIndex(true) + .build(); + + jmsTemplate.convertAndSend(JmsArtemisConstants.QUEUE_FAQ_INDEX, message); + log.debug("删除消息已发送到队列: {}", JmsArtemisConstants.QUEUE_FAQ_INDEX); + } catch (Exception e) { + log.error("发送FAQ删除消息失败: {}", e.getMessage(), e); + } + } + + /** + * 批量发送FAQ到索引队列 + * + * @param faqUids FAQ唯一标识列表 + */ + public void batchSendToIndexQueue(Iterable faqUids) { + try { + log.debug("批量发送FAQ到索引队列"); + for (String faqUid : faqUids) { + sendToIndexQueue(faqUid); + } + } catch (Exception e) { + log.error("批量发送FAQ索引消息失败: {}", e.getMessage(), e); + } + } +} diff --git a/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqRepository.java b/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqRepository.java index 4b18a5e5ee..86af888fcf 100644 --- a/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqRepository.java +++ b/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqRepository.java @@ -32,4 +32,12 @@ public interface FaqRepository extends JpaRepository, JpaSpecif Boolean existsByQuestionAndAnswerAndKbase_UidAndOrgUidAndDeletedFalse(String question, String answer, String kbUid, String orgUid); + /** + * 获取随机FAQ,用于测试 + * + * @param limit 限制返回的数量 + * @return 随机FAQ列表 + */ + @org.springframework.data.jpa.repository.Query(value = "SELECT * FROM faq WHERE deleted = false ORDER BY RAND() LIMIT :limit", nativeQuery = true) + List findRandomFaq(@org.springframework.data.repository.query.Param("limit") int limit); } diff --git a/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqRestService.java b/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqRestService.java index 1201c3d46b..0693375949 100644 --- a/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqRestService.java +++ b/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqRestService.java @@ -706,4 +706,22 @@ public class FaqRestService extends BaseRestServiceWithExcel findRandomFaq() { + try { + // 获取系统中任意一个FAQ + List randomFaqs = faqRepository.findRandomFaq(1); + if (randomFaqs != null && !randomFaqs.isEmpty()) { + return Optional.of(randomFaqs.get(0)); + } + } catch (Exception e) { + log.error("获取随机FAQ时出错: {}", e.getMessage(), e); + } + return Optional.empty(); + } + } diff --git a/modules/kbase/src/test/java/com/bytedesk/kbase/faq/FaqBulkIndexTester.java b/modules/kbase/src/test/java/com/bytedesk/kbase/faq/FaqBulkIndexTester.java new file mode 100644 index 0000000000..ef910a5975 --- /dev/null +++ b/modules/kbase/src/test/java/com/bytedesk/kbase/faq/FaqBulkIndexTester.java @@ -0,0 +1,91 @@ +/* + * @Author: jackning 270580156@qq.com + * @Date: 2025-05-17 11:00:00 + * @LastEditors: jackning 270580156@qq.com + * @LastEditTime: 2025-05-17 09:12:31 + * @Description: bytedesk.com https://github.com/Bytedesk/bytedesk + * Please be aware of the BSL license restrictions before installing Bytedesk IM – + * selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license. + * Business Source License 1.1: https://github.com/Bytedesk/bytedesk/blob/main/LICENSE + * contact: 270580156@qq.com + * 联系:270580156@qq.com + * Copyright (c) 2025 by bytedesk.com, All Rights Reserved. + */ +package com.bytedesk.kbase.faq; + +import java.time.LocalDateTime; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; + +/** + * FAQ索引批量测试工具 + * 用于模拟批量处理FAQ创建事件,测试消息队列的有效性 + */ +@Slf4j +@Component +public class FaqBulkIndexTester { + + @Autowired + private FaqMessageService faqMessageService; + + @Autowired + private FaqRepository faqRepository; + + private final ExecutorService executorService = Executors.newFixedThreadPool(5); + + /** + * 测试批量发送FAQ到索引队列 + * 每个FAQ都分别发送一条索引消息 + * + * @param faqCount 测试的FAQ数量 + */ + public void testBulkFaqIndexing(int faqCount) { + log.info("开始测试批量FAQ索引, 数量: {}", faqCount); + + // 从数据库获取前N个FAQ + var faqs = faqRepository.findAll().stream().limit(faqCount).toList(); + if (faqs.isEmpty()) { + log.warn("没有找到FAQ记录,无法进行批量索引测试"); + return; + } + + log.info("共找到{}个FAQ记录用于测试", faqs.size()); + + // 多线程并发发送消息 + for (FaqEntity faq : faqs) { + executorService.submit(() -> { + try { + log.debug("发送FAQ索引请求: {}", faq.getUid()); + faqMessageService.sendToIndexQueue(faq.getUid()); + } catch (Exception e) { + log.error("发送FAQ索引消息失败: {}", e.getMessage(), e); + } + }); + } + + log.info("批量索引测试任务已提交,时间: {}", LocalDateTime.now()); + } + + /** + * 清理资源 + */ + @PreDestroy + public void destroy() { + executorService.shutdown(); + try { + if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + } +} diff --git a/modules/kbase/src/test/java/com/bytedesk/kbase/faq/FaqIndexTestController.java b/modules/kbase/src/test/java/com/bytedesk/kbase/faq/FaqIndexTestController.java new file mode 100644 index 0000000000..57da3ddaea --- /dev/null +++ b/modules/kbase/src/test/java/com/bytedesk/kbase/faq/FaqIndexTestController.java @@ -0,0 +1,47 @@ +/* + * @Author: jackning 270580156@qq.com + * @Date: 2025-05-17 11:10:00 + * @LastEditors: jackning 270580156@qq.com + * @LastEditTime: 2025-05-17 11:10:00 + * @Description: bytedesk.com https://github.com/Bytedesk/bytedesk + * Please be aware of the BSL license restrictions before installing Bytedesk IM – + * selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license. + * Business Source License 1.1: https://github.com/Bytedesk/bytedesk/blob/main/LICENSE + * contact: 270580156@qq.com + * 联系:270580156@qq.com + * Copyright (c) 2025 by bytedesk.com, All Rights Reserved. + */ +package com.bytedesk.kbase.faq; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; + +/** + * FAQ索引测试控制器 + * 用于手动触发FAQ索引测试 + */ +@Tag(name = "FAQ索引测试") +@RestController +@RequestMapping("/api/v1/kbase/faq/test") +public class FaqIndexTestController { + + @Autowired + private FaqBulkIndexTester faqBulkIndexTester; + + @Operation(summary = "测试批量FAQ索引") + @PostMapping("/bulk-index") + public ResponseEntity testBulkFaqIndexing( + @RequestParam(value = "count", defaultValue = "10") int count) { + + faqBulkIndexTester.testBulkFaqIndexing(count); + + return ResponseEntity.ok("FAQ批量索引测试已启动,索引数量: " + count); + } +} diff --git a/modules/kbase/src/test/java/com/bytedesk/kbase/faq/FaqMessageQueueTests.java b/modules/kbase/src/test/java/com/bytedesk/kbase/faq/FaqMessageQueueTests.java index 0a74fd0034..768e8e20a4 100644 --- a/modules/kbase/src/test/java/com/bytedesk/kbase/faq/FaqMessageQueueTests.java +++ b/modules/kbase/src/test/java/com/bytedesk/kbase/faq/FaqMessageQueueTests.java @@ -1,16 +1,31 @@ package com.bytedesk.kbase.faq; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import java.util.List; import java.util.Optional; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import jakarta.jms.JMSException; +import jakarta.jms.Message; import lombok.extern.slf4j.Slf4j; /** * FAQ消息队列测试类 + * 包含集成测试和单元测试 */ @Slf4j @SpringBootTest @@ -22,6 +37,97 @@ public class FaqMessageQueueTests { @Autowired private FaqRepository faqRepository; + /** + * 测试客户端确认模式的消息队列处理 + */ + @ExtendWith(MockitoExtension.class) + static class ClientAcknowledgeTests { + + @Mock + private FaqElasticService faqElasticService; + + @Mock + private FaqVectorService faqVectorService; + + @Mock + private FaqRestService faqRestService; + + @InjectMocks + private FaqIndexConsumer faqIndexConsumer; + + /** + * 测试成功处理消息时确认消息 + */ + @Test + void testMessageAcknowledgmentOnSuccess() throws Exception { + // 准备测试数据 + String faqUid = "test-faq-uid"; + FaqIndexMessage indexMessage = new FaqIndexMessage(faqUid, "index", true, true); + Message jmsMessage = mock(Message.class); + + // 模拟FAQ存在 + FaqEntity mockFaq = new FaqEntity(); + mockFaq.setUid(faqUid); + when(faqRestService.findByUid(faqUid)).thenReturn(Optional.of(mockFaq)); + + // 调用被测试的方法 + faqIndexConsumer.processIndexMessage(jmsMessage, indexMessage); + + // 验证消息被确认 + verify(jmsMessage, times(1)).acknowledge(); + } + + /** + * 测试处理失败时不确认消息 + */ + @Test + void testNoAcknowledgmentOnFailure() throws Exception { + // 准备测试数据 + String faqUid = "test-faq-uid"; + FaqIndexMessage indexMessage = new FaqIndexMessage(faqUid, "index", true, true); + Message jmsMessage = mock(Message.class); + + // 模拟FAQ存在 + FaqEntity mockFaq = new FaqEntity(); + mockFaq.setUid(faqUid); + when(faqRestService.findByUid(faqUid)).thenReturn(Optional.of(mockFaq)); + + // 模拟索引操作抛出异常 + doThrow(new RuntimeException("模拟索引失败")).when(faqElasticService).indexFaq(any()); + + // 调用被测试的方法 + faqIndexConsumer.processIndexMessage(jmsMessage, indexMessage); + + // 验证消息没有被确认 + verify(jmsMessage, never()).acknowledge(); + } + + /** + * 测试确认消息时出现异常的情况 + */ + @Test + void testAcknowledgmentException() throws Exception { + // 准备测试数据 + String faqUid = "test-faq-uid"; + FaqIndexMessage indexMessage = new FaqIndexMessage(faqUid, "index", true, false); + Message jmsMessage = mock(Message.class); + + // 模拟FAQ存在 + FaqEntity mockFaq = new FaqEntity(); + mockFaq.setUid(faqUid); + when(faqRestService.findByUid(faqUid)).thenReturn(Optional.of(mockFaq)); + + // 模拟确认消息时抛出异常 + doThrow(new JMSException("模拟确认失败")).when(jmsMessage).acknowledge(); + + // 调用被测试的方法 - 不应抛出异常 + faqIndexConsumer.processIndexMessage(jmsMessage, indexMessage); + + // 验证尝试确认消息 + verify(jmsMessage, times(1)).acknowledge(); + } + } + /** * 测试发送单个FAQ到索引队列 */ diff --git a/starter/src/main/resources/artemis.properties b/starter/src/main/resources/artemis.properties new file mode 100644 index 0000000000..f563c4e5dc --- /dev/null +++ b/starter/src/main/resources/artemis.properties @@ -0,0 +1,17 @@ +# =============================== +#= Artemis JMS Properties +# =============================== +# 嵌入式Artemis配置 +spring.artemis.mode=embedded +spring.artemis.embedded.enabled=true +# 配置队列和主题 +spring.artemis.embedded.queues=artemis.queue.faq.index +# 可选:数据存储路径 +spring.artemis.embedded.data-directory=${user.dir}/artemis-data +# 并发消费者数 +spring.jms.listener.concurrency=1 +spring.jms.listener.max-concurrency=10 +# 消息确认模式 +spring.jms.listener.acknowledge-mode=client +# 会话事务 +spring.jms.listener.auto-startup=true diff --git a/starter/src/main/resources/properties/artemis.properties b/starter/src/main/resources/properties/artemis.properties new file mode 100644 index 0000000000..60db496e89 --- /dev/null +++ b/starter/src/main/resources/properties/artemis.properties @@ -0,0 +1,16 @@ +spring.artemis.mode=embedded +spring.artemis.embedded.enabled=true +spring.artemis.embedded.queues=artemis.queue.faq.index + +# 客户端确认模式设置 +spring.jms.listener.acknowledge-mode=client + +# 配置消息重传机制 +spring.artemis.embedded.max-delivery-attempts=5 +spring.artemis.embedded.redelivery-delay=5000 +spring.artemis.embedded.redelivery-delay-multiplier=2.0 +spring.artemis.embedded.max-redelivery-delay=60000 + +# 优化消息处理和事务 +spring.jms.listener.concurrency=2 +spring.jms.listener.max-concurrency=10