:ambulance:进程内队列打印秒杀成功问题

This commit is contained in:
zhipeng.zhang
2020-08-13 18:17:49 +08:00
parent b4a7a123c1
commit ea5c862ab7
5 changed files with 156 additions and 141 deletions

View File

@@ -1,59 +1,67 @@
package com.itstyle.seckill.queue.jvm;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import com.itstyle.seckill.common.entity.SuccessKilled;
/**
* 秒杀队列(固定长度为100)
* @author 科帮网 By https://blog.52itstyle.com
* 创建时间 2018年5月10日
*/
public class SeckillQueue {
//队列大小
static final int QUEUE_MAX_SIZE = 100;
/** 用于多线程间下单的队列 */
static BlockingQueue<SuccessKilled> blockingQueue = new LinkedBlockingQueue<SuccessKilled>(QUEUE_MAX_SIZE);
/**
* 私有的默认构造子,保证外界无法直接实例化
*/
private SeckillQueue(){};
/**
* 类级的内部类,也就是静态的成员式内部类,该内部类的实例与外部类的实例
* 没有绑定关系,而且只有被调用到才会装载,从而实现了延迟加载
*/
private static class SingletonHolder{
/**
* 静态初始化器由JVM来保证线程安全
*/
private static SeckillQueue queue = new SeckillQueue();
}
//单例队列
public static SeckillQueue getMailQueue(){
return SingletonHolder.queue;
}
/**
* 生产入队
* @param kill
* @throws InterruptedException
* add(e) 队列未满时返回true队列满则抛出IllegalStateException(“Queue full”)异常——AbstractQueue
* put(e) 队列未满时,直接插入没有返回值;队列满时会阻塞等待,一直等到队列未满时再插入。
* offer(e) 队列未满时返回true队列满时返回false。非阻塞立即返回。
* offer(e, time, unit) 设定等待的时间如果在指定时间内还不能往队列中插入数据则返回false插入成功返回true。
*/
public Boolean produce(SuccessKilled kill) throws InterruptedException {
return blockingQueue.offer(kill);
}
/**
* 消费出队
* poll() 获取并移除队首元素在指定的时间内去轮询队列看有没有首元素有则返回否者超时后返回null
* take() 与带超时时间的poll类似不同在于take时候如果当前队列空了它会一直等待其他线程调用notEmpty.signal()才会被唤醒
*/
public SuccessKilled consume() throws InterruptedException {
return blockingQueue.take();
}
// 获取队列大小
public int size() {
return blockingQueue.size();
}
}
package com.itstyle.seckill.queue.jvm;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import com.itstyle.seckill.common.entity.SuccessKilled;
/**
* 秒杀队列(固定长度为100)
* @author 科帮网 By https://blog.52itstyle.com
* 创建时间 2018年5月10日
*/
public class SeckillQueue {
//队列大小
static final int QUEUE_MAX_SIZE = 100;
/** 用于多线程间下单的队列 */
static BlockingQueue<SuccessKilled> blockingQueue = new LinkedBlockingQueue<SuccessKilled>(QUEUE_MAX_SIZE);
/**
* 私有的默认构造子,保证外界无法直接实例化
*/
private SeckillQueue(){};
/**
* 类级的内部类,也就是静态的成员式内部类,该内部类的实例与外部类的实例
* 没有绑定关系,而且只有被调用到才会装载,从而实现了延迟加载
*/
private static class SingletonHolder{
/**
* 静态初始化器由JVM来保证线程安全
*/
private static SeckillQueue queue = new SeckillQueue();
}
/**
* 单例队列
* @return
*/
public static SeckillQueue getSkillQueue(){
return SingletonHolder.queue;
}
/**
* 生产入队
* @param kill
* @throws InterruptedException
* add(e) 队列未满时返回true队列满则抛出IllegalStateException(“Queue full”)异常——AbstractQueue
* put(e) 队列未满时,直接插入没有返回值;队列满时会阻塞等待,一直等到队列未满时再插入。
* offer(e) 队列未满时返回true队列满时返回false。非阻塞立即返回。
* offer(e, time, unit) 设定等待的时间如果在指定时间内还不能往队列中插入数据则返回false插入成功返回true。
*/
public Boolean produce(SuccessKilled kill) {
return blockingQueue.offer(kill);
}
/**
* 消费出队
* poll() 获取并移除队首元素在指定的时间内去轮询队列看有没有首元素有则返回否者超时后返回null
* take() 与带超时时间的poll类似不同在于take时候如果当前队列空了它会一直等待其他线程调用notEmpty.signal()才会被唤醒
*/
public SuccessKilled consume() throws InterruptedException {
return blockingQueue.take();
}
/**
* 获取队列大小
* @return
*/
public int size() {
return blockingQueue.size();
}
}

