前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark 源码(2) - Spark Rpc 三剑客的理解

Spark 源码(2) - Spark Rpc 三剑客的理解

作者头像
kk大数据
发布2021-10-12 12:49:12
6350
发布2021-10-12 12:49:12
举报
文章被收录于专栏:kk大数据kk大数据

一、Spark Rpc 三剑客

谈到 Spark Rpc ,不得不提到 Spark Rpc 的三剑客:RpcEnv,RpcEndpoint,RpcEndpointRef。

如何理解他们呢,请看下图:

精简一下发送消息的过程为:通过 RpcEndpointRef 来发送一个消息给 RpcEnv,然后经过 Dispatcher 和 Inbox 的共同处理,发送给 RpcEndpoint 来处理(当前中间的过程比较复杂!)。

那什么是 Endpoint ,什么是 EndpointRef ?

Endpoint 很好理解,可以理解为是分布式环境的一个个节点。

EndpointRef,相当于是对其他节点的引用,比如:国家 A 在国家 B 设立了一个大使馆,里面住着一位大使。如果国家 B 想和国家 A 通信,那么只要和国家 A 的大使通信即可,不用千里迢迢跑去国家 A 。那么这里国家A的大使就是 EndpointRef ,endpointRef.send(MESSAGE),就可以往国家 A 发送一个消息。

在 Spark 源码中,Worker 在启动完成之后,要向 Master 注册自己,那么注册的时候,就是用 Rpc 通信的,首先需要拿到 Master 的一个引用,然后发送一个注册消息:

代码语言:javascript
复制
// 先拿到 master 的引用
val masterEndpointRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
// 然后发送一个消息
masterEndpointRef.send(RegisterWorker(...))

最后值得一提的是 Endpoint 的生命周期:

如果要使用 Endpoint ,首先需要往 RpcEnv 注册,注册之后会依次自动调用 Endpoint 的 构造函数 -> onStart() 方法 -> receive* -> onStop() 方法。

二、通过 Master 的启动来理解 Spark Rpc

下面我们通过 Master 的启动流程,来理解 RPCEndpoint 的注册与启动。

(1)启动脚本

在启动 Spark 集群的时候,一般是使用启动脚本:start-all.sh 来启动集群,这个脚本会分别启动 Master 和 Worker,如下:

代码语言:javascript
复制
# Start Master
"${SPARK_HOME}/sbin"/start-master.sh

# Start Workers
"${SPARK_HOME}/sbin"/start-workers.sh

然后在 start-master.sh 中,最终会执行 Master 这个类:

代码语言:javascript
复制
CLASS="org.apache.spark.deploy.master.Master"
"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \
  --host $SPARK_MASTER_HOST --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \
  $ORIGINAL_ARGS

可以看到是用 spark-daemon.sh 来启动 org.apache.spark.deploy.master.Master 这个类,那我们直接看 Master 的 main 方法

(2)NettyRpc 服务端的启动

main 方法中可以看到,这里在启动 RpcEnv 和 Endpoint,做了两件事情:

代码语言:javascript
复制
val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)

首先创建 RpcEnv:

代码语言:javascript
复制
val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)

可以看到最终是用 NettyRpcEnvFactory 工厂和 RpcEnvConfig 来创建的

代码语言:javascript
复制
 new NettyRpcEnvFactory().create(config)

然后到了 NettyRpcEnv 类中

阅读源码小技巧:在阅读 scala 代码时,如果见到 new 一个对象,那么小括号中的是成员变量,大括号中的变量声明、代码块,静态代码块全部是构造方法,都会执行,方法的定义不会执行,如下:

在 new NettyRpcEnv 时,小括号中的都是这个类的成员变量,大括号的:

这些代码都会执行的。

可以看到在 new NettyRpcEnv 的时候,创建了 transportConf 和 transportContext 对象。

然后接着启动了 NettyRpcEnv

代码语言:javascript
复制
nettyEnv.startServer(config.bindAddress, actualPort)

启动的时候,是用上面创建好的 TransportContext 来创建了一个 Server

然后点进去可以看到有个 init 方法

这里就是用 Netty 的 Api 正式的启动了一个 服务端

首先创建 BootStrap 引导器:

完了之后,绑定了端口,就完成了 Netty 服务器的启动:

(3)注册 Endpoint

然后再回到 Master 的 startRpcEnvAndEndpoint 方法中来

此时,RpcEnv 已经创建好了,也启动了一个服务端了:

然后下面就是把自己作为 Endpoint 注册到 RpcEnv 中来,因为 Master 本身就继承 了 Endpoint 接口:

(4)调用生命周期的 onStart() 方法

然后就是调用 Master 的 onStart() 方法,这里面主要做了下面几件事情:

首先启动了 Master 的 webUi:

然后给自己每隔几秒钟就发送一个 CheckForWorkerTimeOut 消息:

既然是给自己发送了一个消息,那么必然有处理消息的地方,然后我们就可以 Master 类中搜索:case CheckForWorkerTimeOut,来看具体是怎么处理这个消息的:

在这个方法中可以看到,会遍历 workers 这个列表,超过一定时间没有心跳的,就把状态改为 DEAD,并且移除

再然后启动了 masterMetricsSystem 这个东西,不重要。

再然后启动了一个持久化引擎和选举代理:

这个我们就下次再详细的讲。

到此为止,Master 就启动完毕了

三、小结

本篇我们通过 Master 的启动,介绍了 Spark Rpc 相关的源码,介绍了 RpcEndpoint 和 RPCEndpointRef,相信各位小伙伴已经对 Spark Rpc 有了初步的认知了。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-09-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 KK架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、Spark Rpc 三剑客
  • 二、通过 Master 的启动来理解 Spark Rpc
    • (1)启动脚本
      • (2)NettyRpc 服务端的启动
        • (3)注册 Endpoint
          • (4)调用生命周期的 onStart() 方法
          • 三、小结
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档