前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Kubernetes 的源码分析系列 - features

Spark Kubernetes 的源码分析系列 - features

作者头像
runzhliu
发布2020-08-06 09:49:25
8770
发布2020-08-06 09:49:25
举报
文章被收录于专栏:容器计算容器计算

1 Overview

features 包里的代码,主要是用于构建 Spark 在 K8S 中的各类资源所需要的特征,个人觉得可以理解成这些 features 就是帮你写各类 Kind 的 YAML 文件。

2 分析

看看 features 包里的代码。这里面都是 Spark 在 K8S 中构建各种资源的步骤。

代码语言:javascript
复制
/path/to/spark/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features
├── BasicDriverFeatureStep.scala
├── BasicExecutorFeatureStep.scala
├── DriverCommandFeatureStep.scala
├── DriverKubernetesCredentialsFeatureStep.scala
├── DriverServiceFeatureStep.scala
├── EnvSecretsFeatureStep.scala
├── ExecutorKubernetesCredentialsFeatureStep.scala
├── HadoopConfDriverFeatureStep.scala
├── KerberosConfDriverFeatureStep.scala
├── KubernetesFeatureConfigStep.scala
├── LocalDirsFeatureStep.scala
├── MountSecretsFeatureStep.scala
├── MountVolumesFeatureStep.scala
└── PodTemplateConfigMapStep.scala

还记得 Spark Kubernetes 的源码分析系列 - submit 文章里提到的,在 KubernetesDriverBuilder 中,有一个 features 这个变量,这里需要 new 很多配置,也就是具体的用来配置 Pod 的一些步骤。

代码语言:javascript
复制
val features = Seq(
new BasicDriverFeatureStep(conf),
new DriverKubernetesCredentialsFeatureStep(conf),
new DriverServiceFeatureStep(conf),
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
new LocalDirsFeatureStep(conf),
new MountVolumesFeatureStep(conf),
new DriverCommandFeatureStep(conf),
new HadoopConfDriverFeatureStep(conf),
new KerberosConfDriverFeatureStep(conf),
new PodTemplateConfigMapStep(conf))

下面我们按照顺序来分析一下。

2.1 BasicDriverFeatureStep

类名就告诉我们,他是干嘛用的了,就是 Driver Feature 相对 Basic 的部分 feature,那么 Baisc 的 feature 包括什么呢?

代码语言:javascript
复制
driverPodName // Driver Pod 的名字
driverContainerImage // Driver Container 
driverCpuCores // Driver 需要的 Cpu Cores
driverCoresRequest // Driver 的 Request Cpu Cores(K8S相关)
driverLimitCores // Driver 的 Limit Cpu Cores(K8S相关)
driverMemoryMiB // Driver 的内存 MiB
overheadFactor // 这个稍后会讲到
memoryOverheadMiB // 这个稍后会讲到
driverMemoryWithOverheadMiB // 这个稍后会讲到

以上的参数,生成后,主要是用于配置 Pod 和 Container 的参数。这一块内容又长又臭,可以看看我写在里面的注释。

代码语言:javascript
复制
# 一堆的 Builder
val driverContainer = new ContainerBuilder(pod.container)
  # Container Name
  .withName(Option(pod.container.getName).getOrElse(DEFAULT_DRIVER_CONTAINER_NAME))
  # Image Name
  .withImage(driverContainerImage)
  # Image 拉取的策略
  .withImagePullPolicy(conf.imagePullPolicy)
  # Driver 的端口
  .addNewPort()
    .withName(DRIVER_PORT_NAME)
    .withContainerPort(driverPort)
    .withProtocol("TCP")
    .endPort()
  .addNewPort()
    # Block Manager 的 Port 相关配置
    .withName(BLOCK_MANAGER_PORT_NAME)
    .withContainerPort(driverBlockManagerPort)
    .withProtocol("TCP")
    .endPort()
  .addNewPort()
     # Spark UI 的端口配置
    .withName(UI_PORT_NAME)
    .withContainerPort(driverUIPort)
    .withProtocol("TCP")
    .endPort()
  .addNewEnv()
    # 一些环境变量
    .withName(ENV_SPARK_USER)
    .withValue(Utils.getCurrentUserName())
    .endEnv()
  .addAllToEnv(driverCustomEnvs.asJava)
  .addNewEnv()
    .withName(ENV_DRIVER_BIND_ADDRESS)
    .withValueFrom(new EnvVarSourceBuilder()
      .withNewFieldRef("v1", "status.podIP")
      .build())
    .endEnv()
  .editOrNewResources()
     # cpu 相关配置
    .addToRequests("cpu", driverCpuQuantity)
    .addToLimits(maybeCpuLimitQuantity.toMap.asJava)
    .addToRequests("memory", driverMemoryQuantity)
    .addToLimits("memory", driverMemoryQuantity)
    .addToLimits(driverResourceQuantities.asJava)
    .endResources()
    # 终于 build 完
  .build()

