摘要: 原创出处 http://www.iocoder.cn/SkyWalking/collector-queue-module/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文主要分享 SkyWalking Collector Queue Module,队列组件。该组件被 Collector Streaming Module 流式处理使用,提供异步执行的特性。
友情提示:建议先阅读 《SkyWalking 源码分析 —— Collector 初始化》 ,以了解 Collector 组件体系。
Cluster Module 在 SkyWalking 架构图处于如下位置( 红框 ) :
FROM https://github.com/apache/incubating-skywalking
下面我们来看看整体的项目结构,如下图所示 :
collector-queue-define
:定义队列组件接口。collector-queue-datacarrier-provider
:基于 apm-datacarrier 的队列组件实现。目前暂未完成。collector-queue-zookeeper-provider
:基于 Disruptor 的队列组件实现。下面,我们从接口到实现的顺序进行分享。
collector-queue-define
:定义队列组件接口。项目结构如下 :
org.skywalking.apm.collector.queue.QueueModule
,实现 Module 抽象类,队列 Module 。
#name()
实现方法,返回模块名为 "queue"
。
#services()
实现方法,返回 Service 类名:QueueCreatorService 。
org.skywalking.apm.collector.queue.service.QueueCreatorService
,继承 Service 接口,队列创建服务接口。
#create(queueSize, executor)
接口方法,创建队列处理器。
org.skywalking.apm.collector.queue.base.MessageHolder
,消息持有者。
org.skywalking.apm.collector.queue.base.QueueEventHandler
,队列处理器接口。它定义了 #tell(message)
接口方法,输入消息给自己。最终,QueueEventHandler 会"提交"消息给 org.skywalking.apm.collector.queue.base.QueueExecutor
,执行处理该消息。
LocalAsyncWorkerRef 实现 QueueEventHandler 接口,在 《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(一)》「3.1.2 LocalAsyncWorkerRef」 有详细解析。
org.skywalking.apm.collector.queue.base.DaemonThreadFactory
,守护进程线程工厂,被用于创建消息处理器的线程。
collector-queue-disruptor-provider
,基于 Disruptor 的队列组件实现。
项目结构如下 :
默认配置,在 application-default.yml
已经配置如下:
JSON queue: disruptor:
org.skywalking.apm.collector.queue.disruptor.CQueueModuleDisruptorProvider
,实现 ModuleProvider 抽象类,基于 Disruptor 的队列服务提供者。
#name()
实现方法,返回组件服务提供者名为 "disruptor"
。
module()
实现方法,返回组件类为 QueueModule 。
#requiredModules()
实现方法,返回依赖组件为空。
#prepare(Properties)
实现方法,执行准备阶段逻辑。
#registerServiceImplementation()
父类方法,注册到 services
。#start()
实现方法,方法为空。
#notifyAfterCompleted()
实现方法,方法为空。
org.skywalking.apm.collector.queue.disruptor.service.DisruptorQueueCreatorService
,实现 QueueCreatorService 接口,基于 Disruptor 的队列创建服务实现类。
#create(queueSize, executor)
实现方法,调用 DisruptorQueueCreator#register(queueSize, executor)
方法,创建队列处理器。
友情提示:如果胖友对 Disruptor 暂时不了解,建议先使用 Disruptor 写个小 Demo 。 如下是笔者阅读的文章:
org.skywalking.apm.collector.queue.disruptor.base.DisruptorQueueCreator
,实现 QueueCreator 接口,基于 Disruptor 的队列创建器实现类。
#create(queueSize, executor)
实现方法,代码如下:
"bufferSize must be a power of 2"
的异常,参见 AbstractSequencer 的代码。为什么 Disruptor 要求队列大小为 2 的指数呢?如下是相关资料,感兴趣的同学可以看看( 可跳过 ):
SingleProducerSequencer#hasAvailableCapacity(requiredCapacity)
方法,代码如下:
org.skywalking.apm.collector.queue.disruptor.base.DisruptorEventHandler
,基于 Disruptor 的队列处理器实现类。
org.skywalking.apm.collector.queue.base.QueueEventHandler
接口 的 `#tell(message)` 接口方法,标准的 Disruptor 发布事件的代码。collector-queue-datacarrier-provider
:基于 apm-datacarrier 的队列组件实现。
目前暂未完成。
已在知识星球更新源码解析如下: