前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[Spark源码剖析]Pool-Standalone模式下的队列Pool-Spark Standalone模式下的队列

[Spark源码剖析]Pool-Standalone模式下的队列Pool-Spark Standalone模式下的队列

作者头像
codingforfun
发布2018-08-24 14:54:23
3780
发布2018-08-24 14:54:23
举报

Pool-Spark Standalone模式下的队列

org.apache.spark.scheduler.Pool是 Spark Standalone 模式下的队列。从其重要成员及成员函数来剖析这个在 TaskScheduler 调度中起关键作用的类。

成员

下图展示了 Pool 的所有成员及一些简要说明

其中,taskSetSchedulingAlgorithm的类型由schedulingMode决定,下文会对FairSchedulingAlgorithmFIFOSchedulingAlgorithm做详细分析

代码语言:javascript
复制
  var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
    schedulingMode match {
      case SchedulingMode.FAIR =>
        new FairSchedulingAlgorithm()
      case SchedulingMode.FIFO =>
        new FIFOSchedulingAlgorithm()
    }
  }

成员函数

先来看看如何向一个 Pool 中添加 TaskSetManager 或 Pool,说明都写在注释中。

代码语言:javascript
复制
  override def addSchedulable(schedulable: Schedulable) {
    //<f 判断 schedulable 不为 null
    require(schedulable != null)
    //< 往队列中添加schedulable 对象,可以是taskSet,也可以是子队列
    schedulableQueue.add(schedulable)
    schedulableNameToSchedulable.put(schedulable.name, schedulable)
    //< 将该 schedulable 对象的父亲设置为自己
    schedulable.parent = this
  }

以下为如何 remove 一个 TaskSetManager 或 Pool,需要注意的是schedulableQueue为ConcurrentLinkedQueue类型,其 remove 方法可以删除与参数值相等的元素

代码语言:javascript
复制
  override def removeSchedulable(schedulable: Schedulable) {
    schedulableQueue.remove(schedulable)
    schedulableNameToSchedulable.remove(schedulable.name)
  }

当有 executor 丢失时,会调用 executorLost 方法

代码语言:javascript
复制
  override def executorLost(executorId: String, host: String) {
    schedulableQueue.foreach(_.executorLost(executorId, host))
  }

若该队列中某个元素为 TaskSetManager 类型,会调用 TaskSetManager.executorLost 方法,该方法将查找是否有自己管理的 task 在 lost 的 executor 上运行,若有,则重新将该 lost 的 task 插入队列,等待执行;若某元素为 Pool 类型,即子队列,那么 Pool.executorLost 方法会对其schedulableQueue的所有元素调用 executorLost 方法,这样一来,若根 Pool 调用 executorLost 方法,则该队列下的所有 TaskSetManager 对象都能调用 executorLost 方法,那么因某个 executor lost 而 lost 的 task 都将被重新插入队列执行

getSortedTaskSetQueue方法是 Pool 最重要的方法,它将以该 Pool 为根队列的所有 TaskSetManager 排序后存在一个数组中,下标越小的数组越早被执行。代码如下:

代码语言:javascript
复制
  override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
    val sortedSchedulableQueue =
      schedulableQueue.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
    for (schedulable <- sortedSchedulableQueue) {
      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
    }
    sortedTaskSetQueue
  }

这个函数的实现逻辑主要分为两步,假设现在调用 tmpPool.getSortedTaskSetQueue,tmpPool 为 Pool 类型:

  1. 对 tmpPool 的直接子 Pool 和 TaskSetManager 进行排序,排序的算法根据Pool 的 schedulingMode 而定,FAIR 和 FIFO 不相同。排序后得到sortedSchedulableQueue
  2. 遍历sortedSchedulableQueue所有元素。若元素为 TaskSetManager 类型,则将该元素添加到sortedTaskSetQueue: ArrayBuffer[TaskSetManager]尾部,若为 Pool 类型,则执行第一步
  3. 返回包含对 tmpPool 下所有 TaskSetManager 排序过后的数组

经过这几部,就能将一个 Pool 下的所有 TaskSetManager 排序,也就能确定哪个 TaskSetManager 的 tasks 要优先被 TaskScheduler 调度。

如上所述,排序的关键是taskSetSchedulingAlgorithm.comparator,上文中已经提到taskSetSchedulingAlgorithm根据schedulingMode值的不同,可以有FairSchedulingAlgorithmFIFOSchedulingAlgorithm两种类型。先来看 FIFOSchedulingAlgorithm的排序

代码语言:javascript
复制
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    if (res < 0) {
      true
    } else {
      false
    }
  }
}

FIFOSchedulingAlgorithm比较逻辑很简单,可概括为下面两句话:

  1. 首先比较优先级值,优先级值越小的更优先(好拗口)
  2. 若优先级值相等,则比较 stageId 值,stageId 值越小的越优先

FairSchedulingAlgorithm的比较逻辑会复杂一些,代码如下:

代码语言:javascript
复制
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
    var compare: Int = 0

    if (s1Needy && !s2Needy) {
      //< s1中正在执行的 tasks 个数小于 s1的最小 cpu 核数;且s2中正在执行的 tasks 个数等于 s2的最小 cpu 核数。则 s1优先
      return true
    } else if (!s1Needy && s2Needy) {
      //< s2中正在执行的 tasks 个数小于 s2的最小 cpu 核数;且s1中正在执行的 tasks 个数等于 s1的最小 cpu 核数。则 s2优先
      return false
    } else if (s1Needy && s2Needy) {
      //< s1,s2中正在执行的 tasks 个数小于其最小 cpu 核数。则比较各自 runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble 的比值,小的优先
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
      //< s1,s2中正在执行的 tasks 个数等于其最小 cpu 核数。则比较runningTasks1.toDouble / s1.weight.toDouble,小的优先
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }

    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      //< 若以上比较都相等,则比较 s1和 s2的名字
      s1.name < s2.name
    }
  }
}

FairSchedulingAlgorithm的比较规则以在上面代码的注释中说明

PS

Pool 的成员stageId 初始值为-1,但搜遍整个 Spark 源码也没有找到哪里有对该值的重新赋值。这个 stageId 的具体含义及如何发挥作用还没有完全搞明白,若哪位朋友知道,麻烦告知,多谢


本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2016.01.02 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Pool-Spark Standalone模式下的队列
    • 成员
      • 成员函数
        • PS
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档