val driverPod = new PodBuilder(pod.pod)
   # 如果 Pod 是存在的,表示要么修改,否则就是新增
  .editOrNewMetadata()
    # Pod 的名字
    .withName(driverPodName)
    # Pod 的 Label
    .addToLabels(conf.labels.asJava)
    .addToAnnotations(conf.annotations.asJava)
    .endMetadata()
  .editOrNewSpec()
    # Pod 的重启策略
    .withRestartPolicy("Never")
    # Pod 的 NodeSelector 特性
    .addToNodeSelector(conf.nodeSelector.asJava)
    # 拉取镜像的 Repository 密码(ru
    .addToImagePullSecrets(conf.imagePullSecrets: _*)
    .endSpec()
  .build()

此外 getAdditionalPodSystemProperties() 还需要这个方法是拉取其他的配置,比如说 spark.app.id 等等,不赘述了。

2.2 DriverKubernetesCredentialsFeatureStep

这个 Step 是用于配置 Driver 的安全认证相关的配置,一般认为就是 K8S 那一套安全认证的机制了。

代码语言:javascript
复制
maybeMountedOAuthTokenFile // OAuthToken 文件
maybeMountedClientKeyFile // Client Key 文件
maybeMountedClientCertFile // Cient Cert 文件
maybeMountedCaCertFile // Ca Cert 文件
driverServiceAccount // Driver 的 Service Account
oauthTokenBase64 // OauthToken Base64 编码
caCertDataBase64 // CaCert 里面的数据 Base64 编码
clientKeyDataBase64 // Client Key 数据的 Base64 编码
clientCertDataBase64 // Client Cert 数据的 Base 64 编码
shouldMountSecret // 是否需要挂载 Secret
driverCredentialsSecretName // Driver 的认证 Secret 名

这里有很多关于访问 ApiServer 的安全认证的细节,如果不熟悉 K8S 的同学,需要补补课。下面是这个 Step 的关键方法,也就是把这些安全相关的文件通过 secret 保存下来。

代码语言:javascript
复制
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
  // 如果 conf 存在以上提及的一些认证文件,则会进行挂载 Secret
  if (shouldMountSecret) {
    Seq(createCredentialsSecret())
  } else {
    Seq.empty
  }
}

2.3 DriverServiceFeatureStep

这个就是配置 Driver Service 的 Step,因为 Pod 在 K8S 集群里,创建 Executor 需要不同的 Executor Pod 访问到 Driver Pod,才能注册上,也包括 Block Manager 以及 Spark UI 的端口和服务负载配置。

代码语言:javascript
复制
preferredServiceName // Service Name
resolvedServiceName // 上面的 Service Name 超过63个字符的话需要重新配置
driverPort // Driver 的端口
driverBlockManagerPort // Block Manager 的端口
driverUIPort // Spark UI 的端口

上面的 Service Name 超过63个字符的话需要重新配置。

代码语言:javascript
复制
private val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
  preferredServiceName
} else {
  // 超过63个字符,就是需要系统内部重置这个名字了
  val randomServiceId = KubernetesUtils.uniqueID(clock = clock)
  val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
  logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
    s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
    s"$shorterServiceName as the driver service's name.")
  shorterServiceName
}

2.4 MountSecretsFeatureStep

2.5 EnvSecretsFeatureStep

2.6 LocalDirsFeatureStep

代码语言:javascript
复制
resolvedLocalDirs // 本地目录
useLocalDirTmpFs // 如果 conf 配置为 true,则表示本地目录会用其他的存储系统,例如内存,具体请看 spark.kubernetes.local.dirs.tmpfs

2.7 MountVolumesFeatureStep

2.8 DriverCommandFeatureStep

这是关于 Driver 命令行的一些配置,具体看看注释是怎么解释的。

代码语言:javascript
复制
/**
 * Creates the driver command for running the user app, and propagates needed configuration so
 * executors can also find the app code.
 */

2.9 HadoopConfDriverFeatureStep

这是用于挂载 Hadoop 配置文件的 Step,例如访问 HDFS 的时候,需要 core-site.x ml,hdfs-site.xml 等等。

代码语言:javascript
复制
confDir // Hadoop 相关的环境变量 HADOOP_CONF_DIR
existingConfMap // spark.kubernetes.hadoop.configMapName 提交任务的 configMap 名字,这些可以提前生成,直接挂载
confFiles // 配置文件

然后具体看看 Hadoop 的配置文件是如何通过 configMap 挂载到 Driver Pod 上的。

