This commit is contained in:
jack ning
2025-08-16 07:12:59 +08:00
parent 84abe39d38
commit daeb0b9220
31 changed files with 50 additions and 893 deletions

View File

@@ -2,7 +2,7 @@
* @Author: jackning 270580156@qq.com
* @Date: 2024-10-23 15:20:38
* @LastEditors: jackning 270580156@qq.com
* @LastEditTime: 2025-08-15 22:10:29
* @LastEditTime: 2025-08-16 06:53:35
* @Description: bytedesk.com https://github.com/Bytedesk/bytedesk
* Please be aware of the BSL license restrictions before installing Bytedesk IM
* selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license.
@@ -11,7 +11,7 @@
* 联系270580156@qq.com
* Copyright (c) 2024 by bytedesk.com, All Rights Reserved.
*/
package com.bytedesk.core.redis;
package com.bytedesk.core.constant;
public class RedisConsts {
private RedisConsts() {}
@@ -34,4 +34,11 @@ public class RedisConsts {
public static final String LOGIN_LOCKED_PREFIX = BYTEDESK_REDIS_PREFIX + "login_locked:";
public static final String LOGIN_LOCKED_VALUE = "1";
// 转接超时相关常量
public static final String TRANSFER_TIMEOUT_PREFIX = BYTEDESK_REDIS_PREFIX + "transfer:timeout:";
// Redis监听频道相关常量
public static final String REDIS_KEYEVENT_EXPIRED_CHANNEL = "__keyevent@0__:expired";
public static final String REDIS_KEYEVENT_EXPIRED_PATTERN = "__keyevent@*__:expired";
}

View File

@@ -20,8 +20,8 @@ import org.springframework.lang.NonNull;
import org.springframework.stereotype.Service;
import com.bytedesk.core.config.properties.BytedeskProperties;
import com.bytedesk.core.constant.RedisConsts;
import com.bytedesk.core.enums.ChannelEnum;
import com.bytedesk.core.redis.RedisConsts;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

View File

@@ -20,7 +20,7 @@ import java.util.concurrent.TimeUnit;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import com.bytedesk.core.redis.RedisConsts;
import com.bytedesk.core.constant.RedisConsts;
import jakarta.annotation.PostConstruct;
import lombok.AllArgsConstructor;

View File

@@ -19,6 +19,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import com.bytedesk.core.constant.RedisConsts;
import lombok.extern.slf4j.Slf4j;
/**

View File

@@ -19,6 +19,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import com.bytedesk.core.constant.RedisConsts;
// https://redis.io/docs/latest/develop/data-types/streams/
@Service
public class RedisService {

View File

@@ -1,44 +0,0 @@
/*
* @Author: jackning 270580156@qq.com
* @Date: 2024-11-20 17:25:10
* @LastEditors: jackning 270580156@qq.com
* @LastEditTime: 2024-11-20 17:32:43
* @Description: bytedesk.com https://github.com/Bytedesk/bytedesk
* Please be aware of the BSL license restrictions before installing Bytedesk IM
* selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license.
* Business Source License 1.1: https://github.com/Bytedesk/bytedesk/blob/main/LICENSE
* contact: 270580156@qq.com
* 联系270580156@qq.com
* Copyright (c) 2024 by bytedesk.com, All Rights Reserved.
*/
package com.bytedesk.core.redis.pubsub;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
/**
* 需要在redis配置文件中添加如下配置开启过期事件
* notify-keyspace-events Ex
* 或者在redis cli中执行如下命令
* CONFIG SET notify-keyspace-events Ex
*/
@Slf4j
@Component
public class RedisKeyExpirationListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
String expiredKey = message.toString();
log.info("Expired key: {}", expiredKey);
// 处理过期的 clientId
handleExpiredClientId(expiredKey);
}
private void handleExpiredClientId(String clientId) {
// 处理逻辑,例如从 MqttSessionService 中移除过期的会话
log.info("Handling expired clientId: {}", clientId);
}
}

View File

