diff --git a/modules/core/src/main/java/com/bytedesk/core/jms/JmsArtemisConfig.java b/modules/core/src/main/java/com/bytedesk/core/jms/JmsArtemisConfig.java index bd27a32f9a..40105493be 100644 --- a/modules/core/src/main/java/com/bytedesk/core/jms/JmsArtemisConfig.java +++ b/modules/core/src/main/java/com/bytedesk/core/jms/JmsArtemisConfig.java @@ -65,6 +65,8 @@ public class JmsArtemisConfig { // You could still override some settings if necessary. factory.setPubSubDomain(true); // factory.setConcurrency("3-10"); + // 设置确认模式为客户端确认,确保消息处理成功后才被确认 + factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); // 设置错误处理器 factory.setErrorHandler(jmsErrorHandler); return factory; diff --git a/modules/core/src/main/java/com/bytedesk/core/jms/JmsArtemisListener.java b/modules/core/src/main/java/com/bytedesk/core/jms/JmsArtemisListener.java index 3cb2455271..4619697cac 100644 --- a/modules/core/src/main/java/com/bytedesk/core/jms/JmsArtemisListener.java +++ b/modules/core/src/main/java/com/bytedesk/core/jms/JmsArtemisListener.java @@ -22,6 +22,7 @@ import com.bytedesk.core.config.BytedeskEventPublisher; // import com.bytedesk.core.message.MessageProtobuf; import com.bytedesk.core.utils.ApplicationContextHolder; +import jakarta.jms.JMSException; import lombok.extern.slf4j.Slf4j; @@ -54,10 +55,17 @@ public class JmsArtemisListener { // topic pub sub // @TabooJsonFilter(title = "敏感词", action = "JmsArtemisListener") @JmsListener(destination = JmsArtemisConstants.TOPIC_STRING_NAME, containerFactory = "jmsArtemisPubsubFactory") - public void receiveTopicMessage(String json) { - // log.info("jms receiveTopicMessage string {}", json); - BytedeskEventPublisher bytedeskEventPublisher = ApplicationContextHolder.getBean(BytedeskEventPublisher.class); - bytedeskEventPublisher.publishMessageJsonEvent(json); + public void receiveTopicMessage(String json, jakarta.jms.Message message) throws JMSException { + try { + // log.info("jms receiveTopicMessage string {}", json); + BytedeskEventPublisher bytedeskEventPublisher = ApplicationContextHolder.getBean(BytedeskEventPublisher.class); + bytedeskEventPublisher.publishMessageJsonEvent(json); + // 处理成功后,显式确认消息已被消费 + message.acknowledge(); + } catch (Exception e) { + log.error("处理Topic消息失败: {}", e.getMessage(), e); + throw e; // 重新抛出异常,让错误处理器处理 + } } // @JmsListener(destination = JmsArtemisConstants.TOPIC_MESSAGE_NAME, containerFactory = "jmsArtemisPubsubFactory") diff --git a/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqIndexConsumer.java b/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqIndexConsumer.java index df7ed94f59..5441fb6c0e 100644 --- a/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqIndexConsumer.java +++ b/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqIndexConsumer.java @@ -2,7 +2,7 @@ * @Author: jackning 270580156@qq.com * @Date: 2025-05-17 10:10:00 * @LastEditors: jackning 270580156@qq.com - * @LastEditTime: 2025-05-17 10:10:00 + * @LastEditTime: 2025-05-17 09:39:41 * @Description: bytedesk.com https://github.com/Bytedesk/bytedesk * Please be aware of the BSL license restrictions before installing Bytedesk IM – * selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license. @@ -86,6 +86,7 @@ public class FaqIndexConsumer { } else { log.warn("FAQ索引消息处理失败,不确认消息,等待重新处理: {}", message.getFaqUid()); } + } } /** diff --git a/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqIndexTestController.java b/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqIndexTestController.java deleted file mode 100644 index 6958cc3929..0000000000 --- a/modules/kbase/src/main/java/com/bytedesk/kbase/faq/FaqIndexTestController.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * @Author: jackning 270580156@qq.com - * @Date: 2025-05-17 16:30:00 - * @LastEditors: jackning 270580156@qq.com - * @LastEditTime: 2025-05-17 16:30:00 - * @Description: bytedesk.com https://github.com/Bytedesk/bytedesk - * Please be aware of the BSL license restrictions before installing Bytedesk IM – - * selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license. - * Business Source License 1.1: https://github.com/Bytedesk/bytedesk/blob/main/LICENSE - * contact: 270580156@qq.com - * 联系:270580156@qq.com - * Copyright (c) 2024 by bytedesk.com, All Rights Reserved. - */ -package com.bytedesk.kbase.faq; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.RestController; - -import com.bytedesk.core.base.BaseResultPageJsonController; -import com.bytedesk.core.rbac.auth.AuthService; - -import lombok.extern.slf4j.Slf4j; - -/** - * FAQ索引测试控制器 - * 用于手动触发并发索引测试 - */ -@Slf4j -@RestController -@RequestMapping("/api/faq/test") -public class FaqIndexTestController extends BaseResultPageJsonController { - - @Autowired - private FaqRestService faqRestService; - - @Autowired - private FaqMessageService faqMessageService; - - @Autowired - private AuthService authService; - - /** - * 批量索引FAQ - * - * @param count 索引的FAQ数量 - * @param threads 并发线程数 - * @return 测试结果信息 - */ - @GetMapping("/batch-index") - public ResponseEntity batchIndex( - @RequestParam(required = false, defaultValue = "50") int count, - @RequestParam(required = false, defaultValue = "5") int threads) { - - // 验证管理员权限 - if (!authService.isCurrentUserAdmin()) { - return ResponseEntity.status(403).body("权限不足,仅管理员可执行此操作"); - } - - log.info("启动批量索引测试 - 数量: {}, 线程数: {}", count, threads); - - ExecutorService executorService = Executors.newFixedThreadPool(threads); - AtomicInteger successCount = new AtomicInteger(0); - AtomicInteger failCount = new AtomicInteger(0); - List processedFaqs = new ArrayList<>(); - - int perThread = count / threads; - if (perThread < 1) perThread = 1; - - // 启动所有线程 - for (int i = 0; i < threads; i++) { - executorService.submit(() -> { - try { - for (int j = 0; j < perThread; j++) { - try { - Optional optFaq = faqRestService.findRandomFaq(); - if (optFaq.isPresent()) { - FaqEntity faq = optFaq.get(); - String faqUid = faq.getUid(); - - // 发送到索引队列 - faqMessageService.sendToIndexQueue(faqUid); - - synchronized (processedFaqs) { - processedFaqs.add(faqUid); - } - - successCount.incrementAndGet(); - - // 小暂停避免过载 - Thread.sleep(100); - } - } catch (Exception e) { - log.error("批量索引FAQ时出错: {}", e.getMessage(), e); - failCount.incrementAndGet(); - } - } - } catch (Exception e) { - log.error("执行批量索引线程时出错: {}", e.getMessage(), e); - } - }); - } - - // 关闭线程池并等待完成 - executorService.shutdown(); - try { - executorService.awaitTermination(5, TimeUnit.MINUTES); - } catch (InterruptedException e) { - log.warn("批量索引等待中断: {}", e.getMessage()); - Thread.currentThread().interrupt(); - } - - // 返回结果 - Map result = new HashMap<>(); - result.put("success", successCount.get()); - result.put("fail", failCount.get()); - result.put("total", count); - result.put("threads", threads); - result.put("processedCount", processedFaqs.size()); - - return ResponseEntity.ok(result); - } - - /** - * 测试单个FAQ的索引处理 - * - * @param faqUid FAQ的唯一ID - * @return 处理结果 - */ - @GetMapping("/index-single") - public ResponseEntity indexSingle( - @RequestParam String faqUid) { - - try { - // 验证管理员权限 - if (!authService.isCurrentUserAdmin()) { - return ResponseEntity.status(403).body("权限不足,仅管理员可执行此操作"); - } - - log.info("测试单个FAQ索引: {}", faqUid); - - // 检查FAQ是否存在 - Optional optFaq = faqRestService.findByUid(faqUid); - if (!optFaq.isPresent()) { - return ResponseEntity.notFound().build(); - } - - // 发送到索引队列 - faqMessageService.sendToIndexQueue(faqUid); - - Map result = new HashMap<>(); - result.put("status", "success"); - result.put("faqUid", faqUid); - result.put("message", "FAQ索引消息已发送到队列"); - - return ResponseEntity.ok(result); - - } catch (Exception e) { - log.error("索引单个FAQ时出错: {}", e.getMessage(), e); - return ResponseEntity.status(500).body("处理失败: " + e.getMessage()); - } - } -} diff --git a/modules/kbase/src/test/java/com/bytedesk/kbase/faq/FaqIndexTestController.java b/modules/kbase/src/test/java/com/bytedesk/kbase/faq/FaqIndexTestController.java index 57da3ddaea..7049c1f1d5 100644 --- a/modules/kbase/src/test/java/com/bytedesk/kbase/faq/FaqIndexTestController.java +++ b/modules/kbase/src/test/java/com/bytedesk/kbase/faq/FaqIndexTestController.java @@ -1,47 +1,170 @@ /* * @Author: jackning 270580156@qq.com - * @Date: 2025-05-17 11:10:00 + * @Date: 2025-05-17 16:30:00 * @LastEditors: jackning 270580156@qq.com - * @LastEditTime: 2025-05-17 11:10:00 + * @LastEditTime: 2025-05-17 09:38:32 * @Description: bytedesk.com https://github.com/Bytedesk/bytedesk * Please be aware of the BSL license restrictions before installing Bytedesk IM – * selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license. * Business Source License 1.1: https://github.com/Bytedesk/bytedesk/blob/main/LICENSE * contact: 270580156@qq.com * 联系:270580156@qq.com - * Copyright (c) 2025 by bytedesk.com, All Rights Reserved. + * Copyright (c) 2024 by bytedesk.com, All Rights Reserved. */ package com.bytedesk.kbase.faq; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; -import io.swagger.v3.oas.annotations.Operation; -import io.swagger.v3.oas.annotations.tags.Tag; +import lombok.extern.slf4j.Slf4j; /** * FAQ索引测试控制器 - * 用于手动触发FAQ索引测试 + * 用于手动触发并发索引测试 */ -@Tag(name = "FAQ索引测试") +@Slf4j @RestController -@RequestMapping("/api/v1/kbase/faq/test") +@RequestMapping("/api/faq/test") public class FaqIndexTestController { - - @Autowired - private FaqBulkIndexTester faqBulkIndexTester; - @Operation(summary = "测试批量FAQ索引") - @PostMapping("/bulk-index") - public ResponseEntity testBulkFaqIndexing( - @RequestParam(value = "count", defaultValue = "10") int count) { + @Autowired + private FaqRestService faqRestService; + + @Autowired + private FaqMessageService faqMessageService; + + /** + * 批量索引FAQ + * + * @param count 索引的FAQ数量 + * @param threads 并发线程数 + * @return 测试结果信息 + */ + @GetMapping("/batch-index") + public ResponseEntity batchIndex( + @RequestParam(required = false, defaultValue = "50") int count, + @RequestParam(required = false, defaultValue = "5") int threads) { - faqBulkIndexTester.testBulkFaqIndexing(count); + // 验证管理员权限 + // if (!authService.isCurrentUserAdmin()) { + // return ResponseEntity.status(403).body("权限不足,仅管理员可执行此操作"); + // } - return ResponseEntity.ok("FAQ批量索引测试已启动,索引数量: " + count); + log.info("启动批量索引测试 - 数量: {}, 线程数: {}", count, threads); + + ExecutorService executorService = Executors.newFixedThreadPool(threads); + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger failCount = new AtomicInteger(0); + List processedFaqs = new ArrayList<>(); + + int perThread = count / threads; + // if (perThread < 1) perThread = 1; + + // 启动所有线程 + for (int i = 0; i < threads; i++) { + executorService.submit(() -> { + try { + for (int j = 0; j < perThread; j++) { + try { + Optional optFaq = faqRestService.findRandomFaq(); + if (optFaq.isPresent()) { + FaqEntity faq = optFaq.get(); + String faqUid = faq.getUid(); + + // 发送到索引队列 + faqMessageService.sendToIndexQueue(faqUid); + + synchronized (processedFaqs) { + processedFaqs.add(faqUid); + } + + successCount.incrementAndGet(); + + // 小暂停避免过载 + Thread.sleep(100); + } + } catch (Exception e) { + log.error("批量索引FAQ时出错: {}", e.getMessage(), e); + failCount.incrementAndGet(); + } + } + } catch (Exception e) { + log.error("执行批量索引线程时出错: {}", e.getMessage(), e); + } + }); + } + + // 关闭线程池并等待完成 + executorService.shutdown(); + try { + executorService.awaitTermination(5, TimeUnit.MINUTES); + } catch (InterruptedException e) { + log.warn("批量索引等待中断: {}", e.getMessage()); + Thread.currentThread().interrupt(); + } + + // 返回结果 + Map result = new HashMap<>(); + result.put("success", successCount.get()); + result.put("fail", failCount.get()); + result.put("total", count); + result.put("threads", threads); + result.put("processedCount", processedFaqs.size()); + + return ResponseEntity.ok(result); + } + + /** + * 测试单个FAQ的索引处理 + * + * @param faqUid FAQ的唯一ID + * @return 处理结果 + */ + @GetMapping("/index-single") + public ResponseEntity indexSingle( + @RequestParam String faqUid) { + + try { + // 验证管理员权限 + // if (!authService.isCurrentUserAdmin()) { + // return ResponseEntity.status(403).body("权限不足,仅管理员可执行此操作"); + // } + + log.info("测试单个FAQ索引: {}", faqUid); + + // 检查FAQ是否存在 + Optional optFaq = faqRestService.findByUid(faqUid); + if (!optFaq.isPresent()) { + return ResponseEntity.notFound().build(); + } + + // 发送到索引队列 + faqMessageService.sendToIndexQueue(faqUid); + + Map result = new HashMap<>(); + result.put("status", "success"); + result.put("faqUid", faqUid); + result.put("message", "FAQ索引消息已发送到队列"); + + return ResponseEntity.ok(result); + + } catch (Exception e) { + log.error("索引单个FAQ时出错: {}", e.getMessage(), e); + return ResponseEntity.status(500).body("处理失败: " + e.getMessage()); + } } } diff --git a/starter/src/main/resources/artemis.properties b/starter/src/main/resources/artemis.properties index f563c4e5dc..d742300866 100644 --- a/starter/src/main/resources/artemis.properties +++ b/starter/src/main/resources/artemis.properties @@ -13,5 +13,12 @@ spring.jms.listener.concurrency=1 spring.jms.listener.max-concurrency=10 # 消息确认模式 spring.jms.listener.acknowledge-mode=client + +# 配置消息重传机制 +spring.artemis.embedded.max-delivery-attempts=5 +spring.artemis.embedded.redelivery-delay=5000 +spring.artemis.embedded.redelivery-delay-multiplier=2.0 +spring.artemis.embedded.max-redelivery-delay=60000 + # 会话事务 spring.jms.listener.auto-startup=true diff --git a/starter/src/main/resources/compose.yaml b/starter/src/main/resources/compose.yaml index a5e50094e4..6e71622c31 100644 --- a/starter/src/main/resources/compose.yaml +++ b/starter/src/main/resources/compose.yaml @@ -34,6 +34,7 @@ services: timeout: 5s retries: 5 + # http://127.0.0.1:21434 bytedesk-ollama: image: ollama/ollama:latest container_name: ollama-bytedesk @@ -51,6 +52,33 @@ services: timeout: 10s retries: 5 + # ActiveMQ Artemis 消息队列 + # http://localhost:18161/console,使用 admin/admin 作为登录凭据 + bytedesk-artemis: + image: apache/activemq-artemis:latest + container_name: artemis-bytedesk + environment: + - ARTEMIS_USER=admin + - ARTEMIS_PASSWORD=admin + - ANONYMOUS_LOGIN=false + - EXTRA_ARGS=--http-host 0.0.0.0 --relax-jolokia + ports: + - "16161:61616" # JMS + - "16162:61617" # AMQP + - "18161:8161" # Web Console + - "15672:5672" # AMQP + - "16163:61613" # STOMP + - "11883:1883" # MQTT + volumes: + - artemis_data:/var/lib/artemis/data + networks: + - bytedesk-network + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:8161/console/jolokia/read/org.apache.activemq.artemis:broker=\"0.0.0.0\""] + interval: 30s + timeout: 10s + retries: 5 + # ElasticSearch 向量数据库 bytedesk-elasticsearch: image: docker.elastic.co/elasticsearch/elasticsearch:8.18.0 @@ -252,6 +280,8 @@ volumes: # name: bytedesk_chroma_data elasticsearch_data: name: bytedesk_elasticsearch_data + artemis_data: + name: bytedesk_artemis_data # weaviate_data: # name: bytedesk_weaviate_data # milvus_data: