上一篇把 Executor 注册到 Driver 的过程进行了详尽的描述。并且把四次往复的过程用图和代码都做了说明,虽然后面的注册 Executor 的部分没有详细再画图,但是起过程和第一次确认 Driver 端服务的过程大体相同,如有问题可以给我留言我们来互动沟通。
上述过程中,在 Executor 的 client 端是如何构建了 socket,如何发送的请求,这部分细节是本章要探讨的主要内容。
这部分内容,其实是我第一篇 Spark 的源代码文章中就讲过的,但是当时讲的方法有点啰嗦,很多同学看完告诉我还是太硬核了,参考这里。当时还手工的画了一个下图。
所以我考虑再三后,重新把 spark 的内容进行了梳理后,写的现在的大画 spark 系列。本次再讲解的话,主要还是注重画图,减少大幅贴代码。
这个过程中,有两个地方相对优雅一些
send
处理来poll
出然后进行发送处理。在这个过程中,第一次连接服务端,则会初始化 client,即懒加载,并且,初始化 client 的过程也是一个异步的过程。会有一个专门的线程服务来进行 client 的初始化,当初始化结束后,会持续之前的发送流程,直到整个发送结束。如上图所示,整个过程可以看作是 1-16 个过程。我举的例子是第一次连接 Driver 的场合,所以这 16 个步骤都会走到,而连接一次以后,后续因为 client 已经存在,初始化 client 的操作就会被省略。
CoarseGrainedExecutorBackend
的onStart
方法,这个在上一篇中有介绍NettyRpcEnv
的asyncSetupEndpointRefByURI
方法EndpointRef
进行调用,所有的EndpointRef
都是NettyRpcEndpointRef
的实例,只是传入的Endpoint
的 name 不同,通过这个 name 会在目标的 server 去匹配到底哪一个Endpoint
的实例解析相应的请求NettyRpcEndpointRef
中会调用回NettyRpcEnv
的ask
方法postToOutbox
的方法,这个方法通过名字也可以看出,其引出了一个新的结构体,就是OutBox
,所有从 client 端发出去的消息都是通过这个OutBox
发出去的send
的流程开始的drainOutbox
的方法内,这个方法顾名思义,就是要开始对Outbox
中的消息进行发送操作OutBox
中持有的 client 变量是否为null
,这个 client 变量即TransportClient
的实例,当第一次进入到这里的时候,client 一定是 null,所以判断 is null 一定是 yes,所以会走到第 9 步nettyEnv.*clientConnectionExecutor*.submit
去启动一个线程 task 来去做 client 的初始化,而原来的线程的操作会被 returnNettyRpcEnv
的createClient
方法,从而可以初始化出一个 client,也就是TransportClient
的实例drainOutbox
方法,即和步骤 7 调用的一致RpcOutboxMessage
的sendWith
的方法TransportClient
的sendRpc
方法,通过这个方法, 可以利用 Netty 的 Channel 把消息发送出去,并且保存发总的requestId
,方便收到response
的时候判断是之前哪一个 reqeust 的消息,从而回掉发送消息时缓存的callback
方法,从而完成一次一来一回的发送接收过程通过以上的过程,Executor 就建立起了和 Driver 端的连接,通过这个连接,后续可以继续通过上述的 4 的ask
方法来发送RpcRequestMessage
,如果是发送的OneWayMessage
,则使用send
方法。从英文中的 ask 和 send 也可以分辨出来,ask 是询问,需要 answer,而 send 只是发送,不需要回答,所以从方法名也可以看出是否需要 response。具体RpcRequestMessage
与OneWayMessage
的区别参考这里
在 spark 中的 client,具有双重含义。
并不是只在类似于 Executor 的 Client 端存在,而是
TransportClient
TransportClient
上面要如何理解呢?首先要明确以下几个逻辑概念
TransportClient
,并不是一个只存在于物理意义上的 client 的 java 的 class,而是,在物理上 client 与 server 中都存在,目的是握取操作系统底层的对外联通的channel
,通过TransportClient
找到channel
,进而向外发送出消息TransportClient
并没有被初始化,而是在我上一节讲解当中描述的,会有一个 lazy 实例化的过程TransportClient
的实例化是根据每连上一个物理 client 而动态创建出来的。这里需要一些 Netty 与底层网络的基础知识了,我们不去深究,暂时记住这一点即可,可以参考下面的图来理解首先,通过这个图,我们先 High level 的理解一下数据传输的过程,以及 channel 的构建过程
TransportClient
构建出自己的 channel 联通 server 端知道了以上的知识基础后,我们会发现,在 client 与 server 都存在 channel,在 client 端 channel 是属于TransportClient
的,在 server 端是如何操作的呢?
答案是,在 server 端也是TransportClient
持有 channel,这里,spark 做的还是比较优雅,它的TransportClient
是公共的,无论是 client 还是 server 都有TransportClient
,其中都有 channel,它的逻辑含义都是通过TransportClient
所持有的 channel 可以相互通信,而这个 channel 和TransportClient
又是如何配合整体的架构体系所存在的呢,继续画图
**TransportClient
与 channel 是如何在大框架中存在的**
**TransportClient**
来持有 channel 来给对方发送消息,主要注意的是,消息并不是只有 client 端发送给 server 端,一旦 connection 联通后,双方是可以对等的给对方发送消息的,这是网路底层的基础原理TransportChannelHandler
来进行第一步的分发处理再借助一个以前的图来加深印象,在 driver 代表的 Physical server 与 Executor 代表的 Physical client 的构建与结构,可以看到,双方其实除了在底层网络的 server 层面有差异,其实其余部分大体相同
executor
发送消息出来到driver
通过executor
的TransportClient
把消息放入channel
,发送出来driver
内的流转和转换,最终来到处理消息的Endpoint
driver
发送回client
,通过载driver
中掌握的executor
的EndpointRef
的TransportClient
把消息放入channel
,从而发给executor
driver
到底是如何获取到的executor
的EndpointRef
的,或者说,最终是通过TransportClient
发送的response
回去,这个TransportClient
与channel
是如何与EndpointRef
集成到一起的呢?下一篇会详尽描述DriverEndpoint
主动发出调用了持有的executor
的EndpointRef
EndpointRef
的TransportClient
把消息放入channel
,从而发给executor
,executor
接到后调用相应的callback
处理本篇其实和上一篇差不多,是上一篇的概括版,把TransportClient
的逻辑细节又做了详尽的阐述。很多开发同学都会把自己陷入到网络就是 http 协议的怪圈中去,其实底层的网络 client 和 server 一旦联通之后,双方的逻辑是相同的,你可以把它比做是 websocket 协议,client→server 与 server→client 是对等的。基于此,在 spark 中,executor(client 端)主动向 driver(server 端)发送注册申请,注册之后,driver 获取了 executor 的连接(TransportClient 与 channel),才可能通过 driver(server 端)来下发任务给 executor(client 端),这块的细节,下一篇继续硬核攻击。
领取专属 10元无门槛券
私享最新 技术干货