Spark Kubernetes Source Code Analysis Series - submit

1 Overview

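Kubernetes is integrated into Spark as a new resource manager, following much the same approach as the YARN integration. Spark does ship its own Standalone resource-management mode, but that alone is not enough.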

The integration itself is easy to understand: Spark starts an HTTP client that talks to the Kubernetes API server and uses it to submit the Application's configuration, for example how to create the Driver and Executor Pods, and also to Watch those Pods.

2 Source Code Analysis

The Spark Kubernetes module does not contain much code. A good way to get oriented is to run tree in the following directory.

# Path
path/to/spark/resource-managers/kubernetes/core/src/main/scala/org/apache/spark
➜  spark git:(master) ✗ tree -d -L 3
.
├── deploy
│   └── k8s
│       ├── features // configuration steps for Driver/Executor, ConfigMap, Secret, etc.
│       └── submit // submit-related code
└── scheduler
    └── cluster
        └── k8s // executor Pod scheduling, state tracking, etc.

The layout is clear: one part deals with deploy, the other with the scheduler.

This article focuses on the submit-related code.

/path/to/spark/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit
├── K8sSubmitOps.scala // spark-submit operations
├── KubernetesClientApplication.scala // wrapper around spark-submit
├── KubernetesDriverBuilder.scala // Driver builder
├── LoggingPodStatusWatcher.scala // status Watcher for the Spark Pod
└── MainAppResource.scala // resource definitions for Java/Python/R

Next, look at the entry class for Spark's K8S mode.

private[spark] class KubernetesClientApplication extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    val parsedArguments = ClientArguments.fromCommandLineArgs(args)
    run(parsedArguments, conf)
  }

  private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = {
    val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
    val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
    val kubernetesConf = KubernetesConf.createDriverConf(
      sparkConf,
      kubernetesAppId,
      clientArguments.mainAppResource,
      clientArguments.mainClass,
      clientArguments.driverArgs)
    val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
    val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None

    val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)

    Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
      master,
      Some(kubernetesConf.namespace),
      KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,
      SparkKubernetesClientFactory.ClientType.Submission,
      sparkConf, None, None)) { kubernetesClient =>
        val client = new Client(
          kubernetesConf,
          new KubernetesDriverBuilder(),
          kubernetesClient,
          waitForAppCompletion,
          watcher)
        client.run()
    }
  }
}

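This is the entry main class for Spark on K8S; the part to focus on is the run() method. It first generates a kubernetesAppId. Why not simply use the Spark app name? Because this identifier is attached as a Label to every resource belonging to the App, including the Driver/Executor Pods, ConfigMaps, and Secrets, so it has to be unique per submission, which a user-chosen app name does not guarantee.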
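
The ID scheme described above can be sketched in a few lines. This is a minimal illustration, not Spark's code: newAppId mirrors the expression in run(), while labelsFor and the label keys "spark-app-selector" and "spark-role" are assumptions made for this example.

```scala
import java.util.UUID

object AppIdSketch {
  // Same shape as in run(): "spark-" followed by a UUID with the dashes removed.
  def newAppId(): String =
    s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"

  // Hypothetical helper: the label keys are illustrative assumptions.
  // Every resource of one submission carries the same app ID label.
  def labelsFor(appId: String, role: String): Map[String, String] =
    Map("spark-app-selector" -> appId, "spark-role" -> role)

  def main(args: Array[String]): Unit = {
    val appId = newAppId()
    println(labelsFor(appId, "driver"))
    println(labelsFor(appId, "executor"))
  }
}
```

The generated value is 38 characters long and contains only lowercase alphanumerics and one dash, so it fits within the 63-character limit Kubernetes places on label values, which an arbitrary app name might not.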

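Note the WAIT_FOR_APP_COMPLETION setting, which defaults to true. In cluster mode it controls whether the launcher process waits for the App to finish before exiting; if set to false, the launcher process exits as soon as the application has been submitted.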

private[spark] class Client(
    conf: KubernetesDriverConf, builder: KubernetesDriverBuilder,
    kubernetesClient: KubernetesClient, waitForAppCompletion: Boolean,
    watcher: LoggingPodStatusWatcher) extends Logging {
  def run(): Unit = {
    // The driver Pod spec is built here
    val resolvedDriverSpec = builder.buildFromFeatures(conf, kubernetesClient)

    // The ConfigMap is built here
    val configMapName = s"${conf.resourceNamePrefix}-driver-conf-map"
    val configMap = buildConfigMap(configMapName, resolvedDriverSpec.systemProperties)

    // Driver container configuration
    val resolvedDriverContainer = new ContainerBuilder(resolvedDriverSpec.pod.container).addNewEnv()
        .withName(ENV_SPARK_CONF_DIR)
        .withValue(SPARK_CONF_DIR_INTERNAL)
        .endEnv()
      .addNewVolumeMount()
        .withName(SPARK_CONF_VOLUME)
        .withMountPath(SPARK_CONF_DIR_INTERNAL)
        .endVolumeMount()
      .build()

    // Driver Pod configuration
    val resolvedDriverPod = new PodBuilder(resolvedDriverSpec.pod.pod)
      .editSpec()
        .addToContainers(resolvedDriverContainer)
        .addNewVolume()
          .withName(SPARK_CONF_VOLUME)
          .withNewConfigMap()
            .withName(configMapName)
            .endConfigMap()
          .endVolume()
        .endSpec()
      .build()

    Utils.tryWithResource(
      kubernetesClient
        .pods()
        .withName(resolvedDriverPod.getMetadata.getName)
        .watch(watcher)) { _ =>
      val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)

      try {
        // including configMap
        val otherKubernetesResources =
          resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
        addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
        kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
      } catch {
        case NonFatal(e) =>
          kubernetesClient.pods().delete(createdDriverPod)
          throw e
      }

      val sId = s"${Option(conf.namespace).map(_ + ":").getOrElse("")}" +
        s"${resolvedDriverPod.getMetadata.getName}"
      if (waitForAppCompletion) {
        logInfo(s"Waiting for application ${conf.appName} with submission ID ${sId} to finish...")
        // watcher
        watcher.awaitCompletion()
        logInfo(s"Application ${conf.appName} with submission ID ${sId} finished.")
      } else {
        logInfo(s"Deployed Spark application ${conf.appName} with " +
          s"submission ID ${sId} into Kubernetes.")
      }
    }
  }

  // Kubernetes garbage collection: the ConfigMap, Secrets, executor Pods, etc. are owned by the driver Pod, so they are all cleaned up once the driver Pod is gone
  private def addDriverOwnerReference(driverPod: Pod, resources: Seq[HasMetadata]): Unit = {
    val driverPodOwnerReference = new OwnerReferenceBuilder()
      .withName(driverPod.getMetadata.getName)
      .withApiVersion(driverPod.getApiVersion)
      .withUid(driverPod.getMetadata.getUid)
      .withKind(driverPod.getKind)
      .withController(true)
      .build()
    // Set the driver Pod as the owner of every resource
    resources.foreach { resource =>
      val originalMetadata = resource.getMetadata
      originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference))
    }
  }

  // Build the ConfigMap, which mainly carries configuration such as the Hadoop/Spark conf
  private def buildConfigMap(configMapName: String, conf: Map[String, String]): ConfigMap = {
    // java.util.Properties
    val properties = new Properties()

    conf.foreach { case (k, v) =>
      properties.setProperty(k, v)
    }
    val propertiesWriter = new StringWriter()
    properties.store(propertiesWriter,
      s"Java properties built from Kubernetes config map with name: $configMapName")
    new ConfigMapBuilder()
      .withNewMetadata()
        .withName(configMapName)
        .endMetadata()
      .addToData(SPARK_CONF_FILE_NAME, propertiesWriter.toString)
      .build()
  }
}
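
The serialisation step inside buildConfigMap can be tried without a cluster. The sketch below is a minimal, self-contained version of that idea using only java.util.Properties; the object and method names are invented for this example, and fromPropertiesText is added purely to show that the text round-trips.

```scala
import java.io.{StringReader, StringWriter}
import java.util.Properties

object ConfigMapPayloadSketch {
  // Serialise a Scala Map into the text of a Java .properties file,
  // which is what ends up as one entry in the ConfigMap's data.
  def toPropertiesText(conf: Map[String, String], comment: String): String = {
    val props = new Properties()
    conf.foreach { case (k, v) => props.setProperty(k, v) }
    val writer = new StringWriter()
    props.store(writer, comment)
    writer.toString
  }

  // Parse the text back, as a JVM reading the mounted file would.
  def fromPropertiesText(text: String): Map[String, String] = {
    val props = new Properties()
    props.load(new StringReader(text))
    val names = props.stringPropertyNames().iterator()
    var out = Map.empty[String, String]
    while (names.hasNext) {
      val key = names.next()
      out += key -> props.getProperty(key)
    }
    out
  }

  def main(args: Array[String]): Unit = {
    val text = toPropertiesText(
      Map("spark.app.name" -> "demo", "spark.executor.instances" -> "2"),
      "Java properties built for a demo config map")
    println(text)
    println(fromPropertiesText(text))
  }
}
```

Because the payload is a plain properties file, mounting the ConfigMap as a volume and pointing the conf directory environment variable at it is enough for the driver to pick the settings up.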

Next, see what goes into configuring the Driver Pod.

private[spark] class KubernetesDriverBuilder {

  def buildFromFeatures(
      conf: KubernetesDriverConf,
      client: KubernetesClient): KubernetesDriverSpec = {

    val initialPod = conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE)
      .map { file =>
        // Since Spark 3.0 a Pod template file can be supplied; it becomes the initial Pod, and the feature steps below are then applied on top of it
        KubernetesUtils.loadPodFromTemplate(
          client,
          new File(file),
          conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME))
      }
      .getOrElse(SparkPod.initialPod())

    // Pay attention here: this is the list of steps that configure the Pod
    val features: Seq[KubernetesFeatureConfigStep] = Seq(
      new BasicDriverFeatureStep(conf),
      new DriverKubernetesCredentialsFeatureStep(conf),
      new DriverServiceFeatureStep(conf),
      new MountSecretsFeatureStep(conf),
      new EnvSecretsFeatureStep(conf),
      new LocalDirsFeatureStep(conf),
      new MountVolumesFeatureStep(conf),
      new DriverCommandFeatureStep(conf),
      new HadoopConfDriverFeatureStep(conf),
      new KerberosConfDriverFeatureStep(conf),
      new PodTemplateConfigMapStep(conf))

    val spec = KubernetesDriverSpec(
      initialPod,
      driverKubernetesResources = Seq.empty,
      conf.sparkConf.getAll.toMap)

    features.foldLeft(spec) { case (spec, feature) =>
      val configuredPod = feature.configurePod(spec.pod)
      val addedSystemProperties = feature.getAdditionalPodSystemProperties()
      val addedResources = feature.getAdditionalKubernetesResources()

      KubernetesDriverSpec(
        configuredPod,
        spec.driverKubernetesResources ++ addedResources,
        spec.systemProperties ++ addedSystemProperties)
    }
  }
}
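
The foldLeft in buildFromFeatures is the core of the builder, so a stripped-down model may help. In the sketch below every type (PodSketch, DriverSpecSketch, FeatureStepSketch) and both steps are simplified stand-ins invented for this example; only the folding pattern is taken from the code above.

```scala
object FeatureStepFoldSketch {
  // Simplified stand-ins for SparkPod and KubernetesDriverSpec (assumptions).
  final case class PodSketch(labels: Map[String, String])
  final case class DriverSpecSketch(
      pod: PodSketch,
      resources: Seq[String],
      systemProperties: Map[String, String])

  // Each step may rewrite the Pod and contribute extra properties and resources.
  trait FeatureStepSketch {
    def configurePod(pod: PodSketch): PodSketch
    def additionalSystemProperties: Map[String, String] = Map.empty
    def additionalResources: Seq[String] = Seq.empty
  }

  object BasicStep extends FeatureStepSketch {
    def configurePod(pod: PodSketch): PodSketch =
      pod.copy(labels = pod.labels + ("spark-role" -> "driver"))
  }

  object ServiceStep extends FeatureStepSketch {
    def configurePod(pod: PodSketch): PodSketch = pod
    override def additionalSystemProperties: Map[String, String] =
      Map("spark.driver.host" -> "demo-driver-svc")
    override def additionalResources: Seq[String] = Seq("Service/demo-driver-svc")
  }

  // Start from the initial Pod (template or empty) and thread the spec through every step.
  def build(
      initialPod: PodSketch,
      sparkConf: Map[String, String],
      steps: Seq[FeatureStepSketch]): DriverSpecSketch =
    steps.foldLeft(DriverSpecSketch(initialPod, Seq.empty, sparkConf)) { (spec, step) =>
      DriverSpecSketch(
        step.configurePod(spec.pod),
        spec.resources ++ step.additionalResources,
        spec.systemProperties ++ step.additionalSystemProperties)
    }

  def main(args: Array[String]): Unit = {
    val template = PodSketch(Map("team" -> "data", "spark-role" -> "from-template"))
    println(build(template, Map("spark.app.name" -> "demo"), Seq(BasicStep, ServiceStep)))
  }
}
```

Since the template only seeds the fold, a value it sets survives unless a later step writes the same field, in which case the step wins.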

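Next comes the Pod status watcher. It creates a single-threaded daemon scheduler that logs the Pod's short status at the configured interval, while actual state changes arrive asynchronously through eventReceived.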

private[k8s] class LoggingPodStatusWatcherImpl(
  appId: String,
  maybeLoggingInterval: Option[Long]) extends LoggingPodStatusWatcher with Logging {

  private val podCompletedFuture = new CountDownLatch(1)
  // start timer for periodic logging
  private val scheduler =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
  private val logRunnable: Runnable = () => logShortStatus()

  private var pod = Option.empty[Pod]

  private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown")

  def start(): Unit = {
    maybeLoggingInterval.foreach { interval =>
      scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS)
    }
  }

  // When an event arrives asynchronously, check whether the Pod has already Succeeded or Failed
  override def eventReceived(action: Action, pod: Pod): Unit = {
    this.pod = Option(pod)
    action match {
      case Action.DELETED | Action.ERROR =>
        closeWatch()

      case _ =>
        logLongStatus()
        if (hasCompleted()) {
          closeWatch()
        }
    }
  }
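
The excerpt above stops before closeWatch and awaitCompletion, so the following is only a sketch of how a latch-based watcher of this kind can work. It assumes that closing the watch counts down the CountDownLatch and that a Pod is complete once its phase is "Succeeded" or "Failed"; the class name and the string-based event signature are invented for this example.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object PodWatcherSketch {
  final class CompletionWatcher {
    private val completed = new CountDownLatch(1)
    @volatile private var phase: String = "unknown"

    private def hasCompleted: Boolean = phase == "Succeeded" || phase == "Failed"

    // Called for every watch event; action is e.g. "ADDED", "MODIFIED" or "DELETED".
    def eventReceived(action: String, newPhase: String): Unit = {
      phase = newPhase
      // Assumption: closing the watch is modelled as releasing the latch.
      if (action == "DELETED" || action == "ERROR" || hasCompleted) completed.countDown()
    }

    def currentPhase: String = phase
    def isDone: Boolean = completed.getCount == 0

    // Blocks the caller until the Pod finishes or the timeout elapses.
    def awaitCompletion(timeoutMs: Long): Boolean =
      completed.await(timeoutMs, TimeUnit.MILLISECONDS)
  }

  def main(args: Array[String]): Unit = {
    val watcher = new CompletionWatcher
    // A separate thread plays the role of the Kubernetes watch connection.
    val feeder = new Thread(() => {
      Seq("ADDED" -> "Pending", "MODIFIED" -> "Running", "MODIFIED" -> "Succeeded")
        .foreach { case (action, phase) => watcher.eventReceived(action, phase) }
    })
    feeder.start()
    val finished = watcher.awaitCompletion(5000)
    println(s"finished=$finished phase=${watcher.currentPhase}")
  }
}
```

This is the mechanism that lets the launcher block in cluster mode: the submitting thread waits on the latch while events are delivered on another thread.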

Finally, a look at some newer features, for example using spark-submit to kill an entire App.

spark-submit --kill dbyin:spark-hdfs-* --master k8s://https://kubernetes.default.svc --conf spark.kubernetes.namespace=dbyin

private[spark] class K8SSparkSubmitOperation extends SparkSubmitOperation
  with CommandLineLoggingUtils {

  private def isGlob(name: String): Boolean = {
    name.last == '*'
  }

  def execute(submissionId: String, sparkConf: SparkConf, op: K8sSubmitOp): Unit = {
    val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
    submissionId.split(":", 2) match {
      case Array(part1, part2@_*) =>
        val namespace = if (part2.isEmpty) None else Some(part1)
        val pName = if (part2.isEmpty) part1 else part2.headOption.get
        Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
          master,
          namespace,
          KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,
          SparkKubernetesClientFactory.ClientType.Submission,
          sparkConf,
          None,
          None)
        ) { kubernetesClient =>
          implicit val client: KubernetesClient = kubernetesClient
          if (isGlob(pName)) {
            val ops = namespace match {
              case Some(ns) =>
                kubernetesClient
                  .pods
                  .inNamespace(ns)
              case None =>
                kubernetesClient
                  .pods
            }
            val pods = ops
              .list()
              .getItems
              .asScala
              .filter { pod =>
                val meta = pod.getMetadata
                meta.getName.startsWith(pName.stripSuffix("*")) &&
                  meta.getLabels.get(SPARK_ROLE_LABEL) == SPARK_POD_DRIVER_ROLE
              }.toList
            op.executeOnGlob(pods, namespace, sparkConf)
          } else {
            op.executeOnPod(pName, namespace, sparkConf)
          }
        }
      case _ =>
        printErrorAndExit(s"Submission ID: {$submissionId} is invalid.")
    }
  }

  // Kills a Spark App
  override def kill(submissionId: String, conf: SparkConf): Unit = {
    printMessage(s"Submitting a request to kill submission " +
      s"${submissionId} in ${conf.get("spark.master")}. " +
      s"Grace period in secs: ${getGracePeriod(conf).getOrElse("not set.")}")
    execute(submissionId, conf, new KillApplication)
  }

  // Prints the status of an App
  override def printSubmissionStatus(submissionId: String, conf: SparkConf): Unit = {
    printMessage(s"Submitting a request for the status of submission" +
      s" ${submissionId} in ${conf.get("spark.master")}.")
    execute(submissionId, conf, new ListStatus)
  }

  override def supports(master: String): Boolean = {
    master.startsWith("k8s://")
  }
}
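
How a submission ID such as dbyin:spark-hdfs-* is taken apart in execute() can be shown in isolation. The sketch below is a simplified model: PodInfo, parse, and selectDrivers are names invented for this example, and the "spark-role" label key with value "driver" is an assumption standing in for SPARK_ROLE_LABEL and SPARK_POD_DRIVER_ROLE.

```scala
object SubmissionIdSketch {
  // Split "namespace:name" into its parts; a bare name has no namespace.
  def parse(submissionId: String): (Option[String], String) =
    submissionId.split(":", 2) match {
      case Array(ns, name) => (Some(ns), name)
      case Array(name)     => (None, name)
      case _               => (None, submissionId)
    }

  // A trailing '*' marks a glob; the emptiness guard is an addition of this sketch.
  def isGlob(name: String): Boolean = name.nonEmpty && name.last == '*'

  // Hypothetical stand-in for the Pod metadata used by the filter.
  final case class PodInfo(name: String, labels: Map[String, String])

  // Keep only driver Pods whose name starts with the glob's prefix.
  def selectDrivers(pods: Seq[PodInfo], pattern: String): Seq[PodInfo] = {
    val prefix = pattern.stripSuffix("*")
    pods.filter { pod =>
      pod.name.startsWith(prefix) && pod.labels.get("spark-role").contains("driver")
    }
  }

  def main(args: Array[String]): Unit = {
    val (namespace, name) = parse("dbyin:spark-hdfs-*")
    val pods = Seq(
      PodInfo("spark-hdfs-1-driver", Map("spark-role" -> "driver")),
      PodInfo("spark-hdfs-1-exec-1", Map("spark-role" -> "executor")),
      PodInfo("spark-pi-driver", Map("spark-role" -> "driver")))
    println(s"namespace=$namespace glob=${isGlob(name)}")
    println(selectDrivers(pods, name).map(_.name))
  }
}
```

Filtering on the driver role is sufficient for a kill, because the remaining resources of the App are owned by the driver Pod and are removed along with it.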

3 Summary

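This concludes the walkthrough of the spark-submit flow when K8S serves as Spark's ResourceManager: how the Driver Pod is created along with resources such as Secrets and ConfigMaps. Executor Pods are handled by the scheduler package and are not covered in this article.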
