前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[spark] TaskScheduler 任务提交与调度源码解析

[spark] TaskScheduler 任务提交与调度源码解析

作者头像
UFO
发布2018-09-04 15:45:26
9280
发布2018-09-04 15:45:26
举报
文章被收录于专栏:Spark生态圈Spark生态圈

在DAGScheduler划分为Stage并以TaskSet的形式提交给TaskScheduler后,再由TaskScheduler通过TaskSetMagager对taskSet的task进行调度与执行。

代码语言:javascript
复制
taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))

submitTasks方法的实现在TaskScheduler的实现类TaskSchedulerImpl中。先看整个实现:

代码语言:javascript
复制
override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager
      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
      }
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    backend.reviveOffers()
  }
代码语言:javascript
复制
val manager = createTaskSetManager(taskSet, maxTaskFailures)

先为当前TaskSet创建TaskSetManager,TaskSetManager负责管理一个单独taskSet的每一个task,决定某个task是否在一个executor上启动,如果task失败,负责重试task直到task重试次数,并通过延迟调度来执行task的位置感知调度。

代码语言:javascript
复制
val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager

key为stageId,value为一个HashMap,其中key为stageAttemptId,value为TaskSet。

代码语言:javascript
复制
val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
      }

isZombie是TaskSetManager中所有tasks是否不需要执行(成功执行或者stage被删除)的一个标记,如果该TaskSet没有被完全执行并且已经存在和新进来的taskset一样的另一个TaskSet,则抛出异常,确保一个stage不能有两个taskSet同时运行。

代码语言:javascript
复制
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

将当前taskSet添加到调度池中,schedulableBuilder的类型是SchedulerBuilder的一个trait,有两个实现FIFOSchedulerBuilder和 FairSchedulerBuilder,并且默认采用的是FIFO方式。

schedulableBuilder是SparkContext 中newTaskSchedulerImpl(sc)在创建TaskSchedulerImpl的时候通过scheduler.initialize(backend)的initialize方法对schedulableBuilder进行了实例化。

代码语言:javascript
复制
def initialize(backend: SchedulerBackend) {
    this.backend = backend
    // temporarily set rootPool name to empty
    rootPool = new Pool("", schedulingMode, 0, 0)
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
        case _ =>
          throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
      }
    }
    schedulableBuilder.buildPools()
  }
代码语言:javascript
复制
backend.reviveOffers()

接下来调用了SchedulerBackend的riviveOffers方法向schedulerBackend申请资源。backend也是通过scheduler.initialize(backend)的参数传递过来的,具体是在SparkContext 中被创建的。

代码语言:javascript
复制
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)

回到向schedulerBackend申请资源, 调用CoarseGrainedSchedulerBackend的reviveOffers方法,该方法给driverEndpoint发送ReviveOffer消息。

代码语言:javascript
复制
 override def reviveOffers() {
    driverEndpoint.send(ReviveOffers)
  }

driverEndpoint收到ReviveOffer消息后调用makeOffers方法。

代码语言:javascript
复制
private def makeOffers() {
      // Filter out executors under killing
      val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
      val workOffers = activeExecutors.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq
      launchTasks(scheduler.resourceOffers(workOffers))
    }

该方法先过滤出活跃的executor并封装成WorkerOffer,WorkerOffer包含executorId、host、可用的cores三个信息。这里的executorDataMap是HashMap[String, ExecutorData]类型,key为executorId,value为对应executor的信息,包括host、RPC信息、totalCores、freeCores。

在客户端向Master注册Application的时候,Master已经为Application分配并启动好Executor,然后注册给CoarseGrainedSchedulerBackend,注册信息就是存储在executorDataMap数据结构中。

代码语言:javascript
复制
launchTasks(scheduler.resourceOffers(workOffers))

先看里面的scheduler.resourceOffers(workOffers),TaskSchedulerImpl调用resourceOffers方法通过准备好的资源获得要被执行的Seq[TaskDescription],交给CoarseGrainedSchedulerBackend分发到各个executor上执行。下面看具体实现:

代码语言:javascript
复制
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    //标记是否有新的executor加入
    var newExecAvail = false
    // 更新executor,host,rack信息
    for (o <- offers) {
      executorIdToHost(o.executorId) = o.host
      executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
      if (!executorsByHost.contains(o.host)) {
        executorsByHost(o.host) = new HashSet[String]()
        executorAdded(o.executorId, o.host)
        newExecAvail = true
      }
      for (rack <- getRackForHost(o.host)) {
        hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
      }
    }

    // 随机打乱offers,避免多个task集中分配到某些节点上,为了负载均衡
    val shuffledOffers = Random.shuffle(offers)
    // 建一个二维数组,保存每个Executor上将要分配的那些task
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    //每个executor上可用的cores
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    //返回排序过的TaskSet队列,有FIFO及Fair两种排序规则,默认为FIFO
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
      if (newExecAvail) { // 如果该executor是新分配来的
        taskSet.executorAdded() // 重新计算TaskSetManager的就近原则
      }
    }

    // 利用双重循环对每一个taskSet依照调度的顺序,依次按照本地性级别顺序尝试启动task
    // 根据taskSet及locality遍历所有可用的executor,找出可以在各个executor上启动的task,
    // 加到tasks:Seq[Seq[TaskDescription]]中
    // 数据本地性级别顺序:PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
    var launchedTask = false
    for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
      do {
       //将计算资源按照就近原则分配给taskSet,用于执行其中的task
        launchedTask = resourceOfferSingleTaskSet(
            taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
      } while (launchedTask)
    }

    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
  }

