https://github.com/LMAX-Exchange/disruptor/blob/master/README.md https://github.com/LMAX-Exchange/disruptor/wiki/Introduction https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started
本质就是一个可重用的 FIFO 队列
tracking the progress of the ring buffer and event processors
Also attempts to be more efficient with regards to false sharing by adding padding around the volatile field.
// com/lmax/disruptor/BatchEventProcessor.java
private void processEvents()
T event = null;
long nextSequence = sequence.get() + 1L;
while (true)
// 调用 sequenceBarrier.waitFor(nextSequence) 来确定当前可消费的数据位点
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
if (batchStartAware != null && availableSequence >= nextSequence)
batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
while (nextSequence <= availableSequence)
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
catch (final TimeoutException e)
catch (final AlertException ex)
if (running.get() != RUNNING)
catch (final Throwable ex)
exceptionHandler.handleEventException(ex, nextSequence, event);
// com/lmax/disruptor/ProcessingSequenceBarrier.java
public long waitFor(final long sequence)
throws AlertException, InterruptedException, TimeoutException
// 可见 SequenceBarrier 的核心是用 waitStrategy 去 waitFor 数据
// 下面的变量,
// sequence: 记录 consumer 消费位置的
// cursorSequence: 记录 ringBuffer 生产位置的
// dependentSequence: 记录当前 consumer 依赖的其他 consumer 的消费位置的(如果当前 consumer 只从 ringBuffer 读取数据,而不依赖于其他 consumer,那么 dependentSequence 就和 cursorSequence 是同一个,参考 ProcessingSequenceBarrier 的构造函数)
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
if (availableSequence < sequence)
return availableSequence;
return sequencer.getHighestPublishedSequence(sequence, availableSequence);
// 当 Producer 往 ringBuffer 写入了新数据之后,是怎么通知 Consumer 的呢?
// 这个逻辑就在 waitStrategy 里。
// 以 BlockingWaitStrategy 为例
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
throws AlertException, InterruptedException
long availableSequence;
if (cursorSequence.get() < sequence)
synchronized (mutex)
while (cursorSequence.get() < sequence)
// 如果没有数据,调用 mutex (就是一个普通 Object )的wait,这里代码阻塞。直到有 notify/ notifyAll 被调用时,代码继续执行。
// 通过 wait 方法阻塞一个线程时,这个线程会放弃 CPU 时间片。
// 那么 notify / notifyAll 被谁调用呢?答案,BlockingWaitStrategy 有 signalAllWhenBlocking 方法调用 notifyAll,这个方法在 Producer 调用 ringBuffer 的 publish 时被调用。
while ((availableSequence = dependentSequence.get()) < sequence)
return availableSequence;
public void signalAllWhenBlocking()
synchronized (mutex)
The main event loop for handling events from the Disruptor and has ownership of consumer's Sequence
// com/lmax/disruptor/BatchEventProcessor.java
private void processEvents()
T event = null;
long nextSequence = sequence.get() + 1L;
while (true)
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
if (batchStartAware != null && availableSequence >= nextSequence)
batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
// 获取数据,并处理
while (nextSequence <= availableSequence)
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
http://ifeve.com/locks-are-bad/ 锁为什么慢,以及 Disruptor 如何避免
http://ifeve.com/disruptor-cacheline-padding/ cache-line-padding
http://ifeve.com/disruptor-memory-barrier/ disruptor-memory-barrier
// 我本地起了 2 个 Producer。
LongEventProducer producer = new LongEventProducer(ringBuffer);
LongEventProducer producer2 = new LongEventProducer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
bb.putLong(0, l);
// com/lmax/disruptor/shicaiExample/LongEventProducer.java
public void onData(ByteBuffer bb)
// 抢占 ringBuffer 的最新空位,以便把自己的数据写入。
long sequence = ringBuffer.next(); // Grab the next sequence
LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
// for the sequence
event.set(bb.getLong(0)); // Fill with data
// 那么 ringBuffer.next() 是如何在多个 producer 之间协调的呢?
// com/lmax/disruptor/RingBuffer.java
* Increment and return the next sequence for the ring buffer. Calls of this
* method should ensure that they always publish the sequence afterward. E.g.
public long next()
// 这个 sequencer 是 com/lmax/disruptor/MultiProducerSequencer.java
return sequencer.next();
// com/lmax/disruptor/MultiProducerSequencer.java 这个就是 disruptor 的核心,Sequencer
public long next(int n)
if (n < 1 || n > bufferSize)
throw new IllegalArgumentException("n must be > 0 and < bufferSize");
long current;
long next;
// cursor 就是一个 sequence,多个 Producer 公用一个 sequence 进行写控制
current = cursor.get();
next = current + n;
// 判断 ringBuffer 是否满了
long wrapPoint = next - bufferSize;
long cachedGatingSequence = gatingSequenceCache.get();
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
// gatingSequences 是什么?
// 实际就是所有 consumer 的 seuqence 的集合。创建 consumer 时,通过 updateGatingSequencesForNextInChain 函数把它注册到 MultiProducerSequencer 的。
// 这里做的,是从各个 consumer 的 sequence 中,找到最小的哪个(就是消费最慢的那个)。
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
// 如果 buffer 已经满了,就一直自旋等待 consumer 消费。当 consumer 消费后,gatingSequence 就更新,从而 gatingSequenceCache 更新,从而从新判断 wrapPoint > cachedGatingSequence, 从而有可能 Producer 获得 buffer 中可写入的位置。
// 为什么有一个 gatingSequenceCache, 这个只是为了减少 getMinimuSequence 的次数,真实逻辑和没有这个 cache 一样。
// 回答了问题 2。
if (wrapPoint > gatingSequence)
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
// 通过 CAS 来确保多个 Producer 能够正确写入,且不冲突。
// 回答了问题 1。
else if (cursor.compareAndSet(current, next))
while (true);
return next;
// GatingSequence 是由各个 Consumer 在启动 EventProcessor 时添加的
public final void addGatingSequences(Sequence... gatingSequences)
SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
// MultiProducerSequencer 的 publish 做了什么?
public void publish(final long sequence)
setAvailable(sequence); // 设置被 publish 的位置为 not available
waitStrategy.signalAllWhenBlocking(); // 通知所有 consumer
// Connect the handler
disruptor.handleEventsWith(new LongEventHandler());
// com/lmax/disruptor/dsl/Disruptor.java
* <p>Set up event handlers to handle events from the ring buffer. These handlers will process events
* as soon as they become available, in parallel.</p>
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
return createEventProcessors(new Sequence[0], handlers);
EventHandlerGroup<T> createEventProcessors(
final Sequence[] barrierSequences,
final EventHandler<? super T>[] eventHandlers)
final Sequence[] processorSequences = new Sequence[eventHandlers.length];
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
final EventHandler<? super T> eventHandler = eventHandlers[i];
final BatchEventProcessor<T> batchEventProcessor =
new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
if (exceptionHandler != null)
// 把 consumer 加入 consumer 库中。其实就是一个 观察者模式,ringBuffer 有数据后,通知各个 consumer 线程。
consumerRepository.add(batchEventProcessor, eventHandler, barrier);
processorSequences[i] = batchEventProcessor.getSequence();
// 两个 sequence,一个是 barrierSequence, 一个是 processorSequence
updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
SequenceBarrier 是用来控制 consumer 读取进度的。
* Wait for the given sequence to be available for consumption.
long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException;
* Get the current cursor value that can be read.
long getCursor();
// 再往里看。sequence
// com/lmax/disruptor/ProcessingSequenceBarrier.java
public long waitFor(final long sequence)
throws AlertException, InterruptedException, TimeoutException
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
if (availableSequence < sequence)
return availableSequence;
return sequencer.getHighestPublishedSequence(sequence, availableSequence);
public long getCursor()
// 返回当前位置,dependentSequence 是一个 Sequence 对象
return dependentSequence.get();
// ringBuffer 有了新数据时,disruptor 怎么通知各 consumer 的?
* <p>Starts the event processors and returns the fully configured ring buffer.</p>
public RingBuffer<T> start()
for (final ConsumerInfo consumerInfo : consumerRepository)
// 为每个 consumer 启动一个线程,线程逻辑由 BatchEventProcessor 控制
// BatchEventProcessor 有一个 eventHandler 字段,那就是我们自己写处理代码。
return ringBuffer;
public class LongEvent {
private long value;
public void set(long value)
this.value = value;
public long getValue() {
return value;
public class LongEventHandler implements EventHandler<LongEvent>
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
try {
} catch (Exception e) {
System.out.println("Event: " + event.getValue());
public class LongEventProducer
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
this.ringBuffer = ringBuffer;
public void onData(ByteBuffer bb)
long sequence = ringBuffer.next(); // Grab the next sequence
LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
// for the sequence
event.set(bb.getLong(0)); // Fill with data
public class LongEventMain
public static void main(String[] args) throws Exception
// The factory for the event
LongEventFactory factory = new LongEventFactory();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 2;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);
// Connect the handler
disruptor.handleEventsWith(new LongEventHandler());
disruptor.handleEventsWith(new LongEventHandler());
// Start the Disruptor, starts all threads running
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventProducer producer = new LongEventProducer(ringBuffer);
LongEventProducer producer2 = new LongEventProducer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
bb.putLong(0, l);
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。