专栏首页开发架构二三事disruptor源码分析四之consumer流程

disruptor源码分析四之consumer流程

1. 常用的在disruptor中添加消费者的方法:

 Disruptor<ObjectEvent<String>> disruptor = new Disruptor<>(                () -> ObjectEvent<String>(), bufferSize, DaemonThreadFactory.INSTANCE);        disruptor.setDefaultExceptionHandler(new MyExceptionHandler());        disruptor                .handleEventsWith(new ProcessingEventHandler())                .then(new ClearingObjectHandler());

一般调用disruptor的hadleEventsWith方法添加event handler处理消费到的event。

2. disruptor中添加event handler的方法

  /**     * 添加多个消费者,每一个EventHandler都是独立的消费者,相互之间没有影响     * <p>Set up event handlers to handle events from the ring buffer. These handlers will process events     * as soon as they become available, in parallel.</p>     *     * <p>This method can be used as the start of a chain. For example if the handler <code>A</code> must     * process events before handler <code>B</code>:</p>     * <pre><code>dw.handleEventsWith(A).then(B);</code></pre>     *     * <p>This call is additive, but generally should only be called once when setting up the Disruptor instance</p>     *     * @param handlers the event handlers that will process events.     * @return a {@link EventHandlerGroup} that can be used to chain dependencies.     */    @SuppressWarnings("varargs")    @SafeVarargs    public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)    {        return createEventProcessors(new Sequence[0], handlers);    }    /**     * 添加并行消费者,每一个EventProcessorFactory创建一个EventProcessor映射为一个消费者。     * 这些消费者之间是并行关系,相互之间互不影响     * <p>Set up custom event processors to handle events from the ring buffer. The Disruptor will     * automatically start these processors when {@link #start()} is called.</p>     *     * <p>This method can be used as the start of a chain. For example if the handler <code>A</code> must     * process events before handler <code>B</code>:</p>     * <pre><code>dw.handleEventsWith(A).then(B);</code></pre>     *     * <p>Since this is the start of the chain, the processor factories will always be passed an empty <code>Sequence</code>     * array, so the factory isn't necessary in this case. This method is provided for consistency with     * {@link EventHandlerGroup#handleEventsWith(EventProcessorFactory...)} and {@link EventHandlerGroup#then(EventProcessorFactory...)}     * which do have barrier sequences to provide.</p>     *     * <p>This call is additive, but generally should only be called once when setting up the Disruptor instance</p>     *     * @param eventProcessorFactories the event processor factories to use to create the event processors that will process events.     * @return a {@link EventHandlerGroup} that can be used to chain dependencies.     */    @SafeVarargs    public final EventHandlerGroup<T> handleEventsWith(final EventProcessorFactory<T>... eventProcessorFactories)    {        final Sequence[] barrierSequences = new Sequence[0];        return createEventProcessors(barrierSequences, eventProcessorFactories);    }    /**     * 添加并行消费者,每一个EventProcessor映射为一个消费者。     * 这些消费者之间是并行关系,相互之间不影响     *     * <p>Set up custom event processors to handle events from the ring buffer. The Disruptor will     * automatically start this processors when {@link #start()} is called.</p>     *     * <p>This method can be used as the start of a chain. For example if the processor <code>A</code> must     * process events before handler <code>B</code>:</p>     * <pre><code>dw.handleEventsWith(A).then(B);</code></pre>     *     * @param processors the event processors that will process events.     * @return a {@link EventHandlerGroup} that can be used to chain dependencies.     */    public EventHandlerGroup<T> handleEventsWith(final EventProcessor... processors)    {        for (final EventProcessor processor : processors)        {            consumerRepository.add(processor);        }        final Sequence[] sequences = new Sequence[processors.length];        for (int i = 0; i < processors.length; i++)        {            sequences[i] = processors[i].getSequence();        }        ringBuffer.addGatingSequences(sequences);        return new EventHandlerGroup<>(this, consumerRepository, Util.getSequencesFor(processors));    }    /**     * 添加一个多线程的消费者。该消费者会将event分发到各个WorkHandler。每一个event只会被其中一个WorkHandler处理。具体的要看WorkerPool的处理逻辑     *     * Set up a {@link WorkerPool} to distribute an event to one of a pool of work handler threads.     * Each event will only be processed by one of the work handlers.     * The Disruptor will automatically start this processors when {@link #start()} is called.     *     * @param workHandlers the work handlers that will process events.     * @return a {@link EventHandlerGroup} that can be used to chain dependencies.     */    @SafeVarargs    @SuppressWarnings("varargs")    public final EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)    {        return createWorkerPool(new Sequence[0], workHandlers);    }    /**     * 添加event异常处理器     * <p>Specify an exception handler to be used for any future event handlers.</p>     *     * <p>Note that only event handlers set up after calling this method will use the exception handler.</p>     *     * @param exceptionHandler the exception handler to use for any future {@link EventProcessor}.     * @deprecated This method only applies to future event handlers. Use setDefaultExceptionHandler instead which applies to existing and new event handlers.     */    public void handleExceptionsWith(final ExceptionHandler<? super T> exceptionHandler)    {        this.exceptionHandler = exceptionHandler;    }

