[Spark源码剖析] DAGScheduler划分stage划分stage源码剖析

划分stage源码剖析

本文基于Spark 1.3.1

先上一些stage相关的知识点:

  1. DAGScheduler将Job分解成具有前后依赖关系的多个stage
  2. DAGScheduler是根据ShuffleDependency划分stage的
  3. stage分为ShuffleMapStage和ResultStage;一个Job中包含一个ResultStage及多个ShuffleMapStage
  4. 一个stage包含多个tasks,task的个数即该stage的finalRDD的partition数
  5. 一个stage中的task完全相同,ShuffleMapStage包含的都是ShuffleMapTask;ResultStage包含的都是ResultTask

下图为整个划分stage的函数调用关系图

DAGScheduler划分stage函数调用关系.png

在DAGScheduler内部通过post一个JobSubmitted事件来触发Job的提交

DAGScheduler.eventProcessLoop.post( JobSubmitted(...) )
DAGScheduler.handleJobSubmitted

既然这两个方法都是DAGScheduler内部的实现,为什么不直接调用函数而要这样“多此一举”呢?我猜想这是为了保证整个系统事件模型的完整性。

DAGScheduler.handleJobSubmitted部分源码及如下

private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      allowLocal: Boolean,
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      //< 创建finalStage可能会抛出一个异常, 比如, jobs是基于一个HadoopRDD的但这个HadoopRDD已被删除
      finalStage = newResultStage(finalRDD, partitions.size, jobId, callSite)
    } catch {
      case e: Exception => 
        return
    }
    
    //< 此处省略n行代码
  }

该函数通过调用newResultStage函数来创建唯一的ResultStage,也就是finalStage。调用newResultStage时,传入了finalRDD、partitions.size等参数。

跟进到DAGScheduler.newResultStage

  private def newResultStage(
      rdd: RDD[_],
      numTasks: Int,
      jobId: Int,
      callSite: CallSite): ResultStage = {
    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
    val stage: ResultStage = new ResultStage(id, rdd, numTasks, parentStages, jobId, callSite)

    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

DAGScheduler.newResultStage首先调用val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId),这个调用看起来像是要先确定好该ResultStage依赖的父stages,

问题1:那么是直接父stage呢?还是父stage及间接依赖的所有父stage呢?记住这个问题,继续往下看。

跟进到DAGScheduler.getParentStagesAndId:

  private def getParentStagesAndId(rdd: RDD[_], jobId: Int): (List[Stage], Int) = {
    val parentStages = getParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()      //< 这个调用确定了每个stage的id,划分stage时,会从右到左,因为是递归调用,其实越左的stage创建时,越早调到?try
    (parentStages, id)
  }

该函数调用getParentStages获得parentStages,之后获取一个递增的id,连同刚获得的parentStages一同返回,并在newResultStage中,将id作为ResultStage的id。那么, 问题2:stage id是父stage的大还是子stage的大?。继续跟进源码,所有提问均会在后面解答。

跟到getParentStages

  //< 这个函数的实现方式比较巧妙
  private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
    //< 通过vist一级一级vist得到的父stage
    val parents = new HashSet[Stage]
    //< 已经visted的rdd
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new Stack[RDD[_]]
    def visit(r: RDD[_]) {
      if (!visited(r)) {
        visited += r

        for (dep <- r.dependencies) {
          dep match {
            //< 若为宽依赖,调用getShuffleMapStage
            case shufDep: ShuffleDependency[_, _, _] => 
              parents += getShuffleMapStage(shufDep, jobId)
            case _ =>
              //< 若为窄依赖,将该依赖中的rdd加入到待vist栈,以保证能一级一级往上vist,直至遍历整个DAG图
              waitingForVisit.push(dep.rdd)
          }
        }
      }
    }
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }
    parents.toList
  }

