前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >disruptor源码分析四之consumer流程

disruptor源码分析四之consumer流程

作者头像
山行AI
发布2019-07-12 15:16:43
1.2K0
发布2019-07-12 15:16:43
举报
文章被收录于专栏:山行AI

1. 常用的在disruptor中添加消费者的方法:

代码语言:javascript
复制
 Disruptor<ObjectEvent<String>> disruptor = new Disruptor<>(                () -> ObjectEvent<String>(), bufferSize, DaemonThreadFactory.INSTANCE);        disruptor.setDefaultExceptionHandler(new MyExceptionHandler());        disruptor                .handleEventsWith(new ProcessingEventHandler())                .then(new ClearingObjectHandler());

一般调用disruptor的hadleEventsWith方法添加event handler处理消费到的event。

2. disruptor中添加event handler的方法

代码语言:javascript
复制
/*
* 提示:该行代码过长,系统自动注释不进行高亮。一键复制会移除系统注释 
* /**     * 添加多个消费者,每一个EventHandler都是独立的消费者,相互之间没有影响     * <p>Set up event handlers to handle events from the ring buffer. These handlers will process events     * as soon as they become available, in parallel.</p>     *     * <p>This method can be used as the start of a chain. For example if the handler <code>A</code> must     * process events before handler <code>B</code>:</p>     * <pre><code>dw.handleEventsWith(A).then(B);</code></pre>     *     * <p>This call is additive, but generally should only be called once when setting up the Disruptor instance</p>     *     * @param handlers the event handlers that will process events.     * @return a {@link EventHandlerGroup} that can be used to chain dependencies.     */    @SuppressWarnings("varargs")    @SafeVarargs    public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)    {        return createEventProcessors(new Sequence[0], handlers);    }    /**     * 添加并行消费者,每一个EventProcessorFactory创建一个EventProcessor映射为一个消费者。     * 这些消费者之间是并行关系,相互之间互不影响     * <p>Set up custom event processors to handle events from the ring buffer. The Disruptor will     * automatically start these processors when {@link #start()} is called.</p>     *     * <p>This method can be used as the start of a chain. For example if the handler <code>A</code> must     * process events before handler <code>B</code>:</p>     * <pre><code>dw.handleEventsWith(A).then(B);</code></pre>     *     * <p>Since this is the start of the chain, the processor factories will always be passed an empty <code>Sequence</code>     * array, so the factory isn't necessary in this case. This method is provided for consistency with     * {@link EventHandlerGroup#handleEventsWith(EventProcessorFactory...)} and {@link EventHandlerGroup#then(EventProcessorFactory...)}     * which do have barrier sequences to provide.</p>     *     * <p>This call is additive, but generally should only be called once when setting up the Disruptor instance</p>     *     * @param eventProcessorFactories the event processor factories to use to create the event processors that will process events.     * @return a {@link EventHandlerGroup} that can be used to chain dependencies.     */    @SafeVarargs    public final EventHandlerGroup<T> handleEventsWith(final EventProcessorFactory<T>... eventProcessorFactories)    {        final Sequence[] barrierSequences = new Sequence[0];        return createEventProcessors(barrierSequences, eventProcessorFactories);    }    /**     * 添加并行消费者,每一个EventProcessor映射为一个消费者。     * 这些消费者之间是并行关系,相互之间不影响     *     * <p>Set up custom event processors to handle events from the ring buffer. The Disruptor will     * automatically start this processors when {@link #start()} is called.</p>     *     * <p>This method can be used as the start of a chain. For example if the processor <code>A</code> must     * process events before handler <code>B</code>:</p>     * <pre><code>dw.handleEventsWith(A).then(B);</code></pre>     *     * @param processors the event processors that will process events.     * @return a {@link EventHandlerGroup} that can be used to chain dependencies.     */    public EventHandlerGroup<T> handleEventsWith(final EventProcessor... processors)    {        for (final EventProcessor processor : processors)        {            consumerRepository.add(processor);        }        final Sequence[] sequences = new Sequence[processors.length];        for (int i = 0; i < processors.length; i++)        {            sequences[i] = processors[i].getSequence();        }        ringBuffer.addGatingSequences(sequences);        return new EventHandlerGroup<>(this, consumerRepository, Util.getSequencesFor(processors));    }    /**     * 添加一个多线程的消费者。该消费者会将event分发到各个WorkHandler。每一个event只会被其中一个WorkHandler处理。具体的要看WorkerPool的处理逻辑     *     * Set up a {@link WorkerPool} to distribute an event to one of a pool of work handler threads.     * Each event will only be processed by one of the work handlers.     * The Disruptor will automatically start this processors when {@link #start()} is called.     *     * @param workHandlers the work handlers that will process events.     * @return a {@link EventHandlerGroup} that can be used to chain dependencies.     */    @SafeVarargs    @SuppressWarnings("varargs")    public final EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)    {        return createWorkerPool(new Sequence[0], workHandlers);    }    /**     * 添加event异常处理器     * <p>Specify an exception handler to be used for any future event handlers.</p>     *     * <p>Note that only event handlers set up after calling this method will use the exception handler.</p>     *     * @param exceptionHandler the exception handler to use for any future {@link EventProcessor}.     * @deprecated This method only applies to future event handlers. Use setDefaultExceptionHandler instead which applies to existing and new event handlers.     */    public void handleExceptionsWith(final ExceptionHandler<? super T> exceptionHandler)    {        this.exceptionHandler = exceptionHandler;    }
*/