@@ -1,90 +0,0 @@
/*
* @Author: jackning 270580156@qq.com
* @Date: 2024-08-23 17:13:03
* @LastEditors: jackning 270580156@qq.com
* @LastEditTime: 2024-11-20 17:30:56
* @Description: bytedesk.com https://github.com/Bytedesk/bytedesk
* Please be aware of the BSL license restrictions before installing Bytedesk IM
* selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license.
* Business Source License 1.1: https://github.com/Bytedesk/bytedesk/blob/main/LICENSE
* contact: 270580156@qq.com
* 联系270580156@qq.com
* Copyright (c) 2024 by bytedesk.com, All Rights Reserved.
*/
package com.bytedesk.core.redis.pubsub;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
// import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
// import com.bytedesk.core.redis.CustomRedisSerializer;
@Configuration
public class RedisPubsubConfig {
// @Value("${bytedesk.redis-pubsub-channel}")
// private String redisPubsubChannel;
// 监听object
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listener) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 订阅topic - subscribe
container.addMessageListener(listener, new PatternTopic(RedisPubsubConst.BYTEDESK_PUBSUB_CHANNEL_OBJECT));
return container;
}
/**
* 消息监听器使用MessageAdapter可实现自动化解码及方法代理
* // FIXME: 编码报错 SerializationException: Could not read JSON: Unexpected
* character ('¬' (code 172)): expected a valid value (JSON String, Number, Array,
* Object or token 'null', 'true' or 'false')
*
* @return
*/
@Bean
public MessageListenerAdapter listener(RedisPubsubObjectListener subscriber) {
//
MessageListenerAdapter adapter = new MessageListenerAdapter(subscriber, "onMessage");
// adapter.setSerializer(new GenericJackson2JsonRedisSerializer());
// adapter.setSerializer(new CustomRedisSerializer());
adapter.setSerializer(new JdkSerializationRedisSerializer());
adapter.afterPropertiesSet();
//
return adapter;
}
// 监听string
@Bean
RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory,
RedisPubsubStringListener redisStringListener) {
//
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
// 订阅topic - subscribe
redisMessageListenerContainer.addMessageListener(redisStringListener,
new ChannelTopic(RedisPubsubConst.BYTEDESK_PUBSUB_CHANNEL_STRING));
return redisMessageListenerContainer;
}
// 监听key过期事件
@Bean
RedisMessageListenerContainer redisKeyExpireListenerContainer(RedisConnectionFactory connectionFactory,
RedisKeyExpirationListener listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, new PatternTopic(RedisPubsubConst.BYTEDESK_PUBSUB_KEY_EXPIRE));
return container;
}
}

View File

@@ -1,24 +0,0 @@
/*
* @Author: jackning 270580156@qq.com
* @Date: 2024-08-23 17:29:34
* @LastEditors: jackning 270580156@qq.com
* @LastEditTime: 2024-11-20 17:29:43
* @Description: bytedesk.com https://github.com/Bytedesk/bytedesk
* Please be aware of the BSL license restrictions before installing Bytedesk IM
* selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license.
* Business Source License 1.1: https://github.com/Bytedesk/bytedesk/blob/main/LICENSE
* contact: 270580156@qq.com
* 联系270580156@qq.com
* Copyright (c) 2024 by bytedesk.com, All Rights Reserved.
*/
package com.bytedesk.core.redis.pubsub;
import com.bytedesk.core.redis.RedisConsts;
public class RedisPubsubConst {
private RedisPubsubConst() {}
public static final String BYTEDESK_PUBSUB_CHANNEL_STRING = RedisConsts.BYTEDESK_REDIS_PREFIX + "pubsub";
public static final String BYTEDESK_PUBSUB_CHANNEL_OBJECT = RedisConsts.BYTEDESK_REDIS_PREFIX + "pubsub_object";
public static final String BYTEDESK_PUBSUB_KEY_EXPIRE = RedisConsts.BYTEDESK_REDIS_PREFIX + "__keyevent@*__:expired";
}

View File

@@ -1,83 +0,0 @@
/*
* @Author: jackning 270580156@qq.com
* @Date: 2024-04-15 17:14:16
* @LastEditors: jackning 270580156@qq.com
* @LastEditTime: 2024-08-23 21:09:41
* @Description: bytedesk.com https://github.com/Bytedesk/bytedesk
* Please be aware of the BSL license restrictions before installing Bytedesk IM
* selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license.
* Business Source License 1.1: https://github.com/Bytedesk/bytedesk/blob/main/LICENSE
* contact: 270580156@qq.com
* 联系270580156@qq.com
* Copyright (c) 2024 by bytedesk.com, All Rights Reserved.
*/
package com.bytedesk.core.redis.pubsub;
import org.springframework.beans.factory.annotation.Autowired;
// import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.bytedesk.core.utils.JsonResult;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
/**
* Spring Data Redis - Pub/Sub Messaging
* https://docs.spring.io/spring-data/redis/reference/redis/pubsub.html
* https://redis.io/docs/latest/develop/interact/pubsub/
*/
@Slf4j
@RestController
@RequestMapping("/redis/pubsub")
@Tag(name = "Redis PubSub Management", description = "Redis publish/subscribe messaging APIs")
public class RedisPubsubController {
@Autowired
private StringRedisTemplate stringRedisTemplate;
// @Autowired
// private RedisTemplate<String, Object> redisTemplate;
/**
* http://127.0.0.1:9003/redis/pubsub/send?message=hello
*
* @param message
* @return
*/
@Operation(summary = "Publish Text Message", description = "Publish a text message to Redis pub/sub channel")
@GetMapping("/send")
public JsonResult<String> publishText(@RequestParam String message) {
log.info("redisPubsub redisMessageSend: {}", message);
stringRedisTemplate.convertAndSend(
RedisPubsubConst.BYTEDESK_PUBSUB_CHANNEL_STRING,
message);
//
return new JsonResult<>("send string", 200, message);
}
/**
* http://127.0.0.1:9003/redis/pubsub/send/object?message=hello
*
* @param message
* @return
*/
// @GetMapping("/send/object")
// public JsonResult<String> publishObject(@RequestParam String message) {
// log.info("redisPubsub send: {}", message);
// //
// RedisPubsubMessage messageDto = RedisPubsubMessage.builder()
// .type("text")
// .fileUrl("pubsub").fileUid("").kbUid("")
// .build();
// redisTemplate.convertAndSend(RedisPubsubConst.BYTEDESK_PUBSUB_CHANNEL_OBJECT, messageDto);
// return new JsonResult<>("send object", 200, message);
// }
}

