码友<tukangzheng>的建议 引入disruptor高效队列

This commit is contained in:
小柒2012
2018-05-22 19:22:06 +08:00
parent 3c820872f0
commit 9fdf460eb4
11 changed files with 277 additions and 1 deletions

View File

@@ -89,6 +89,12 @@
<artifactId>curator-recipes</artifactId>
<version>2.10.0</version>
</dependency>
<!-- disruptor 高效队列-->
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.1</version>
</dependency>
</dependencies>
<build>
<finalName>spring-boot-seckill</finalName><plugins>

View File

@@ -0,0 +1,40 @@
package com.itstyle.seckill.common.config;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class SpringUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if(SpringUtil.applicationContext == null) {
SpringUtil.applicationContext = applicationContext;
}
System.out.println("========科帮网(https://blog.52itstyle.com)========");
}
//获取applicationContext
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
//通过name获取 Bean.
public static Object getBean(String name){
return getApplicationContext().getBean(name);
}
//通过class获取Bean.
public static <T> T getBean(Class<T> clazz){
return getApplicationContext().getBean(clazz);
}
//通过name,以及Clazz返回指定的Bean
public static <T> T getBean(String name,Class<T> clazz){
return getApplicationContext().getBean(name, clazz);
}
}

View File

@@ -0,0 +1,40 @@
package com.itstyle.seckill.common.lock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 案例测试
* @author 科帮网
*/
public class LockDemo {
private static Lock lock = new ReentrantLock();
public static void main(String[] args) {
lockDemo();
}
/**
* 10万以下数据synchronized优于Lock
* 10万以上数据Lock优于synchronized
*/
public static void lockDemo(){
long start = System.currentTimeMillis();
for(int i=0;i<1000000;i++){
final int num = i;
new Runnable() {
@Override
public void run() {
sync(num);
}
}.run();
}
long end = System.currentTimeMillis();
System.out.println(end-start);
}
public static void lock(int i){
lock.lock();
lock.unlock();
}
public static synchronized void sync(int i){
}
}

View File

@@ -0,0 +1,32 @@
package com.itstyle.seckill.queue.disruptor;
import java.util.concurrent.ThreadFactory;
import org.springframework.stereotype.Component;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
@Component
public class DisruptorUtil {
static Disruptor<SeckillEvent> disruptor = null;
static{
SeckillEventFactory factory = new SeckillEventFactory();
int ringBufferSize = 1024;
ThreadFactory threadFactory = new ThreadFactory() {
public Thread newThread(Runnable runnable) {
return new Thread(runnable);
}
};
disruptor = new Disruptor<SeckillEvent>(factory, ringBufferSize, threadFactory);
disruptor.handleEventsWith(new SeckillEventConsumer());
disruptor.start();
}
public static void producer(SeckillEvent kill){
RingBuffer<SeckillEvent> ringBuffer = disruptor.getRingBuffer();
SeckillEventProducer producer = new SeckillEventProducer(ringBuffer);
producer.seckill(kill.getSeckillId(),kill.getUserId());
}
}

View File

@@ -0,0 +1,34 @@
package com.itstyle.seckill.queue.disruptor;
import java.io.Serializable;
/**
* 事件对象(秒杀事件)
* 创建者 科帮网
*/
public class SeckillEvent implements Serializable {
private static final long serialVersionUID = 1L;
private long seckillId;
private long userId;
public SeckillEvent(){
}
public long getSeckillId() {
return seckillId;
}
public void setSeckillId(long seckillId) {
this.seckillId = seckillId;
}
public long getUserId() {
return userId;
}
public void setUserId(long userId) {
this.userId = userId;
}
}

View File

@@ -0,0 +1,18 @@
package com.itstyle.seckill.queue.disruptor;
import com.itstyle.seckill.common.config.SpringUtil;
import com.itstyle.seckill.service.ISeckillService;
import com.lmax.disruptor.EventHandler;
/**
* 秒杀消费者
* 创建者 科帮网
*/
public class SeckillEventConsumer implements EventHandler<SeckillEvent> {
private ISeckillService seckillService = (ISeckillService) SpringUtil.getBean("seckillService");
public void onEvent(SeckillEvent seckillEvent, long seq, boolean bool) throws Exception {
seckillService.startSeckil(seckillEvent.getSeckillId(), seckillEvent.getUserId());
}
}

