大家好,又见面了,我是你们的朋友全栈君。
https://github.com/LMAX-Exchange/disruptor/blob/master/README.md https://github.com/LMAX-Exchange/disruptor/wiki/Introduction https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started
本质就是一个可重用的 FIFO 队列
(图自官网:https://github.com/LMAX-Exchange/disruptor/wiki/Introduction)
tracking the progress of the ring buffer and event processors
Also attempts to be more efficient with regards to false sharing by adding padding around the volatile field.
// com/lmax/disruptor/BatchEventProcessor.java
/**
 * Consume loop of the event processor.
 *
 * <p>Starting at the sequence after the one currently stored in {@code sequence}, it repeatedly asks
 * {@code sequenceBarrier} for the highest available sequence, passes every event up to that point to
 * {@code eventHandler}, and then stores the reached position back into {@code sequence}.
 * The loop is left only from the {@code AlertException} branch, when {@code running} is no longer
 * {@code RUNNING}.</p>
 */
private void processEvents()
{
T event = null;
// Begin with the sequence right after the one currently recorded for this processor.
long nextSequence = sequence.get() + 1L;
while (true)
{
try
{
// Call sequenceBarrier.waitFor(nextSequence) to determine the position up to which data can currently be consumed.
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
// Report the batch size only if a batch-start listener is set and at least one event is available.
if (batchStartAware != null && availableSequence >= nextSequence)
{
batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
}
// Pass each event in [nextSequence, availableSequence] to the handler; the third argument is true for the last event of the batch.
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
// Store the reached position once, after the whole batch has been handled.
sequence.set(availableSequence);
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
// An alert ends the loop only when the processor is no longer in the RUNNING state.
if (running.get() != RUNNING)
{
break;
}
}
catch (final Throwable ex)
{
// Report the failure, then record the failing sequence as processed and continue with the next one.
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}
// com/lmax/disruptor/ProcessingSequenceBarrier.java
/**
 * Waits, via the configured wait strategy, until the requested sequence can be consumed.
 *
 * <p>Values handed to the wait strategy (as described by the original author):</p>
 * <ul>
 *   <li>{@code sequence}: the position the consumer wants to read;</li>
 *   <li>{@code cursorSequence}: the position the ring buffer has been produced up to;</li>
 *   <li>{@code dependentSequence}: the progress of the consumers this consumer depends on. Per the
 *       original note, when the consumer reads straight from the ring buffer and depends on no other
 *       consumer, this is the same object as {@code cursorSequence} (see the constructor of
 *       ProcessingSequenceBarrier, not shown here).</li>
 * </ul>
 *
 * @param sequence the sequence the caller wants to consume
 * @return the value reported by the wait strategy when it is lower than {@code sequence}; otherwise
 *         the result of {@code sequencer.getHighestPublishedSequence(sequence, reported)}
 * @throws AlertException       declared by this method; raised from the alert check or the wait
 * @throws InterruptedException declared by this method
 * @throws TimeoutException     declared by this method
 */
public long waitFor(final long sequence)
    throws AlertException, InterruptedException, TimeoutException
{
    checkAlert();

    // The core of the barrier: delegate the actual waiting to the wait strategy.
    final long reported = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

    return reported < sequence
        ? reported
        : sequencer.getHighestPublishedSequence(sequence, reported);
}
// 当 Producer 往 ringBuffer 写入了新数据之后,是怎么通知 Consumer 的呢?
// 这个逻辑就在 waitStrategy 里。
// 以 BlockingWaitStrategy 为例
/**
 * Blocking wait: parks the caller on {@code mutex} while {@code cursorSequence} is behind the
 * requested sequence, then spins until {@code dependentSequence} has reached it as well.
 *
 * @param sequence          the sequence the caller wants to consume
 * @param cursorSequence    sequence that is compared against {@code sequence} while blocking
 * @param dependentSequence sequence that is polled in the final spin loop
 * @param barrier           used to check for alerts while waiting
 * @return the value of {@code dependentSequence} once it is at least {@code sequence}
 * @throws AlertException       declared by this method; {@code barrier.checkAlert()} is invoked in both loops
 * @throws InterruptedException declared by this method; {@code mutex.wait()} can be interrupted
 */
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence;
// Unsynchronized pre-check: the lock is taken only when the cursor is still behind.
if (cursorSequence.get() < sequence)
{
synchronized (mutex)
{
// The condition is re-evaluated after every wake-up.
while (cursorSequence.get() < sequence)
{
barrier.checkAlert();
// No data yet: call wait() on mutex (a plain Object, per the original note). The code blocks here until notify / notifyAll is called.
// A thread blocked in wait() gives up its CPU time slice.
// Who calls notify / notifyAll? BlockingWaitStrategy.signalAllWhenBlocking() calls notifyAll, and that method is invoked when a producer calls publish on the ring buffer.
mutex.wait();
}
}
}
// Busy-spin until the dependent sequence has caught up with the requested sequence.
while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
ThreadHints.onSpinWait();
}
return availableSequence;
}
/**
 * Wakes every thread currently waiting on {@code mutex} by calling {@code notifyAll()} while
 * holding its monitor.
 */
public void signalAllWhenBlocking()
{
// notifyAll() must be called while holding the monitor of the object being notified.
synchronized (mutex)
{
mutex.notifyAll();
}
}
The main event loop for handling events from the Disruptor; it has ownership of the consumer's Sequence.
// com/lmax/disruptor/BatchEventProcessor.java
/**
 * Excerpt of the consume loop; the catch clauses are elided in this listing (the "......" line).
 *
 * <p>Each iteration waits on {@code sequenceBarrier} for the highest available sequence, passes every
 * event up to it to {@code eventHandler}, and then stores that position in {@code sequence}.</p>
 */
private void processEvents()
{
T event = null;
// Begin with the sequence right after the one currently recorded for this processor.
long nextSequence = sequence.get() + 1L;
while (true)
{
try
{
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
// Report the batch size only if a batch-start listener is set and at least one event is available.
if (batchStartAware != null && availableSequence >= nextSequence)
{
batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
}
// Fetch the data and process it.
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
// Store the reached position once, after the whole batch has been handled.
sequence.set(availableSequence);
}
......
}
}
官方文档:http://mechanitis.blogspot.com/2011/06/dissecting-disruptor-whats-so-special.html
http://ifeve.com/dissecting-disruptor-whats-so-special/
http://ifeve.com/locks-are-bad/ 锁为什么慢,以及 Disruptor 如何避免
http://ifeve.com/disruptor-cacheline-padding/ cache-line-padding
http://ifeve.com/disruptor-memory-barrier/ disruptor-memory-barrier
简单说:
个人觉得最重要的设计就是:
// Two producers are created locally; both are constructed with the same ringBuffer.
LongEventProducer producer = new LongEventProducer(ringBuffer);
LongEventProducer producer2 = new LongEventProducer(ringBuffer);
// 8 bytes: room for exactly one long, rewritten at offset 0 on every iteration.
ByteBuffer bb = ByteBuffer.allocate(8);
// Endless loop: every pass hands the same counter value to both producers, then sleeps for one second.
for (long l = 0; true; l++)
{
bb.putLong(0, l);
producer.onData(bb);
producer2.onData(bb);
Thread.sleep(1000);
}
// com/lmax/disruptor/shicaiExample/LongEventProducer.java
/**
 * Copies the long stored at offset 0 of {@code bb} into the next ring buffer entry and publishes it.
 *
 * @param bb buffer whose first eight bytes hold the value to publish
 */
public void onData(ByteBuffer bb)
{
    // Claim the newest free slot of the ring buffer so that this producer can write its data there.
    final long slot = ringBuffer.next();
    try
    {
        // Look up the entry that belongs to the claimed slot, then fill it with the payload.
        final LongEvent entry = ringBuffer.get(slot);
        final long payload = bb.getLong(0);
        entry.set(payload);
    }
    finally
    {
        // Publish in every case, so that a claimed slot is never left unpublished.
        ringBuffer.publish(slot);
    }
}
// 那么 ringBuffer.next() 是如何在多个 producer 之间协调的呢?
// com/lmax/disruptor/RingBuffer.java
* Increment and return the next sequence for the ring buffer. Calls of this
* method should ensure that they always publish the sequence afterward. E.g.
/**
 * Claims the next sequence by delegating to {@code sequencer.next()}.
 *
 * @return the sequence returned by the sequencer
 */
public long next()
{
// Per the original note, this sequencer is com/lmax/disruptor/MultiProducerSequencer.java in the setup discussed here.
return sequencer.next();
}
// com/lmax/disruptor/MultiProducerSequencer.java 这个就是 disruptor 的核心,Sequencer
/**
 * Claims the next {@code n} sequences for a producer.
 *
 * <p>Loops until a compare-and-set on {@code cursor} moves it from the observed value to
 * {@code observed + n}. Before attempting the CAS it checks, using {@code gatingSequenceCache} and,
 * if needed, the minimum of {@code gatingSequences}, that the claim would not pass the gating
 * sequence by more than {@code bufferSize}.</p>
 *
 * @param n number of sequences to claim
 * @return the highest claimed sequence ({@code current + n} at the time of the successful CAS)
 * @throws IllegalArgumentException if {@code n < 1} or {@code n > bufferSize}
 */
public long next(int n)
{
// NOTE(review): the guard accepts n == bufferSize although the message says "< bufferSize" — confirm which is intended.
if (n < 1 || n > bufferSize)
{
throw new IllegalArgumentException("n must be > 0 and < bufferSize");
}
long current;
long next;
do
{
// cursor is itself a sequence; multiple producers share this one sequence to coordinate their writes.
current = cursor.get();
next = current + n;
// Check whether the ring buffer is full.
long wrapPoint = next - bufferSize;
long cachedGatingSequence = gatingSequenceCache.get();
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
{
// What are the gatingSequences?
// Per the original note: the set of all consumers' sequences. When a consumer is created it is registered with the MultiProducerSequencer through updateGatingSequencesForNextInChain.
// Here we look for the smallest of the consumers' sequences (i.e. the slowest consumer).
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
// If the buffer is full, keep retrying until consumers make progress. Once a consumer has consumed, gatingSequence advances, gatingSequenceCache is refreshed, and wrapPoint > cachedGatingSequence is evaluated again, so the producer may then obtain a writable position in the buffer.
// Why is there a gatingSequenceCache? Only to reduce the number of getMinimumSequence calls; the logic is the same as it would be without the cache.
// This answers question 2.
if (wrapPoint > gatingSequence)
{
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
continue;
}
gatingSequenceCache.set(gatingSequence);
}
// CAS ensures that multiple producers can claim positions correctly and without conflicts.
// This answers question 1.
else if (cursor.compareAndSet(current, next))
{
break;
}
}
while (true);
return next;
}
// GatingSequence 是由各个 Consumer 在启动 EventProcessor 时添加的
/**
 * Adds the given sequences as gating sequences of this sequencer by delegating to
 * {@code SequenceGroups.addSequences}.
 *
 * @param gatingSequences the sequences to add
 */
public final void addGatingSequences(Sequence... gatingSequences)
{
SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
}
// MultiProducerSequencer 的 publish 做了什么?
/**
 * Publishes a previously claimed sequence: calls {@code setAvailable} for it and then signals the
 * wait strategy.
 *
 * @param sequence the sequence to publish
 */
public void publish(final long sequence)
{
setAvailable(sequence); // presumably marks the published position as available (setAvailable is not shown here) — the earlier note said "not available", which contradicts the method name
waitStrategy.signalAllWhenBlocking(); // notify all consumers
}
// Connect the handler: pass a new LongEventHandler to disruptor.handleEventsWith(...)
disruptor.handleEventsWith(new LongEventHandler());
// com/lmax/disruptor/dsl/Disruptor.java
* <p>Set up event handlers to handle events from the ring buffer. These handlers will process events
* as soon as they become available, in parallel.</p>
/**
 * Registers the given handlers by delegating to {@code createEventProcessors} with an empty array
 * of barrier sequences.
 *
 * @param handlers the event handlers to register
 * @return the group returned by {@code createEventProcessors}
 */
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
// An empty barrier-sequence array is passed: no sequences of other consumers are supplied here.
return createEventProcessors(new Sequence[0], handlers);
}
/**
 * Wraps every handler in a {@code BatchEventProcessor}, registers it in the consumer repository,
 * and updates the gating sequences.
 *
 * @param barrierSequences sequences used to build the barrier shared by all created processors
 * @param eventHandlers    the handlers to wrap, one processor per handler
 * @return a group built from this instance, the consumer repository and the new processors' sequences
 */
