Yarn 自带了两个支持多用户、多队列的调度器,分别是 Capacity Scheduler(容量调度器) 和 Fair Scheduler(公平调度器),前文YARN Capacity Scheduler(容量调度器)对 Capacity Scheduler 进行了介绍,本文通过将通过比较 Fair Scheduler 与 Capacity Scheduler 进行比较的方式来介绍 Fair Scheduler 并说明两者的异同点。
上面这张表展示了Capacity Scheduler 和 Fair Scheduler 在各个特性上的差异,下面我们主要对两者的资源分配策略进行进一步说明。
资源分配策略也就是当出现空闲资源时,应该在哪个队列给哪个 Application 分配资源。Capacity Scheduler 采用三级资源分配策略,它会一次选择队列、应用程序和 Container 使用该资源。如下图:
Step1:选择队列
从根队列开始,使用深度优先遍历算法,从根队列开始,依次遍历子队列找出资源占用率最小的子队列。若子队列为叶子队列,则选择该队列;若子队列为非叶子队列,则以该子队列为根队列重复前面的过程直到找到一个资源使用率最小的叶子队列为止
Step2:选择应用
在Step1中选好了叶子队列后,取该队列中最早提交的应用程序(实际排序时用的 Application ID,提交时间越早的应用程序,Application ID 越小)
Step2:选择 Container
在 Step2中选好应用程序之后,选择该应用程序中优先级最高的 Container。对于优先级相同的 Containers,优选选择满足本地性的 Container,会依次选择 node local、rack local、no local
在 Capacity Scheduler 中,在比较资源占用率时,不同的资源比较器对资源定义是不一样的。默认的是 DefaultResourceCalculator,它只考虑内存资源。另外一种是 DominantResourceCalculator,采用了 DRF 比较算法,同时考虑内存和 cpu 两种资源。通过参数 yarn.scheduler.capacity.resource-calculator
来设置。
Fair Scheduler 与 Capacity Scheduler 一样也是依次选择队列、应用,最后选择 Container,其中选择队列和应用策略相同,采用了 FairShareComparator 比较器对队列或者应用程序进行排序,然后从头从头开始选择。
在介绍具体实现前还需介绍以下两个定义:
比较的具体实现如下:
@Override
public int compare(Schedulable s1, Schedulable s2) {
double minShareRatio1, minShareRatio2;
double useToWeightRatio1, useToWeightRatio2;
Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
s1.getMinShare(), s1.getDemand());
Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
s2.getMinShare(), s2.getDemand());
boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
s1.getResourceUsage(), minShare1);
boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
s2.getResourceUsage(), minShare2);
minShareRatio1 = (double) s1.getResourceUsage().getMemory()
/ Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemory();
minShareRatio2 = (double) s2.getResourceUsage().getMemory()
/ Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemory();
useToWeightRatio1 = s1.getResourceUsage().getMemory() /
s1.getWeights().getWeight(ResourceType.MEMORY);
useToWeightRatio2 = s2.getResourceUsage().getMemory() /
s2.getWeights().getWeight(ResourceType.MEMORY);
int res = 0;
if (s1Needy && !s2Needy)
res = -1;
else if (s2Needy && !s1Needy)
res = 1;
else if (s1Needy && s2Needy)
res = (int) Math.signum(minShareRatio1 - minShareRatio2);
else
// Neither schedulable is needy
res = (int) Math.signum(useToWeightRatio1 - useToWeightRatio2);
if (res == 0) {
// Apps are tied in fairness ratio. Break the tie by submit time and job
// name to get a deterministic ordering, which is useful for unit tests.
res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
if (res == 0)
res = s1.getName().compareTo(s2.getName());
}
return res;
}
}
阅读以上代码,我们可以知道对于两个可调度项(队列和应用程序均是可调度项),假设为 s1,s2,公平调度算法流程如下:
知道了比较算法,也就知道如何选出队列和应用程序了。选出应用程序之后,选择 Container 的逻辑与Capacity Scheduler 一致。