前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >链路追踪 SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(一)

链路追踪 SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(一)

作者头像
芋道源码
发布2019-05-31 17:40:43
8720
发布2019-05-31 17:40:43
举报
文章被收录于专栏:芋道源码1024芋道源码1024

点击上方“芋道源码”,选择“设为星标

做积极的人,而不是积极废人!

源码精品专栏

摘要: 原创出处 http://www.iocoder.cn/SkyWalking/collector-streaming-first/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 SkyWalking 3.2.6 正式版

  • 1. 概述
  • 2. apm-collector-core/graph
    • 2.1 Graph 创建
    • 2.2 Graph 启动
  • 3. apm-collector-stream
    • 3.1 WayToNode 实现类
    • 3.2 NodeProcessor 实现类

1. 概述

本文主要分享 Collector Streaming 流式处理。主要包含如下部分:

  • apm-collector-core 模块的 graph 包,提供最精简单节点的流式处理的封装。如下图所示:
  • apm-collector-stream 模块,在 graph 包的基础上,提供异步跨节点等等的流式处理的封装。如下图所示:

免打脸大保健:笔者对流式处理非常不了解,本文可能是一本正经的胡说八道。考虑到笔者是靠脸吃饭(颜值我只服我红雷哥),所以读者老爷请爱护下笔者。

Collector Streaming 在 SkyWalking 架构图处于如下位置( 红框 ) :

FROM https://github.com/apache/incubating-skywalking

OK,下面来一本正经的代码走起!

2. apm-collector-core/graph

整体类图如下:

看起来略复杂,不要方,我们先来看一个流式大数据处理框架 Apache Storm 的说明:

FROM 《流式大数据处理的三种框架:Storm,Spark和Samza》 在 Storm 中,先要设计一个用于实时计算的图状结构,我们称之为拓扑(topology)。这个拓扑将会被提交给集群,由集群中的主控节点(master node)分发代码,将任务分配给工作节点(worker node)执行。

  • Graph :定义了一个数据各个 Node 的处理拓扑图。
  • WayToNode :提交数据给 Node 的方式
  • Node :节点,包含一个 NodeProcessor 和 一个 Next 。
    • NodeProcessor :Node 处理器,处理数据
    • Next :包含 WayToNode 数组,即 Node 提交数据给 Next 的 Node 数组方式

整体交互流程如下:

  • 粉色箭头:当数据进来时,提交给 Grpah 。按照定义的拓扑图,使用 NodeWay 提交给 Node ,NodeProcessor 进行处理。
  • 蓝色箭头:当 NodeProcessor 处理完成后,Next 逐个使用 NodeWay 数组提交给下面的 Node ,继续处理。
    • ps :注意,这块流程,根据不同的 NodeProcessor 的实现类会有不同,蓝色箭头的过程,只是其中的一种,下面会详细解析。

整体顺序图如下:

  • DirectWay 是 WayToNode 接口的一种实现,正如其名,直接提交数据给 Node 。在 「3. apm-collector-stream」 会看到其他实现,例如提交到其他服务器节点的 Node,从而实现跨服务器节点的流式处理。
  • AbstractWorker 在 apm-collector-stream 模块,是 NodeProcessor 接口的一种实现,处理提交给 Node 的数据。在 #onWork(message) 抽象方法里,子类可以实现该方法,根据自身需求,是否调用 #onNext(message) 方法,Next 逐个使用 NodeWay 数组提交给下面的 Node ,继续处理。

下面,我们来详细分别看看如下逻辑的详细代码实现:

  • Graph 创建
  • Graph 启动

2.1 Graph 创建

创建 Graph 的顺序图如下:

  • 第一步,调用 GraphManager#createIfAbsent(graphId, input) 方法( input 参数没用 ),创建一个 Graph 对象。
  • 第二步,调用 Graph#addNode(WayToNode) 方法,创建该 Graph 的首个 Node 对象。
  • 第三步,调用 Node#addNext(WayToNode) 方法,创建该 Node 的下一个 Node 对象。