EventHandlerGroup<T> createEventProcessors(
    final Sequence[] barrierSequences,
    final EventHandler<? super T>[] eventHandlers)
{
    checkNotStarted();

    final Sequence[] processorSequences = new Sequence[eventHandlers.length];
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

    int slot = 0;
    for (final EventHandler<? super T> handler : eventHandlers)
    {
        final BatchEventProcessor<T> processor = new BatchEventProcessor<>(ringBuffer, barrier, handler);

        if (exceptionHandler != null)
        {
            processor.setExceptionHandler(exceptionHandler);
        }

        // Add the consumer to the consumer repository. The original author describes this as an
        // observer-style arrangement: once the ring buffer has data, the consumer threads get notified.
        consumerRepository.add(processor, handler, barrier);
        processorSequences[slot++] = processor.getSequence();
    }

    // Two kinds of sequences are involved: the barrier sequences and the processor sequences.
    updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

    return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}
SequenceBarrier 是用来控制 consumer 读取进度的。
* Wait for the given sequence to be available for consumption.
long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException;
* Get the current cursor value that can be read.
long getCursor();
// 再往里看。sequence
// com/lmax/disruptor/ProcessingSequenceBarrier.java
/**
 * Waits, via the configured wait strategy, until the requested sequence can be consumed.
 *
 * @param sequence the sequence the caller wants to consume
 * @return the value reported by the wait strategy when it is lower than {@code sequence}; otherwise
 *         the result of {@code sequencer.getHighestPublishedSequence(sequence, reported)}
 * @throws AlertException       declared by this method; raised from the alert check or the wait
 * @throws InterruptedException declared by this method
 * @throws TimeoutException     declared by this method
 */