View File

@@ -1,37 +0,0 @@
/*
* @Author: jackning 270580156@qq.com
* @Date: 2024-04-15 17:13:01
* @LastEditors: jackning 270580156@qq.com
* @LastEditTime: 2024-08-30 16:05:07
* @Description: bytedesk.com https://github.com/Bytedesk/bytedesk
* Please be aware of the BSL license restrictions before installing Bytedesk IM
* selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license.
* Business Source License 1.1: https://github.com/Bytedesk/bytedesk/blob/main/LICENSE
* contact: 270580156@qq.com
* 联系270580156@qq.com
* Copyright (c) 2024 by bytedesk.com, All Rights Reserved.
*/
package com.bytedesk.core.redis.pubsub;
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Builder;
// import lombok.extern.slf4j.Slf4j;
import lombok.NoArgsConstructor;
import lombok.Data;
// java发送python接收
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RedisPubsubMessage implements Serializable {
private static final long serialVersionUID = 1L;
private String type;
private String content;
}

View File

@@ -1,41 +0,0 @@
/*
* @Author: jackning 270580156@qq.com
* @Date: 2024-08-28 14:45:30
* @LastEditors: jackning 270580156@qq.com
* @LastEditTime: 2024-08-30 16:42:34
* @Description: bytedesk.com https://github.com/Bytedesk/bytedesk
* Please be aware of the BSL license restrictions before installing Bytedesk IM
* selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license.
* Business Source License 1.1: https://github.com/Bytedesk/bytedesk/blob/main/LICENSE
* contact: 270580156@qq.com
* 联系270580156@qq.com
* Copyright (c) 2024 by bytedesk.com, All Rights Reserved.
*/
package com.bytedesk.core.redis.pubsub;
public enum RedisPubsubMessageType {
PARSE_FILE,
PARSE_FILE_SUCCESS,
PARSE_FILE_ERROR,
//
DELETE_FILE,
DELETE_FILE_SUCCESS,
DELETE_FILE_ERROR,
//
QUESTION,
ANSWER,
ANSWER_FINISHED,
;
// 根据字符串查找对应的枚举常量
public static RedisPubsubMessageType fromValue(String value) {
for (RedisPubsubMessageType type : RedisPubsubMessageType.values()) {
if (type.name().equalsIgnoreCase(value)) {
return type;
}
}
throw new IllegalArgumentException("No RedisPubsubMessageType constant with value: " + value);
}
}

View File

@@ -1,33 +0,0 @@
/*
* @Author: jackning 270580156@qq.com
* @Date: 2024-04-15 17:13:01
* @LastEditors: jackning 270580156@qq.com
* @LastEditTime: 2024-11-20 17:23:20
* @Description: bytedesk.com https://github.com/Bytedesk/bytedesk
* Please be aware of the BSL license restrictions before installing Bytedesk IM
* selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license.
* Business Source License 1.1: https://github.com/Bytedesk/bytedesk/blob/main/LICENSE
* contact: 270580156@qq.com
* 联系270580156@qq.com
* Copyright (c) 2024 by bytedesk.com, All Rights Reserved.
*/
package com.bytedesk.core.redis.pubsub;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Component
public class RedisPubsubObjectListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
log.info("RedisPubsubObjectListener onMessage: " + new String(message.getBody()));
}
}

View File

@@ -1,36 +0,0 @@
/*
* @Author: jackning 270580156@qq.com
* @Date: 2024-08-30 17:38:05
* @LastEditors: jackning 270580156@qq.com
* @LastEditTime: 2024-08-30 17:39:46
* @Description: bytedesk.com https://github.com/Bytedesk/bytedesk
* Please be aware of the BSL license restrictions before installing Bytedesk IM
* selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license.
* Business Source License 1.1: https://github.com/Bytedesk/bytedesk/blob/main/LICENSE
* contact: 270580156@qq.com
* 联系270580156@qq.com
* Copyright (c) 2024 by bytedesk.com, All Rights Reserved.
*/
package com.bytedesk.core.redis.pubsub;
import org.springframework.context.ApplicationEvent;
import com.bytedesk.core.redis.pubsub.message.RedisPubsubMessageFile;
import lombok.Data;
import lombok.EqualsAndHashCode;
@Data
@EqualsAndHashCode(callSuper = false)
public class RedisPubsubParseFileErrorEvent extends ApplicationEvent {
private static final long serialVersionUID = 1L;
private RedisPubsubMessageFile messageFile;
public RedisPubsubParseFileErrorEvent(Object source, RedisPubsubMessageFile messageFile) {
super(source);
this.messageFile = messageFile;
}
}

