本文主要基于 SkyWalking 3.2.6 正式版
本文主要分享 SkyWalking Collector Remote 远程通信服务。该服务用于 Collector 集群内部通信。
目前集群内部通信的目的,跨节点的流式处理。Remote Module 应用在 SkyWalking 架构图如下位置( 红框 ) :
FROM https://github.com/apache/incubating-skywalking
下面我们来看看整体的项目结构,如下图所示 :
collector-remote-define
:定义远程通信接口。collector-remote-kafka-provider
:基于 Kafka 的远程通信实现。目前暂未完成。collector-remote-grpc-provider
:基于 Google gRPC 的远程通信实现。生产环境目前使用下面,我们从接口到实现的顺序进行分享。
collector-remote-define
:定义远程通信接口。项目结构如下 :
整体流程如下图:
我们按照整个流程的处理顺序,逐个解析涉及到的类与接口。
org.skywalking.apm.collector.remote.RemoteModule
,实现 Module 抽象类,远程通信 Module 。
#name()
实现方法,返回模块名为 "remote"
。
#services()
实现方法,返回 Service 类名:RemoteSenderService 、RemoteDataRegisterService 。
org.skywalking.apm.collector.remote.service.RemoteSenderService
,继承 Service 接口,远程发送服务接口,定义了 #send(graphId, nodeId, data, selector)
接口方法,调用 RemoteClient ,发送数据。
graphId
方法参数,Graph 编号。通过 graphId
,可以查找到对应的 Graph 对象。nodeId
方法参数,Worker 编号。通过 workerId
,可以查找在 Graph 对象中的 Worker 对象,从而 Graph 中的流式处理。data
方法参数,Data 数据对象。例如,流式处理的具体数据对象。selector
方法参数,org.skywalking.apm.collector.remote.service.Selector
选择器对象。根据 Selector 对象,使用对应的负载均衡策略,选择集群内的 Collector 节点,发送数据。Remote
和 Local
两种方式。前者,发送数据到远程的 Collector 节点;后者,发送数据到本地,即本地处理,参见 RemoteWorkerRef#in(message)
方法。org.skywalking.apm.collector.remote.service.RemoteClientService
,继承 Service 接口,远程客户端服务接口,定义了 #create(host, port, channelSize, bufferSize)
接口方法,创建 RemoteClient 对象。
org.skywalking.apm.collector.remote.service.RemoteClient
,继承 java.lang.Comparable
接口,远程客户端接口。定义了如下接口方法:
#push(graphId, nodeId, data, selector)
接口方法,发送数据。#getAddress()
接口方法,返回客户端连接的远程 Collector 地址。#equals(address)
接口方法,判断 RemoteClient 是否连接了指定的地址。在说 CommonRemoteDataRegisterService 之前,首先来说下 CommonRemoteDataRegisterService 的意图。
在上文中,我们可以看到发送给 Collector 是 Data 对象,而 Data 是数据的抽象类,在具体反序列化 Data 对象之前,程序是无法得知它是 Data 的哪个实现对象。这个时候,我们可以给 Data 对象的每个实现类,生成一个对应的数据协议编号。
org.skywalking.apm.collector.remote.service.CommonRemoteDataRegisterService
,通用远程数据注册服务。
id
属性,数据协议自增编号。dataClassMapping
属性,数据类型( Class<? extends Data> )与数据协议编号的映射。dataInstanceCreatorMapping
属性,数据协议编号与数据对象创建器( RemoteDataInstanceCreator )的映射。org.skywalking.apm.collector.remote.service.RemoteDataRegisterService
,继承 Service 接口,远程客户端服务接口,定义了 #register(Class<? extends Data>, RemoteDataInstanceCreator)
接口方法,注册数据类型对应的远程数据创建器( RemoteDataRegisterService.RemoteDataInstanceCreator
)对象。
CommonRemoteDataRegisterService 实现了 RemoteDataRegisterService 接口,#register(Class<? extends Data>, RemoteDataInstanceCreator)
实现方法。
另外,AgentStreamRemoteDataRegister 会调用 RemoteDataRegisterService#register(Class<? extends Data>, RemoteDataInstanceCreator)
方法,注册每个数据类型的 RemoteDataInstanceCreator 对象。注意,例如 Application::new
是 RemoteDataInstanceCreator 的匿名实现类。
org.skywalking.apm.collector.remote.service.RemoteDataIDGetter
,继承 Service 接口,远程数据协议编号获取器接口,定义了 #getRemoteDataId(Class<? extends Data>)
接口方法,根据数据类型获取数据协议编号。
CommonRemoteDataRegisterService 实现了 RemoteDataIDGetter 接口,#getRemoteDataId(Class<? extends Data>)
实现方法。
org.skywalking.apm.collector.remote.service.RemoteDataInstanceCreatorGetter
,继承 Service 接口,远程数据创建器的获取器接口,定义了 #getInstanceCreator(remoteDataId
接口方法,根据数据协议编号获得远程数据创建器( RemoteDataInstanceCreator )。
CommonRemoteDataRegisterService 实现了 RemoteDataInstanceCreatorGetter 接口,#getInstanceCreator(remoteDataId)
实现方法。
org.skywalking.apm.collector.remote.service.RemoteSerializeService
,远程通信序列化服务接口,定义了 #serialize(Data)
接口方法,序列化数据,生成 Builder 对象。
org.skywalking.apm.collector.remote.service.RemoteDeserializeService
,远程通信序反列化服务接口,定义了 #deserialize(RemoteData, Data)
接口方法,反序列化传输数据。
collector-remote-grpc-provider
,基于 Google gRPC 的远程通信实现。
项目结构如下 :
默认配置,在 application-default.yml
已经配置如下:
remote:
gRPC:
host: localhost
port: 11800
org.skywalking.apm.collector.remote.grpc.RemoteModuleGRPCProvider
,实现 ModuleProvider 抽象类,基于 gRPC 的组件服务提供者实现类。
#name()
实现方法,返回组件服务提供者名为 "gRPC"
。
module()
实现方法,返回组件类为 RemoteModule 。
#requiredModules()
实现方法,返回依赖组件为 cluster
、gRPC_manager
。
#prepare(Properties)
实现方法,执行准备阶段逻辑。
#registerServiceImplementation()
父类方法,注册到 services
。#start()
实现方法,执行启动阶段逻辑。
org.skywalking.apm.collector.remote.grpc.RemoteModuleGRPCRegistration
对象,将自己注册到集群管理。这样,自己可以被 Collector 集群节点发现,从而被调用。#notifyAfterCompleted()
实现方法,方法为空。
org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteSenderService
,继承 ClusterModuleListener 抽象类,实现 RemoteSenderService 接口,基于 gPRC 的远程发送服务实现类。
通过继承 ClusterModuleListener 抽象类,实现了监听 Collector 集群节点的加入或离开。
remoteClients
属性,连接 Collector 集群节点的客户端数组。每个 Collector 集群节点,对应一个客户端。#path()
实现方法,返回监听的目录 "/" + RemoteModule.NAME + "/" + RemoteModuleGRPCProvider.NAME
。Collector 集群中,每个节点的 Remote Server 都会注册到该目录下。#serverJoinNotify(serverAddress)
实现方法,当新的节点加入,创建新的客户端连接。#serverQuitNotify(serverAddress)
实现方法,当老的节点离开,移除对应的客户端连接。RemoteModuleGRPCProvider 基于不同的选择器 ( Selector ) ,提供不同的客户端选择( org.skywalking.apm.collector.remote.grpc.service.selector.RemoteClientSelector
)实现 :
hashCodeSelector
属性,HashCodeSelector ,基于数据的哈希码。foreverFirstSelector
属性,ForeverFirstSelector ,基于客户端数组的顺序,选择第一个。rollingSelector
属性,RollingSelector ,基于客户端数组的顺序,顺序向下选择。#send(graphId, nodeId, data, selector)
方法,代码如下:RemoteWorkerRef#in(message)
方法。RemoteClient#push(graphId, nodeId, data)
方法,发送数据。RemoteClientSelector#select(clients, data)
方法,选择客户端。#sendToRemoteWhenNotSelf(remoteClient, graphId, nodeId, data)
方法,发送请求数据。org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteClientService
,实现 RemoteClientService 接口,基于 gRPC 的远程客户端服务实现类。
#create(host, port, channelSize, bufferSize)
实现方法,创建 GRPCRemoteClient 对象。
友情提示:本小节会涉及较多 gRPC 相关的知识,建议不熟悉的胖友自己 Google ,补充下姿势。
org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteClient
,实现 RemoteClient 接口,基于 gRPC 的远程客户端实现类。
client
属性,GRPCClient 对象。相比来说,GRPCRemoteClient 偏业务的封装,内部调用 GRPCClient 对象。carrier
属性,DataCarrier 对象,本地消息队列。GRPCRemoteClient 在被调用发送数据时,先提交到本地队列,异步消费进行发送到远程 Collector 节点。DataCarrier 在 《SkyWalking 源码分析 —— DataCarrier 异步处理库》 详细解析。DataCarrier#consume(IConsumer, num)
方法,设置消费者为 RemoteMessageConsumer 对象。#push(graphId, nodeId, data)
实现方法,异步发送消息到远程 Collector 。
RemoteDataIDGetter#getRemoteDataId(Class<? extends Data>)
方法,获得数据协议编号。DataCarrier#produce(data)
方法,发送数据到本地队列。RemoteMessageConsumer ,批量消费本地队列的数据,逐条发送数据到远程 Collector 节点。
#consume(List<RemoteMessage>)
实现方法,代码如下:io.grpc.stub.StreamObserver#onNext(RemoteMessage)
方法,逐条发送数据。io.grpc.stub.StreamObserver#onCompleted()
方法,全部请求数据发送完成。org.skywalking.apm.collector.remote.grpc.handler.RemoteCommonServiceHandler
,实现 org.skywalking.apm.collector.server.grpc.GRPCHandler
接口,继承 RemoteCommonServiceGrpc.RemoteCommonServiceImplBase 抽象类,远程通信通用逻辑处理器。
其中,RemoteCommonServiceGrpc.RemoteCommonServiceImplBase 在 RemoteCommonService.proto
文件的定义如下图:
#call(StreamObserver<Empty>)
实现方法,代码如下:
#onNext(RemoteMessage)
方法,处理每一条消息,代码如下:RemoteDataInstanceCreatorGetter#getInstanceCreator(remoteDataId)
方法,获得数据协议编号对应的 RemoteDataInstanceCreator 对象。然后,调用 RemoteDataInstanceCreator#createInstance(id)
方法,创建数据协议编号对应的 Data 实现类对应的对象。GraphManager#findGraph(graphId)
方法,获得 graphId
对应的 Graph 对象。然后,调动 GraphNodeFinder#findNext(nodeId)
方法,获得 Next 对象。Next#execute(Data)
方法,继续流式处理。org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteSerializeService
,实现 RemoteSerializeService 接口,基于 gRPC 的远程通信序列化服务实现类。
org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteDeserializeService
,实现 GRPCRemoteDeserializeService 接口,基于 gRPC 的远程通信反序列化服务实现类。
collector-remote-kafka-provider
:基于 Kafka 的远程通信实现。
目前暂未完成。