前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >高并发数据结构Disruptor解析(6)

高并发数据结构Disruptor解析(6)

作者头像
干货满满张哈希
发布2021-04-12 15:11:57
7940
发布2021-04-12 15:11:57
举报
文章被收录于专栏:干货满满张哈希

SequenceBarrier

SequenceBarrier是消费者与Ringbuffer之间建立消费关系的桥梁,同时也是消费者与消费者之间消费依赖的抽象。

SequenceBarrier只有一个实现类,就是ProcessingSequenceBarrier。ProcessingSequenceBarrier由生产者Sequencer,消费定位cursorSequence,等待策略waitStrategy还有一组依赖sequence:dependentSequence组成:

代码语言:javascript
复制
public ProcessingSequenceBarrier(
        final Sequencer sequencer,
        final WaitStrategy waitStrategy,
        final Sequence cursorSequence,
        final Sequence[] dependentSequences)
{
   this.sequencer = sequencer;
   this.waitStrategy = waitStrategy;
   this.cursorSequence = cursorSequence;
   if (0 == dependentSequences.length)
   {
       dependentSequence = cursorSequence;
   }
   else
   {
       dependentSequence = new FixedSequenceGroup(dependentSequences);
   }
}

首先,为了实现消费依赖,SequenceBarrier肯定有一个获取可以消费的sequence方法,就是

代码语言:javascript
复制
long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException;

实现为:

代码语言:javascript
复制
@Override
    public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException
    {
        //检查是否alerted
        checkAlert();
        //通过等待策略获取下一个可消费的sequence,这个sequence通过之前的讲解可以知道,需要大于cursorSequence和dependentSequence,我们可以通过dependentSequence实现先后消费
        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
        //等待可能被中断,所以检查下availableSequence是否小于sequence
        if (availableSequence < sequence)
        {
            return availableSequence;
        }
        //如果不小于,返回所有sequence(可能多生产者)和availableSequence中最大的
        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }

其他方法实现很简单,功能上分别有: 1. 获取当前cursorSequence(并没有什么用,就是为了监控) 2. 负责中断和恢复的alert标记

代码语言:javascript
复制
    @Override
    public long getCursor()
    {
        return dependentSequence.get();
    }

    @Override
    public boolean isAlerted()
    {
        return alerted;
    }

    @Override
    public void alert()
    {
        alerted = true;
        waitStrategy.signalAllWhenBlocking();
    }

    @Override
    public void clearAlert()
    {
        alerted = false;
    }

    @Override
    public void checkAlert() throws AlertException
    {
        if (alerted)
        {
            throw AlertException.INSTANCE;
        }
    }

构造SequenceBarrier在框架中只有一个入口,就是AbstractSequencer的:

代码语言:javascript
复制
public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
    {
        return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
    }

SequenceProcessor

通过SequenceBarrier,我们可以实现消费之间的依赖关系,但是,消费方式(比如广播,群组消费等等),需要通过SequenceProcessor的实现类实现:

通过类依赖关系我们发现,EventProcessor都是拓展了Runnable接口,也就是我们可以把它们当做线程处理。

1. BatchEventProcessor:

它的构造方法:

