@[toc] 注:学习这篇文章之前推荐先对Disruptor的使用有了解,否则我的代码中即使有非常详细的注释你也并不能理解这些注释的作用,以及为什么我要需要这样子编写代码。 同时,这将会是网关系列最后一篇文章,由于文章写的比较赶,所以很多细节并没有提到,我会在后续有空时对文章进行进一步的修改。 确保大家都能理解项目的意义
什么是缓冲区队列
JDK中提供的一些队列,他们之间包含了有锁的实现,也包含了无锁的实现,这意味着在并发情况下,如果是不支持线程安全的队列,则会出现线程不安全、线程覆盖、数据丢失等线程安全问题。
所以我们需要使用线程安全的队列来保证线程安全,如下是JDK中提供的线程安全的队列。
但是他们之间有一些问题,比如有锁队列性能稍差但是更安全,他是有界的,无锁队列性能好但是无界,无界意味着容易出现OOM等问题。所以我们肯定首先排除使用无界队列。
当然并不是说无界队列就没有用,只是在某些场景下我们需要剔除他们,不使用他们。
Queue Type | Data Structure | Key Technique | Has Lock | Bounded | Lock Type |
---|---|---|---|---|---|
ArrayBlockingQueue | Array | Reentrant | Yes | Yes | Lock |
LinkedBlockingQueue | Linked List | Reentrant | Yes | Yes | Lock |
LinkedTransferQueue | Linked List | CAS | No | No | CAS |
ConcurrentLinkedQueue | Linked List | CAS | No | No | CAS |
我们在开发中使用的比较多的就是ArrayBlockingQueue了,底层基于数组,使用的是ReentrantLock来提供线程安全的有锁访问。 当然,由于有锁,所以性能稍差一些,并且底层数组也意味着其容量受到了一定的限制。 所以,我们希望有更好的性能,并且希望队列无界的同时保证不出现OOM,那么是否存在这样的队列? 是的,这篇文章我就将基于Disruptor队列来优化项目性能。提供网关缓冲区。
首先明确一点,之所以要抛弃ArrayBlockingQueue的原因是因为使用ReentrantLock的性能小于CAS,而使用CAS的性能小于无锁性能。
所以我们至少应该将使用Lock锁的方式替换为CAS,毕竟如果获取锁失败,是需要进行等待的,那么此时线程就只能阻塞,同时还得保证底层不直接使用数组,因为使用数组意味着有界。并且扩容数组也是一部分的性能开销。
Disruptor高性能的原因
Disruptor在如下几点上进行了优化,使得其提供了一个高性能的队列。
-
无阻塞算法: Disruptor内部使用一系列的无锁(lock-free)算法,例如CAS(Compare and Swap)等,来实现高效的并发操作。这些算法的使用减少了竞争条件,提高了系统的并发性。
-
解决伪共享问题 在 Disruptor 的设计中,关键的优化是通过缓存行填充(Cache Line Padding)来避免伪共享。伪共享通常发生在多个线程同时修改共享缓存行内的不同变量,导致不必要的缓存同步。通过在缓存行内填充一些无关的变量,可以确保不同变量不共享同一个缓存行,从而减少了伪共享的影响。 具体来说,Disruptor 在设计 Ring Buffer(环形缓冲区)时,通过在每个槽(slot)之间填充 padding 变量,使得相邻的槽不会共享同一缓存行。这样,当一个线程修改一个槽时,不会影响到其他槽,减少了缓存同步的开销。 在 Disruptor 中,对于 Java 对象的数组,其大小通常是 2 的幂次方。这样,每个槽之间的距离正好是缓存行的大小。这种设计有效地解决了伪共享的问题,提高了 Disruptor 的性能。 需要注意的是,这种缓存行填充的做法可能在某些情况下会增加内存的消耗,但相对于性能提升而言,这是一个可以接受的权衡。
-
环形缓冲区(Ring Buffer): Disruptor内部使用环形缓冲区作为数据存储结构,这种数据结构的设计使得读写操作可以在不涉及锁的情况下高效进行。生产者和消费者可以在缓冲区上独立进行读写操作,减少了线程之间的竞争。
Disruptor实战
这里额外补充一个知识点,就是Disruptor的等待策略。
Disruptor 中的等待策略(Wait Strategy)是用于在消费者等待可用事件时决定其行为的一种机制。不同的等待策略在不同的场景中有不同的性能表现和行为特点。以下是 Disruptor 中常见的几种等待策略及其区别:
-
BlockingWaitStrategy(阻塞等待策略): BlockingWaitStrategy 是最基本的等待策略,它使用 Object.wait() 和 Object.notifyAll() 方法来进行线程间的通信。 当消费者等待事件时,会释放 CPU 资源,降低了消费者线程的活跃度,适合于线程数较少的场景。 SleepingWaitStrategy(自旋等待策略):
-
SleepingWaitStrategy 在消费者等待事件时使用自旋的方式,避免了阻塞,但在一定时间内如果没有获取到事件,会进入睡眠状态。 适用于对低延迟要求较高的场景,但可能会占用一定的 CPU 资源。 YieldingWaitStrategy(礼让等待策略):
-
YieldingWaitStrategy 在消费者等待事件时会尝试进行自旋,如果自旋一定次数后仍未获取到事件,则会进行线程礼让(Yield)。 适用于对低延迟要求高的场景,但可能占用较多的 CPU 资源。 BusySpinWaitStrategy(忙等待策略):
-
BusySpinWaitStrategy 是一种非常简单的等待策略,它会一直自旋等待事件的到来,不进行任何的线程礼让或睡眠。 适用于对延迟极为敏感的场景,但可能会占用大量的 CPU 资源。 PhasedBackoffWaitStrategy(分阶段退避等待策略):
-
PhasedBackoffWaitStrategy 是一种自适应的等待策略,会根据不同的等待阶段选择不同的等待方式,例如自旋、睡眠等。 可以在不同的场景中平衡延迟和 CPU 资源占用。
接下来我们开始实现使用Disruptor的一些必要条件: 我们先自定义一个事件监听器
java复制代码public interface EventListener<E> {
void onEvent(E event);
/**
*
* @param ex
* @param sequence 异常执行顺序
* @param event
*/
void onException(Throwable ex,long sequence,E event);
}
并且实现一个并发多线程使用的队列接口
java复制代码public interface ParallelQueue<E> {
/**
* 添加元素
* @param event
*/
void add(E event);
void add(E... event);
/**
* 添加多个元素 返回是否添加成功的标志
* @param event
* @return
*/
boolean tryAdd(E event);
boolean tryAdd(E... event);
/**
* 启动
*/
void start();
/**
* 销毁
*/
void shutDown();
/**
* 判断是否已经销毁
*/
boolean isShutDown();
}
之后,我们基于Disruptor的要求,实现核心代码
java复制代码
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @param <E> 队列中存储的事件类型
* @author: ZhangBlossom
* @date: 2023/11/13 18:57
* @contact: QQ:4602197553
* @contact: WX:qczjhczs0114
* @blog: https://blog.csdn.net/Zhangsama1
* @github: https://github.com/ZhangBlossom
* 基于Disruptor实现的多生产者多消费组无锁队列
* 这个类的主要作用是基于 Disruptor 实现的多生产者多消费者无锁队列,
* 通过 Builder 模式进行灵活的参数配置。其中使用了 Disruptor 的一些核心概念,如
* RingBuffer、WaitStrategy、WorkerPool 等,以实现高性能的事件处理。
*/
public class ParallelQueueHandler<E> implements ParallelQueue<E> {
/**
* 环形缓冲区 内部缓冲区存放我们的事件Holder类
*/
private RingBuffer<Holder> ringBuffer;
/**
* 事件监听器
*/
private EventListener<E> eventListener;
/**
* 工作线程池
*/
private WorkerPool<Holder> workerPool;
/**
* 线程池
*/
private ExecutorService executorService;
/**
* Disruptor 框架中的一个接口,用于在事件发布(publish)时将数据填充到事件对象中
*/
private EventTranslatorOneArg<Holder, E> eventTranslator;
/**
* 构造方法,通过 Builder 模式初始化 Disruptor 队列
*
* @param builder Disruptor 队列的构建器
*/
public ParallelQueueHandler(Builder<E> builder) {
this.executorService = Executors.newFixedThreadPool(builder.threads,
new ThreadFactoryBuilder().setNameFormat("ParallelQueueHandler" + builder.namePrefix + "-pool-%d").build());
this.eventListener = builder.listener;
this.eventTranslator = new HolderEventTranslator();
// 创建 RingBuffer
RingBuffer<Holder> ringBuffer = RingBuffer.create(builder.producerType, new HolderEventFactory(),
builder.bufferSize, builder.waitStrategy);
// 通过 RingBuffer 创建屏障 (固定流程)
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
// 创建多个消费者组
WorkHandler<Holder>[] workHandlers = new WorkHandler[builder.threads];
for (int i = 0; i < workHandlers.length; i++) {
workHandlers[i] = new HolderWorkHandler();
}
// 创建多消费者线程池
WorkerPool<Holder> workerPool = new WorkerPool<>(ringBuffer, sequenceBarrier, new HolderExceptionHandler(),
workHandlers);
// 设置多消费者的 Sequence 序号,主要用于统计消费进度,
ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
this.workerPool = workerPool;
}
/**
* 将事件添加到队列
*
* @param event 要添加的事件
*/
@Override
public void add(E event) {
final RingBuffer<Holder> holderRing = ringBuffer;
if (holderRing == null) {
process(this.eventListener, new IllegalStateException("ParallelQueueHandler is close"), event);
}
try {
ringBuffer.publishEvent(this.eventTranslator, event);
} catch (NullPointerException e) {
process(this.eventListener, new IllegalStateException("ParallelQueueHandler is close"), event);
}
}
/**
* 将多个事件添加到队列
*
* @param events 要添加的事件数组
*/
@Override
public void add(E... events) {
final RingBuffer<Holder> holderRing = ringBuffer;
if (holderRing == null) {
process(this.eventListener, new IllegalStateException("ParallelQueueHandler is close"), events);
}
try {
ringBuffer.publishEvents(this.eventTranslator, events);
} catch (NullPointerException e) {
process(this.eventListener, new IllegalStateException("ParallelQueueHandler is close"), events);
}
}
/**
* 尝试将事件添加到队列
*
* @param event 要添加的事件
* @return 是否成功添加
*/
@Override
public boolean tryAdd(E event) {
final RingBuffer<Holder> holderRing = ringBuffer;
if (holderRing == null) {
return false;
}
try {
return ringBuffer.tryPublishEvent(this.eventTranslator, event);
} catch (NullPointerException e) {
return false;
}
}
/**
* 尝试将多个事件添加到队列
*
* @param events 要添加的事件数组
* @return 是否成功添加
*/
@Override
public boolean tryAdd(E... events) {
final RingBuffer<Holder> holderRing = ringBuffer;
if (holderRing == null) {
return false;
}
try {
return ringBuffer.tryPublishEvents(this.eventTranslator, events);
} catch (NullPointerException e) {
return false;
}
}
/**
* 启动队列
*/
@Override
public void start() {
this.ringBuffer = workerPool.start(executorService);
}
/**
* 关闭队列
*/
@Override
public void shutDown() {
RingBuffer<Holder> holder = ringBuffer;
ringBuffer = null;
if (holder == null) {
return;
}
if (workerPool != null) {
workerPool.drainAndHalt();
}
if (executorService != null) {
executorService.shutdown();
}
}
/**
* 判断队列是否关闭
*
* @return 队列是否关闭
*/
@Override
public boolean isShutDown() {
return ringBuffer == null;
}
/**
* 处理异常的静态方法,用于调用事件监听器的异常处理方法
*
* @param listener 事件监听器
* @param e 异常
* @param event 事件
* @param <E> 事件类型
*/
private static <E> void process(EventListener<E> listener, Throwable e, E event) {
listener.onException(e, -1, event);
}
/**
* 处理异常的静态方法,用于调用事件监听器的异常处理方法
*
* @param listener 事件监听器
* @param e 异常
* @param events 事件数组
* @param <E> 事件类型
*/
private static <E> void process(EventListener<E> listener, Throwable e, E... events) {
for (E event : events) {
process(listener, e, event);
}
}
/**
* Builder 建造者模式
*
* @param <E> 队列中存储的事件类型
*/
public static class Builder<E> {
/**
* 生产者类型 默认使用多生产者类型
*/
private ProducerType producerType = ProducerType.MULTI;
/**
* 线程队列大小
*/
private int bufferSize = 1024 * 16;
/**
* 工作线程默认为1
*/
private int threads = 1;
/**
* 前缀 定位模块
*/
private String namePrefix = "";
/**
* 等待策略
*/
private WaitStrategy waitStrategy = new BlockingWaitStrategy();
/**
* 监听器
*/
private EventListener<E> listener;
// 设置生产者类型,默认为多生产者类型
public Builder<E> setProducerType(ProducerType producerType) {
Preconditions.checkNotNull(producerType);
this.producerType = producerType;
return this;
}
// 设置线程队列大小,要求是2的幂次方
public Builder<E> setBufferSize(int bufferSize) {
Preconditions.checkArgument(Integer.bitCount(bufferSize) == 1);
this.bufferSize = bufferSize;
return this;
}
// 设置工作线程数
public Builder<E> setThreads(int threads) {
Preconditions.checkArgument(threads > 0);
this.threads = threads;
return this;
}
// 设置线程名前缀
public Builder<E> setNamePrefix(String namePrefix) {
Preconditions.checkNotNull(namePrefix);
this.namePrefix = namePrefix;
return this;
}
// 设置等待策略,默认为 BlockingWaitStrategy
public Builder<E> setWaitStrategy(WaitStrategy waitStrategy) {
Preconditions.checkNotNull(waitStrategy);
this.waitStrategy = waitStrategy;
return this;
}
// 设置事件监听器
public Builder<E> setListener(EventListener<E> listener) {
Preconditions.checkNotNull(listener);
this.listener = listener;
return this;
}
// 构建 ParallelQueueHandler 对象
public ParallelQueueHandler<E> build() {
return new ParallelQueueHandler<>(this);
}
}
/**
* 事件对象
*/
public class Holder {
/**
* 事件
*/
private E event;
// 设置事件的值
public void setValue(E event) {
this.event = event;
}
// 重写 toString 方法,用于调试时打印事件信息
@Override
public String toString() {
return "Holder{" + "event=" + event + '}';
}
}
// 异常处理器
private class HolderExceptionHandler implements ExceptionHandler<Holder> {
@Override
public void handleEventException(Throwable throwable, long l, Holder event) {
Holder holder = (Holder) event;
try {
eventListener.onException(throwable, l, holder.event);
} catch (Exception e) {
// 异常处理时出现异常的话,可以在这里进行额外的处理
} finally {
holder.setValue(null);
}
}
@Override
public void handleOnStartException(Throwable throwable) {
throw new UnsupportedOperationException(throwable);
}
@Override
public void handleOnShutdownException(Throwable throwable) {
throw new UnsupportedOperationException(throwable);
}
}
// 消费者工作处理器
private class HolderWorkHandler implements WorkHandler<Holder> {
@Override
public void onEvent(Holder holder) throws Exception {
// 调用事件监听器的处理事件方法
eventListener.onEvent(holder.event);
// 处理完事件后,将事件置为空,帮助 GC 回收资源
holder.setValue(null);
}
}
// 事件工厂,用于创建事件对象
private class HolderEventFactory implements EventFactory<Holder> {
@Override
public Holder newInstance() {
return new Holder();
}
}
// 事件翻译器,用于将事件数据填充到事件对象中
private class HolderEventTranslator implements EventTranslatorOneArg<Holder, E> {
@Override
public void translateTo(Holder holder, long l, E e) {
// 将事件数据填充到 Holder 对象中
holder.setValue(e);
}
}
}
这一套代码中,我们基于对Disruptor的了解提供了一些使用Disruptor中必须用到的一些配置,比如我们的RingBuffer。 之后,我们对原先的NettyCoreProcessor进行修改。 添加一个新的Netty处理器,并且整合Disruptor
java复制代码package blossom.project.core.netty.processor;
import blossom.project.common.enums.ResponseCode;
import blossom.project.core.Config;
import blossom.project.core.context.HttpRequestWrapper;
import blossom.project.core.disruptor.EventListener;
import blossom.project.core.disruptor.ParallelQueueHandler;
import blossom.project.core.helper.ResponseHelper;
import com.lmax.disruptor.dsl.ProducerType;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.*;
import lombok.extern.slf4j.Slf4j;
/**
* @author: ZhangBlossom
* @date: 2023/11/13 23:57
* @contact: QQ:4602197553
* @contact: WX:qczjhczs0114
* @blog: https://blog.csdn.net/Zhangsama1
* @github: https://github.com/ZhangBlossom
*
*/
/**
* DisruptorNettyCoreProcessor 使用 Disruptor 提高性能的 Netty 处理器。
* 这个处理器是一个缓存层,通过 Disruptor 来异步处理 HTTP 请求,减轻 Netty 核心处理器的负担。
*/
@Slf4j
public class DisruptorNettyCoreProcessor implements NettyProcessor {
/**
* 线程前缀
*/
private static final String THREAD_NAME_PREFIX = "gateway-queue-";
private Config config;
/**
* Disruptor 只是缓存依然需要使用到 Netty 核心处理器
*/
private NettyCoreProcessor nettyCoreProcessor;
/**
* 处理类
*/
private ParallelQueueHandler<HttpRequestWrapper> parallelQueueHandler;
/**
* 构造方法,初始化 DisruptorNettyCoreProcessor。
*
* @param config 配置信息对象。
* @param nettyCoreProcessor Netty 核心处理器。
*/
public DisruptorNettyCoreProcessor(Config config, NettyCoreProcessor nettyCoreProcessor) {
this.config = config;
this.nettyCoreProcessor = nettyCoreProcessor;
// 使用 Disruptor 创建并配置处理队列。
ParallelQueueHandler.Builder<HttpRequestWrapper> builder = new ParallelQueueHandler.Builder<HttpRequestWrapper>()
.setBufferSize(config.getBufferSize())
.setThreads(config.getProcessThread())
.setProducerType(ProducerType.MULTI)
.setNamePrefix(THREAD_NAME_PREFIX)
.setWaitStrategy(config.getWaitStrategy());
// 监听事件处理类
BatchEventListenerProcessor batchEventListenerProcessor = new BatchEventListenerProcessor();
builder.setListener(batchEventListenerProcessor);
this.parallelQueueHandler = builder.build();
}
/**
* 处理 HTTP 请求,将请求添加到 Disruptor 处理队列中。
*
* @param wrapper HttpRequestWrapper 包装类。
*/
@Override
public void process(HttpRequestWrapper wrapper) {
this.parallelQueueHandler.add(wrapper);
}
/**
* 监听处理类,处理从 Disruptor 处理队列中取出的事件。
*/
public class BatchEventListenerProcessor implements EventListener<HttpRequestWrapper> {
@Override
public void onEvent(HttpRequestWrapper event) {
// 使用 Netty 核心处理器处理事件。
nettyCoreProcessor.process(event);
}
@Override
public void onException(Throwable ex, long sequence, HttpRequestWrapper event) {
HttpRequest request = event.getRequest();
ChannelHandlerContext ctx = event.getCtx();
try {
log.error("BatchEventListenerProcessor onException 请求写回失败,request:{}, errMsg:{} ", request, ex.getMessage(), ex);
// 构建响应对象
FullHttpResponse fullHttpResponse = ResponseHelper.getHttpResponse(ResponseCode.INTERNAL_ERROR);
if (!HttpUtil.isKeepAlive(request)) {
ctx.writeAndFlush(fullHttpResponse).addListener(ChannelFutureListener.CLOSE);
} else {
fullHttpResponse.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
ctx.writeAndFlush(fullHttpResponse);
}
} catch (Exception e) {
log.error("BatchEventListenerProcessor onException 请求写回失败,request:{}, errMsg:{} ", request, e.getMessage(), e);
}
}
}
/**
* 启动 DisruptorNettyCoreProcessor,启动处理队列。
*/
@Override
public void start() {
parallelQueueHandler.start();
}
/**
* 关闭 DisruptorNettyCoreProcessor,关闭处理队列。
*/
@Override
public void shutDown() {
parallelQueueHandler.shutDown();
}
}
原始的 NettyCoreProcessor 直接处理每个 HTTP 请求,而 DisruptorNettyCoreProcessor 使用了 Disruptor 框架,将 HTTP 请求异步地添加到一个处理队列中,然后由 BatchEventListenerProcessor 来处理这个队列中的事件。
Disruptor 是一个高性能的异步事件处理框架,它采用了无锁的设计,通过利用 RingBuffer 的结构,实现了高效的事件发布和消费。在这里,使用 Disruptor 的好处是可以提高并发处理能力,减轻了 Netty 核心处理器的负担。因为网络请求通常是 I/O 密集型的操作,通过异步处理可以提高系统的吞吐量。
同时,我们要在创建容器的时候使用我们的新NettyCoreProcessor,代码变更如下:
java复制代码@Override
public void init() {
NettyCoreProcessor nettyCoreProcessor = new NettyCoreProcessor();
//如果启动要使用多生产者多消费组 那么我们读取配置
if(BUFFER_TYPE_PARALLEL.equals(config.getBufferType())){
//开启配置的情况下使用Disruptor
this.nettyProcessor = new DisruptorNettyCoreProcessor(config,nettyCoreProcessor);
}else{
this. nettyProcessor = nettyCoreProcessor;
}
this.nettyHttpServer = new NettyHttpServer(config, nettyProcessor);
this.nettyHttpClient = new NettyHttpClient(config,
nettyHttpServer.getEventLoopGroupWoker());
}
@Override
public void start() {
nettyProcessor.start();
nettyHttpServer.start();;
nettyHttpClient.start();
log.info("api gateway started!");
}
@Override
public void shutdown() {
nettyProcessor.shutDown();
nettyHttpServer.shutdown();
nettyHttpClient.shutdown();
}
到此位置,我们就已经成功的对网关项目整合Disruptor来进一步提升网关性能。 到此位置,网关系列全文结束。 感谢你能看到这里。