View File

@@ -1,36 +0,0 @@
/*
* @Author: jackning 270580156@qq.com
* @Date: 2024-08-30 17:27:47
* @LastEditors: jackning 270580156@qq.com
* @LastEditTime: 2024-08-30 17:29:53
* @Description: bytedesk.com https://github.com/Bytedesk/bytedesk
* Please be aware of the BSL license restrictions before installing Bytedesk IM
* selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license.
* Business Source License 1.1: https://github.com/Bytedesk/bytedesk/blob/main/LICENSE
* contact: 270580156@qq.com
* 联系270580156@qq.com
* Copyright (c) 2024 by bytedesk.com, All Rights Reserved.
*/
package com.bytedesk.core.redis.pubsub;
import org.springframework.context.ApplicationEvent;
import com.bytedesk.core.redis.pubsub.message.RedisPubsubMessageFile;
import lombok.Data;
import lombok.EqualsAndHashCode;
@Data
@EqualsAndHashCode(callSuper = false)
public class RedisPubsubParseFileSuccessEvent extends ApplicationEvent {
private static final long serialVersionUID = 1L;
private RedisPubsubMessageFile messageFile;
public RedisPubsubParseFileSuccessEvent(Object source, RedisPubsubMessageFile messageFile) {
super(source);
this.messageFile = messageFile;
}
}

View File

@@ -1,89 +0,0 @@
/*
* @Author: jackning 270580156@qq.com
* @Date: 2024-08-23 19:25:35
* @LastEditors: jackning 270580156@qq.com
* @LastEditTime: 2024-08-31 08:06:12
* @Description: bytedesk.com https://github.com/Bytedesk/bytedesk
* Please be aware of the BSL license restrictions before installing Bytedesk IM
* selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license.
* Business Source License 1.1: https://github.com/Bytedesk/bytedesk/blob/main/LICENSE
* contact: 270580156@qq.com
* 联系270580156@qq.com
* Copyright (c) 2024 by bytedesk.com, All Rights Reserved.
*/
package com.bytedesk.core.redis.pubsub;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson2.JSON;
import com.bytedesk.core.redis.pubsub.message.RedisPubsubMessageFile;
import com.bytedesk.core.redis.pubsub.message.RedisPubsubMessageQa;
@Service
public class RedisPubsubService {
@Autowired
private StringRedisTemplate stringRedisTemplate;
public void sendParseFileMessage(String fileUid, String fileUrl, String kbUid) {
//
RedisPubsubMessageFile messageFile = RedisPubsubMessageFile.builder()
.fileUid(fileUid)
.fileUrl(fileUrl)
.kbUid(kbUid)
.build();
//
RedisPubsubMessage messageObject = RedisPubsubMessage.builder()
.type(RedisPubsubMessageType.PARSE_FILE.name())
.content(JSON.toJSONString(messageFile))
.build();
//
stringRedisTemplate.convertAndSend(
RedisPubsubConst.BYTEDESK_PUBSUB_CHANNEL_STRING,
JSON.toJSONString(messageObject));
}
public void sendDeleteFileMessage(String fileUid, List<String> docIds) {
//
RedisPubsubMessageFile messageFile = RedisPubsubMessageFile.builder()
.fileUid(fileUid)
.docIds(docIds)
.build();
//
RedisPubsubMessage messageObject = RedisPubsubMessage.builder()
.type(RedisPubsubMessageType.DELETE_FILE.name())
.content(JSON.toJSONString(messageFile))
.build();
//
stringRedisTemplate.convertAndSend(
RedisPubsubConst.BYTEDESK_PUBSUB_CHANNEL_STRING,
JSON.toJSONString(messageObject));
}
public void sendQuestionMessage(String uid, String threadTopic, String kbUid, String question) {
//
RedisPubsubMessageQa messageQa = RedisPubsubMessageQa.builder()
.uid(uid)
.threadTopic(threadTopic)
.kbUid(kbUid)
.question(question)
.build();
//
RedisPubsubMessage messageObject = RedisPubsubMessage.builder()
.type(RedisPubsubMessageType.QUESTION.name())
.content(JSON.toJSONString(messageQa))
.build();
//
stringRedisTemplate.convertAndSend(
RedisPubsubConst.BYTEDESK_PUBSUB_CHANNEL_STRING,
JSON.toJSONString(messageObject));
}
}

View File

