前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark任务调度 | Spark,从入门到精通

Spark任务调度 | Spark,从入门到精通

作者头像
美图数据技术团队
发布2018-09-18 14:45:09
1.3K0
发布2018-09-18 14:45:09
举报

欢迎阅读美图数据技术团队的「Spark,从入门到精通」系列文章,本系列文章将由浅入深为大家介绍 Spark,从框架入门到底层架构的实现,相信总有一种姿势适合你,欢迎大家持续关注:)

往期直通车:Hello Spark!

Spark on Yarn

RDD原理与基础操作

图 1

如图 1 所示是 Spark 的执行过程,那么具体 Drvier 是如何把 Task 提交给 Executor 的呢?本文将通过 DAGScheduler 、TaskScheduler、调度池和 Executor 四部分介绍 Spark 的任务调度原理及过程。

/ DAGScheduler /

Spark 任务调度中各个 RDD 之间存在着依赖关系,这些依赖关系就形成有向无环图 DAG,DAGScheduler 负责对这些依赖关系形成的 DAG 并进行 Stage 划分,而 DAGScheduler 分为创建、Job 提交、Stage 划分、Task 生成四个部分。

DAGScheduler 创建—Job 提交—Stage 划分—Task 生成

代码语言:javascript
复制
private[spark]
class DAGScheduler(
  private[scheduler] val sc: SparkContext,
  private[scheduler] val taskScheduler: TaskScheduler,
  listenerBus: LiveListenerBus,
  mapOutputTracker: MapOutputTrackerMaster,
  blockManagerMaster: BlockManagerMaster,
  env: SparkEnv,
  clock: Clock = new SystemClock())
extends Logging {

def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
  this(
    sc,
    taskScheduler,
    sc.listenerBus,
    sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
    sc.env.blockManager.master,
    sc.env)
}
.....
}

DAGScheduler 在 SparkContext 中创建,并且需要提供 TaskScheduler 的实例。在构造函数中的 MapOutputTrackerMaster 是运行在 Driver 端用来管理 ShuffleMapTask 的输出,下游的 Task 可以通过 MapOutputTrackerMaster 来获取 Shuffle 输出的位置信息。

代码语言:javascript
复制
private[spark]
class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv,
    clock: Clock = new SystemClock())
  extends Logging {

  .....
  private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
  .....
}

DAGScheduler 是基于 Akka Actor 的消息传递机制来构建事件循环处理逻辑,如上段代码所示,在 DAGScheduler 初始化时创建了 eventProcessLoop 以处理各种 DAGSchedulerEvent,这些事件包括作业的提交、任务状态的变化、监控等。

DAGScheduler 创建—Job 提交—Stage 划分—Task 生成

图 2

如图 2 所示是 RDD 的 count 执行调用过程。其中,在 DAGScheduelr 的 submitJob 方法中会生成 JobId,并创建一个 JobWaiter 监听 Job 是否执行成功。一个 Job 内包含多个 Task,只有所有 Task 都执行成功该 Job 才会被 JobWaiter 标记为 Succes。

DAGScheduler 创建—Job 提交—Stage 划分—Task 生成

用户提交的计算任务是由多个 RDD 构成的 DAG, 当 RDD 在转换时需要进行 Shuffle,Shuffle 的过程中就将这个 DAG 划分成了多个 Stage。

由于后面的 Stage 需要前面的 Stage 提供 Shuffle 的结果,因此不同的 Stage 不能并行计算。那么 RDD 在哪些操作时需要进行 Shuffle 呢?这里涉及到 RDD 的两种依赖关系:宽依赖与窄依赖。

图 3

如图 3 左侧所示为窄依赖,由于 RDD 每个 partition 依赖固定数量的 parent RDD 的 partition,所以可以通过 Task 来处理这些 partition。而且这些 partition 相互独立,所以 Task 可以并行计算。宽依赖反之。

图 4

让我们举例说明 Stage 的划分过程,如图 4 所示从触发 Action 的 RDD G 开始划分,G 依赖 B 和 F,处理 B 和 F 的顺序是随机的,假设先处理 B。由于 G 和 B 是窄依赖关系,可以划分到同一个 Stage 。接着处理 F,此时 F 和 G 是宽依赖关系,所以将 F 划分到一个新的 Stage,以此类推划分其它 Stage。

接着以 Stage 1 为例看它的计算方式,如图 4 所示 RDD A 有三个 Partition,因此会生成三个 ShuffleMapTask,这三个 Task 会把结果输出到三个 Partition 中。

DAGScheduler 创建—Job 提交—Stage 划分—Task 生成

