前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【技术干货】kube-scheduler里的调度框架

【技术干货】kube-scheduler里的调度框架

作者头像
CNCF
发布2021-05-27 15:55:11
8090
发布2021-05-27 15:55:11
举报
文章被收录于专栏:CNCF

作者/王一钧

概述

本文主要介绍k8s.io/kubernetes/pkg/scheduler/framework的调度流程。

调度框架为kube-scheduler提供一组插件式api,这些插件编译到kube-scheduler中,也可以以插件的姿势实现更多的调度特性,这样可以保证核心组件简单、易维护。

Scheduling Cycle& Binding Cycle

每一个需要调度的pod都会经历两个流程Scheduling Cycle和Binding Cycle. Scheduling Cycle给pod挑选一个node,Binding Cycle则将该决策应用到集群中,Scheduling Cycle可能是顺序执行,Binding Cycles阶段可能是并发运行。

如果确定一个pod无法调度或者出现内部错误,Scheduling Cycle和Binding Cycle将会终止,pod将会重新回到队列等待重试;如果在Binding Cycle被终止,将会调用Reserve插件Unreserve方法。

扩展点

如图展示一个pod的调度流程和调度框架公开的扩展点;Filter也就是谓词过滤器(predicate filter),Scoring等同于优先算法(Priority function),注册的插件将会在对应的扩展点被调用。

1. queue sort

queue sort插件用于排序在调度队列中的pod,该插件只能启动一个。

pkg/scheduler/framework/v1alpha1/interface.go

代码语言:javascript
复制
type QueueSortPlugin interface {
   Plugin
   Less(*QueuedPodInfo, *QueuedPodInfo) bool
}

默认实现了一个优先调度插件即获取pod中的spec.Priority字段两个pod进行比较,每一个进入队列的pod都需要经过该方法对其进行排序,Priority字段最大的将会在队列的最前面。

pkg/scheduler/framework/plugins/queuesort/priority_sort.go

代码语言:javascript
复制
1.func (pl *PrioritySort) Less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
2.   // 获取pod的 spec.Priority 字段
3.   p1 := pod.GetPodPriority(pInfo1.Pod)
4.   p2 := pod.GetPodPriority(pInfo2.Pod)
5.   return (p1 > p2) || (p1 == p2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))
6.}

2. PreFilter

这些插件用于预处理pod的信息,并去检测集群或者pod需要达成的某种特定条件。

preFilter插件必须实现 PreFilter方法,并按照顺序调用每一个插件,如果其中一个插件preFilter返回error整个Scheduler Cycle将会中止pod被送入不可调度队列。

pkg/scheduler/framework/v1alpha1/interface.go

代码语言:javascript
复制
1.type PreFilterPlugin interface {
2.   Plugin
3.   PreFilter(ctx context.Context, state *CycleState, p *v1.Pod) *Status
4.   PreFilterExtensions() PreFilterExtensions
5.}

看一个默认实现的PreFilter的插件NodePort plugin:

pkg/scheduler/framework/plugins/nodeports/node_ports.go

代码语言:javascript
复制
func getContainerPorts(pods ...*v1.Pod) []*v1.ContainerPort {
   ports := []*v1.ContainerPort{}
   for _, pod := range pods {
      for j := range pod.Spec.Containers {
         container := &pod.Spec.Containers[j]
         for k := range container.Ports {
            ports = append(ports, &container.Ports[k])
         }
      }
   }
   return ports
}
// PreFilter invoked at the prefilter extension point.
func (pl *NodePorts) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
   s := getContainerPorts(pod)
   cycleState.Write(preFilterStateKey, preFilterState(s))
   return nil
}

遍历pod的spec.containers.ports字段搜集所有需要开放的port,并写到一个Map中。

另外Pre-filter插件还可以实现一个可选项 PreFilterExtensions接口,该接口提供两个方法AddPod和RemovePod去修改预处理信息。调度框架保证这些函数在preFilter后被调用,而且有可能被调用多次。

3. Filter

该步骤会过滤掉那些不能运行的pod的node,每一个node都会按照配置的顺序调用每一个插件,如果任何一个插件标记该节点不合适,剩下的插件将不会被调用,该过程可能是并行的,某些时候可能会调用多次。