@Override
public long waitFor(final long sequence)
    throws AlertException, InterruptedException, TimeoutException
{
    checkAlert();

    // Delegate the actual waiting to the wait strategy.
    final long reported = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

    return reported < sequence
        ? reported
        : sequencer.getHighestPublishedSequence(sequence, reported);
}
/**
 * Returns the current value of {@code dependentSequence}.
 *
 * @return the value read from {@code dependentSequence}
 */
@Override
public long getCursor()
{
// Return the current position; dependentSequence is a Sequence object.
return dependentSequence.get();
}
// ringBuffer 有了新数据时,disruptor 怎么通知各 consumer 的?
* <p>Starts the event processors and returns the fully configured ring buffer.</p>
/**
 * Starts every consumer registered in {@code consumerRepository} on {@code executor} and returns
 * the ring buffer.
 *
 * @return the ring buffer of this disruptor
 */
public RingBuffer<T> start()
{
checkOnlyStartedOnce();
for (final ConsumerInfo consumerInfo : consumerRepository)
{
// Per the original note: one thread is started for each consumer, and the thread's logic is driven by BatchEventProcessor.
// BatchEventProcessor has an eventHandler field; that is where our own processing code lives.
consumerInfo.start(executor);
}
return ringBuffer;
}
/**
 * Mutable event that carries a single {@code long} payload.
 */
