This commit is contained in:
jack ning
2025-08-14 11:38:19 +08:00
parent 6f582672c7
commit 368c2c988d
6 changed files with 251 additions and 18 deletions

View File

@@ -2,7 +2,7 @@
* @Author: jackning 270580156@qq.com
* @Date: 2024-10-15 14:54:58
* @LastEditors: jackning 270580156@qq.com
* @LastEditTime: 2025-06-01 10:51:00
* @LastEditTime: 2025-08-14 11:37:44
* @Description: bytedesk.com https://github.com/Bytedesk/bytedesk
* Please be aware of the BSL license restrictions before installing Bytedesk IM
* selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license.
@@ -19,6 +19,7 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
@@ -29,47 +30,134 @@ import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Session;
import lombok.RequiredArgsConstructor;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import lombok.extern.slf4j.Slf4j;
import jakarta.annotation.PostConstruct;
// http://localhost:8161/console/auth/login
// https://spring.io/guides/gs/messaging-jms
// https://docs.spring.io/spring-boot/reference/messaging/jms.html
// https://activemq.apache.org/components/artemis/documentation/latest/index.html
//
// 配置说明:
// 1. embedded模式Spring Boot自动启动内嵌Artemis broker自动创建ConnectionFactory
// 2. native模式连接外部Artemis broker手动创建ConnectionFactory
// 3. 监听器工厂和JmsTemplate配置适用于两种模式
@EnableJms
@Configuration
@RequiredArgsConstructor
@ConditionalOnClass(ActiveMQConnectionFactory.class)
@ConditionalOnProperty(name = "spring.artemis.mode")
@Slf4j
public class JmsArtemisConfig {
private final JmsErrorHandler jmsErrorHandler;
@Value("${spring.artemis.mode:embedded}")
private String artemisMode;
@Value("${spring.artemis.broker-url:tcp://127.0.0.1:16161}")
private String brokerUrl;
@Value("${spring.artemis.user:admin}")
private String username;
@Value("${spring.artemis.password:admin}")
private String password;
@PostConstruct
public void init() {
log.info("JmsArtemisConfig initialized with mode: {}", artemisMode);
if ("native".equals(artemisMode)) {
log.info("Native mode: will connect to external Artemis broker at {}", brokerUrl);
} else {
log.info("Embedded mode: using Spring Boot auto-configured Artemis broker");
}
}
@Bean
@ConditionalOnProperty(name = "spring.artemis.mode", havingValue = "native")
public ConnectionFactory connectionFactory() {
log.info("Creating Artemis Native ConnectionFactory with broker URL: {}", brokerUrl);
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
connectionFactory.setUser(username);
connectionFactory.setPassword(password);
// 设置连接配置
connectionFactory.setConnectionTTL(30000L);
connectionFactory.setCallTimeout(10000L);
connectionFactory.setCallFailoverTimeout(10000L);
// 设置重连配置
connectionFactory.setRetryInterval(1000L);
connectionFactory.setMaxRetryInterval(30000L);
connectionFactory.setReconnectAttempts(10);
connectionFactory.setInitialConnectAttempts(3);
// 测试连接
try (var connection = connectionFactory.createConnection()) {
connection.start();
log.info("Successfully connected to Artemis broker at {}", brokerUrl);
}
return connectionFactory;
} catch (Exception e) {
log.error("Failed to create Artemis ConnectionFactory: {}", e.getMessage(), e);
throw new RuntimeException("Failed to create Artemis ConnectionFactory: " + e.getMessage(), e);
}
}
// 对于embedded模式Spring Boot会自动创建ConnectionFactory
// 我们只需要配置监听器工厂和JmsTemplate
// 这些bean会使用Spring Boot自动配置的ConnectionFactory
@Bean
public JmsListenerContainerFactory<?> jmsArtemisQueueFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
log.info("Creating JMS Queue Listener Container Factory");
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
// This provides all auto-configured defaults to this factory, including the message converter
configurer.configure(factory, connectionFactory);
// You could still override some settings if necessary.
factory.setPubSubDomain(false);
// 设置确认模式为客户端确认,确保消息处理成功后才被确认
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
// 设置确认模式为自动确认,避免阻塞
factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
// 设置并发数
factory.setConcurrency("5-20");
// 设置错误处理器
factory.setErrorHandler(jmsErrorHandler);
// 禁用事务模式
factory.setSessionTransacted(false);
// 启用自动启动
factory.setAutoStartup(true);
return factory;
}
@Bean
public JmsListenerContainerFactory<?> jmsArtemisPubsubFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
log.info("Creating JMS PubSub Listener Container Factory");
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
// This provides all auto-configured defaults to this factory, including the message converter
configurer.configure(factory, connectionFactory);
// You could still override some settings if necessary.
factory.setPubSubDomain(true);
// 增加并发处理能力 - 设置3-10个并发消费者
factory.setConcurrency("3-10");
// 设置确认模式为客户端确认,确保消息处理成功后才被确认
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
// 增加并发处理能力 - 设置5-30个并发消费者
factory.setConcurrency("5-30");
// 设置确认模式为自动确认,避免阻塞
factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
// 设置错误处理器
factory.setErrorHandler(jmsErrorHandler);
// 禁用事务模式
factory.setSessionTransacted(false);
// 设置接收超时时间为10秒
factory.setReceiveTimeout(10000L);
// 设置恢复间隔为5秒
factory.setRecoveryInterval(5000L);
// 启用自动启动
factory.setAutoStartup(true);
return factory;
}
@@ -97,4 +185,22 @@ public class JmsArtemisConfig {
}
};
}
@Bean
public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
log.info("Creating JmsTemplate with custom configuration");
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
// 设置消息转换器
jmsTemplate.setMessageConverter(jacksonJmsMessageConverter());
// 设置目标解析器
jmsTemplate.setDestinationResolver(destinationResolver());
// 设置接收超时时间
jmsTemplate.setReceiveTimeout(10000L);
// 禁用事务
jmsTemplate.setSessionTransacted(false);
// 设置确认模式
jmsTemplate.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
// 设置重试配置 - 通过连接工厂处理
return jmsTemplate;
}
}