@@ -1,120 +0,0 @@
/*
* @Author: jackning 270580156@qq.com
* @Date: 2024-04-15 17:13:01
* @LastEditors: jackning 270580156@qq.com
* @LastEditTime: 2025-04-14 18:43:19
* @Description: bytedesk.com https://github.com/Bytedesk/bytedesk
* Please be aware of the BSL license restrictions before installing Bytedesk IM
* selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license.
* Business Source License 1.1: https://github.com/Bytedesk/bytedesk/blob/main/LICENSE
* contact: 270580156@qq.com
* 联系270580156@qq.com
* Copyright (c) 2024 by bytedesk.com, All Rights Reserved.
*/
package com.bytedesk.core.redis.pubsub;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson2.JSON;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Service
@AllArgsConstructor
public class RedisPubsubStringListener implements MessageListener {
// private final BytedeskEventPublisher eventPublisher;
// private final MessageCache messageCache;
// private final IMessageSendService messageSendService;
// private final Queue<RedisPubsubMessage> messageQueue = new LinkedList<>();
@Override
public void onMessage(Message message, byte[] pattern) {
// log.info("onMessage,{}", message.toString());
String channel = new String(message.getChannel());
String messageContext = new String(message.getBody());
//
RedisPubsubMessage redisPubsubMessage = JSON.parseObject(messageContext, RedisPubsubMessage.class);
log.info("redisPubsub receiveString: {}, type {}", channel, redisPubsubMessage.getType());
//
// if (redisPubsubMessage.getType().equals(RedisPubsubMessageType.PARSE_FILE_SUCCESS.name())) {
// // 解析成功
// log.info("parse file success, content {}", redisPubsubMessage.getContent());
// RedisPubsubMessageFile messageFile = JSON.parseObject(redisPubsubMessage.getContent(),
// RedisPubsubMessageFile.class);
// log.info("fileUid {}, docIds {}", messageFile.getFileUid(), messageFile.getDocIds());
// //
// eventPublisher.publishEvent(new RedisPubsubParseFileSuccessEvent(this, messageFile));
// } else if (redisPubsubMessage.getType().equals(RedisPubsubMessageType.PARSE_FILE_ERROR.name())) {
// // 解析失败
// log.info("parse file error, content {}", redisPubsubMessage.getContent());
// RedisPubsubMessageFile messageFile = JSON.parseObject(redisPubsubMessage.getContent(),
// RedisPubsubMessageFile.class);
// log.info("fileUid {}", messageFile.getFileUid());
// // 发送事件,通知前端更新文件状态
// eventPublisher.publishEvent(new RedisPubsubParseFileErrorEvent(this, messageFile));
// } else if (redisPubsubMessage.getType().equals(RedisPubsubMessageType.DELETE_FILE_SUCCESS.name())) {
// // TODO: 删除成功
// log.info("delete file success, content {}", redisPubsubMessage.getContent());
// } else if (redisPubsubMessage.getType().equals(RedisPubsubMessageType.DELETE_FILE_ERROR.name())) {
// // TODO: 删除失败
// log.info("delete file error, content {}", redisPubsubMessage.getContent());
// } else if (redisPubsubMessage.getType().equals(RedisPubsubMessageType.ANSWER.name())) {
// // 回答
// // answer {"threadTopic": "org/robot/df_rt_uid/1463055175142405", "kbUid":
// // "1461090177253570", "question": "\u4f60\u597d", "answer":
// // "\u53ef\u4ee5\u5e2e\u52a9", "model": "glm-4-flash", "created": 1725063232}
// log.info("answer {}", redisPubsubMessage.getContent());
// messageQueue.add(redisPubsubMessage); // 添加消息到队列
// // sendMessage(redisPubsubMessage);
// processMessageQueue();
// //
// } else if (redisPubsubMessage.getType().equals(RedisPubsubMessageType.ANSWER_FINISHED.name())) {
// // TODO: 回答结束
// // answer finished {"threadTopic": "org/robot/df_rt_uid/1463055175142405",
// // "kbUid": "1461090177253570", "question": "\u4f60\u597d", "answer": "",
// // "model": "glm-4-flash", "created": 1725063232, "promptTokens": 525,
// // "completionTokens": 9, "totalTokens": 534}
// log.info("answer finished {}", redisPubsubMessage.getContent());
// messageQueue.add(redisPubsubMessage); // 添加消息到队列
// // sendMessage(redisPubsubMessage);
// processMessageQueue();
// }
}
// FIXME: 直接调用sendMessage会导致消息乱序增加messageQueue还是消息乱序未解决
// private void processMessageQueue() {
// while (!messageQueue.isEmpty()) {
// RedisPubsubMessage redisPubsubMessage = messageQueue.poll();
// sendMessage(redisPubsubMessage);
// }
// }
// private void sendMessage(RedisPubsubMessage redisPubsubMessage) {
// log.info("sendMessage, messageQa content {}", redisPubsubMessage.getContent());
// RedisPubsubMessageQa messageQa = JSON.parseObject(redisPubsubMessage.getContent(),
// RedisPubsubMessageQa.class);
// log.info("sendMessage, messageQa Id {}", messageQa.getId());
// MessageProtobuf messageProtobuf = messageCache.get(messageQa.getUid());
// if (messageProtobuf == null) {
// log.error("message not found, uid {}", messageQa.getUid());
// return;
// }
// //
// messageProtobuf.setType(MessageTypeEnum.STREAM);
// messageProtobuf.setContent(messageQa.getAnswer());
// // MessageUtils.notifyUser(messageProtobuf);
// messageSendService.sendProtobufMessage(messageProtobuf);
// }
}