函数getParentStages中,遍历整个RDD依赖图的finalRDD的List[dependency] (关于RDD及依赖,可参考举例说明Spark RDD的分区、依赖,若遇到ShuffleDependency(即宽依赖),则调用getShuffleMapStage(shufDep, jobId)返回一个ShuffleMapStage类型对象,添加到父stage列表中。若为NarrowDenpendency,则将该NarrowDenpendency包含的RDD加入到待visit队列中,之后继续遍历待visit队列中的RDD,直到遇到ShuffleDependency或无依赖的RDD。

函数getParentStages的职责说白了就是:以参数rdd为起点,一级一级遍历依赖,碰到窄依赖就继续往前遍历,碰到宽依赖就调用getShuffleMapStage(shufDep, jobId)。这里需要特别注意的是,getParentStages以rdd为起点遍历RDD依赖并不会遍历整个RDD依赖图,而是一级一级遍历直到所有“遍历路线”都碰到了宽依赖就停止。剩下的事,在遍历的过程中交给getShuffleMapStage

那么,让我来看看函数getShuffleMapStage的实现:

private def getShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    jobId: Int): ShuffleMapStage = {
  shuffleToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) => stage
    case None =>
      // We are going to register ancestor shuffle dependencies
      registerShuffleDependencies(shuffleDep, jobId)

      //< 然后创建新的ShuffleMapStage
      val stage = newOrUsedShuffleStage(shuffleDep, jobId)
      shuffleToMapStage(shuffleDep.shuffleId) = stage

      stage
  }
}

在划分stage的过程中,由于每次shuffleDep.shuffleId都不同且都是第一次出现,显然shuffleToMapStage.get(shuffleDep.shuffleId)会match到None,便会调用newOrUsedShuffleStage。来看看它的实现:

private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], jobId: Int) {
    val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd)
    while (parentsWithNoMapStage.nonEmpty) {
      //< 出栈的其实是shuffleDep的前一个宽依赖,且shuffleToMapStage不包含以该出栈宽依赖id为key的元素
      val currentShufDep = parentsWithNoMapStage.pop()
      //< 创建新的ShuffleMapStage
      val stage = newOrUsedShuffleStage(currentShufDep, jobId)
      //< 将新创建的ShuffleMapStage加入到shuffleId -> ShuffleMapStage映射关系中
      shuffleToMapStage(currentShufDep.shuffleId) = stage
    }
  }

函数registerShuffleDependencies首先调用getAncestorShuffleDependencies,这个函数遍历参数rdd的List[dependency],若遇到ShuffleDependency,加入到parents: Stack[ShuffleDependency[_, _, _]]中;若遇到窄依赖,则遍历该窄依赖对应rdd的父一层依赖,知道遇到宽依赖为止。实现与getParentStages基本一致,不同的是这里是将宽依赖加入到parents中并返回。

registerShuffleDependencies拿到各个“依赖路线”最近的所有宽依赖后。对每个宽依赖调用newOrUsedShuffleStage,该函数用来创建新ShuffleMapStage或获得已经存在的ShuffleMapStage。来看它的实现:

private def newOrUsedShuffleStage(
      shuffleDep: ShuffleDependency[_, _, _],
      jobId: Int): ShuffleMapStage = {
    val rdd = shuffleDep.rdd
    val numTasks = rdd.partitions.size
    val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, jobId, rdd.creationSite)
    //< 若该shuffleDep.shulleId对应的, stage已经在MapOutputTracker中存在,那么可用的输出的数量及位置将从MapOutputTracker恢复
    if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
      val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
      val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
      for (i <- 0 until locs.size) {
        stage.outputLocs(i) = Option(locs(i)).toList // locs(i) will be null if missing
      }
      stage.numAvailableOutputs = locs.count(_ != null)
    } else {
      // Kind of ugly: need to register RDDs with the cache and map output tracker here
      // since we can't do it in the RDD constructor because # of partitions is unknown
      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
      //< 否则使用shuffleDep.shuffleId, rdd.partitions.size在mapOutputTracker中注册,这会在shuffle阶段reducer从shuffleMap端fetch数据起作用
      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)
    }
    stage
  }

函数newOrUsedShuffleStage首先调用newShuffleMapStage来创建新的ShuffleMapStage,来看下newShuffleMapStage的实现:

  private def newShuffleMapStage(
      rdd: RDD[_],
      numTasks: Int,
      shuffleDep: ShuffleDependency[_, _, _],
      jobId: Int,
      callSite: CallSite): ShuffleMapStage = {
    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
    val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks, parentStages,
      jobId, callSite, shuffleDep)

    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

结合文章开始处的函数调用关系图,可以看到newShuffleMapStage竟然又调用getParentStagesAndId来获取它的parentStages。那么,文章开头处的整个函数调用流程又会继续走一遍,不同的是起点rdd不是原来的finalRDD而是变成了这里的宽依赖的rdd。