如下是 collector-agent-stream-provider 模块,TraceStreamGraph#createServiceReferenceGraph() 方法的代码:

代码语言:javascript
复制
public void createServiceReferenceGraph() {
    QueueCreatorService<ServiceReference> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
    RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);

    Graph<ServiceReference> graph = GraphManager.INSTANCE.createIfAbsent(SERVICE_REFERENCE_GRAPH_ID, ServiceReference.class);
    graph.addNode(new ServiceReferenceAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener))
        .addNext(new ServiceReferenceRemoteWorker.Factory(moduleManager, remoteSenderService, SERVICE_REFERENCE_GRAPH_ID).create(workerCreateListener))
        .addNext(new ServiceReferencePersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
}

让我们来看看每个方法的具体代码实现。


第一步

GraphManager#createIfAbsent(graphId, input) 方法,创建一个 Graph 对象,并添加到 Graph 映射。代码如下:

  • INSTANCE 属性,单例。
  • allGraphs 属性,Graph 映射。其中映射的 KEY 为每个 Graph 全局唯一编号。在 JvmMetricStreamGraph 、RegisterStreamGraph 、TraceStreamGraph 类中,枚举了实际使用的 Graph 编号们。
  • 第 50 至 58 行:当 Graph 映射里不存在指定 Graph 编号时,创建 Graph 对象,并返回。

第二步

Graph#addNode(WayToNode) 方法,创建该 Graph 的首个 Node 对象。代码如下:

  • id 属性,Graph 编号。
  • entryWay首个提交数据给 Node 的方式。
  • 第 58 行 :将方法参数 entryWay 赋值给 this.entryWay 属性。在下分享的 Graph#start(input) 方法里,我们会看到这是 Graph 启动的入口,首个提交给 Node 的方式。
  • 第 60 至 62 行 :调用 WayToNode#buildDestination(Graph) 方法,创建 Node 对象,并返回该 Node。在上文中,我们已经说过创建的 Node 对象,为该 Graph 的首个 Node 。

WayToNode#buildDestination() 方法,创建该 WayToNode 的 Node 对象。代码如下:

  • destination 属性,目标 Node 。即该 WayToNode 提交数据到的 Node 。
  • destinationHandler 属性,目标 Node 的处理器。见 `#out(INPUT)` 方法。
  • 第 42 行:创建 Node 对象。
    • 目前,`destinationHandler` 属性,除了用于创建 Node 对象,无其他用途。

Node 构造方法 方法,代码如下:

  • nodeProcessor 属性,节点处理器。
  • next 属性,包含 WayToNode 数组,即 Node 提交数据给 Next 的 Node 数组的方式。
  • 第 44 行:调用 Graph#checkForNewNode(Node) 方法,校验 Node 的 NodeProcessor 在其 Graph 里,编号唯一

Graph#checkForNewNode(Node) 方法,校验 Node 的 NodeProcessor 在 Graph 里,编号唯一,代码如下:

  • nodeIndex 属性,处理器编号与 Node 的映射。其中映射的 KEY 为 NodeProcessor#id()
  • 第 72 至 78 行:校验 Node 的 NodeProcessor 在 Graph 里,编号唯一

第三步

Node#addNext(WayToNode) 方法,创建该 Node 的下一个 Node 对象。代码如下:

  • 第 54 行:调用 WayToNode#buildDestination(Graph) 方法,创建该 Node 的下面的 Node 对象。
  • 第 56 行:添加创建的 Node 对象到 next 属性。
  • 第 58 行:返回创建的 Node 对象。

2.2 Graph 启动

创建 Graph 的顺序图如下:

数据流向

FROM

TO

逻辑

第一步

Graph

WayToNode

第二步

WayToNode

Node

第三步

Node

NodeProcessor

第四步

NodeProcessor

Next

根据具体实现,若到 Next ,重复第一步


第一步

Graph#start(input) 方法,启动 Graph ,处理数据。代码如下:

  • 第 49 行:调用 WayToNode#in(input) 方法,输入数据给 WayToNode 。

