:ambulance:分布式队列消费问题BUG修复

This commit is contained in:
zhipeng.zhang
2020-08-13 18:22:04 +08:00
parent ea5c862ab7
commit 34073c4b85
4 changed files with 115 additions and 97 deletions

View File

@@ -1,34 +1,38 @@
package com.itstyle.seckill.queue.activemq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
import com.itstyle.seckill.common.entity.Result;
import com.itstyle.seckill.common.enums.SeckillStatEnum;
import com.itstyle.seckill.common.redis.RedisUtil;
import com.itstyle.seckill.common.webSocket.WebSocketServer;
import com.itstyle.seckill.service.ISeckillService;
@Service
public class ActiveMQConsumer {
@Autowired
private ISeckillService seckillService;
@Autowired
private RedisUtil redisUtil;
// 使用JmsListener配置消费者监听的队列其中text是接收到的消息
@JmsListener(destination = "seckill.queue")
public void receiveQueue(String message) {
//收到通道的消息之后执行秒杀操作(超卖)
String[] array = message.split(";");
Result result = seckillService.startSeckilDBPCC_TWO(Long.parseLong(array[0]), Long.parseLong(array[1]));
if(result.equals(Result.ok(SeckillStatEnum.SUCCESS))){
WebSocketServer.sendInfo(array[0], "秒杀成功");//推送给前台
}else{
WebSocketServer.sendInfo(array[0], "秒杀失败");//推送给前台
redisUtil.cacheValue(array[0], "ok");//秒杀结束
}
}
}
package com.itstyle.seckill.queue.activemq;
import com.itstyle.seckill.common.entity.Result;
import com.itstyle.seckill.common.enums.SeckillStatEnum;
import com.itstyle.seckill.common.redis.RedisUtil;
import com.itstyle.seckill.common.webSocket.WebSocketServer;
import com.itstyle.seckill.service.ISeckillService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
@Service
public class ActiveMQConsumer {
@Autowired
private ISeckillService seckillService;
@Autowired
private RedisUtil redisUtil;
/**
* 使用JmsListener配置消费者监听的队列其中text是接收到的消息
* @param message
*/
@JmsListener(destination = "seckill.queue")
public void receiveQueue(String message) {
/**
* 收到通道的消息之后执行秒杀操作(超卖)
*/
String[] array = message.split(";");
Result result = seckillService.startSeckilDBPCC_TWO(Long.parseLong(array[0]), Long.parseLong(array[1]));
if(result.equals(Result.ok(SeckillStatEnum.SUCCESS))){
WebSocketServer.sendInfo(array[0], "秒杀成功");
}else{
WebSocketServer.sendInfo(array[0], "秒杀失败");
redisUtil.cacheValue(array[0], "ok");
}
}
}

View File

@@ -1,19 +1,30 @@
package com.itstyle.seckill.queue.disruptor;
import com.itstyle.seckill.common.config.SpringUtil;
import com.itstyle.seckill.service.ISeckillService;
import com.lmax.disruptor.EventHandler;
/**
* 消费者(秒杀处理器)
* 创建者 科帮网
*/
public class SeckillEventConsumer implements EventHandler<SeckillEvent> {
private ISeckillService seckillService = (ISeckillService) SpringUtil.getBean("seckillService");
@Override
public void onEvent(SeckillEvent seckillEvent, long seq, boolean bool) {
seckillService.startSeckil(seckillEvent.getSeckillId(), seckillEvent.getUserId());
}
}
package com.itstyle.seckill.queue.disruptor;
import com.itstyle.seckill.common.config.SpringUtil;
import com.itstyle.seckill.common.entity.Result;
import com.itstyle.seckill.common.enums.SeckillStatEnum;
import com.itstyle.seckill.queue.jvm.TaskRunner;
import com.itstyle.seckill.service.ISeckillService;
import com.lmax.disruptor.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 消费者(秒杀处理器)
* 创建者 科帮网
*/
public class SeckillEventConsumer implements EventHandler<SeckillEvent> {
private final static Logger LOGGER = LoggerFactory.getLogger(SeckillEventConsumer.class);
private ISeckillService seckillService = (ISeckillService) SpringUtil.getBean("seckillService");
@Override
public void onEvent(SeckillEvent seckillEvent, long seq, boolean bool) {
Result result =
seckillService.startSeckilAopLock(seckillEvent.getSeckillId(), seckillEvent.getUserId());
if(result.equals(Result.ok(SeckillStatEnum.SUCCESS))){
LOGGER.info("用户:{}{}",seckillEvent.getUserId(),"秒杀成功");
}
}
}

