前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Kubernetes 的源码分析系列 - scheduler

Spark Kubernetes 的源码分析系列 - scheduler

作者头像
runzhliu
发布2020-08-06 09:49:49
1K0
发布2020-08-06 09:49:49
举报
文章被收录于专栏:容器计算容器计算

1 Overview

这一块代码可以理解为 Spark 是如何实现一个基于 K8S 的调度器,来调度生成 Executor Pod 的。

2 分析

代码语言:javascript
复制
/path/to/spark/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler
└── cluster
    └── k8s
        ├── ExecutorPodStates.scala
        ├── ExecutorPodsAllocator.scala
        ├── ExecutorPodsLifecycleManager.scala
        ├── ExecutorPodsPollingSnapshotSource.scala
        ├── ExecutorPodsSnapshot.scala
        ├── ExecutorPodsSnapshotsStore.scala
        ├── ExecutorPodsSnapshotsStoreImpl.scala
        ├── ExecutorPodsWatchSnapshotSource.scala
        ├── KubernetesClusterManager.scala
        ├── KubernetesClusterSchedulerBackend.scala
        └── KubernetesExecutorBuilder.scala

2 directories, 11 files

2.1 KubernetesExecutorBuilder

由于上篇文章主要介绍了 Driver 的 Pod 是如何生成的,在讲 scheduler 之前,先补充一下 Executor 的配置步骤。重点代码在下面这个 features 里。步骤跟 Driver 类似,但是少了一些,剩下的就是 一个 Basic 的配置,当然是包含 Pod 或者 Container 的一些 meta 信息。此外,跟 ApiServer 交互请求 Executor Pod 的时候也需要 K8S 的安全认证的机制。然后就是类似 Env 和本地目录挂载的一些配置。

代码语言:javascript
复制
val features = Seq(
  new BasicExecutorFeatureStep(conf, secMgr),
  new ExecutorKubernetesCredentialsFeatureStep(conf),
  new MountSecretsFeatureStep(conf),
  new EnvSecretsFeatureStep(conf),
  new LocalDirsFeatureStep(conf),
  new MountVolumesFeatureStep(conf))

2.2 KubernetesClusterManager

这个是 Spark 这一段,关于 K8S 集群作为 resource manager 的一个管理中心。这个类是继承了 ExternalClusterManager 接口的,主要是控制生成 schedulerBackend 对象。

2.3 KubernetesClusterSchedulerBackend

这是 K8S 集群调度器的封装,SchedulerBackend,简称 SB 就好了…SB 主要是包含了申请 request 和删除 remove Executor 的逻辑。

代码语言:javascript
复制
// 这里是指定初始申请的 Executor 的数量,可以通过 conf 来配置
private val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
代码语言:javascript
复制
// 这个是 Executor 出问题 debug 的关键
// 默认情况下 Executor 退出后,会由 Spark 的 K8S 客户端主动进行删除
// 所以 Executor 的日志就找不到了
// 开启这个配置 spark.kubernetes.executor.deleteOnTermination
// 这样 Executor 即时 Failed 了,他的 Pod 也不会被自动删除
private val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS)

// 移除 Executor 的逻辑,上面说到的 Pod 被删除就是这里的 delete 导致的
override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
  logInfo("do send request to kill executors!")
  kubernetesClient
    .pods()
    .withLabel(SPARK_APP_ID_LABEL, applicationId())
    .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
    .withLabelIn(SPARK_EXECUTOR_ID_LABEL, executorIds: _*)
    .delete()
  // Don't do anything else - let event handling from the Kubernetes API do the Spark changes
}

2.4 ExecutorPodsSnapshotsStore

这个接口是用于管理 Executor Pod,下面简称 EP…EP 的状态,并且用 ExecutorPodsSnapshot 的数据结构来记录变化的情况。

2.5 ExecutorPodsSnapshot

ExecutorPodsSnapshot 是关于 Spark App 在集群里 EP 的状态的不可变视图。

代码语言:javascript
复制
private[spark] case class ExecutorPodsSnapshot(executorPods: Map[Long, ExecutorPodState]) {

  import ExecutorPodsSnapshot._
  
  // 核心方法,witUpdate 通过传入 Pod 参数,通过 new 一个 EP snapshot 视图来记录 EP 的状态,本质上一个 Map(Executor id -> Executor Pod 状态) 的数据结构
  def withUpdate(updatedPod: Pod): ExecutorPodsSnapshot = {
    val newExecutorPods = executorPods ++ toStatesByExecutorId(Seq(updatedPod))
    new ExecutorPodsSnapshot(newExecutorPods)
  }
}

2.6 ExecutorPodsSnapshotsStoreImpl

这是 ExecutorPodsSnapshotsStore 的实现类。下面一段是理解整个 scheduler 的关键,所以建议拿着英文注释认真看一遍,大概就能理解了。

Controls the propagation of the Spark application’s executor pods state to subscribers that react to that state. Roughly follows a producer-consumer model. Producers report states of executor pods, and these states are then published to consumers that can perform any actions in response to these states. Producers push updates in one of two ways. An incremental update sent by updatePod() represents a known new state of a single executor pod. A full sync sent by replaceSnapshot() indicates that the passed pods are all of the most up to date states of all executor pods for the application. The combination of the states of all executor pods for the application is collectively known as a snapshot. The store keeps track of the most up to date snapshot, and applies updates to that most recent snapshot - either by incrementally updating the snapshot with a single new pod state, or by replacing the snapshot entirely on a full sync. Consumers, or subscribers, register that they want to be informed about all snapshots of the executor pods. Every time the store replaces its most up to date snapshot from either an incremental update or a full sync, the most recent snapshot after the update is posted to the subscriber’s buffer. Subscribers receive blocks of snapshots produced by the producers in time-windowed chunks. Each subscriber can choose to receive their snapshot chunks at different time intervals.

