From 1ec6a8a60ac4999b0d7c6dba69916180c4a8a7b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=9F=922012?= <345849402@qq.com> Date: Tue, 29 May 2018 17:57:27 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BC=95=E5=85=A5WebSocketServer=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=E6=9C=8D=E5=8A=A1=E7=AB=AF=E7=A7=92=E6=9D=80=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E6=8E=A8=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 5 + .../common/webSocket/WebSocketConfig.java | 17 +++ .../common/webSocket/WebSocketServer.java | 120 ++++++++++++++++++ .../seckill/queue/kafka/KafkaConsumer.java | 13 +- .../seckill/queue/redis/RedisConsumer.java | 13 +- src/main/resources/logback-spring.xml | 2 + src/main/resources/static/js/webSocket.js | 29 +++++ src/main/resources/templates/index.html | 2 + 8 files changed, 199 insertions(+), 2 deletions(-) create mode 100644 src/main/java/com/itstyle/seckill/common/webSocket/WebSocketConfig.java create mode 100644 src/main/java/com/itstyle/seckill/common/webSocket/WebSocketServer.java create mode 100644 src/main/resources/static/js/webSocket.js diff --git a/pom.xml b/pom.xml index f24447f..93af8aa 100644 --- a/pom.xml +++ b/pom.xml @@ -100,6 +100,11 @@ disruptor 3.4.1 + + + org.springframework.boot + spring-boot-starter-websocket + spring-boot-seckill diff --git a/src/main/java/com/itstyle/seckill/common/webSocket/WebSocketConfig.java b/src/main/java/com/itstyle/seckill/common/webSocket/WebSocketConfig.java new file mode 100644 index 0000000..717968c --- /dev/null +++ b/src/main/java/com/itstyle/seckill/common/webSocket/WebSocketConfig.java @@ -0,0 +1,17 @@ +package com.itstyle.seckill.common.webSocket; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; +/** + * WebSocket配置 + * 创建者 科帮网 + * 创建时间 2018年5月29日 + */ +@Configuration +public class WebSocketConfig { + @Bean + public ServerEndpointExporter serverEndpointExporter() { + return new ServerEndpointExporter(); + } +} diff --git a/src/main/java/com/itstyle/seckill/common/webSocket/WebSocketServer.java b/src/main/java/com/itstyle/seckill/common/webSocket/WebSocketServer.java new file mode 100644 index 0000000..a884baf --- /dev/null +++ b/src/main/java/com/itstyle/seckill/common/webSocket/WebSocketServer.java @@ -0,0 +1,120 @@ +package com.itstyle.seckill.common.webSocket; + +import java.io.IOException; +import java.util.concurrent.CopyOnWriteArraySet; + +import javax.websocket.OnClose; +import javax.websocket.OnError; +import javax.websocket.OnMessage; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +@ServerEndpoint("/websocket/{userId}") +@Component +public class WebSocketServer { + private final static Logger log = LoggerFactory.getLogger(WebSocketServer.class); + //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。 + private static int onlineCount = 0; + //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。 + private static CopyOnWriteArraySet webSocketSet = new CopyOnWriteArraySet(); + + //与某个客户端的连接会话,需要通过它来给客户端发送数据 + private Session session; + + //接收userId + private String userId=""; + /** + * 连接建立成功调用的方法*/ + @OnOpen + public void onOpen(Session session,@PathParam("userId") String userId) { + this.session = session; + webSocketSet.add(this); //加入set中 + addOnlineCount(); //在线数加1 + log.info("有新窗口开始监听:"+userId+",当前在线人数为" + getOnlineCount()); + this.userId=userId; + try { + sendMessage("连接成功"); + } catch (IOException e) { + log.error("websocket IO异常"); + } + } + + /** + * 连接关闭调用的方法 + */ + @OnClose + public void onClose() { + webSocketSet.remove(this); //从set中删除 + subOnlineCount(); //在线数减1 + log.info("有一连接关闭!当前在线人数为" + getOnlineCount()); + } + + /** + * 收到客户端消息后调用的方法 + * @param message 客户端发送过来的消息*/ + @OnMessage + public void onMessage(String message, Session session) { + log.info("收到来自窗口"+userId+"的信息:"+message); + //群发消息 + for (WebSocketServer item : webSocketSet) { + try { + item.sendMessage(message); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + /** + * @param session + * @param error + */ + @OnError + public void onError(Session session, Throwable error) { + log.error("发生错误"); + error.printStackTrace(); + } + /** + * 实现服务器主动推送 + */ + public void sendMessage(String message) throws IOException { + this.session.getBasicRemote().sendText(message); + } + /** + * 群发自定义消息 + * */ + public static void sendInfo(String message,@PathParam("userId") String userId) throws IOException { + log.info("推送消息到窗口"+userId+",推送内容:"+message); + for (WebSocketServer item : webSocketSet) { + try { + //这里可以设定只推送给这个userId的,为null则全部推送 + if(userId==null) { + item.sendMessage(message); + }else if(item.userId.equals(userId)){ + item.sendMessage(message); + } + } catch (IOException e) { + continue; + } + } + } + + public static synchronized int getOnlineCount() { + return onlineCount; + } + + public static synchronized void addOnlineCount() { + WebSocketServer.onlineCount++; + } + + public static synchronized void subOnlineCount() { + WebSocketServer.onlineCount--; + } + +} diff --git a/src/main/java/com/itstyle/seckill/queue/kafka/KafkaConsumer.java b/src/main/java/com/itstyle/seckill/queue/kafka/KafkaConsumer.java index e972f11..753363a 100644 --- a/src/main/java/com/itstyle/seckill/queue/kafka/KafkaConsumer.java +++ b/src/main/java/com/itstyle/seckill/queue/kafka/KafkaConsumer.java @@ -1,9 +1,13 @@ package com.itstyle.seckill.queue.kafka; +import java.io.IOException; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; +import com.itstyle.seckill.common.entity.Result; +import com.itstyle.seckill.common.webSocket.WebSocketServer; import com.itstyle.seckill.service.ISeckillService; /** * 消费者 spring-kafka 2.0 + 依赖JDK8 @@ -21,6 +25,13 @@ public class KafkaConsumer { public void receiveMessage(String message){ //收到通道的消息之后执行秒杀操作 String[] array = message.split(";"); - seckillService.startSeckil(Long.parseLong(array[0]), Long.parseLong(array[1])); + Result result = seckillService.startSeckil(Long.parseLong(array[0]), Long.parseLong(array[1])); + if(result.equals(Result.ok())){ + try { + WebSocketServer.sendInfo(array[0].toString(), array[1].toString());//推送给前台 + } catch (IOException e) { + e.printStackTrace(); + } + } } } \ No newline at end of file diff --git a/src/main/java/com/itstyle/seckill/queue/redis/RedisConsumer.java b/src/main/java/com/itstyle/seckill/queue/redis/RedisConsumer.java index 308be46..060912a 100644 --- a/src/main/java/com/itstyle/seckill/queue/redis/RedisConsumer.java +++ b/src/main/java/com/itstyle/seckill/queue/redis/RedisConsumer.java @@ -1,8 +1,12 @@ package com.itstyle.seckill.queue.redis; +import java.io.IOException; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import com.itstyle.seckill.common.entity.Result; +import com.itstyle.seckill.common.webSocket.WebSocketServer; import com.itstyle.seckill.service.ISeckillService; /** * 消费者 @@ -17,6 +21,13 @@ public class RedisConsumer { public void receiveMessage(String message) { //收到通道的消息之后执行秒杀操作(超卖) String[] array = message.split(";"); - seckillService.startSeckil(Long.parseLong(array[0]), Long.parseLong(array[1])); + Result result = seckillService.startSeckilAopLock(Long.parseLong(array[0]), Long.parseLong(array[1])); + if(result.equals(Result.ok())){ + try { + WebSocketServer.sendInfo(array[0].toString(), array[1].toString());//推送给前台 + } catch (IOException e) { + e.printStackTrace(); + } + } } } \ No newline at end of file diff --git a/src/main/resources/logback-spring.xml b/src/main/resources/logback-spring.xml index be6b1fb..4a7f283 100644 --- a/src/main/resources/logback-spring.xml +++ b/src/main/resources/logback-spring.xml @@ -68,6 +68,8 @@ INFO + + diff --git a/src/main/resources/static/js/webSocket.js b/src/main/resources/static/js/webSocket.js new file mode 100644 index 0000000..1f5c5e6 --- /dev/null +++ b/src/main/resources/static/js/webSocket.js @@ -0,0 +1,29 @@ +$(function(){ + socket.init(); +}); +var basePath = "ws://localhost:8080/seckill/"; +socket = { + webSocket : "", + init : function() { + //userId:自行追加 + if ('WebSocket' in window) { + webSocket = new WebSocket(basePath+'websocket/1'); + } + else if ('MozWebSocket' in window) { + webSocket = new MozWebSocket(basePath+"websocket/1"); + } + else { + webSocket = new SockJS(basePath+"sockjs/websocket"); + } + webSocket.onerror = function(event) { + alert("websockt连接发生错误,请刷新页面重试!") + }; + webSocket.onopen = function(event) { + + }; + webSocket.onmessage = function(event) { + var message = event.data; + alert("恭喜你秒杀成功、赶紧付款吧!!!") + }; + } +} \ No newline at end of file diff --git a/src/main/resources/templates/index.html b/src/main/resources/templates/index.html index 751dea8..36bbaa4 100644 --- a/src/main/resources/templates/index.html +++ b/src/main/resources/templates/index.html @@ -9,6 +9,8 @@ 分布式秒杀系统 + +