前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark on Kubernetes 动态资源分配

Spark on Kubernetes 动态资源分配

作者头像
runzhliu
修改2020-08-06 10:09:31
2.1K0
修改2020-08-06 10:09:31
举报
文章被收录于专栏:容器计算容器计算

1 Overview

本文主要讲述了 Spark on Kubernetes 的发展过程和 Dynamic Resource Allocatoin(DRA) 这个重要特性,以及与之相关的 External Shuffle Service(ESS)。

2 Spark on Kubernetes 的发展

随着近几年 Kubernetes 的火热发展,将 Spark 作业提交到 Kubernetes 集群成为了工业界讨论的热门话题。最早的尝试在 Kubernetes 集群内以 Standalone 的模式部署 Spark 集群,但在 Standalone 模式下,由于 Spark Driver 不能和 Kubernetes ApiServer 直接通信,除了很难直接利用 Kubernetes 提供的如 Namespace,Service Account,Rolling Out 等特性,多租户的场景一般需要通过部署多个 Standalone 集群,因此用户资源的隔离成为了比较棘手的问题。

image_1dm 2j7rm13ho19m61k7ta4m1t301g.png-29.4kB
image_1dm 2j7rm13ho19m61k7ta4m1t301g.png-29.4kB

Source: Standalone 模式的 Spark 集群

2016年年底,由 Kubernetes 社区发起的一个 issue,联合 Spark 社区提出的 SPIP: Support native submission of spark jobs to a kubernetes cluster ,作为 Spark on Kubernetes 的初始原型 Fork 的项目 apache-spark-on-k8s,开始将 Kubernetes 作为 Spark 的 External Cluster Manager。Spark 也在 Release 2.3 版本的时候正式支持 on Kubernetes ,但是需要注意 on Kubernetes 的模块还非常年轻,即使到目前 Spark 2.4.4,在对 Kubernetes 的支持上还是相对有限的,期待在 Spark 3.0 发布后会有个更多的提升。

image_1dm7j7n2b1anosbk1rae1c97k3op.png-487.4kB
image_1dm7j7n2b1anosbk1rae1c97k3op.png-487.4kB

下图展示过程的是 Client 请求 Kubernetes 调度并且创建 Driver Pod,然后 Driver 进程会根据 Spark 作业,再向 Kubernetes 申请创建 Executor Pod。

image_1dm2jc99a1cej13or4461p7i1lnk2a.png-101.3kB
image_1dm2jc99a1cej13or4461p7i1lnk2a.png-101.3kB

3 Dynamic Resource Allocation 动态资源申请

Dynamic Resource Allocation 是指 Spark 会根据工作负荷,动态地调整作业使用的资源。具体一点来说,当工作负荷增大,Spark 会申请更多的 Executor,当工作负荷变小,则会移除多余的 Executor。这里所指的资源,主要是指 Executor 分配的 CPU/Memory,当然也包括一个 Executor JVM 进程占用的 Disk 和 Network IO 等等,而这里所指的工作负荷是指处于 Pending 和 Running 状态的 Task。

Spark 最早是从 on Yarn 模式支持 Dynamic Resouce Allocation 的特性。至少从 Spark 1.2 开始就已经可用了。on Yarn 模式下,开启 Dynamic Resource Allocation,官方给出了配置的文档,具体可以参考 Configuration and Setup

代码语言:javascript
复制
spark.dynamicAllocation.enabled=true
spark.shuffle.service.enabled=true

动态资源分配看着很美,因为如果没有动态资源分配,不管数据集的大小,以及过程,需要多少个 Stage,多少个 Task,都只有固定的 N 个 Executor 来工作。但是试想一下,当集群到了凌晨或者某些特定的时候,突然释放大量的资源,而没有用户使用,那么这部分资源是不能产生价值的,甚至还是一种资源浪费,那么解决这个问题的最好的方法就是动态分配资源,结合当前集群的资源以及工作的负载,来动态调整 Executor 的数量。比如一个发生在深夜的 Shuffle,本来只有10个 Executor,当时因为深夜集群资源被其他任务大量释放了,此时根据工作负载,申请到100个 Executor 来工作,如果不考虑数据本地性以及 Executor 初始化带来的时间消耗,那么100个 Executor 是可以同时并行得处理更多的任务,又或者说,同样多的任务,有了更多 Executor 来处理,每个 Executor 的处理是原来的 1/10,那么这个 Stage 就会更快地被完成。原来需要跑到第二天9点的任务,甚至有可能在凌晨3点的时候结束,并且也可以快速地释放出自己的资源。

image_1dm13a6scec41khs5071a71d049.png-1505.6kB
image_1dm13a6scec41khs5071a71d049.png-1505.6kB
image_1dm13b2q919begm1umv71qgfgm.png-1553.4kB
image_1dm13b2q919begm1umv71qgfgm.png-1553.4kB
image_1dm13e8ms15vi1sfoe6bauc1aa713.png-1022.6kB
image_1dm13e8ms15vi1sfoe6bauc1aa713.png-1022.6kB

4 External Shuffle Service

与 Dynamic Resouce Allocation 关系紧密的是 External Shuffle Service。

正常来说,Executor 退出有两种情况,一是整个 Spark 任务结束,这是属于正常结束;二是 Executor 遇到 failure,会导致异常退出。在 Dynamic Resouce Allocation 的场景下,由于 Executor 数量会根据工作负荷增加或者移除,当 Spark Job 下游 Stage 需要读取上游 Stage 的状态(一般来说是数据 ShuffleMapStage 的数据),那么由于原来的 Executor 被 如果被移除了,随着 Executor 的退出,磁盘上的临时目录也会被移除。此时相关的数据就需要通过 RDD 的血缘重新计算了,通常来说这是非常耗时。

