摘要: 原创出处 http://www.iocoder.cn/SkyWalking/collector-receive-trace/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文主要基于 SkyWalking 3.2.6 正式版
分布式链路追踪系统,链路的追踪大体流程如下:
本文主要分享【第三部分】 SkyWalking Collector 接收 Trace 数据。
友情提示:Collector 接收到 TraceSegment 的数据,对应的类是 Protobuf 生成的。考虑到更加易读易懂,本文使用 TraceSegment 相关的原始类。
大体流程如下:
什么是构建?
从 TraceSegment 数据中,会构建出更多的数据维度,如下图所示:
构建的过程,本文只分享调用的过程,具体怎么生成新的数据,数据的流式处理与存储,在 《SkyWalking 源码解析 —— Collector 存储 Trace 数据》 详细解析。
为什么构建会失败?
在 TraceSegment 里的数据结构,例如操作名( operationName
)和操作编号( operationId
) ,在 《SkyWalking 源码分析 —— Agent 收集 Trace 数据》 中我们可以看到,考虑到网络传输,优先使用 operationId
,若不存在( 例如操作还未注册,或者注册了 Agent 未同步到本地 ),则使用 operationName
。
但是,Collector 构建过程时,要求的是 operationId
,如果传递的是 operationName
时,需要将 operationName
转换成 operationId
。若此时 operationName
未注册时,则无法获取到 operationId
,导致构建失败。
那么有胖友可能有疑惑,在构建过程中,注册 operationName
呢?答案是不行, 在 《SkyWalking 源码分析 —— Agent DictionaryManager 字典管理》「2.2 操作的同步 API」 中,我们可以看到,operationName
的注册,是异步的过程。因而,即使构建的过程中,调用注册,也无法获得 operationId
。
涉及的逻辑点比较多,如果胖友理解不能,下面我们可以直接看代码。
我们先来看看 API 的定义,TraceSegmentService.proto
,如下图所示:
TraceSegmentServiceHandler#collect(Application, StreamObserver<ApplicationMapping>)
, 代码如下:
ITraceSegmentService#send(UpstreamSegment)
方法,处理一条 TraceSegment 。org.skywalking.apm.collector.agent.stream.service.trace.ITraceSegmentService
,继承 Service 接口,TraceSegment 服务接口。
#send(UpstreamSegment)
接口方法,处理一条 TraceSegment 。org.skywalking.apm.collector.agent.stream.worker.trace.ApplicationIDService
,实现 IApplicationIDService 接口,TraceSegment 服务实现类。
#send(UpstreamSegment)
方法,代码如下:SegmentParse#parse(UpstreamSegment, Source)
方法,解析并处理 TraceSegment 。org.skywalking.apm.collector.agent.stream.parser.SegmentParse
,Segment 解析器。属性如下:
spanListeners
属性,Span 监听器集合。通过不同的监听器,对 TraceSegment 进行构建,生成不同的数据。在 #SegmentParse(ModuleManager)
构造方法 ,会看到它的初始化。segmentId
属性,TraceSegment 编号,即 TraceSegment.traceSegmentId
。timeBucket
属性,第一个 Span 的开始时间。#parse(UpstreamSegment, Source)
方法,解析并处理 TraceSegment 。在该方法里,我们会看到,本文开头提到的【构造】。整个构造的过程,实际分成两步:1)预构建;2)执行构建。代码如下:
segment
参数中,解析出 :traceIds
,关联的链路追踪全局编号。segmentObject
,TraceSegmentObject 对象。#preBuild(List<UniqueId>, SegmentDecorator)
方法,预构建。#writeToBufferFile()
方法,将 TraceSegment 写入 Buffer 文件暂存。为什么会判断 source == Source.Agent
呢?#parse(UpstreamSegment, Source)
方法的调用,共有两个 Source :Source.Agent
。Source.Buffer
,如果不加盖判断,会预构建失败重复写入。false
,表示构建失败。#notifyListenerToBuild()
方法,通知 Span 监听器们,执行构建各自的数据。在 《SkyWalking 源码解析 —— Collector 存储 Trace 数据》 详细解析。buildSegment(id, dataBinary)
方法,执行构建 TraceSegment 。true
,表示构建成功。false
,表示构建失败。#preBuild(List<UniqueId>, SegmentDecorator)
方法,前置构建,用于通过不同的监听器,对 TraceSegment 进行构建,生成不同的数据。在该过程中,会发生我们在文章头所说的,"为什么构建会失败"。代码如下:
segmentId
。#notifyGlobalsListener(...)
方法,使用 GlobalTraceSpanListener 处理链路追踪全局编号数组( TraceSegment.relatedGlobalTraces
)。#notifyRefsListener(...)
方法,使用 RefsListener 处理父 Segment 指向数组( TraceSegment.refs
)。ReferenceIdExchanger#exchange(ReferenceDecorator, applicationId)
方法,将 TraceSegmentRef 未生成编号的属性,进行兑换处理。若兑换失败,返回构造失败。在 「2.3 Standardization 标准化」 详细解析。TraceSegment.spans
属性。#notifyFirstListener(...)
,使用 FirstSpanListener 处理第一个 Span 。#notifyExitListener(...)
,使用 ExitSpanListener 处理。#notifyEntryListener(...)
,使用 EntrySpanListener 处理。#notifyLocalListener(...)
,使用 LocalSpanListener 处理。true
,预构建成功。#writeToBufferFile(id, upstreamSegment)
方法,将 TraceSegment 写入 Buffer 文件。代码如下:
org.skywalking.apm.collector.agent.stream.parser.standardization.SegmentStandardization
对象,并设置 TraceSegment 属性。TraceStreamGraph.SEGMENT_STANDARDIZATION_GRAPH_ID
对象的 Graph 对象。在 TraceStreamGraph#createSegmentStandardizationGraph()
方法中,我们可以看到,该 Graph 对象只有一个 SegmentStandardizationWorker 。Graph#start(INPUT)
方法,执行该 Graph 实现的流式处理,将 TraceSegment 写到 Buffer 文件。org.skywalking.apm.collector.agent.stream.parser.standardization.SegmentStandardizationWorker
,继承 AbstractLocalAsyncWorker 抽象类,TraceSegment 标准化 Worker ,负责将接收到的 TraceSegment 异步写到 Buffer 文件。
#id()
实现方法,返回 108 。#onWork(SegmentStandardization)
实现方法,将接收到的 TraceSegment 异步写到 Buffer 文件。。代码如下:SegmentBufferManager#writeBuffer(UpstreamSegment)
方法,将接收到的 TraceSegment 写到 Buffer 文件。在 「3. Buffer 文件」 详细解析。Factory#workerInstance(ModuleManager)
方法,创建 SegmentStandardizationWorker 后,会调用 #startTimer(SegmentStandardizationWorker)
方法,创建定时任务。该定时任务调用 #flushAndSwitch()
方法,定时将 Buffer 文件 flush 。目前 SegmentBufferManager#flush()
是个空方法。为什么不这里不需要 flush 呢?因为 SegmentBufferManager#writeBuffer(UpstreamSegment)
已经进行 flush 。本小节涉及到的类如下图:
我们先来说说,什么叫 standardization 标准化?其实就是我们在文章开头说的"例如将 operationName
转换成 operationId
"。
org.skywalking.apm.collector.agent.stream.parser.standardization.StandardBuilder
,标准化 Builder 接口。
#toBuilder()
接口方法,转换成 Builder 。感觉这个接口方法怪怪的?不要捉急,等会看一个实现类就明白了。StandardBuilder 有三个实现类:
怎么都是装饰者呢,而且恰好和一个数据结构对应?以 SpanDecorator 为例子,代码如下:
spanObject
属性,SpanObject ,Span 的 Protobuf 数据对象。standardBuilder
属性,SpanObject 的 Builder 对象。isOrigin
属性,是否是原始对象。isOrigin = true
,使用 spanObject
属性 。isOrigin = false
,使用 standardBuilder
属性。#setOperationNameId(value)
#getOperationName()
#toBuilder()
实现方法,创建 SpanObject 对应的 Builder ,并修改 isOrigin = false
。另外,会调用 standardBuilder
属性的 #toBuilder()
方法,目前在项目里,此处的 standardBuilder
属性为 SegmentDecorator 。SegmentDecorator 、ReferenceDecorator 和 SpanDecorator 目的一致。
org.skywalking.apm.collector.agent.stream.parser.standardization.IdExchanger
,编号兑换器接口。
#exchange(standardBuilder, applicationId)
接口方法,兑换 standardBuilder 里的属性,并返回是否兑换成功。IdExchanger 有三个实现类:
ReferenceIdExchanger#exchange(standardBuilder, applicationId)
方法,代码如下:
TraceSegmentRef.entryOperationId
为空,将 TraceSegmentRef.entryOperationName
进行兑换。ServiceNameService#getOrCreate(applicationId, serviceName)
方法,根据应用编号和操作名获得或创建操作编号。false
。ReferenceDecorator#toBuilder()
方法,创建 Builder ,然后设置操作编号。TraceSegmentRef.parentApplicationInstanceId
为空,将 TraceSegmentRef.parentOperationName
进行兑换。TraceSegmentRef.entryOperationName
为空,将 TraceSegmentRef.peerHost
进行兑换。在【第 93 行】,我们可以看到,调用 ApplicationIDService#getOrCreate(applicationCode)
方法,将服务地址作为 applicationCode
使用。SpanIdExchanger#exchange(standardBuilder, applicationId)
方法,类似,已经添加代码注释,胖友自己阅读理解。
本小节涉及到的类如下图:
我们先来看看 Buffer 包括哪些文件:
yunai$ pwd
/Users/yunai/Java/buffer
yunai$ ls
data_20171205004132.sw offset_20171205004132.sw
类型_${时间}.sw
,并且相同类型,同时可以存在多个。org.skywalking.apm.collector.agent.stream.buffer.BufferFileConfig
,Buffer 文件配置 。
org.skywalking.apm.collector.agent.stream.buffer.Offset
,偏移 。
下面,我们来一起看看 Buffer 文件的初始化、写入、读取的三种操作过程。
SegmentBufferManager#initialize(ModuleManager)
方法,初始化 Offset 文件、Data 文件、定期读取 Buffer 文件的任务。代码如下:
OffsetManager#initialize()
方法,初始化 Offset 文件。#newDataFile()
,创建 Data 文件。代码如下:data_${yyyyMMddHHmmss}.sw
。OffsetManager#setWriteOffset(writeFileName, writeFileOffset)
方法,设置 Offset 的写入的文件名和偏移。outputStream
。outputStream
。outputStream
。SegmentBufferReader#initialize(ModuleManager)
方法,初始化定期读取 Buffer 文件的任务。OffsetManager#initialize()
方法,初始化 Offset 文件。代码如下:
#createOffsetFile()
,创建 Data 文件。代码如下:Offset#serialize()
方法,序列化读写偏移,格式为 ${读取文件名},${读取文件偏移量},${写入文件名},${写入文件偏移量}
。offset_${yyyyMMddHHmmss}.sw
。#flush()
方法,写入 Offset 对象到 Offset 文件。代码如下:#createOffsetFile()
方法,创建新的 Offset 文件。#flush()
方法,定时写入 Offset 对象到 Offset 文件。注意,所以 Offset 改变时,不是立即写入 Offset 文件,而是周期性刷盘。SegmentBufferReader#initialize(ModuleManager)
方法,初始化定期读取 Buffer 文件的任务。代码如下:
#preRead()
方法,读取 Buffer 文件,将 TraceSegment 提交给 SegmentParse 重新解析与构建处理。SegmentBufferManager#writeBuffer(UpstreamSegment)
方法,将 TraceSegment 写入 Buffer 文件,包括两个步骤:1)将 TraceSegment 写入 Data 文件;2)更新 Offset 文件的偏移。代码如下:
AbstractMessageLite#writeDelimitedTo(OutputStream)
方法,将 TraceSegment 写入 Data 文件。该方法包括 flush 操作,代码如下:#newDataFile()
,创建 Data 文件。OffsetManager#setWriteOffset(position)
方法,设置 Offset 对象的写入偏移。SegmentBufferReader#preRead()
方法,读取 Buffer 文件,将 TraceSegment 提交给 SegmentParse 重新解析与构建处理。另外该方法,会删除已经读取完成的 Data 文件。代码如下:
#deleteTheDataFilesBeforeReadFile(readFileName)
方法,删除比指定文件早创建的 Data 文件,基于文件名带有创建时间。#read()
方法,读取 Buffer 文件,将 TraceSegment 提交给 SegmentParse 重新解析与构建处理。另外,返回 true
,文件被全部读取完成、处理并删除。返回 false
,文件未被全部读取完成。false
,结束循环,等待下次读取处理。false
。true
,文件被全部读取完成、处理并删除。#readEarliestCreateDataFile()
方法,循环顺序读取 Data 文件,直到有一个没读完。#read()
方法里,没有读完。#read(readFile, readFileOffset)
方法,读取 Data 文件,直到有一个没读完。#deleteTheDataFilesBeforeReadFile(readFileName)
方法,删除比指定文件早创建的 Data 文件。#readEarliestCreateDataFile()
方法,循环顺序读取 Data 文件,直到有一个没读完。#readEarliestCreateDataFile()
方法,循环顺序读取 Data 文件,直到有一个没读完。d