前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Disruptor之ConsumerRepository

Disruptor之ConsumerRepository

作者头像
克虏伯
发布2020-06-18 10:53:54
4710
发布2020-06-18 10:53:54
举报

Disruptor版本是3.4.2.

List-1

代码语言:javascript
复制
class ConsumerRepository<T> implements Iterable<ConsumerInfo>
{
    private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler =
        new IdentityHashMap<>();
    private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence =
        new IdentityHashMap<>();
    private final Collection<ConsumerInfo> consumerInfos = new ArrayList<>();
...

    如List-1所示,ConsumerRepository类名称以Repository最为后缀,Repository来自DDD,是仓储的意思,即与存储有关,而ConsumerRepository中存放的是消费者信息。使用到了JDK的IdentityHashMap,这个map在fastjson中使用到,这里对这个map就不再深入,其底层上使用的数据结构是与HashMap不同的。

    当调用Disruptor的handleEventsWith方法时,就会把EventHandler存到ConsumerRepository的eventProcessorInfoByEventHandler、consumerInfos;调用Disruptor的handleEventsWithWorkerPool时,就会把WorkHandler存到ConsumerRepository的eventProcessorInfoBySequence、consumerInfos。

List-2

代码语言:javascript
复制
EventHandlerGroup<T> createEventProcessors(
    final Sequence[] barrierSequences,
    final EventHandler<? super T>[] eventHandlers)
{
    checkNotStarted();

    final Sequence[] processorSequences = new Sequence[eventHandlers.length];
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

    for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
    {
        final EventHandler<? super T> eventHandler = eventHandlers[i];
     //将eventHandler封装到EventProcessor中
        final BatchEventProcessor<T> batchEventProcessor =
            new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);

        if (exceptionHandler != null)
        {
            batchEventProcessor.setExceptionHandler(exceptionHandler);
        }

        consumerRepository.add(batchEventProcessor, eventHandler, barrier);
        processorSequences[i] = batchEventProcessor.getSequence();
    }

    updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

    return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}

    如List-2所示,eventHandler被封装到BatchEventProcessor中,再放入到consumerRepository中,进而放入到ConsumerRepository的eventProcessorInfoByEventHandler、consumerInfos。

List-3

代码语言:javascript
复制
EventHandlerGroup<T> createWorkerPool(
    final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers)
{
    final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
    final WorkerPool<T> workerPool = new WorkerPool<>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);


    consumerRepository.add(workerPool, sequenceBarrier);

    final Sequence[] workerSequences = workerPool.getWorkerSequences();

    updateGatingSequencesForNextInChain(barrierSequences, workerSequences);

    return new EventHandlerGroup<>(this, consumerRepository, workerSequences);
}

    如List-3所示,多个WorkHandler被封装到一个WorkerPool中,之后该WorkerPool被放入到consumerRepository中,进而放入到ConsumerRepository的eventProcessorInfoBySequence、consumerInfos。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档