[spark] spark推测式执行

概述

推测任务是指对于一个Stage里面拖后腿的Task,会在其他节点的Executor上再次启动这个task,如果其中一个Task实例运行成功则将这个最先完成的Task的计算结果作为最终结果,同时会干掉其他Executor上运行的实例。spark推测式执行默认是关闭的,可通过spark.speculation属性来开启。

检测是否有需要推测式执行的Task

在SparkContext创建了schedulerBackend和taskScheduler后,立即调用了taskScheduler 的start方法:

override def start() {
    backend.start()
    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      speculationScheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
          checkSpeculatableTasks()
        }
      }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
    }
  }

可以看到,TaskScheduler在启动SchedulerBackend后,在非local模式前提下检查推测式执行功能是否开启(默认关闭,可通过spark.speculation开启),若开启则会启动一个线程每隔SPECULATION_INTERVAL_MS(默认100ms,可通过spark.speculation.interval属性设置)通过checkSpeculatableTasks方法检测是否有需要推测式执行的tasks:

// Check for speculatable tasks in all our active jobs.
  def checkSpeculatableTasks() {
    var shouldRevive = false
    synchronized {
      shouldRevive = rootPool.checkSpeculatableTasks()
    }
    if (shouldRevive) {
      backend.reviveOffers()
    }
  }

然后又通过rootPool的方法判断是否有需要推测式执行的tasks,若有则会调用SchedulerBackend的reviveOffers去尝试拿资源运行推测任务。继续看看检测逻辑是什么样的:

override def checkSpeculatableTasks(): Boolean = {
    var shouldRevive = false
    for (schedulable <- schedulableQueue.asScala) {
      shouldRevive |= schedulable.checkSpeculatableTasks()
    }
    shouldRevive
  }

在rootPool里又调用了schedulable的方法,schedulable是ConcurrentLinkedQueue[Schedulable]类型,队列里面放的都是TaskSetMagager,再看TaskSetMagager的checkSpeculatableTasks方法,终于找到检测根源了:

 override def checkSpeculatableTasks(): Boolean = {
    //如果task只有一个或者所有task都不需要再执行了就没有必要再检测
    if (isZombie || numTasks == 1) {  
      return false
    }
    var foundTasks = false
    // 所有task数 * SPECULATION_QUANTILE(默认0.75,可通过spark.speculation.quantile设置) 
    val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
    logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
    //成功的task数是否超过总数的75%,并且成功的task是否大于0
    if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
      val time = clock.getTimeMillis()
      // 过滤出成功执行的task的执行时间并排序
      val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
      Arrays.sort(durations)
     // 取这多个时间的中位数
      val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.length - 1))
      // 中位数 * SPECULATION_MULTIPLIER (默认1.5,可通过spark.speculation.multiplier设置)
      val threshold = max(SPECULATION_MULTIPLIER * medianDuration, 100)
      logDebug("Task length threshold for speculation: " + threshold)
      // 遍历该TaskSet中的task,取未成功执行、正在执行、执行时间已经大于threshold 、
      // 推测式执行task列表中未包括的task放进需要推测式执行的列表中speculatableTasks
      for ((tid, info) <- taskInfos) {
        val index = info.index
        if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
          !speculatableTasks.contains(index)) {
          logInfo(
            "Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms"
              .format(index, taskSet.id, info.host, threshold))
          speculatableTasks += index
          foundTasks = true
        }
      }
    }
    foundTasks
  }

检查逻辑代码中注释很明白,当成功的Task数超过总Task数的75%(可通过参数spark.speculation.quantile设置)时,再统计所有成功的Tasks的运行时间,得到一个中位数,用这个中位数乘以1.5(可通过参数spark.speculation.multiplier控制)得到运行时间门限,如果在运行的Tasks的运行时间超过这个门限,则对它启用推测。简单来说就是对那些拖慢整体进度的Tasks启用推测,以加速整个Stage的运行。 算法大致流程如图:

推测式任务什么时候被调度

在TaskSetMagager在延迟调度策略下为一个executor分配一个task时会调用dequeueTask方法:

private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
    : Option[(Int, TaskLocality.Value, Boolean)] =
  {
    for (index <- dequeueTaskFromList(execId, getPendingTasksForExecutor(execId))) {
      return Some((index, TaskLocality.PROCESS_LOCAL, false))
    }

    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
      for (index <- dequeueTaskFromList(execId, getPendingTasksForHost(host))) {
        return Some((index, TaskLocality.NODE_LOCAL, false))
      }
    }
   ......
    // find a speculative task if all others tasks have been scheduled
    dequeueSpeculativeTask(execId, host, maxLocality).map {
      case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
  }

该方法的最后一段就是在其他任务都被调度后为推测式任务进行调度,看看起实现:

protected def dequeueSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value)
    : Option[(Int, TaskLocality.Value)] =
  {
    //从推测式执行任务列表中移除已经成功完成的task,因为从检测到调度之间还有一段时间,
    //某些task已经成功执行
    speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set
     // 判断task是否可以在该executor对应的Host上执行,判断条件是:
     // task没有在该host上运行;
     // 该executor没有在task的黑名单里面(task在这个executor上失败过,并还在'黑暗'时间内)
    def canRunOnHost(index: Int): Boolean =
      !hasAttemptOnHost(index, host) && !executorIsBlacklisted(execId, index)
 
    if (!speculatableTasks.isEmpty) {
      // 获取能在该executor上启动的taskIndex
      for (index <- speculatableTasks if canRunOnHost(index)) {
        // 获取task的优先位置
        val prefs = tasks(index).preferredLocations 
        val executors = prefs.flatMap(_ match {
          case e: ExecutorCacheTaskLocation => Some(e.executorId)
          case _ => None
        });
        // 优先位置若为ExecutorCacheTaskLocation并且数据所在executor包含当前executor,
        // 则返回其task在taskSet的index和Locality Levels
        if (executors.contains(execId)) {
          speculatableTasks -= index
          return Some((index, TaskLocality.PROCESS_LOCAL))
        }
      }

      // 这里的判断是延迟调度的作用,即使是推测式任务也尽量以最好的本地性级别来启动
      if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
        for (index <- speculatableTasks if canRunOnHost(index)) {
          val locations = tasks(index).preferredLocations.map(_.host)
          if (locations.contains(host)) {
            speculatableTasks -= index
            return Some((index, TaskLocality.NODE_LOCAL))
          }
        }
      }

       ........
    }
    None
  }

代码太长只列了前面一部分,不过都是类似的逻辑,代码中注释也很清晰。先过滤掉已经成功执行的task,另外,推测执行task不在和正在执行的task同一Host执行,不在黑名单executor里执行,然后在延迟调度策略下根据task的优先位置来决定是否在该executor上以某种本地性级别被调度执行。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏精讲JAVA

详述 PO VO BO DTO DAO 和 POJO 的概念及区别

 说实话,我相信对于刚接触 PO、VO、BO、DTO、DAO 和 POJO 这些概念的同学来说,大都会有一种“这都是什么鬼?”的感觉,可谓是云里雾里,不知今...

1445
来自专栏Android 技术栈

java 常用十种设计模式示例归纳 | 已打包请带走

一个Demo,集合常用的十种设计模式,每个模式使用易被人们接受的案例讲述,按模式分包,使用设计模式前后对比,界面显示定义讲解,让你更深刻的了解每种设计模式。 ...

1.4K2
来自专栏JadePeng的技术博客

Flutter与Dart 入门

Flutter是google推出的,一个使用Dart语言开发的跨平台移动UI框架,通过自建绘制引擎,能高性能、高保真地进行Android和IOS开发。

1082
来自专栏精讲JAVA

详述 PO VO BO DTO DAO 和 POJO 的概念及区别

 说实话,我相信对于刚接触 PO、VO、BO、DTO、DAO 和 POJO 这些概念的同学来说,大都会有一种“这都是什么鬼?”的感觉,可谓是云里雾里,不知今...

1252
来自专栏Java架构沉思录

Kafka中的时间轮算法

简单说说时间轮吧,它是一个高效的延时队列,或者说定时器。实际上现在网上对于时间轮算法的解释很多,定义也很全,这里引用一下朱小厮博客里出现的定义:

1062
来自专栏数值分析与有限元编程

Python IDLE关联.py文件

为进一步提升Python IDLE可操作性,本文介绍如何在windows操作系统下默认使用python自带的IDLE编辑器关联后缀名为.py的文件。

4761
来自专栏大数据和云计算技术

伸手党福利-从零开始玩转图库

tinkerpop是一个图库标准,一个框架,学习图库,先从这个项目入手比较合适, neo4j, janusGraph只是它两个组件(图storage-engin...

7243
来自专栏平凡文摘

详述 PO VO BO DTO DAO 和 POJO 的概念及区别

1073
来自专栏互扯程序

毕业季,跳槽季,不刷点面试题怎么能行?

现在是资源共享的时代,同样也是知识分享的时代,如果你觉得本文能学到知识,请把知识与别人分享。 前言 马上就是一年一度的毕业季 跳槽季,找工作三大要素,简...

3485
来自专栏calvin

centos7 lldb 调试netcore应用的内存泄漏和死循环示例(dump文件调试)

lldb工具的安装,linux下netcore如何生成dump文件,查看下文 centos7使用lldb调试netcore应用转储dump文件

2093

扫码关注云+社区