mirror of
https://gitee.com/52itstyle/spring-boot-seckill.git
synced 2025-12-30 10:22:26 +00:00
引入WebSocketServer实现服务端秒杀消息推送
This commit is contained in:
5
pom.xml
5
pom.xml
@@ -100,6 +100,11 @@
|
||||
<artifactId>disruptor</artifactId>
|
||||
<version>3.4.1</version>
|
||||
</dependency>
|
||||
<!-- webSocket 秒杀通知 -->
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-websocket</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<build>
|
||||
<finalName>spring-boot-seckill</finalName><plugins>
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
package com.itstyle.seckill.common.webSocket;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
|
||||
/**
|
||||
* WebSocket配置
|
||||
* 创建者 科帮网
|
||||
* 创建时间 2018年5月29日
|
||||
*/
|
||||
@Configuration
|
||||
public class WebSocketConfig {
|
||||
@Bean
|
||||
public ServerEndpointExporter serverEndpointExporter() {
|
||||
return new ServerEndpointExporter();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,120 @@
|
||||
package com.itstyle.seckill.common.webSocket;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CopyOnWriteArraySet;
|
||||
|
||||
import javax.websocket.OnClose;
|
||||
import javax.websocket.OnError;
|
||||
import javax.websocket.OnMessage;
|
||||
import javax.websocket.OnOpen;
|
||||
import javax.websocket.Session;
|
||||
import javax.websocket.server.PathParam;
|
||||
import javax.websocket.server.ServerEndpoint;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@ServerEndpoint("/websocket/{userId}")
|
||||
@Component
|
||||
public class WebSocketServer {
|
||||
private final static Logger log = LoggerFactory.getLogger(WebSocketServer.class);
|
||||
//静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
|
||||
private static int onlineCount = 0;
|
||||
//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
|
||||
private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();
|
||||
|
||||
//与某个客户端的连接会话,需要通过它来给客户端发送数据
|
||||
private Session session;
|
||||
|
||||
//接收userId
|
||||
private String userId="";
|
||||
/**
|
||||
* 连接建立成功调用的方法*/
|
||||
@OnOpen
|
||||
public void onOpen(Session session,@PathParam("userId") String userId) {
|
||||
this.session = session;
|
||||
webSocketSet.add(this); //加入set中
|
||||
addOnlineCount(); //在线数加1
|
||||
log.info("有新窗口开始监听:"+userId+",当前在线人数为" + getOnlineCount());
|
||||
this.userId=userId;
|
||||
try {
|
||||
sendMessage("连接成功");
|
||||
} catch (IOException e) {
|
||||
log.error("websocket IO异常");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 连接关闭调用的方法
|
||||
*/
|
||||
@OnClose
|
||||
public void onClose() {
|
||||
webSocketSet.remove(this); //从set中删除
|
||||
subOnlineCount(); //在线数减1
|
||||
log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
|
||||
}
|
||||
|
||||
/**
|
||||
* 收到客户端消息后调用的方法
|
||||
* @param message 客户端发送过来的消息*/
|
||||
@OnMessage
|
||||
public void onMessage(String message, Session session) {
|
||||
log.info("收到来自窗口"+userId+"的信息:"+message);
|
||||
//群发消息
|
||||
for (WebSocketServer item : webSocketSet) {
|
||||
try {
|
||||
item.sendMessage(message);
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param session
|
||||
* @param error
|
||||
*/
|
||||
@OnError
|
||||
public void onError(Session session, Throwable error) {
|
||||
log.error("发生错误");
|
||||
error.printStackTrace();
|
||||
}
|
||||
/**
|
||||
* 实现服务器主动推送
|
||||
*/
|
||||
public void sendMessage(String message) throws IOException {
|
||||
this.session.getBasicRemote().sendText(message);
|
||||
}
|
||||
/**
|
||||
* 群发自定义消息
|
||||
* */
|
||||
public static void sendInfo(String message,@PathParam("userId") String userId) throws IOException {
|
||||
log.info("推送消息到窗口"+userId+",推送内容:"+message);
|
||||
for (WebSocketServer item : webSocketSet) {
|
||||
try {
|
||||
//这里可以设定只推送给这个userId的,为null则全部推送
|
||||
if(userId==null) {
|
||||
item.sendMessage(message);
|
||||
}else if(item.userId.equals(userId)){
|
||||
item.sendMessage(message);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static synchronized int getOnlineCount() {
|
||||
return onlineCount;
|
||||
}
|
||||
|
||||
public static synchronized void addOnlineCount() {
|
||||
WebSocketServer.onlineCount++;
|
||||
}
|
||||
|
||||
public static synchronized void subOnlineCount() {
|
||||
WebSocketServer.onlineCount--;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,9 +1,13 @@
|
||||
package com.itstyle.seckill.queue.kafka;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.itstyle.seckill.common.entity.Result;
|
||||
import com.itstyle.seckill.common.webSocket.WebSocketServer;
|
||||
import com.itstyle.seckill.service.ISeckillService;
|
||||
/**
|
||||
* 消费者 spring-kafka 2.0 + 依赖JDK8
|
||||
@@ -21,6 +25,13 @@ public class KafkaConsumer {
|
||||
public void receiveMessage(String message){
|
||||
//收到通道的消息之后执行秒杀操作
|
||||
String[] array = message.split(";");
|
||||
seckillService.startSeckil(Long.parseLong(array[0]), Long.parseLong(array[1]));
|
||||
Result result = seckillService.startSeckil(Long.parseLong(array[0]), Long.parseLong(array[1]));
|
||||
if(result.equals(Result.ok())){
|
||||
try {
|
||||
WebSocketServer.sendInfo(array[0].toString(), array[1].toString());//推送给前台
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,8 +1,12 @@
|
||||
package com.itstyle.seckill.queue.redis;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.itstyle.seckill.common.entity.Result;
|
||||
import com.itstyle.seckill.common.webSocket.WebSocketServer;
|
||||
import com.itstyle.seckill.service.ISeckillService;
|
||||
/**
|
||||
* 消费者
|
||||
@@ -17,6 +21,13 @@ public class RedisConsumer {
|
||||
public void receiveMessage(String message) {
|
||||
//收到通道的消息之后执行秒杀操作(超卖)
|
||||
String[] array = message.split(";");
|
||||
seckillService.startSeckil(Long.parseLong(array[0]), Long.parseLong(array[1]));
|
||||
Result result = seckillService.startSeckilAopLock(Long.parseLong(array[0]), Long.parseLong(array[1]));
|
||||
if(result.equals(Result.ok())){
|
||||
try {
|
||||
WebSocketServer.sendInfo(array[0].toString(), array[1].toString());//推送给前台
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -68,6 +68,8 @@
|
||||
<level>INFO</level>
|
||||
</filter>
|
||||
</appender>
|
||||
<!-- 屏蔽kafka的警告 -->
|
||||
<logger name="org.apache.kafka" level="ERROR"/>
|
||||
<!-- additivity 避免执行2次 -->
|
||||
<logger name="com.itstyle" level="INFO" additivity="false">
|
||||
<appender-ref ref="STDOUT"/>
|
||||
|
||||
29
src/main/resources/static/js/webSocket.js
Normal file
29
src/main/resources/static/js/webSocket.js
Normal file
@@ -0,0 +1,29 @@
|
||||
$(function(){
|
||||
socket.init();
|
||||
});
|
||||
var basePath = "ws://localhost:8080/seckill/";
|
||||
socket = {
|
||||
webSocket : "",
|
||||
init : function() {
|
||||
//userId:自行追加
|
||||
if ('WebSocket' in window) {
|
||||
webSocket = new WebSocket(basePath+'websocket/1');
|
||||
}
|
||||
else if ('MozWebSocket' in window) {
|
||||
webSocket = new MozWebSocket(basePath+"websocket/1");
|
||||
}
|
||||
else {
|
||||
webSocket = new SockJS(basePath+"sockjs/websocket");
|
||||
}
|
||||
webSocket.onerror = function(event) {
|
||||
alert("websockt连接发生错误,请刷新页面重试!")
|
||||
};
|
||||
webSocket.onopen = function(event) {
|
||||
|
||||
};
|
||||
webSocket.onmessage = function(event) {
|
||||
var message = event.data;
|
||||
alert("恭喜你秒杀成功、赶紧付款吧!!!")
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -9,6 +9,8 @@
|
||||
<meta name="author" content="小柒2012" />
|
||||
<meta name="site" content="https://blog.52itstyle.com" />
|
||||
<title>分布式秒杀系统</title>
|
||||
<script th:src="@{/goods/js/jquery-1.9.1.min.js}"></script>
|
||||
<script th:src="@{/js/webSocket.js}"></script>
|
||||
</head>
|
||||
<body>
|
||||
<div class="container" style="margin-top: 1%;">
|
||||
|
||||
Reference in New Issue
Block a user