如下图所示,Node 1 挂了,那么 Executor 1 和 Executor 2 会相继退出,当进行 Shuffle Stage 的时候,Executor 3 和 Executor 4 可能需要去拉取 Executor 1 和 Executor 2 的 Block,此时就会引起 Fetch Failure,任务会被 Block 住,出错的 Task 会被重新调度到可用的 Node 上重新执行,这也会导致上文说的,需要重新计算这部分的数据。所以 Spark 需要一个 External Shuffle Service 来管理 Shuffle 数据,External Shuffle Service 本质上是一个辅助进程,原来在读取 Shuffle 数据的时候,是每个 Executor 互相读取,现在则是直接读取 External Shuffle Service,也相当于解耦了计算和读取数据的过程。

image_1dm2mib0u1cj515l615mlvb61rqi34.png-151.1kB
image_1dm2mib0u1cj515l615mlvb61rqi34.png-151.1kB

下图展示的是在两个不同节点上的 Executor,通过 External Shuffle Service 来拉取 Shuffle 数据的过程。

image_1dm2n7ne81v5siul68e1b701lch3h.png-406.6kB
image_1dm2n7ne81v5siul68e1b701lch3h.png-406.6kB

5 Spark on Kubernetes 的动态资源申请

实际上,即使到当前 Spark 2.4.4,也还没有官方支持的 on Kubernetes 的 Dynamic Resouce Allocation。虽然在 apache-spark-on-k8s 这个 Fork 里本来有一个实现,External Shuffle Service 是一个有状态的服务,所以可以作为 DaemonSet 通过 hostPath 绑定 节点的磁盘,但是因为挂载的 Volume 来源于固定的 Host,这意味着 External Shuffle Service 的 Pod 不能在集群上随意调度。【待 Hongli 补充】

此外,Spark 社区提出另一个 SPIP: Use remote storage for persisting shuffle data。该 SPIP 主要讨论的观点如下。

  1. 每个 Node 上的 External Shuffle Service 进程如果挂了,那么 Shuffle 数据依然需要被 recompute。
  2. External Shuffle Service 必须是 Long Running 的,而且当不需要 Shuffle 的时候会导致了另一种方式的资源浪费。
  3. External Shuffle Service 的数据写在本地磁盘,这不利于一些容器环境下的 IO 隔离。

下图是 on Kubernetes 的 External Shuffle Service 在 Kubernetes 部署的原型图。可以看到,Executor Pod 和 External Shuffle Service Pod 在同一个 Node 上运行,共享 Node 上的 Disk。

image_1dm2jdn1kgkf1oih1fca1cfs1n6q2n.png-108.9kB
image_1dm2jdn1kgkf1oih1fca1cfs1n6q2n.png-108.9kB

尽管官方仍未正式支持 on Kubernetes 的 Dynamic Resouce Allocation,但是目前 Master 分支合进去了一个很有趣的 commit SPARK-27963。我们留意到,这个 commit 做的事情是可以不通过 External Shuffle Service 来开启 Dynamic Resouce Allocation。

当遇到 ShuffleMapStage 的时候,会记录下产生 shuffle 数据的 Executor Id,并且会让这些 Executor 在这个 Stage 对应的 Job 没有结束的时候不被移除,这样使得后面如果有 Stage 需要对 ShuffleMapStage 的数据进行读取的时候,相关的数据可以不需要 recompute。

如果这些 shuffle file 一直没有被其他 Stage 使用呢,那占用的 Executor 是不是很浪费?其实还可以通过设置一个 timeout 参数 spark.dynamicAllocation.shuffleTimeout,当超时的时候,不管这些 shuffle 文件还是否有对应的 Active Job,也会把这些 Executor 移除释放资源。

按照这个逻辑,当一个 Stage 需要1000个 Executor 的时候,再下一个 Stage 计算得到只需要10个 Executor,比较理想的情况是可以很快释放990个 Executor 占用的资源。这可能有点困难,因为 shuffle file 往往会被持有一段时间以确保下面的 Stage 确实不需要了才会被标记,而下一个 Stage 什么时候才需要前面的 shuffle 文件是无法确定的,所以如果马上移除990个 Executor,可能也不是一个很好的方案。

6 Summary

Spark 的 Example 程序中的 SparkPi,参数 n 表示划分的任务数。另外配置的 SparkConf 如下。

代码语言:javascript
复制
# master 分支的选项,表示当 Executor 进程退出也不马上删除 Pod
spark.kubernetes.executor.deleteOnTermination=false
# 不开启 external shuffle service
spark.shuffle.service.enable=false
# 开启 DRA,并采用 track Executor 的方式
spark.dynamicAllocation.shuffleTracking.enabled=true
# 动态申请的 Executor 的上限为5个
spark.dynamicAllocation.maxExecutors=5

启动任务,虽然填写的 Executor 副本数为2,但是看看运行状态可以发现,当集群空闲的时候,可以一共申请5个 Exe

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-10-04 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 Overview
  • 2 Spark on Kubernetes 的发展
  • 3 Dynamic Resource Allocation 动态资源申请
  • 4 External Shuffle Service
  • 5 Spark on Kubernetes 的动态资源申请
  • 6 Summary
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档