跟进resourceOfferSingleTaskSet方法:

代码语言:javascript
复制
private def resourceOfferSingleTaskSet(
      taskSet: TaskSetManager,
      maxLocality: TaskLocality,
      shuffledOffers: Seq[WorkerOffer],
      availableCpus: Array[Int],
      tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
    var launchedTask = false
    //遍历所有executor
    for (i <- 0 until shuffledOffers.size) {
      val execId = shuffledOffers(i).executorId
      val host = shuffledOffers(i).host
      //若当前executor可用的core数满足一个task所需的core数
      if (availableCpus(i) >= CPUS_PER_TASK) {
        try {
          //获取taskSet哪些task可以在该executor上启动
          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
            //将需要在该executor启动的task添加到tasks中
            tasks(i) += task 
            val tid = task.taskId 
            taskIdToTaskSetManager(tid) = taskSet // task -> taskSetManager
            taskIdToExecutorId(tid) = execId // task -> executorId
            executorIdToTaskCount(execId) += 1 //该executor上的task+1
            executorsByHost(host) += execId // host -> executorId
            availableCpus(i) -= CPUS_PER_TASK //该executor上可用core数减去对应task的core数
            assert(availableCpus(i) >= 0)
            launchedTask = true
          }
        } catch {
          case e: TaskNotSerializableException =>
            logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
            // Do not offer resources for this task, but don't throw an error to allow other
            // task sets to be submitted.
            return launchedTask
        }
      }
    }
    return launchedTask
  }

这个方法主要是遍历所有可用的executor,在core满足一个task所需core的条件下,通过resourceOffer方法获取taskSet能在该executor上启动的task,并添加到tasks中予以返回。下面具体看resourceOffer的实现:

代码语言:javascript
复制
def resourceOffer(
      execId: String,
      host: String,
      maxLocality: TaskLocality.TaskLocality)
    : Option[TaskDescription] =
  {
    if (!isZombie) {
      val curTime = clock.getTimeMillis()

      var allowedLocality = maxLocality

      if (maxLocality != TaskLocality.NO_PREF) {
        allowedLocality = getAllowedLocalityLevel(curTime)
        if (allowedLocality > maxLocality) {
          // We're not allowed to search for farther-away tasks
          allowedLocality = maxLocality
        }
      }

      dequeueTask(execId, host, allowedLocality) match {
        case Some((index, taskLocality, speculative)) =>
          // Found a task; do some bookkeeping and return a task description
          val task = tasks(index)
          val taskId = sched.newTaskId()
          // Do various bookkeeping
          copiesRunning(index) += 1
          val attemptNum = taskAttempts(index).size
          val info = new TaskInfo(taskId, index, attemptNum, curTime,
            execId, host, taskLocality, speculative)
          taskInfos(taskId) = info
          taskAttempts(index) = info :: taskAttempts(index)
          // Update our locality level for delay scheduling
          // NO_PREF will not affect the variables related to delay scheduling
          if (maxLocality != TaskLocality.NO_PREF) {
            currentLocalityIndex = getLocalityIndex(taskLocality)
            lastLaunchTime = curTime
          }
          // Serialize and return the task
          val startTime = clock.getTimeMillis()
          val serializedTask: ByteBuffer = try {
            Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
          } catch {
            // If the task cannot be serialized, then there's no point to re-attempt the task,
            // as it will always fail. So just abort the whole task-set.
            case NonFatal(e) =>
              val msg = s"Failed to serialize task $taskId, not attempting to retry it."
              logError(msg, e)
              abort(s"$msg Exception during serialization: $e")
              throw new TaskNotSerializableException(e)
          }
          if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
              !emittedTaskSizeWarning) {
            emittedTaskSizeWarning = true
            logWarning(s"Stage ${task.stageId} contains a task of very large size " +
              s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
              s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
          }
          addRunningTask(taskId)

          // We used to log the time it takes to serialize the task, but task size is already
          // a good proxy to task serialization time.
          // val timeTaken = clock.getTime() - startTime
          val taskName = s"task ${info.id} in stage ${taskSet.id}"
          logInfo(s"Starting $taskName (TID $taskId, $host, partition ${task.partitionId}," +
            s" $taskLocality, ${serializedTask.limit} bytes)")

          sched.dagScheduler.taskStarted(task, info)
          return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
            taskName, index, serializedTask))
        case _ =>
      }
    }
    None
  }
代码语言:javascript
复制
 if (maxLocality != TaskLocality.NO_PREF) {
        allowedLocality = getAllowedLocalityLevel(curTime)
        if (allowedLocality > maxLocality) {
          // We're not allowed to search for farther-away tasks
          allowedLocality = maxLocality
        }
      }

