前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark2.4.0屏障调度器

Spark2.4.0屏障调度器

作者头像
Spark学习技巧
发布2018-12-13 14:44:54
9430
发布2018-12-13 14:44:54
举报
文章被收录于专栏:Spark学习技巧Spark学习技巧

前几天,浪尖发了一篇文章,讲了Spark 2.4发布更新情况:

Spark2.4.0发布了!

其中,就有一项说到Spark 为了支持深度学习而引入的屏障调度器。本文就详细讲讲。

基于消息传递结构的计算模型和Spark计算模型是有很大区别。在Spark 内部,每个Stage的某一个一个task不会依赖于相同Stage任何其他的task,因此,Spark的task 可以被独立进行调度执行。为了在Spark中嵌入MPI功能,需要引入一个新的调度模型,暂时命名为“屏障调度”(浪尖直译自barrier scheduling),该调度模型会同时启动任务,并为用户提供足够的信息和工具,将分布式DL训练嵌入到Spark Pipeline中。 Spark还为MPI任务引入了一种新的容错机制。当任何MPI任务在中间失败时,Spark将中止所有任务并重新启动该stage。

1. 要求

概述

  • 每个job中单个barrier stage。
  • 每个job中多个barrier stage。
  • 多job且每个job都带有barrier stage。
  • Barrier stage 请求的slot比可用的slot多(无动态资源申请)。
  • Barrier stage请求的slot比可用的slot多(有动态资源申请)。(Spark 2.4就不要想了)

目标

  • 支持barrier调度:对于同一个barrierstage同时启动所有task,并且提供给用户足够的信息和工具,以便用户可以嵌入分布式DL训练模型。
  • 正确的处理失败的场景。
  • Barrier执行模式支持运行与Standalone模式
  • 使用yarn/mesos/k8s的用户可以再有BarrierStage的时候设置MPI。

安全

用户使用外部线程启动MPI任务的时候,存在外部进行不被杀死而导致内存泄漏的风险。Barrier tasks会使用远程客户端相互交流,但是不会影响Spark当前的安全模型。

API变化

代码语言:javascript
复制
class RDD[T] {
  /** Indicates that Spark must launch the tasks together for the current stage. */
  def barrier(): RDDBarrier[T] = ???
}
/** A [[TaskContext]] with extra info and tooling for a barrier stage. */
class BarrierTaskContext extends TaskContext {
  /** Sets a global barrier and waits until all tasks in this stage hit this
barrier. */
  def barrier(): Unit = ???
  /** Returns the all task infos in this stage. */
  def getTaskInfos(): Array[BarrierTaskInfo]
}
/** Represents an RDD barrier, which forces Spark to launch tasks of this stage
together. */
class RDDBarrier[T] {
  /** Maps partitions together with a provided [[BarrierTaskContext]]. */
  def mapPartitions[S](f: Iterator[T] => Iterator[S]): RDD[S] = ???
  /** TODO extra conf(e.g. timeout) */
}

使用案例

代码语言:javascript
复制
rdd.barrier().mapPartitions { iter =>
    // Write iter to disk.
    ???
    // Fetch TaskContext
    val context = BarrierTaskContext.get()
    // Wait until all tasks finished writing.
    context.barrier()
    // The 0-th task launches an MPI job.
    if (context.partitionId() == 0) {
      val hosts = context.getTaskInfos().map(_.address)
         // Set up MPI machine file using host infos.
      ???
      // Launch the MPI job by calling mpirun.
??? }
    // Wait until the MPI job finished.
    context.barrier()
    // Collect output and return.
??? }

3. 架构

设计提议

为了使spark支持屏障调度(barrier scheduling),在Spark内部增加了RDDBarrier和BarrierTaskContext。

BarrierStage

如果没有充足的slot资源,barrier stage不会被拉起(也即是空闲的core 必须能够拉起该barrier所有tasks),这样设计使为了满足一次拉起所有task的目标。