View File

@@ -1,42 +1,48 @@
package com.itstyle.seckill.queue.jvm;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import com.itstyle.seckill.common.entity.SuccessKilled;
import com.itstyle.seckill.service.ISeckillService;
/**
* 消费秒杀队列
* 创建者 科帮网
* 创建时间 2018年4月3日
*/
@Component
public class TaskRunner implements ApplicationRunner{
private final static Logger LOGGER = LoggerFactory.getLogger(TaskRunner.class);
@Autowired
private ISeckillService seckillService;
@Override
public void run(ApplicationArguments var){
new Thread(() -> {
LOGGER.info("提醒队列启动成功");
while(true){
try {
//进程内队列
SuccessKilled kill = SeckillQueue.getMailQueue().consume();
if(kill!=null){
seckillService.startSeckil(kill.getSeckillId(), kill.getUserId());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
package com.itstyle.seckill.queue.jvm;
import com.itstyle.seckill.common.entity.Result;
import com.itstyle.seckill.common.enums.SeckillStatEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import com.itstyle.seckill.common.entity.SuccessKilled;
import com.itstyle.seckill.service.ISeckillService;
/**
* 消费秒杀队列
* 创建者 科帮网
* 创建时间 2018年4月3日
*/
@Component
public class TaskRunner implements ApplicationRunner{
private final static Logger LOGGER = LoggerFactory.getLogger(TaskRunner.class);
@Autowired
private ISeckillService seckillService;
@Override
public void run(ApplicationArguments var){
new Thread(() -> {
LOGGER.info("提醒队列启动成功");
while(true){
try {
//进程内队列
SuccessKilled kill = SeckillQueue.getSkillQueue().consume();
if(kill!=null){
Result result =
seckillService.startSeckilAopLock(kill.getSeckillId(), kill.getUserId());
if(result.equals(Result.ok(SeckillStatEnum.SUCCESS))){
LOGGER.info("用户:{}{}",kill.getUserId(),"秒杀成功");
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}

View File

@@ -9,14 +9,11 @@ import com.itstyle.seckill.common.enums.SeckillStatEnum;
import com.itstyle.seckill.common.exception.RrException;
import com.itstyle.seckill.repository.SeckillRepository;
import com.itstyle.seckill.service.ISeckillService;
import net.bytebuddy.implementation.bytecode.Throw;
import org.apache.tomcat.jni.Thread;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.sql.Timestamp;
import java.util.Date;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -26,8 +23,9 @@ public class SeckillServiceImpl implements ISeckillService {
/**
* 思考为什么不用synchronized
* service 默认是单例的并发下lock只有一个实例
* 互斥锁 参数默认false不公平锁
*/
private Lock lock = new ReentrantLock(true);//互斥锁 参数默认false不公平锁
private Lock lock = new ReentrantLock(true);
@Autowired
private DynamicQuery dynamicQuery;
@@ -51,7 +49,7 @@ public class SeckillServiceImpl implements ISeckillService {
return ((Number) object).longValue();
}
@Override
@Transactional
@Transactional(rollbackFor = Exception.class)
public void deleteSeckill(long seckillId) {
String nativeSql = "DELETE FROM success_killed WHERE seckill_id=?";
dynamicQuery.nativeExecuteUpdate(nativeSql, new Object[]{seckillId});
@@ -59,7 +57,7 @@ public class SeckillServiceImpl implements ISeckillService {
dynamicQuery.nativeExecuteUpdate(nativeSql, new Object[]{seckillId});
}
@Override
@Transactional
@Transactional(rollbackFor = Exception.class)
public Result startSeckil(long seckillId,long userId) {
//校验库存
String nativeSql = "SELECT number FROM seckill WHERE seckill_id=?";
@@ -74,8 +72,7 @@ public class SeckillServiceImpl implements ISeckillService {
killed.setSeckillId(seckillId);
killed.setUserId(userId);
killed.setState((short)0);
Timestamp createTime = new Timestamp(new Date().getTime());
killed.setCreateTime(createTime);
killed.setCreateTime(new Timestamp(System.currentTimeMillis()));
dynamicQuery.save(killed);
/**
* 这里仅仅是分表而已,提供一种思路,供参考,测试的时候自行建表
@@ -87,7 +84,7 @@ public class SeckillServiceImpl implements ISeckillService {
*/
String table = "success_killed_"+userId%8;
nativeSql = "INSERT INTO "+table+" (seckill_id, user_id,state,create_time)VALUES(?,?,?,?)";
Object[] params = new Object[]{seckillId,userId,(short)0,createTime};
Object[] params = new Object[]{seckillId,userId,(short)0,new Timestamp(System.currentTimeMillis())};
dynamicQuery.nativeExecuteUpdate(nativeSql,params);
//支付
return Result.ok(SeckillStatEnum.SUCCESS);
@@ -96,10 +93,10 @@ public class SeckillServiceImpl implements ISeckillService {
}
}
@Override
@Transactional
@Transactional(rollbackFor = Exception.class)
public Result startSeckilLock(long seckillId, long userId) {
try {
lock.lock();
lock.lock();
try {
/**
* 1)这里、不清楚为啥、总是会被超卖101、难道锁不起作用、lock是同一个对象
* 2)来自热心网友 zoain 的细心测试思考、然后自己总结了一下,事物未提交之前,锁已经释放(事物提交是在整个方法执行完),导致另一个事物读取到了这个事物未提交的数据,也就是传说中的脏读。建议锁上移
@@ -117,7 +114,7 @@ public class SeckillServiceImpl implements ISeckillService {
killed.setSeckillId(seckillId);
killed.setUserId(userId);
killed.setState(Short.parseShort(number+""));
killed.setCreateTime(new Timestamp(new Date().getTime()));
killed.setCreateTime(new Timestamp(System.currentTimeMillis()));
dynamicQuery.save(killed);
}else{
return Result.error(SeckillStatEnum.END);
@@ -131,7 +128,7 @@ public class SeckillServiceImpl implements ISeckillService {
}
@Override
@Servicelock
@Transactional
@Transactional(rollbackFor = Exception.class)
public Result startSeckilAopLock(long seckillId, long userId) {
//来自码云码友<马丁的早晨>的建议 使用AOP + 锁实现
String nativeSql = "SELECT number FROM seckill WHERE seckill_id=?";
@@ -144,7 +141,7 @@ public class SeckillServiceImpl implements ISeckillService {
killed.setSeckillId(seckillId);
killed.setUserId(userId);
killed.setState(Short.parseShort(number+""));
killed.setCreateTime(new Timestamp(new Date().getTime()));
killed.setCreateTime(new Timestamp(System.currentTimeMillis()));
dynamicQuery.save(killed);
}else{
return Result.error(SeckillStatEnum.END);
@@ -153,7 +150,7 @@ public class SeckillServiceImpl implements ISeckillService {
}
//注意这里 限流注解 可能会出现少买 自行调整
@Override
@Transactional
@Transactional(rollbackFor = Exception.class)
public Result startSeckilDBPCC_ONE(long seckillId, long userId) {
//单用户抢购一件商品或者多件都没有问题
String nativeSql = "SELECT number FROM seckill WHERE seckill_id=? FOR UPDATE";
@@ -166,7 +163,7 @@ public class SeckillServiceImpl implements ISeckillService {
killed.setSeckillId(seckillId);
killed.setUserId(userId);
killed.setState((short)0);
killed.setCreateTime(new Timestamp(new Date().getTime()));
killed.setCreateTime(new Timestamp(System.currentTimeMillis()));
dynamicQuery.save(killed);
return Result.ok(SeckillStatEnum.SUCCESS);
}else{
@@ -178,7 +175,7 @@ public class SeckillServiceImpl implements ISeckillService {
* 如果发现锁争用比较严重如InnoDB_row_lock_waits和InnoDB_row_lock_time_avg的值比较高
*/
@Override
@Transactional
@Transactional(rollbackFor = Exception.class)
public Result startSeckilDBPCC_TWO(long seckillId, long userId) {
//单用户抢购一件商品没有问题、但是抢购多件商品不建议这种写法
String nativeSql = "UPDATE seckill SET number=number-1 WHERE seckill_id=? AND number>0";//UPDATE锁表
@@ -188,7 +185,7 @@ public class SeckillServiceImpl implements ISeckillService {
killed.setSeckillId(seckillId);
killed.setUserId(userId);
killed.setState((short)0);
killed.setCreateTime(new Timestamp(new Date().getTime()));
killed.setCreateTime(new Timestamp(System.currentTimeMillis()));
dynamicQuery.save(killed);
return Result.ok(SeckillStatEnum.SUCCESS);
}else{
@@ -196,11 +193,13 @@ public class SeckillServiceImpl implements ISeckillService {
}
}
@Override
@Transactional
@Transactional(rollbackFor = Exception.class)
public Result startSeckilDBOCC(long seckillId, long userId, long number) {
Seckill kill = seckillRepository.findOne(seckillId);
//if(kill.getNumber()>0){
if(kill.getNumber()>=number){//剩余的数量应该要大于等于秒杀的数量
/**
* 剩余的数量应该要大于等于秒杀的数量
*/
if(kill.getNumber()>=number){
//乐观锁
String nativeSql = "UPDATE seckill SET number=number-?,version=version+1 WHERE seckill_id=? AND version = ?";
int count = dynamicQuery.nativeExecuteUpdate(nativeSql, new Object[]{number,seckillId,kill.getVersion()});
@@ -209,7 +208,7 @@ public class SeckillServiceImpl implements ISeckillService {
killed.setSeckillId(seckillId);
killed.setUserId(userId);
killed.setState((short)0);
killed.setCreateTime(new Timestamp(new Date().getTime()));
killed.setCreateTime(new Timestamp(System.currentTimeMillis()));
dynamicQuery.save(killed);
return Result.ok(SeckillStatEnum.SUCCESS);
}else{

View File

@@ -164,7 +164,7 @@ public class RedPacketController {
@PostMapping("/startThree")
public Result startThree(long redPacketId){
int skillNum = 9;
final CountDownLatch latch = new CountDownLatch(skillNum);//N个抢红包
final CountDownLatch latch = new CountDownLatch(skillNum);
/**
* 初始化红包数据,抢红包拦截
*/

View File

@@ -1,5 +1,6 @@
package com.itstyle.seckill.web;
import com.itstyle.seckill.common.enums.SeckillStatEnum;
import com.itstyle.seckill.common.exception.RrException;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
@@ -35,7 +36,9 @@ public class SeckillController {
private final static Logger LOGGER = LoggerFactory.getLogger(SeckillController.class);
private static int corePoolSize = Runtime.getRuntime().availableProcessors();
//创建线程池 调整队列数 拒绝服务
/**
* 创建线程池 调整队列数 拒绝服务
*/
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, corePoolSize+1, 10l, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000));
@@ -46,7 +49,7 @@ public class SeckillController {
@PostMapping("/start")
public Result start(long seckillId){
int skillNum = 10;
final CountDownLatch latch = new CountDownLatch(skillNum);//N个购买者
final CountDownLatch latch = new CountDownLatch(skillNum);
seckillService.deleteSeckill(seckillId);
final long killId = seckillId;
LOGGER.info("开始秒杀一(会出现超卖)");
@@ -90,7 +93,7 @@ public class SeckillController {
@PostMapping("/startLock")
public Result startLock(long seckillId){
int skillNum = 1000;
final CountDownLatch latch = new CountDownLatch(skillNum);//N个购买者
final CountDownLatch latch = new CountDownLatch(skillNum);
seckillService.deleteSeckill(seckillId);
final long killId = seckillId;
LOGGER.info("开始秒杀二(正常)");
@@ -116,7 +119,7 @@ public class SeckillController {
@PostMapping("/startAopLock")
public Result startAopLock(long seckillId){
int skillNum = 1000;
final CountDownLatch latch = new CountDownLatch(skillNum);//N个购买者
final CountDownLatch latch = new CountDownLatch(skillNum);
seckillService.deleteSeckill(seckillId);
final long killId = seckillId;
LOGGER.info("开始秒杀三(正常)");
@@ -142,7 +145,7 @@ public class SeckillController {
@PostMapping("/startDBPCC_ONE")
public Result startDBPCC_ONE(long seckillId){
int skillNum = 1000;
final CountDownLatch latch = new CountDownLatch(skillNum);//N个购买者
final CountDownLatch latch = new CountDownLatch(skillNum);
seckillService.deleteSeckill(seckillId);
final long killId = seckillId;
LOGGER.info("开始秒杀四(正常)");
@@ -168,7 +171,7 @@ public class SeckillController {
@PostMapping("/startDPCC_TWO")
public Result startDPCC_TWO(long seckillId){
int skillNum = 1000;
final CountDownLatch latch = new CountDownLatch(skillNum);//N个购买者
final CountDownLatch latch = new CountDownLatch(skillNum);
seckillService.deleteSeckill(seckillId);
final long killId = seckillId;
LOGGER.info("开始秒杀五(正常、数据库锁最优实现)");
@@ -194,7 +197,7 @@ public class SeckillController {
@PostMapping("/startDBOCC")
public Result startDBOCC(long seckillId){
int skillNum = 1000;
final CountDownLatch latch = new CountDownLatch(skillNum);//N个购买者
final CountDownLatch latch = new CountDownLatch(skillNum);
seckillService.deleteSeckill(seckillId);
final long killId = seckillId;
LOGGER.info("开始秒杀六(正常、数据库锁最优实现)");
@@ -230,16 +233,14 @@ public class SeckillController {
SuccessKilled kill = new SuccessKilled();
kill.setSeckillId(killId);
kill.setUserId(userId);
try {
Boolean flag = SeckillQueue.getMailQueue().produce(kill);
if(flag){
LOGGER.info("用户:{}{}",kill.getUserId(),"秒杀成功");
}else{
LOGGER.info("用户:{}{}",userId,"秒杀失败");
}
} catch (InterruptedException e) {
e.printStackTrace();
LOGGER.info("用户:{}{}",userId,"秒杀失败");
Boolean flag = SeckillQueue.getSkillQueue().produce(kill);
/**
* 虽然进入了队列,但是不一定能秒杀成功 进队列出队有间隙
*/
if(flag){
//LOGGER.info("用户:{}{}",kill.getUserId(),"秒杀成功");
}else{
//LOGGER.info("用户:{}{}",userId,"秒杀失败");
}
};
executor.execute(task);
@@ -278,4 +279,5 @@ public class SeckillController {
}
return Result.ok();
}
}