前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark源码之Standalone模式下master持久化引擎讲解

Spark源码之Standalone模式下master持久化引擎讲解

作者头像
Spark学习技巧
发布2018-01-30 17:47:36
6270
发布2018-01-30 17:47:36
举报
文章被收录于专栏:Spark学习技巧Spark学习技巧

Spark源码之Standalone模式下master持久化引擎讲解

Standalone 模式下Master为了保证故障恢复,会持久化一些重要的数据,来避免master故障导致集群不可用这种情况(也即单点故障)。目前,有四种持久化策略:

1,基于zookeeper的持久化引擎。

2,基于文件的持久化引擎。

3,用户自定义持久化引擎。

4,不使用持久化引擎。

在master的OnStart方法中,对应的源码如下:

代码语言:js
复制
val serializer = new JavaSerializer(conf)
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
 case "ZOOKEEPER" =>
    logInfo("Persisting recovery state to ZooKeeper")
 val zkFactory =
 new ZooKeeperRecoveryModeFactory(conf, serializer)
    (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
 case "FILESYSTEM" =>
 val fsFactory =
 new FileSystemRecoveryModeFactory(conf, serializer)
    (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
 case "CUSTOM" =>
 val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
 val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
      .newInstance(conf, serializer)
      .asInstanceOf[StandaloneRecoveryModeFactory]
    (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
 case _ =>
    (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_

默认,情况下是无持久化引擎,也就是没有ha策略。Spark提供的可用的ha策略:基于文件系统的和基于zookeeper。配置方法如下:

基于文件系统:

property

Meaning

spark.deploy.recoveryMode

FILESYSTEM

spark.deploy.recoveryDirectory

用来恢复状态的目录

基于zookeeper:

property

Meaning

spark.deploy.recoveryMode

ZOOKEEPER

spark.deploy.zookeeper.url

e.g., 192.168.1.100:2181,192.168.1.101:2181

spark.deploy.zookeeper.dir

zookeeper保存恢复状态的目录

生产环境中可用的是基于zookeeper的持久化引擎。

基于zookeeper持久化策略,会允许我们同时运行多个master,然后支持leader选举,最终是一个leader,其余是standby。

Spark的Master的leader选举实现

Spark源码里面使用的是CuratorFramework,跟zookeeper交流。该框架有以下特点:

1,自动连接管理:自动处理zookeeper的连接和重试存在一些潜在的问题;可以watch NodeDataChanged event和获取updateServerList;Watches可以自动被Cruator recipes删除;

2,更加简洁的API:简化raw zookeeper方法,事件等;提供现代流式API接口

3,Recipe实现:leader选举,分布式锁,path缓存,和watcher,分布式队列,Barriers等。

Spark源码里面使用了LeaderLatch实现选举功能。这个实现实际是基于zookeeper的节点类型来做,zookeeper有四种节点类型:

1,持久节点(PERSISTENT)

节点创建后,会一直存在,不会因客户端会话失效而删除;

2,持久顺序节点(PERSISTENT_SEQUENTIAL)

基本特性与持久节点一致,创建节点的过程中,zookeeper会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名;

3,临时节点(EPHEMERAL)

客户端会话失效或连接关闭后,该节点会被自动删除,且不能再临时节点下面创建子节点。

4,临时顺序节点(EPHEMERAL_SEQUENTIAL)

基本特性与临时节点一致,创建节点的过程中,zookeeper会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名;

LeaderLatch实现leader选举实际上基于临时顺序节点来做的。

Spark源码里面基于zookeeper的leader选举具体实现过程源码如下:

在master的OnStart方法里面

leaderElectionAgent = leaderElectionAgent_

实际是在构建zookeeper的持久化引擎的时候,构建的

(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))

在createLeaderElectionAgent方法里面构建了

new ZooKeeperLeaderElectionAgent(master, conf)

该对象,继承了LeaderLatchListener,并且覆盖了notLeader和isLeader两个重要的方法具体。

在ZooKeeperLeaderElectionAgent构建的时候调用了自己的start方法,该方法构建了LeaderLatch,并添加ZooKeeperLeaderElectionAgent作为其listener。

代码语言:js
复制
private def start() {
  logInfo("Starting ZooKeeper LeaderElection agent")
 zk = SparkCuratorUtil.newClient(conf)
 leaderLatch = new LeaderLatch(zk, WORKING_DIR)
 leaderLatch.addListener(this)
 leaderLatch.start()
}

Leader选举在zookeeper的临时节点的路径为

val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"

执行ZooKeeperLeaderElectionAgent对象的start方法之后,每当该对象所在的master由standby变为Leader的时候,会调用isLeader()方法。由Leader变为StandBy的时候会调用notLeader()。我们就可以在这两个方法里实现自己要的状态切换的相关操作。

代码语言:js
复制
override def isLeader() {
  synchronized {
 // could have lost leadership by now.
 if (!leaderLatch.hasLeadership) {
 return
 }

    logInfo("We have gained leadership")
    updateLeadershipStatus(true)
  }
}

override def notLeader() {
  synchronized {
 // could have gained leadership by now.
 if (leaderLatch.hasLeadership) {
 return
 }

    logInfo("We have lost leadership")
    updateLeadershipStatus(false)
  }
}

要实现,我们自己应用的ha,也可基于此方法。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-06-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档