diff --git a/modules/ai/src/main/java/com/bytedesk/ai/springai/dashscope/SpringAIDashscopeService.java b/modules/ai/src/main/java/com/bytedesk/ai/springai/dashscope/SpringAIDashscopeService.java index 628f419e55..827127c72f 100644 --- a/modules/ai/src/main/java/com/bytedesk/ai/springai/dashscope/SpringAIDashscopeService.java +++ b/modules/ai/src/main/java/com/bytedesk/ai/springai/dashscope/SpringAIDashscopeService.java @@ -2,7 +2,7 @@ * @Author: jackning 270580156@qq.com * @Date: 2025-02-28 17:56:26 * @LastEditors: jackning 270580156@qq.com - * @LastEditTime: 2025-03-11 21:16:39 + * @LastEditTime: 2025-03-11 22:48:45 * @Description: bytedesk.com https://github.com/Bytedesk/bytedesk * Please be aware of the BSL license restrictions before installing Bytedesk IM – * selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license. @@ -170,7 +170,7 @@ public class SpringAIDashscopeService extends BaseSpringAIService { emitter.send(SseEmitter.event() .data(JSON.toJSONString(messageProtobuf)) .id(messageProtobuf.getUid()) - .name("error")); + .name("message")); emitter.complete(); } catch (Exception ex) { emitter.completeWithError(ex); @@ -184,7 +184,7 @@ public class SpringAIDashscopeService extends BaseSpringAIService { emitter.send(SseEmitter.event() .data(JSON.toJSONString(messageProtobuf)) .id(messageProtobuf.getUid()) - .name("end")); + .name("message")); emitter.complete(); } catch (Exception e) { log.error("Error completing SSE", e); @@ -199,7 +199,7 @@ public class SpringAIDashscopeService extends BaseSpringAIService { emitter.send(SseEmitter.event() .data(JSON.toJSONString(messageProtobuf)) .id(messageProtobuf.getUid()) - .name("error")); + .name("message")); emitter.complete(); } catch (Exception ex) { emitter.completeWithError(ex); diff --git a/modules/ai/src/main/java/com/bytedesk/ai/springai/deepseek/SpringAIDeepseekService.java b/modules/ai/src/main/java/com/bytedesk/ai/springai/deepseek/SpringAIDeepseekService.java index 63b34cb078..ac15303bb1 100644 --- a/modules/ai/src/main/java/com/bytedesk/ai/springai/deepseek/SpringAIDeepseekService.java +++ b/modules/ai/src/main/java/com/bytedesk/ai/springai/deepseek/SpringAIDeepseekService.java @@ -2,7 +2,7 @@ * @Author: jackning 270580156@qq.com * @Date: 2025-02-28 11:44:03 * @LastEditors: jackning 270580156@qq.com - * @LastEditTime: 2025-03-11 21:17:25 + * @LastEditTime: 2025-03-11 22:48:27 * @Description: bytedesk.com https://github.com/Bytedesk/bytedesk * Please be aware of the BSL license restrictions before installing Bytedesk IM – * selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license. @@ -149,7 +149,7 @@ public class SpringAIDeepseekService extends BaseSpringAIService { emitter.send(SseEmitter.event() .data(JSON.toJSONString(messageProtobuf)) .id(messageProtobuf.getUid()) - .name("error")); + .name("message")); emitter.complete(); } catch (Exception ex) { emitter.completeWithError(ex); @@ -163,7 +163,7 @@ public class SpringAIDeepseekService extends BaseSpringAIService { emitter.send(SseEmitter.event() .data(JSON.toJSONString(messageProtobuf)) .id(messageProtobuf.getUid()) - .name("end")); + .name("message")); emitter.complete(); } catch (Exception e) { log.error("Error completing SSE", e); @@ -178,7 +178,7 @@ public class SpringAIDeepseekService extends BaseSpringAIService { emitter.send(SseEmitter.event() .data(JSON.toJSONString(messageProtobuf)) .id(messageProtobuf.getUid()) - .name("error")); + .name("message")); emitter.complete(); } catch (Exception ex) { emitter.completeWithError(ex); diff --git a/modules/ai/src/main/java/com/bytedesk/ai/springai/ollama/SpringAIOllamaService.java b/modules/ai/src/main/java/com/bytedesk/ai/springai/ollama/SpringAIOllamaService.java index 67a262307c..80eca864f4 100644 --- a/modules/ai/src/main/java/com/bytedesk/ai/springai/ollama/SpringAIOllamaService.java +++ b/modules/ai/src/main/java/com/bytedesk/ai/springai/ollama/SpringAIOllamaService.java @@ -2,7 +2,7 @@ * @Author: jackning 270580156@qq.com * @Date: 2025-02-26 16:59:14 * @LastEditors: jackning 270580156@qq.com - * @LastEditTime: 2025-03-11 21:17:09 + * @LastEditTime: 2025-03-11 22:48:14 * @Description: bytedesk.com https://github.com/Bytedesk/bytedesk * Please be aware of the BSL license restrictions before installing Bytedesk IM – * selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license. @@ -163,7 +163,7 @@ public class SpringAIOllamaService extends BaseSpringAIService { emitter.send(SseEmitter.event() .data(JSON.toJSONString(messageProtobuf)) .id(messageProtobuf.getUid()) - .name("error")); + .name("message")); emitter.complete(); } catch (Exception e) { emitter.completeWithError(e); @@ -177,7 +177,7 @@ public class SpringAIOllamaService extends BaseSpringAIService { emitter.send(SseEmitter.event() .data(JSON.toJSONString(messageProtobuf)) .id(messageProtobuf.getUid()) - .name("end")); + .name("message")); emitter.complete(); } catch (Exception e) { log.error("Error completing SSE", e); @@ -192,7 +192,7 @@ public class SpringAIOllamaService extends BaseSpringAIService { emitter.send(SseEmitter.event() .data(JSON.toJSONString(messageProtobuf)) .id(messageProtobuf.getUid()) - .name("ollama_error")); + .name("message")); emitter.complete(); } catch (Exception e) { emitter.completeWithError(e); diff --git a/modules/ai/src/main/java/com/bytedesk/ai/springai/zhipuai/SpringAIZhipuaiService.java b/modules/ai/src/main/java/com/bytedesk/ai/springai/zhipuai/SpringAIZhipuaiService.java index 90bce50046..a7d2d9403e 100644 --- a/modules/ai/src/main/java/com/bytedesk/ai/springai/zhipuai/SpringAIZhipuaiService.java +++ b/modules/ai/src/main/java/com/bytedesk/ai/springai/zhipuai/SpringAIZhipuaiService.java @@ -2,7 +2,7 @@ * @Author: jackning 270580156@qq.com * @Date: 2025-02-26 16:58:56 * @LastEditors: jackning 270580156@qq.com - * @LastEditTime: 2025-03-11 21:38:34 + * @LastEditTime: 2025-03-11 22:47:30 * @Description: bytedesk.com https://github.com/Bytedesk/bytedesk * Please be aware of the BSL license restrictions before installing Bytedesk IM – * selling, reselling, or hosting Bytedesk IM as a service is a breach of the terms and automatically terminates your rights under the license. @@ -51,7 +51,7 @@ public class SpringAIZhipuaiService extends BaseSpringAIService { public SpringAIZhipuaiService( @Qualifier("bytedeskZhipuaiChatModel") ZhiPuAiChatModel bytedeskZhipuaiChatModel, Optional springAIVectorService, - IMessageSendService messageSendService, + IMessageSendService messageSendService, UidUtils uidUtils, RobotRestService robotRestService, ThreadRestService threadRestService) { @@ -65,34 +65,33 @@ public class SpringAIZhipuaiService extends BaseSpringAIService { @Override protected void processPrompt(Prompt prompt, MessageProtobuf messageProtobuf) { bytedeskZhipuaiChatModel.stream(prompt).subscribe( - response -> { - if (response != null) { - log.info("Zhipuai API response metadata: {}", response.getMetadata()); - List generations = response.getResults(); - for (Generation generation : generations) { - AssistantMessage assistantMessage = generation.getOutput(); - String textContent = assistantMessage.getText(); + response -> { + if (response != null) { + log.info("Zhipuai API response metadata: {}", response.getMetadata()); + List generations = response.getResults(); + for (Generation generation : generations) { + AssistantMessage assistantMessage = generation.getOutput(); + String textContent = assistantMessage.getText(); - messageProtobuf.setType(MessageTypeEnum.STREAM); - messageProtobuf.setContent(textContent); - messageSendService.sendProtobufMessage(messageProtobuf); + messageProtobuf.setType(MessageTypeEnum.STREAM); + messageProtobuf.setContent(textContent); + messageSendService.sendProtobufMessage(messageProtobuf); + } } - } - }, - error -> { - log.error("Zhipuai API error: ", error); - messageProtobuf.setType(MessageTypeEnum.ERROR); - messageProtobuf.setContent("服务暂时不可用,请稍后重试"); - messageSendService.sendProtobufMessage(messageProtobuf); - }, - () -> { - log.info("Chat stream completed"); - // 发送流结束标记 - messageProtobuf.setType(MessageTypeEnum.STREAM_END); - messageProtobuf.setContent(""); // 或者可以是任何结束标记 - messageSendService.sendProtobufMessage(messageProtobuf); - } - ); + }, + error -> { + log.error("Zhipuai API error: ", error); + messageProtobuf.setType(MessageTypeEnum.ERROR); + messageProtobuf.setContent("服务暂时不可用,请稍后重试"); + messageSendService.sendProtobufMessage(messageProtobuf); + }, + () -> { + log.info("Chat stream completed"); + // 发送流结束标记 + messageProtobuf.setType(MessageTypeEnum.STREAM_END); + messageProtobuf.setContent(""); // 或者可以是任何结束标记 + messageSendService.sendProtobufMessage(messageProtobuf); + }); } /** @@ -118,26 +117,44 @@ public class SpringAIZhipuaiService extends BaseSpringAIService { Flux responseFlux = bytedeskZhipuaiChatModel.stream(prompt); responseFlux.subscribe( - response -> { - try { - if (response != null) { - List generations = response.getResults(); - for (Generation generation : generations) { - AssistantMessage assistantMessage = generation.getOutput(); - String textContent = assistantMessage.getText(); - log.info("Zhipuai API response metadata: {}, text {}", response.getMetadata(), textContent); - // - messageProtobuf.setContent(textContent); - messageProtobuf.setType(MessageTypeEnum.STREAM); - // 发送SSE事件 - emitter.send(SseEmitter.event() - .data(JSON.toJSONString(messageProtobuf)) - .id(messageProtobuf.getUid()) - .name("message")); + response -> { + try { + if (response != null) { + List generations = response.getResults(); + for (Generation generation : generations) { + AssistantMessage assistantMessage = generation.getOutput(); + String textContent = assistantMessage.getText(); + log.info("Zhipuai API response metadata: {}, text {}", response.getMetadata(), + textContent); + // + messageProtobuf.setContent(textContent); + messageProtobuf.setType(MessageTypeEnum.STREAM); + // 发送SSE事件 + emitter.send(SseEmitter.event() + .data(JSON.toJSONString(messageProtobuf)) + .id(messageProtobuf.getUid()) + .name("message")); + } + } + } catch (Exception e) { + log.error("Zhipuai API Error sending SSE event 1:", e); + messageProtobuf.setType(MessageTypeEnum.ERROR); + messageProtobuf.setContent("服务暂时不可用,请稍后重试"); + // + try { + emitter.send(SseEmitter.event() + .data(JSON.toJSONString(messageProtobuf)) + .id(messageProtobuf.getUid()) + .name("message")); + emitter.complete(); + } catch (Exception ex) { + log.error("Zhipuai API SSE complete Error 1", ex); + emitter.completeWithError(ex); } } - } catch (Exception e) { - log.error("Zhipuai API Error sending SSE event 1:", e); + }, + error -> { + log.error("Zhipuai API SSE error 2:", error); messageProtobuf.setType(MessageTypeEnum.ERROR); messageProtobuf.setContent("服务暂时不可用,请稍后重试"); // @@ -145,46 +162,28 @@ public class SpringAIZhipuaiService extends BaseSpringAIService { emitter.send(SseEmitter.event() .data(JSON.toJSONString(messageProtobuf)) .id(messageProtobuf.getUid()) - .name("error1")); + .name("message")); emitter.complete(); } catch (Exception ex) { - log.error("Zhipuai API SSE complete Error 1", ex); + log.error("Zhipuai API SSE complete Error 2", ex); emitter.completeWithError(ex); } - } - }, - error -> { - log.error("Zhipuai API SSE error 2:", error); - messageProtobuf.setType(MessageTypeEnum.ERROR); - messageProtobuf.setContent("服务暂时不可用,请稍后重试"); - // - try { - emitter.send(SseEmitter.event() - .data(JSON.toJSONString(messageProtobuf)) - .id(messageProtobuf.getUid()) - .name("error2")); - emitter.complete(); - } catch (Exception ex) { - log.error("Zhipuai API SSE complete Error 2", ex); - emitter.completeWithError(ex); - } - }, - () -> { - log.error("Zhipuai API SSE complete"); - try { - // 发送流结束标记 - messageProtobuf.setType(MessageTypeEnum.STREAM_END); - messageProtobuf.setContent(""); // 或者可以是任何结束标记 - emitter.send(SseEmitter.event() - .data(JSON.toJSONString(messageProtobuf)) - .id(messageProtobuf.getUid()) - .name("end")); - emitter.complete(); - } catch (Exception e) { - log.error("Zhipuai API SSE complete Error completing SSE", e); - } - } - ); + }, + () -> { + log.info("Zhipuai API SSE complete"); + try { + // 发送流结束标记 + messageProtobuf.setType(MessageTypeEnum.STREAM_END); + messageProtobuf.setContent(""); // 或者可以是任何结束标记 + emitter.send(SseEmitter.event() + .data(JSON.toJSONString(messageProtobuf)) + .id(messageProtobuf.getUid()) + .name("message")); + emitter.complete(); + } catch (Exception e) { + log.error("Zhipuai API SSE complete Error completing SSE", e); + } + }); } @Override