其中EventHandler与EventProcessor的关系是,EventProcessor中代理了EventHandler,这一点可以看下BatchEventProcessor的实现:

2.1 EventHandler(EventProcessor也与此类同,com.lmax.disruptor.dsl.Disruptor#createWorkerPool部分会单独用一篇文章讲解)

  • 从createEventProcessors(new Sequence0, handlers)方法为入口去看:
/**     * 创建事件处理器     *     * @param barrierSequences 屏障sequences, {@link com.lmax.disruptor.ProcessingSequenceBarrier#dependentSequence}     *                         消费者的消费进度需要慢于它的前驱消费者     * @param eventHandlers    事件处理方法 每一个EventHandler都会被包装为{@link BatchEventProcessor},是通过BatchEventProcessor来代理EventHandler的工作     * @return     */    EventHandlerGroup<T> createEventProcessors(            final Sequence[] barrierSequences,            final EventHandler<? super T>[] eventHandlers) {        // 组织消费者之间的关系只能在启动之前        checkNotStarted();        // 用一个数组保存添加进来的的消费者的序号        final Sequence[] processorSequences = new Sequence[eventHandlers.length];        // 本次添加进来的消费者使用的屏障        final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);        // 创建单线程消费者(BatchEventProcessor)        for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {            final EventHandler<? super T> eventHandler = eventHandlers[i];            //用BatchEventProcessor包装eventHandler            final BatchEventProcessor<T> batchEventProcessor =                    new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);            if (exceptionHandler != null) {                batchEventProcessor.setExceptionHandler(exceptionHandler);            }            // 添加到消费者仓库中            consumerRepository.add(batchEventProcessor, eventHandler, barrier);            processorSequences[i] = batchEventProcessor.getSequence();        }        // 更新gatingSequences(生产者只需要关注所有的末端消费者节点的序列)        updateGatingSequencesForNextInChain(barrierSequences, processorSequences);        return new EventHandlerGroup<>(this, consumerRepository, processorSequences);    }

每个消费者的sequence都会添加进processorSequences列表中去。

  • 更新updateGatingSequencesForNextInChain(barrierSequences, processorSequences)方法:
/**     * 在往消费者链后面添加新的节点时,需要更新gatingSequence     * @param barrierSequences   往消费者节点链上添加新节点时,之前节点的屏障序列(消费者节点链的开头部分的序列)就可以移除了     * @param processorSequences 新增加的节点的序列,新增的在消费者链的末端,因此它们的序列就是新增的gatingSequences     */    private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) {        if (processorSequences.length > 0) {            // 将新增加的消费者节点序列添加到gatingSequences中            ringBuffer.addGatingSequences(processorSequences);            //从gatingSequences中移除之前的barrierSequences            for (final Sequence barrierSequence : barrierSequences) {                ringBuffer.removeGatingSequence(barrierSequence);            }            // 将之前的屏障序列标记为不再是消费者链的最后节点            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);        }    }
  • 接下来我们看一看ringBuffer.addGatingSequences方法:
  /**     * 添加消费者序列到消费者链末端的消费者序列后面     * Add the specified gating sequences to this instance of the Disruptor.  They will     * safely and atomically added to the list of gating sequences.     *     * @param gatingSequences The sequences to add.     */    public void addGatingSequences(Sequence... gatingSequences)    {        sequencer.addGatingSequences(gatingSequences);    }
  • 然后调用的是sequencer.addGatingSequences方法,这个sequencer我们继续看看ringbuffer初始化时默认使用的MultiProducerSequencer,实际上调用的是com.lmax.disruptor.AbstractSequencer#addGatingSequences:
  /**     * 添加Sequence到gatingSequences 链末端     * @see Sequencer#addGatingSequences(Sequence...)     */    @Override    public final void addGatingSequences(Sequence... gatingSequences)    {        SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);    }

继续看com.lmax.disruptor.SequenceGroups#addSequences: 入参主要有:

  • holder:在上面方法中传入的是MultiProducerSequencer,看过上一篇推文的童鞋会知道这个MultiProducerSequencer实例也是生产者申请ringbuffer空间时使用的Sequencer;
  • updater: AtomicReferenceFieldUpdater是jdk的一个用于原子更新域的类,在MultiProducerSequencer中定义的原子更新域的属性定义为:
/**     * 原子方式更新 追踪的Sequences     */    private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =        AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");

它是通过sequencer的对象引用来原子更新sequencer对象中的gatingSequences属性。

  • cursor: 在这里传入的是MultiProducerSequencer,它是实现了Cursored接口的,代表的是生产者移动的光标,也就是生产者的当前进度。
  • Sequence... sequencesToAdd 需要添加进来的消费者Sequence列表
  • 源码部分如下:
    /**     * 原子方式添加Sequence,并将要添加的Sequence进度设置为生产者最新的值     * @param holder 域所属的对象     * @param updater 域原子更新器     * @param cursor 生产者光标(进度)     * @param sequencesToAdd 要添加的Sequences     * @param <T>     */    static <T> void addSequences(        final T holder,        final AtomicReferenceFieldUpdater<T, Sequence[]> updater,        final Cursored cursor,        final Sequence... sequencesToAdd)    {        long cursorSequence;        Sequence[] updatedSequences;        Sequence[] currentSequences;        do        {            currentSequences = updater.get(holder);//获取当前sequences            updatedSequences = copyOf(currentSequences, currentSequences.length + sequencesToAdd.length);            cursorSequence = cursor.getCursor();            int index = currentSequences.length;            for (Sequence sequence : sequencesToAdd)//添加sequencesToAdd到updatedSequences列表中去            {                sequence.set(cursorSequence);                updatedSequences[index++] = sequence;            }        }        while (!updater.compareAndSet(holder, currentSequences, updatedSequences));//原子更新currentSequences        cursorSequence = cursor.getCursor();        for (Sequence sequence : sequencesToAdd)        {            sequence.set(cursorSequence);        }    }

注意:这里更新的其实是sequencer也就是本例中的MultiProducerSequencer的gatingSequences列表。

com.lmax.disruptor.SequenceGroups#removeSequence也和addSequence的操作类似

2.2 disruptor的启动过程

  • com.lmax.disruptor.dsl.Disruptor#start方法:
/**     * 启动Disruptor,其实就是启动消费者为。     * 为每一个EventProcessor创建一个独立的线程。     * <p>     * 这个方法必须在消费者(event processors/event handler)被添加到disruptor中之后调用     *     * <p>Starts the event processors and returns the fully configured ring buffer.</p>     *     * <p>The ring buffer is set up to prevent overwriting any entry that is yet to     * be processed by the slowest event processor.</p>     *     * <p>This method must only be called once after all event processors have been added.</p>     *     * @return the configured ring buffer.     */    public RingBuffer<T> start() {        checkOnlyStartedOnce();//检查一下保存只启动一次        for (final ConsumerInfo consumerInfo : consumerRepository) {//遍历消费者仓库,用executor为每个消费者提供一个启动线程            consumerInfo.start(executor);        }        return ringBuffer;    }
  • consumerRepository是上面添加消费者时存放消费者信息的仓库;
  • executor为disruptor初始化时用户提供的执行器,默认使用BasicExecutor,关于为什么默认使用BasicExecutor的部分,在之前的推文中也有介绍过,请不明白的童鞋自行翻阅。
  • consumerInfo.start:
  @Override    public void start(final Executor executor)    {        executor.execute(eventprocessor);    }

可以看到executor.execute传入的是实现了Runnable接口的eventprocessor,也就是每个消费者线程,这里代表的是com.lmax.disruptor.BatchEventProcessor的run方法:

/**     * 暂停以后交给下一个线程继续执行是线程安全的     * It is ok to have another thread rerun this method after a halt().     * @throws IllegalStateException if this object instance is already running in a thread     */    @Override    public void run()    {        // 原子变量,当能从IDLE切换到RUNNING状态时,前一个线程一定退出了run()        // 具备happens-before原则,是线程安全的        if (running.compareAndSet(IDLE, RUNNING))        {            sequenceBarrier.clearAlert();            notifyStart();            try            {                if (running.get() == RUNNING)                {                    processEvents();                }            }            finally            {                notifyShutdown();                // 在退出的时候会恢复到IDLE状态,且是原子变量,具备happens-before原则                // 由volatile支持                running.set(IDLE);            }        }        else        {            // This is a little bit of guess work.  The running state could of changed to HALTED by            // this point.  However, Java does not have compareAndExchange which is the only way            // to get it exactly correct.            if (running.get() == RUNNING)            {                throw new IllegalStateException("Thread is already running");            }            else            {                earlyExit();            }        }    }

上面的sequenceBarrier的来处是:

ringbuffer.newBarrier调用的是这个:

它的构造方法如下:

 ProcessingSequenceBarrier(        final Sequencer sequencer,        final WaitStrategy waitStrategy,        final Sequence cursorSequence,        final Sequence[] dependentSequences)    {        this.sequencer = sequencer;        this.waitStrategy = waitStrategy;        this.cursorSequence = cursorSequence;        // 如果没有和我绑定的事件处理器,那么只需要与生产者的进度进行协调        if (0 == dependentSequences.length)        {            dependentSequence = cursorSequence;        }        else        {            dependentSequence = new FixedSequenceGroup(dependentSequences);        }    }

我们的例子中传入的dependentSequences即为上层方法中的barrierSequences,是一个空的sequence数组,所以只需要与生产者的进度进行协调。

2.3 消费者也即eventProcessor的核心处理逻辑:

private void processEvents()    {        T event = null;        //获取到下一个序列的编号,然后去barrier中申请        //-1是不需要消费的,第一个要消费的是0        long nextSequence = sequence.get() + 1L;        while (true)        {            try            {                final long availableSequence = sequenceBarrier.waitFor(nextSequence);                if (batchStartAware != null)                {                    batchStartAware.onBatchStart(availableSequence - nextSequence + 1);                }                while (nextSequence <= availableSequence)                {                    event = dataProvider.get(nextSequence);                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);                    nextSequence++;                }                sequence.set(availableSequence);            }            catch (final TimeoutException e)            {                notifyTimeout(sequence.get());            }            catch (final AlertException ex)            {                if (running.get() != RUNNING)                {                    break;                }            }            catch (final Throwable ex)            {                exceptionHandler.handleEventException(ex, nextSequence, event);                sequence.set(nextSequence);                nextSequence++;            }        }    }
  • sequenceBarrier.waitFor():
@Override    public long waitFor(final long sequence)        throws AlertException, InterruptedException, TimeoutException    {        checkAlert();        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);        // 目标sequence还未发布,超时了        if (availableSequence < sequence)        {            return availableSequence;        }        // 目标sequence已经发布了,这里获取真正的最大序号(和生产者模型有关)        return sequencer.getHighestPublishedSequence(sequence, availableSequence);    }

com.lmax.disruptor.BlockingWaitStrategy#waitFor:

@Override    public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)        throws AlertException, InterruptedException    {        long availableSequence;        // 步骤1.确保等待的序号的数据已经发布(协调与生产者之间的关系)        // double check        if (cursorSequence.get() < sequence)        {            synchronized (mutex)            {                // 循环中检测,避免虚假唤醒                while (cursorSequence.get() < sequence)                {                    barrier.checkAlert();                    mutex.wait();                }            }        }        // 步骤2.确保该序号已经被我前面的消费者消费(协调与其他消费者的关系)        //如果前面的消费者还没有消费,则等待        //如果前面的消费者已经消费,则返回已经消费到的序列        while ((availableSequence = dependentSequence.get()) < sequence)        {            // 可理解为返回之前检查中断            barrier.checkAlert();            ThreadHints.onSpinWait();        }        return availableSequence;    }

com.lmax.disruptor.MultiProducerSequencer#getHighestPublishedSequence:

/**     * 查询 nextSequence-availableSequence 区间段之间连续发布的最大序号。多生产者模式下sequence可能是不连续的。     * 多生产者模式下{@link Sequencer#next(int)} next是预分配的,因此可能部分数据还未被填充。     *     * @param lowerBound        期望消费的最小序号,前面的一定都已经填充并被当前消费者消费     * @param availableSequence The sequence to scan to.从已发布的最大序号     *                          多生产者模式下,已发布的数据可能是不连续的,因此不能直接该序号进行消费。     *                          必须顺序的消费,不能跳跃     * @return     */    @Override    public long getHighestPublishedSequence(long lowerBound, long availableSequence) {        for (long sequence = lowerBound; sequence <= availableSequence; sequence++) {            // 这里中断了,不是连续发布的,需要剪断            if (!isAvailable(sequence)) {                return sequence - 1;            }        }        return availableSequence;    }

当前消费者执行部分:

  while (nextSequence <= availableSequence)    {        event = dataProvider.get(nextSequence);        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);        nextSequence++;    }    sequence.set(availableSequence);

当nextSequence小于前面消费者消费到的最新序列的时候则执行消费操作,回调用户传入的eventHandler的onEvent方法,并更新当前eventProcessor对应的sequence的序列号。

2.4 示例

public static void main(String[] args) throws Exception {        // The factory for the event        LongEventFactory factory = new LongEventFactory();        // Specify the size of the ring buffer, must be power of 2.        int bufferSize = 1024;        // Construct the Disruptor        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);        // Connect the handler        disruptor.handleEventsWith(new LongEventHandler(),new LongEventHandler());        // Start the Disruptor, starts all threads running        disruptor.start();        // Get the ring buffer from the Disruptor to be used for publishing.        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();        LongEventProducer producer = new LongEventProducer(ringBuffer);        ByteBuffer bb = ByteBuffer.allocate(8);        for (long l = 0; l <= 5; l++) {            bb.putLong(0, l);            producer.onData(bb);            Thread.sleep(1000);        }}public class LongEventHandler implements EventHandler<LongEvent> {    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {        System.out.println(Thread.currentThread().getName() + "  Event: " + event);    }}

输出结果为:

2.5 总结

  • 每个eventProcessor是一个消费者,示例执行的结果也符合disruptor推文第一篇中介绍的disruptor事件多播的特性。eventHandler最终会被包装成eventProcessor来处理。
  • 到这里整个消费者的处理流程都已经处理完成,接下来将会用一篇文章介绍下WorkerPool模式。

本文分享自微信公众号 - 开发架构二三事(gh_d6f166e26398),作者:两个小灰象

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-07-05

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • disruptor源码分析五之WorkerPool

    开发架构二三事
  • sharding-sphere源码之sql解析

    这里我们以org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPrepared...

    开发架构二三事
  • AQS源码分析之Elasticsearch BaseFuture

    Java多线程编程中,常用的多线程设计模式包括:Future模式、Master-Worker模式、Guarded Suspeionsion模式、不变模式和生产者...

    开发架构二三事
  • 【CodeForces 602A】C - 特别水的题3-Two Bases

    http://acm.hust.edu.cn/vjudge/contest/view.action?cid=102271#problem/C

    饶文津
  • 【2019年8月】OCP 071认证考试最新版本的考试原题-第23题

    Which two statements are true about transactions in the Oracle Database serve?

    用户5892232
  • 【Codeforces 738C】Road to Cinema

    http://codeforces.com/contest/738/problem/C

    饶文津
  • 【2019年8月】OCP 071认证考试最新版本的考试原题-第22题

    Which two statements are true about transactions in the Oracle Database serve?

    用户5892232
  • IBM刀片服务器管理模块恢复出厂默认值实战

    Resetting the management module back to factory defaults

    力哥聊运维与云计算
  • 10分钟上手,OpenCV自然场景文本检测(Python代码+实现)

    EAST文本检测器需要OpenCV3.4.2或更高版本,有需要的读者可以先安装OpenCV。

    新智元
  • 10分钟上手,OpenCV自然场景文本检测(Python代码+实现)

    EAST文本检测器需要OpenCV3.4.2或更高版本,有需要的读者可以先安装OpenCV。

    磐创AI

扫码关注云+社区

领取腾讯云代金券