View File

@@ -1,36 +0,0 @@
/*
* @Author: jackning 270580156@qq.com
* @Date: 2024-08-30 16:07:09
* @LastEditors: jackning 270580156@qq.com
* @LastEditTime: 2024-08-30 18:38:52
* @Description: bytedesk.com https://github.com/Bytedesk/bytedesk
* Please be aware of the BSL license restrictions before installing Bytedesk IM
* selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license.
* Business Source License 1.1: https://github.com/Bytedesk/bytedesk/blob/main/LICENSE
* contact: 270580156@qq.com
* 联系270580156@qq.com
* Copyright (c) 2024 by bytedesk.com, All Rights Reserved.
*/
package com.bytedesk.core.redis.pubsub.message;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@Data
@Builder
@AllArgsConstructor
public class RedisPubsubMessageFile {
private String fileUid;
private String fileUrl;
private String kbUid;
private List<String> docIds;
private String errorMsg;
}

View File

@@ -1,56 +0,0 @@
/*
* @Author: jackning 270580156@qq.com
* @Date: 2024-08-30 16:09:19
* @LastEditors: jackning 270580156@qq.com
* @LastEditTime: 2024-08-31 10:19:59
* @Description: bytedesk.com https://github.com/Bytedesk/bytedesk
* Please be aware of the BSL license restrictions before installing Bytedesk IM
* selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license.
* Business Source License 1.1: https://github.com/Bytedesk/bytedesk/blob/main/LICENSE
* contact: 270580156@qq.com
* 联系270580156@qq.com
* Copyright (c) 2024 by bytedesk.com, All Rights Reserved.
*/
package com.bytedesk.core.redis.pubsub.message;
import jakarta.validation.constraints.NotEmpty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@Data
@Builder
@AllArgsConstructor
public class RedisPubsubMessageQa {
private Integer id;
@NotEmpty
private String uid;
private String threadTopic;
//
private String kbUid;
private String fileUid;
//
private String question;
private String answer;
private String model;
private Integer created;
private String finishReason;
//
private String promptTokens;
private String completionTokens;
private String totalTokens;
}

View File

@@ -21,7 +21,7 @@ import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import com.bytedesk.core.redis.RedisConsts;
import com.bytedesk.core.constant.RedisConsts;
import lombok.AllArgsConstructor;

View File