View File

@@ -0,0 +1,15 @@
package com.itstyle.seckill.queue.disruptor;
import com.lmax.disruptor.EventFactory;
/**
* 事件生成工厂(用来初始化预分配事件对象)
* 创建者 科帮网
*/
public class SeckillEventFactory implements EventFactory<SeckillEvent> {
public SeckillEvent newInstance() {
return new SeckillEvent();
}
}

View File

@@ -0,0 +1,33 @@
package com.itstyle.seckill.queue.disruptor;
import java.util.concurrent.ThreadFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
/**
* 測試類
* 创建者 科帮网
*/
public class SeckillEventMain {
public static void main(String[] args) {
producerWithTranslator();
}
public static void producerWithTranslator(){
SeckillEventFactory factory = new SeckillEventFactory();
int ringBufferSize = 1024;
ThreadFactory threadFactory = new ThreadFactory() {
public Thread newThread(Runnable runnable) {
return new Thread(runnable);
}
};
Disruptor<SeckillEvent> disruptor = new Disruptor<SeckillEvent>(factory, ringBufferSize, threadFactory);
disruptor.handleEventsWith(new SeckillEventConsumer());
disruptor.start();
RingBuffer<SeckillEvent> ringBuffer = disruptor.getRingBuffer();
SeckillEventProducer producer = new SeckillEventProducer(ringBuffer);
for(long i = 0; i<10; i++){
producer.seckill(i, i);
}
}
}

View File

@@ -0,0 +1,28 @@
package com.itstyle.seckill.queue.disruptor;
import com.lmax.disruptor.EventTranslatorVararg;
import com.lmax.disruptor.RingBuffer;
/**
* 使用translator方式生产者
* 创建者 科帮网
*/
public class SeckillEventProducer {
private final static EventTranslatorVararg<SeckillEvent> translator = new EventTranslatorVararg<SeckillEvent>() {
public void translateTo(SeckillEvent seckillEvent, long seq, Object... objs) {
seckillEvent.setSeckillId((Long) objs[0]);
seckillEvent.setUserId((Long) objs[1]);
}
};
private final RingBuffer<SeckillEvent> ringBuffer;
public SeckillEventProducer(RingBuffer<SeckillEvent> ringBuffer){
this.ringBuffer = ringBuffer;
}
public void seckill(long seckillId, long userId){
this.ringBuffer.publishEvent(translator, seckillId, userId);
}
}

View File

@@ -18,7 +18,7 @@ import com.itstyle.seckill.common.entity.SuccessKilled;
import com.itstyle.seckill.common.enums.SeckillStatEnum;
import com.itstyle.seckill.repository.SeckillRepository;
import com.itstyle.seckill.service.ISeckillService;
@Service
@Service("seckillService")
public class SeckillServiceImpl implements ISeckillService {
/**
* 思考为什么不用synchronized

View File

@@ -16,6 +16,8 @@ import org.springframework.web.bind.annotation.RestController;
import com.itstyle.seckill.common.entity.Result;
import com.itstyle.seckill.common.entity.SuccessKilled;
import com.itstyle.seckill.queue.disruptor.DisruptorUtil;
import com.itstyle.seckill.queue.disruptor.SeckillEvent;
import com.itstyle.seckill.queue.jvm.SeckillQueue;
import com.itstyle.seckill.service.ISeckillService;
@Api(tags ="秒杀")
@@ -226,4 +228,32 @@ public class SeckillController {
}
return Result.ok();
}
@ApiOperation(value="秒杀柒(Disruptor队列)",nickname="科帮网")
@PostMapping("/startDisruptorQueue")
public Result startDisruptorQueue(long seckillId){
seckillService.deleteSeckill(seckillId);
final long killId = seckillId;
LOGGER.info("开始秒杀八(正常)");
for(int i=0;i<1000;i++){
final long userId = i;
Runnable task = new Runnable() {
@Override
public void run() {
SeckillEvent kill = new SeckillEvent();
kill.setSeckillId(killId);
kill.setUserId(userId);
DisruptorUtil.producer(kill);
}
};
executor.execute(task);
}
try {
Thread.sleep(10000);
Long seckillCount = seckillService.getSeckillCount(seckillId);
LOGGER.info("一共秒杀出{}件商品",seckillCount);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Result.ok();
}
}