pkg/scheduler/framework/v1alpha1/interface.go

代码语言:javascript
复制
1.type FilterPlugin interface {
2.   Plugin
3.   Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) *Status
4.}

同样看Node ports插件的Filter方法,这里会查看对应的node上是否有pod所需的端口,若端口冲突直接过滤掉这个node。

pkg/scheduler/framework/plugins/nodeports/node_ports.go

代码语言:javascript
复制
1.func (pl *NodePorts) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
2.   // 获取pre-filter 获取的字段
3.   wantPorts, err := getPreFilterState(cycleState)
4.   if err != nil {
5.      return framework.NewStatus(framework.Error, err.Error())
6.   }
7.   // 判断node 的port 和pod 的port 是否有冲突
8.   fits := fitsPorts(wantPorts, nodeInfo)
9.   if !fits {
10.      return framework.NewStatus(framework.Unschedulable, ErrReason)
11.   }
12.
13.   return nil
14.}
15.
16.// Fits checks if the pod fits the node.
17.func Fits(pod *v1.Pod, nodeInfo *framework.NodeInfo) bool {
18.   return fitsPorts(getContainerPorts(pod), nodeInfo)
19.}
20.
21.func fitsPorts(wantPorts []*v1.ContainerPort, nodeInfo *framework.NodeInfo) bool {
22.   // try to see whether existingPorts and wantPorts will conflict or not
23.   existingPorts := nodeInfo.UsedPorts
24.   for _, cp := range wantPorts {
25.      if existingPorts.CheckConflict(cp.HostIP, string(cp.Protocol), cp.HostPort) {
26.         return false
27.      }
28.   }
29.   return true
30.}

4. PostFilter

这些插件将会在Filter后且发现无Node可选时被调用,一个典型的例子时抢占式,如果任何一个插件标记某个节点可被调度,那么剩下的插件将不会被调用。

pkg/scheduler/core/generic_scheduler.go

5. PreScore

这里也是采集一些需要打分的指标,采集后放到一个Map中用于后续Score阶段评分,当任意一个插件运行失败,剩下的插件同样无法继续执行。

pkg/scheduler/framework/v1alpha1/interface.go

代码语言:javascript
复制
1.type PreScorePlugin interface {
2.   Plugin
3.   PreScore(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) *Status
4.}

例如ResourceLimits plugin对每个容器的Limit资源进行相加,并获取到Init容器所需要的最大资源。

pkg/scheduler/framework/plugins/noderesources/resource_limits.go

代码语言:javascript
复制
1.func (rl *ResourceLimits) PreScore(
2.   pCtx context.Context,
3.   cycleState *framework.CycleState,
4.   pod *v1.Pod,
5.   nodes []*v1.Node,
6.) *framework.Status {
7.   if len(nodes) == 0 {
8.      // No nodes to score.
9.      return nil
10.   }
11.
12.   if rl.handle.SnapshotSharedLister() == nil {
13.      return framework.NewStatus(framework.Error, fmt.Sprintf("empty shared lister"))
14.   }
15.   s := &preScoreState{
16.      // 将每个Container 需要的资源相加 并获取InitContainer所需最大的资源
17.      podResourceRequest: getResourceLimits(pod),
18.   }
19.   cycleState.Write(preScoreStateKey, s)
20.   return nil
21.}

6. Score

这个阶段将会调用所有实现了Score的plugin对通过过滤器的所有Node进行打分,NormalizeScore阶段后将会将每个插件返回分值按照他们配置的比重进行合并。

如下是ResourceList plugin的Score阶段代码:

1.如果node没有发布可分配的资源,对节点评分为0。

2.如果pod未指定CPU或内存限制,对节点评分为0。

3.如果CPU和内存都符合,则这个节点分数为1。

pkg/scheduler/framework/plugins/noderesources/resource_limits.go

