专栏首页芋道源码1024链路追踪 SkyWalking 源码分析 —— Collector Queue 队列组件

链路追踪 SkyWalking 源码分析 —— Collector Queue 队列组件

摘要: 原创出处 http://www.iocoder.cn/SkyWalking/collector-queue-module/ 「芋道源码」欢迎转载,保留摘要,谢谢!


1. 概述

本文主要分享 SkyWalking Collector Queue Module,队列组件。该组件被 Collector Streaming Module 流式处理使用,提供异步执行的特性。

友情提示:建议先阅读 《SkyWalking 源码分析 —— Collector 初始化》 ,以了解 Collector 组件体系。

Cluster Module 在 SkyWalking 架构图处于如下位置( 红框 ) :

FROM https://github.com/apache/incubating-skywalking

下面我们来看看整体的项目结构,如下图所示 :

  • collector-queue-define :定义队列组件接口。
  • collector-queue-datacarrier-provider :基于 apm-datacarrier 的队列组件实现。目前暂未完成。
  • collector-queue-zookeeper-provider :基于 Disruptor 的队列组件实现。

下面,我们从接口到实现的顺序进行分享。

2. collector-queue-define

collector-queue-define :定义队列组件接口。项目结构如下 :

2.1 QueueModule

org.skywalking.apm.collector.queue.QueueModule ,实现 Module 抽象类,队列 Module 。

#name() 实现方法,返回模块名为 "queue"

#services() 实现方法,返回 Service 类名:QueueCreatorService 。

2.2 QueueCreatorService

org.skywalking.apm.collector.queue.service.QueueCreatorService ,继承 Service 接口,队列创建服务接口

#create(queueSize, executor) 接口方法,创建队列处理器。

  • 一般情况下,实现该接口方法,调用 `org.skywalking.apm.collector.queue.base.QueueCreator#create(queueSize, executor)` 方法,创建队列处理器。

2.3 MessageHolder

org.skywalking.apm.collector.queue.base.MessageHolder ,消息持有者。

  • `message` 属性,持有的消息。
  • `#reset()` 方法,清空消息。为什么会有这个方法,下文胖友会看到。

2.4 QueueEventHandler

org.skywalking.apm.collector.queue.base.QueueEventHandler,队列处理器接口。它定义了 #tell(message) 接口方法,输入消息给自己。最终,QueueEventHandler 会"提交"消息给 org.skywalking.apm.collector.queue.base.QueueExecutor,执行处理该消息。

LocalAsyncWorkerRef 实现 QueueEventHandler 接口,在 《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(一)》「3.1.2 LocalAsyncWorkerRef」 有详细解析。

2.5 DaemonThreadFactory

org.skywalking.apm.collector.queue.base.DaemonThreadFactory,守护进程线程工厂,被用于创建消息处理器的线程。

3. collector-queue-disruptor-provider

collector-queue-disruptor-provider ,基于 Disruptor 的队列组件实现。

项目结构如下 :

默认配置,在 application-default.yml 已经配置如下:

JSON queue: disruptor:

3.1 QueueModuleDisruptorProvider

org.skywalking.apm.collector.queue.disruptor.CQueueModuleDisruptorProvider ,实现 ModuleProvider 抽象类,基于 Disruptor 的队列服务提供者。

#name() 实现方法,返回组件服务提供者名为 "disruptor"

module() 实现方法,返回组件类为 QueueModule 。

#requiredModules() 实现方法,返回依赖组件为空。


#prepare(Properties) 实现方法,执行准备阶段逻辑。

  • 第 44 行 :创建 DisruptorQueueCreatorService 对象,并调用 #registerServiceImplementation() 父类方法,注册到 services

#start() 实现方法,方法为空。

#notifyAfterCompleted() 实现方法,方法为空。

2.2 DisruptorQueueCreatorService

org.skywalking.apm.collector.queue.disruptor.service.DisruptorQueueCreatorService,实现 QueueCreatorService 接口,基于 Disruptor 的队列创建服务实现类

#create(queueSize, executor) 实现方法,调用 DisruptorQueueCreator#register(queueSize, executor) 方法,创建队列处理器。

3.2.1 DisruptorQueueCreator

友情提示:如果胖友对 Disruptor 暂时不了解,建议先使用 Disruptor 写个小 Demo 。 如下是笔者阅读的文章:

  • 《三步创建Disruptor应用》
  • 《Disruptor入门》
  • 《剖析Disruptor:为什么会这么快?(一)Ringbuffer的特别之处》

org.skywalking.apm.collector.queue.disruptor.base.DisruptorQueueCreator ,实现 QueueCreator 接口,基于 Disruptor 的队列创建器实现类

#create(queueSize, executor) 实现方法,代码如下:

  • 第 42 至 45 行:校验队列大小为 2 的指数,否则创建 Disruptor 对象会报 "bufferSize must be a power of 2" 的异常,参见 AbstractSequencer 的代码。
  • 第 49 行:创建 Disruptor 对象。
    • `org.skywalking.apm.collector.queue.disruptor.base.MessageHolderFactory` ,MessageHolder 工厂
  • 第 51 至 64 行:设置 Disruptor 对象的默认异常处理器
  • 第 67 至 70 行:创建 DisruptorEventHandler 对象,并设置为 Disruptor 对象的事件处理器
  • 第 74 行:启动 Disruptor 对象。

