diff --git a/modules/core/src/main/java/com/bytedesk/core/jms/JmsArtemisConfig.java b/modules/core/src/main/java/com/bytedesk/core/jms/JmsArtemisConfig.java index 7e4fba8535..06ee7bed52 100644 --- a/modules/core/src/main/java/com/bytedesk/core/jms/JmsArtemisConfig.java +++ b/modules/core/src/main/java/com/bytedesk/core/jms/JmsArtemisConfig.java @@ -2,7 +2,7 @@ * @Author: jackning 270580156@qq.com * @Date: 2024-10-15 14:54:58 * @LastEditors: jackning 270580156@qq.com - * @LastEditTime: 2025-06-01 10:51:00 + * @LastEditTime: 2025-08-14 11:37:44 * @Description: bytedesk.com https://github.com/Bytedesk/bytedesk * Please be aware of the BSL license restrictions before installing Bytedesk IM – * selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license. @@ -19,6 +19,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; +import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.support.converter.MappingJackson2MessageConverter; import org.springframework.jms.support.converter.MessageConverter; import org.springframework.jms.support.converter.MessageType; @@ -29,47 +30,134 @@ import jakarta.jms.Destination; import jakarta.jms.JMSException; import jakarta.jms.Session; import lombok.RequiredArgsConstructor; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import lombok.extern.slf4j.Slf4j; +import jakarta.annotation.PostConstruct; // http://localhost:8161/console/auth/login // https://spring.io/guides/gs/messaging-jms // https://docs.spring.io/spring-boot/reference/messaging/jms.html // https://activemq.apache.org/components/artemis/documentation/latest/index.html +// +// 配置说明: +// 1. embedded模式:Spring Boot自动启动内嵌Artemis broker,自动创建ConnectionFactory +// 2. native模式:连接外部Artemis broker,手动创建ConnectionFactory +// 3. 监听器工厂和JmsTemplate配置适用于两种模式 @EnableJms @Configuration @RequiredArgsConstructor +@ConditionalOnClass(ActiveMQConnectionFactory.class) +@ConditionalOnProperty(name = "spring.artemis.mode") +@Slf4j public class JmsArtemisConfig { private final JmsErrorHandler jmsErrorHandler; + + @Value("${spring.artemis.mode:embedded}") + private String artemisMode; + + @Value("${spring.artemis.broker-url:tcp://127.0.0.1:16161}") + private String brokerUrl; + + @Value("${spring.artemis.user:admin}") + private String username; + + @Value("${spring.artemis.password:admin}") + private String password; + + @PostConstruct + public void init() { + log.info("JmsArtemisConfig initialized with mode: {}", artemisMode); + if ("native".equals(artemisMode)) { + log.info("Native mode: will connect to external Artemis broker at {}", brokerUrl); + } else { + log.info("Embedded mode: using Spring Boot auto-configured Artemis broker"); + } + } + + @Bean + @ConditionalOnProperty(name = "spring.artemis.mode", havingValue = "native") + public ConnectionFactory connectionFactory() { + log.info("Creating Artemis Native ConnectionFactory with broker URL: {}", brokerUrl); + try { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl); + connectionFactory.setUser(username); + connectionFactory.setPassword(password); + // 设置连接配置 + connectionFactory.setConnectionTTL(30000L); + connectionFactory.setCallTimeout(10000L); + connectionFactory.setCallFailoverTimeout(10000L); + // 设置重连配置 + connectionFactory.setRetryInterval(1000L); + connectionFactory.setMaxRetryInterval(30000L); + connectionFactory.setReconnectAttempts(10); + connectionFactory.setInitialConnectAttempts(3); + + // 测试连接 + try (var connection = connectionFactory.createConnection()) { + connection.start(); + log.info("Successfully connected to Artemis broker at {}", brokerUrl); + } + + return connectionFactory; + } catch (Exception e) { + log.error("Failed to create Artemis ConnectionFactory: {}", e.getMessage(), e); + throw new RuntimeException("Failed to create Artemis ConnectionFactory: " + e.getMessage(), e); + } + } + + // 对于embedded模式,Spring Boot会自动创建ConnectionFactory + // 我们只需要配置监听器工厂和JmsTemplate + // 这些bean会使用Spring Boot自动配置的ConnectionFactory @Bean public JmsListenerContainerFactory jmsArtemisQueueFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) { + log.info("Creating JMS Queue Listener Container Factory"); DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); // This provides all auto-configured defaults to this factory, including the message converter configurer.configure(factory, connectionFactory); // You could still override some settings if necessary. factory.setPubSubDomain(false); - // 设置确认模式为客户端确认,确保消息处理成功后才被确认 - factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); + // 设置确认模式为自动确认,避免阻塞 + factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE); + // 设置并发数 + factory.setConcurrency("5-20"); // 设置错误处理器 factory.setErrorHandler(jmsErrorHandler); + // 禁用事务模式 + factory.setSessionTransacted(false); + // 启用自动启动 + factory.setAutoStartup(true); return factory; } @Bean public JmsListenerContainerFactory jmsArtemisPubsubFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) { + log.info("Creating JMS PubSub Listener Container Factory"); DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); // This provides all auto-configured defaults to this factory, including the message converter configurer.configure(factory, connectionFactory); // You could still override some settings if necessary. factory.setPubSubDomain(true); - // 增加并发处理能力 - 设置3-10个并发消费者 - factory.setConcurrency("3-10"); - // 设置确认模式为客户端确认,确保消息处理成功后才被确认 - factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); + // 增加并发处理能力 - 设置5-30个并发消费者 + factory.setConcurrency("5-30"); + // 设置确认模式为自动确认,避免阻塞 + factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE); // 设置错误处理器 factory.setErrorHandler(jmsErrorHandler); + // 禁用事务模式 + factory.setSessionTransacted(false); + // 设置接收超时时间为10秒 + factory.setReceiveTimeout(10000L); + // 设置恢复间隔为5秒 + factory.setRecoveryInterval(5000L); + // 启用自动启动 + factory.setAutoStartup(true); return factory; } @@ -97,4 +185,22 @@ public class JmsArtemisConfig { } }; } + + @Bean + public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) { + log.info("Creating JmsTemplate with custom configuration"); + JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); + // 设置消息转换器 + jmsTemplate.setMessageConverter(jacksonJmsMessageConverter()); + // 设置目标解析器 + jmsTemplate.setDestinationResolver(destinationResolver()); + // 设置接收超时时间 + jmsTemplate.setReceiveTimeout(10000L); + // 禁用事务 + jmsTemplate.setSessionTransacted(false); + // 设置确认模式 + jmsTemplate.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE); + // 设置重试配置 - 通过连接工厂处理 + return jmsTemplate; + } } diff --git a/modules/core/src/main/java/com/bytedesk/core/jms/JmsArtemisConstants.java b/modules/core/src/main/java/com/bytedesk/core/jms/JmsArtemisConstants.java index 158b863f0b..778069ffba 100644 --- a/modules/core/src/main/java/com/bytedesk/core/jms/JmsArtemisConstants.java +++ b/modules/core/src/main/java/com/bytedesk/core/jms/JmsArtemisConstants.java @@ -2,7 +2,7 @@ * @Author: jackning 270580156@qq.com * @Date: 2024-10-15 16:49:35 * @LastEditors: jackning 270580156@qq.com - * @LastEditTime: 2025-05-30 10:47:45 + * @LastEditTime: 2025-08-14 11:15:48 * @Description: bytedesk.com https://github.com/Bytedesk/bytedesk * Please be aware of the BSL license restrictions before installing Bytedesk IM – * selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license. @@ -16,7 +16,7 @@ package com.bytedesk.core.jms; public class JmsArtemisConstants { // queue 为队列消息,每个实例轮流收取 - public static final String QUEUE_PREFIX = "artemis.queue."; + public static final String QUEUE_PREFIX = "bytedesk.artemis.queue."; public static final String QUEUE_STRING_NAME = QUEUE_PREFIX + "string"; @@ -25,7 +25,7 @@ public class JmsArtemisConstants { public static final String QUEUE_TEST_NAME = QUEUE_PREFIX + "test"; // topic 为pubsub广播消息,所有实例同时收取 - public static final String TOPIC_PREFIX = "artemis.topic."; + public static final String TOPIC_PREFIX = "bytedesk.artemis.topic."; public static final String TOPIC_STRING_NAME = TOPIC_PREFIX + "string"; diff --git a/modules/core/src/main/java/com/bytedesk/core/jms/JmsArtemisController.java b/modules/core/src/main/java/com/bytedesk/core/jms/JmsArtemisController.java index ed2c6a2126..1f90595b8c 100644 --- a/modules/core/src/main/java/com/bytedesk/core/jms/JmsArtemisController.java +++ b/modules/core/src/main/java/com/bytedesk/core/jms/JmsArtemisController.java @@ -2,7 +2,7 @@ * @Author: jackning 270580156@qq.com * @Date: 2024-10-17 10:22:01 * @LastEditors: jackning 270580156@qq.com - * @LastEditTime: 2024-10-17 10:31:07 + * @LastEditTime: 2025-08-14 11:26:20 * @Description: bytedesk.com https://github.com/Bytedesk/bytedesk * Please be aware of the BSL license restrictions before installing Bytedesk IM – * selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license. diff --git a/modules/core/src/main/java/com/bytedesk/core/jms/JmsArtemisListener.java b/modules/core/src/main/java/com/bytedesk/core/jms/JmsArtemisListener.java index dd1a6f88d1..28a6aa7647 100644 --- a/modules/core/src/main/java/com/bytedesk/core/jms/JmsArtemisListener.java +++ b/modules/core/src/main/java/com/bytedesk/core/jms/JmsArtemisListener.java @@ -19,7 +19,6 @@ import org.springframework.stereotype.Component; import com.bytedesk.core.config.BytedeskEventPublisher; import com.bytedesk.core.utils.ApplicationContextHolder; -import jakarta.jms.JMSException; import lombok.extern.slf4j.Slf4j; @@ -52,16 +51,14 @@ public class JmsArtemisListener { // topic pub sub // @TabooJsonFilter(title = "敏感词", action = "JmsArtemisListener") @JmsListener(destination = JmsArtemisConstants.TOPIC_STRING_NAME, containerFactory = "jmsArtemisPubsubFactory") - public void receiveTopicMessage(String json, jakarta.jms.Message message) throws JMSException { + public void receiveTopicMessage(String json) { try { - log.info("JmsArtemisListener receiveTopicMessage string {}", json); + log.debug("JmsArtemisListener receiveTopicMessage string {}", json); BytedeskEventPublisher bytedeskEventPublisher = ApplicationContextHolder.getBean(BytedeskEventPublisher.class); bytedeskEventPublisher.publishMessageJsonEvent(json); - // 处理成功后,显式确认消息已被消费 - message.acknowledge(); } catch (Exception e) { log.error("JmsArtemisListener 处理Topic消息失败: {}", e.getMessage(), e); - throw e; // 重新抛出异常,让错误处理器处理 + // 不再重新抛出异常,避免阻塞消息队列 } } diff --git a/modules/core/src/main/java/com/bytedesk/core/jms/README.md b/modules/core/src/main/java/com/bytedesk/core/jms/README.md new file mode 100644 index 0000000000..02002a5fab --- /dev/null +++ b/modules/core/src/main/java/com/bytedesk/core/jms/README.md @@ -0,0 +1,106 @@ +# JMS Artemis 配置说明 + +## 概述 + +本配置支持两种 Artemis 模式: +- **Embedded 模式**:Spring Boot 自动启动内嵌 Artemis broker +- **Native 模式**:连接外部 Artemis broker + +## 配置方式 + +### 1. Embedded 模式(开发测试推荐) + +```properties +# 启用 embedded 模式 +spring.artemis.mode=embedded + +# embedded 模式详细配置 +spring.artemis.embedded.enabled=true +spring.artemis.embedded.persistent=false +spring.artemis.embedded.data-directory=./artemis-data +spring.artemis.embedded.configuration=classpath:artemis-config.xml + +# 注释掉 native 模式配置 +# spring.artemis.broker-url=tcp://127.0.0.1:16161 +# spring.artemis.user=admin +# spring.artemis.password=admin +``` + +**优点:** +- 无需额外安装 Artemis +- 配置简单 +- 适合开发和测试环境 + +**缺点:** +- 性能相对较低 +- 不适合生产环境 + +### 2. Native 模式(生产环境推荐) + +```properties +# 启用 native 模式 +spring.artemis.mode=native + +# native 模式配置 +spring.artemis.broker-url=tcp://127.0.0.1:16161 +spring.artemis.user=admin +spring.artemis.password=admin + +# 注释掉 embedded 模式配置 +# spring.artemis.embedded.enabled=true +# spring.artemis.embedded.persistent=false +# spring.artemis.embedded.data-directory=./artemis-data +# spring.artemis.embedded.configuration=classpath:artemis-config.xml +``` + +**优点:** +- 性能更高 +- 支持集群 +- 适合生产环境 + +**缺点:** +- 需要额外安装和配置 Artemis +- 配置相对复杂 + +## 切换模式 + +要切换模式,请按以下步骤操作: + +1. 修改 `spring.artemis.mode` 属性 +2. 注释掉当前模式的配置 +3. 取消注释目标模式的配置 +4. 重启应用 + +## 注意事项 + +1. **ConnectionFactory 创建**: + - Embedded 模式:Spring Boot 自动创建 + - Native 模式:手动创建并配置 + +2. **监听器工厂和 JmsTemplate**: + - 两种模式都使用相同的配置 + - 自动使用对应的 ConnectionFactory + +3. **健康检查**: + - 两种模式都支持健康检查 + - 可通过 `/actuator/health` 查看状态 + +## 故障排除 + +### 连接失败 + +1. 检查 Artemis broker 是否运行 +2. 验证连接参数(URL、用户名、密码) +3. 查看应用日志中的错误信息 + +### 性能问题 + +1. 调整连接池配置 +2. 优化并发消费者数量 +3. 配置适当的超时时间 + +## 相关链接 + +- [Spring Boot JMS 文档](https://docs.spring.io/spring-boot/reference/messaging/jms.html) +- [Apache Artemis 文档](https://activemq.apache.org/components/artemis/documentation/latest/index.html) +- [Spring JMS 指南](https://spring.io/guides/gs/messaging-jms/) diff --git a/starter/src/main/resources/compose.yaml b/starter/src/main/resources/compose.yaml index 67d78a10fe..428e523c1b 100644 --- a/starter/src/main/resources/compose.yaml +++ b/starter/src/main/resources/compose.yaml @@ -62,7 +62,31 @@ services: - ARTEMIS_USER=admin - ARTEMIS_PASSWORD=admin - ANONYMOUS_LOGIN=false - - EXTRA_ARGS=--http-host 0.0.0.0 --relax-jolokia + # 基础配置 + - EXTRA_ARGS=--http-host 0.0.0.0 --relax-jolokia --global-max-size 100MB --journal-type NIO + # 地址策略配置 - 解决阻塞问题 + - ARTEMIS_ADDRESS_FULL_POLICY=PAGE + - ARTEMIS_MAX_SIZE_BYTES=52428800 + - ARTEMIS_PAGE_SIZE_BYTES=1048576 + - ARTEMIS_PAGE_MAX_CACHE_SIZE=20 + - ARTEMIS_AUTO_CREATE_QUEUES=true + - ARTEMIS_AUTO_CREATE_ADDRESSES=true + - ARTEMIS_AUTO_DELETE_QUEUES=false + - ARTEMIS_AUTO_DELETE_ADDRESSES=false + - ARTEMIS_MAX_REDELIVERY_ATTEMPTS=3 + - ARTEMIS_REDELIVERY_DELAY=0 + - ARTEMIS_MAX_REDELIVERY_DELAY=60000 + - ARTEMIS_REDELIVERY_DELAY_MULTIPLIER=2 + # 缓冲区配置 + - ARTEMIS_TCP_SEND_BUFFER_SIZE=1048576 + - ARTEMIS_TCP_RECEIVE_BUFFER_SIZE=1048576 + # AMQP配置 + - ARTEMIS_AMQP_CREDITS=1000 + - ARTEMIS_AMQP_LOW_CREDITS=300 + # 性能优化 + - ARTEMIS_USE_EPOLL=true + - ARTEMIS_MAX_CONSUMERS=-1 + - ARTEMIS_EXPIRY_DELAY=3600000 ports: - "16161:61616" # JMS - "16162:61617" # AMQP