代码语言:javascript
复制
1.func (rl *ResourceLimits) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
2.   nodeInfo, err := rl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
3.   if err != nil || nodeInfo.Node() == nil {
4.      return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v, node is nil: %v", nodeName, err, nodeInfo.Node() == nil))
5.   }
6.
7.   podLimits, err := getPodResource(state)
8.   if err != nil {
9.      return 0, framework.NewStatus(framework.Error, err.Error())
10.   }
11.
12.   cpuScore := computeScore(podLimits.MilliCPU, nodeInfo.Allocatable.MilliCPU)
13.   memScore := computeScore(podLimits.Memory, nodeInfo.Allocatable.Memory)
14.
15.   score := int64(0)
16.   if cpuScore == 1 || memScore == 1 {
17.      score = 1
18.   }
19.   return score, nil
20.}

7. NormalizeScore

在Scheduler给Node最终得分之前修改分数,该步骤主要用于修正分数到合理数值(0~100),例如Node affinity的Score阶段,会将配置的wight全部相加,该字段为用户自定义配置,所以这里就需要对其进行修正。

pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go

代码语言:javascript
复制
1.func (pl *NodeAffinity) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
2.   nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
3.   if err != nil {
4.      return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
5.   }
6.
7.   node := nodeInfo.Node()
8.   if node == nil {
9.      return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
10.   }
11.
12.   affinity := pod.Spec.Affinity
13.
14.   var count int64
15.   // A nil element of PreferredDuringSchedulingIgnoredDuringExecution matches no objects.
16.   // An element of PreferredDuringSchedulingIgnoredDuringExecution that refers to an
17.   // empty PreferredSchedulingTerm matches all objects.
18.   if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
19.      // Match PreferredDuringSchedulingIgnoredDuringExecution term by term.
20.      for i := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
21.         preferredSchedulingTerm := &affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution[i]
22.         if preferredSchedulingTerm.Weight == 0 {
23.            continue
24.         }
25.
26.         // TODO: Avoid computing it for all nodes if this becomes a performance problem.
27.         nodeSelector, err := v1helper.NodeSelectorRequirementsAsSelector(preferredSchedulingTerm.Preference.MatchExpressions)
28.         if err != nil {
29.            return 0, framework.NewStatus(framework.Error, err.Error())
30.         }
31.
32.         if nodeSelector.Matches(labels.Set(node.Labels)) {
33.            count += int64(preferredSchedulingTerm.Weight)
34.         }
35.      }
36.   }
37.
38.   return count, nil
39.}
40.
41.// NormalizeScore invoked after scoring all nodes.
42.func (pl *NodeAffinity) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
43.   return pluginhelper.DefaultNormalizeScore(framework.MaxNodeScore, false, scores)
44.}

DefaultNormalizeScore默认的NormalizeScore函数,作用是让评分处于[0, maxPriority ] 之间,可以对分数进行反转(当前分数 = 最大分数 - 当前分数)。

pkg/scheduler/framework/plugins/helper/normalize_score.go

代码语言:javascript
复制
1.func DefaultNormalizeScore(maxPriority int64, reverse bool, scores framework.NodeScoreList) *framework.Status {
2.   var maxCount int64
3.   for i := range scores {
4.      if scores[i].Score > maxCount {
5.         maxCount = scores[i].Score
6.      }
7.   }
8.
9.   if maxCount == 0 {
10.      if reverse {
11.         for i := range scores {
12.            scores[i].Score = maxPriority
13.         }
14.      }
15.      return nil
16.   }
17.
18.   for i := range scores {
19.      score := scores[i].Score
20.
21.      score = maxPriority * score / maxCount
22.      if reverse {
23.         score = maxPriority - score
24.      }
25.
26.      scores[i].Score = score
27.   }
28.   return nil
29.}

7. Reserve

要扩展Reserve需要实现两个方法Reserve and Unreserve。

这两个信息调度阶段分别称为Reserve和Unreserve,维护有状态插件时当一个节点上的资源被保留(Reserve) 和未被给定pod保留(Unreserve),应该使用该阶段去通知调度程序,Reserve发生在Scheduler真正绑定pod到node之前。

每个Reserve插件的Reserve方法可能成功也可能失败,如果一个Reserve方法调用失败,后续的插件就不会执行,并且认为Reserve阶段已经失败。

如果所有插件的Reserve方法成功,则认为Reserve阶段成功,并执行Scheduling Cycle 和Binding Cycle的其余部分,当Reserve阶段或后续阶段失败时,会触发Unreserve,当这种情况发生时,所有的Reserve插件的Unreserve方法将以与Reserve方法调用相反的顺序执行,此阶段的存在是为了清除与保留Pod相关联的状态。

