专栏首页大数据-数据人生TaskScheduler源码解读
原创

TaskScheduler源码解读

在DAGScheduler中提交task方法入口:

taskScheduler.submitTasks(new TaskSet(
  tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))

这里调用了taskScheduler接口,我们打开TaskScheduler trait,trait在scala里就是接口,在IDEA中查看实现的类,使用快捷键:ctrl+H,或者直接使用快捷键:ctrl + alt +B查看实现

查看submitTasks方法实现:

override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
   //给每一个TaskSet创建一个TaskSetManager
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    //将manager进行内存缓存,manager负责对task进行跟踪管理,包括重试
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  //这里的backend就是SparkContext创建好的SparkDeploySchedulerBackend
  //这里backend负责创建AppClient,向Master注册Application
  //参见SparkContext代码createTaskScheduler方法
  backend.reviveOffers()
}

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Spark DAGScheduler源码解读1-stage划分

    这里创建一个stage,并且将stage放入scheduler的HashMap中进行管理:

    幽鸿
  • Spark DAGScheduler源码解读2-task创建

    在上一篇文章中,我们分析了DAGScheduler的代码,重点了解了stage的创建和划分,是重中之重。这篇文章重点分析下task的创建:

    幽鸿
  • DAGScheduler源码解读2-task创建

    在上一篇文章中,我们分析了DAGScheduler的代码,重点了解了stage的创建和划分,是重中之重。这篇文章重点分析下task的创建:

    幽鸿
  • 机器人会消灭工人阶级吗?

    周政华   腾讯研究院高级研究员   在北京很多老旧居民楼里,电梯门开时,迎接你的是一个五十岁上下的中年妇女,她通常面无表情地坐在张油漆斑驳的靠背椅上,手持...

    腾讯研究院
  • 直接内存回收中的等待队列

    在直接内存回收过程中,有可能会造成当前需要分配内存的进程被加入一个等待队列,当整个node的空闲页数量满足要求时,由kswapd唤醒它重新获取内存。这个等待队列...

    233333
  • 用带注意力机制的模型分析评论者是否满意

    本内容取之电子工业出版社出版、李金洪编著的《深度学习之TensorFlow工程化项目实战》一书的实例36。

    代码医生工作室
  • 常用开源框架中设计模式使用分析-工厂模式(Factory Pattern)

    工厂模式是创建型模式,他封装了对象的创建过程,调用者使用具体的工厂方法根据参数就可以获取对应的对象。

    加多
  • 注册中心 Eureka 源码解析 —— 应用实例注册发现(八)之覆盖状态

    这里要注意下,不是应用实例的状态( status ),而是覆盖状态( overridestatus ) 。代码如下:

    芋道源码
  • Elasticsearch6.2.2 X-Pack部署及使用详解

    X-Pack已经开源,预计Elasticsearch6.3版本会全面集成,不再收费。 赶紧体验一下强大的X-Pack吧! 1、 X-Pack 概览 X-Pac...

    铭毅天下
  • Docker镜像pull不下来最终解决方法

    发现是因为docker加速器超时导致pull不下来 查看加速器:/etc/docker/daemon.json

    IT小马哥

扫码关注云+社区

领取腾讯云代金券