前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark 源码(4) - Worker 启动流程

Spark 源码(4) - Worker 启动流程

作者头像
kk大数据
发布2021-10-12 12:40:48
4210
发布2021-10-12 12:40:48
举报
文章被收录于专栏:kk大数据kk大数据

一、Worker 启动

今天来看看 Worker 的启动流程,Worker 的启动是从 Shell 脚本开始的,Shell 脚本中就是从 Worker 类的 main 方法开始执行的,所以就从 main 方法开始看。

最主要的是启动了 RpcEnv 和 Endpoint,Worker 本身就是一个 Endpoint,它继承了 ThreadSafeRpcEndpoint 类。

所以下一步自然是去看 Endpoint 的声明周期方法 onStart()

首先创建了工作目录,就是从配置中拿到目录信息,然后创建它

代码语言:javascript
复制
createWorkDir()

然后启动了 Shuffle 服务,这个以后在讲 Shuffle 那块的时候,再来看,先略过:

代码语言:javascript
复制
startExternalShuffleService()

再然后检查了 Worker 的资源是否满足要求:

代码语言:javascript
复制
setupWorkerResources()

启动了 WebUI

代码语言:javascript
复制
webUi = new WorkerWebUI(this, workDir, webUiPort)
webUi.bind()
workerWebUiUrl = s"${webUi.scheme}$publicAddress:${webUi.boundPort}"

最后做了一件重要的事情,向 Master 注册自己:

代码语言:javascript
复制
registerWithMaster()

首先启动一个线程,在线程中,首先拿到 Master 的 EndpointRef

然后向 Master 发送一个 RegisterWorker 的消息

代码语言:javascript
复制
masterEndpoint.send(RegisterWorker(
    workerId,
    host,
    port,
    self,   //  Worker 这个 RpcEndpoint 组件的 RpcEndpointRef 对象
    cores,
    memory,
    workerWebUiUrl,
    masterEndpoint.address,
    resources))

二、Master 处理 RegisterWorker 消息

在 Master 类中搜索 case RegisterWorker

首先创建一个 WorkerInfo 对象

代码语言:javascript
复制
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
workerRef, workerWebUiUrl, workerResources)

然后注册这个 Worker :

代码语言:javascript
复制
registerWorker(worker)

注册完了之后,使用持久化引擎,保存到 zk 中:

代码语言:javascript
复制
persistenceEngine.addWorker(worker)

然后给 Worker 发送一个注册成功的消息:

代码语言:javascript
复制
workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress, false))

最后调用了 schedule() 方法,这个方法非常重要

在这个方法中,遍历了两个数据结构,waitingDrivers,waitingApps

waitingDrivers 是在 Driver 往 Master 注册时,会加入进来;waitingApps 是 Driver 提交任务时,也会加入到这个结构中来。

遍历 waitingDrivers,就会给 Worker 发送 LaunchDriver 消息,来启动 Driver;

遍历 waitingApps,会计算 App 使用的资源,并且在 Worker 上启动对应资源的 Executor

这个过程,在 提交任务的时候,还会详细的讲解。

三、Worker 开始处理 Master 发送回来的注册成功消息

在 Worker 类中搜索:case RegisteredWorker

首先,把 Active Master 的地址等信息,放到自己的内存中

代码语言:javascript
复制
changeMaster(masterRef, masterWebUiUrl, masterAddress)

然后开始定时给自己发送心跳,然后再处理这个心跳消息,发送给 Master

四、Master 处理 Worker 的心跳消息

Master 收到 Hearbeat 消息后,开始处理 Worker 的心跳消息

首先看,Worker 有没有注册过,如果注册过,则更新心跳时间

如果没有注册过,则给 Worker 发送 ReconnectWorker 消息,要求 Worker 重新注册上来

五、总结

今天我们浏览了 Worker 启动的源码,Worker 启动时,一方面创建了工作目录,启动了 Shuffle 服务,启动了 WebUi;另一方面,向 Master 注册自己,Master 则把 Worker 的信息放到自己的内存中维护起来;同时,Worker 开始周期性发送心跳给 Master。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-09-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 KK架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、Worker 启动
  • 二、Master 处理 RegisterWorker 消息
  • 三、Worker 开始处理 Master 发送回来的注册成功消息
  • 四、Master 处理 Worker 的心跳消息
  • 五、总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档