前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark 源码(3) - Master 启动之持久化引擎和选举代理

Spark 源码(3) - Master 启动之持久化引擎和选举代理

作者头像
kk大数据
发布2021-10-12 12:48:31
3430
发布2021-10-12 12:48:31
举报
文章被收录于专栏:kk大数据kk大数据

一、Master 启动

上回讲到,Master 的 main 方法中,创建了 RpcEnv 和 Master 的 Endpoint,紧接着就开始执行 Endpoint 的生命周期方法 onStart() 方法,今天就从这里开始。

首先创建了 Master 的 Ui,也就是我们在浏览器上看到的 Master 信息:

紧接着,定时给自己发送 CheckForWorkerTimeOut 消息:

在 Master 类中搜索 case CheckForWorkerTimeOut,可看到如下逻辑:

然后,开始创建持久化引擎和选举代理

什么是 持久化引擎,如果 Master 需要主备,并且使用 Zookeeper 作为主备信息的存储,则需要创建一个读写 Zookeeper 的组件,就称之为持久化引擎。当前这是一种抽象,具体实现可以有多种方式,可以是 Zookeeper,也可以本地文件系统,也可以是自定义的。

二、持久化引擎的创建

new 了这个对象,我们去看它的构造方法:

代码语言:javascript
复制
new ZooKeeperPersistenceEngine(conf, serializer)

首先从配置中获取存储到 Zookeeper 的主目录:

代码语言:javascript
复制
private val workingDir = conf.get(ZOOKEEPER_DIRECTORY).getOrElse("/spark") + "/master_status"

然后创建了一个 Zookeeper 的客户端(这个类就不往下点了,往下就是用 Curator 框架创建了一个 Zookeeper 客户端):

代码语言:javascript
复制
private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)

然后初始化了工作目录

代码语言:javascript
复制
SparkCuratorUtil.mkdir(zk, workingDir)

持久化引擎提供了读写 Zookeeper 的方法

这个方法底层就是 Curator 框架的 Api :

那么这就是持久化引擎的逻辑了,也没什么。

三、选举代理的创建

选举代理,就是提供了一种组件,来实现主备 Master 的选举,它使用了 Curator 框架提供的 LeaderLatch 来实现的

在 ZooKeeperLeaderElectionAgent 的构造器中,调用了 start() 方法

start() 方法中

先拿到了一个 zk 对象,然后创建了一个分布式锁:LeaderLatch,并且注册了一个监听,最后启动。

有个值得注意的点是,如果哪个节点选举成功了,监听就会回调 isLeader() 方法,没有注册成功,则回调 notLeader 方法。

在 isLeader() 方法中,调用了这个方法:

代码语言:javascript
复制
updateLeadershipStatus(true)

这个方法中,做了两件事,一个是更改了当前节点的 status 状态为 Leader

代码语言:javascript
复制
status = LeadershipStatus.LEADER

第二件事是做了选举之后的一些逻辑,点进去可以看到,给自己发送了一个 ElectedLeader 消息:

代码语言:javascript
复制
self.send(ElectedLeader)

那我们需要在 Master 类中搜索 case ElectedLeader. 看一下是如何处理这个消息的

这里需要考虑一种情况,比如上一个 Alive 状态的 Master 刚挂了,当前 Standby 的 Master 选举成为主节点,那需要从 Zookeeper 中恢复集群的一些数据到自己的内存中。所以,需要先从 Zookeeper 中拿到所有 Application、Driver、Worker 信息:

代码语言:javascript
复制
val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)

然后开始执行 beginRecovery() 方法,这个方法就是把从 Zookeeper 的各种数据,放到 Master 的各种内存里面:

数据都恢复完成之后,给自己发送一个 CompleteRecovery 消息

CompleteRecovery 消息主要是最后检查一下刚刚恢复出来数据的准确性,此处不细看。

至此,Master 就启动完成了。

四、总结

本次我们主要阅读了 Master 启动时,如何初始化持久化引擎以及选举代理,选举成功之后,都做了些什么事情,对于 Master 启动过程有了初步的了解。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-09-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 KK架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、Master 启动
  • 二、持久化引擎的创建
    • 三、选举代理的创建
      • 四、总结
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档