同时当任意的task执行失败的时候,总是重启整个barrier stage。

判断一个stage是否是Barrier stage的一种方式是跟踪Stage所包含的RDD,如果该stage包含RDDBarrier 或者至少一个父RDD是RDDBarrier,那么该stage是一个barrier stage,当然要以shuffleDependency为界限。

调度Barrier Tasks

目前,TaskScheduler会尽可能的在可用的slot上调度task,所以通常不会同时启动同一个stage的所有task。因此需要在barrier stage 的task在调度之前加上资源可用性判断。由于任务的局部性问题,仍然可能仅启动整个barrier stage的部分tasks,因此必须在启动任务之前在此检查确认同一个barrier stage的所有task同时被启动。

Barrier tasks预计比常规tasks具有更长的生命周期,因此barriertasks可能会在相对长的时间范围内占用集群资源,后续提交的任务估计会延迟运行或者仅使用更少的slot运行。建议使用Fair调度策略进行调度,而不是默认的FIFO调度策略,并将barrier任务独立运行,这样至少可以保证普通任务可以在配置给定最少的集群资源上运行。

另一个问题是barrier stage可以提交,但是集群当前没有足够的slot来同时启动所有barrier tasks。如果启用了动态资源分配,则在等待一段时间后,可能会或可能不会满足要求(取决于允许的最大节点)。对于Spark 2.4,提出了一个简单的解决方案,它只检查当前运行的slot的总数,如果数量不足以同时启动同一个stage的所有屏障任务,则取消该job。目标是在3.0的时候可以更好地与动态资源分配集成。对于Spark 2.4,在启用动态资源分配时,job会立即失败,或者job无法连续提交,因为它试图提交一个barrier stage,该stage需要比集群中当前可用的slot更多的slot。

Task Barrier

Barrier tasks应允许用户在task执行过程中插入同步操作,这可以通过在BarrierTaskContext中引入全局barrier操作来实现,这使得当前任务等待直到同一stage中的所有task都达到此barrier。将为BarrierTaskContext.barrier()提交单独的设计文档。

关注公众号,bigdatatip,回复barrier 即可获得该文档。

失败容错

为确保正确性,当任何task失败时,barrier stage始终会重试整个stage。因此,将要求杀死失败stage的所有正在运行的任务,并且还保证每个单个stage最多只能运行一个taskset (没有zombie task),这是非常简单的。理想情况下,除了在zombie taskset中杀死正在运行的任务需要一段时间,每个单一stage只应运行一个taskset,必须将失败的taskset标记为zombie 并正确处理TaskKilled消息。

推测任务(Speculativetask)

在barrier 执行模式中,要求每个barrier task必须仅有一个唯一的task ID,目的是其他的tasks 可以直接使用该ID和它交互。这也就意味着每个task只能尝试启动一次,因此必须禁止推测执行。

此外,3.0的时候可能会将Spark任务推测执行设置为单个stage的配置而不是全局配置。

SparkContext.runJob()/PartitionPruningRDD

SparkContext.runJob()执行的时候可以仅是所有分区的子集,其中一个用例是RDD.first(),不会执行所有分区。这种是与barrer执行模式冲突的,可能无法启动某些barrier tasks。在barrier stage检测到这种用法,会由于不支持该操作而抛出异常。

ParititionPruningRDD的情况类似,它只在满足`partitionFilterFunc`的分区上启动任务。我们将在barrierstage 检测PartitionPruningRDD并抛出显式异常。

以上问题都与父RDD与生成的RDD具有不同分区数的问题有关(例如union()/ coalesce()/ first()/ take()/ PartitionPruningRDD),因此可以检测RDD的血统链条,然后在job 提交的时候立即停止。

如果RDD依赖于多个barrier RDD(例如,barrierRdd1.zip(barrierRdd2)),也将立即停止,如果发生这种情况,则无法确保`barrier()`调用的正确行为。

针对Spark 3.0,可以进一步调查上述用例并提出支持它们的方法。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-11-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档