[spark] DAGScheduler划分stage源码解析

概述

Spark Application只有遇到action操作时才会真正的提交任务并进行计算,DAGScheduler 会根据各个RDD之间的依赖关系形成一个DAG,并根据ShuffleDependency来进行stage的划分,stage包含多个tasks,个数由该stage的finalRDD决定,stage里面的task完全相同,DAGScheduler 完成stage的划分后基于每个Stage生成TaskSet,并提交给TaskScheduler,TaskScheduler负责具体的task的调度,在Worker节点上启动task。

Job的提交

以count为例,直接看源码都有哪些步骤:

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
    DAGScheduler#runJob
        DAGScheduler#runJob
            DAGScheduler#runJob
                DAGScheduler#dagScheduler.runJob
                    DAGScheduler#submitJob
                        eventProcessLoop.post(JobSubmitted(**))

eventProcessLoop是一个DAGSchedulerEventProcessLoop(this)对象,可以把DAGSchedulerEventProcessLoop理解成DAGScheduler的对外的功能接口。它对外隐藏了自己内部实现的细节。无论是内部还是外部消息,DAGScheduler可以共用同一消息处理代码,逻辑清晰,处理方式统一。 eventProcessLoop接收各种消息并进行处理,处理的逻辑在其doOnReceive方法中:

 private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

    ......
}

当提交的是JobSubmitted,便会通过 dagScheduler.handleJobSubmitted处理此事件。

Stage的划分

在handleJobSubmitted方法中第一件事情就是通过finalRDD向前追溯对Stage的划分。

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try { 
 //Stage划分过程是从最后一个Stage开始往前执行的,最后一个Stage的类型是ResultStage
    finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  //为此job生成一个ActiveJob对象
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job //记录该job处于active状态
  activeJobs += job 
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post( //向LiveListenerBus发送Job提交事件
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage) //提交Stage

  submitWaitingStages()
}

跟进newResultStage方法:

private def newResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId) //获取stage的parentstage
    val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)
    stageIdToStage(id) = stage //将Stage和stage_id关联
    updateJobIdStageIdMaps(jobId, stage) //跟新job所包含的stage
    stage
  }

直接实例化一个ResultStage,但需要parentStages作为参数,我们看看getParentStagesAndId做了什么:

private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
    val parentStages = getParentStages(rdd, firstJobId)
    val id = nextStageId.getAndIncrement()
    (parentStages, id)
  }

获取parentStages,并返回一个与stage关联的唯一id,由于是递归的向前生成stage,所以最先生成的stage是最前面的stage,越往前的stageId就越小,即父Stage的id最小。继续跟进getParentStages:

private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    val parents = new HashSet[Stage] // 当前Stage的所有parent Stage
    val visited = new HashSet[RDD[_]] // 已经访问过的RDD
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]] //等待访问的RDD
    def visit(r: RDD[_]) {
      if (!visited(r)) { //若未访问过
        visited += r  //标记已被访问
        // Kind of ugly: need to register RDDs with the cache here since
        // we can't do it in its constructor because # of partitions is unknown
        for (dep <- r.dependencies) { //遍历其所有依赖
          dep match {
            case shufDep: ShuffleDependency[_, _, _] => //若为宽依赖,则生成新的Stage,shuffleMapstage
              parents += getShuffleMapStage(shufDep, firstJobId)
            case _ => //若为窄依赖(归为当前Stage),压入栈,继续向前循环,直到遇到宽依赖或者无依赖
              waitingForVisit.push(dep.rdd)
          }
        }
      }
    }
    waitingForVisit.push(rdd) //将当前rdd压入栈
    while (waitingForVisit.nonEmpty) { //等待访问的rdd不为空时继续访问
      visit(waitingForVisit.pop())
    }
    parents.toList
  }

通过给定的RDD返回其依赖的Stage集合。通过RDD每一个依赖进行遍历,遇到窄依赖就继续往前遍历,遇到ShuffleDependency便通过getShuffleMapStage返回一个ShuffleMapStage对象添加到父Stage列表中。可见,这里的parentStage是Stage直接依赖的父stages(parentStage也有自己的parentStage),而不是整个DAG的所有stages。继续跟进getShuffleMapStage的实现:

private def getShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    shuffleToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) => stage //若已经在shuffleToMapStage存在直接返回Stage
      case None => //不存在需要生成新的Stage
        //为当前shuffle的父shuffle都生成一个ShuffleMapStage
       getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
          if (!shuffleToMapStage.contains(dep.shuffleId)) {
            shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId) //跟新shuffleToMapStage映射
          }
        }
        // 为当前shuffle生成新的Stage
        val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
        shuffleToMapStage(shuffleDep.shuffleId) = stage
        stage
    }
  }