为什么 Disruptor 要求队列大小为 2 的指数呢?如下是相关资料,感兴趣的同学可以看看( 可跳过 ):

  • FROM 《环形缓冲器》

  • SingleProducerSequencer#hasAvailableCapacity(requiredCapacity) 方法,代码如下:

3.3 DisruptorEventHandler

org.skywalking.apm.collector.queue.disruptor.base.DisruptorEventHandler ,基于 Disruptor 的队列处理器实现类

  • `ringBuffer` 属性,Disruptor RingBuffer 对象。
  • `executor` 属性,执行器。
  • 实现 org.skywalking.apm.collector.queue.base.QueueEventHandler 接口 的 `#tell(message)` 接口方法,标准的 Disruptor 发布事件的代码。
  • 实现 `com.lmax.disruptor.EventHandler` 的 `#onEvent(event, sequence, endOfBatch)` 接口方法,代码如下:
    • `endOfBatch` 方法参数,标记该事件( 消息 )是否是 Disruptor 每次批处理的最后一个事件。胖友可以参见 《LMAX Disruptor - what determines the batch size?》 这篇文章,自己搭建一个 Demo 理解下该参数。
    • 第 66 行:调用 `MessageHolder#reset()` 方法,清空消息,因为在 Disruptor RingBuffer 里,事件( 消息 )对象是重用的,虽然后续发布事件( 消息 )可以进行覆盖,考虑到安全性进行清空。
    • 第 69 行:设置消息为该批量的结尾( 最后一条 )。为什么?在 《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(二)》「3. AggregationWorker」 揭晓答案。
    • 第 72 行:调用 `QueueExecutor#execute(message)` 方法,执行处理消息。

4. collector-queue-datacarrier-provider

collector-queue-datacarrier-provider :基于 apm-datacarrier 的队列组件实现。

目前暂未完成。


已在知识星球更新源码解析如下:

本文分享自微信公众号 - 芋道源码(javayuanma)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-05-14

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

推荐阅读

  • 远程办公经验为0,如何将日常工作平滑过度到线上?

    我是一名创业者,我的公司(深圳市友浩达科技有限公司)在2018年8月8日开始运营,现在还属于微型公司。这个春节假期,我一直十分关注疫情动向,也非常关心其对公司带来的影响。

    TVP官方团队
    TAPD 敏捷项目管理腾讯乐享企业邮箱企业编程算法
  • 数据中台,概念炒作还是另有奇效? | TVP思享

    作者简介:史凯,花名凯哥,腾讯云最具价值专家TVP,ThoughtWorks数据智能业务总经理。投身于企业数字化转型工作近20年。2000年初,在IBM 研发企业级中间件,接着加入埃森哲,为大型企业提供信息化架构规划,设计,ERP,云平台,数据仓库构建等技术咨询实施服务,随后在EMC负责企业应用转型业务,为企业提供云迁移,应用现代化服务。现在专注于企业智能化转型领域,是数据驱动的数字化转型的行业布道者,数据中台的推广者,精益数据创新体系的创始人,2019年荣获全球Data IQ 100人的数据赋能者称号,创业邦卓越生态聚合赋能官TOP 5。2019年度数字化转型专家奖。打造了行业第一个数据创新的数字化转型卡牌和工作坊。创建了精益数据创新方法论体系构建数据驱动的智能企业,并在多个企业验证成功,正在向国内外推广。

    TVP官方团队
    大数据数据分析企业
  • 扩展 Kubernetes 之 CRI

    使用 cri-containerd 的调用流程更为简洁, 省去了上面的调用流程的 1,2 两步

    王磊-AI基础
    Kubernetes
  • 扩展 Kubernetes 之 Kubectl Plugin

    kubectl 功能非常强大, 常见的命令使用方式可以参考 kubectl --help,或者这篇文章

    王磊-AI基础
    Kubernetes
  • 多种登录方式定量性能测试方案

    最近接到到一个测试任务,某服务提供了两种登录方式:1、账号密码登录;2、手机号+验证码登录。要对这两种登录按照一定的比例进行压测。

    八音弦
    测试服务 WeTest
  • 线程安全类在性能测试中应用

    首先验证接口参数签名是否正确,然后加锁去判断订单信息和状态,处理用户增添VIP时间事务,成功之后释放锁。锁是针对用户和订单的分布式锁,使用方案是用的redis。

    八音弦
    安全编程算法
  • 使用CDN(jsdelivr) 优化博客访问速度

    PS: 此篇文章适用于 使用 Github pages 或者 coding pages 的朋友,其他博客也类似.

    IFONLY@CUIT
    CDNGitGitHub开源
  • 扩展 Kubernetes 之 CNI

    Network Configuration 是 CNI 输入参数中最重要当部分, 可以存储在磁盘上

    王磊-AI基础
    Kubernetes
  • 聚焦【技术应变力】云加社区沙龙online重磅上线!

    云加社区结合特殊时期热点,挑选备受关注的音视频流量暴增、线下业务快速转线上、紧急上线防疫IoT应用等话题,邀请众多业界专家,为大家提供连续十一天的干货分享。从视野、预判、应对等多角度,帮助大家全面提升「技术应变力」!

    腾小云
  • 京东购物小程序购物车性能优化实践

    它是小程序开发工具内置的一个可视化监控工具,能够在 OS 级别上实时记录系统资源的使用情况。

    WecTeam
    渲染JavaScripthttps网络安全缓存

扫码关注云+社区

领取腾讯云代金券