
Spark Core Source Code Close Reading, Part 16 | Dynamic Executor Allocation with ExecutorAllocationManager

Author: 大数据真好玩
Published 2019-08-19 11:27:20
Included in the column: 暴走大数据

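Contents

  • Preface
  • Initialization
  • Member Fields of ExecutorAllocationManager
    • Executor-related settings
    • Timeout settings
    • Counters, caches, and others
  • ExecutorAllocationManager Implementation
    • Startup
    • Scheduling the dynamic adjustment
    • Recomputing the number of Executors
    • Removing Executors
    • Adding Executors
  • Summary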

Preface

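Following the order of SparkContext initialization, the next components up would be the big three of the scheduling system: SchedulerBackend, TaskScheduler, and DAGScheduler. They involve far too many details to cover in one or two articles. We will return to them in depth when we discuss how Spark executes jobs. This article instead covers the second-to-last component that SparkContext initializes: the Executor allocation manager, ExecutorAllocationManager. As mentioned earlier, ExecutorAllocationManager talks to the cluster manager to add or remove Executors dynamically according to the current load, which makes it a fairly smart mechanism.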

Initialization

The code that initializes ExecutorAllocationManager in SparkContext was already shown in Code #2.13, so here we only describe the initialization flow:

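  • Decide whether dynamic Executor allocation is enabled. It is enabled if spark.dynamicAllocation.enabled is true and either of the following holds: spark.dynamicAllocation.testing is true, or the application is not running in local mode. (To save space, s.d stands for the spark.dynamicAllocation prefix from here on.)
  • Check whether the SchedulerBackend implementation mixes in the ExecutorAllocationClient trait. Currently only CoarseGrainedSchedulerBackend (and therefore its subclasses) does. If it does, ExecutorAllocationManager is constructed from the SchedulerBackend, LiveListenerBus, SparkConf, and BlockManagerMaster instances.
  • Call ExecutorAllocationManager.start() to start it.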
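The enablement rule in the first step boils down to a small boolean predicate. The sketch below models it with a plain Map instead of SparkConf. The object and helper names are hypothetical. Treating a master of `local` or `local[...]` as local mode is an assumption made for illustration.

```scala
// Hypothetical sketch: a plain Map[String, String] stands in for SparkConf.
object DynamicAllocationCheck {
  private val Prefix = "spark.dynamicAllocation"

  // Boolean flags are read case-insensitively and default to false when absent.
  private def flag(conf: Map[String, String], key: String): Boolean =
    conf.get(key).exists(_.trim.equalsIgnoreCase("true"))

  // Assumption: "local" or "local[...]" masters count as local mode.
  private def isLocalMaster(conf: Map[String, String]): Boolean = {
    val master = conf.getOrElse("spark.master", "")
    master == "local" || master.startsWith("local[")
  }

  // enabled && (not local || testing)
  def isEnabled(conf: Map[String, String]): Boolean =
    flag(conf, s"$Prefix.enabled") &&
      (!isLocalMaster(conf) || flag(conf, s"$Prefix.testing"))
}
```

With a cluster master and `s.d.enabled=true` the predicate is true. Under `local[4]` it is true only if `s.d.testing` is also set.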

Member Fields of ExecutorAllocationManager

ExecutorAllocationManager has a large number of member fields, and the listing below shows nearly all of them.

Code #16.1 - Member fields of o.a.s.ExecutorAllocationManager

  private val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
  private val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
  private val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)

  private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds(
    "spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
  private val sustainedSchedulerBacklogTimeoutS = conf.getTimeAsSeconds(
    "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", s"${schedulerBacklogTimeoutS}s")
  private val executorIdleTimeoutS = conf.getTimeAsSeconds(
    "spark.dynamicAllocation.executorIdleTimeout", "60s")
  private val cachedExecutorIdleTimeoutS = conf.getTimeAsSeconds(
    "spark.dynamicAllocation.cachedExecutorIdleTimeout", s"${Integer.MAX_VALUE}s")

  private val tasksPerExecutor =
    conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)

  private var numExecutorsToAdd = 1
  private var numExecutorsTarget = initialNumExecutors

  private val executorsPendingToRemove = new mutable.HashSet[String]
  private val executorIds = new mutable.HashSet[String]

  private var addTime: Long = NOT_SET
  private val removeTimes = new mutable.HashMap[String, Long]

  val listener = new ExecutorAllocationListener
  private val executor =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")
  private var localityAwareTasks = 0
  private var hostToLocalTaskCount: Map[String, Int] = Map.empty

Below we describe these fields by category.

Executor-related settings
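  • minNumExecutors/maxNumExecutors: correspond to s.d.minExecutors/s.d.maxExecutors. They are the minimum and maximum number of Executors during dynamic allocation. The defaults are 0 and effectively infinite (Int.MaxValue).
  • initialNumExecutors: the initial number of Executors, determined by Utils.getDynamicAllocationInitialExecutors(). Its value is the largest of s.d.minExecutors, s.d.initialExecutors, and spark.executor.instances.
  • tasksPerExecutor: approximately how many Tasks one Executor runs at the same time, computed as spark.executor.cores divided (integer division) by spark.task.cpus.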
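The two derived values in this list can be sketched as follows. Settings are again read from a plain Map, as a hypothetical stand-in for SparkConf. Missing counts default to 0 here. Spark defaults s.d.initialExecutors to s.d.minExecutors, which gives the same maximum. Cores and task CPUs default to 1, as in Code #16.1.

```scala
// Hypothetical helpers mirroring the two derived settings described above.
object ExecutorSizing {
  private def intConf(conf: Map[String, String], key: String, default: Int): Int =
    conf.get(key).map(_.trim.toInt).getOrElse(default)

  // Initial count: the largest of minExecutors, initialExecutors and spark.executor.instances.
  def initialNumExecutors(conf: Map[String, String]): Int =
    Seq(
      intConf(conf, "spark.dynamicAllocation.minExecutors", 0),
      intConf(conf, "spark.dynamicAllocation.initialExecutors", 0),
      intConf(conf, "spark.executor.instances", 0)
    ).max

  // Integer division: 5 cores with 2 CPUs per task yields 2 task slots, not 2.5.
  def tasksPerExecutor(conf: Map[String, String]): Int =
    intConf(conf, "spark.executor.cores", 1) / intConf(conf, "spark.task.cpus", 1)
}
```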
Timeout settings
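  • schedulerBacklogTimeoutS: set by s.d.schedulerBacklogTimeout. Once Tasks have been waiting (backlogged) for longer than this, dynamic allocation starts requesting resources. Default 1s.
  • sustainedSchedulerBacklogTimeoutS: set by s.d.sustainedSchedulerBacklogTimeout. If the backlog persists after a request, this is the interval between subsequent requests. It defaults to the value of schedulerBacklogTimeoutS.
  • executorIdleTimeoutS: set by s.d.executorIdleTimeout. An Executor that stays idle (runs no Tasks) for longer than this is removed. Default 60s.
  • cachedExecutorIdleTimeoutS: set by s.d.cachedExecutorIdleTimeout. This is the idle timeout for Executors that hold cached blocks. Cached data should not be thrown away casually, so the default is effectively infinite (Integer.MAX_VALUE seconds).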
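To make the defaults and the fallback rule concrete, the four timeouts can be modeled as a hypothetical case class. This is an illustration only and not Spark's ConfigEntry API. The one non-obvious rule is that the sustained timeout falls back to the initial backlog timeout when it is not set.

```scala
import scala.concurrent.duration._

// Hypothetical container for the four timeouts and their default values.
final case class AllocationTimeouts(
    schedulerBacklogTimeout: FiniteDuration = 1.second,
    sustainedSchedulerBacklogTimeout: Option[FiniteDuration] = None,
    executorIdleTimeout: FiniteDuration = 60.seconds,
    // "Infinite" in practice: Integer.MAX_VALUE seconds.
    cachedExecutorIdleTimeout: FiniteDuration = Int.MaxValue.seconds
) {
  // Falls back to schedulerBacklogTimeout when not configured explicitly.
  def effectiveSustainedTimeout: FiniteDuration =
    sustainedSchedulerBacklogTimeout.getOrElse(schedulerBacklogTimeout)
}
```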
Counters, caches, and others
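  • numExecutorsToAdd: the number of Executors to add in the next allocation round.
  • numExecutorsTarget: the target number of Executors at the current moment. If all Executors were suddenly lost, this is the number that would immediately be requested from the cluster manager.
  • executorsPendingToRemove: IDs of Executors that have been asked to be removed but have not been killed yet.
  • executorIds: IDs of all currently known Executors.
  • addTime: the timestamp at which the next Executor addition is due to be triggered (NOT_SET while there is no backlog).
  • removeTimes: a map from Executor ID to the timestamp at which that Executor should be removed.
  • listener: a listener of type ExecutorAllocationListener. It watches Executor-related events, including Stage and Task submission and completion, and Executor addition and removal.
  • executor: a single-threaded scheduled thread pool. It periodically runs the task that checks and adjusts the Executor allocation.
  • localityAwareTasks: the number of Tasks, across all currently active Stages, that have a locality preference (that is, that prefer to run on the node holding their data).
  • hostToLocalTaskCount: a map from each host to the approximate number of Tasks that prefer to run on it, used for Executor placement.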

ExecutorAllocationManager Implementation

Startup

ExecutorAllocationManager.start() was already called during initialization. Here is its code.

Code #16.2 - o.a.s.ExecutorAllocationManager.start()

  def start(): Unit = {
    listenerBus.addToManagementQueue(listener)
    val scheduleTask = new Runnable() {
      override def run(): Unit = {
        try {
          schedule()
        } catch {
          case ct: ControlThrowable =>
            throw ct
          case t: Throwable =>
            logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
        }
      }
    }
    executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
    client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
  }

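When ExecutorAllocationManager starts, it first registers the ExecutorAllocationListener with LiveListenerBus (on its management queue). It then creates a task that calls schedule(). The scheduled thread pool executor runs this task periodically, with a fixed delay of 100ms by default (intervalMillis). Finally, start() calls requestTotalExecutors() on the ExecutorAllocationClient to request the initial Executors. In practice the client is CoarseGrainedSchedulerBackend, and the method will come up again when we cover SchedulerBackend.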
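The try/catch around schedule() in start() has a practical reason. With scheduleWithFixedDelay, an exception that escapes the task suppresses all later executions. The hypothetical sketch below reproduces the pattern with java.util.concurrent directly. It uses a daemon thread, as Spark's ThreadUtils helper does, and swallows non-fatal errors so the loop keeps ticking. It catches NonFatal rather than Spark's "everything except ControlThrowable", which is a close but not identical substitution.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}
import scala.util.control.NonFatal

// Hypothetical demo: `step` stands in for schedule(); names are not Spark's.
object PeriodicLoop {
  def start(step: () => Unit, intervalMillis: Long): ScheduledExecutorService = {
    // Daemon thread so the loop never keeps the JVM alive on its own.
    val pool = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "dynamic-allocation-demo")
        t.setDaemon(true)
        t
      }
    })
    val task = new Runnable {
      override def run(): Unit =
        try step()
        catch {
          // An exception escaping run() would cancel every later execution.
          case NonFatal(e) => System.err.println(s"step failed: ${e.getMessage}")
        }
    }
    pool.scheduleWithFixedDelay(task, 0, intervalMillis, TimeUnit.MILLISECONDS)
    pool
  }
}
```

Without the catch, a step that fails once would silently stop running. With it, the failure is logged and the next tick proceeds.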

Let's look at schedule() first.

Scheduling the dynamic adjustment

Code #16.3 - o.a.s.ExecutorAllocationManager.schedule()

  private def schedule(): Unit = synchronized {
    val now = clock.getTimeMillis
    updateAndSyncNumExecutorsTarget(now)

    val executorIdsToBeRemoved = ArrayBuffer[String]()
    removeTimes.retain { case (executorId, expireTime) =>
      val expired = now >= expireTime
      if (expired) {
        initializing = false
        executorIdsToBeRemoved += executorId
      }
      !expired
    }
    if (executorIdsToBeRemoved.nonEmpty) {
      removeExecutors(executorIdsToBeRemoved)
    }
  }

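schedule() therefore does two things:
  • It calls updateAndSyncNumExecutorsTarget() to recompute the number of Executors currently needed and sync that number with the cluster manager.
  • It collects the Executors whose removal deadline in removeTimes has passed and calls removeExecutors() to remove them. Any expired idle timer also sets initializing to false.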
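The expiry sweep in schedule() can be sketched as a standalone function. This is a hypothetical helper, not Spark code. It leaves out the initializing flag. It also collects the expired IDs first and then removes them, instead of using retain, so it compiles on both Scala 2.12 and 2.13.

```scala
import scala.collection.mutable

// Hypothetical sketch of the sweep: take out every executor whose removal
// deadline has passed and leave the rest in the map.
object ExpirySweep {
  def takeExpired(removeTimes: mutable.Map[String, Long], now: Long): Seq[String] = {
    val expired = removeTimes.collect { case (id, deadline) if now >= deadline => id }.toSeq
    removeTimes --= expired
    expired
  }
}
```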

Recomputing the number of Executors

Below is the source of updateAndSyncNumExecutorsTarget() and of the maxNumExecutorsNeeded() method it calls.

Code #16.4 - o.a.s.ExecutorAllocationManager.updateAndSyncNumExecutorsTarget()

  private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
    val maxNeeded = maxNumExecutorsNeeded

    if (initializing) {
      0
    } else if (maxNeeded < numExecutorsTarget) {
      val oldNumExecutorsTarget = numExecutorsTarget
      numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
      numExecutorsToAdd = 1

      if (numExecutorsTarget < oldNumExecutorsTarget) {
        client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
        logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
          s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
      }
      numExecutorsTarget - oldNumExecutorsTarget
    } else if (addTime != NOT_SET && now >= addTime) {
      val delta = addExecutors(maxNeeded)
      logDebug(s"Starting timer to add more executors (to " +
        s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
      addTime = now + (sustainedSchedulerBacklogTimeoutS * 1000)
      delta
    } else {
      0
    }
  }

  private def maxNumExecutorsNeeded(): Int = {
    val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
    (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
  }

The method works as follows:

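  • Call maxNumExecutorsNeeded() to compute maxNeeded, the maximum number of Executors currently needed. It takes the pending and running Task counts from the listener, adds them, and divides the sum by tasksPerExecutor, rounding up. Adding tasksPerExecutor - 1 before the integer division is the usual ceiling-division trick.
  • If ExecutorAllocationManager is still initializing, return 0 immediately. Note that the return value is the change in the number of Executors, not the total.
  • Otherwise, compare maxNeeded with numExecutorsTarget. If the target exceeds maxNeeded, set numExecutorsTarget to the larger of maxNeeded and minNumExecutors and reset numExecutorsToAdd to 1. If the target actually dropped, call ExecutorAllocationClient.requestTotalExecutors(), so the cluster manager cancels outstanding requests beyond the new target. Return the change, which is zero or negative.
  • If the target does not exceed maxNeeded, addTime has been set, and the current time has reached addTime, call addExecutors() to ask the cluster manager for more Executors. Then push addTime forward by sustainedSchedulerBacklogTimeoutS and return the number of Executors added.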
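These branches can be condensed into a hypothetical pure model. It returns which branch would fire instead of mutating fields or contacting the cluster manager. The ceiling division is exactly the expression from maxNumExecutorsNeeded(). The Action type and its cases are illustrative names only.

```scala
// Hypothetical pure model of updateAndSyncNumExecutorsTarget()'s decision.
object TargetDecision {
  // Ceiling division: enough executors that every running/pending task gets a slot.
  def maxNeeded(pending: Int, running: Int, tasksPerExecutor: Int): Int =
    (pending + running + tasksPerExecutor - 1) / tasksPerExecutor

  sealed trait Action
  case object NoChange extends Action
  // Spark only calls requestTotalExecutors() when newTarget is below the old target.
  final case class Lower(newTarget: Int) extends Action
  // Corresponds to calling addExecutors(maxNeeded).
  case object TryAdd extends Action

  def decide(maxNeeded: Int, target: Int, minExecutors: Int,
             initializing: Boolean, addTimeReached: Boolean): Action =
    if (initializing) NoChange
    else if (maxNeeded < target) Lower(math.max(maxNeeded, minExecutors))
    else if (addTimeReached) TryAdd
    else NoChange
}
```

For example, 5 pending Tasks with 2 Task slots per Executor need 3 Executors, not 2.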

Next we look at the logic for removing and for adding Executors, starting with removal.

Removing Executors

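Below is the callback that the listener invokes when an Executor becomes idle. For reasons of space, this article does not go into the details of ExecutorAllocationListener. It is not difficult, and readers can study it on their own.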

Code #16.5 - o.a.s.ExecutorAllocationManager.onExecutorIdle()

  private def onExecutorIdle(executorId: String): Unit = synchronized {
    if (executorIds.contains(executorId)) {
      if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
        val hasCachedBlocks = blockManagerMaster.hasCachedBlocks(executorId)
        val now = clock.getTimeMillis()
        val timeout = {
          if (hasCachedBlocks) {
            now + cachedExecutorIdleTimeoutS * 1000
          } else {
            now + executorIdleTimeoutS * 1000
          }
        }
        val realTimeout = if (timeout <= 0) Long.MaxValue else timeout // overflow
        removeTimes(executorId) = realTimeout
        logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
          s"scheduled to run on the executor (to expire in ${(realTimeout - now)/1000} seconds)")
      }
    } else {
      logWarning(s"Attempted to mark unknown executor $executorId idle")
    }
  }

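The method first checks that the Executor is known and that its ID is in neither removeTimes nor executorsPendingToRemove. It then checks whether the Executor holds cached blocks.
  • If it does, the deadline uses cachedExecutorIdleTimeoutS, which is infinite by default.
  • Otherwise, it uses the regular executorIdleTimeoutS.
If the computed deadline overflows to a non-positive value, it is replaced by Long.MaxValue. Finally, the Executor ID and its scheduled removal timestamp are stored in removeTimes. Next comes the removeExecutors() method called from Code #16.3.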
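The deadline computation in onExecutorIdle() can be sketched as a hypothetical helper. As a deliberate substitution, it uses Math.addExact and Math.multiplyExact instead of the original's `timeout <= 0` check. That way any overflow saturates to Long.MaxValue, not just an overflow that happens to wrap to a non-positive value.

```scala
// Hypothetical sketch of the idle deadline, in epoch milliseconds.
object IdleDeadline {
  def compute(nowMillis: Long, hasCachedBlocks: Boolean,
              idleTimeoutS: Long, cachedIdleTimeoutS: Long): Long = {
    // Executors holding cached blocks get the (usually much longer) cached timeout.
    val timeoutS = if (hasCachedBlocks) cachedIdleTimeoutS else idleTimeoutS
    // Exact arithmetic throws on overflow; treat that as "never expire".
    try Math.addExact(nowMillis, Math.multiplyExact(timeoutS, 1000L))
    catch { case _: ArithmeticException => Long.MaxValue }
  }
}
```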

Code #16.6 - o.a.s.ExecutorAllocationManager.removeExecutors()

  private def removeExecutors(executors: Seq[String]): Seq[String] = synchronized {
    val executorIdsToBeRemoved = new ArrayBuffer[String]

    logInfo("Request to remove executorIds: " + executors.mkString(", "))
    val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size

    var newExecutorTotal = numExistingExecutors
    executors.foreach { executorIdToBeRemoved =>
      if (newExecutorTotal - 1 < minNumExecutors) {
        logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
          s"$newExecutorTotal executor(s) left (minimum number of executor limit $minNumExecutors)")
      } else if (newExecutorTotal - 1 < numExecutorsTarget) {
        logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
          s"$newExecutorTotal executor(s) left (number of executor target $numExecutorsTarget)")
      } else if (canBeKilled(executorIdToBeRemoved)) {
        executorIdsToBeRemoved += executorIdToBeRemoved
        newExecutorTotal -= 1
      }
    }
    if (executorIdsToBeRemoved.isEmpty) {
      return Seq.empty[String]
    }

    val executorsRemoved = if (testing) {
      executorIdsToBeRemoved
    } else {
      client.killExecutors(executorIdsToBeRemoved, adjustTargetNumExecutors = false,
        countFailures = false, force = false)
    }

    client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
    newExecutorTotal = numExistingExecutors
    if (testing || executorsRemoved.nonEmpty) {
      executorsRemoved.foreach { removedExecutorId =>
        newExecutorTotal -= 1
        logInfo(s"Removing executor $removedExecutorId because it has been idle for " +
          s"$executorIdleTimeoutS seconds (new desired total will be $newExecutorTotal)")
        executorsPendingToRemove.add(removedExecutorId)
      }
      executorsRemoved
    } else {
      logWarning(s"Unable to reach the cluster manager to kill executor/s " +
        s"${executorIdsToBeRemoved.mkString(",")} or no executor eligible to kill!")
      Seq.empty[String]
    }
  }

The method works as follows:

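  • Compute how many Executors remain: the known Executors minus those already pending removal.
  • Iterate over the IDs to be removed. If removing one would leave fewer Executors than minNumExecutors or numExecutorsTarget, keep it. Otherwise, canBeKilled() checks that the Executor is in executorIds and not already in executorsPendingToRemove. If both hold, the Executor is marked for removal and the remaining count is decremented.
  • Call ExecutorAllocationClient.killExecutors() to actually kill the marked Executors (skipped in testing mode). Then call requestTotalExecutors() to resync the target number of Executors with the cluster manager.
  • Add every Executor that was actually killed to executorsPendingToRemove, where it stays until its removal is confirmed, and return those IDs. If none could be killed, log a warning and return an empty sequence.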
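The selection loop at the top of removeExecutors() is pure bookkeeping, so it can be pulled out into a hypothetical function. The function name and parameters are illustrative. The kill and resync calls to the cluster manager are left out.

```scala
// Hypothetical pure version of the candidate selection in removeExecutors().
object RemovalSelection {
  def select(candidates: Seq[String], knownIds: Set[String], pendingRemoval: Set[String],
             minExecutors: Int, target: Int): Seq[String] = {
    var remaining = knownIds.size - pendingRemoval.size
    val chosen = Seq.newBuilder[String]
    candidates.foreach { id =>
      // Keep the executor if removing it would drop below the floor or the target.
      val keep = remaining - 1 < minExecutors || remaining - 1 < target
      // Mirrors canBeKilled(): known, and not already pending removal.
      if (!keep && knownIds.contains(id) && !pendingRemoval.contains(id)) {
        chosen += id
        remaining -= 1
      }
    }
    chosen.result()
  }
}
```

With 4 known Executors, a target of 2 and three idle candidates, only the first two are chosen. Removing the third would leave fewer Executors than the target.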

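Finally, once an Executor is actually gone, the listener calls the onExecutorRemoved() callback. That callback mainly cleans up the various caches. The logic is simple and is not covered here.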

Adding Executors

Finally, let's look at the addExecutors() method called in Code #16.4.

Code #16.7 - o.a.s.ExecutorAllocationManager.addExecutors()

  private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
    if (numExecutorsTarget >= maxNumExecutors) {
      logDebug(s"Not adding executors because our current target total " +
        s"is already $numExecutorsTarget (limit $maxNumExecutors)")
      numExecutorsToAdd = 1
      return 0
    }
    val oldNumExecutorsTarget = numExecutorsTarget
    numExecutorsTarget = math.max(numExecutorsTarget, executorIds.size)
    numExecutorsTarget += numExecutorsToAdd
    numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)
    numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)

    val delta = numExecutorsTarget - oldNumExecutorsTarget
    if (delta == 0) {
      if (listener.pendingTasks == 0 && listener.pendingSpeculativeTasks > 0) {
        numExecutorsTarget =
          math.max(math.min(maxNumExecutorsNeeded + 1, maxNumExecutors), minNumExecutors)
      } else {
        numExecutorsToAdd = 1
        return 0
      }
    }

    val addRequestAcknowledged = try {
      testing ||
        client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
    } catch {
      case NonFatal(e) =>
        logInfo("Error reaching cluster manager.", e)
        false
    }
    if (addRequestAcknowledged) {
      val executorsString = "executor" + { if (delta > 1) "s" else "" }
      logInfo(s"Requesting $delta new $executorsString because tasks are backlogged" +
        s" (new desired total will be $numExecutorsTarget)")
      numExecutorsToAdd = if (delta == numExecutorsToAdd) {
        numExecutorsToAdd * 2
      } else {
        1
      }
      delta
    } else {
      logWarning(
        s"Unable to reach the cluster manager to request $numExecutorsTarget total executors!")
      numExecutorsTarget = oldNumExecutorsTarget
      0
    }
  }

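The new numExecutorsTarget is computed in four steps:
  • Start from the larger of the current target and the number of known Executors.
  • Add numExecutorsToAdd.
  • Cap the result at maxNumExecutorsNeeded.
  • Clamp it to the range [minNumExecutors, maxNumExecutors].

If the target does not change, numExecutorsToAdd is reset to 1 and nothing is requested. The exception is when only speculative Tasks are pending: then the target is set to one more than the needed count, still within bounds. If the cluster manager cannot be reached, the target is rolled back and 0 is returned.

The line that deserves special attention is numExecutorsToAdd * 2. When a request is acknowledged and the full increment was granted, the next round requests twice as many Executors. Otherwise the increment resets to 1. So the number of new Executors per round grows exponentially. Why use this strategy? Experience shows that most applications need only a few Executors at startup, so requesting cautiously at first avoids over-allocation. Once resources become tight, exponential growth cuts down the number of request rounds needed to reach the required amount.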
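The target arithmetic and the doubling rule can be modeled as a hypothetical pure function. The model ignores the speculative-task branch and assumes the cluster manager acknowledges every request, so the rollback path is not modeled.

```scala
// Hypothetical pure model of addExecutors(): speculative-task branch omitted,
// and every request is assumed to be acknowledged by the cluster manager.
object AddPolicy {
  final case class Result(newTarget: Int, nextToAdd: Int, delta: Int)

  def step(target: Int, toAdd: Int, knownExecutors: Int, maxNeeded: Int,
           minExecutors: Int, maxExecutors: Int): Result =
    if (target >= maxExecutors) Result(target, 1, 0) // already at the ceiling
    else {
      val grown = math.max(target, knownExecutors) + toAdd
      val capped = math.min(grown, maxNeeded)
      val t = math.max(math.min(capped, maxExecutors), minExecutors)
      val delta = t - target
      if (delta == 0) Result(target, 1, 0) // nothing to request: restart growth
      // Double only when the full increment was granted; otherwise restart from 1.
      else Result(t, if (delta == toAdd) toAdd * 2 else 1, delta)
    }
}
```

Each result can be fed back in as the next input, with knownExecutors equal to the previous target (that is, assuming every granted Executor registers). With a large maxNeeded, the targets go 1, 3, 7, 15, so the increments are 1, 2, 4, 8. Once the cap on maxNeeded cuts an increment short, the next increment falls back to 1.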

— THE END —

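This article is part of the Tencent Cloud self-media sharing program and was shared from a WeChat official account.
Originally published: 2019-08-13. For infringement concerns, contact cloudcommunity@tencent.com for removal.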

This article was shared from the 大数据真好玩 WeChat official account.