WayToNode#in(input) 抽象方法,以 DirectWay#in(input) 实现方法举例子,代码如下:

  • 第 30 行:调用 super#out(input) 方法,直接输出数据,调用 Node#execute(input) 方法,提交数据给 Node ,进行处理。

第二步

Node#execute 方法,调用 NodeProcessor#process(input, next) 方法,处理数据。


第三步

NodeProcessor#process(input, next) 接口方法,以 AbstractWorker#process(input, next)实现方法举例子,代码如下:

  • 第 64 行:将方法参数 next 赋值给 this.next 属性。this.next 属性,用于封装的 #onNext(OUTPUT)方法,提交数据给当前 Node 的 Next ( 下面的 Node 们 )继续处理数据。
  • 第 67 行:调用 `#onWork` 抽象方法,处理数据。当 AbstractWorker 抽象类的实现类需要继续讲数据提交给 Next 时,需要在 #onWork 方法里,调用 #onNext(OUTPUT) 方法,例如 `ApplicationRegisterRemoteWorker#onWork(Application)` 。

第四步

Next#execute(INPUT) 方法,循环 WayToNode 数组,输入数据给 WayNode ,相当于"重回"【第一步】。

3. apm-collector-stream

在文章的开头,我们提到了 apm-collector-stream 模块,在 graph 包的基础上,提供异步跨节点等等的流式处理的封装。主要在 WayToNode 、NodeProcessor 的实现类上做文章。

3.1 WayToNode 实现类

整体类图如下:

3.1.1 WorkerRef

org.skywalking.apm.collector.stream.worker.base.WorkerRef ,Worker 引用抽象类

apm-collector-stream 模块里,我们会发现类的命名从 Node / NodeProcessor 转向了 Worker ?这是为什么呢?关于这一点,我们特意采访( 请教 )了官方大佬。

Worker 更具业务含义 Node / Processor 更偏技术含义

目前,WorkerRef 无具体的方法。

3.1.2 LocalAsyncWorkerRef

org.skywalking.apm.collector.stream.worker.base.LocalAsyncWorkerRef ,异步 Worker 引用实现类,提供了异步的流式处理封装。

我们回到 「2.2 Graph 创建」 的【第一步】。

LocalAsyncWorkerRef#in(INPUT) 方法,代码如下:

  • `queueEventHandler` 属性,队列事件处理器。在 《SkyWalking 源码分析 —— Collector Queue 队列组件》 我们会详细解析它的代码实现,这里只简单介绍下。
  • 第 47 行:将输入的数据,作为"事件",提交到队列事件处理器中,不再执行后续逻辑。此后,队列事件处理器,会在后台处理到该"事件"( 数据 ),回调 `LocalAsyncWorkerRef#execute` 方法,从而提交数据到 Worker ( Node )。详细参见 DisruptorEventHandler#onEvent(…) 方法。

那么为什么会回调呢?LocalAsyncWorkerRef 实现了 org.skywalking.apm.collector.queue.base.QueueExecutor 接口,它自身被设置到 QueueEventHandler 中, 作为"事件"的执行器。

整体流程如下:

3.1.3 RemoteWorkerRef

org.skywalking.apm.collector.stream.worker.base.RemoteWorkerRef ,远程 Worker 引用实现类,提供了远程跨节点的流式处理的封装。

我们再回到 「2.2 Graph 创建」 的【第一步】。

RemoteWorkerRef#in(INPUT) 方法,代码如下:

  • remoteSenderService 属性,远程发送服务。在 《SkyWalking 源码分析 —— Collector Remote 远程通信服务》「3.2 GRPCRemoteSenderService」 我们会详细解析它的代码实现,这里只简单介绍下。
  • remoteWorker 属性,远程 Worker 。在下文会详细分享它的实现。
  • 第 56 行:调用 RemoteSenderService#send(…) 方法,根据远程 Worker 的 Selector 选择器,选择一个 Worker 进行发送。
  • 第 58 至 60 行:当选择的 Worker 为本地模式( Mode )时,调用 #out(INPUT) 方法,提交数据到本地的 Worker ( Node )。

3.2 NodeProcessor 实现类