public class LongEvent
{
    /** Payload most recently stored through {@link #set(long)}; 0 until first set. */
    private long value;

    /**
     * Reads the current payload.
     *
     * @return the stored value
     */
    public long getValue()
    {
        return this.value;
    }

    /**
     * Overwrites the payload.
     *
     * @param value the new value to store
     */
    public void set(long value)
    {
        this.value = value;
    }
}
/**
 * Demo consumer: pauses for five seconds to act as a slow handler, then prints the event's value.
 */
public class LongEventHandler implements EventHandler<LongEvent>
{
    /**
     * Handles one event.
     *
     * @param event      the event whose value is printed
     * @param sequence   sequence of the event (unused here)
     * @param endOfBatch whether this is the last event of the batch (unused here)
     */
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
    {
        try
        {
            // Artificial delay that makes this consumer slow.
            Thread.sleep(5000);
        }
        catch (InterruptedException e)
        {
            // Do not swallow the interruption: restore the flag so the code running this handler
            // can still observe that the thread was interrupted. The event is printed regardless.
            Thread.currentThread().interrupt();
        }
        System.out.println("Event: " + event.getValue());
    }
}
/**
 * Producer that writes {@code long} values into a ring buffer of {@code LongEvent} entries.
 */
public class LongEventProducer
{
    /** Ring buffer that receives the published values. */
    private final RingBuffer<LongEvent> ringBuffer;