代码语言:javascript
复制
  override def configurePod(original: SparkPod): SparkPod = {

    original.transform { case pod if hasHadoopConf =>

      // 如果有环境变量,就从环境变量指定的路径获取
      val confVolume = if (confDir.isDefined) {
        val keyPaths = confFiles.map { file =>
          new KeyToPathBuilder()
            .withKey(file.getName())
            .withPath(file.getName())
            .build()
        }
        new VolumeBuilder()
          .withName(HADOOP_CONF_VOLUME)
          .withNewConfigMap()
            .withName(newConfigMapName)
            .withItems(keyPaths.asJava)
            .endConfigMap()
          .build()
      } else {
        // 没有环境变量的话,就直接用存在的 configMap
        new VolumeBuilder()
          .withName(HADOOP_CONF_VOLUME)
          .withNewConfigMap()
            .withName(existingConfMap.get)
            .endConfigMap()
          .build()
      }

      // 修改 Pod,通过 editSpec 方法
      val podWithConf = new PodBuilder(pod.pod)
        .editSpec()
          .addNewVolumeLike(confVolume)
            .endVolume()
          .endSpec()
          .build()

      // Container Mount 需要的 Volume
      val containerWithMount = new ContainerBuilder(pod.container)
        .addNewVolumeMount()
          .withName(HADOOP_CONF_VOLUME)
          .withMountPath(HADOOP_CONF_DIR_PATH)
          .endVolumeMount()
        .addNewEnv()
          .withName(ENV_HADOOP_CONF_DIR)
          .withValue(HADOOP_CONF_DIR_PATH)
          .endEnv()
        .build()

      SparkPod(podWithConf, containerWithMount)
    }
  }

2.10 KerberosConfDriverFeatureStep

这是关于 Kerberos 配置的 Step。

代码语言:javascript
复制
/**
 * Provide kerberos / service credentials to the Spark driver.
 *
 * There are three use cases, in order of precedence:
 * Kerberos 的服务,有三种场景
 *
 * - keytab: if a kerberos keytab is defined, it is provided to the driver, and the driver will
 *   manage the kerberos login and the creation of delegation tokens.
 * - existing tokens: if a secret containing delegation tokens is provided, it will be mounted
 *   on the driver pod, and the driver will handle distribution of those tokens to executors.
 * - tgt only: if Hadoop security is enabled, the local TGT will be used to create delegation
 *   tokens which will be provided to the driver. The driver will handle distribution of the
 *   tokens to executors.
 */
代码语言:javascript
复制
principal // 指的是 KDC 中账号的 Principal
keytab // 指的是 Kerberos 生成的 Keytab
existingSecretName  // 存在的 secret name
existingSecretItemKey // secret 中的 item key
krb5File // Kerberos 服务的配置文件
krb5CMap // krb5 的 configMap
hadoopConf // 多余?
delegationTokens // Hadoop 体系中的轻量级认证 DT

生成 token 的关键代码如下。

代码语言:javascript
复制
private lazy val delegationTokens: Array[Byte] = {
  // 如果 keytab 和 secret 都是空的,就去生成 DT
  if (keytab.isEmpty && existingSecretName.isEmpty) {
    val tokenManager = new HadoopDelegationTokenManager(kubernetesConf.sparkConf,
      SparkHadoopUtil.get.newConfiguration(kubernetesConf.sparkConf), null)
    val creds = UserGroupInformation.getCurrentUser().getCredentials()
    tokenManager.obtainDelegationTokens(creds)
    // If no tokens and no secrets are stored in the credentials, make sure nothing is returned,
    // to avoid creating an unnecessary secret.
    if (creds.numberOfTokens() > 0 || creds.numberOfSecretKeys() > 0) {
      SparkHadoopUtil.get.serialize(creds)
    } else {
      null
    }
  } else {
    null
  }
}

2.11 PodTemplateConfigMapStep

可以指定 Executor 的 Pod 的模板 spark.kubernetes.executor.podTemplateFile。所以这个 Step 主要就是来解析这个 Pod Template 的。

3 Summary

可以看到 Driver 的构建是通过多个 feature 的配置来组装起来的,最终都会通过 K8S 的 Java 客户端来跟 ApiServer 交互来在 K8S 集群中生成 Driver。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-07-08 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 Overview
  • 2 分析
    • 2.1 BasicDriverFeatureStep
      • 2.2 DriverKubernetesCredentialsFeatureStep
        • 2.3 DriverServiceFeatureStep
          • 2.4 MountSecretsFeatureStep
            • 2.5 EnvSecretsFeatureStep
              • 2.6 LocalDirsFeatureStep
                • 2.7 MountVolumesFeatureStep
                  • 2.8 DriverCommandFeatureStep
                    • 2.9 HadoopConfDriverFeatureStep
                      • 2.10 KerberosConfDriverFeatureStep
                        • 2.11 PodTemplateConfigMapStep
                        • 3 Summary
                        相关产品与服务
                        容器服务
                        腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
                        领券
                        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档