View File

@@ -1,43 +1,44 @@
package com.itstyle.seckill.queue.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import com.itstyle.seckill.common.entity.Result;
import com.itstyle.seckill.common.redis.RedisUtil;
import com.itstyle.seckill.common.webSocket.WebSocketServer;
import com.itstyle.seckill.service.ISeckillService;
/**
* 消费者 spring-kafka 2.0 + 依赖JDK8
* @author 科帮网 By https://blog.52itstyle.com
*/
@Component
public class KafkaConsumer {
@Autowired
private ISeckillService seckillService;
private static RedisUtil redisUtil = new RedisUtil();
/**
* 监听seckill主题,有消息就读取
* @param message
*/
@KafkaListener(topics = {"seckill"})
public void receiveMessage(String message){
//收到通道的消息之后执行秒杀操作
String[] array = message.split(";");
if(redisUtil.getValue(array[0])==null){//control层已经判断了其实这里不需要再判断了这个接口有限流 注意一下
Result result = seckillService.startSeckil(Long.parseLong(array[0]), Long.parseLong(array[1]));
//可以注释掉上面的使用这个测试
//Result result = seckillService.startSeckilDBPCC_TWO(Long.parseLong(array[0]), Long.parseLong(array[1]));
if(result.equals(Result.ok())){
WebSocketServer.sendInfo(array[0].toString(), "秒杀成功");//推送给前台
}else{
WebSocketServer.sendInfo(array[0].toString(), "秒杀失败");//推送给前台
redisUtil.cacheValue(array[0], "ok");//秒杀结束
}
}else{
WebSocketServer.sendInfo(array[0].toString(), "秒杀失败");//推送给前台
}
}
package com.itstyle.seckill.queue.kafka;
import com.itstyle.seckill.common.enums.SeckillStatEnum;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import com.itstyle.seckill.common.entity.Result;
import com.itstyle.seckill.common.redis.RedisUtil;
import com.itstyle.seckill.common.webSocket.WebSocketServer;
import com.itstyle.seckill.service.ISeckillService;
/**
* 消费者 spring-kafka 2.0 + 依赖JDK8
* @author 科帮网 By https://blog.52itstyle.com
*/
@Component
public class KafkaConsumer {
@Autowired
private ISeckillService seckillService;
private static RedisUtil redisUtil = new RedisUtil();
/**
* 监听seckill主题,有消息就读取
* @param message
*/
@KafkaListener(topics = {"seckill"})
public void receiveMessage(String message){
/**
* 收到通道的消息之后执行秒杀操作
*/
String[] array = message.split(";");
if(redisUtil.getValue(array[0])==null){
Result result = seckillService.startSeckilAopLock(Long.parseLong(array[0]), Long.parseLong(array[1]));
if(result.equals(Result.ok(SeckillStatEnum.SUCCESS))){
WebSocketServer.sendInfo(array[0], "秒杀成功");
}else{
WebSocketServer.sendInfo(array[0], "秒杀失败");
redisUtil.cacheValue(array[0], "ok");
}
}else{
WebSocketServer.sendInfo(array[0], "秒杀失败");
}
}
}

View File

@@ -177,8 +177,10 @@ public class SeckillServiceImpl implements ISeckillService {
@Override
@Transactional(rollbackFor = Exception.class)
public Result startSeckilDBPCC_TWO(long seckillId, long userId) {
//单用户抢购一件商品没有问题、但是抢购多件商品不建议这种写法
String nativeSql = "UPDATE seckill SET number=number-1 WHERE seckill_id=? AND number>0";//UPDATE锁表
/**
* 单用户抢购一件商品没有问题、但是抢购多件商品不建议这种写法 UPDATE锁表
*/
String nativeSql = "UPDATE seckill SET number=number-1 WHERE seckill_id=? AND number>0";
int count = dynamicQuery.nativeExecuteUpdate(nativeSql, new Object[]{seckillId});
if(count>0){
SuccessKilled killed = new SuccessKilled();