代码语言:javascript
复制
/**
     * 构造一个消费者之间非互斥消费的消费者
     *
     * @param dataProvider    对应的RingBuffer
     * @param sequenceBarrier 依赖关系,通过构造不同的sequenceBarrier用互相的dependentsequence,我们可以构造出先后消费关系
     * @param eventHandler    用户实现的处理消费的event的业务消费者.
     */
    public BatchEventProcessor(
        final DataProvider dataProvider,
        final SequenceBarrier sequenceBarrier,
        final EventHandlersuper T> eventHandler)
    {
        this.dataProvider = dataProvider;
        this.sequenceBarrier = sequenceBarrier;
        this.eventHandler = eventHandler;

        if (eventHandler instanceof SequenceReportingEventHandler)
        {
            ((SequenceReportingEventHandler) eventHandler).setSequenceCallback(sequence);
        }

        timeoutHandler = (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
    }

线程为一个死循环:

代码语言:javascript
复制
 @Override
    public void run()
    {
        //检查状态
        if (!running.compareAndSet(false, true))
        {
            throw new IllegalStateException("Thread is already running");
        }
        //清理
        sequenceBarrier.clearAlert();
        //如果用户实现的EventHandler继承了LifecycleAware,则执行其onStart方法
        notifyStart();

        T event = null;
        //sequence初始值为-1,设计上当前值是已经消费过的
        long nextSequence = sequence.get() + 1L;
        try
        {
            while (true)
            {
                try
                {
                    //获取当前可以消费的最大sequence
                    final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                    while (nextSequence <= availableSequence)
                    {
                        //获取并处理
                        event = dataProvider.get(nextSequence);
                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                        nextSequence++;
                    }
                    //设置当前sequence,注意,出现异常需要特殊处理,防止重复消费
                    sequence.set(availableSequence);
                }
                catch (final TimeoutException e)
                {
                    //wait超时异常
                    notifyTimeout(sequence.get());
                }
                catch (final AlertException ex)
                {
                    //中断异常
                    if (!running.get())
                    {
                        break;
                    }
                }
                catch (final Throwable ex)
                {
                    exceptionHandler.handleEventException(ex, nextSequence, event);
                    //如果出现异常则设置为nextSequence
                    sequence.set(nextSequence);
                    nextSequence++;
                }
            }
        }
        finally
        {
            //如果用户实现的EventHandler继承了LifecycleAware,则执行其onShutdown方法
            notifyShutdown();
            running.set(false);
        }
    }

可以看出: 1. BatchEventProcessor可以处理超时,可以处理中断,可以通过用户实现的异常处理类处理异常,同时,发生异常之后再次启动,不会漏消费,也不会重复消费。 2. 不同的BatchEventProcessor之间通过SequenceBarrier进行依赖消费。原理如下图所示:

假设我们有三个消费者BatchEventProcessor1,BatchEventProcessor2,BatchEventProcessor3. 1需要先于2和3消费,那么构建BatchEventProcessor和SequenceBarrier时,我们需要让BatchEventProcessor2和BatchEventProcessor3的SequenceBarrier的dependentSequence中加入SequenceBarrier1的sequence。 其实这里2和3共用一个SequenceBarrier就行。

2. WorkProcessor

另一种消费者是WorkProcessor。利用它,可以实现互斥消费,同样的利用SequenceBarrier可以实现消费顺序

代码语言:javascript
复制
public void run()
    {
        if (!running.compareAndSet(false, true))
        {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();

        notifyStart();

        boolean processedSequence = true;
        long cachedAvailableSequence = Long.MIN_VALUE;
        long nextSequence = sequence.get();
        T event = null;
        while (true)
        {
            try
            {
                if (processedSequence)
                {
                    processedSequence = false;
                    //获取下一个可以消费的Sequence
                    do
                    {
                        nextSequence = workSequence.get() + 1L;
                        sequence.set(nextSequence - 1L);
                    }
                    //多个WorkProcessor之间,如果共享一个workSequence,那么,可以实现互斥消费,因为只有一个线程可以CAS更新成功
                    while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                }

                if (cachedAvailableSequence >= nextSequence)
                {
                    event = ringBuffer.get(nextSequence);
                    workHandler.onEvent(event);
                    processedSequence = true;
                }
                else
                {
                    cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
                }
            }
            catch (final TimeoutException e)
            {
                notifyTimeout(sequence.get());
            }
            catch (final AlertException ex)
            {
                if (!running.get())
                {
                    break;
                }
            }
            catch (final Throwable ex)
            {
                // handle, mark as processed, unless the exception handler threw an exception
                exceptionHandler.handleEventException(ex, nextSequence, event);
                processedSequence = true;
            }
        }

        notifyShutdown();

        running.set(false);
    }

3. WorkerPool

多个WorkerProcessor可以组成一个WorkerPool:

代码语言:javascript
复制
public WorkerPool(
        final RingBuffer ringBuffer,
        final SequenceBarrier sequenceBarrier,
        final ExceptionHandlersuper T> exceptionHandler,
        final WorkHandlersuper T>... workHandlers)
    {
        this.ringBuffer = ringBuffer;
        final int numWorkers = workHandlers.length;
        workProcessors = new WorkProcessor[numWorkers];

        for (int i = 0; i < numWorkers; i++)
        {
            workProcessors[i] = new WorkProcessor(
                ringBuffer,
                sequenceBarrier,
                workHandlers[i],
                exceptionHandler,
                workSequence);
        }
    }

里面的 workHandlers[i]共享同一个workSequence,所以,同一个WorkerPool内,是互斥消费。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2016/08/08 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • SequenceBarrier
  • SequenceProcessor
    • 1. BatchEventProcessor:
      • 2. WorkProcessor
        • 3. WorkerPool
        相关产品与服务
        批量计算
        批量计算(BatchCompute,Batch)是为有大数据计算业务的企业、科研单位等提供高性价比且易用的计算服务。批量计算 Batch 可以根据用户提供的批处理规模,智能地管理作业和调动其所需的最佳资源。有了 Batch 的帮助,您可以将精力集中在如何分析和处理数据结果上。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档