其中EventHandler与EventProcessor的关系是,EventProcessor中代理了EventHandler,这一点可以看下BatchEventProcessor的实现:

2.1 EventHandler(EventProcessor也与此类同,com.lmax.disruptor.dsl.Disruptor#createWorkerPool部分会单独用一篇文章讲解)

  • 从createEventProcessors(new Sequence0, handlers)方法为入口去看:
代码语言:javascript
复制
/**     * 创建事件处理器     *     * @param barrierSequences 屏障sequences, {@link com.lmax.disruptor.ProcessingSequenceBarrier#dependentSequence}     *                         消费者的消费进度需要慢于它的前驱消费者     * @param eventHandlers    事件处理方法 每一个EventHandler都会被包装为{@link BatchEventProcessor},是通过BatchEventProcessor来代理EventHandler的工作     * @return     */    EventHandlerGroup<T> createEventProcessors(            final Sequence[] barrierSequences,            final EventHandler<? super T>[] eventHandlers) {        // 组织消费者之间的关系只能在启动之前        checkNotStarted();        // 用一个数组保存添加进来的的消费者的序号        final Sequence[] processorSequences = new Sequence[eventHandlers.length];        // 本次添加进来的消费者使用的屏障        final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);        // 创建单线程消费者(BatchEventProcessor)        for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {            final EventHandler<? super T> eventHandler = eventHandlers[i];            //用BatchEventProcessor包装eventHandler            final BatchEventProcessor<T> batchEventProcessor =                    new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);            if (exceptionHandler != null) {                batchEventProcessor.setExceptionHandler(exceptionHandler);            }            // 添加到消费者仓库中            consumerRepository.add(batchEventProcessor, eventHandler, barrier);            processorSequences[i] = batchEventProcessor.getSequence();        }        // 更新gatingSequences(生产者只需要关注所有的末端消费者节点的序列)        updateGatingSequencesForNextInChain(barrierSequences, processorSequences);        return new EventHandlerGroup<>(this, consumerRepository, processorSequences);    }

每个消费者的sequence都会添加进processorSequences列表中去。

  • 更新updateGatingSequencesForNextInChain(barrierSequences, processorSequences)方法:
代码语言:javascript
复制
/**     * 在往消费者链后面添加新的节点时,需要更新gatingSequence     * @param barrierSequences   往消费者节点链上添加新节点时,之前节点的屏障序列(消费者节点链的开头部分的序列)就可以移除了     * @param processorSequences 新增加的节点的序列,新增的在消费者链的末端,因此它们的序列就是新增的gatingSequences     */    private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) {        if (processorSequences.length > 0) {            // 将新增加的消费者节点序列添加到gatingSequences中            ringBuffer.addGatingSequences(processorSequences);            //从gatingSequences中移除之前的barrierSequences            for (final Sequence barrierSequence : barrierSequences) {                ringBuffer.removeGatingSequence(barrierSequence);            }            // 将之前的屏障序列标记为不再是消费者链的最后节点            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);        }    }
  • 接下来我们看一看ringBuffer.addGatingSequences方法:
代码语言:javascript
复制
  /**     * 添加消费者序列到消费者链末端的消费者序列后面     * Add the specified gating sequences to this instance of the Disruptor.  They will     * safely and atomically added to the list of gating sequences.     *     * @param gatingSequences The sequences to add.     */    public void addGatingSequences(Sequence... gatingSequences)    {        sequencer.addGatingSequences(gatingSequences);    }
  • 然后调用的是sequencer.addGatingSequences方法,这个sequencer我们继续看看ringbuffer初始化时默认使用的MultiProducerSequencer,实际上调用的是com.lmax.disruptor.AbstractSequencer#addGatingSequences:
代码语言:javascript
复制
  /**     * 添加Sequence到gatingSequences 链末端     * @see Sequencer#addGatingSequences(Sequence...)     */    @Override    public final void addGatingSequences(Sequence... gatingSequences)    {        SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);    }

继续看com.lmax.disruptor.SequenceGroups#addSequences: 入参主要有:

  • holder:在上面方法中传入的是MultiProducerSequencer,看过上一篇推文的童鞋会知道这个MultiProducerSequencer实例也是生产者申请ringbuffer空间时使用的Sequencer;
  • updater: AtomicReferenceFieldUpdater是jdk的一个用于原子更新域的类,在MultiProducerSequencer中定义的原子更新域的属性定义为:
代码语言:javascript
复制
/**     * 原子方式更新 追踪的Sequences     */    private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =        AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");

它是通过sequencer的对象引用来原子更新sequencer对象中的gatingSequences属性。

  • cursor: 在这里传入的是MultiProducerSequencer,它是实现了Cursored接口的,代表的是生产者移动的光标,也就是生产者的当前进度。
  • Sequence... sequencesToAdd 需要添加进来的消费者Sequence列表
  • 源码部分如下:
代码语言:javascript
复制
    /**     * 原子方式添加Sequence,并将要添加的Sequence进度设置为生产者最新的值     * @param holder 域所属的对象     * @param updater 域原子更新器     * @param cursor 生产者光标(进度)     * @param sequencesToAdd 要添加的Sequences     * @param <T>     */    static <T> void addSequences(        final T holder,        final AtomicReferenceFieldUpdater<T, Sequence[]> updater,        final Cursored cursor,        final Sequence... sequencesToAdd)    {        long cursorSequence;        Sequence[] updatedSequences;        Sequence[] currentSequences;        do        {            currentSequences = updater.get(holder);//获取当前sequences            updatedSequences = copyOf(currentSequences, currentSequences.length + sequencesToAdd.length);            cursorSequence = cursor.getCursor();            int index = currentSequences.length;            for (Sequence sequence : sequencesToAdd)//添加sequencesToAdd到updatedSequences列表中去            {                sequence.set(cursorSequence);                updatedSequences[index++] = sequence;            }        }        while (!updater.compareAndSet(holder, currentSequences, updatedSequences));//原子更新currentSequences        cursorSequence = cursor.getCursor();        for (Sequence sequence : sequencesToAdd)        {            sequence.set(cursorSequence);        }    }

注意:这里更新的其实是sequencer也就是本例中的MultiProducerSequencer的gatingSequences列表。

com.lmax.disruptor.SequenceGroups#removeSequence也和addSequence的操作类似

2.2 disruptor的启动过程

  • com.lmax.disruptor.dsl.Disruptor#start方法:
代码语言:javascript
复制
/**     * 启动Disruptor,其实就是启动消费者为。     * 为每一个EventProcessor创建一个独立的线程。     * <p>     * 这个方法必须在消费者(event processors/event handler)被添加到disruptor中之后调用     *     * <p>Starts the event processors and returns the fully configured ring buffer.</p>     *     * <p>The ring buffer is set up to prevent overwriting any entry that is yet to     * be processed by the slowest event processor.</p>     *     * <p>This method must only be called once after all event processors have been added.</p>     *     * @return the configured ring buffer.     */    public RingBuffer<T> start() {        checkOnlyStartedOnce();//检查一下保存只启动一次        for (final ConsumerInfo consumerInfo : consumerRepository) {//遍历消费者仓库,用executor为每个消费者提供一个启动线程            consumerInfo.start(executor);        }        return ringBuffer;    }
  • consumerRepository是上面添加消费者时存放消费者信息的仓库;
  • executor为disruptor初始化时用户提供的执行器,默认使用BasicExecutor,关于为什么默认使用BasicExecutor的部分,在之前的推文中也有介绍过,请不明白的童鞋自行翻阅。
  • consumerInfo.start:
代码语言:javascript
复制
  @Override    public void start(final Executor executor)    {        executor.execute(eventprocessor);    }

可以看到executor.execute传入的是实现了Runnable接口的eventprocessor,也就是每个消费者线程,这里代表的是com.lmax.disruptor.BatchEventProcessor的run方法:

代码语言:javascript
复制
/**     * 暂停以后交给下一个线程继续执行是线程安全的     * It is ok to have another thread rerun this method after a halt().     * @throws IllegalStateException if this object instance is already running in a thread     */    @Override    public void run()    {        // 原子变量,当能从IDLE切换到RUNNING状态时,前一个线程一定退出了run()        // 具备happens-before原则,是线程安全的        if (running.compareAndSet(IDLE, RUNNING))        {            sequenceBarrier.clearAlert();            notifyStart();            try            {                if (running.get() == RUNNING)                {                    processEvents();                }            }            finally            {                notifyShutdown();                // 在退出的时候会恢复到IDLE状态,且是原子变量,具备happens-before原则                // 由volatile支持                running.set(IDLE);            }        }        else        {            // This is a little bit of guess work.  The running state could of changed to HALTED by            // this point.  However, Java does not have compareAndExchange which is the only way            // to get it exactly correct.            if (running.get() == RUNNING)            {                throw new IllegalStateException("Thread is already running");            }            else            {                earlyExit();            }        }    }

上面的sequenceBarrier的来处是:

ringbuffer.newBarrier调用的是这个:

它的构造方法如下:

代码语言:javascript
复制
 ProcessingSequenceBarrier(        final Sequencer sequencer,        final WaitStrategy waitStrategy,        final Sequence cursorSequence,        final Sequence[] dependentSequences)    {        this.sequencer = sequencer;        this.waitStrategy = waitStrategy;        this.cursorSequence = cursorSequence;        // 如果没有和我绑定的事件处理器,那么只需要与生产者的进度进行协调        if (0 == dependentSequences.length)        {            dependentSequence = cursorSequence;        }        else        {            dependentSequence = new FixedSequenceGroup(dependentSequences);        }    }

我们的例子中传入的dependentSequences即为上层方法中的barrierSequences,是一个空的sequence数组,所以只需要与生产者的进度进行协调。

2.3 消费者也即eventProcessor的核心处理逻辑:

代码语言:javascript
复制
private void processEvents()    {        T event = null;        //获取到下一个序列的编号,然后去barrier中申请        //-1是不需要消费的,第一个要消费的是0        long nextSequence = sequence.get() + 1L;        while (true)        {            try            {                final long availableSequence = sequenceBarrier.waitFor(nextSequence);                if (batchStartAware != null)                {                    batchStartAware.onBatchStart(availableSequence - nextSequence + 1);                }                while (nextSequence <= availableSequence)                {                    event = dataProvider.get(nextSequence);                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);                    nextSequence++;                }                sequence.set(availableSequence);            }            catch (final TimeoutException e)            {                notifyTimeout(sequence.get());            }            catch (final AlertException ex)            {                if (running.get() != RUNNING)                {                    break;                }            }            catch (final Throwable ex)            {                exceptionHandler.handleEventException(ex, nextSequence, event);                sequence.set(nextSequence);                nextSequence++;            }        }    }
  • sequenceBarrier.waitFor():
代码语言:javascript
复制
@Override    public long waitFor(final long sequence)        throws AlertException, InterruptedException, TimeoutException    {        checkAlert();        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);        // 目标sequence还未发布,超时了        if (availableSequence < sequence)        {            return availableSequence;        }        // 目标sequence已经发布了,这里获取真正的最大序号(和生产者模型有关)        return sequencer.getHighestPublishedSequence(sequence, availableSequence);    }

com.lmax.disruptor.BlockingWaitStrategy#waitFor:

代码语言:javascript
复制
@Override    public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)        throws AlertException, InterruptedException    {        long availableSequence;        // 步骤1.确保等待的序号的数据已经发布(协调与生产者之间的关系)        // double check        if (cursorSequence.get() < sequence)        {            synchronized (mutex)            {                // 循环中检测,避免虚假唤醒                while (cursorSequence.get() < sequence)                {                    barrier.checkAlert();                    mutex.wait();                }            }        }        // 步骤2.确保该序号已经被我前面的消费者消费(协调与其他消费者的关系)        //如果前面的消费者还没有消费,则等待        //如果前面的消费者已经消费,则返回已经消费到的序列        while ((availableSequence = dependentSequence.get()) < sequence)        {            // 可理解为返回之前检查中断            barrier.checkAlert();            ThreadHints.onSpinWait();        }        return availableSequence;    }

com.lmax.disruptor.MultiProducerSequencer#getHighestPublishedSequence:

代码语言:javascript
复制
/**     * 查询 nextSequence-availableSequence 区间段之间连续发布的最大序号。多生产者模式下sequence可能是不连续的。     * 多生产者模式下{@link Sequencer#next(int)} next是预分配的,因此可能部分数据还未被填充。     *     * @param lowerBound        期望消费的最小序号,前面的一定都已经填充并被当前消费者消费     * @param availableSequence The sequence to scan to.从已发布的最大序号     *                          多生产者模式下,已发布的数据可能是不连续的,因此不能直接该序号进行消费。     *                          必须顺序的消费,不能跳跃     * @return     */    @Override    public long getHighestPublishedSequence(long lowerBound, long availableSequence) {        for (long sequence = lowerBound; sequence <= availableSequence; sequence++) {            // 这里中断了,不是连续发布的,需要剪断            if (!isAvailable(sequence)) {                return sequence - 1;            }        }        return availableSequence;    }

当前消费者执行部分:

代码语言:javascript
复制
  while (nextSequence <= availableSequence)    {        event = dataProvider.get(nextSequence);        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);        nextSequence++;    }    sequence.set(availableSequence);

当nextSequence小于前面消费者消费到的最新序列的时候则执行消费操作,回调用户传入的eventHandler的onEvent方法,并更新当前eventProcessor对应的sequence的序列号。

2.4 示例

代码语言:javascript
复制
public static void main(String[] args) throws Exception {        // The factory for the event        LongEventFactory factory = new LongEventFactory();        // Specify the size of the ring buffer, must be power of 2.        int bufferSize = 1024;        // Construct the Disruptor        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);        // Connect the handler        disruptor.handleEventsWith(new LongEventHandler(),new LongEventHandler());        // Start the Disruptor, starts all threads running        disruptor.start();        // Get the ring buffer from the Disruptor to be used for publishing.        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();        LongEventProducer producer = new LongEventProducer(ringBuffer);        ByteBuffer bb = ByteBuffer.allocate(8);        for (long l = 0; l <= 5; l++) {            bb.putLong(0, l);            producer.onData(bb);            Thread.sleep(1000);        }}public class LongEventHandler implements EventHandler<LongEvent> {    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {        System.out.println(Thread.currentThread().getName() + "  Event: " + event);    }}

输出结果为:

2.5 总结

  • 每个eventProcessor是一个消费者,示例执行的结果也符合disruptor推文第一篇中介绍的disruptor事件多播的特性。eventHandler最终会被包装成eventProcessor来处理。
  • 到这里整个消费者的处理流程都已经处理完成,接下来将会用一篇文章介绍下WorkerPool模式。
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-07-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 常用的在disruptor中添加消费者的方法:
  • 2. disruptor中添加event handler的方法
  • 2.1 EventHandler(EventProcessor也与此类同,com.lmax.disruptor.dsl.Disruptor#createWorkerPool部分会单独用一篇文章讲解)
  • 2.2 disruptor的启动过程
  • 2.3 消费者也即eventProcessor的核心处理逻辑:
  • 2.4 示例
  • 2.5 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档