1.添加新的批量队列

This commit is contained in:
Wang Chen Chen
2023-11-30 11:04:40 +08:00
parent c1da9dd631
commit d6b0160513
8 changed files with 521 additions and 21 deletions

View File

@@ -16,7 +16,7 @@ User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:86.0) Gecko/20100101 Fi
"username": "admin",
"password": "123456",
"codeKey": "5jXzuwcoUzbtnHNh",
"codeText": "vkmk"
"codeText": "k6uw"
}
> {%

View File

@@ -22,7 +22,7 @@ x-project-id: {{projectId}}
Authorization: Bearer {{tokenValue}}
{
"deviceName": "牛逼网关",
"deviceName": "牛逼网关213",
"status": "0"
}

View File

@@ -0,0 +1,290 @@
package com.xaaef.molly.common.fqueue;
import com.xaaef.molly.common.fqueue.enums.BatchReason;
import com.xaaef.molly.common.fqueue.options.BatchingOption;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class FQueue<E> {
/**
* FQueue global registry
*/
private final FQueueRegistry registry;
/**
* Explicit class object
*/
private final Class<E> clazz;
/**
* Receiving queue of FQueue
*/
private final LinkedBlockingQueue<E> queue;
/**
* Broadcaster object, useful to allow FQueueConsumer push objects in other FQueues
*/
private final FQueueBroadcast broadcaster;
/**
* Internal executor service. Every FQueue has it's own executor service
*/
protected ExecutorService executorService;
/**
* Counters for received, batched objects
*/
private final AtomicLong received = new AtomicLong();
private final AtomicLong batched = new AtomicLong();
/**
* Batching Option
*/
private BatchingOption<E> batchingOption;
/**
* FanOut settings. FQueue could be set in fan-out mode which means that current FQueue act as round robin dispatcher
* while others FQueue child will do the work. This increase parallelism balancing resources.
*/
private int fanOut = 1;
/**
* Collection that handle child FQueue when fanOut value is greater than 1.
*/
private final List<FQueue<E>> childFQueues = new ArrayList<>();
/**
* Exception handler, by default it prints the stacktrace.
*/
private Consumer<Exception> exceptionHandler = Throwable::printStackTrace;
/**
* NOOP handler, this is fired when flushing have 0 elements.
*/
private Consumer<Void> noopHandler = null;
public FQueue(Class<E> clazz, FQueueRegistry registry) {
this.broadcaster = Optional.ofNullable(registry).map(FQueueBroadcast::new).orElse(null);
this.registry = registry;
this.clazz = clazz;
this.queue = new LinkedBlockingQueue<>();
this.executorService = Executors.newFixedThreadPool(1);
BatchingOption.newBuilder(this).done();
}
/**
* Start batching option flow.
*
* @see BatchingOption
*/
public BatchingOption.Builder<E> batch() {
return BatchingOption.newBuilder(this);
}
/**
* Set batching options
*/
public void setBatchingOption(BatchingOption<E> batchingOption) {
this.batchingOption = batchingOption;
}
/**
* When this is greater than 1, the fan-out flow will build.
* - N FQueue child objects will be build based on fanOut number.
* - This instance will act as dispatcher, send objects to FQueue child in round-robin mode.
*/
public FQueue<E> fanOut(int num) {
fanOut = num;
return this;
}
/**
* Start consuming queue
* - N FQueue child objects will be build based on fanOut number.
* - This instance will act as dispatcher, send objects to FQueue child in round-robin mode.
*/
public FQueue<E> consume(Consumer<List<E>> consumer) {
if (fanOut == 1) {
executorService.submit(Objects.requireNonNull(consumeBatching(consumer)));
} else {
IntStream.range(0, fanOut)
.forEach(i -> {
FQueue<E> child = new FQueue<>(clazz, registry);
child.setBatchingOption(this.batchingOption);
child.withNoopHandler(noopHandler);
child.withRunningExceptionHandler(exceptionHandler);
child.consume(consumer);
childFQueues.add(child);
});
executorService.submit(Objects.requireNonNull(consumeDispatching()));
}
return this;
}
/**
* Input class
*/
public Class<E> getInputClass() {
return this.clazz;
}
/**
* Instance queue
*/
public BlockingQueue<E> getQueue() {
return this.queue;
}
/**
* Instance queue
*/
public void add(E val) {
this.getQueue().add(val);
}
/**
* Instance queue
*/
public void addAll(LinkedList<E> valList) {
this.getQueue().addAll(valList);
}
/**
* Get stats
*/
public List<String> getStats() {
List<String> base = new ArrayList<>();
base.add("FQueue<" + clazz.getSimpleName() + ">");
base.add("FQueue<" + clazz.getSimpleName() + "> a) QueueSize: " + queue.size());
base.add("FQueue<" + clazz.getSimpleName() + "> b) Received: " + received.get());
base.add("FQueue<" + clazz.getSimpleName() + "> c) Batched: " + batched.get());
base.add("FQueue<" + clazz.getSimpleName() + "> d) Produced: " + Optional.ofNullable(broadcaster).map(FQueueBroadcast::getProduced).orElse(0L));
if (fanOut != 1) {
base.addAll(childFQueues.stream().map(f -> "|-----> " + f.getStats()).collect(Collectors.toList()));
}
return base;
}
/**
* When Executor thread terminates or is interrupted an exception will be fired, this will react.
* By default it prints the stacktrace.
*/
public FQueue<E> withRunningExceptionHandler(Consumer<Exception> handler) {
exceptionHandler = handler;
return this;
}
/**
* NOOP handler, this is fired when flushing collection have 0 elements.
*/
public FQueue<E> withNoopHandler(Consumer<Void> handler) {
noopHandler = handler;
return this;
}
/**
* Destroy executor service
*/
public void destroy() {
if (fanOut != 1) {
childFQueues.forEach(FQueue::destroy);
}
executorService.shutdownNow();
}
/**
* Destroy executor service and await
*/
public void destroyAndAwait(Integer timeout, TimeUnit timeUnit) throws InterruptedException {
if (fanOut != 1) {
for (FQueue child : childFQueues) {
child.destroyAndAwait(timeout, timeUnit);
}
}
executorService.shutdownNow();
executorService.awaitTermination(timeout, timeUnit);
}
private Runnable consumeBatching(Consumer<List<E>> consumer) {
final int maxSize = batchingOption.getChunkSize();
final TimeUnit timeUnit = batchingOption.getFlushTimeUnit();
final int timeout = batchingOption.getFlushTimeout();
final Function<E, Integer> lengthFunction = batchingOption.getLengthFunction();
return () -> {
// This is thread safe.
while (!Thread.currentThread().isInterrupted()) {
try {
int threshold = 0;
E elm;
List<E> collection = new ArrayList<>();
long deadline = System.nanoTime() + timeUnit.toNanos(timeout);
BatchReason reason = BatchReason.MAX_ELEMENT_REACHED;
do {
elm = queue.poll(1, TimeUnit.NANOSECONDS);
if (elm == null) { // not enough elements immediately available; will have to poll
elm = queue.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
if (elm == null) {
reason = BatchReason.TIME_FLUSH;
break; // we already waited enough, and there are no more elements in sight
}
received.incrementAndGet();
collection.add(elm);
threshold += lengthFunction.apply(elm);
} else {
received.incrementAndGet();
collection.add(elm);
threshold += lengthFunction.apply(elm);
}
}
while (threshold < maxSize);
if (!collection.isEmpty()) {
batched.addAndGet(collection.size());
consumer.accept(collection);
} else {
Optional.ofNullable(noopHandler).ifPresent(handler -> handler.accept(null));
}
} catch (Exception ex) {
exceptionHandler.accept(ex);
}
}
};
}
private Runnable consumeDispatching() {
return () -> {
int counter = 0;
while (!Thread.currentThread().isInterrupted()) {
try {
E elm = queue.take();
received.incrementAndGet();
childFQueues.get(counter++ % fanOut).getQueue().add(elm);
batched.incrementAndGet();
// INT OVERFLOW CHECK
if (counter >= fanOut) {
counter = 0;
}
} catch (Exception ex) {
exceptionHandler.accept(ex);
}
}
};
}
}

View File

@@ -0,0 +1,33 @@
package com.xaaef.molly.common.fqueue;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
public class FQueueBroadcast {
private final FQueueRegistry registry;
private final AtomicLong produced = new AtomicLong();
public FQueueBroadcast(FQueueRegistry registry) {
this.registry = registry;
}
public Long getProduced() {
return produced.get();
}
public <I> void sendBroadcast(I elm) {
produced.incrementAndGet();
FQueueRegistry.sendBroadcast(elm);
}
public <I> void sendBroadcast(Class<I> clazz, List<I> elm) {
produced.addAndGet(elm.size());
registry.sendBroadcast(clazz, elm);
}
public void incrementCounter(int size) {
produced.addAndGet(size);
}
}

View File

@@ -0,0 +1,68 @@
package com.xaaef.molly.common.fqueue;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
public class FQueueRegistry {
private final static Map<Class, Set<FQueue<?>>> serviceMap = new ConcurrentHashMap<>();
private final static FQueueRegistry registry = new FQueueRegistry();
/**
* Build FQueue and register it into registry
*/
public static <E> FQueue<E> buildFQueue(Class<E> clazz) {
FQueue<E> obj = new FQueue<>(clazz, registry);
registerObject(obj);
return obj;
}
/**
* Get global stats of all registered FQueue
*/
public static List<String> getStatuses() {
return serviceMap.values()
.stream()
.flatMap(Collection::stream)
.flatMap(q -> q.getStats().stream()).collect(Collectors.toList());
}
/**
* Send element to all FQueue which listening for it's class
*/
public static <E> void sendBroadcast(E obj) {
getMapSet(obj.getClass())
.ifPresent(l -> l.forEach(FQueue -> {
FQueue<E> q = (FQueue<E>) FQueue;
q.getQueue().add(obj);
}));
}
/**
* Send elements to all FQueue which listening for it's class
*/
public <E> void sendBroadcast(Class<E> clazz, List<E> obj) {
getMapSet(clazz)
.ifPresent(l -> l.forEach(FQueue -> {
FQueue<E> q = (FQueue<E>) FQueue;
q.getQueue().addAll(obj);
}));
}
private static <E> void registerObject(FQueue<E> obj) {
Set<FQueue<?>> set = serviceMap.getOrDefault(obj.getInputClass(), new HashSet<>());
set.add(obj);
serviceMap.putIfAbsent(obj.getInputClass(), set);
}
private static Optional<Set<FQueue<?>>> getMapSet(Class clazz) {
Set<FQueue<?>> services = serviceMap.get(clazz);
return Optional.ofNullable(services);
}
}

View File

@@ -0,0 +1,11 @@
package com.xaaef.molly.common.fqueue.enums;
public enum BatchReason {
MAX_ELEMENT_REACHED,
TIME_FLUSH,
NOOP
}

View File

@@ -0,0 +1,87 @@
package com.xaaef.molly.common.fqueue.options;
import com.xaaef.molly.common.fqueue.FQueue;
import lombok.Getter;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@Getter
public class BatchingOption<E> {
/**
* Batch chunkSize: the max size of chunk based on lengthFunction (default is simple element count
*/
private final int chunkSize;
/**
* Flush timeout: Unit of flush timeout.
* Example: if this is 5 and flushTimeUnit is SECONDS, it means 5 SECONDS.
*/
private final int flushTimeout;
/**
* Flush time unit: Time unit for calculate flush
*/
private final TimeUnit flushTimeUnit;
/**
* Custom length function:
* by default is 1 which means that count is based on elements
*/
private final Function<E, Integer> lengthFunction;
private BatchingOption(Builder builder) {
chunkSize = builder.chunkSize;
flushTimeout = builder.flushTimeout;
flushTimeUnit = builder.flushTimeUnit;
lengthFunction = builder.lengthFunction;
}
public static <E> Builder<E> newBuilder(FQueue<E> caller) {
return new Builder<>(caller);
}
@SuppressWarnings("PMD.AvoidFieldNameMatchingMethodName")
public static final class Builder<E> {
private FQueue<E> caller;
private int chunkSize = 1;
private int flushTimeout = 10;
private TimeUnit flushTimeUnit = TimeUnit.MILLISECONDS;
private Function<E, Integer> lengthFunction = e -> 1;
private Builder(FQueue<E> caller) {
this.caller = caller;
}
public Builder<E> withChunkSize(int val) {
chunkSize = val;
return this;
}
public Builder<E> withFlushTimeout(int val) {
flushTimeout = val;
return this;
}
public Builder<E> withFlushTimeUnit(TimeUnit val) {
flushTimeUnit = val;
return this;
}
public Builder<E> withLengthFunction(Function<E, Integer> val) {
lengthFunction = val;
return this;
}
public FQueue<E> done() {
caller.setBatchingOption(new BatchingOption<>(this));
return caller;
}
}
}

View File

@@ -1,6 +1,7 @@
package com.xaaef.molly.monitor.api.impl;
import com.xaaef.molly.common.util.BatchQueueUtils;
import com.xaaef.molly.common.fqueue.FQueue;
import com.xaaef.molly.common.fqueue.FQueueRegistry;
import com.xaaef.molly.common.util.JsonUtils;
import com.xaaef.molly.internal.api.ApiLogStorageService;
import com.xaaef.molly.internal.dto.LoginLogDTO;
@@ -14,8 +15,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -33,16 +33,21 @@ import java.util.stream.Collectors;
@Service
public class ApiLogStorageServiceImpl implements ApiLogStorageService {
private final BatchQueueUtils<LoginLogDTO> loginLogQueueHandler;
private final FQueue<LoginLogDTO> loginLogQueueHandler;
private final BatchQueueUtils<OperLogDTO> operLogQueueHandler;
private final FQueue<OperLogDTO> operLogQueueHandler;
public ApiLogStorageServiceImpl(LmsLoginLogService loginLogService, LmsOperLogService operLogService) {
var executorService = Executors.newFixedThreadPool(1);
loginLogQueueHandler = BatchQueueUtils.bufferRate(
100, Duration.ofSeconds(10), executorService,
list -> {
// 批量操作队列。100条数据或者10秒钟提交一次
loginLogQueueHandler = FQueueRegistry.buildFQueue(LoginLogDTO.class)
.fanOut(2)
.batch()
.withChunkSize(100)
.withFlushTimeout(10)
.withFlushTimeUnit(TimeUnit.SECONDS)
.done()
.consume(list -> {
var start = System.currentTimeMillis();
// 根据 索引名称 分组
list.stream()
@@ -62,21 +67,27 @@ public class ApiLogStorageServiceImpl implements ApiLogStorageService {
log.info("批量保存登录日志 数量: {} , 耗时: {} ms", list.size(), end);
});
operLogQueueHandler = BatchQueueUtils.bufferRate(
100, Duration.ofSeconds(10), executorService,
list -> {
operLogQueueHandler = FQueueRegistry.buildFQueue(OperLogDTO.class)
.fanOut(2)
.batch()
.withChunkSize(100)
.withFlushTimeout(10)
.withFlushTimeUnit(TimeUnit.SECONDS)
.done()
.consume(list -> {
var start = System.currentTimeMillis();
// 根据 索引名称 分组
list.stream()
.collect(Collectors.groupingBy(OperLogDTO::getTenantId))
.forEach((tenantId, values) -> {
var collect = values.stream().map(source -> {
var target = new LmsOperLog();
BeanUtils.copyProperties(source, target);
target.setMethodArgs(JsonUtils.toJson(source.getMethodArgs()));
target.setResponseResult(JsonUtils.toJson(source.getResponseResult()));
return target;
}).collect(Collectors.toSet());
var collect = values.stream()
.map(source -> {
var target = new LmsOperLog();
BeanUtils.copyProperties(source, target);
target.setMethodArgs(JsonUtils.toJson(source.getMethodArgs()));
target.setResponseResult(JsonUtils.toJson(source.getResponseResult()));
return target;
}).collect(Collectors.toSet());
if (!collect.isEmpty()) {
DelegateUtils.delegate(tenantId, () -> operLogService.saveBatch(collect));
}