整体类图如下:

  • `org.skywalking.apm.collector.stream.worker.base.Provider` ,Worker 供应者接口,用于创建 Worker 和 WorkerRef 对象的工厂

3.2.1 AbstractWorker

AbstractWorker 的代码实现,在 「2.2 Graph 启动」 已经详细解析。

org.skywalking.apm.collector.stream.worker.base.AbstractWorkerProvider ,Worker 供应者抽象类,定义了 #workerInstance(ModuleManager) 抽象方法,用于创建 Worker 对象。

3.2.2 AbstractLocalAsyncWorker

org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorker ,异步 Worker 抽象类

目前,AbstractLocalAsyncWorker 无具体的方法。

实际使用时,继承 AbstractLocalAsyncWorker 类,实现 #work(INPUT) 方法,例如:ApplicationRegisterSerialWorker 。


org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider ,LocalAsyncWorker 供应者抽象类

  • `queueCreatorService` 属性,队列创建服务,用于创建 QueueEventHandler 对象。
  • `#queueSize()` 抽象方法,声明队列大小。
  • [#create(WorkerCreateListener)]() 实现方法,创建 AbstractLocalAsyncWorker 和 LocalAsyncWorkerRef 对象。
    • 第 51 行:创建 AbstractLocalAsyncWorker 实现类的对象。参见 ApplicationRegisterSerialWorker.Factory#workerInstance(ModuleManager) 方法。
    • 第 54 行:添加 AbstractLocalAsyncWorker 到 WorkerCreateListener ( Worker 创建监听器 )。WorkerCreateListener 在 《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(二)》「4.1 WorkerCreateListener」 详细解析。
    • 第 57 行:创建 LocalAsyncWorkerRef 对象。
    • 第 60 行:调用 `QueueCreatorService#create(…)` 方法,创建 QueueEventHandler 对象,并设置 LocalAsyncWorkerRef 作为它的执行器
    • 第 63 行:设置 LocalAsyncWorkerRef 的 QueueEventHandler 属性。

3.2.3 AbstractRemoteWorker

org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorker ,远程 Worker 抽象类,定义了 #selector() 抽象方法,获得选择器。RemoteSenderService 根据选择器,调用 RemoteClientSelector#select(...) 方法,选择好远程节点,而后进行发送数据。

实际使用时,继承 AbstractLocalAsyncWorker 类,实现 #work(INPUT) 方法,例如:ApplicationRegisterRemoteWorker 。


org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorkerProvider ,AbstractRemoteWorker 供应者抽象类

  • `remoteSenderService` 属性,远程发送服务。
  • `#create(WorkerCreateListener)` 实现方法,创建 AbstractRemoteWorker 和 RemoteWorkerRef 对象。
    • 第 58 行:创建 AbstractRemoteWorker 实现类的对象。参见 ApplicationRegisterRemoteWorker.Factory#workerInstance(ModuleManager) 方法。
    • 第 61 行:添加 AbstractLocalAsyncWorker 到 WorkerCreateListener ( Worker 创建监听器 )。WorkerCreateListener 在 《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(二)》「4.1 WorkerCreateListener」 详细解析。
    • 第 64 行:创建 RemoteWorkerRef 对象。


欢迎加入我的知识星球,一起探讨架构,交流源码。加入方式,长按下方二维码噢

已在知识星球更新源码解析如下:

如果你喜欢这篇文章,喜欢,转发。

生活很美好,明天见(。・ω・。)ノ♡

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-05-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 芋道源码 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 概述
  • 2. apm-collector-core/graph
    • 2.1 Graph 创建
      • 2.2 Graph 启动
      • 3. apm-collector-stream
        • 3.1 WayToNode 实现类
          • 3.1.1 WorkerRef
          • 3.1.2 LocalAsyncWorkerRef
          • 3.1.3 RemoteWorkerRef
        • 3.2 NodeProcessor 实现类
          • 3.2.1 AbstractWorker
          • 3.2.2 AbstractLocalAsyncWorker
          • 3.2.3 AbstractRemoteWorker
      相关产品与服务
      消息队列 TDMQ
      消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档