先从shuffleToMapStage根据shuffleid获取Stage,若未获取到再去计算,第一次都肯定为None,我们先看getAncestorShuffleDependencies干了什么:

 private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
    val parents = new Stack[ShuffleDependency[_, _, _]] // 当前shuffleDependency所有的祖先ShuffleDependency(不是直接ShuffleDependency)
    val visited = new HashSet[RDD[_]] // 已经被访问过的RDD
    // 等待被访问的RDD
    val waitingForVisit = new Stack[RDD[_]]
    def visit(r: RDD[_]) {
      if (!visited(r)) { //未被访问过
        visited += r //标记已被访问
        for (dep <- r.dependencies) { //遍历直接依赖
          dep match {
            case shufDep: ShuffleDependency[_, _, _] => 
              if (!shuffleToMapStage.contains(shufDep.shuffleId)) { // 若为shuffleDependency并且还没有映射,则添加到parents 
                parents.push(shufDep)
              }
            case _ =>
          }
          waitingForVisit.push(dep.rdd)  //即使是shuffleDependency的rdd也要继续遍历
        }
      }
    }

    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }
    parents
  }

貌似和getParentStages方法很像,区别是这里获取的所有祖先ShuffleDependency,而不是直接父ShuffleDependency。

为当前shuffle的父shuffle都生成一个ShuffleMapStage后再通过newOrUsedShuffleStage获取当前依赖的shuffleStage,再和shuffleid关联起来,看newOrUsedShuffleStage的实现:

private def newOrUsedShuffleStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    val rdd = shuffleDep.rdd //依赖对应的rdd
    val numTasks = rdd.partitions.length //分区个数
    val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite) //返回当前rdd的shufflestage
    if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    //如果当前shuffle已经在MapOutputTracker中注册过,也就是Stage已经被计算过,从MapOutputTracker中获取计算结果
      val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
      val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
      (0 until locs.length).foreach { i => // 更新Shuffle的Shuffle Write路径
        if (locs(i) ne null) {
          // locs(i) will be null if missing
          stage.addOutputLoc(i, locs(i))
        }
      }
    } else { //还没有被注册计算过
      // Kind of ugly: need to register RDDs with the cache and map output tracker here
      // since we can't do it in the RDD constructor because # of partitions is unknown
      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)  //注册
    }
    stage
  }

继续看newShuffleMapStage:

private def newShuffleMapStage(
      rdd: RDD[_],
      numTasks: Int,
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int,
      callSite: CallSite): ShuffleMapStage = {
    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, firstJobId) //获取parentstages即stageid
    val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks, parentStages,
      firstJobId, callSite, shuffleDep) //实例化一个shuffleStage对象

    stageIdToStage(id) = stage //Stage和id关联
    updateJobIdStageIdMaps(firstJobId, stage) //跟新job所有的Stage
    stage
  }

怎么和newResultStage极其的相似?是的没错,这里会生成ShuffleStage,getParentStagesAndId里面的实现就是一个递归调用。

由finalRDD往前追溯递归生成Stage,最前面的ShuffleStage先生成,最终生成ResultStage,至此,DAGScheduler对Stage的划分已经完成。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏简单聊聊Spark

spark-1.3.0的编译及三种运行模式的配置

最近安装spark1.3.0并与Hadoop2.5.0集成,在Spark的历史资源管理中心没有找到对应的版本,而且在网上也没有找到对应版本的编译教程,于是只能根...

22230
来自专栏简单聊聊Spark

Spark性能调优篇二之重构RDD架构及RDD持久化

上一篇介绍了一些关于提交Spark任务参数的调优,本片文章来聊聊一个Spark作业中RDD的重构,以及一些复用的RDD持久化的常用策略。

10620
来自专栏极客编程

Apache spark 的一些浅见。

分布并行计算和几个人一起搬砖的意思是一致的,一个资源密集型的任务(搬砖或计算),需要 一组资源(小伙伴或计算节点),并行地完成:

14220
来自专栏Spark生态圈

[spark] RDD解析

每个具体的RDD都得实现compute 方法,该方法接受的参数之一是一个Partition 对象,目的是计算该分区中的数据。 我们通过map方法来看具体的实现...

11010
来自专栏Spark生态圈

[spark] TaskScheduler 任务提交与调度源码解析

在DAGScheduler划分为Stage并以TaskSet的形式提交给TaskScheduler后,再由TaskScheduler通过TaskSetMagag...

29330
来自专栏Spark生态圈

[spark] DAGScheduler 提交stage源码解析

DAGScheduler在划分完Stage后([spark] DAGScheduler划分stage源码解析 ),将会通过submitStage(finalSt...

13230
来自专栏Spark生态圈

[spark] RDD缓存源码解析

我们可以利用不同的存储级别存储每一个被持久化的RDD。可以存储在内存中,也可以序列化后存储在磁盘上等方式。Spark也会自动持久化一些shuffle操作(如re...

30030
来自专栏Spark生态圈

[spark] Task执行流程

在文章TaskScheduler 任务提交与调度源码解析 中介绍了Task在executor上的逻辑分配,调用TaskSchedulerImpl的resourc...

22310
来自专栏Spark生态圈

[spark] 数据本地化及延迟调度

Spark数据本地化即移动计算而不是移动数据,而现实又是残酷的,不是想要在数据块的地方计算就有足够的资源提供,为了让task能尽可能的以最优本地化级别(Loca...

23120

扫码关注云+社区

领取腾讯云代金券

年度创作总结 领取年终奖励