Disruptor<ObjectEvent<String>> disruptor = new Disruptor<>( () -> ObjectEvent<String>(), bufferSize, DaemonThreadFactory.INSTANCE); disruptor.setDefaultExceptionHandler(new MyExceptionHandler()); disruptor .handleEventsWith(new ProcessingEventHandler()) .then(new ClearingObjectHandler());
一般调用disruptor的hadleEventsWith方法添加event handler处理消费到的event。
/*
* 提示:该行代码过长,系统自动注释不进行高亮。一键复制会移除系统注释
* /** * 添加多个消费者,每一个EventHandler都是独立的消费者,相互之间没有影响 * <p>Set up event handlers to handle events from the ring buffer. These handlers will process events * as soon as they become available, in parallel.</p> * * <p>This method can be used as the start of a chain. For example if the handler <code>A</code> must * process events before handler <code>B</code>:</p> * <pre><code>dw.handleEventsWith(A).then(B);</code></pre> * * <p>This call is additive, but generally should only be called once when setting up the Disruptor instance</p> * * @param handlers the event handlers that will process events. * @return a {@link EventHandlerGroup} that can be used to chain dependencies. */ @SuppressWarnings("varargs") @SafeVarargs public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) { return createEventProcessors(new Sequence[0], handlers); } /** * 添加并行消费者,每一个EventProcessorFactory创建一个EventProcessor映射为一个消费者。 * 这些消费者之间是并行关系,相互之间互不影响 * <p>Set up custom event processors to handle events from the ring buffer. The Disruptor will * automatically start these processors when {@link #start()} is called.</p> * * <p>This method can be used as the start of a chain. For example if the handler <code>A</code> must * process events before handler <code>B</code>:</p> * <pre><code>dw.handleEventsWith(A).then(B);</code></pre> * * <p>Since this is the start of the chain, the processor factories will always be passed an empty <code>Sequence</code> * array, so the factory isn't necessary in this case. This method is provided for consistency with * {@link EventHandlerGroup#handleEventsWith(EventProcessorFactory...)} and {@link EventHandlerGroup#then(EventProcessorFactory...)} * which do have barrier sequences to provide.</p> * * <p>This call is additive, but generally should only be called once when setting up the Disruptor instance</p> * * @param eventProcessorFactories the event processor factories to use to create the event processors that will process events. * @return a {@link EventHandlerGroup} that can be used to chain dependencies. */ @SafeVarargs public final EventHandlerGroup<T> handleEventsWith(final EventProcessorFactory<T>... eventProcessorFactories) { final Sequence[] barrierSequences = new Sequence[0]; return createEventProcessors(barrierSequences, eventProcessorFactories); } /** * 添加并行消费者,每一个EventProcessor映射为一个消费者。 * 这些消费者之间是并行关系,相互之间不影响 * * <p>Set up custom event processors to handle events from the ring buffer. The Disruptor will * automatically start this processors when {@link #start()} is called.</p> * * <p>This method can be used as the start of a chain. For example if the processor <code>A</code> must * process events before handler <code>B</code>:</p> * <pre><code>dw.handleEventsWith(A).then(B);</code></pre> * * @param processors the event processors that will process events. * @return a {@link EventHandlerGroup} that can be used to chain dependencies. */ public EventHandlerGroup<T> handleEventsWith(final EventProcessor... processors) { for (final EventProcessor processor : processors) { consumerRepository.add(processor); } final Sequence[] sequences = new Sequence[processors.length]; for (int i = 0; i < processors.length; i++) { sequences[i] = processors[i].getSequence(); } ringBuffer.addGatingSequences(sequences); return new EventHandlerGroup<>(this, consumerRepository, Util.getSequencesFor(processors)); } /** * 添加一个多线程的消费者。该消费者会将event分发到各个WorkHandler。每一个event只会被其中一个WorkHandler处理。具体的要看WorkerPool的处理逻辑 * * Set up a {@link WorkerPool} to distribute an event to one of a pool of work handler threads. * Each event will only be processed by one of the work handlers. * The Disruptor will automatically start this processors when {@link #start()} is called. * * @param workHandlers the work handlers that will process events. * @return a {@link EventHandlerGroup} that can be used to chain dependencies. */ @SafeVarargs @SuppressWarnings("varargs") public final EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers) { return createWorkerPool(new Sequence[0], workHandlers); } /** * 添加event异常处理器 * <p>Specify an exception handler to be used for any future event handlers.</p> * * <p>Note that only event handlers set up after calling this method will use the exception handler.</p> * * @param exceptionHandler the exception handler to use for any future {@link EventProcessor}. * @deprecated This method only applies to future event handlers. Use setDefaultExceptionHandler instead which applies to existing and new event handlers. */ public void handleExceptionsWith(final ExceptionHandler<? super T> exceptionHandler) { this.exceptionHandler = exceptionHandler; }
*/
其中EventHandler与EventProcessor的关系是,EventProcessor中代理了EventHandler,这一点可以看下BatchEventProcessor的实现:
/** * 创建事件处理器 * * @param barrierSequences 屏障sequences, {@link com.lmax.disruptor.ProcessingSequenceBarrier#dependentSequence} * 消费者的消费进度需要慢于它的前驱消费者 * @param eventHandlers 事件处理方法 每一个EventHandler都会被包装为{@link BatchEventProcessor},是通过BatchEventProcessor来代理EventHandler的工作 * @return */ EventHandlerGroup<T> createEventProcessors( final Sequence[] barrierSequences, final EventHandler<? super T>[] eventHandlers) { // 组织消费者之间的关系只能在启动之前 checkNotStarted(); // 用一个数组保存添加进来的的消费者的序号 final Sequence[] processorSequences = new Sequence[eventHandlers.length]; // 本次添加进来的消费者使用的屏障 final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences); // 创建单线程消费者(BatchEventProcessor) for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) { final EventHandler<? super T> eventHandler = eventHandlers[i]; //用BatchEventProcessor包装eventHandler final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, eventHandler); if (exceptionHandler != null) { batchEventProcessor.setExceptionHandler(exceptionHandler); } // 添加到消费者仓库中 consumerRepository.add(batchEventProcessor, eventHandler, barrier); processorSequences[i] = batchEventProcessor.getSequence(); } // 更新gatingSequences(生产者只需要关注所有的末端消费者节点的序列) updateGatingSequencesForNextInChain(barrierSequences, processorSequences); return new EventHandlerGroup<>(this, consumerRepository, processorSequences); }
每个消费者的sequence都会添加进processorSequences列表中去。
/** * 在往消费者链后面添加新的节点时,需要更新gatingSequence * @param barrierSequences 往消费者节点链上添加新节点时,之前节点的屏障序列(消费者节点链的开头部分的序列)就可以移除了 * @param processorSequences 新增加的节点的序列,新增的在消费者链的末端,因此它们的序列就是新增的gatingSequences */ private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) { if (processorSequences.length > 0) { // 将新增加的消费者节点序列添加到gatingSequences中 ringBuffer.addGatingSequences(processorSequences); //从gatingSequences中移除之前的barrierSequences for (final Sequence barrierSequence : barrierSequences) { ringBuffer.removeGatingSequence(barrierSequence); } // 将之前的屏障序列标记为不再是消费者链的最后节点 consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences); } }
/** * 添加消费者序列到消费者链末端的消费者序列后面 * Add the specified gating sequences to this instance of the Disruptor. They will * safely and atomically added to the list of gating sequences. * * @param gatingSequences The sequences to add. */ public void addGatingSequences(Sequence... gatingSequences) { sequencer.addGatingSequences(gatingSequences); }
/** * 添加Sequence到gatingSequences 链末端 * @see Sequencer#addGatingSequences(Sequence...) */ @Override public final void addGatingSequences(Sequence... gatingSequences) { SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences); }
继续看com.lmax.disruptor.SequenceGroups#addSequences: 入参主要有:
/** * 原子方式更新 追踪的Sequences */ private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
它是通过sequencer的对象引用来原子更新sequencer对象中的gatingSequences属性。
/** * 原子方式添加Sequence,并将要添加的Sequence进度设置为生产者最新的值 * @param holder 域所属的对象 * @param updater 域原子更新器 * @param cursor 生产者光标(进度) * @param sequencesToAdd 要添加的Sequences * @param <T> */ static <T> void addSequences( final T holder, final AtomicReferenceFieldUpdater<T, Sequence[]> updater, final Cursored cursor, final Sequence... sequencesToAdd) { long cursorSequence; Sequence[] updatedSequences; Sequence[] currentSequences; do { currentSequences = updater.get(holder);//获取当前sequences updatedSequences = copyOf(currentSequences, currentSequences.length + sequencesToAdd.length); cursorSequence = cursor.getCursor(); int index = currentSequences.length; for (Sequence sequence : sequencesToAdd)//添加sequencesToAdd到updatedSequences列表中去 { sequence.set(cursorSequence); updatedSequences[index++] = sequence; } } while (!updater.compareAndSet(holder, currentSequences, updatedSequences));//原子更新currentSequences cursorSequence = cursor.getCursor(); for (Sequence sequence : sequencesToAdd) { sequence.set(cursorSequence); } }
注意:这里更新的其实是sequencer也就是本例中的MultiProducerSequencer的gatingSequences列表。
com.lmax.disruptor.SequenceGroups#removeSequence也和addSequence的操作类似
/** * 启动Disruptor,其实就是启动消费者为。 * 为每一个EventProcessor创建一个独立的线程。 * <p> * 这个方法必须在消费者(event processors/event handler)被添加到disruptor中之后调用 * * <p>Starts the event processors and returns the fully configured ring buffer.</p> * * <p>The ring buffer is set up to prevent overwriting any entry that is yet to * be processed by the slowest event processor.</p> * * <p>This method must only be called once after all event processors have been added.</p> * * @return the configured ring buffer. */ public RingBuffer<T> start() { checkOnlyStartedOnce();//检查一下保存只启动一次 for (final ConsumerInfo consumerInfo : consumerRepository) {//遍历消费者仓库,用executor为每个消费者提供一个启动线程 consumerInfo.start(executor); } return ringBuffer; }
@Override public void start(final Executor executor) { executor.execute(eventprocessor); }
可以看到executor.execute传入的是实现了Runnable接口的eventprocessor,也就是每个消费者线程,这里代表的是com.lmax.disruptor.BatchEventProcessor的run方法:
/** * 暂停以后交给下一个线程继续执行是线程安全的 * It is ok to have another thread rerun this method after a halt(). * @throws IllegalStateException if this object instance is already running in a thread */ @Override public void run() { // 原子变量,当能从IDLE切换到RUNNING状态时,前一个线程一定退出了run() // 具备happens-before原则,是线程安全的 if (running.compareAndSet(IDLE, RUNNING)) { sequenceBarrier.clearAlert(); notifyStart(); try { if (running.get() == RUNNING) { processEvents(); } } finally { notifyShutdown(); // 在退出的时候会恢复到IDLE状态,且是原子变量,具备happens-before原则 // 由volatile支持 running.set(IDLE); } } else { // This is a little bit of guess work. The running state could of changed to HALTED by // this point. However, Java does not have compareAndExchange which is the only way // to get it exactly correct. if (running.get() == RUNNING) { throw new IllegalStateException("Thread is already running"); } else { earlyExit(); } } }
上面的sequenceBarrier的来处是:
ringbuffer.newBarrier调用的是这个:
它的构造方法如下:
ProcessingSequenceBarrier( final Sequencer sequencer, final WaitStrategy waitStrategy, final Sequence cursorSequence, final Sequence[] dependentSequences) { this.sequencer = sequencer; this.waitStrategy = waitStrategy; this.cursorSequence = cursorSequence; // 如果没有和我绑定的事件处理器,那么只需要与生产者的进度进行协调 if (0 == dependentSequences.length) { dependentSequence = cursorSequence; } else { dependentSequence = new FixedSequenceGroup(dependentSequences); } }
我们的例子中传入的dependentSequences即为上层方法中的barrierSequences,是一个空的sequence数组,所以只需要与生产者的进度进行协调。
private void processEvents() { T event = null; //获取到下一个序列的编号,然后去barrier中申请 //-1是不需要消费的,第一个要消费的是0 long nextSequence = sequence.get() + 1L; while (true) { try { final long availableSequence = sequenceBarrier.waitFor(nextSequence); if (batchStartAware != null) { batchStartAware.onBatchStart(availableSequence - nextSequence + 1); } while (nextSequence <= availableSequence) { event = dataProvider.get(nextSequence); eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); nextSequence++; } sequence.set(availableSequence); } catch (final TimeoutException e) { notifyTimeout(sequence.get()); } catch (final AlertException ex) { if (running.get() != RUNNING) { break; } } catch (final Throwable ex) { exceptionHandler.handleEventException(ex, nextSequence, event); sequence.set(nextSequence); nextSequence++; } } }
@Override public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException { checkAlert(); long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this); // 目标sequence还未发布,超时了 if (availableSequence < sequence) { return availableSequence; } // 目标sequence已经发布了,这里获取真正的最大序号(和生产者模型有关) return sequencer.getHighestPublishedSequence(sequence, availableSequence); }
com.lmax.disruptor.BlockingWaitStrategy#waitFor:
@Override public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException { long availableSequence; // 步骤1.确保等待的序号的数据已经发布(协调与生产者之间的关系) // double check if (cursorSequence.get() < sequence) { synchronized (mutex) { // 循环中检测,避免虚假唤醒 while (cursorSequence.get() < sequence) { barrier.checkAlert(); mutex.wait(); } } } // 步骤2.确保该序号已经被我前面的消费者消费(协调与其他消费者的关系) //如果前面的消费者还没有消费,则等待 //如果前面的消费者已经消费,则返回已经消费到的序列 while ((availableSequence = dependentSequence.get()) < sequence) { // 可理解为返回之前检查中断 barrier.checkAlert(); ThreadHints.onSpinWait(); } return availableSequence; }
com.lmax.disruptor.MultiProducerSequencer#getHighestPublishedSequence:
/** * 查询 nextSequence-availableSequence 区间段之间连续发布的最大序号。多生产者模式下sequence可能是不连续的。 * 多生产者模式下{@link Sequencer#next(int)} next是预分配的,因此可能部分数据还未被填充。 * * @param lowerBound 期望消费的最小序号,前面的一定都已经填充并被当前消费者消费 * @param availableSequence The sequence to scan to.从已发布的最大序号 * 多生产者模式下,已发布的数据可能是不连续的,因此不能直接该序号进行消费。 * 必须顺序的消费,不能跳跃 * @return */ @Override public long getHighestPublishedSequence(long lowerBound, long availableSequence) { for (long sequence = lowerBound; sequence <= availableSequence; sequence++) { // 这里中断了,不是连续发布的,需要剪断 if (!isAvailable(sequence)) { return sequence - 1; } } return availableSequence; }
当前消费者执行部分:
while (nextSequence <= availableSequence) { event = dataProvider.get(nextSequence); eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); nextSequence++; } sequence.set(availableSequence);
当nextSequence小于前面消费者消费到的最新序列的时候则执行消费操作,回调用户传入的eventHandler的onEvent方法,并更新当前eventProcessor对应的sequence的序列号。
public static void main(String[] args) throws Exception { // The factory for the event LongEventFactory factory = new LongEventFactory(); // Specify the size of the ring buffer, must be power of 2. int bufferSize = 1024; // Construct the Disruptor Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE); // Connect the handler disruptor.handleEventsWith(new LongEventHandler(),new LongEventHandler()); // Start the Disruptor, starts all threads running disruptor.start(); // Get the ring buffer from the Disruptor to be used for publishing. RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); LongEventProducer producer = new LongEventProducer(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8); for (long l = 0; l <= 5; l++) { bb.putLong(0, l); producer.onData(bb); Thread.sleep(1000); }}public class LongEventHandler implements EventHandler<LongEvent> { public void onEvent(LongEvent event, long sequence, boolean endOfBatch) { System.out.println(Thread.currentThread().getName() + " Event: " + event); }}
输出结果为: