# Spark 2.4.0 Source Code Analysis: WorldCount Stage Division (DAGScheduler) (Part 4)

## More Resources
- GitHub: https://github.com/opensourceteams/spark-scala-maven-2.4.0
 
## Sequence Diagram
- https://github.com/opensourceteams/spark-scala-maven-2.4.0/blob/master/md/image/example/spark-sql-dataset/worldCount/worldCount.FinallStage.jpg

![worldCount.FinallStage.jpg](https://ask.qcloudimg.com/draft/2871328/4wchpnv15s.jpg)


## Overview
- Understand how the final stage is derived, i.e. how stages are divided (a minimal word-count job of the kind analyzed here is sketched below)
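
For context, here is a minimal sketch of the word-count job whose stage division this article traces. The app name and input path are illustrative, not taken from the original series:

```scala
import org.apache.spark.sql.SparkSession

object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("WordCount")
      .getOrCreate()

    // reduceByKey introduces the only ShuffleDependency in this lineage, so the
    // DAGScheduler splits the job into exactly two stages: a ShuffleMapStage
    // (textFile/flatMap/map) and a ResultStage (the collect action).
    val counts = spark.sparkContext
      .textFile("data/input.txt")   // illustrative path
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    counts.collect().foreach(println)
    spark.stop()
  }
}
```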

## Stage Diagram
- https://github.com/opensourceteams/spark-scala-maven-2.4.0/blob/master/md/image/example/spark-sql-dataset/worldCount/dagVisualization/worldCount-stage.png



## Source Code Analysis

### DAGSchedulerEventProcessLoop.onReceive
- Calls DAGSchedulerEventProcessLoop.doOnReceive() to process the event

```scala
/**
 * The main event loop of the DAG scheduler.
 */
override def onReceive(event: DAGSchedulerEvent): Unit = {
  val timerContext = timer.time()
  try {
    doOnReceive(event)
  } finally {
    timerContext.stop()
  }
}
```
### DAGSchedulerEventProcessLoop.doOnReceive
- Handles the JobSubmitted event by calling dagScheduler.handleJobSubmitted()

```scala
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

  case StageCancelled(stageId, reason) =>
    dagScheduler.handleStageCancellation(stageId, reason)

  case JobCancelled(jobId, reason) =>
    dagScheduler.handleJobCancellation(jobId, reason)

  case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)

  case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()

  case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)

  case ExecutorLost(execId, reason) =>
    val workerLost = reason match {
      case SlaveLost(_, true) => true
      case _ => false
    }
    dagScheduler.handleExecutorLost(execId, workerLost)

  case WorkerRemoved(workerId, host, message) =>
    dagScheduler.handleWorkerRemoved(workerId, host, message)

  case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)

  case SpeculativeTaskSubmitted(task) =>
    dagScheduler.handleSpeculativeTaskSubmitted(task)

  case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)

  case completion: CompletionEvent =>
    dagScheduler.handleTaskCompletion(completion)

  case TaskSetFailed(taskSet, reason, exception) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

  case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
}
```
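
onReceive/doOnReceive run on a dedicated thread inside Spark's event loop (org.apache.spark.util.EventLoop): post() enqueues an event, and a single daemon thread drains the queue and dispatches. The sketch below is a simplified illustration of that pattern, not the real class; the name SimpleEventLoop and its members are invented for illustration:

```scala
import java.util.concurrent.LinkedBlockingDeque

// Simplified sketch of the event-loop pattern behind DAGSchedulerEventProcessLoop:
// post() enqueues an event; one daemon thread dequeues and dispatches each event
// to onReceive, the way doOnReceive dispatches via pattern matching.
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  @volatile private var stopped = false

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped) {
          val event = eventQueue.take() // blocks until an event arrives
          onReceive(event)
        }
      } catch {
        case _: InterruptedException => // stop() was called; exit the loop
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped = true; eventThread.interrupt() }
  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit
}
```

Because a single thread processes all events, handlers such as handleJobSubmitted never race with each other, which is why the DAGScheduler's internal maps need no locking here.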
### DAGScheduler.handleJobSubmitted
- The method that handles the JobSubmitted event
- This section focuses on how the final stage is obtained here, i.e. stage division: createResultStage(finalRDD) is called to build the final stage
- Once the final stage exists, DAGScheduler.submitStage(finalStage) is called to submit it (covered in a separate article)

```scala
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: BarrierJobSlotsNumberCheckFailed =>
      logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " +
        "than the total number of slots in the cluster currently.")
      // If jobId doesn't exist in the map, Scala coverts its value null to 0: Int automatically.
      val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
        new BiFunction[Int, Int, Int] {
          override def apply(key: Int, value: Int): Int = value + 1
        })
      if (numCheckFailures <= maxFailureNumTasksCheck) {
        messageScheduler.schedule(
          new Runnable {
            override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
              partitions, callSite, listener, properties))
          },
          timeIntervalNumTasksCheck,
          TimeUnit.SECONDS
        )
        return
      } else {
        // Job failed, clear internal data.
        barrierJobIdToNumTasksCheckFailures.remove(jobId)
        listener.jobFailed(e)
        return
      }

    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  // Job submitted, clear internal data.
  barrierJobIdToNumTasksCheckFailures.remove(jobId)

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage)
}
```
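
The logInfo calls above make the computed stage division directly observable. A usage sketch, assuming the `spark` and `counts` values from the word-count example earlier; the log formats come from the code above, but the ids, partition counts, and call sites shown are illustrative:

```scala
// With INFO logging enabled, handleJobSubmitted prints the division it computed, e.g.:
//   INFO DAGScheduler: Got job 0 (collect at WordCount.scala:20) with 2 output partitions
//   INFO DAGScheduler: Final stage: ResultStage 1 (collect at WordCount.scala:20)
//   INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
//   INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
spark.sparkContext.setLogLevel("INFO")
counts.collect()
```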
### DAGScheduler.createResultStage
- This function creates the final stage, so the final stage is always a ResultStage
- A ResultStage depends on its parent stages, so before it can be computed, its parent stages must be computed first
- DAGScheduler.getOrCreateParentStages(rdd) is called to get the list of parent stages

```scala
/**
 * Create a ResultStage associated with the provided jobId.
 */
private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  checkBarrierStageWithDynamicAllocation(rdd)
  checkBarrierStageWithNumSlots(rdd)
  checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}
```

### DAGScheduler.getOrCreateParentStages
- Calls getShuffleDependencies to get the first ShuffleDependency in finalRDD's ancestor chain (a sketch of getShuffleDependencies follows the getOrCreateShuffleMapStage listing below)
- Calls DAGScheduler.getOrCreateShuffleMapStage(shuffleDep) to get the corresponding ShuffleMapStage

```scala
/**
 * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
 * the provided firstJobId.
 */
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}
```

### DAGScheduler.getOrCreateShuffleMapStage
- Looks up the shuffleIdToMapStage cache with shuffleId as the key; if a stage is cached, it is returned directly
- If there is no cached stage, the dependency has not been processed yet: getMissingAncestorShuffleDependencies is called to check whether the ancestor RDDs contain further ShuffleDependencies. If they do, ShuffleMapStages are created for the ancestors first, so creation starts from the topmost ShuffleDependency and proceeds down to the last one; by the end, every stage above the final stage has been created
- Calls DAGScheduler.createShuffleMapStage(shuffleDep) to create the ShuffleMapStage

```scala
/**
 * Gets a shuffle map stage if one exists in shuffleIdToMapStage. Otherwise, if the
 * shuffle map stage doesn't already exist, this method will create the shuffle map stage in
 * addition to any missing ancestor shuffle map stages.
 */
private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) =>
      stage

    case None =>
      // Create stages for all missing ancestor shuffle dependencies.
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
        // that were not already in shuffleIdToMapStage, it's possible that by the time we
        // get to a particular dependency in the foreach loop, it's been added to
        // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
        // SPARK-13902 for more information.
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)
        }
      }
      // Finally, create a stage for the given shuffle dependency.
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}
```
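
getOrCreateParentStages relies on getShuffleDependencies, which this article does not list. For reference, a sketch of it, paraphrased from the Spark 2.4.0 source: it walks up the dependency graph and stops at the first ShuffleDependency on each branch, traversing through narrow dependencies but never descending past a shuffle boundary:

```scala
// (Member of DAGScheduler; uses scala.collection.mutable.{ArrayStack, HashSet},
// org.apache.spark.ShuffleDependency, and org.apache.spark.rdd.RDD.)
// Returns the shuffle dependencies that are immediate parents of the given RDD.
private[scheduler] def getShuffleDependencies(
    rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new ArrayStack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          // Stop at the shuffle boundary: collect it, do not descend past it.
          parents += shuffleDep
        case dependency =>
          // Narrow dependency: keep walking up this branch of the lineage.
          waitingForVisit.push(dependency.rdd)
      }
    }
  }
  parents
}
```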
### DAGScheduler.createShuffleMapStage(shuffleDep)
- First computes the parent stages by calling DAGScheduler.getOrCreateParentStages(rdd)
- Then constructs a new ShuffleMapStage with those parent stages

```scala
/**
 * Creates a ShuffleMapStage that generates the given shuffle dependency's partitions. If a
 * previously run stage generated the same shuffle data, this function will copy the output
 * locations that are still available from the previous shuffle to avoid unnecessarily
 * regenerating data.
 */
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  checkBarrierStageWithDynamicAllocation(rdd)
  checkBarrierStageWithNumSlots(rdd)
  checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
  val numTasks = rdd.partitions.length
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ShuffleMapStage(
    id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)

  stageIdToStage(id) = stage
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  updateJobIdStageIdMaps(jobId, stage)

  if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}
```
### DAGScheduler.getOrCreateParentStages(rdd)
- In effect, this checks whether the ancestor RDDs contain another ShuffleDependency; if they do, the ancestor ShuffleDependency is handled first, i.e. a ShuffleMapStage is found or created for it
- If the ancestor RDDs contain no ShuffleDependency, the result is Nil: the ShuffleMapStage has no parent stages and is therefore a topmost stage
- Control then returns up to DAGScheduler.createResultStage

```scala
/**
 * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
 * the provided firstJobId.
 */
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}
```

### DAGScheduler.createResultStage
- DAGScheduler.getOrCreateParentStages() returns the ShuffleMapStage(s) built above
- A new ResultStage is constructed with those parent stages; this is the final stage
- At this point the final stage has been computed, i.e. stage division is complete (see the trace sketched below)

```scala
/**
 * Create a ResultStage associated with the provided jobId.
 */
private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  checkBarrierStageWithDynamicAllocation(rdd)
  checkBarrierStageWithNumSlots(rdd)
  checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}
```
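
To observe the resulting division for the word-count job, the shuffle boundary can be read straight off the lineage. A minimal sketch, assuming the `counts` RDD from the word-count example at the top; the ids and paths in the sample output are illustrative:

```scala
// Indentation steps (+-) in toDebugString output mark shuffle boundaries.
// Everything under the +- line runs in the ShuffleMapStage; the ShuffledRDD
// produced by reduceByKey runs in the ResultStage.
println(counts.toDebugString)
// Illustrative output:
// (2) ShuffledRDD[4] at reduceByKey ...              <- ResultStage
//  +-(2) MapPartitionsRDD[3] at map ...              <- ShuffleMapStage
//     |  MapPartitionsRDD[2] at flatMap ...
//     |  data/input.txt MapPartitionsRDD[1] at textFile ...
//     |  data/input.txt HadoopRDD[0] at textFile ...
```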
end
