Kubernetes 1.8 Preemptive Scheduling

Author: xidianwangtao@gmail.com

Before reading this post, I recommend first reading the post "An Analysis of Pod-Priority-Based Preemptive Scheduling in Kubernetes 1.8".

Changes to ScheduleAlgorithm

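In Kubernetes 1.8 the definition of the ScheduleAlgorithm interface changed: it gained a Preempt(...) method. As a result, the one-sentence summary of the scheduling process that I gave in my post "How the Kubernetes Scheduler Works" (written against Kubernetes 1.5) is no longer accurate: "Pods whose PodSpec.NodeName is empty are taken one at a time through two steps, predicates (filtering) and priorities (scoring), and the most suitable Node is chosen as the Pod's destination."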

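An accurate one-sentence description now reads: "Pods whose PodSpec.NodeName is empty are taken one at a time through two steps, predicates (filtering) and priorities (scoring), and the most suitable Node is chosen as the Pod's destination. If no suitable node is found after predicates and priorities, and the PodPriority feature is enabled, the scheduler runs preemption for the Pod to find the most suitable node and the Pods that must be evicted from it."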

// ScheduleAlgorithm is an interface implemented by things that know how to schedule pods
// onto machines.
type ScheduleAlgorithm interface {
	Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
	// Preempt receives scheduling errors for a pod and tries to create room for
	// the pod by preempting lower priority pods if possible.
	// It returns the node where preemption happened, a list of preempted pods, and error if any.
	Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, err error)
	// Predicates() returns a pointer to a map of predicate functions. This is
	// exposed for testing.
	Predicates() map[string]FitPredicate
	// Prioritizers returns a slice of priority config. This is exposed for
	// testing.
	Prioritizers() []PriorityConfig
}

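My post "Kubernetes Scheduler Source Code Analysis" (also written against Kubernetes 1.5) walked through the whole scheduling path in code. It described the flow as follows: Scheduler.scheduleOne is where the real scheduling logic starts, and each call schedules one Pod:

  • Get a Pod from the PodQueue.
  • Run the configured Algorithm's Schedule, which performs predicates and priorities.
  • AssumePod.
  • Bind the Pod; if the bind fails, ForgetPod.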

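In 1.8, when predicates and priorities fail to find a suitable node (in practice it is always the predicates that find no nodes, since priorities only rank the nodes that passed), scheduleOne additionally calls sched.preempt to attempt preemption.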

plugin/pkg/scheduler/scheduler.go:293

func (sched *Scheduler) scheduleOne() {
	pod := sched.config.NextPod()
	if pod.DeletionTimestamp != nil {
		sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
		glog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
		return
	}

	glog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)

	// Synchronously attempt to find a fit for the pod.
	start := time.Now()
	suggestedHost, err := sched.schedule(pod)
	metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
	if err != nil {
		// schedule() may have failed because the pod would not fit on any host, so we try to
		// preempt, with the expectation that the next time the pod is tried for scheduling it
		// will fit due to the preemption. It is also possible that a different pod will schedule
		// into the resources that were preempted, but this is harmless.
		if fitError, ok := err.(*core.FitError); ok {
			sched.preempt(pod, fitError)
		}
		return
	}

	// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
	// This allows us to keep scheduling without waiting on binding to occur.
	assumedPod := *pod
	// assume modifies `assumedPod` by setting NodeName=suggestedHost
	err = sched.assume(&assumedPod, suggestedHost)
	if err != nil {
		return
	}

	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
	go func() {
		err := sched.bind(&assumedPod, &v1.Binding{
			ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
			Target: v1.ObjectReference{
				Kind: "Node",
				Name: suggestedHost,
			},
		})
		metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
		if err != nil {
			glog.Errorf("Internal error binding pod: (%v)", err)
		}
	}()
}

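Scheduler.preempt

I will not go into predicates and priorities here, because that part of the source works the same way as in 1.5; the difference is that 1.8 adds more Predicate and Priority policies and their implementations. The rest of this post looks only at the preemption code.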

plugin/pkg/scheduler/scheduler.go:191

func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {
	if !utilfeature.DefaultFeatureGate.Enabled(features.PodPriority) {
		glog.V(3).Infof("Pod priority feature is not enabled. No preemption is performed.")
		return "", nil
	}
	preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor)
	if err != nil {
		glog.Errorf("Error getting the updated preemptor pod object: %v", err)
		return "", err
	}
	node, victims, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
	if err != nil {
		glog.Errorf("Error preempting victims to make room for %v/%v.", preemptor.Namespace, preemptor.Name)
		return "", err
	}
	if node == nil {
		return "", err
	}
	glog.Infof("Preempting %d pod(s) on node %v to make room for %v/%v.", len(victims), node.Name, preemptor.Namespace, preemptor.Name)
	annotations := map[string]string{core.NominatedNodeAnnotationKey: node.Name}
	err = sched.config.PodPreemptor.UpdatePodAnnotations(preemptor, annotations)
	if err != nil {
		glog.Errorf("Error in preemption process. Cannot update pod %v annotations: %v", preemptor.Name, err)
		return "", err
	}
	for _, victim := range victims {
		if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
			glog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
			return "", err
		}
		sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted", "by %v/%v on node %v", preemptor.Namespace, preemptor.Name, node.Name)
	}
	return node.Name, err
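}

  • Check whether PodPriority is enabled in the feature gates; if it is not, no preemption is performed.
  • When a Pod fails predicates/priorities, its PodCondition is updated to record the scheduling failure and the reason. The scheduler therefore fetches the Pod object again from the apiserver so that it works with the updated PodCondition.
  • Call ScheduleAlgorithm.Preempt to run preemption and pick the best node together with the pods to preempt (the victims).
  • Call the apiserver to annotate the pod (the preemptor) with the nominated node: NominatedNodeName=nodeName.
  • Iterate over the victims and delete them one by one through the apiserver.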

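Note: when sched.schedule(pod) fails, the Pod is not bound to a Node and is requeued into the unscheduled pod queue. If new pods joined the queue while this pod was being scheduled, they sit ahead of it after the requeue and are scheduled first. Those pods may land on the Node whose resources were just freed by the preemption and consume the resources that were released for the preemptor. When the preemptor's turn comes again, it may have to trigger another preemption on some node.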

genericScheduler.Preempt

ScheduleAlgorithm.Preempt is the heart of preemptive scheduling. Its implementation lives in genericScheduler:

plugin/pkg/scheduler/core/generic_scheduler.go:181

// preempt finds nodes with pods that can be preempted to make room for "pod" to
// schedule. It chooses one of the nodes and preempts the pods on the node and
// returns the node and the list of preempted pods if such a node is found.
// TODO(bsalamat): Add priority-based scheduling. More info: today one or more
// pending pods (different from the pod that triggered the preemption(s)) may
// schedule into some portion of the resources freed up by the preemption(s)
// before the pod that triggered the preemption(s) has a chance to schedule
// there, thereby preventing the pod that triggered the preemption(s) from
// scheduling. Solution is given at:
// https://github.com/kubernetes/community/blob/master/contributors/design-proposals/pod-preemption.md#preemption-mechanics
func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, error) {
	// Scheduler may return various types of errors. Consider preemption only if
	// the error is of type FitError.
	fitError, ok := scheduleErr.(*FitError)
	if !ok || fitError == nil {
		return nil, nil, nil
	}
	err := g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
	if err != nil {
		return nil, nil, err
	}
	if !podEligibleToPreemptOthers(pod, g.cachedNodeInfoMap) {
		glog.V(5).Infof("Pod %v is not eligible for more preemption.", pod.Name)
		return nil, nil, nil
	}
	allNodes, err := nodeLister.List()
	if err != nil {
		return nil, nil, err
	}
	if len(allNodes) == 0 {
		return nil, nil, ErrNoNodesAvailable
	}
	potentialNodes := nodesWherePreemptionMightHelp(pod, allNodes, fitError.FailedPredicates)
	if len(potentialNodes) == 0 {
		glog.V(3).Infof("Preemption will not help schedule pod %v on any node.", pod.Name)
		return nil, nil, nil
	}
	nodeToPods, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates, g.predicateMetaProducer)
	if err != nil {
		return nil, nil, err
	}
	for len(nodeToPods) > 0 {
		node := pickOneNodeForPreemption(nodeToPods)
		if node == nil {
			return nil, nil, err
		}
		passes, pErr := nodePassesExtendersForPreemption(pod, node.Name, nodeToPods[node], g.cachedNodeInfoMap, g.extenders)
		if passes && pErr == nil {
			return node, nodeToPods[node], err
		}
		if pErr != nil {
			glog.Errorf("Error occurred while checking extenders for preemption on node %v: %v", node, pErr)
		}
		// Remove the node from the map and try to pick a different node.
		delete(nodeToPods, node)
	}
	return nil, nil, err
}

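Checking the error from sched.schedule

  • Preemption is attempted only when the error returned by sched.schedule() is a FitError. A FitError means the pod was rejected by one or more predicate functions during the predicates phase. In other words, only pods that failed predicates go through preemption.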
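The guard at the top of Preempt is an ordinary Go type assertion on the error value. The following is a minimal sketch of that pattern, not the scheduler's own code: the FitError struct here is a simplified stand-in whose fields are assumptions, and shouldPreempt is a hypothetical helper name.

```go
package main

import "fmt"

// FitError is a simplified, hypothetical stand-in for core.FitError.
// It records, per node name, the reasons the pod failed the predicates.
type FitError struct {
	PodName          string
	FailedPredicates map[string][]string
}

// Error makes *FitError satisfy the error interface.
func (e *FitError) Error() string {
	return fmt.Sprintf("pod %s does not fit on any of %d node(s)", e.PodName, len(e.FailedPredicates))
}

// shouldPreempt (hypothetical name) mirrors the first check in Preempt:
// only a non-nil *FitError qualifies for preemption.
func shouldPreempt(scheduleErr error) bool {
	fitErr, ok := scheduleErr.(*FitError)
	return ok && fitErr != nil
}

func main() {
	fit := &FitError{
		PodName:          "web-0",
		FailedPredicates: map[string][]string{"node-1": {"Insufficient cpu"}},
	}
	fmt.Println(shouldPreempt(fit))                       // a predicate failure
	fmt.Println(shouldPreempt(fmt.Errorf("cache error"))) // some other error type
	fmt.Println(shouldPreempt((*FitError)(nil)))          // typed nil pointer
}
```

The explicit nil check matters because a typed nil pointer stored in an error interface still passes the type assertion.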

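Updating NodeInfo in the scheduler cache

  • The NodeInfo in the scheduler cache is refreshed, mainly the scheduled and assumed Pods on each Node, so that the later victim selection works from current data and the preemption decision is correct.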

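podEligibleToPreemptOthers: checking whether the pod is eligible to preempt

  • podEligibleToPreemptOthers decides whether the pod should go on to preemption. The rule is:
    • If the Pod already carries the annotation NominatedNodeName=nodeName (meaning it has already triggered a preemption), and the Node named in that annotation has a lower-priority pod that is still Terminating, the pod is not eligible for further preemption and the flow ends here.
    • Otherwise the flow continues.
 The corresponding code:
 plugin/pkg/scheduler/core/generic_scheduler.go:756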

func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) bool {
	if nodeName, found := pod.Annotations[NominatedNodeAnnotationKey]; found {
		if nodeInfo, found := nodeNameToInfo[nodeName]; found {
			for _, p := range nodeInfo.Pods() {
				if p.DeletionTimestamp != nil && util.GetPodPriority(p) < util.GetPodPriority(pod) {
					// There is a terminating pod on the nominated node.
					return false
				}
			}
		}
	}
	return true
}
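To make the rule concrete, here is a small runnable sketch with stripped-down types. The pod struct, its fields, and the eligibleToPreempt helper are hypothetical simplifications of v1.Pod and podEligibleToPreemptOthers, written only to illustrate the two outcomes.

```go
package main

import "fmt"

// pod is a hypothetical, stripped-down stand-in for v1.Pod.
type pod struct {
	name          string
	priority      int32
	terminating   bool   // stands in for DeletionTimestamp != nil
	nominatedNode string // stands in for the nominated-node annotation; "" if unset
}

// eligibleToPreempt applies the same rule as podEligibleToPreemptOthers:
// a pod that already nominated a node is not eligible again while a
// lower-priority pod on that node is still terminating.
func eligibleToPreempt(p pod, podsByNode map[string][]pod) bool {
	if p.nominatedNode == "" {
		return true
	}
	for _, other := range podsByNode[p.nominatedNode] {
		if other.terminating && other.priority < p.priority {
			return false
		}
	}
	return true
}

func main() {
	podsByNode := map[string][]pod{
		"node-1": {{name: "victim", priority: 1, terminating: true}},
		"node-2": {{name: "busy", priority: 1}},
	}
	waiting := pod{name: "preemptor", priority: 10, nominatedNode: "node-1"}
	fresh := pod{name: "newcomer", priority: 10}

	// waiting already preempted on node-1 and its victim is still terminating.
	fmt.Println(waiting.name, eligibleToPreempt(waiting, podsByNode))
	// fresh has no nominated node yet, so it may preempt.
	fmt.Println(fresh.name, eligibleToPreempt(fresh, podsByNode))
}
```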

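nodesWherePreemptionMightHelp: filtering out the potential nodes

  • nodesWherePreemptionMightHelp returns the potential nodes. Its logic is:
    • Iterate over all nodes and, for each one, scan the predicates that failed for it during the predicates phase of sched.schedule() (failedPredicates). If they include any of the following failure reasons, evicting pods cannot help, so the node is not a candidate for preemption: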
      • NodeSelectorNotMatch,
      • PodNotMatchHostName,
      • TaintsTolerationsNotMatch,
      • NodeLabelPresenceViolated,
      • NodeNotReady,
      • NodeNetworkUnavailable,
      • NodeUnschedulable,
      • NodeUnknownCondition
    • Every other node is a potential node.
 The corresponding code:
 
func nodesWherePreemptionMightHelp(pod *v1.Pod, nodes []*v1.Node, failedPredicatesMap FailedPredicateMap) []*v1.Node {
	potentialNodes := []*v1.Node{}
	for _, node := range nodes {
		unresolvableReasonExist := false
		failedPredicates, found := failedPredicatesMap[node.Name]
		// If we assume that scheduler looks at all nodes and populates the failedPredicateMap
		// (which is the case today), the !found case should never happen, but we'd prefer
		// to rely less on such assumptions in the code when checking does not impose
		// significant overhead.
		for _, failedPredicate := range failedPredicates {
			switch failedPredicate {
			case
				predicates.ErrNodeSelectorNotMatch,
				predicates.ErrPodNotMatchHostName,
				predicates.ErrTaintsTolerationsNotMatch,
				predicates.ErrNodeLabelPresenceViolated,
				predicates.ErrNodeNotReady,
				predicates.ErrNodeNetworkUnavailable,
				predicates.ErrNodeUnschedulable,
				predicates.ErrNodeUnknownCondition:
				unresolvableReasonExist = true
				break
				// TODO(bsalamat): Please add affinity failure cases once we have specific affinity failure errors.
			}
		}
		if !found || !unresolvableReasonExist {
			glog.V(3).Infof("Node %v is a potential node for preemption.", node.Name)
			potentialNodes = append(potentialNodes, node)
		}
	}
	return potentialNodes
}
 

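selectNodesForPreemption and selectVictimsOnNode: finding the feasible nodes and their victims

  • selectNodesForPreemption finds, among the potential nodes, every feasible node and its victim pods. It checks the nodes concurrently through workqueue.Parallelize with at most 16 workers (goroutines), that is min(16, number of potential nodes), and waits on a WaitGroup until every node has been checked. For each node:
    • Go through all scheduled pods on the node (including assumed pods). Add every pod with a lower priority than the preemptor to the potential victims list and remove it from the NodeInfo copy, so that the next predicate run sees more free resources on the node.
    • Sort the potential victims by priority from high to low, so that index 0 holds the highest priority.
    • Check whether the preemptor passes all predicates configured for the scheduler (with the victims removed from the NodeInfo copy, i.e. with the resources of all lower-priority pods released). If it does not, return: the node is not suitable. If all predicates pass, continue.
    • Walk the sorted potential victims list. Add the first pod (highest priority) back to the NodeInfo copy and check again whether the preemptor passes all configured predicates. If it no longer does, remove that pod from the NodeInfo copy again and add it to the final victims list. Then handle the 2nd, 3rd, ... pods the same way. This keeps as many higher-priority pods as possible and evicts as few pods as possible.
    • Finally, return each feasible node with its victims list.
 The code of selectNodesForPreemption follows; the core logic is in selectVictimsOnNode.
 plugin/pkg/scheduler/core/generic_scheduler.go:583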

func selectNodesForPreemption(pod *v1.Pod,
	nodeNameToInfo map[string]*schedulercache.NodeInfo,
	potentialNodes []*v1.Node,
	predicates map[string]algorithm.FitPredicate,
	metadataProducer algorithm.PredicateMetadataProducer,
) (map[*v1.Node][]*v1.Pod, error) {

	nodeNameToPods := map[*v1.Node][]*v1.Pod{}
	var resultLock sync.Mutex

	// We can use the same metadata producer for all nodes.
	meta := metadataProducer(pod, nodeNameToInfo)
	checkNode := func(i int) {
		nodeName := potentialNodes[i].Name
		var metaCopy algorithm.PredicateMetadata
		if meta != nil {
			metaCopy = meta.ShallowCopy()
		}
		pods, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates)
		if fits {
			resultLock.Lock()
			nodeNameToPods[potentialNodes[i]] = pods
			resultLock.Unlock()
		}
	}
	workqueue.Parallelize(16, len(potentialNodes), checkNode)
	return nodeNameToPods, nil
}
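selectVictimsOnNode itself is not reproduced in this post. The sketch below follows the per-node steps described above under strong simplifying assumptions: the pod and node types, the fits function, and selectVictims are all hypothetical names, and the only "predicate" is a toy check that total requested CPU stays within node capacity. It is meant to illustrate the remove-all-then-add-back idea, not to mirror the real implementation.

```go
package main

import (
	"fmt"
	"sort"
)

// pod and node are hypothetical, simplified stand-ins for the real API types.
type pod struct {
	name     string
	priority int32
	cpu      int // requested CPU in millicores
}

type node struct {
	capacity int // allocatable CPU in millicores
	pods     []pod
}

// fits is a toy predicate: the preemptor fits if the CPU requested by the
// running pods plus its own request stays within the node's capacity.
func fits(preemptor pod, capacity int, running []pod) bool {
	used := preemptor.cpu
	for _, p := range running {
		used += p.cpu
	}
	return used <= capacity
}

// selectVictims returns the pods to evict and whether the node is feasible.
func selectVictims(preemptor pod, n node) ([]pod, bool) {
	var kept, candidates []pod
	// Step 1: take every lower-priority pod off the node (on a copy).
	for _, p := range n.pods {
		if p.priority < preemptor.priority {
			candidates = append(candidates, p)
		} else {
			kept = append(kept, p)
		}
	}
	// Step 2: if the preemptor still does not fit, the node is not feasible.
	if !fits(preemptor, n.capacity, kept) {
		return nil, false
	}
	// Step 3: sort candidates by priority, highest first.
	sort.SliceStable(candidates, func(i, j int) bool {
		return candidates[i].priority > candidates[j].priority
	})
	// Step 4: add candidates back one by one; those that break the fit
	// become victims.
	var victims []pod
	for _, c := range candidates {
		trial := append(kept, c)
		if fits(preemptor, n.capacity, trial) {
			kept = trial
		} else {
			victims = append(victims, c)
		}
	}
	return victims, true
}

func main() {
	n := node{capacity: 4000, pods: []pod{
		{"a", 100, 1000}, {"b", 5, 500}, {"c", 3, 1000}, {"d", 1, 1000},
	}}
	victims, ok := selectVictims(pod{name: "preemptor", priority: 10, cpu: 2000}, n)
	fmt.Println(ok)
	for _, v := range victims {
		fmt.Println("evict", v.name)
	}
}
```

In this example pod a has a higher priority than the preemptor and is never a candidate; b can be added back without breaking the fit, so only c and d end up as victims.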

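pickOneNodeForPreemption: picking the best node among the feasible ones

  • If the previous step found at least one feasible node, pickOneNodeForPreemption picks the best one as follows:
    • Choose the node whose highest-priority victim has the lowest priority.
    • If more than one node qualifies, choose the one with the smallest sum of victim priorities.
    • If there is still a tie, choose the node with the fewest victim pods.
    • If there is still a tie, take the first of the remaining nodes. Because they were collected by iterating over a Go map, whose order is unspecified, this amounts to an arbitrary choice.
    • Each step only considers the nodes left over from the previous step.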
plugin/pkg/scheduler/core/generic_scheduler.go:501

func pickOneNodeForPreemption(nodesToPods map[*v1.Node][]*v1.Pod) *v1.Node {
	type nodeScore struct {
		node            *v1.Node
		highestPriority int32
		sumPriorities   int64
		numPods         int
	}
	if len(nodesToPods) == 0 {
		return nil
	}
	minHighestPriority := int32(math.MaxInt32)
	minPriorityScores := []*nodeScore{}
	for node, pods := range nodesToPods {
		if len(pods) == 0 {
			// We found a node that doesn't need any preemption. Return it!
			// This should happen rarely when one or more pods are terminated between
			// the time that scheduler tries to schedule the pod and the time that
			// preemption logic tries to find nodes for preemption.
			return node
		}
		// highestPodPriority is the highest priority among the victims on this node.
		highestPodPriority := util.GetPodPriority(pods[0])
		if highestPodPriority < minHighestPriority {
			minHighestPriority = highestPodPriority
			minPriorityScores = nil
		}
		if highestPodPriority == minHighestPriority {
			minPriorityScores = append(minPriorityScores, &nodeScore{node: node, highestPriority: highestPodPriority, numPods: len(pods)})
		}
	}
	if len(minPriorityScores) == 1 {
		return minPriorityScores[0].node
	}
	// There are a few nodes with minimum highest priority victim. Find the
	// smallest sum of priorities.
	minSumPriorities := int64(math.MaxInt64)
	minSumPriorityScores := []*nodeScore{}
	for _, nodeScore := range minPriorityScores {
		var sumPriorities int64
		for _, pod := range nodesToPods[nodeScore.node] {
			// We add MaxInt32+1 to all priorities to make all of them >= 0. This is
			// needed so that a node with a few pods with negative priority is not
			// picked over a node with a smaller number of pods with the same negative
			// priority (and similar scenarios).
			sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1)
		}
		if sumPriorities < minSumPriorities {
			minSumPriorities = sumPriorities
			minSumPriorityScores = nil
		}
		nodeScore.sumPriorities = sumPriorities
		if sumPriorities == minSumPriorities {
			minSumPriorityScores = append(minSumPriorityScores, nodeScore)
		}
	}
	if len(minSumPriorityScores) == 1 {
		return minSumPriorityScores[0].node
	}
	// There are a few nodes with minimum highest priority victim and sum of priorities.
	// Find one with the minimum number of pods.
	minNumPods := math.MaxInt32
	minNumPodScores := []*nodeScore{}
	for _, nodeScore := range minSumPriorityScores {
		if nodeScore.numPods < minNumPods {
			minNumPods = nodeScore.numPods
			minNumPodScores = nil
		}
		if nodeScore.numPods == minNumPods {
			minNumPodScores = append(minNumPodScores, nodeScore)
		}
	}
	// At this point, even if there are more than one node with the same score,
	// return the first one.
	if len(minNumPodScores) > 0 {
		return minNumPodScores[0].node
	}
	glog.Errorf("Error in logic of node scoring for preemption. We should never reach here!")
	return nil
}
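The comment about adding MaxInt32+1 to every priority is easier to see with numbers. The following is a small illustration, assuming two nodes whose victims all have priority -10; sumPriorities is a hypothetical helper that computes the sum with and without the offset.

```go
package main

import (
	"fmt"
	"math"
)

// sumPriorities (hypothetical helper) adds up victim priorities. With
// offset set, each priority is shifted by MaxInt32+1 so it is >= 0,
// as pickOneNodeForPreemption does.
func sumPriorities(priorities []int32, offset bool) int64 {
	var sum int64
	for _, p := range priorities {
		sum += int64(p)
		if offset {
			sum += int64(math.MaxInt32) + 1
		}
	}
	return sum
}

func main() {
	nodeA := []int32{-10, -10, -10} // three victims
	nodeB := []int32{-10, -10}      // two victims

	// Raw sums: -30 vs -20, so "smallest sum" would pick node A
	// even though it evicts more pods.
	fmt.Println(sumPriorities(nodeA, false), sumPriorities(nodeB, false))

	// Shifted sums: every victim contributes a positive amount, so the
	// node with fewer victims of the same priority has the smaller sum.
	fmt.Println(sumPriorities(nodeA, true) < sumPriorities(nodeB, true))
}
```

With raw sums the comparison favours the node with more negative-priority victims; after the shift, node B (two victims) has the smaller sum and is preferred.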

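The chosen node must still pass the extenders (if configured)

  • If the scheduler is configured with extenders, nodePassesExtendersForPreemption hands the pod and the node (with the victims hypothetically removed) to extender.Filter for one more check. Only a node that passes is returned as the final preemption node.
  • For background on extenders, see the posts "How to Customize the Kubernetes Scheduler" and "Kubernetes Scheduler Source Code Analysis". Extenders are rarely needed in practice, and now that custom schedulers are supported there is even less reason to use a scheduler extender.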

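Summary

The whole preemption flow can be summarized as:

  • Check whether PodPriority is enabled in the feature gates.
  • Call ScheduleAlgorithm.Preempt to run preemption and pick the best node together with the pods to preempt (the victims):
    • podEligibleToPreemptOthers checks whether the pod is eligible to preempt.
    • nodesWherePreemptionMightHelp filters out the potential nodes.
    • selectNodesForPreemption and selectVictimsOnNode find the feasible nodes and their victims.
    • pickOneNodeForPreemption picks the best node among the feasible ones.
    • The chosen node must still pass the extenders (if configured).
  • Call the apiserver to annotate the pod (the preemptor) with the nominated node: NominatedNodeName=nodeName.
  • Iterate over the victims and delete them one by one through the apiserver.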