以上就是他的设计思想,简单来说就是依照生产消费者模式,订阅者订阅的是 EP 的状态,而这个状态是上文提到的 Snapshot。

代码语言:javascript
复制
SNAPSHOT_LOCK // 锁
subscribers // 订阅者
pollingTasks // ?
currentSnapshot // 当前的 Snapshot

2.7 ExecutorPodsWatchSnapshotSource

这里面主要是继承 K8S 客户端的一个 Wathcher 监听器,主要监听 Pod 的事件。因为 EP 被增删改出错,等都需要被 SB 感知。

代码语言:javascript
复制
enum Action {
  ADDED, MODIFIED, DELETED, ERROR
}

2.8 ExecutorPodsPollingSnapshotSource

这个是通过 K8S client 轮询 ApiServer 获取 Pod 状态并且保存到 Snapshot 里的过程。

代码语言:javascript
复制
private class PollRunnable(applicationId: String) extends Runnable {

  override def run(): Unit = Utils.tryLogNonFatalError {
    logDebug(s"Resynchronizing full executor pod state from Kubernetes.")
    // 核心方法,将得到的 Pod 的状态,通过 replaceSnapshot 来记录
    snapshotsStore.replaceSnapshot(kubernetesClient
      .pods()
      .withLabel(SPARK_APP_ID_LABEL, applicationId)
      .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
      .list()
      .getItems
      .asScala)
  }
}

// 轮询默认是30s一次
private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)

2.9 ExecutorPodsLifecycleManager

这个就是一个 EP 生命周期的 Manager,本质上 Pod 是创建在 K8S 集群的,Driver Pod 对 EP 的管理需要通过 K8S 的 ApiServer,而当 Pod 发生状态改变了,对应的也要告知 Driver。

代码语言:javascript
复制
private def onNewSnapshots(
    schedulerBackend: KubernetesClusterSchedulerBackend,
    snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
  val execIdsRemovedInThisRound = mutable.HashSet.empty[Long]
  snapshots.foreach { snapshot =>
    snapshot.executorPods.foreach { case (execId, state) =>
      state match {
        case deleted@PodDeleted(_) =>
          logDebug(s"Snapshot reported deleted executor with id $execId," +
            s" pod name ${state.pod.getMetadata.getName}")
          removeExecutorFromSpark(schedulerBackend, deleted, execId)
          execIdsRemovedInThisRound += execId
        case failed@PodFailed(_) =>
          logDebug(s"Snapshot reported failed executor with id $execId," +
            s" pod name ${state.pod.getMetadata.getName}")
          onFinalNonDeletedState(failed, execId, schedulerBackend, execIdsRemovedInThisRound)
        case succeeded@PodSucceeded(_) =>
          logDebug(s"Snapshot reported succeeded executor with id $execId," +
            s" pod name ${state.pod.getMetadata.getName}. Note that succeeded executors are" +
            s" unusual unless Spark specifically informed the executor to exit.")
          onFinalNonDeletedState(succeeded, execId, schedulerBackend, execIdsRemovedInThisRound)
        case _ =>
      }
    }
  }

  if (snapshots.nonEmpty) {
    val latestSnapshot = snapshots.last
    (schedulerBackend.getExecutorIds().map(_.toLong).toSet
      -- latestSnapshot.executorPods.keySet
      -- execIdsRemovedInThisRound).foreach { missingExecutorId =>
      if (removedExecutorsCache.getIfPresent(missingExecutorId) == null) {
        val exitReasonMessage = s"The executor with ID $missingExecutorId was not found in the" +
          s" cluster but we didn't get a reason why. Marking the executor as failed. The" +
          s" executor may have been deleted but the driver missed the deletion event."
        logDebug(exitReasonMessage)

        val exitReason = ExecutorExited(
          UNKNOWN_EXIT_CODE,
          exitCausedByApp = false,
          exitReasonMessage)
        schedulerBackend.doRemoveExecutor(missingExecutorId.toString, exitReason)
        execIdsRemovedInThisRound += missingExecutorId
      }
    }
  }

  if (execIdsRemovedInThisRound.nonEmpty) {
    logDebug(s"Removed executors with ids ${execIdsRemovedInThisRound.mkString(",")}" +
      s" from Spark that were either found to be deleted or non-existent in the cluster.")
  }
}

3 Summary

Scheduler 的粗浅分析就到这里,其实不是太难理解的,调度器的功能就是找到给 Driver 分配和在合适的时候移除 Executor,至于如何找合适的节点来跑 Executor,那是 K8S 的事情,这里是把 K8S 作为一个外部的集群模式,具体的调度工作是交给 K8S 的。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-07-09 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 Overview
  • 2 分析
    • 2.1 KubernetesExecutorBuilder
      • 2.2 KubernetesClusterManager
        • 2.3 KubernetesClusterSchedulerBackend
          • 2.4 ExecutorPodsSnapshotsStore
            • 2.5 ExecutorPodsSnapshot
              • 2.6 ExecutorPodsSnapshotsStoreImpl
                • 2.7 ExecutorPodsWatchSnapshotSource
                  • 2.8 ExecutorPodsPollingSnapshotSource
                    • 2.9 ExecutorPodsLifecycleManager
                    • 3 Summary
                    相关产品与服务
                    容器服务
                    腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档