    /**
     * @param ringBuffer the ring buffer this producer publishes to
     */
    public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }

    /**
     * Copies the long stored at offset 0 of {@code bb} into the next ring buffer entry and publishes it.
     *
     * @param bb buffer whose first eight bytes hold the value to publish
     */
    public void onData(ByteBuffer bb)
    {
        // Grab the next sequence.
        final long slot = this.ringBuffer.next();
        try
        {
            // Get the entry in the Disruptor for the sequence, then fill it with data.
            final LongEvent entry = this.ringBuffer.get(slot);
            final long payload = bb.getLong(0);
            entry.set(payload);
        }
        finally
        {
            // Publish in every case, so that a claimed slot is never left unpublished.
            this.ringBuffer.publish(slot);
        }
    }
}
/**
 * Demo entry point: wires a Disruptor with two handlers and feeds it from two producers forever.
 */
public class LongEventMain
{
    /**
     * Builds and starts the Disruptor, then publishes an increasing counter once per second through
     * two producers. The loop never terminates.
     *
     * @param args unused
     * @throws Exception declared because the loop calls {@code Thread.sleep}
     */
    public static void main(String[] args) throws Exception
    {
        // The factory for the event.
        final LongEventFactory eventFactory = new LongEventFactory();

        // Size of the ring buffer; must be a power of 2.
        final int bufferSize = 2;

        // Construct the Disruptor.
        final Disruptor<LongEvent> disruptor =
            new Disruptor<>(eventFactory, bufferSize, DaemonThreadFactory.INSTANCE);

        // Connect the handlers: two separate registrations, one handler each.
        disruptor.handleEventsWith(new LongEventHandler());
        disruptor.handleEventsWith(new LongEventHandler());

        // Start the Disruptor; this starts all threads running.
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        final RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        final LongEventProducer firstProducer = new LongEventProducer(ringBuffer);
        final LongEventProducer secondProducer = new LongEventProducer(ringBuffer);

        // One long fits exactly; it is rewritten at offset 0 on every pass.
        final ByteBuffer scratch = ByteBuffer.allocate(8);

        long counter = 0;
        while (true)
        {
            scratch.putLong(0, counter);
            firstProducer.onData(scratch);
            secondProducer.onData(scratch);
            Thread.sleep(1000);
            counter++;
        }
    }
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/195345.html 原文链接:https://javaforall.cn