静下心来,仔细看几遍上文提到的源码及注释,其实每一次的如上图所示的递归调用,其实就只做了两件事:

  1. 遍历起点RDD的依赖列表,若遇到窄依赖,则继续遍历该窄依赖的父List[RDD]的依赖,直到碰到宽依赖;若碰到宽依赖(不管是起点RDD的宽依赖还是遍历多级依赖碰到的宽依赖),则以宽依赖RDD为起点再次重复上述过程。直到到达RDD依赖图的最左端,也就是遍历到了没有依赖的RDD,则进入2
  2. 达到RDD依赖图的最左端,即递归调用也到了最深得层数,getParentStagesAndId中getParentStages第一次返回(第一次返回为空,因为最初的stage没有父stage),val id = nextStageId.getAndIncrement()也是第一次被调用,获得第一个stage的id,为0(注意,这个时候还没有创建第一个stage)。这之后,便调用
val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks, parentStages, jobId, callSite, shuffleDep)

创建第一个ShuffleMapStage。至此,这一层递归调用结束,返回到上一层递归中,这一层创建的所有的ShuffleMapStage会作为下一层stage的父List[stage]用来构造上一层的stages。上一层递归调用返回后,上一层创建的stage又将作为上上层的parent List[stage]来构造上上层的stages。依次类推,直到最后的ResultStage也被创建出来为止。整个stage的划分完成。

有一个需要注意的点是,无论对于ShuffleMapStage还是ResultStage来说,task的个数即该stage的finalRDD的partition的个数,仔细查看下上文中的newResultStagenewShuffleMapStage函数可以搞明白这点,不再赘述。

最后,解答下上文中的两个问题: 问题1:每个stage都有一个val parents: List[Stage]成员,保存的是其直接依赖的父stages;其直接父stages又有自身依赖的父stages,以此类推,构成了整个DAG图

问题2:父stage的id比子stage的id小,DAG图中,越左边的stage,id越小。


本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏我是业余自学C/C++的

链队 原

队列用链表来表示时,需要用两个变量来记录队列两端的变化:theFront,theBack.

8620
来自专栏Python、Flask、Django

Python 3.7的新特性

15510
来自专栏Android干货

浅谈Semaphore类 如何控制某个方法允许并发访问线程的个数?

请求一个信号量,这时候信号量个数-1,当减少到0的时候,下一次acquire不会再执行,只有当执行一个release()的时候,信号量不为0的时候才可以继续执行...

26210
来自专栏PingCAP的专栏

三十分钟成为 Contributor | 为 TiKV 添加 built-in 函数

SQL 语句发送到 TiDB 后经过 parser 生成 AST(抽象语法树),再经过 Query Optimizer 生成执行计划,执行计划切分成很多子任务,...

15230
来自专栏Java帮帮-微信公众号-技术文章全总结

Java面试系列17-编程题-读取服务器字符、实现序列化、计数器、1000阶乘、n出列问题等

一,Java的通信编程,编程题(或问答),用JAVA SOCKET编程,读服务器几个字符,再写入本地显示? Server端程序: package test;...

48180
来自专栏菩提树下的杨过

Jboss EAP:native management API学习

上一节已经学习了CLI命令行来控制JBOSS,如果想在程序中以编码方式来控制JBOSS,可以参考下面的代码,实际上在前面的文章,用代码控制Jboss上的Data...

22290
来自专栏小巫技术博客

Android热补丁动态更新实践

20930
来自专栏小勇DW3

Mybatis使用动态代理实现拦截器功能

  拦截器顾名思义为拦截某个功能的一个武器,在众多框架中均有“拦截器”。这个Plugin有什么用呢?或者说拦截器有什么用呢?可以想想拦截器是怎么实现的。Plug...

54220
来自专栏流媒体

Resources和AssetManager创建过程

到这里AssetManager创建完毕。然后设置相关的路径 AssetManager assets = new AssetManager(); // res...

14850
来自专栏牛肉圆粉不加葱

[Spark源码剖析] DAGScheduler提交stage

DAGScheduler通过调用submitStage来提交stage,实现如下:

8220

扫码关注云+社区

领取腾讯云代金券