Disruptor 是LMAX公司开源的一个高效的内存无锁队列,一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式实现,或者事件-监听模式的实现,直接称disruptor模式。disruptor最大特点是高性能,其LMAX架构可以获得每秒6百万订单,用1微秒的延迟获得吞吐量为100K+。
disruptor 的关键优化设计:
SingleProducerSequencer
与 MultiProducerSequencer
分别用于单生产者和多生产者的场景ProcessingSequenceBarrier
用于协调生产者与消费者(如果某个slot中的事件还没有被所有消费者消费完毕,那么这个slot是不能被复用的,需要等待)disruptor 简单使用如下:
Disruptor<Element> disruptor = new Disruptor<>(Element::new, 1024,
DaemonThreadFactory.INSTANCE);
disruptor.handleEventsWith(
(EventHandler<Element>) (event, sequence, endOfBatch) -> System.out.println(event.get()));
disruptor.start();
disruptor.publishEvent((event, sequence) -> event.set(1));
// sleep一下 让消费者可以执行到 因为消费线程是守护线程
Thread.sleep(1000);
disruptor 一个参数完整的构造函数如下
public Disruptor(
final EventFactory<T> eventFactory,
final int ringBufferSize,
final ThreadFactory threadFactory,
final ProducerType producerType,
final WaitStrategy waitStrategy)
{
this(
RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
new BasicExecutor(threadFactory));
}
private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor)
{
this.ringBuffer = ringBuffer;
this.executor = executor;
}
上面通过5个参数构造出了一个RingBuffer
和一个Executor
,而这两个组件构成了一个Disruptor。
这里的RingBuffer除了存储事件的职能(DataProvider)还承担着申请sequence和publish event的职能。
Executor作为消费者线程池,主要是运行消费逻辑的。
因此可以说,Disruptor串联起了生产者、消费者以及RingBuffer
RingBuffer 的创建:
public static <E> RingBuffer<E> create(
ProducerType producerType,
EventFactory<E> factory,
int bufferSize,
WaitStrategy waitStrategy)
{
switch (producerType)
{
// 单生产者
case SINGLE:
return createSingleProducer(factory, bufferSize, waitStrategy);
// 多生产者
case MULTI:
return createMultiProducer(factory, bufferSize, waitStrategy);
default:
throw new IllegalStateException(producerType.toString());
}
}
// 以 singleProducer 为例
public static <E> RingBuffer<E> createSingleProducer(
EventFactory<E> factory,
int bufferSize,
WaitStrategy waitStrategy)
{
SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
// 创建一个 RingBuffer 需要 EventFactory 和 Sequencer
// Sequencer 主要来维护 sequence,发布事件等
return new RingBuffer<E>(factory, sequencer);
}
RingBuffer(
EventFactory<E> eventFactory,
Sequencer sequencer)
{
// 父类 RingBufferFields 的构造函数
super(eventFactory, sequencer);
}
RingBufferFields(
EventFactory<E> eventFactory,
Sequencer sequencer)
{
this.sequencer = sequencer;
this.bufferSize = sequencer.getBufferSize();
if (bufferSize < 1)
{
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
if (Integer.bitCount(bufferSize) != 1)
{
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
// 用来定位数组下标
this.indexMask = bufferSize - 1;
// RingBuffer 底层存储, 除了存储元素还有 padding 填充, 用来解决伪共享
this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
// 预填充数组元素
fill(eventFactory);
}
private void fill(EventFactory<E> eventFactory)
{
for (int i = 0; i < bufferSize; i++)
{
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
}
整体的逻辑还是比较清晰, 需要注意的是两点:
EventFactory 是一次性使用的类,在最开始的时候用来给RingBuffer预填充数据。
为了避免JAVA GC带来的性能影响,Disruptor采用的设计是在数组上预填充好对象,每次publish event的时候,只是通过RingBuffer.get(seq)拿到对象,更新对象的值,然后就发布出去了。整个生产消费过程中再也不会有event对象的创建和销毁。
sequence 是用来表达event序例号的对象。为了高并发下的可见性,肯定不能直接用long的,至少也是volatile long。但Disruptor觉得volatile long还是不够用,所以创造了Sequence类。
内部还是持有了 volatile long
, 除此之外还支持:
在整个框架中可以看到在不同的类里,不同场景下对sequence的表达,有时用long,有时用的Sequence类,这其实是背后对于效率和高并发可见性的考量。
比如在对EventProcessor.sequence的更新中都是用的order writes,不保证立即可见,但速度快很多。在这个场景里,造成的结果是显示的消费进度可能比实际上慢,导致生产者有可能在可以生产的情况下没有去生产。但生产者看的是多个消费者中最慢的那个消费进度,所以影响可能没有那么大。
生产者 Sequencer 是 Disruptor 框架的核心类。
生产者发布 event 的时候首先需要预定一个 sequence,Sequencer 就是计算和发布 sequence 的。它主要有2个实现类: SingleProducerSequencer
和 MultiProducerSequencer
生产者 publishEvent 步骤:
但因为RingBuffer是环形的,一个 size 16 的RingBuffer当你拿到 sequence 为16时相当于又要去写 RingBuffer[0]
的位置了,假如之前的数据还没被消费过就会被覆盖了。Sequencer是这样解决这个问题的,它在内部维护了一个:
volatile Sequence[] gatingSequences = new Sequence[0];
每个消费者会维护一个自己的 Sequence 对象,来记录自己已经消费到的序例位置。 每添加一个消费者,都会把消费者的Sequence引用添加到 gatingSequences 中。 通过访问 gatingSequences,Sequencer可以得知消费的最慢的消费者消费到了哪个位置。
gatingSequences=[7, 8, 9, 10, 3, 4, 5, 6, 11]
8个消费者的例子,最慢的消费完了3,此时可以写seq 19的数据,但不能写seq 20(会覆盖 seq 4 的位置, 还没消费)
在next(n)方法里,如果计算出的下一个event的Sequence值减去bufferSize.得出来的 wrapPoint > min(gatingSequences)
,说明即将写入的位置上,之前的event还有消费者没有消费,这时SingleProducerSequencer会等待并自旋。
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
// 计算 wrapPoint 来检查消费进度
long nextValue = this.nextValue;
long nextSequence = nextValue + n;
long wrapPoint = nextSequence - bufferSize;
long cachedGatingSequence = this.cachedValue;
// wrapPoint>cachedGatingSequence 将发生绕环, 生产者覆盖未消费
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
cursor.setVolatile(nextValue); // StoreLoad fence
long minSequence;
// 如果 warpPoint>最小消费位置, 那么自旋等待
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
// 缓存最慢的消费者进度
this.cachedValue = minSequence;
}
this.nextValue = nextSequence;
return nextSequence;
}
举个例子,gatingSequences=[7, 8, 9, 10, 3, 4, 5, 6, 11], RingBuffer size 16的情况下,如果算出来的nextSequence是20,wrapPoint是20-16=4, 这时gatingSequences里最小的是3。
说明下一个打算写入的位置是wrapPoint 4,但最慢的消费者才消费到3,你不能去覆盖之前4上的数据,这时只能等待,等消费者把之前的4消费掉。
为什么wrapPoint = nextSequence - bufferSize,而不是bufferSize的n倍呢,因为消费者只能落后生产者一圈,不然就已经存在数据覆盖了。
等到SingleProducerSequencer自旋到下一个位置所有人都消费过的时候,它就可以从next方法中返回,生产者拿着sequence就可以继续去发布。
MultiProducerSequencer 是在多个生产者的场合使用的,多个生产者的情况下存在竞争,导致它的实现更加复杂。
相较于单生产者, 主要多出来的数据结构是 availableBuffer, 来记录RingBuffer上哪些位置有数据可以读:
int[] availableBuffer;
int indexMask;
int indexShift;
public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy)
{
super(bufferSize, waitStrategy);
availableBuffer = new int[bufferSize];
indexMask = bufferSize - 1;
indexShift = Util.log2(bufferSize);
initialiseAvailableBuffer();
}
Sequencer.next(n)说起,计算下一个数据位Sequence的逻辑是一样的,包括消费者落后导致Sequencer自旋等待的逻辑。不同的是因为有多个publisher同时访问Sequencer.next(n)方法,所以在确定最终位置的时候用了一个CAS操作,如果失败了就自旋再来一次。
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
long current;
long next;
do
{
// 获取当前游标值, 初始值是-1
current = cursor.get();
next = current + n;
long wrapPoint = next - bufferSize;
long cachedGatingSequence = gatingSequenceCache.get();
// 这里逻辑与 singleProducer 一样, 主要处理消费落后的自旋等待
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
{
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
if (wrapPoint > gatingSequence)
{
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
continue;
}
gatingSequenceCache.set(gatingSequence);
}
// 多个publisher同时访问Sequencer.next(n)方法,在确定最终位置的时候用了一个CAS操作,如果失败了就自旋再来一次。
else if (cursor.compareAndSet(current, next))
{
break;
}
}
while (true);
return next;
}
另一个不同的地方是 publish(final long sequence) 方法,SingleProducer的版本很简单,就是移动了一下cursor:
public void publish(long sequence)
{
cursor.set(sequence);
waitStrategy.signalAllWhenBlocking();
}
MultiProducer的版本则去设置availableBuffer的状态位了。给定一个sequence,先计算出对应的数组下标 index,然后计算出在那个index上要写的数据 availabilityFlag。 index 即是槽位, availabilityFlag 则是当前槽位的圈数
public void publish(final long sequence)
{
setAvailable(sequence);
waitStrategy.signalAllWhenBlocking();
}
private void setAvailable(final long sequence)
{
// 计算 index 和 flag
setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
}
// 计算当前 sequence 的 index
private int calculateIndex(final long sequence)
{
return ((int) sequence) & indexMask;
}
// 计算当前 sequence 经过的圈数
private int calculateAvailabilityFlag(final long sequence)
{
return (int) (sequence >>> indexShift);
}
private void setAvailableBufferValue(int index, int flag)
{
// 使用 unsafe 更新需要先计算出内存位置对应的地址
long bufferAddress = (index * SCALE) + BASE;
UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}
availableBuffer 主要用于判断一个 sequence 下的数据是否可用, MultiProducerSequencer 的 isAvailable:
public boolean isAvailable(long sequence)
{
int index = calculateIndex(sequence);
int flag = calculateAvailabilityFlag(sequence);
long bufferAddress = (index * SCALE) + BASE;
return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;
}
SingleProducerSequencer 的方法如下:
public boolean isAvailable(long sequence)
{
return sequence <= cursor.get();
}
在单个生产者的场景下,publishEvent的时候才会推进 cursor,所以只要 sequence<=cursor,就说明数据是可消费的。
多个生产者的场景下,在next(n)方法中,就已经通过 cursor.compareAndSet(current, next) 移动cursor了,此时event还没有publish,所以cursor所在的位置不能保证event一定可用。
在publish方法中是去setAvailable(sequence)了,所以 availableBuffer 是数据是否可用的标志。那为什么值要写成圈数呢,应该是避免把上一轮的数据当成这一轮的数据,错误判断sequence是否可用。
当调用 disruptor.handleEventsWith
设置消息的处理器时,Event Handler
会被包装为 BatchEventProcessor
.
public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
return createEventProcessors(new Sequence[0], handlers);
}
EventHandlerGroup<T> createEventProcessors(
final Sequence[] barrierSequences,
final EventHandler<? super T>[] eventHandlers)
{
checkNotStarted();
final Sequence[] processorSequences = new Sequence[eventHandlers.length];
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
// 如果传入多个事件, 这里就创建多个 EventProcessor
for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
{
final EventHandler<? super T> eventHandler = eventHandlers[i];
// 创建 BatchEventProcessor
final BatchEventProcessor<T> batchEventProcessor =
new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);
if (exceptionHandler != null)
{
batchEventProcessor.setExceptionHandler(exceptionHandler);
}
// consumerRepository 就包含了 EventProcessorInfo
consumerRepository.add(batchEventProcessor, eventHandler, barrier);
processorSequences[i] = batchEventProcessor.getSequence();
}
updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
}
Disruptor 启动后消费过程:
EventProcessor extends Runnable
可以理解为是一个消费者线程的接口.
主要实现类是 BatchEventProcessor
, 主要属性是
DataProvider<T> dataProvider; // 就是RingBuffer, event容器
SequenceBarrier sequenceBarrier; // 用来获取可用event的sequence
EventHandler<? super T> eventHandler; // 真正消费event的业务代码
Sequence sequence = new Sequence(-1); // 该消费线程消费完的sequence位置
run 方法中processEvent
是主要的逻辑:
private void processEvents()
{
T event = null;
// 获取下一个消费位置
long nextSequence = sequence.get() + 1L;
// 死循环处理事件
while (true)
{
try
{
// 当没有事件时候, 从ProcessingSequenceBarrier获取可用的 availableSequence
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
if (batchStartAware != null)
{
batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
}
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
sequence.set(availableSequence);
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
if (running.get() != RUNNING)
{
break;
}
}
catch (final Throwable ex)
{
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}
BatchEventProcessor.processEvent
会先调用 sequnceBarries.waitFor
等待事件的产生。 SequnceBarries
的实现类是 ProcessingSequenceBarrier
public long waitFor(final long sequence)
throws AlertException, InterruptedException, TimeoutException
{
checkAlert();
// 调用等待策略, 获取最新的事件编号
// 具体等待策略下文有介绍
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
// 如果当前可用的最新事件编号小于传入的 sequence,就直接返回可用编号即可
if (availableSequence < sequence)
{
return availableSequence;
}
// 查询最高可用 event 编号的位置
return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
拿到的availableSequence可能比要求的nextSequence大,意味着生产者生产出了很多可以消费的 event。这时就会一个个去消费,并且更新BatchEventProcessor的sequence至availableSequence。此时Sequencer上的gatingSequences因为是引用的关系也会被更新。
调用 sequenceBarrier.waitFor(nextSequence)
时可能不会立即有新的event,这时的行为由 waitStrategy 决定,有多种实现,比如 BlockingWaitStrategy。
Sequencer在构造的时候就会传入一个 waitStrategy,sequenceBarrier 是由 Sequencer 创建的,创建的时候把 Sequencer 的 waitStrategy 传递给 sequenceBarrier。Sequencer和SequenceBarrier持有同样的waitStrategy,相当于在两者间起到了 传递信息和回调 的作用。
消费者在没有可消费的event时会调用waitStrategy.waitFor陷入等待,生产者会在生产出新event后调用waitStrategy.signalAllWhenBlocking来唤醒消费者。
不同的 WaitStrategy 的实现会有不同的效率和性能。
// 等待 processorNotifyCondition.await();
// 唤醒 processorNotifyCondition.signalAll();
- SleepingWaitStrategy: 该实现是在性能和CPU占用之间的一种折中。该实现对负责调用唤醒方法的生产者比较友好,因为啥都不用做。相当于完全依赖消费者端的自旋retry。
inal int DEFAULT_RETRIES = 200; long DEFAULT_SLEEP = 100;
int retries; long sleepTimeNs;
// 等待的实现, counter 即 retries if (counter > 100) { –counter; } else if (counter > 0) { –counter; Thread.yield(); } else { LockSupport.parkNanos(sleepTimeNs); }
- YieldingWaitStrategy: 该实现和SleepingWaitStrategy很类似,只是它在等待的时候会吃掉100%的CPU。
// 等待的实现, 只有 counter==0 的时候才让出CPU,其他时候都在自旋。 if (0 == counter) { Thread.yield(); } else { –counter; }
- BusySpinWaitStrategy: 该实现的唤醒也是啥都不做。性能最好的实现,但对部署环境的要求也最高。消费者线程数应该要少于CPU的实际物理核心数。
// 等待的实现 ThreadHints.onSpinWait();
##### WorkerPool
Sequence workSequence = new Sequence(-1); WorkProcessor<?>[] workProcessors
WorkerPool 内部维护了一个 workSequence,代表着整个pool分配出去的event位置。
<=workSequence的event已经被分配给某个workProcessors了,但是不是一定已经被消费完。
这个设计和多生产者的情况下,先分配sequence到具体的某个生产者,然后再去填充,提交是一样的道理。
##### WorkProcessor
WorkProcessor 是基本的消费者线程,它保有workSequence的引用。
在它的run loop中,它会首先尝试CAS去抢 workSequence 的下一个位置,抢到了就会去消费。
如果没有可消费的event了,它就会调用 `sequenceBarrier.waitFor(nextSequence)` 陷入等待。但即使有了新的event被唤醒,它还是要靠CAS去抢下一个event的消费权。
while (true) { try { // if previous sequence was processed - fetch the next sequence and set // that we have successfully processed the previous sequence // typically, this will be true // this prevents the sequence getting too far forward if an exception // is thrown from the WorkHandler if (processedSequence) // 这个if里面的代码都是为了CAS拿event { processedSequence = false; do { nextSequence = workSequence.get() + 1L; // 拿到下一个sequence sequence.set(nextSequence - 1L); // 更新这个WorkProcessor的消费位置,这个位置主要是反映到Sequencer的gatingSequence从而影响生产者是否继续生产。 // 但实际上(nextSequence - 1L)这个位置很有可能不是这个WorkProcessor消费掉的 } while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence)); }
if (cachedAvailableSequence >= nextSequence) // 如果该nextSequence已经被生产出来
{
event = ringBuffer.get(nextSequence);
workHandler.onEvent(event);
processedSequence = true;
}
else
{
cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence); // 没有被生产出来就在这等待
}
}
// exception handler
} ```