VolumeBinding plugin Reserve:

pkg/scheduler/framework/plugins/volumebinding/volume_binding.go

代码语言:javascript
复制
func (pl *VolumeBinding) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
   // AssumePodVolumes将:
   // 1. 取未绑定PVC的PV匹配项并更新PV缓存,假设PV已预先绑定到PVC。
   // 2. 获取需要配置的PVC,并使用相关的注释集更新PVC缓存。
   //如果所有卷都已绑定,则返回true
   allBound, err := pl.Binder.AssumePodVolumes(pod, nodeName)
   if err != nil {
      return framework.NewStatus(framework.Error, err.Error())
   }
   cs.Write(allBoundStateKey, stateData{allBound: allBound})
   return nil
}

Unreserve:

代码语言:javascript
复制
func (pl *VolumeBinding) Unreserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) {
   pl.Binder.DeletePodBindings(pod)
   return
}

8. permit

该阶段是Scheduling cycle的最后一个阶段,主要用于阻止或者延期pod绑定到node上。

pkg/scheduler/framework/v1alpha1/interface.go

代码语言:javascript
复制
type PermitPlugin interface {
   Plugin
   Permit(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (*Status, time.Duration)
}

这些插件做三件事:

Approve:一旦所以插件都准许,那么pod将会到binding Cycle。

deny:如果任何一个插件拒绝,该pod将会重新回到调度队列,并触发前面的Unreserve操作。

wait(with a timeout):如果一个插件返回wait,那么该pod就会进入一个等待队列中,pod开始进入到binding Cycle但是是阻塞的知道允许;如果timeout发生,那么将会从timeout状态变成deny状态。

9. PreBind

执行pod的绑定前操作。

pkg/scheduler/framework/plugins/volumebinding/volume_binding.go

代码语言:javascript
复制
func (pl *VolumeBinding) PreBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
   state, err := cs.Read(allBoundStateKey)
   if err != nil {
      return framework.NewStatus(framework.Error, err.Error())
   }
   s, ok := state.(stateData)
   if !ok {
      return framework.NewStatus(framework.Error, "unable to convert state into stateData")
   }
   if s.allBound {
      // no need to bind volumes
      return nil
   }
   klog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", pod.Namespace, pod.Name)
   `// 绑定pod volume` 
   err = pl.Binder.BindPodVolumes(pod)
   if err != nil {
      klog.V(1).Infof("Failed to bind volumes for pod \"%v/%v\": %v", pod.Namespace, pod.Name, err)
      return framework.NewStatus(framework.Error, err.Error())
   }
   klog.V(5).Infof("Success binding volumes for pod \"%v/%v\"", pod.Namespace, pod.Name)
   return nil
}

10. bind

将pod绑定到Node, 只有所以preBind操作完成后,才会执行该步骤,每个bind plugin将会配置的顺序执行,bind plugin可以选择性的对pod进行操作,如果其中一个插件处理过了一个pod,那么剩下的将会被跳过。

pkg/scheduler/framework/plugins/defaultbinder/default_binder.go

代码语言:javascript
复制
func (b DefaultBinder) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
   klog.V(3).Infof("Attempting to bind %v/%v to %v", p.Namespace, p.Name, nodeName)
   binding := &v1.Binding{
      ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID},
      Target:     v1.ObjectReference{Kind: "Node", Name: nodeName},
   }
   err := b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
   if err != nil {
      return framework.NewStatus(framework.Error, err.Error())
   }
   return nil
}

11. postBind

这是一个信息扩展点,绑定后插件在Pod成功绑定后被调用,这是一个绑定循环的结束,可以用来清理相关的资源。

pkg/scheduler/framework/plugins/volumebinding/volume_binding.go

代码语言:javascript
复制
func (pl *VolumeBinding) PostBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) {
   pl.Binder.DeletePodBindings(pod)
   return
}

至此所有绑定操作结束。

相关推荐:

1.【技术干货】K8S日志管理(二)

2.KubeEdge云边隧道Stream源码解析

3.【技术干货】Thanos的架构剖析

-End-

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-05-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 CNCF 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档