@@ -2,7 +2,7 @@
* @Author: jackning 270580156@qq.com
* @Date: 2024-08-04 10:44:09
* @LastEditors: jackning 270580156@qq.com
* @LastEditTime: 2025-06-16 07:45:19
* @LastEditTime: 2025-08-16 07:07:46
* @Description: bytedesk.com https://github.com/Bytedesk/bytedesk
* Please be aware of the BSL license restrictions before installing Bytedesk IM
* selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license.
@@ -20,7 +20,7 @@ import com.bytedesk.core.quartz.event.QuartzOneMinEvent;
import com.bytedesk.core.socket.mqtt.event.MqttConnectedEvent;
import com.bytedesk.core.socket.mqtt.event.MqttDisconnectedEvent;
import com.bytedesk.core.topic.TopicCacheService;
import com.bytedesk.core.topic.TopicService;
import com.bytedesk.core.topic.TopicRestService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -30,7 +30,7 @@ import lombok.extern.slf4j.Slf4j;
@AllArgsConstructor
public class MqttEventListener {
private final TopicService topicService;
private final TopicRestService topicRestService;
private final TopicCacheService topicCacheService;
@@ -61,7 +61,7 @@ public class MqttEventListener {
public void onMqttSubscribeEvent(MqttSubscribeEvent event) {
log.info("topic onMqttSubscribeEvent {}", event);
//
topicService.subscribe(event.getTopic(), event.getClientId());
topicRestService.subscribe(event.getTopic(), event.getClientId());
}
@EventListener

View File

@@ -2,7 +2,7 @@
* @Author: jackning 270580156@qq.com
* @Date: 2024-06-28 13:32:23
* @LastEditors: jackning 270580156@qq.com
* @LastEditTime: 2025-07-09 17:49:19
* @LastEditTime: 2025-08-16 07:08:31
* @Description: bytedesk.com https://github.com/Bytedesk/bytedesk
* Please be aware of the BSL license restrictions before installing Bytedesk IM
* selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license.
@@ -25,7 +25,6 @@ import com.bytedesk.core.thread.event.ThreadCloseEvent;
import com.bytedesk.core.topic.TopicCacheService;
import com.bytedesk.core.topic.TopicRequest;
import com.bytedesk.core.topic.TopicRestService;
import com.bytedesk.core.topic.TopicService;
import com.bytedesk.core.topic.TopicUtils;
import lombok.AllArgsConstructor;
@@ -36,8 +35,6 @@ import lombok.extern.slf4j.Slf4j;
@AllArgsConstructor
public class ThreadEventListener {
private final TopicService topicService;
private final TopicCacheService topicCacheService;
private final TopicRestService topicRestService;
@@ -68,7 +65,7 @@ public class ThreadEventListener {
.build();
request.getTopics().add(topic);
request.getTopics().add(topicInternal);
topicService.create(request);
topicRestService.create(request);
} else {
// 文件助手、系统通知会话延迟订阅topic
TopicRequest request = TopicRequest.builder()
@@ -99,7 +96,7 @@ public class ThreadEventListener {
.topic(thread.getTopic())
.userUid(user.getUid())
.build();
topicService.create(request);
topicRestService.create(request);
} else if (thread.getType().equals(ThreadTypeEnum.WORKGROUP.name())) {
// 工作组会话需要订阅topic
// 重新订阅
@@ -107,14 +104,14 @@ public class ThreadEventListener {
.topic(thread.getTopic())
.userUid(user.getUid())
.build();
topicService.create(request);
topicRestService.create(request);
} else if (thread.getType().equals(ThreadTypeEnum.MEMBER.name())) {
// 会员会话需要订阅topic
TopicRequest request = TopicRequest.builder()
.topic(thread.getTopic())
.userUid(user.getUid())
.build();
topicService.create(request);
topicRestService.create(request);
} else {
// 文件助手、系统通知会话延迟订阅topic
TopicRequest request = TopicRequest.builder()

View File

@@ -20,10 +20,7 @@ import java.util.concurrent.TimeUnit;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import com.bytedesk.core.redis.RedisConsts;
// import com.github.benmanes.caffeine.cache.Cache;
// import com.github.benmanes.caffeine.cache.CacheLoader;
// import com.github.benmanes.caffeine.cache.Caffeine;
import com.bytedesk.core.constant.RedisConsts;
import jakarta.annotation.PostConstruct;
import lombok.AllArgsConstructor;

View File

@@ -2,7 +2,7 @@
* @Author: jackning 270580156@qq.com
* @Date: 2024-02-21 10:01:12
* @LastEditors: jackning 270580156@qq.com
* @LastEditTime: 2025-08-15 17:36:04
* @LastEditTime: 2025-08-16 06:32:47
* @Description: bytedesk.com https://github.com/Bytedesk/bytedesk
* Please be aware of the BSL license restrictions before installing Bytedesk IM
* selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license.
@@ -19,7 +19,6 @@ import java.util.List;
import com.bytedesk.core.base.BaseRequest;
import com.bytedesk.core.rbac.user.UserProtobuf;
import com.bytedesk.core.thread.enums.ThreadInviteStatusEnum;
import com.bytedesk.core.thread.enums.ThreadSummaryStatusEnum;
import com.bytedesk.core.thread.enums.ThreadTransferStatusEnum;
import lombok.Builder;

View File

@@ -2,7 +2,7 @@
* @Author: jackning 270580156@qq.com
* @Date: 2024-02-21 10:01:27
* @LastEditors: jackning 270580156@qq.com
* @LastEditTime: 2025-08-15 17:37:52
* @LastEditTime: 2025-08-16 06:32:54
* @Description: bytedesk.com https://github.com/Bytedesk/bytedesk
* Please be aware of the BSL license restrictions before installing Bytedesk IM
* selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license.
@@ -20,8 +20,6 @@ import com.bytedesk.core.base.BaseResponse;
import com.bytedesk.core.rbac.user.UserProtobuf;
import com.bytedesk.core.thread.enums.ThreadInviteStatusEnum;
import com.bytedesk.core.thread.enums.ThreadTransferStatusEnum;
import com.bytedesk.core.thread.enums.ThreadSummaryStatusEnum;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;

View File

@@ -2,7 +2,7 @@
* @Author: import java.util.HashSet;
* @Date: 2024-05-29 15:11:57
* @LastEditors: jackning 270580156@qq.com
* @LastEditTime: 2025-07-09 17:49:06
* @LastEditTime: 2025-08-16 07:09:54
* @Description: bytedesk.com https://github.com/Bytedesk/bytedesk
* Please be aware of the BSL license restrictions before installing Bytedesk IM
* selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license.
@@ -39,7 +39,7 @@ import lombok.extern.slf4j.Slf4j;
@AllArgsConstructor
public class TopicEventListener {
private final TopicService topicService;
private final TopicRestService topicRestService;
private final TopicCacheService topicCacheService;
@@ -66,14 +66,14 @@ public class TopicEventListener {
topicRequestList.forEach(item -> {
// log.info("topic onQuartzFiveSecondEvent {}", item);
TopicRequest topicRequest = JSON.parseObject(item, TopicRequest.class);
topicService.create(topicRequest);
topicRestService.create(topicRequest);
});
}
List<String> clientIdList = topicCacheService.getClientIdList();
if (clientIdList!= null) {
clientIdList.forEach(item -> {
// log.info("topic onQuartzFiveSecondEvent {}", item);
topicService.addClientId(item);
topicRestService.addClientId(item);
});
}
}
@@ -110,7 +110,7 @@ public class TopicEventListener {
log.info("topic onQuartzDay0Event: 开始清理已结束的会话topics");
// 获取所有的 TopicEntity
List<TopicEntity> allTopics = topicService.findAll();
List<TopicEntity> allTopics = topicRestService.findAll();
for (TopicEntity topicEntity : allTopics) {
Set<String> topics = topicEntity.getTopics();
@@ -144,7 +144,7 @@ public class TopicEventListener {
// 从topics集合中移除符合条件的topic
for (String topicToRemove : topicsToRemove) {
topicService.remove(topicToRemove, topicEntity.getUserUid());
topicRestService.remove(topicToRemove, topicEntity.getUserUid());
log.info("成功删除topic: {} 从 userUid: {}", topicToRemove, topicEntity.getUserUid());
}
}

View File

@@ -2,7 +2,7 @@
* @Author: jackning 270580156@qq.com
* @Date: 2024-11-20 11:16:56
* @LastEditors: jackning 270580156@qq.com
* @LastEditTime: 2025-07-05 15:52:46
* @LastEditTime: 2025-08-16 07:11:35
* @Description: bytedesk.com https://github.com/Bytedesk/bytedesk
* Please be aware of the BSL license restrictions before installing Bytedesk IM
* selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license.
@@ -104,6 +104,14 @@ public class TopicRestService extends BaseRestService<TopicEntity, TopicRequest,
return topicRepository.existsByUid(uid);
}
public void create(String topic, String userUid) {
TopicRequest request = TopicRequest.builder()
.topic(topic)
.userUid(userUid)
.build();
create(request);
}
@Transactional
@Override
public TopicResponse create(TopicRequest request) {

View File

@@ -1,127 +0,0 @@
/*
* @Author: jackning 270580156@qq.com
* @Date: 2024-04-13 16:14:36
* @LastEditors: jackning 270580156@qq.com
* @LastEditTime: 2025-06-16 09:10:58
* @Description: bytedesk.com https://github.com/Bytedesk/bytedesk
* Please be aware of the BSL license restrictions before installing Bytedesk IM
* selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license.
* Business Source License 1.1: https://github.com/Bytedesk/bytedesk/blob/main/LICENSE
* contact: 270580156@qq.com
* 联系270580156@qq.com
* Copyright (c) 2024 by bytedesk.com, All Rights Reserved.
*/
package com.bytedesk.core.topic;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
@Slf4j
@Service
@AllArgsConstructor
public class TopicService {
private final TopicRestService topicRestService;
public void create(String topic, String userUid) {
TopicRequest request = TopicRequest.builder()
.topic(topic)
.userUid(userUid)
.build();
create(request);
}
// 创建topic
@Transactional
public void create(TopicRequest request) {
topicRestService.create(request);
}
// 删除topic
// @Transactional
// public void remove(TopicRequest topicRequest) {
// topicRestService.remove(topicRequest);
// }
// 删除topic
@Transactional
public void remove(String topic, String userUid) {
topicRestService.remove(topic, userUid);
}
// 订阅topic
@Transactional
public void subscribe(String topic, String clientId) {
topicRestService.subscribe(topic, clientId);
}
// 取消订阅topic
@Transactional
public void unsubscribe(String topic, String clientId) {
topicRestService.unsubscribe(topic, clientId);
}
@Transactional
public void addClientId(String clientId) {
topicRestService.addClientId(clientId);
}
@Transactional
public void removeClientId(String clientId) {
topicRestService.removeClientId(clientId);
}
/**
* 查询所有的 TopicEntity
*
* @return 所有的 TopicEntity 列表
*/
public List<TopicEntity> findAll() {
return topicRestService.findAll();
}
// @Cacheable(value = "topic", key = "#uid")
// public Optional<TopicEntity> findByUid(String uid) {
// return topicRestService.findByUid(uid);
// }
// @Cacheable(value = "topic", key = "#clientId", unless = "#result == null")
// public Optional<TopicEntity> findByClientId(String clientId) {
// // 用户clientId格式: userUid/client/deviceUid
// final String userUid = clientId.split("/")[0];
// return findByUserUid(userUid);
// }
// @Cacheable(value = "topic", key = "#uid", unless = "#result == null")
// public Optional<TopicEntity> findByUserUid(String uid) {
// return topicRestService.findByUid(uid);
// }
// @Cacheable(value = "topic", key = "#topic", unless="#result == null")
// public Set<TopicEntity> findByTopic(String topic) {
// return topicRestService.findByTopic(topic);
// }
// public void update(String uid, String userUid) {
// Optional<TopicEntity> optionalTopic = findByUid(uid);
// optionalTopic.ifPresent(topic -> {
// topic.setUserUid(userUid);
// topicRestService.save(topic);
// });
// }
// @CacheEvict(value = "topic", key = "#topic.userUid")
// public void delete(TopicEntity topic) {
// topicRestService.deleteByUid(topic.getUid());
// }
// public TopicResponse convertToTopicResponse(TopicEntity topic) {
// return topicRestService.convertToResponse(topic);
// }
}