专栏首页大数据-数据人生DAGScheduler源码解读2-task创建
原创

DAGScheduler源码解读2-task创建

在上一篇文章中,我们分析了DAGScheduler的代码,重点了解了stage的创建和划分,是重中之重。这篇文章重点分析下task的创建:

private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}    

DAGScheduler在提交stage的时候,还会创建task,创建的task数量与partition数量一致。

/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int) {
  logDebug("submitMissingTasks(" + stage + ")")
  // Get our pending tasks and remember them in our pendingTasks entry
  stage.pendingPartitions.clear()

  // First figure out the indexes of partition ids to compute.
  //计算partition数量
  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

  // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
  // with this Stage
  val properties = jobIdToActiveJob(jobId).properties

  runningStages += stage
  // SparkListenerStageSubmitted should be posted before testing whether tasks are
  // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
  // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
  // event.
  stage match {
    case s: ShuffleMapStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
    case s: ResultStage =>
      outputCommitCoordinator.stageStart(
        stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
  }

  stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

  // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
  // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
  // the serialized copy of the RDD and for each task we will deserialize it, which means each
  // task gets a different copy of the RDD. This provides stronger isolation between tasks that
  // might modify state of objects referenced in their closures. This is necessary in Hadoop
  // where the JobConf/Configuration object is not thread-safe.
  var taskBinary: Broadcast[Array[Byte]] = null
  try {
    // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
    // For ResultTask, serialize and broadcast (rdd, func).
    val taskBinaryBytes: Array[Byte] = stage match {
      case stage: ShuffleMapStage =>
        JavaUtils.bufferToArray(
          closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
      case stage: ResultStage =>
        JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
    }

    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    // In the case of a failure during serialization, abort the stage.
    case e: NotSerializableException =>
      abortStage(stage, "Task not serializable: " + e.toString, Some(e))
      runningStages -= stage

      // Abort execution
      return
    case NonFatal(e) =>
      abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }

  //重点:为stage创建指定数量task
  val tasks: Seq[Task[_]] = try {
    stage match {
      case stage: ShuffleMapStage =>
        //每一个partitions数量都会创建task
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = stage.rdd.partitions(id)
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, stage.latestInfo.taskMetrics, properties)
        }

      case stage: ResultStage =>
        val job = stage.activeJob.get
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = stage.rdd.partitions(p)
          val locs = taskIdToLocations(id)
          new ResultTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics)
        }
    }
  } catch {
    case NonFatal(e) =>
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }

  if (tasks.size > 0) {
    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    stage.pendingPartitions ++= tasks.map(_.partitionId)
    logDebug("New pending partitions: " + stage.pendingPartitions)
    //关键点:最后创建TaskSet提交任务
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  } else {
    // Because we posted SparkListenerStageSubmitted earlier, we should mark
    // the stage as completed here in case there are no tasks to run
    markStageAsFinished(stage, None)

    val debugString = stage match {
      case stage: ShuffleMapStage =>
        s"Stage ${stage} is actually done; " +
          s"(available: ${stage.isAvailable}," +
          s"available outputs: ${stage.numAvailableOutputs}," +
          s"partitions: ${stage.numPartitions})"
      case stage : ResultStage =>
        s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
    }
    logDebug(debugString)
  }
}

上面有一个关键点,在计算task的位置的时候,会调用内部方法taskIdToLocations,特地将该方法从上面主类中抽出来:

val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
  stage match {
    case s: ShuffleMapStage =>
      partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
    case s: ResultStage =>
      val job = s.activeJob.get
      partitionsToCompute.map { id =>
        val p = s.partitions(id)
        (id, getPreferredLocs(stage.rdd, p))
      }.toMap
  }
} catch {
  case NonFatal(e) =>
    stage.makeNewStageAttempt(partitionsToCompute.size)
    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
    abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
    runningStages -= stage
    return
}

这里会计算最合适的task位置,其思路是从最后一个rdd开始查找,找到rdd被缓存了、或者checkpoint的位置,作为task的最佳位置。task在最佳位置的节点上执行,就不需要计算之前的rdd了。

/**
 * Recursive implementation for getPreferredLocs.
 *
 * This method is thread-safe because it only accesses DAGScheduler state through thread-safe
 * methods (getCacheLocs()); please be careful when modifying this method, because any new
 * DAGScheduler state accessed by it may require additional synchronization.
 */
private def getPreferredLocsInternal(
    rdd: RDD[_],
    partition: Int,
    visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
  // If the partition has already been visited, no need to re-visit.
  // This avoids exponential path exploration.  SPARK-695
  if (!visited.add((rdd, partition))) {
    // Nil has already been returned for previously visited partitions.
    return Nil
  }
  // If the partition is cached, return the cache locations
  val cached = getCacheLocs(rdd)(partition)
  if (cached.nonEmpty) {
    return cached
  }
  // If the RDD has some placement preferences (as is the case for input RDDs), get those
  //判断rdd的partition是否checkpoint了
  val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
  if (rddPrefs.nonEmpty) {
    return rddPrefs.map(TaskLocation(_))
  }

  // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
  // that has any placement preferences. Ideally we would choose based on transfer sizes,
  // but this will do for now.
  //递归寻找rdd的父rdd,判断最佳位置
  rdd.dependencies.foreach {
    case n: NarrowDependency[_] =>
      for (inPart <- n.getParents(partition)) {
        val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
        if (locs != Nil) {
          return locs
        }
      }

    case _ =>
  }
  //如果该stage没有最佳位置,就为NIL,需要使用默认taskScheduler来进行分配
  Nil
}

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Spark DAGScheduler源码解读2-task创建

    在上一篇文章中,我们分析了DAGScheduler的代码,重点了解了stage的创建和划分,是重中之重。这篇文章重点分析下task的创建:

    幽鸿
  • Spark DAGScheduler源码解读1-stage划分

    这里创建一个stage,并且将stage放入scheduler的HashMap中进行管理:

    幽鸿
  • TaskScheduler源码解读

    这里调用了taskScheduler接口,我们打开TaskScheduler trait,trait在scala里就是接口,在IDEA中查看实现的类,使用快捷键...

    幽鸿
  • Spark DAGScheduler源码解读2-task创建

    在上一篇文章中,我们分析了DAGScheduler的代码,重点了解了stage的创建和划分,是重中之重。这篇文章重点分析下task的创建:

    幽鸿
  • sql 查询目标数据库中所有的表以其关键信息

    1、查询目标库中的所有表 SELECT obj.name tablename, ---表名 schem.name schemname, ---表所属的方案 i...

    郑小超.
  • 聊聊flink的logback配置

    flink-release-1.7.1/flink-dist/src/main/flink-bin/bin/flink-daemon.sh

    codecraft
  • 聊聊flink的logback配置

    flink-release-1.7.1/flink-dist/src/main/flink-bin/bin/flink-daemon.sh

    codecraft
  • net core如何在windows通过 .bat文件开机启动

    C:\Users\administrator(换成具体的自己电脑上的用户)\AppData\Roaming\Microsoft\Windows\Start Me...

    跟着阿笨一起玩NET
  • django2.0入门教程第三节

    章鱼喵
  • 腾讯企点助力新冠抗疫心理支持热线 | 抗击疫情公益行动

    ? 自1月30日23点,腾讯企点发布【向防疫服务相关组织免费】公益行动起,我们每天收到的开通咨询络绎不绝。 昨天,我们接到了来自浙江大学心理健康教育与咨询中心...

    腾讯企点

扫码关注云+社区

领取腾讯云代金券