本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正。为了简化论述,将部分细节放到了源码中作为注释,因此正文中是主要内容。
第一部分内容见:
Spark学习:Spark源码和调优简介 Spark Core (一)
下面是重头戏submitMissingTasks,这个方法负责生成 TaskSet,并且将它提交给 TaskScheduler 低层调度器。 partitionsToCompute计算有哪些分区是待计算的。根据 Stage 类型的不同,findMissingPartitions的计算方法也不同。
// DAGScheduler.scala
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
// First figure out the indexes of partition ids to compute.
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
...
// ResultStage.scala
override def findMissingPartitions(): Seq[Int] = {
val job = activeJob.get
(0 until job.numPartitions).filter(id => !job.finished(id))
}
// ActiveJob.scala
val numPartitions = finalStage match {
// 对于ResultStage,不一定得到当前rdd的所有分区,例如first()和lookup()的Action,
// 因此这里是r.