View File

@@ -2,7 +2,7 @@
* @Author: jackning 270580156@qq.com
* @Date: 2024-10-15 16:49:35
* @LastEditors: jackning 270580156@qq.com
* @LastEditTime: 2025-05-30 10:47:45
* @LastEditTime: 2025-08-14 11:15:48
* @Description: bytedesk.com https://github.com/Bytedesk/bytedesk
* Please be aware of the BSL license restrictions before installing Bytedesk IM
* selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license.
@@ -16,7 +16,7 @@ package com.bytedesk.core.jms;
public class JmsArtemisConstants {
// queue 为队列消息,每个实例轮流收取
public static final String QUEUE_PREFIX = "artemis.queue.";
public static final String QUEUE_PREFIX = "bytedesk.artemis.queue.";
public static final String QUEUE_STRING_NAME = QUEUE_PREFIX + "string";
@@ -25,7 +25,7 @@ public class JmsArtemisConstants {
public static final String QUEUE_TEST_NAME = QUEUE_PREFIX + "test";
// topic 为pubsub广播消息所有实例同时收取
public static final String TOPIC_PREFIX = "artemis.topic.";
public static final String TOPIC_PREFIX = "bytedesk.artemis.topic.";
public static final String TOPIC_STRING_NAME = TOPIC_PREFIX + "string";

View File

@@ -2,7 +2,7 @@
* @Author: jackning 270580156@qq.com
* @Date: 2024-10-17 10:22:01
* @LastEditors: jackning 270580156@qq.com
* @LastEditTime: 2024-10-17 10:31:07
* @LastEditTime: 2025-08-14 11:26:20
* @Description: bytedesk.com https://github.com/Bytedesk/bytedesk
* Please be aware of the BSL license restrictions before installing Bytedesk IM
* selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license.

View File

@@ -19,7 +19,6 @@ import org.springframework.stereotype.Component;
import com.bytedesk.core.config.BytedeskEventPublisher;
import com.bytedesk.core.utils.ApplicationContextHolder;
import jakarta.jms.JMSException;
import lombok.extern.slf4j.Slf4j;
@@ -52,16 +51,14 @@ public class JmsArtemisListener {
// topic pub sub
// @TabooJsonFilter(title = "敏感词", action = "JmsArtemisListener")
@JmsListener(destination = JmsArtemisConstants.TOPIC_STRING_NAME, containerFactory = "jmsArtemisPubsubFactory")
public void receiveTopicMessage(String json, jakarta.jms.Message message) throws JMSException {
public void receiveTopicMessage(String json) {
try {
log.info("JmsArtemisListener receiveTopicMessage string {}", json);
log.debug("JmsArtemisListener receiveTopicMessage string {}", json);
BytedeskEventPublisher bytedeskEventPublisher = ApplicationContextHolder.getBean(BytedeskEventPublisher.class);
bytedeskEventPublisher.publishMessageJsonEvent(json);
// 处理成功后,显式确认消息已被消费
message.acknowledge();
} catch (Exception e) {
log.error("JmsArtemisListener 处理Topic消息失败: {}", e.getMessage(), e);
throw e; // 重新抛出异常,让错误处理器处理
// 不再重新抛出异常,避免阻塞消息队列
}
}

View File

@@ -0,0 +1,106 @@
# JMS Artemis 配置说明
## 概述
本配置支持两种 Artemis 模式:
- **Embedded 模式**Spring Boot 自动启动内嵌 Artemis broker
- **Native 模式**:连接外部 Artemis broker
## 配置方式
### 1. Embedded 模式(开发测试推荐)
```properties
# 启用 embedded 模式
spring.artemis.mode=embedded
# embedded 模式详细配置
spring.artemis.embedded.enabled=true
spring.artemis.embedded.persistent=false
spring.artemis.embedded.data-directory=./artemis-data
spring.artemis.embedded.configuration=classpath:artemis-config.xml
# 注释掉 native 模式配置
# spring.artemis.broker-url=tcp://127.0.0.1:16161
# spring.artemis.user=admin
# spring.artemis.password=admin
```
**优点:**
- 无需额外安装 Artemis
- 配置简单
- 适合开发和测试环境
**缺点:**
- 性能相对较低
- 不适合生产环境
### 2. Native 模式(生产环境推荐)
```properties
# 启用 native 模式
spring.artemis.mode=native
# native 模式配置
spring.artemis.broker-url=tcp://127.0.0.1:16161
spring.artemis.user=admin
spring.artemis.password=admin
# 注释掉 embedded 模式配置
# spring.artemis.embedded.enabled=true
# spring.artemis.embedded.persistent=false
# spring.artemis.embedded.data-directory=./artemis-data
# spring.artemis.embedded.configuration=classpath:artemis-config.xml
```
**优点:**
- 性能更高
- 支持集群
- 适合生产环境
**缺点:**
- 需要额外安装和配置 Artemis
- 配置相对复杂
## 切换模式
要切换模式,请按以下步骤操作:
1. 修改 `spring.artemis.mode` 属性
2. 注释掉当前模式的配置
3. 取消注释目标模式的配置
4. 重启应用
## 注意事项
1. **ConnectionFactory 创建**
- Embedded 模式Spring Boot 自动创建
- Native 模式:手动创建并配置
2. **监听器工厂和 JmsTemplate**
- 两种模式都使用相同的配置
- 自动使用对应的 ConnectionFactory
3. **健康检查**
- 两种模式都支持健康检查
- 可通过 `/actuator/health` 查看状态
## 故障排除
### 连接失败
1. 检查 Artemis broker 是否运行
2. 验证连接参数URL、用户名、密码
3. 查看应用日志中的错误信息
### 性能问题
1. 调整连接池配置
2. 优化并发消费者数量
3. 配置适当的超时时间
## 相关链接
- [Spring Boot JMS 文档](https://docs.spring.io/spring-boot/reference/messaging/jms.html)
- [Apache Artemis 文档](https://activemq.apache.org/components/artemis/documentation/latest/index.html)
- [Spring JMS 指南](https://spring.io/guides/gs/messaging-jms/)