k8s source code: a deep dive into the scheduler flow

By ascehuang · Last updated 2019-12-22 17:06:50

1. Introduction

1.1 What the scheduler does:

  • Watches the API server for pods that have not yet been bound to a node
  • Places each pod on a suitable node according to the predicate (filtering), priority (scoring), and preemption policies
  • Calls the API server to write the scheduling decision back, which persists it to etcd

1.2 Design principles of the scheduler:

  • Fairness: every pod gets its turn to be scheduled, even if it cannot currently be placed because resources are insufficient
  • Sensible resource allocation: nodes are chosen according to several policies, keeping overall resource utilization as high as possible
  • Customizability: multiple scheduling policies are built in, and users can steer the result with affinity, priority, taints and tolerations; beyond that, running a custom scheduler is supported as an extension mechanism (see the sketch below)
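As a quick illustration of the last point, a workload opts in to a custom scheduler simply by setting spec.schedulerName on the pod. Below is a minimal sketch using client-go types; the scheduler name my-custom-scheduler is just a placeholder:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// A pod that asks to be scheduled by a custom scheduler instead of the
	// default kube-scheduler. "my-custom-scheduler" is a placeholder name.
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"},
		Spec: corev1.PodSpec{
			SchedulerName: "my-custom-scheduler",
			Containers: []corev1.Container{
				{Name: "app", Image: "nginx"},
			},
		},
	}
	fmt.Println("scheduled by:", pod.Spec.SchedulerName)
}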

2. Flow overview

[Figure: overall scheduling flow]

3. Scheduling policies at a glance, in order of application

  • Predicates (applied in this order): CheckNodeUnschedulablePred, GeneralPred, HostNamePred, PodFitsHostPortsPred, MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred, PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred, CheckNodeLabelPresencePred, CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred, MaxCSIVolumeCountPred, MaxAzureDiskVolumeCountPred, MaxCinderVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred, EvenPodsSpreadPred, MatchInterPodAffinityPred
  • Priorities: EqualPriority, MostRequestedPriority, RequestedToCapacityRatioPriority, SelectorSpreadPriority, ServiceSpreadingPriority, InterPodAffinityPriority, LeastRequestedPriority, BalancedResourceAllocation, NodePreferAvoidPodsPriority, NodeAffinityPriority, TaintTolerationPriority, ImageLocalityPriority, ResourceLimitsPriority, EvenPodsSpreadPriority

4. Source code analysis

4.1 Entry point: cmd/kube-scheduler/scheduler.go

func main() {
	... ...
	command := app.NewSchedulerCommand()
	... ...
	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}

The entry point calls NewSchedulerCommand. All Kubernetes components build their CLI this way, on top of the common cobra library (see the spf13/cobra project). NewSchedulerCommand, covered in the next section, returns a *cobra.Command, and main then runs Execute on that command (a minimal cobra sketch follows).
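For readers new to cobra, here is a minimal, self-contained sketch of the same pattern, not the kube-scheduler code itself: build a *cobra.Command, register flags on it, and let Execute drive the Run function.

package main

import (
	"fmt"
	"os"

	"github.com/spf13/cobra"
)

// newDemoCommand mirrors the shape of NewSchedulerCommand: construct a
// *cobra.Command, register flags on it, and return it to the caller.
func newDemoCommand() *cobra.Command {
	var configFile string
	cmd := &cobra.Command{
		Use: "demo-scheduler",
		Run: func(cmd *cobra.Command, args []string) {
			fmt.Println("running with config:", configFile)
		},
	}
	cmd.Flags().StringVar(&configFile, "config", "", "path to a config file")
	return cmd
}

func main() {
	// Execute parses the flags and invokes Run, just like the scheduler's main().
	if err := newDemoCommand().Execute(); err != nil {
		os.Exit(1)
	}
}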

4.2 The scheduler service wrapper: cmd/kube-scheduler/app/server.go

func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
	opts, err := options.NewOptions()
	... ...

	cmd := &cobra.Command{
		Use: "kube-scheduler",
		Long: `... ... `,
		Run: func(cmd *cobra.Command, args []string) {
			if err := runCommand(cmd, args, opts, registryOptions...); err != nil {
				... ...
			}
		},
	}
	fs := cmd.Flags()
	namedFlagSets := opts.Flags()
	verflag.AddFlags(namedFlagSets.FlagSet("global"))
	globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())
	for _, f := range namedFlagSets.FlagSets {
		fs.AddFlagSet(f)
	}

	... ...

	return cmd
}

// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, args []string, opts *options.Options, registryOptions ...Option) error {
	... ...
	return Run(ctx, cc, registryOptions...)
}

// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc schedulerserverconfig.CompletedConfig, outOfTreeRegistryOptions ...Option) error {
 ... ...

	// Create the scheduler.
	sched, err := scheduler.New(cc.Client,
		cc.InformerFactory,
		cc.PodInformer,
		cc.Recorder,
		ctx.Done(),
		scheduler.WithName(cc.ComponentConfig.SchedulerName),
		scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
		scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
		scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
		scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
		scheduler.WithBindTimeoutSeconds(cc.ComponentConfig.BindTimeoutSeconds),
		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
		scheduler.WithFrameworkPlugins(cc.ComponentConfig.Plugins),
		scheduler.WithFrameworkPluginConfig(cc.ComponentConfig.PluginConfig),
		scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
		scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
	)
  ... ...

	// Prepare the event broadcaster.
	if cc.Broadcaster != nil && cc.EventClient != nil {
		cc.Broadcaster.StartRecordingToSink(ctx.Done())
	}
	if cc.CoreBroadcaster != nil && cc.CoreEventClient != nil {
		cc.CoreBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.CoreEventClient.Events("")})
	}
  ... ...

	// Start all informers.
	go cc.PodInformer.Informer().Run(ctx.Done())
	cc.InformerFactory.Start(ctx.Done())
   
   ... ...

	// If leader election is enabled, runCommand via LeaderElector until done and exit.
	if cc.LeaderElection != nil {
		cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
			OnStartedLeading: sched.Run,
			OnStoppedLeading: func() {
				klog.Fatalf("leaderelection lost")
			},
		}
    ... ...

		leaderElector.Run(ctx)

		return fmt.Errorf("lost lease")
	}

	// Leader election is disabled, so runCommand inline until done.
	sched.Run(ctx)
	return fmt.Errorf("finished without leader elect")
}

There are two main public functions here: NewSchedulerCommand and Run. The command.Execute call in the entry point runs runCommand, which in turn calls Run.

  • NewSchedulerCommand: a. NewOptions builds a new Options value carrying kube-scheduler's default configuration; b. &cobra.Command{} defines the command; c. the flag block (verflag.AddFlags, globalflag.AddGlobalFlags, and the named flag sets) explicitly registers the flags on the command
  • Run: a. scheduler.New creates the scheduler instance; b. cc.InformerFactory.Start starts all the informers; c. leaderElector.Run performs leader election when it is enabled in the configuration; d. sched.Run is eventually called in either case and starts the real scheduling loop. Note that the scheduler.With* arguments passed to scheduler.New are functional options; a minimal sketch of that pattern follows.
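The sketch below reproduces the functional-options pattern with a stripped-down options struct and made-up option names, purely to show the mechanics; these are not the scheduler's real types.

package main

import "fmt"

// schedulerOptions stands in for the scheduler's internal options struct.
type schedulerOptions struct {
	name              string
	disablePreemption bool
}

// Option mutates the options, exactly like scheduler.WithName and friends.
type Option func(*schedulerOptions)

func WithName(name string) Option {
	return func(o *schedulerOptions) { o.name = name }
}

func WithPreemptionDisabled(disabled bool) Option {
	return func(o *schedulerOptions) { o.disablePreemption = disabled }
}

// New starts from defaults and applies every option the caller passed in.
func New(opts ...Option) schedulerOptions {
	options := schedulerOptions{name: "default-scheduler"}
	for _, opt := range opts {
		opt(&options)
	}
	return options
}

func main() {
	fmt.Printf("%+v\n", New(WithName("my-scheduler"), WithPreemptionDisabled(true)))
}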

4.3 The scheduler's main type: pkg/scheduler/scheduler.go

// New returns a Scheduler
func New(client clientset.Interface,
	  ... ...

	options := defaultSchedulerOptions
    ... ...

	configurator := &Configurator{
		... ...
	}

	var sched *Scheduler
	source := options.schedulerAlgorithmSource
	switch {
	case source.Provider != nil:
		// Create the config from a named algorithm provider.
		sc, err := configurator.CreateFromProvider(*source.Provider)
		... ...
	case source.Policy != nil:
		// Create the config from a user specified policy source.
		... ...
		sc, err := configurator.CreateFromConfig(*policy)
		... ...
		sched = sc

	}
	

	AddAllEventHandlers(sched, options.schedulerName, informerFactory, podInformer)
	return sched, nil
}

New converts the options into a Configurator and then resolves the algorithm source (the predicate and priority algorithms), either from a named provider or from a policy config. The policy path eventually calls CreateFromKeys, which selects the algorithms by the specified keys.

  • AddAllEventHandlers: registers the event callbacks for pods, nodes, services, PVs and so on; the pod queue is maintained here as well
  • Run: keeps calling scheduleOne in a loop, scheduling unbound pods one at a time (a conceptual sketch of that loop follows, with the core of each iteration, scheduleOne, shown right after it)
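Conceptually, the Run loop boils down to the sketch below; the real method delegates the looping to the wait helpers in k8s.io/apimachinery rather than writing it by hand.

package main

import (
	"context"
	"fmt"
	"time"
)

// run keeps calling scheduleOne until the context is cancelled, which is
// the essence of Scheduler.Run described above.
func run(ctx context.Context, scheduleOne func(context.Context)) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
			scheduleOne(ctx)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	run(ctx, func(context.Context) {
		fmt.Println("schedule one pod")
		time.Sleep(2 * time.Millisecond)
	})
}

The core of each iteration is scheduleOne: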
// scheduleOne does the entire scheduling workflow for a single pod.  It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne(ctx context.Context) {
	fwk := sched.Framework

	podInfo := sched.NextPod()
    ... ...
	scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, state, pod)
	if err != nil {
		... ... 
			if sched.DisablePreemption {
				... ... 
			} else {
				... ...
				sched.preempt(schedulingCycleCtx, state, fwk, pod, fitError)
			}
	}
		... ...

	// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
	err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
	... ...
	go func() {
		... ... 

		err := sched.bind(bindingCycleCtx, assumedPod, scheduleResult.SuggestedHost, state)
		... ...
	}()
}
  • sched.NextPod(): pops the next unscheduled pod from the internal queue
  • sched.Algorithm.Schedule: runs the actual scheduling and picks a suitable node; implemented in generic_scheduler.go
  • sched.preempt: if scheduling fails (no node currently fits), decides whether preemption should be attempted; also implemented in generic_scheduler.go. After a successful preemption the victim (preempted) pods are evicted
  • sched.assume: optimistically marks the pod as scheduled by setting its NodeName to the suggested host and writing that into the scheduler cache; the real bind comes later, because binding is relatively slow and is therefore done asynchronously
  • sched.bind: binds the pod to the node asynchronously in a goroutine. bind itself is simple, it calls the API server: b.Client.CoreV1().Pods(binding.Namespace).Bind(binding) (a client-go sketch of this call follows)
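For illustration, here is a hedged client-go sketch of such a bind call. The helper bindPod and its surrounding code are mine; the Bind signature shown matches the client-go generation used around 1.17, while newer client-go versions additionally take a context and options argument.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// bindPod issues the same kind of Binding the scheduler sends once a host
// has been chosen: a Binding object whose Target names the node.
func bindPod(client kubernetes.Interface, pod *corev1.Pod, node string) error {
	binding := &corev1.Binding{
		ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name, UID: pod.UID},
		Target:     corev1.ObjectReference{Kind: "Node", Name: node},
	}
	return client.CoreV1().Pods(binding.Namespace).Bind(binding)
}

func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "demo"}}
	if err := bindPod(client, pod, "node-1"); err != nil {
		fmt.Println("bind failed:", err)
	}
}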

4.4 Predicates, priorities, and preemption: pkg/scheduler/core/generic_scheduler.go

// Schedule tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError error with reasons.
func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
	... ...
	filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(ctx, state, pod)
	... ...
	// When only one node after predicate, just use it.
	if len(filteredNodes) == 1 {
		... ...
		return ScheduleResult{
			SuggestedHost:  filteredNodes[0].Name,
			EvaluatedNodes: 1 + len(failedPredicateMap) + len(filteredNodesStatuses),
			FeasibleNodes:  1,
		}, nil
	}
    ... ...
	priorityList, err := g.prioritizeNodes(ctx, state, pod, metaPrioritiesInterface, filteredNodes)
	... ...
	host, err := g.selectHost(priorityList)
	trace.Step("Prioritizing done")

	return ScheduleResult{
		SuggestedHost:  host,
		EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap) + len(filteredNodesStatuses),
		FeasibleNodes:  len(filteredNodes),
	}, err
}
  • g.findNodesThatFit: runs the predicates and returns the nodes that fit
  • g.prioritizeNodes: if the predicates leave exactly one node it is used directly and Schedule returns immediately; with more than one candidate, the priority functions score every remaining node
  • g.selectHost: picks the node with the highest score; if several nodes share the top score, one of them is chosen at random (a simplified re-implementation follows this list)
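The selection rule can be sketched as follows. This is a simplified re-implementation of the behaviour just described, not the scheduler's exact code, which achieves the same in a single pass over the list.

package main

import (
	"fmt"
	"math/rand"
)

type nodeScore struct {
	Name  string
	Score int64
}

// selectHost returns the highest-scoring node and breaks ties by picking
// one of the winners at random.
func selectHost(list []nodeScore) (string, error) {
	if len(list) == 0 {
		return "", fmt.Errorf("empty priority list")
	}
	best := list[0].Score
	candidates := []string{list[0].Name}
	for _, ns := range list[1:] {
		switch {
		case ns.Score > best:
			best = ns.Score
			candidates = []string{ns.Name}
		case ns.Score == best:
			candidates = append(candidates, ns.Name)
		}
	}
	return candidates[rand.Intn(len(candidates))], nil
}

func main() {
	host, _ := selectHost([]nodeScore{{"node-a", 42}, {"node-b", 42}, {"node-c", 17}})
	fmt.Println("selected:", host)
}

findNodesThatFit itself looks like this: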
// Filters the nodes to find the ones that fit based on the given predicate functions
// Each node is passed through the predicate functions to determine if it is a fit
func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, FailedPredicateMap, framework.NodeToStatusMap, error) {
	var filtered []*v1.Node
	... ...

		checkNode := func(i int) {
			// We check the nodes starting from where we left off in the previous scheduling cycle,
			// this is to make sure all nodes have the same chance of being examined across pods.
			nodeInfo := g.nodeInfoSnapshot.NodeInfoList[(g.nextStartNodeIndex+i)%allNodes]
			fits, failedPredicates, status, err := g.podFitsOnNode(
				ctx,
				state,
				pod,
				meta,
				nodeInfo,
				g.alwaysCheckAllPredicates,
			)
			... ...
			if fits {
				length := atomic.AddInt32(&filteredLen, 1)
				if length > numNodesToFind {
					cancel()
					atomic.AddInt32(&filteredLen, -1)
				} else {
					filtered[length-1] = nodeInfo.Node()
				}
			} 
			... ...
		}

		// Stops searching for more nodes once the configured number of feasible nodes
		// are found.
		workqueue.ParallelizeUntil(ctx, 16, allNodes, checkNode)
		... ...

	if len(filtered) > 0 && len(g.extenders) != 0 {
		for _, extender := range g.extenders {
			... ...
			filteredList, failedMap, err := extender.Filter(pod, filtered, g.nodeInfoSnapshot.NodeInfoMap)
			... ...
		}
	}
	return filtered, failedPredicateMap, filteredNodesStatuses, nil
}
  • workqueue.ParallelizeUntil: the predicate phase runs checkNode in parallel, here with 16 goroutines
  • g.podFitsOnNode: decides whether the pod fits on a given node. Note the numNodesToFind cap: once more feasible nodes than numNodesToFind have been found, the search is cancelled. Unless percentageOfNodesToScore has been set to 1 or higher, the scheduler looks for at least 100 feasible nodes (clusters of up to 100 nodes are searched in full); in larger clusters the number is derived from a percentage of the cluster size (see the sketch after this list). This method is called both during normal predicate checking and during preemption, and it contains the run-twice logic that also takes nominated pods into account
  • extender.Filter: if at least one node fits and extenders are configured, each extender's Filter method is called; if any extender filters the list down to zero nodes, the loop breaks and returns
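The sizing of numNodesToFind can be sketched like this; the logic is reconstructed from the 1.17 sources and simplified, with the constants kept for readability.

package main

import "fmt"

const (
	minFeasibleNodesToFind           = 100
	minFeasibleNodesPercentageToFind = 5
)

// numFeasibleNodesToFind: small clusters are searched in full, larger ones
// only until a percentage of all nodes has passed the predicates, but
// never fewer than 100 nodes.
func numFeasibleNodesToFind(numAllNodes, percentageOfNodesToScore int32) int32 {
	if numAllNodes < minFeasibleNodesToFind || percentageOfNodesToScore >= 100 {
		return numAllNodes
	}
	adaptivePercentage := percentageOfNodesToScore
	if adaptivePercentage <= 0 {
		// Default: start at 50% and shrink as the cluster grows.
		adaptivePercentage = 50 - numAllNodes/125
		if adaptivePercentage < minFeasibleNodesPercentageToFind {
			adaptivePercentage = minFeasibleNodesPercentageToFind
		}
	}
	if n := numAllNodes * adaptivePercentage / 100; n >= minFeasibleNodesToFind {
		return n
	}
	return minFeasibleNodesToFind
}

func main() {
	fmt.Println(numFeasibleNodesToFind(80, 0))   // 80: small cluster, check every node
	fmt.Println(numFeasibleNodesToFind(5000, 0)) // 500: 10% of 5000
}

After filtering, prioritizeNodes scores the remaining candidates: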
func (g *genericScheduler) prioritizeNodes(
	ctx context.Context,
	state *framework.CycleState,
	pod *v1.Pod,
	meta interface{},
	nodes []*v1.Node,
) (framework.NodeScoreList, error) {
	// If no priority configs are provided, then all nodes will have a score of one.
	// This is required to generate the priority list in the required format
	if len(g.prioritizers) == 0 && len(g.extenders) == 0 && !g.framework.HasScorePlugins() {
  ... ... 
	workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
		nodeInfo := g.nodeInfoSnapshot.NodeInfoMap[nodes[index].Name]
		for i := range g.prioritizers {
			var err error
			results[i][index], err = g.prioritizers[i].Map(pod, meta, nodeInfo)
			if err != nil {
				appendError(err)
				results[i][index].Name = nodes[index].Name
			}
		}
	})

	for i := range g.prioritizers {
		if g.prioritizers[i].Reduce == nil {
			continue
		}
		wg.Add(1)
		go func(index int) {
			metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Inc()
			defer func() {
				metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Dec()
				wg.Done()
			}()
			if err := g.prioritizers[index].Reduce(pod, meta, g.nodeInfoSnapshot, results[index]); err != nil {
				appendError(err)
			}
			... ...
		}(i)
	}
	... ...
	for i := range nodes {
		result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
		for j := range g.prioritizers {
			result[i].Score += results[j][i].Score * g.prioritizers[j].Weight
		}

		for j := range scoresMap {
			result[i].Score += scoresMap[j][i].Score
		}
	}

	if len(g.extenders) != 0 && nodes != nil {
		... ...
			go func(extIndex int) {
				... ...
				prioritizedList, weight, err := g.extenders[extIndex].Prioritize(pod, nodes)
			... ...
				for i := range *prioritizedList {
					host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
					... ...
					combinedScores[host] += score * weight
				}
				mu.Unlock()
			}(i)
		}
... ...
	return result, nil
}

The priority phase has roughly the same shape as the predicate phase: scores are computed in parallel with 16 goroutines, each priority function produces a score in the 0-10 range, and the per-function scores are summed into a total per node. The phase follows a Map-Reduce style (each prioritizer has a Map step and optionally a Reduce step), and extenders can contribute scores as well, each with its own weight. In other words, a node's final score is the weighted sum of the individual priority scores, with weighted extender scores folded in the same way (a toy example follows).
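The combination step amounts to the weighted sum sketched below; the priority names and numbers in the example are made up for illustration.

package main

import "fmt"

// weightedTotal sums score[i] * weight[i], which is how the per-priority
// scores are combined into a node's final score.
func weightedTotal(scores, weights []int64) int64 {
	var total int64
	for i := range scores {
		total += scores[i] * weights[i]
	}
	return total
}

func main() {
	// e.g. LeastRequestedPriority=8 (weight 1), BalancedResourceAllocation=6 (weight 1),
	// ImageLocalityPriority=3 (weight 2)  ->  8 + 6 + 6 = 20
	fmt.Println(weightedTotal([]int64{8, 6, 3}, []int64{1, 1, 2}))
}

When no node passes the predicates, Schedule returns a FitError and scheduleOne falls back to preemption, implemented in Preempt: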

func (g *genericScheduler) Preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
	// Scheduler may return various types of errors. Consider preemption only if
	// the error is of type FitError.
	fitError, ok := scheduleErr.(*FitError)
	if !ok || fitError == nil {
		return nil, nil, nil, nil
	}
	if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfoMap, g.enableNonPreempting) {
		klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
		return nil, nil, nil, nil
	}
	if len(g.nodeInfoSnapshot.NodeInfoMap) == 0 {
		return nil, nil, nil, ErrNoNodesAvailable
	}
	potentialNodes := nodesWherePreemptionMightHelp(g.nodeInfoSnapshot.NodeInfoMap, fitError)
	if len(potentialNodes) == 0 {
		klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
		// In this case, we should clean-up any existing nominated node name of the pod.
		return nil, nil, []*v1.Pod{pod}, nil
	}
	var (
		pdbs []*policy.PodDisruptionBudget
		err  error
	)
	if g.pdbLister != nil {
		pdbs, err = g.pdbLister.List(labels.Everything())
		if err != nil {
			return nil, nil, nil, err
		}
	}
	nodeToVictims, err := g.selectNodesForPreemption(ctx, state, pod, potentialNodes, pdbs)
	if err != nil {
		return nil, nil, nil, err
	}

	// We will only check nodeToVictims with extenders that support preemption.
	// Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
	// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
	nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
	if err != nil {
		return nil, nil, nil, err
	}

	candidateNode := pickOneNodeForPreemption(nodeToVictims)
	if candidateNode == nil {
		return nil, nil, nil, nil
	}

	// Lower priority pods nominated to run on this node, may no longer fit on
	// this node. So, we should remove their nomination. Removing their
	// nomination updates these pods and moves them to the active queue. It
	// lets scheduler find another place for them.
	nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
	if nodeInfo, ok := g.nodeInfoSnapshot.NodeInfoMap[candidateNode.Name]; ok {
		return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, nil
	}

	return nil, nil, nil, fmt.Errorf(
		"preemption failed: the target node %s has been deleted from scheduler cache",
		candidateNode.Name)
}
  • podEligibleToPreemptOthers: decides whether this pod is allowed to preempt others: whether preemption is enabled, whether it has already preempted (it carries a nominated node), and how the priorities compare
  • nodesWherePreemptionMightHelp: narrows the nodes that failed the predicates down to those where evicting pods could actually help
  • g.selectNodesForPreemption: computes the candidate nodes and their victims; selectVictimsOnNode runs with 16 goroutines in parallel and respects PodDisruptionBudget (PDB) constraints
  • pickOneNodeForPreemption: picks a single node from the candidates with a fixed tie-break order: fewest PDB violations, then the node whose highest-priority victim has the lowest priority, then the smallest sum of victim priorities, then the fewest victims, and finally the node whose highest-priority victim started most recently (a compressed sketch of this cascade follows the list)
  • g.getLowerPriorityNominatedPods: finds lower-priority pods nominated to this node that would no longer fit, removes their nomination, and moves them back to the active queue so the scheduler can find them another place
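The cascade in pickOneNodeForPreemption can be compressed into a sort with a composite comparison. The sketch below uses illustrative types and field names of my own, not the scheduler's structures.

package main

import (
	"fmt"
	"sort"
	"time"
)

// candidate summarizes one potential preemption target, one field per
// tie-break criterion listed above.
type candidate struct {
	node               string
	pdbViolations      int       // victims whose eviction would violate a PDB
	highestVictimPrio  int32     // priority of the highest-priority victim
	sumVictimPrio      int64     // sum of all victim priorities
	numVictims         int       // number of victims
	highestVictimStart time.Time // start time of the highest-priority victim
}

// pickOne applies the cascade: fewest PDB violations, lowest "highest
// victim priority", smallest priority sum, fewest victims, and finally the
// most recently started highest-priority victim.
func pickOne(cs []candidate) string {
	sort.Slice(cs, func(i, j int) bool {
		a, b := cs[i], cs[j]
		if a.pdbViolations != b.pdbViolations {
			return a.pdbViolations < b.pdbViolations
		}
		if a.highestVictimPrio != b.highestVictimPrio {
			return a.highestVictimPrio < b.highestVictimPrio
		}
		if a.sumVictimPrio != b.sumVictimPrio {
			return a.sumVictimPrio < b.sumVictimPrio
		}
		if a.numVictims != b.numVictims {
			return a.numVictims < b.numVictims
		}
		return a.highestVictimStart.After(b.highestVictimStart)
	})
	return cs[0].node
}

func main() {
	now := time.Now()
	fmt.Println(pickOne([]candidate{
		{node: "node-a", pdbViolations: 1, numVictims: 2, highestVictimStart: now},
		{node: "node-b", pdbViolations: 0, numVictims: 3, highestVictimStart: now.Add(-time.Hour)},
	})) // node-b wins: it causes no PDB violations
}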

Closing notes

The scheduler code base is not large and the overall flow is fairly easy to follow, but it contains many details that this article does not cover. It is also worth noting that 1.17.0-rc.1 already carries a lot of scheduling framework code: custom plugins are supported and invoked at the corresponding points of the flow, which makes the whole framework considerably more flexible.

Original content statement: this article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission. For infringement concerns, contact cloudcommunity@tencent.com to request removal.
