
k8s source code: unveiling the scheduler's algorithms (part 2)

Original article by ascehuang, from the column k8s源码解析 (last updated 2019-12-25).

The previous post walked through the source code of each predicate (filtering) algorithm; this post takes a detailed look at the priority (scoring) strategies. It took a while to finish; things have been busy lately.

1. Scheduling policy overview

Predicates (filtering)

CheckNodeUnschedulablePred (is the node schedulable at all)
GeneralPred (checks that resources are sufficient and that the pod's host, ports, and selectors match)
HostNamePred (does the node name requested by the pod match this node's name)
PodFitsHostPortsPred (are the host ports requested by the pod already occupied on this node)
MatchNodeSelectorPred (nodeSelector and node-affinity matching, i.e. label matching)
PodFitsResourcesPred (resource check)
NoDiskConflictPred (do the volumes to be mounted conflict with volumes already mounted)
PodToleratesNodeTaintsPred (can the pod's tolerations tolerate this node's taints)
PodToleratesNodeNoExecuteTaintsPred
CheckNodeLabelPresencePred (are the expected node labels present)
CheckServiceAffinityPred (-)
MaxEBSVolumeCountPred (deprecated)
MaxGCEPDVolumeCountPred (deprecated)
MaxCSIVolumeCountPred (would the node's volume count exceed the maximum)
MaxAzureDiskVolumeCountPred (deprecated)
MaxCinderVolumeCountPred (deprecated)
CheckVolumeBindingPred (can the node's PVs satisfy the pod's PVCs)
NoVolumeZoneConflictPred (does the volume's zone conflict with the node)
EvenPodsSpreadPred (does the node satisfy the pod's topology spread constraints)
MatchInterPodAffinityPred (would scheduling here violate pod affinity or anti-affinity)

Priorities (scoring)

EqualPriority: every node gets the same score; skipped outright during configuration
MostRequestedPriority: the higher the ratio of requested to allocatable resources, the higher the score
RequestedToCapacityRatioPriority: piecewise score based on the requested-to-capacity ratio
SelectorSpreadPriority: spread pods of the same svc, RC, RS, or StatefulSet across nodes; zones are supported too
ServiceSpreadingPriority: same as above, but only matches svc
InterPodAffinityPriority: scores nodes by preferred pod (anti-)affinity across topology domains
LeastRequestedPriority: scores by lowest requested utilization; roughly the inverse of MostRequestedPriority
BalancedResourceAllocation: favors balanced cpu, memory, and volume requests; must be used together with LeastRequestedPriority
NodePreferAvoidPodsPriority: scores by the node annotation scheduler.alpha.kubernetes.io/preferAvoidPods
NodeAffinityPriority: scores by node affinity; each matching preferred term adds its weight
TaintTolerationPriority: scores by the node's taints with effect PreferNoSchedule against the pod's tolerations
ImageLocalityPriority: is the required image already on the node; the score depends on image size and spread
ResourceLimitsPriority: can the node's resources satisfy the pod's limits
EvenPodsSpreadPriority: scores by the number of pods matching the topology spread constraints

2. Priority algorithms

First, a recap of how the priority phase runs. For each pod, the Map phase is executed by 16 parallel workers: each work item (one per node) iterates over all configured priority algorithms and records a score for that (algorithm, node) pair. The Reduce phase then runs in parallel as well (some algorithms have no Reduce). Map uses workqueue.ParallelizeUntil; why Reduce does not also use ParallelizeUntil is worth thinking about, and ParallelizeUntil itself is worth a careful read. Personally I also think this code is not ideal: Go's design philosophy recommends "do not communicate by sharing memory; share memory by communicating", yet shared memory is exactly what is used here. After Map-Reduce, the scores are summed with their weights, the extenders are then consulted, and the weighted total becomes the final score. The priority algorithms do not declare an ordering array, but they are still invoked in the configured order.

func (g *genericScheduler) prioritizeNodes(
	ctx context.Context,
	state *framework.CycleState,
	pod *v1.Pod,
	meta interface{},
	nodes []*v1.Node,
) (framework.NodeScoreList, error) {
	// If no priority configs are provided, then all nodes will have a score of one.
	// This is required to generate the priority list in the required format
	if len(g.prioritizers) == 0 && len(g.extenders) == 0 && !g.framework.HasScorePlugins() {
  ... ... 
	workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
		nodeInfo := g.nodeInfoSnapshot.NodeInfoMap[nodes[index].Name]
		for i := range g.prioritizers {
			var err error
			results[i][index], err = g.prioritizers[i].Map(pod, meta, nodeInfo)
			if err != nil {
				appendError(err)
				results[i][index].Name = nodes[index].Name
			}
		}
	})

	for i := range g.prioritizers {
		if g.prioritizers[i].Reduce == nil {
			continue
		}
		wg.Add(1)
		go func(index int) {
			metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Inc()
			defer func() {
				metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Dec()
				wg.Done()
			}()
			if err := g.prioritizers[index].Reduce(pod, meta, g.nodeInfoSnapshot, results[index]); err != nil {
				appendError(err)
			}
			... ...
		}(i)
	}
	... ...
	for i := range nodes {
		result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
		for j := range g.prioritizers {
			result[i].Score += results[j][i].Score * g.prioritizers[j].Weight
		}

		for j := range scoresMap {
			result[i].Score += scoresMap[j][i].Score
		}
	}

	if len(g.extenders) != 0 && nodes != nil {
		... ...
			go func(extIndex int) {
				... ...
				prioritizedList, weight, err := g.extenders[extIndex].Prioritize(pod, nodes)
			... ...
				for i := range *prioritizedList {
					host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
					... ...
					combinedScores[host] += score * weight
				}
				mu.Unlock()
			}(i)
		}
... ...
	return result, nil
}
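
To make the Map-then-weighted-sum structure concrete, here is a minimal standalone sketch (not the scheduler's code): the two priority functions, their weights, and the node names are made up, and it assumes k8s.io/client-go is on the module path for workqueue.ParallelizeUntil.

package main

import (
    "context"
    "fmt"

    "k8s.io/client-go/util/workqueue"
)

type priority struct {
    name   string
    weight int64
    mapFn  func(node string) int64 // stands in for a PriorityMapFunction
}

func main() {
    nodes := []string{"node-a", "node-b", "node-c"}
    prioritizers := []priority{
        {name: "fake-spread", weight: 1, mapFn: func(n string) int64 { return int64(len(n)) }},
        {name: "fake-affinity", weight: 2, mapFn: func(n string) int64 { return 10 }},
    }

    // results[i][j] holds the raw score of prioritizer i on node j, filled in
    // parallel exactly like the Map phase above (16 workers over len(nodes) pieces).
    results := make([][]int64, len(prioritizers))
    for i := range results {
        results[i] = make([]int64, len(nodes))
    }
    workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
        for i := range prioritizers {
            results[i][index] = prioritizers[i].mapFn(nodes[index])
        }
    })

    // Weighted sum per node, mirroring the final loop of prioritizeNodes.
    for j, node := range nodes {
        var total int64
        for i := range prioritizers {
            total += results[i][j] * prioritizers[i].weight
        }
        fmt.Printf("%s: %d\n", node, total)
    }
}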

2.1 EqualPriority

EqualPriority does essentially nothing; it is skipped outright when the list of priority algorithms is built during initialization.

for _, priority := range policy.Priorities {
			if priority.Name == priorities.EqualPriority {
				klog.V(2).Infof("Skip registering priority: %s", priority.Name)
				continue
			}
			klog.V(2).Infof("Registering priority: %s", priority.Name)
			priorityKeys.Insert(RegisterCustomPriorityFunction(priority, c.configProducerArgs))
		}

2.2 MostRequestedPriority

Registration:

	scheduler.RegisterPriorityMapReduceFunction(priorities.MostRequestedPriority, priorities.MostRequestedPriorityMap, nil, 1)

The scoring (Map) function:

func (r *ResourceAllocationPriority) PriorityMap(
	pod *v1.Pod,
	meta interface{},
	nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
	node := nodeInfo.Node()
	if node == nil {
		return framework.NodeScore{}, fmt.Errorf("node not found")
	}
	if r.resourceToWeightMap == nil {
		return framework.NodeScore{}, fmt.Errorf("resources not found")
	}
	requested := make(ResourceToValueMap, len(r.resourceToWeightMap))
	allocatable := make(ResourceToValueMap, len(r.resourceToWeightMap))
	for resource := range r.resourceToWeightMap {
		allocatable[resource], requested[resource] = calculateResourceAllocatableRequest(nodeInfo, pod, resource)
	}
	var score int64

	// Check if the pod has volumes and this could be added to scorer function for balanced resource allocation.
	if len(pod.Spec.Volumes) >= 0 && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && nodeInfo.TransientInfo != nil {
		score = r.scorer(requested, allocatable, true, nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount)
	} else {
		score = r.scorer(requested, allocatable, false, 0, 0)
	}

	... ...

	return framework.NodeScore{
		Name:  node.Name,
		Score: score,
	}, nil
}

Keep this function in mind: several of the later algorithms call it as well. The rough flow is:

  1. For each resource type (cpu, memory, ...), get the node's requested and allocatable amounts; MostRequested only looks at cpu and memory, weighted 1:1
  2. Call the algorithm-specific scorer on those values to obtain the score

The scorer ultimately calls:

// The used capacity is calculated on a scale of 0-10
func mostRequestedScore(requested, capacity int64) int64 {
	if capacity == 0 {
		return 0
	}
	if requested > capacity {
		return 0
	}

	return (requested * framework.MaxNodeScore) / capacity
}

This function is simple. framework.MaxNodeScore is a constant equal to 100, which disagrees with the 0-10 range mentioned in the comment; the scale was presumably extended later and the comment left behind. The score is the fraction of allocatable capacity that is requested: the larger the fraction, the higher the score, and if requested == capacity the score is 100, so the comment does appear to be out of date. This policy is nearly the opposite of LeastRequestedPriority. The comment also gives a simple example:

(cpu(10 * sum(requested) / capacity) + memory(10 * sum(requested) / capacity)) / 2
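
To make this concrete, here is a minimal standalone sketch (not the scheduler's code) with made-up resource numbers; cpu and memory are weighted equally, matching the 1:1 weighting noted above.

package main

import "fmt"

// Same logic as mostRequestedScore above, with framework.MaxNodeScore = 100.
func mostRequestedScore(requested, capacity int64) int64 {
    if capacity == 0 || requested > capacity {
        return 0
    }
    return requested * 100 / capacity
}

func main() {
    // Hypothetical node: 4000m CPU and 8Gi memory allocatable,
    // with 3000m CPU and 2Gi memory requested (incoming pod included).
    cpuScore := mostRequestedScore(3000, 4000)     // 75
    memScore := mostRequestedScore(2*1024, 8*1024) // 25 (values in Mi)
    fmt.Println((cpuScore + memScore) / 2)         // 50, the node's MostRequested score
}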

2.3 RequestedToCapacityRatioPriority

Registration:

scheduler.RegisterPriorityMapReduceFunction(
		priorities.RequestedToCapacityRatioPriority,
		priorities.RequestedToCapacityRatioResourceAllocationPriorityDefault().PriorityMap,
		nil,
		1)

The call path is similar to MostRequestedPriority; the part worth examining is the scorer built by buildRequestedToCapacityRatioScorerFunction:

func buildRequestedToCapacityRatioScorerFunction(scoringFunctionShape FunctionShape, resourceToWeightMap ResourceToWeightMap) func(ResourceToValueMap, ResourceToValueMap, bool, int, int) int64 {
	rawScoringFunction := buildBrokenLinearFunction(scoringFunctionShape)
	err := validateResourceWeightMap(resourceToWeightMap)
	if err != nil {
		klog.Error(err)
	}
	resourceScoringFunction := func(requested, capacity int64) int64 {
		if capacity == 0 || requested > capacity {
			return rawScoringFunction(maxUtilization)
		}

		return rawScoringFunction(maxUtilization - (capacity-requested)*maxUtilization/capacity)
	}
	return func(requested, allocable ResourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
		var nodeScore, weightSum int64
		for resource, weight := range resourceToWeightMap {
			resourceScore := resourceScoringFunction(requested[resource], allocable[resource])
			if resourceScore > 0 {
				nodeScore += resourceScore * weight
				weightSum += weight
			}
		}
		if weightSum == 0 {
			return 0
		}
		return int64(math.Round(float64(nodeScore) / float64(weightSum)))
	}
}

func buildBrokenLinearFunction(shape FunctionShape) func(int64) int64 {
	n := len(shape)
	return func(p int64) int64 {
		for i := 0; i < n; i++ {
			if p <= shape[i].Utilization {
				if i == 0 {
					return shape[0].Score
				}
				return shape[i-1].Score + (shape[i].Score-shape[i-1].Score)*(p-shape[i-1].Utilization)/(shape[i].Utilization-shape[i-1].Utilization)
			}
		}
		return shape[n-1].Score
	}
}

The score here is produced by buildBrokenLinearFunction, which computes the score piecewise over resource utilization. The default shape has two points:

Utilization | Score
0           | 100
100         | 0

Utilization is computed as:

maxUtilization - (capacity-requested)*maxUtilization/capacity

maxUtilization is 100; simplifying the formula gives:

maxUtilization * requested / capacity

In other words, the more is requested, the higher the utilization and the lower the score. So how does this differ from LeastRequestedPriority? The score is interpolated linearly between shape points; with the default two points it degenerates to 100 minus the utilization (i.e. 1 - utilization). The benefit is that users can define their own points. For example, if the shape maps a utilization of 50 to a score of 80, then nodes under 50% utilization get scores spread linearly between 80 and 100 (all relatively high), while nodes above 50% utilization get scores spread between 0 and 80, so a small change in utilization around that point produces a large difference in score. A small sketch follows.
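
A standalone sketch of that interpolation; the shape points below are made-up examples, not the scheduler's defaults.

package main

import "fmt"

type shapePoint struct {
    Utilization int64 // 0-100
    Score       int64 // 0-100
}

// Same interpolation as buildBrokenLinearFunction: linear between
// consecutive shape points, clamped at both ends.
func brokenLinear(shape []shapePoint) func(int64) int64 {
    return func(p int64) int64 {
        for i := 0; i < len(shape); i++ {
            if p <= shape[i].Utilization {
                if i == 0 {
                    return shape[0].Score
                }
                return shape[i-1].Score + (shape[i].Score-shape[i-1].Score)*
                    (p-shape[i-1].Utilization)/(shape[i].Utilization-shape[i-1].Utilization)
            }
        }
        return shape[len(shape)-1].Score
    }
}

func main() {
    // Custom shape: utilization 0 -> score 100, 50 -> 80, 100 -> 0.
    score := brokenLinear([]shapePoint{{0, 100}, {50, 80}, {100, 0}})
    fmt.Println(score(25)) // 90: below the knee, scores stay between 80 and 100
    fmt.Println(score(75)) // 40: above the knee, scores drop quickly toward 0
}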

2.4 SelectorSpreadPriority

This priority tries to spread pods belonging to the same service, replication controller, and so on across different nodes. It has both a Map and a Reduce function:

func (s *SelectorSpread) CalculateSpreadPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
	var selector labels.Selector
	node := nodeInfo.Node()
	if node == nil {
		return framework.NodeScore{}, fmt.Errorf("node not found")
	}

	priorityMeta, ok := meta.(*priorityMetadata)
	if ok {
		selector = priorityMeta.podSelector
	} else {
		selector = getSelector(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister)
	}

	count := countMatchingPods(pod.Namespace, selector, nodeInfo)
	return framework.NodeScore{
		Name:  node.Name,
		Score: int64(count),
	}, nil
}

For each node, the Map function counts the pods already on that node that match the incoming pod, and uses that count directly as the score; matching is done with the combined selectors of the pod's services, ReplicaSets, and so on. A larger match count therefore means a higher Map score, but the score is really meant to be inverted, which happens in Reduce.
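
As an aside, here is a standalone sketch of that selector counting, assuming k8s.io/apimachinery is available; the selector and pod labels are made up, and the real countMatchingPods also filters by namespace.

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/labels"
)

func main() {
    // Selector derived from the incoming pod's service/ReplicaSet (hypothetical).
    selector := labels.SelectorFromSet(labels.Set{"app": "web"})

    // Labels of the pods already running on the node being scored (hypothetical).
    podsOnNode := []labels.Set{
        {"app": "web", "pod-template-hash": "abc"},
        {"app": "db"},
        {"app": "web"},
    }

    count := 0
    for _, podLabels := range podsOnNode {
        if selector.Matches(podLabels) {
            count++
        }
    }
    fmt.Println("Map score for this node:", count) // 2: two matching pods, so spreading elsewhere is preferred
}

The Reduce function shown next then turns these raw counts into the final spread-aware scores.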

func (s *SelectorSpread) CalculateSpreadPriorityReduce(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister, result framework.NodeScoreList) error {
	countsByZone := make(map[string]int64, 10)
	maxCountByZone := int64(0)
	maxCountByNodeName := int64(0)

	for i := range result {
		if result[i].Score > maxCountByNodeName {
			maxCountByNodeName = result[i].Score
		}
		nodeInfo, err := sharedLister.NodeInfos().Get(result[i].Name)
		if err != nil {
			return err
		}
		zoneID := utilnode.GetZoneKey(nodeInfo.Node())
		if zoneID == "" {
			continue
		}
		countsByZone[zoneID] += result[i].Score
	}

	for zoneID := range countsByZone {
		if countsByZone[zoneID] > maxCountByZone {
			maxCountByZone = countsByZone[zoneID]
		}
	}

	haveZones := len(countsByZone) != 0

	maxCountByNodeNameFloat64 := float64(maxCountByNodeName)
	maxCountByZoneFloat64 := float64(maxCountByZone)
	MaxNodeScoreFloat64 := float64(framework.MaxNodeScore)

	for i := range result {
		// initializing to the default/max node score of maxPriority
		fScore := MaxNodeScoreFloat64
		if maxCountByNodeName > 0 {
			fScore = MaxNodeScoreFloat64 * (float64(maxCountByNodeName-result[i].Score) / maxCountByNodeNameFloat64)
		}
		// If there is zone information present, incorporate it
		if haveZones {
			nodeInfo, err := sharedLister.NodeInfos().Get(result[i].Name)
			if err != nil {
				return err
			}

			zoneID := utilnode.GetZoneKey(nodeInfo.Node())
			if zoneID != "" {
				zoneScore := MaxNodeScoreFloat64
				if maxCountByZone > 0 {
					zoneScore = MaxNodeScoreFloat64 * (float64(maxCountByZone-countsByZone[zoneID]) / maxCountByZoneFloat64)
				}
				fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore)
			}
		}
		result[i].Score = int64(fScore)
		if klog.V(10) {
			klog.Infof(
				"%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, result[i].Name, int64(fScore),
			)
		}
	}
	return nil
}

The per-node score is computed as:

fScore = MaxNodeScoreFloat64 * (float64(maxCountByNodeName-result[i].Score) / maxCountByNodeNameFloat64)

MaxNodeScoreFloat64 is 100. The maximum count minus the node's Map count, divided by the maximum count, means that the larger the count, the lower the final score.

A similar score is also computed per zone, and the zone score is weighted: the node-level fScore above contributes 1/3 and the zone score contributes 2/3 of the final value (zoneWeighting is 2/3). A small worked example follows.
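
A standalone sketch of that combination; the matching-pod counts and zones are made up (three nodes across two zones).

package main

import "fmt"

const maxNodeScore = 100.0
const zoneWeighting = 2.0 / 3.0 // same constant the scheduler uses

func main() {
    // Matching-pod counts produced by the Map phase (hypothetical).
    counts := map[string]float64{"node-a": 3, "node-b": 1, "node-c": 0}
    zones := map[string]string{"node-a": "zone-1", "node-b": "zone-1", "node-c": "zone-2"}

    maxByNode := 3.0
    countsByZone := map[string]float64{"zone-1": 4, "zone-2": 0}
    maxByZone := 4.0

    for node, count := range counts {
        nodeScore := maxNodeScore * (maxByNode - count) / maxByNode
        zoneScore := maxNodeScore * (maxByZone - countsByZone[zones[node]]) / maxByZone
        final := nodeScore*(1.0-zoneWeighting) + zoneWeighting*zoneScore
        fmt.Printf("%s: %.0f\n", node, final)
    }
    // node-a: 0*(1/3) + 0*(2/3) = 0, node-b: ~67*(1/3) + 0 = ~22, node-c: 100*(1/3) + 100*(2/3) = 100
}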

2.5 ServiceSpreadingPriority

The implementation is identical to SelectorSpreadPriority, except that this algorithm only matches services.

2.6 InterPodAffinityPriority

Controls which pods should be placed into (or kept out of) the same topology domain, based on preferred inter-pod affinity and anti-affinity.

Registration:

scheduler.RegisterPriorityMapReduceFunction(priorities.InterPodAffinityPriority, priorities.CalculateInterPodAffinityPriorityMap, priorities.CalculateInterPodAffinityPriorityReduce, 1)

The Map function:

func CalculateInterPodAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
	node := nodeInfo.Node()
	if node == nil {
		return framework.NodeScore{}, fmt.Errorf("node not found")
	}

	var topologyScore topologyPairToScore
	if priorityMeta, ok := meta.(*priorityMetadata); ok {
		topologyScore = priorityMeta.topologyScore
	}

	var score int64
	for tpKey, tpValues := range topologyScore {
		if v, exist := node.Labels[tpKey]; exist {
			score += tpValues[v]
		}
	}

	return framework.NodeScore{Name: node.Name, Score: score}, nil
}

The Map function checks whether the current node carries the label of each topology pair; if it does, the value recorded for that pair is added to the score. The slightly more involved piece is buildTopologyPairToScore, which computes the per-topology-pair scores from the affinity and anti-affinity terms.
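
A standalone sketch of that summation; the topology keys, label values, and weights below are hypothetical, standing in for what buildTopologyPairToScore would produce.

package main

import "fmt"

func main() {
    // Per-(topologyKey, value) weights precomputed from the preferred (anti-)affinity
    // terms of existing pods and of the incoming pod (hypothetical values;
    // anti-affinity can contribute negative weights).
    topologyScore := map[string]map[string]int64{
        "topology.kubernetes.io/zone": {"zone-1": 40, "zone-2": -20},
        "kubernetes.io/hostname":      {"node-a": 10},
    }

    // Labels of the node currently being scored (hypothetical).
    nodeLabels := map[string]string{
        "topology.kubernetes.io/zone": "zone-1",
        "kubernetes.io/hostname":      "node-a",
    }

    var score int64
    for tpKey, values := range topologyScore {
        if v, ok := nodeLabels[tpKey]; ok {
            score += values[v] // 40 + 10
        }
    }
    fmt.Println("raw Map score:", score) // 50; Reduce later normalizes these onto 0-100
}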

The Reduce function:

func CalculateInterPodAffinityPriorityReduce(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister,
	result framework.NodeScoreList) error {
	var topologyScore topologyPairToScore
	if priorityMeta, ok := meta.(*priorityMetadata); ok {
		topologyScore = priorityMeta.topologyScore
	}
	if len(topologyScore) == 0 {
		return nil
	}

	var maxCount, minCount int64
	for i := range result {
		score := result[i].Score
		if score > maxCount {
			maxCount = score
		}
		if score < minCount {
			minCount = score
		}
	}

	maxMinDiff := maxCount - minCount
	for i := range result {
		fScore := float64(0)
		if maxMinDiff > 0 {
			fScore = float64(framework.MaxNodeScore) * (float64(result[i].Score-minCount) / float64(maxMinDiff))
		}

		result[i].Score = int64(fScore)
	}

	return nil
}

Reduce computes fScore as:

fScore = float64(framework.MaxNodeScore) * (float64(result[i].Score-minCount) / float64(maxMinDiff))

This is effectively a min-max normalization of the Map scores onto the 0-100 range.

2.7 LeastRequestedPriority

Registration:

scheduler.RegisterPriorityMapReduceFunction(priorities.LeastRequestedPriority, priorities.LeastRequestedPriorityMap, nil, 1)

Analogous to MostRequestedPriority; going straight to the scorer:

func leastRequestedScore(requested, capacity int64) int64 {
	if capacity == 0 {
		return 0
	}
	if requested > capacity {
		return 0
	}

	return ((capacity - requested) * int64(framework.MaxNodeScore)) / capacity
}

The larger the requested amount, the lower the score.

2.8 BalancedResourceAllocation

Favors balanced use of cpu, memory, and volume resources.

Registration:

scheduler.RegisterPriorityMapReduceFunction(priorities.BalancedResourceAllocation, priorities.BalancedResourceAllocationMap, nil, 1)

The flow mirrors MostRequestedPriority and LeastRequestedPriority; the scorer balancedResourceScorer is the interesting part:

func balancedResourceScorer(requested, allocable ResourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
	cpuFraction := fractionOfCapacity(requested[v1.ResourceCPU], allocable[v1.ResourceCPU])
	memoryFraction := fractionOfCapacity(requested[v1.ResourceMemory], allocable[v1.ResourceMemory])
	// This to find a node which has most balanced CPU, memory and volume usage.
	if cpuFraction >= 1 || memoryFraction >= 1 {
		// if requested >= capacity, the corresponding host should never be preferred.
		return 0
	}

	if includeVolumes && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && allocatableVolumes > 0 {
		volumeFraction := float64(requestedVolumes) / float64(allocatableVolumes)
		if volumeFraction >= 1 {
			// if requested >= capacity, the corresponding host should never be preferred.
			return 0
		}
		// Compute variance for all the three fractions.
		mean := (cpuFraction + memoryFraction + volumeFraction) / float64(3)
		variance := float64((((cpuFraction - mean) * (cpuFraction - mean)) + ((memoryFraction - mean) * (memoryFraction - mean)) + ((volumeFraction - mean) * (volumeFraction - mean))) / float64(3))
		// Since the variance is between positive fractions, it will be positive fraction. 1-variance lets the
		// score to be higher for node which has least variance and multiplying it with 10 provides the scaling
		// factor needed.
		return int64((1 - variance) * float64(framework.MaxNodeScore))
	}

	// Upper and lower boundary of difference between cpuFraction and memoryFraction are -1 and 1
	// respectively. Multiplying the absolute value of the difference by 10 scales the value to
	// 0-10 with 0 representing well balanced allocation and 10 poorly balanced. Subtracting it from
	// 10 leads to the score which also scales from 0 to 10 while 10 representing well balanced.
	diff := math.Abs(cpuFraction - memoryFraction)
	return int64((1 - diff) * float64(framework.MaxNodeScore))
}

BalancedResourceAllocation cannot be used on its own; it must be combined with LeastRequestedPriority. "Balanced" here means that on a single node the requested fractions of cpu and memory should stay as close together as possible: if cpu requests amount to 50% of the allocatable cpu, then memory requests should ideally also be around 50% of the allocatable memory.

Volumes are also taken into account. Without volumes, the formula is:

diff := math.Abs(cpuFraction - memoryFraction)
return int64((1 - diff) * float64(framework.MaxNodeScore))

The smaller the difference, the higher the score, and vice versa.

With volumes, the formula becomes:

		mean := (cpuFraction + memoryFraction + volumeFraction) / float64(3)
		variance := float64((((cpuFraction - mean) * (cpuFraction - mean)) + ((memoryFraction - mean) * (memoryFraction - mean)) + ((volumeFraction - mean) * (volumeFraction - mean))) / float64(3))
		return int64((1 - variance) * float64(framework.MaxNodeScore))

Here the variance of the three requested-to-allocatable fractions is computed, and again the smaller the variance, the higher the score, which is consistent with the meaning of diff above. A worked example follows.
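
A quick standalone sketch showing both branches with made-up fractions; this is not the scheduler's code, just the same arithmetic.

package main

import (
    "fmt"
    "math"
)

const maxNodeScore = 100.0

// Two-resource case: score by the cpu/memory fraction difference.
func balancedScore2(cpuFraction, memFraction float64) int64 {
    if cpuFraction >= 1 || memFraction >= 1 {
        return 0
    }
    return int64((1 - math.Abs(cpuFraction-memFraction)) * maxNodeScore)
}

// Three-resource case: score by the variance of the fractions.
func balancedScore3(cpuFraction, memFraction, volFraction float64) int64 {
    if cpuFraction >= 1 || memFraction >= 1 || volFraction >= 1 {
        return 0
    }
    mean := (cpuFraction + memFraction + volFraction) / 3
    variance := (math.Pow(cpuFraction-mean, 2) + math.Pow(memFraction-mean, 2) + math.Pow(volFraction-mean, 2)) / 3
    return int64((1 - variance) * maxNodeScore)
}

func main() {
    fmt.Println(balancedScore2(0.5, 0.7))      // diff 0.2 -> score 80
    fmt.Println(balancedScore3(0.5, 0.7, 0.6)) // variance ~0.0067 -> score 99
}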

2.9 NodePreferAvoidPodsPriority

Scores nodes based on the node annotation scheduler.alpha.kubernetes.io/preferAvoidPods.

func CalculateNodePreferAvoidPodsPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
	node := nodeInfo.Node()
	if node == nil {
		return framework.NodeScore{}, fmt.Errorf("node not found")
	}
	var controllerRef *metav1.OwnerReference
	if priorityMeta, ok := meta.(*priorityMetadata); ok {
		controllerRef = priorityMeta.controllerRef
	} else {
		// We couldn't parse metadata - fallback to the podspec.
		controllerRef = metav1.GetControllerOf(pod)
	}

	if controllerRef != nil {
		// Ignore pods that are owned by other controller than ReplicationController
		// or ReplicaSet.
		if controllerRef.Kind != "ReplicationController" && controllerRef.Kind != "ReplicaSet" {
			controllerRef = nil
		}
	}
	if controllerRef == nil {
		return framework.NodeScore{Name: node.Name, Score: framework.MaxNodeScore}, nil
	}

	avoids, err := v1helper.GetAvoidPodsFromNodeAnnotations(node.Annotations)
	if err != nil {
		// If we cannot get annotation, assume it's schedulable there.
		return framework.NodeScore{Name: node.Name, Score: framework.MaxNodeScore}, nil
	}
	for i := range avoids.PreferAvoidPods {
		avoid := &avoids.PreferAvoidPods[i]
		if avoid.PodSignature.PodController.Kind == controllerRef.Kind && avoid.PodSignature.PodController.UID == controllerRef.UID {
			return framework.NodeScore{Name: node.Name, Score: 0}, nil
		}
	}
	return framework.NodeScore{Name: node.Name, Score: framework.MaxNodeScore}, nil
}

This simply checks whether any entry in the node's annotation matches the pod's controller, comparing PodSignature.PodController.Kind and PodSignature.PodController.UID. If there is a match the node scores 0, otherwise it scores 100 (pods not owned by a ReplicationController or ReplicaSet always score 100).

2.10 NodeAffinityPriority

Scores nodes by node affinity. Registration:

scheduler.RegisterPriorityMapReduceFunction(priorities.NodeAffinityPriority, priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1)

The Map function:

func CalculateNodeAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
	node := nodeInfo.Node()
	if node == nil {
		return framework.NodeScore{}, fmt.Errorf("node not found")
	}

	// default is the podspec.
	affinity := pod.Spec.Affinity
	if priorityMeta, ok := meta.(*priorityMetadata); ok {
		// We were able to parse metadata, use affinity from there.
		affinity = priorityMeta.affinity
	}

	var count int32
	// A nil element of PreferredDuringSchedulingIgnoredDuringExecution matches no objects.
	// An element of PreferredDuringSchedulingIgnoredDuringExecution that refers to an
	// empty PreferredSchedulingTerm matches all objects.
	if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
		// Match PreferredDuringSchedulingIgnoredDuringExecution term by term.
		for i := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
			preferredSchedulingTerm := &affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution[i]
			if preferredSchedulingTerm.Weight == 0 {
				continue
			}

			// TODO: Avoid computing it for all nodes if this becomes a performance problem.
			nodeSelector, err := v1helper.NodeSelectorRequirementsAsSelector(preferredSchedulingTerm.Preference.MatchExpressions)
			if err != nil {
				return framework.NodeScore{}, err
			}
			if nodeSelector.Matches(labels.Set(node.Labels)) {
				count += preferredSchedulingTerm.Weight
			}
		}
	}

	return framework.NodeScore{
		Name:  node.Name,
		Score: int64(count),
	}, nil
}

The pod must declare nodeAffinity.preferredDuringSchedulingIgnoredDuringExecution terms; for every preferredSchedulingTerm the node matches, that term's weight is added to the score.

Reduce is straightforward:

var CalculateNodeAffinityPriorityReduce = NormalizeReduce(framework.MaxNodeScore, false)

func NormalizeReduce(maxPriority int64, reverse bool) PriorityReduceFunction {
	return func(
		_ *v1.Pod,
		_ interface{},
		_ schedulerlisters.SharedLister,
		result framework.NodeScoreList) error {

		var maxCount int64
		for i := range result {
			if result[i].Score > maxCount {
				maxCount = result[i].Score
			}
		}

		if maxCount == 0 {
			if reverse {
				for i := range result {
					result[i].Score = maxPriority
				}
			}
			return nil
		}

		for i := range result {
			score := result[i].Score

			score = maxPriority * score / maxCount
			if reverse {
				score = maxPriority - score
			}

			result[i].Score = score
		}
		return nil
	}
}

The scores are normalized onto 0-100; the core formula is:

score = maxPriority * score / maxCount
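
A standalone sketch of that normalization, including the reverse flag that TaintTolerationPriority uses later; the raw scores are made up.

package main

import "fmt"

const maxPriority = int64(100)

// Same idea as NormalizeReduce: scale raw scores to 0..maxPriority,
// optionally flipping them so that a high raw score becomes a low result.
func normalize(scores []int64, reverse bool) []int64 {
    var maxCount int64
    for _, s := range scores {
        if s > maxCount {
            maxCount = s
        }
    }
    out := make([]int64, len(scores))
    for i, s := range scores {
        if maxCount == 0 {
            if reverse {
                out[i] = maxPriority
            }
            continue
        }
        score := maxPriority * s / maxCount
        if reverse {
            score = maxPriority - score
        }
        out[i] = score
    }
    return out
}

func main() {
    raw := []int64{4, 2, 0}            // e.g. summed affinity weights or intolerable-taint counts
    fmt.Println(normalize(raw, false)) // [100 50 0]   NodeAffinityPriority
    fmt.Println(normalize(raw, true))  // [0 50 100]   TaintTolerationPriority
}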

2.11 TaintTolerationPriority

Scores nodes based on the node's taints with effect PreferNoSchedule and the pod's tolerations.

Registration:

scheduler.RegisterPriorityMapReduceFunction(priorities.TaintTolerationPriority, priorities.ComputeTaintTolerationPriorityMap, priorities.ComputeTaintTolerationPriorityReduce, 1)

The Map function:

func ComputeTaintTolerationPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
	node := nodeInfo.Node()
	if node == nil {
		return framework.NodeScore{}, fmt.Errorf("node not found")
	}
	// To hold all the tolerations with Effect PreferNoSchedule
	var tolerationsPreferNoSchedule []v1.Toleration
	if priorityMeta, ok := meta.(*priorityMetadata); ok {
		tolerationsPreferNoSchedule = priorityMeta.podTolerations

	} else {
		tolerationsPreferNoSchedule = getAllTolerationPreferNoSchedule(pod.Spec.Tolerations)
	}

	return framework.NodeScore{
		Name:  node.Name,
		Score: int64(countIntolerableTaintsPreferNoSchedule(node.Spec.Taints, tolerationsPreferNoSchedule)),
	}, nil
}

The Map score is the number of the node's PreferNoSchedule taints that the pod cannot tolerate. The more such taints, the lower the final score should be; that inversion happens in Reduce:

var ComputeTaintTolerationPriorityReduce = NormalizeReduce(framework.MaxNodeScore, true)

Reduce again normalizes the scores, but here the reverse parameter is true, so the higher the Map score, the lower the final score.

2.12 ImageLocalityPriority

As the name suggests, this algorithm scores a node by whether the images the pod needs already exist on that node.

Registration:

scheduler.RegisterPriorityMapReduceFunction(priorities.ImageLocalityPriority, priorities.ImageLocalityPriorityMap, nil, 1)

The Map function:

func ImageLocalityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
	node := nodeInfo.Node()
	if node == nil {
		return framework.NodeScore{}, fmt.Errorf("node not found")
	}

	var score int
	if priorityMeta, ok := meta.(*priorityMetadata); ok {
		score = calculatePriority(sumImageScores(nodeInfo, pod.Spec.Containers, priorityMeta.totalNumNodes))
	} else {
		// if we are not able to parse priority meta data, skip this priority
		score = 0
	}

	return framework.NodeScore{
		Name:  node.Name,
		Score: int64(score),
	}, nil
}

The Map score is computed as:

score = calculatePriority(sumImageScores(nodeInfo, pod.Spec.Containers, priorityMeta.totalNumNodes))

So the functions to focus on are sumImageScores and calculatePriority.

sumImageScores:

func sumImageScores(nodeInfo *schedulernodeinfo.NodeInfo, containers []v1.Container, totalNumNodes int) int64 {
	var sum int64
	imageStates := nodeInfo.ImageStates()

	for _, container := range containers {
		if state, ok := imageStates[normalizedImageName(container.Image)]; ok {
			sum += scaledImageScore(state, totalNumNodes)
		}
	}

	return sum
}

func scaledImageScore(imageState *schedulernodeinfo.ImageStateSummary, totalNumNodes int) int64 {
	spread := float64(imageState.NumNodes) / float64(totalNumNodes)
	return int64(float64(imageState.Size) * spread)
}

sumImageScores checks whether each of the pod's container images is already present on the node; initContainers are excluded because their images are usually small, so the developers chose to ignore them by default. How is each image scored? By its size multiplied by its spread (the fraction of nodes that already hold it): the larger the image and the wider its spread, the higher the score. Size dominates; spread is only a scaling factor, introduced to stop pods from always landing on one node or a handful of nodes, i.e. to avoid the node heating problem.

calculatePriority:

func calculatePriority(sumScores int64) int {
	if sumScores < minThreshold {
		sumScores = minThreshold
	} else if sumScores > maxThreshold {
		sumScores = maxThreshold
	}

	return int(int64(framework.MaxNodeScore) * (sumScores - minThreshold) / (maxThreshold - minThreshold))
}

calculatePriority then normalizes the sum and applies thresholds: sums below 23MB or above 1000MB are clamped to those boundary values. A worked example follows.
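
A standalone sketch of the whole pipeline; the image sizes and node counts are made up, and the thresholds match the constants described above.

package main

import "fmt"

const (
    mb           = int64(1024 * 1024)
    minThreshold = 23 * mb
    maxThreshold = 1000 * mb
    maxNodeScore = int64(100)
)

// scaledImageScore: image size weighted by how many of the cluster's nodes already have it.
func scaledImageScore(sizeBytes int64, numNodesWithImage, totalNumNodes int) int64 {
    spread := float64(numNodesWithImage) / float64(totalNumNodes)
    return int64(float64(sizeBytes) * spread)
}

// calculatePriority: clamp to [minThreshold, maxThreshold], then scale to 0-100.
func calculatePriority(sumScores int64) int64 {
    if sumScores < minThreshold {
        sumScores = minThreshold
    } else if sumScores > maxThreshold {
        sumScores = maxThreshold
    }
    return maxNodeScore * (sumScores - minThreshold) / (maxThreshold - minThreshold)
}

func main() {
    // Hypothetical pod with two container images, cluster of 10 nodes.
    sum := scaledImageScore(500*mb, 3, 10) + // 500MB image already on 3 of 10 nodes -> ~150MB
        scaledImageScore(40*mb, 10, 10) // 40MB image on every node -> 40MB
    fmt.Println(calculatePriority(sum)) // (190-23)/(1000-23)*100 = 17
}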

2.13 ResourceLimitsPriority

Checks whether the node's resources can satisfy the pod's limits.

func ResourceLimitsPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
	node := nodeInfo.Node()
	if node == nil {
		return framework.NodeScore{}, fmt.Errorf("node not found")
	}

	allocatableResources := nodeInfo.AllocatableResource()

	// compute pod limits
	var podLimits *schedulernodeinfo.Resource
	if priorityMeta, ok := meta.(*priorityMetadata); ok && priorityMeta != nil {
		// We were able to parse metadata, use podLimits from there.
		podLimits = priorityMeta.podLimits
	} else {
		// We couldn't parse metadata - fallback to computing it.
		podLimits = getResourceLimits(pod)
	}

	cpuScore := computeScore(podLimits.MilliCPU, allocatableResources.MilliCPU)
	memScore := computeScore(podLimits.Memory, allocatableResources.Memory)

	score := int64(0)
	if cpuScore == 1 || memScore == 1 {
		score = 1
	}

	... ... 

	return framework.NodeScore{
		Name:  node.Name,
		Score: score,
	}, nil
}

func computeScore(limit, allocatable int64) int64 {
	if limit != 0 && allocatable != 0 && limit <= allocatable {
		return 1
	}
	return 0
}

If the pod declares a cpu or memory limit and the node's allocatable amount of that resource is at least the limit, the score is a constant 1; otherwise it is 0.

2.14 EvenPodsSpreadPriority

This priority builds on the concept of topology spread constraints: a constraint defines at what level (node, availability zone, region) your pods should be spread out; for example, you can require that pods be distributed evenly across regions.

Registration:

scheduler.RegisterPriorityMapReduceFunction(
			priorities.EvenPodsSpreadPriority,
			priorities.CalculateEvenPodsSpreadPriorityMap,
			priorities.CalculateEvenPodsSpreadPriorityReduce,
			1,
		)

The Map function:

func CalculateEvenPodsSpreadPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
	node := nodeInfo.Node()
	if node == nil {
		return framework.NodeScore{}, fmt.Errorf("node not found")
	}

	var m *podTopologySpreadMap
	if priorityMeta, ok := meta.(*priorityMetadata); ok {
		m = priorityMeta.podTopologySpreadMap
	}
	if m == nil {
		return framework.NodeScore{Name: node.Name, Score: 0}, nil
	}

	// no need to continue if the node is not qualified.
	if _, ok := m.nodeNameSet[node.Name]; !ok {
		return framework.NodeScore{Name: node.Name, Score: 0}, nil
	}

	// For each present <pair>, current node gets a credit of <matchSum>.
	// And we sum up <matchSum> and return it as this node's score.
	var score int64
	for _, c := range m.constraints {
		if tpVal, ok := node.Labels[c.topologyKey]; ok {
			pair := topologyPair{key: c.topologyKey, value: tpVal}
			matchSum := *m.topologyPairToPodCounts[pair]
			score += matchSum
		}
	}
	return framework.NodeScore{Name: node.Name, Score: score}, nil
}

Map essentially counts, over the node's topology pairs, the pods that match the topology spread constraints. A larger count means a higher Map score, but for balanced spreading we actually want the opposite, so the score is flipped in Reduce.

The Reduce function:

func CalculateEvenPodsSpreadPriorityReduce(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister,
	result framework.NodeScoreList) error {
	var m *podTopologySpreadMap
	if priorityMeta, ok := meta.(*priorityMetadata); ok {
		m = priorityMeta.podTopologySpreadMap
	}
	if m == nil {
		return nil
	}

	// Calculate the summed <total> score and <minScore>.
	var minScore int64 = math.MaxInt64
	var total int64
	for _, score := range result {
		// it's mandatory to check if <score.Name> is present in m.nodeNameSet
		if _, ok := m.nodeNameSet[score.Name]; !ok {
			continue
		}
		total += score.Score
		if score.Score < minScore {
			minScore = score.Score
		}
	}

	maxMinDiff := total - minScore
	for i := range result {
		nodeInfo, err := sharedLister.NodeInfos().Get(result[i].Name)
		if err != nil {
			return err
		}
		node := nodeInfo.Node()
		// Debugging purpose: print the score for each node.
		// Score must be a pointer here, otherwise it's always 0.
		if klog.V(10) {
			defer func(score *int64, nodeName string) {
				klog.Infof("%v -> %v: PodTopologySpread NormalizeScore, Score: (%d)", pod.Name, nodeName, *score)
			}(&result[i].Score, node.Name)
		}

		if maxMinDiff == 0 {
			result[i].Score = framework.MaxNodeScore
			continue
		}

		if _, ok := m.nodeNameSet[node.Name]; !ok {
			result[i].Score = 0
			continue
		}

		flippedScore := total - result[i].Score
		fScore := float64(framework.MaxNodeScore) * (float64(flippedScore) / float64(maxMinDiff))
		result[i].Score = int64(fScore)
	}
	return nil
}

Reduce normalizes the Map scores and inverts them: each node's score becomes total minus its Map score, scaled onto 0-100 by total minus the minimum score.
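
A standalone sketch of that flip-and-normalize step with made-up Map counts.

package main

import "fmt"

const maxNodeScore = 100.0

func main() {
    // Hypothetical Map scores (matching-pod counts per qualified node).
    scores := map[string]int64{"node-a": 5, "node-b": 3, "node-c": 0}

    var total, minScore int64 = 0, 1 << 62
    for _, s := range scores {
        total += s
        if s < minScore {
            minScore = s
        }
    }
    maxMinDiff := total - minScore // 8 - 0 = 8

    for node, s := range scores {
        flipped := total - s
        final := int64(maxNodeScore * float64(flipped) / float64(maxMinDiff))
        fmt.Printf("%s: %d\n", node, final)
    }
    // node-a: (8-5)/8*100 = 37, node-b: (8-3)/8*100 = 62, node-c: 8/8*100 = 100
}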

3. Afterword

The priority algorithms compute a score for every node that survived the predicate phase, and each algorithm's weighted score is summed into the final total. Compared with the predicates, which only answer "schedulable or not", the priority phase brings in far more scoring strategy.

Original statement: this article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

For infringement concerns, please contact cloudcommunity@tencent.com for removal.