任务生成首先要获取需要计算的 Partition,如果是最后的 Stage 所对应的 Task 是 ResultTask,那么先判断 ResultTask 是否结束,若结束则无需计算;对于其它 Stage 对应的都是 ShuffleMapTask,因此只需要判断 Stage 中是否有缓存结果。判断出哪些 Partition 需要计算后生成对应的 Task,然后封装到相应的 TaskSet 中,并提交给 TaskScheduler。TaskSet 中包含了一组处理逻辑完全相同的 Task,但它们的处理数据不同,这里的每个 Task 负责一个 partition。

/ TaskScheduler /

TaskScheduler 是在 SparkContext 中通过 createTaskScheduler 把引用传给 DAGScheduler 的构造函数。每个 TaskScheduler 都会对应一个 SchedulerBackend,TaskScheduler 负责 Application 中不同 job 之间的调度,在 Task 执行失败时启动重试机制,并且为执行速度慢的 Task 启动备份的任务;而 SchdulerBackend 负责与 Cluster Manager 交互,获取该 Application 分配到的资源,然后传给 TaskScheduler。

TaskScheduler 执行流程主要分成两个部分:Driver 端执行和 Executor 执行,他们的执行步骤分别如下:

Driver 端执行 TaskSchedulerImpl#submitTasks 将Task加入到TaskSetManager当中 ScheduleBuilder#addTaskSetManager 根据调度优先级确定调度顺序 CoarseGrainedSchdulerBackend#reviveOffers DriverActor#makeOffers TaskSchedulerImpl#resourceOffers 响应资源调度请求,为每个Task分配资源 DriverActor#launchTasks 将tasks发送到Executor Executor上执行 ReceiveWithLogging#launchTasks Executor#launchTask

/ 调度池 /

调度池顾名思义就是存放了一堆待执行的任务,它决定 TaskSetManager 的调度顺序,然后由 TaskSetManager 根据就近原则来确定 Task 运行在哪个 Executor。

那么它是如何决定 TaskSetManager 的调度顺序的呢? 调度池主要有两个决策策略:FIFO 和 FAIR。

图 5

首先以整体看 FIFO 和 FAIR 的执行对比图图 5,可以看出在左侧 FIFO 只有一个调度池,即 rootPool,里面包含了待调度的 TaskSetManager;而右侧 FAIR 在 rootPool 调度池中包含了多个子调度池,比如图中的 production 和 test 调度池。

在 FIFO 算法中需要保证 JobId 比较小的优先执行,如果是同一个 Job 则 StageId 比较小的先被调度。FAIR 算法则提供参数配置,如图 6 所示是一份配置文件:

图 6

接着看看我们的 Spark 集群是如何配置的。

代码语言:javascript
复制
private[spark] trait SchedulingAlgorithm {
  def comparator(s1: Schedulable, s2: Schedulable): Boolean
}

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    res < 0
  }
}

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble

    var compare = 0
    if (s1Needy && !s2Needy) {
      return true
    } else if (!s1Needy && s2Needy) {
      return false
    } else if (s1Needy && s2Needy) {
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }
    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      s1.name < s2.name
    }
  }
}

首先获取 S1 和 S2 两个调度池中的运行状态 Task 个数,若 S1 的运行状态 Task 数小于该调度池的最小资源数,而 S2 相反,那么此时优先调度 S1 中的 Task;如果 S1 和 S2 中的运行状态 Task 数都小于该调度池的最小资源数,那么就依据资源占用率决定调度优先级;如果 S1、S2 的运行状态 Task 数都大于所属调度池的最小资源数,那么就对比它们的已运行 task 个数与分配权重的比例,得出来比例较小的优先调度。

/ Executor /

图 7

图 8

如图 8 所示,Executor 是在 worker 收到 master 的 LaunchExecutorde 消息后创建的。在 TaskScheduler 阶段提交 Task 之后 Driver 会序列化封装 Task 的依赖文件和自身信息,然后在 Executor 上反序列化得到 Task。在准备好了 Task 的执行环境之后就通过 TaskRunner 去执行计算,得到执行状态。值得注意的是,在得到计算结果发回 Driver 的过程中,如果文件太大会被直接丢弃(可以通过 spark.driver.maxResultSize 来设定大小)。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-08-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 美图数据技术团队 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
GPU 云服务器
GPU 云服务器(Cloud GPU Service,GPU)是提供 GPU 算力的弹性计算服务,具有超强的并行计算能力,作为 IaaS 层的尖兵利器,服务于深度学习训练、科学计算、图形图像处理、视频编解码等场景。腾讯云随时提供触手可得的算力,有效缓解您的计算压力,提升业务效率与竞争力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档