getAllowedLocalityLevel(curTime)会根据延迟调度调整合适的Locality,目的都是尽可能的以最好的locality来启动每一个task,getAllowedLocalityLevel返回的是当前taskSet中所有未执行的task的最高locality,以该locality作为本次调度能容忍的最差locality,在后续的搜索中只搜索本地性比这个级别好的情况。allowedLocality 最终取以getAllowedLocalityLevel(curTime)返回的locality和maxLocality中级别较高的locality。

根据allowedLocality寻找合适的task,若返回不为空,则说明在该executor上分配了task,然后进行信息跟新,将taskid加入到runningTask中,跟新延迟调度信息,序列化task,通知DAGScheduler,最后返回taskDescription,我们来看看dequeueTask的实现:

代码语言:javascript
复制
private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
    : Option[(Int, TaskLocality.Value, Boolean)] =
  {
    for (index <- dequeueTaskFromList(execId, getPendingTasksForExecutor(execId))) {
      return Some((index, TaskLocality.PROCESS_LOCAL, false))
    }

    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
      for (index <- dequeueTaskFromList(execId, getPendingTasksForHost(host))) {
        return Some((index, TaskLocality.NODE_LOCAL, false))
      }
    }

    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
      // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
      for (index <- dequeueTaskFromList(execId, pendingTasksWithNoPrefs)) {
        return Some((index, TaskLocality.PROCESS_LOCAL, false))
      }
    }

    if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
      for {
        rack <- sched.getRackForHost(host)
        index <- dequeueTaskFromList(execId, getPendingTasksForRack(rack))
      } {
        return Some((index, TaskLocality.RACK_LOCAL, false))
      }
    }

    if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
      for (index <- dequeueTaskFromList(execId, allPendingTasks)) {
        return Some((index, TaskLocality.ANY, false))
      }
    }

    // find a speculative task if all others tasks have been scheduled
    dequeueSpeculativeTask(execId, host, maxLocality).map {
      case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
  }

首先看是否存在execId对应的PROCESS_LOCAL类别的任务,如果存在,取出来调度,如果不存在,只在比allowedLocality大或者等于的级别上去查看是否存在execId对应类别的任务,若有则调度。

其中的dequeueTaskFromList是从execId对应类别(如PROCESS_LOCAL)的任务列表中尾部取出一个task返回其在taskSet中的taskIndex,跟进该方法:

代码语言:javascript
复制
private def dequeueTaskFromList(execId: String, list: ArrayBuffer[Int]): Option[Int] = {
    var indexOffset = list.size
    while (indexOffset > 0) {
      indexOffset -= 1
      val index = list(indexOffset)
      if (!executorIsBlacklisted(execId, index)) {
        // This should almost always be list.trimEnd(1) to remove tail
        list.remove(indexOffset)
        if (copiesRunning(index) == 0 && !successful(index)) {
          return Some(index)
        }
      }
    }
    None
  }

这里有个黑名单机制,利用executorIsBlacklisted方法查看该executor是否属于task的黑名单,黑名单记录task上一次失败所在的Executor Id和Host,以及其对应的“黑暗”时间,“黑暗”时间是指这段时间内不要再往这个节点上调度这个Task了。

代码语言:javascript
复制
private def executorIsBlacklisted(execId: String, taskId: Int): Boolean = {
    if (failedExecutors.contains(taskId)) {
      val failed = failedExecutors.get(taskId).get
      return failed.contains(execId) &&
        clock.getTimeMillis() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT
    }
    false
  }

可以看到在dequeueTask方法的最后一段代码:

代码语言:javascript
复制
 // find a speculative task if all others tasks have been scheduled
    dequeueSpeculativeTask(execId, host, maxLocality).map {
      case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}

这里是启动推测执行,推测任务是指对一个Task在不同的Executor上启动多个实例,如果有Task实例运行成功,则会干掉其他Executor上运行的实例,只会对运行慢的任务启动推测任务。

通过scheduler.resourceOffers(workOffers)方法返回了在哪些executor上启动哪些task的Seq[Seq[TaskDescription]]信息后,将调用launchTasks来启动各个task,实现如下:

代码语言:javascript
复制
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        val serializedTask = ser.serialize(task)
        if (serializedTask.limit >= maxRpcMessageSize) {
          scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
            try {
              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
                "spark.rpc.message.maxSize or using broadcast variables for large values."
              msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
              taskSetMgr.abort(msg)
            } catch {
              case e: Exception => logError("Exception in error callback", e)
            }
          }
        }
        else {
          val executorData = executorDataMap(task.executorId)
          executorData.freeCores -= scheduler.CPUS_PER_TASK

          logInfo(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
            s"${executorData.executorHost}.")

          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
        }
      }
    }

先将task进行序列化, 如果当前task序列化后的大小超过了128MB-200KB,跳过当前task,并把对应的taskSetManager置为zombie模式,若大小不超过限制,则发送消息到executor启动task执行。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017.10.16 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
负载均衡
负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档