前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kubernetes 学习(九)Kubernetes 源码阅读之正式篇------核心组件之 Scheduler

Kubernetes 学习(九)Kubernetes 源码阅读之正式篇------核心组件之 Scheduler

作者头像
西凉风雷
发布2022-11-23 19:35:54
2390
发布2022-11-23 19:35:54
举报

0. 前言

  • 继续上一篇博客阅读 Kubernetes 源码,参照《k8s 源码阅读》首先学习 Kubernetes 的一些核心组件,首先是 kube-scheduler
  • 本文严重参考原文:《k8s 源码阅读》之 2.2 章节:scheduler,加入部分自己阅读的体会作为自己的阅读笔记
  • 感谢《k8s 源码阅读》的作者们辛苦编写教材,在此郑重表示感谢,望大家多多支持!~

1. 整体设计

1.1 概述

  • 官网描述:
    • The Kubernetes scheduler runs as a process alongside the other master components such as the API server.
    • Its interface to the API server is to watch for Pods with an empty PodSpec.NodeName, and for each Pod,
    • it posts a binding indicating where the Pod should be scheduled.
  • Scheduler 是相对独立的一个组件,主动访问 API server,寻找等待调度的 Pod(PodSpec.NodeName 为空)
  • 然后通过一系列调度算法寻找哪个 Node 适合跑这个 Pod
  • 然后将这个 Pod 和 Node 的绑定关系发给 API server,即整个调度流程

1.2 源码层次

  • cmd/kube-scheduler/scheduler.go:main() 函数入口位置,在 Scheduler 过程开始前的一系列初始化工作
  • pkg/scheduler/scheduler.go:调度框架整体逻辑,抽象调用各个算法的 Interface
  • pkg/scheduler/core/generic_scheduler.go:计算哪些 Node适合跑哪些 Pod 的具体算法

1.3 调度流程

  • 通过一系列的 Predicates(预选) 过滤掉不能运行 Pod 的 Node
    • 比如一个 Pod 需要 500M 的内存,有些节点剩余内存只有 100M 了,就会被剔除
  • 通过一系列的 Priorities(优选)给剩下的 Node 排一个分数,寻找能够运行 Pod 的 Node 中最合适的一个
  • 得分最高的一个 Node 胜出,获得了运行 Pod 的资格
  • 若是上述预选过程中没有找到任何一个合适的 Node,则进入抢占模式,移除部分低优先级的 Pod,再选择 Node

1.4 Predicates 和 Priorities 策略

  • Predicates 是用于过滤不合适 Node的策略
  • Priorities 是用于计算 Node 分数的策略(作用在通过 Predicates 过滤的 Node上)
  • Kubernetes 默认内建了一些 Predicates 和 Priorities 策略,代码分别在:
    • pkg/scheduler/algorithm/predicates/predicates.go
    • pkg/scheduler/algorithm/priorities/

1.5 调度策略的修改

  • 默认调度策略是通过 defaultPredicates() 和 defaultPriorities() 函数定义的
  • 代码在 pkg/scheduler/algorithmprovider/defaults/defaults.go
  • 可以通过命令行 flag --policy-config-file 来覆盖默认行为
    • 可以修改 pkg/scheduler/algorithm/predicates/predicates.go/pkg/scheduler/algorithm/priorities/,然后注册到 defaultPredicates() 和 defaultPriorities() 来实现
    • 或者通过配置文件:
代码语言:javascript
复制
{
"kind" : "Policy",
"apiVersion" : "v1",
"predicates" : [
    {"name" : "PodFitsHostPorts"},
    {"name" : "PodFitsResources"},
    {"name" : "NoDiskConflict"},
    {"name" : "NoVolumeZoneConflict"},
    {"name" : "MatchNodeSelector"},
    {"name" : "HostName"}
    ],
"priorities" : [
    {"name" : "LeastRequestedPriority", "weight" : 1},
    {"name" : "BalancedResourceAllocation", "weight" : 1},
    {"name" : "ServiceSpreadingPriority", "weight" : 1},
    {"name" : "EqualPriority", "weight" : 1}
    ],
"hardPodAffinitySymmetricWeight" : 10,
"alwaysCheckAllPredicates" : false
}

2. 启动前逻辑

  • Scheduler 可以分为三层,第一层是调度器启动前的逻辑,包括:命令行参数解析、参数校验、调度器初始化等一系列逻辑

2.1 Cobra

2.1.1 简介

  • Cobra 既是一个创建强大的现代化命令行程序的库,又是一个用于生成应用和命令行文件的程序。有很多流行的 Golang 项目用了 Cobra(Kubernetes 和 Docker)

2.1.2 具体使用

  • Go 1.11 版本以上先暂时关闭 go modules,安装 Cobra:go get -u github.com/spf13/cobra/cobra
    • 若是网卡,可能导致安装不成功,可以先下载报错的依赖包,再重新 go get 安装
    • 安装成功后会看到 $GOPATH/
  • 进入 $GOPATH/src 目录,初始化项目:cobra init myapp --pkg-name myapp
代码语言:javascript
复制
$ cobra init myapp --pkg-name myapp
Your Cobra applicaton is ready at
/home/ao/go/src/myapp
$ ls myapp 
cmd  LICENSE  main.go
$ pwd
/home/ao/go/src
  • 本地可以看到一个 main.go 和一个 cmd 目录
  • 查看 cmd/root.go:
代码语言:javascript
复制
package cmd

import (
  "fmt"
  "os"
  "github.com/spf13/cobra"

  homedir "github.com/mitchellh/go-homedir"
  "github.com/spf13/viper"

)


var cfgFile string


// rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{
  Use:   "myapp",
  Short: "A brief description of your application",
  Long: `A longer description that spans multiple lines and likely contains
examples and usage of using your application. For example:

Cobra is a CLI library for Go that empowers applications.
This application is a tool to generate the needed files
to quickly create a Cobra application.`,
  // Uncomment the following line if your bare application
  // has an action associated with it:
  //    Run: func(cmd *cobra.Command, args []string) { },
}
  • 查看 main.go:
代码语言:javascript
复制
package main

import "myapp/cmd"

func main() {
  cmd.Execute()
}
  • main.go 里 import了 myapp/cmd(也就是 root.go 文件)
    • 在 Execute 里面调用了 rootCmd.Execute() 方法
    • rootCmd 是*cobra.Command 类型
  • 添加新的命令:cobra add version
代码语言:javascript
复制
$ cobra add version                
version created at /home/ao/go/src/myapp% 
  • 查看 cmd/version.go:
    • init() 函数里面调用 rootCmd.AddCommand(versionCmd),子命令就是 version
代码语言:javascript
复制
package cmd

import (
    "fmt"

    "github.com/spf13/cobra"
)

// versionCmd represents the version command
var versionCmd = &cobra.Command{
    Use:   "version",
    Short: "A brief description of your command",
    Long: `A longer description that spans multiple lines and likely contains examples
and usage of using your command. For example:

Cobra is a CLI library for Go that empowers applications.
This application is a tool to generate the needed files
to quickly create a Cobra application.`,
    Run: func(cmd *cobra.Command, args []string) {
        fmt.Println("version called")
    },
}

func init() {
    rootCmd.AddCommand(versionCmd)
    ......
}
  • 添加多级子命令:
代码语言:javascript
复制
$ cobra add server 
server created at /home/ao/go/src/myapp%                                                                                         
$ cobra add create -p serverCmd
create created at /home/ao/go/src/myapp%    
  • 查看 cmd/create.go 文件:
    • init() 函数调用 serverCmd.AddCommand(createCmd),表示 create 为子命令
代码语言:javascript
复制
package cmd

import (
    "fmt"

    "github.com/spf13/cobra"
)

// createCmd represents the create command
var createCmd = &cobra.Command{
    Use:   "create",
    Short: "A brief description of your command",
    Long: `A longer description that spans multiple lines and likely contains examples
and usage of using your command. For example:

Cobra is a CLI library for Go that empowers applications.
This application is a tool to generate the needed files
to quickly create a Cobra application.`,
    Run: func(cmd *cobra.Command, args []string) {
        fmt.Println("create called")
    },
}

func init() {
    serverCmd.AddCommand(createCmd)
    ......
}

2.2 Scheduler 的 main 函数

  • 下面正式进入 Kubernetes 的 Scheduler 源码阅读
  • 查看 cmd/kube-scheduler/scheduler.go,删除非主干代码:
代码语言:javascript
复制
func main() {
    command := app.NewSchedulerCommand()
    if err := command.Execute(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}
  • 查看定义 NewSchedulerCommand 的 cmd/kube-scheduler/app/server.go,删除非主干代码:
代码语言:javascript
复制
/ NewSchedulerCommand creates a *cobra.Command object with default parameters
func NewSchedulerCommand() *cobra.Command {
    cmd := &cobra.Command{
        Use: "kube-scheduler",
        Long: `The Kubernetes scheduler is a policy-rich, topology-aware,
workload-specific function that significantly impacts availability, performance,
and capacity. The scheduler needs to take into account individual and collective
resource requirements, quality of service requirements, hardware/software/policy
constraints, affinity and anti-affinity specifications, data locality, inter-workload
interference, deadlines, and so on. Workload-specific requirements will be exposed
through the API as necessary.`,
        Run: func(cmd *cobra.Command, args []string) {
            if err := runCommand(cmd, args, opts); err != nil {
                fmt.Fprintf(os.Stderr, "%v\n", err)
                os.Exit(1)
            }
        },
    }
    return cmd
}
  • Schduler 启动时调用了 runCommand(cmd, args, opts),查看 cmd/kube-scheduler/app/server.go 关于 runCommand 定义,删除非主干代码:
代码语言:javascript
复制
// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error {
    c, err := opts.Config()
    stopCh := make(chan struct{})
    // Get the completed config
    cc := c.Complete()
    return Run(cc, stopCh)
}
  • 处理配置问题后调用了一个 Run() 函数,Run() 的作用是基于给定的配置启动 Scheduler,它只会在出错时或者 channel stopCh 被关闭时才退出,查看 cmd/kube-scheduler/app/server.go 关于 Run 定义,删除非主干代码:
代码语言:javascript
复制
// Run executes the scheduler based on the given configuration. It only return on error or when stopCh is closed.
func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
    // Create the scheduler.
    sched, err := scheduler.New(cc.Client,
        cc.InformerFactory.Core().V1().Nodes(),
        stopCh,
        scheduler.WithName(cc.ComponentConfig.SchedulerName))

    // Prepare a reusable runCommand function.
    run := func(ctx context.Context) {
        sched.Run()
        <-ctx.Done()
    }

    ctx, cancel := context.WithCancel(context.TODO()) 
    defer cancel()

    go func() {
        select {
        case <-stopCh:
            cancel()
        case <-ctx.Done():
        }
    }()

    // Leader election is disabled, so runCommand inline until done.
    run(ctx)
    return fmt.Errorf("finished without leader elect")
}
  • 最终是要跑 sched.Run() 这个方法来启动 Scheduler,sched.Run() 方法就是 Scheduler 框架真正运行的逻辑
  • 上述有一个 sched 变量,linux 里经常会看到一些软件叫 ***d,d 也就是 daemon,守护进程的意思,也就是一直跑在后台的一个程序
    • 这里的 sched 也就是指 “Scheduler Daemon“
    • sched 的其实是 *Scheduler 类型,定义在 pkg/scheduler/scheduler.go
      • Scheduler 监控新创建的未被调度的 Pods,然后尝试寻找合适的 Node,回写一个绑定关系到 API Server
      • 这里也可以体会到 Daemon 的感觉,我们平时搭建的 k8s 集群中运行着一个 Daemon 进程叫做 kube-scheduler,在程序里面也就对应这样一个对象:Scheduler
代码语言:javascript
复制
// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
    config *factory.Config
}
  • Scheduler 结构体中的 Config 属性对象定义在 pkg/scheduler/factory/factory.go,主要属性如下:
代码语言:javascript
复制
// Config is an implementation of the Scheduler's configured input data.
type Config struct {
    // It is expected that changes made via SchedulerCache will be observed
    // by NodeLister and Algorithm.
    SchedulerCache schedulerinternalcache.Cache
    // Ecache is used for optimistically invalid affected cache items after
    // successfully binding a pod
    Ecache     *equivalence.Cache
    NodeLister algorithm.NodeLister
    Algorithm  algorithm.ScheduleAlgorithm
    GetBinder  func(pod *v1.Pod) Binder
    // PodConditionUpdater is used only in case of scheduling errors. If we succeed
    // with scheduling, PodScheduled condition will be updated in apiserver in /bind
    // handler so that binding and setting PodCondition it is atomic.
    PodConditionUpdater PodConditionUpdater
    // PodPreemptor is used to evict pods and update pod annotations.
    PodPreemptor PodPreemptor

    // NextPod should be a function that blocks until the next pod
    // is available. We don't use a channel for this, because scheduling
    // a pod may take some amount of time and we don't want pods to get
    // stale while they sit in a channel.
    NextPod func() *v1.Pod

    // SchedulingQueue holds pods to be scheduled
    SchedulingQueue internalqueue.SchedulingQueue
}

3. 整体调度框架

3.1 启动

  • 调度的第二层:负责 Scheduler 除了具体 Node 过滤算法外的工作逻辑
  • Scheduler 对象绑定了一个 Run() 方法,定义如下:
代码语言:javascript
复制
// Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately.
func (sched *Scheduler) Run() {
    if !sched.config.WaitForCacheSync() {
        return
    }
    go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}
  • Run 函数在 cmd/kube-scheduler/app/server.go 调用:
代码语言:javascript
复制
    // Prepare a reusable runCommand function.
    run := func(ctx context.Context) {
        sched.Run()
        <-ctx.Done()
    }
  • 调用了 sched.Run() 之后就等待 ctx.Done()
  • wait.Until 实现:每隔 n 时间调用 f 一次,除非 channel c 被关闭
    • 这里的 n 就是 0,也就是一直调用,前一次调用返回下一次调用就开始了
    • f 指 sched.scheduleOne,c 指 sched.config.StopEverything

3.2 一个 Pod 的基本调度流程

  • scheduleOne 实现 1 个 Pod 的完整调度工作流
  • 这个过程是顺序执行的,也就是非并发的
    • 即前一个 Pod 的 scheduleOne 一完成,下一个 Pod 的 ScheduleOne 立马接着执行
  • scheduleOne 的主要逻辑:
代码语言:javascript
复制
func (sched *Scheduler) scheduleOne() {
    pod := sched.config.NextPod()
    suggestedHost, err := sched.schedule(pod)
    if err != nil {
        if fitError, ok := err.(*core.FitError); ok {
            preemptionStartTime := time.Now()
            sched.preempt(pod, fitError)
        }
        return
    }
    assumedPod := pod.DeepCopy()
    allBound, err := sched.assumeVolumes(assumedPod, suggestedHost)
    err = sched.assume(assumedPod, suggestedHost)
    go func() {
        err := sched.bind(assumedPod, &v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
            Target: v1.ObjectReference{
                Kind: "Node",
                Name: suggestedHost,
            },
        })
    }()
}
  • 参考流程如下:

3.3 Schedule 算法计算合适 Node

  • 主流程核心步骤是 suggestedHost, err := sched.schedule(pod)
  • 这里完成了非抢占模式下 Node 的计算,包括预选过程、优选过程等
  • sched.schedule(pod) 方法定义见 pkg/scheduler/scheduler.go
代码语言:javascript
复制
// schedule implements the scheduling algorithm and returns the suggested host.
func (sched *Scheduler) schedule(pod *v1.Pod) (string, error) {
    host, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
    if err != nil {
        pod = pod.DeepCopy()
        sched.config.Error(pod, err)
        sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "%v", err)
        sched.config.PodConditionUpdater.Update(pod, &v1.PodCondition{
            Type:          v1.PodScheduled,
            Status:        v1.ConditionFalse,
            LastProbeTime: metav1.Now(),
            Reason:        v1.PodReasonUnschedulable,
            Message:       err.Error(),
        })
        return "", err
    }
    return host, err
}
  • 里面调用了 sched.config.Algorithm.Schedule() 方法(本身为接口),定义见 pkg/scheduler/algorithm/scheduler_interface.go
代码语言:javascript
复制
type ScheduleAlgorithm interface {
    Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
    Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
    Predicates() map[string]FitPredicate
    Prioritizers() []PriorityConfig
}
  • 这个接口有 4 个方法,实现 ScheduleAlgorithm 接口的对象实现如何调度 Pods 到 Nodes 上
  • 默认的实现是 pkg/scheduler/core/generic_scheduler.gogenericScheduler 对象
    • Schedule():给定 Pod 和 Nodes,计算出一个适合跑 Pod 的 Node 并返回
    • Preempt():抢占模式
    • Predicates():预选过程
    • Prioritizers():优选过程

4. 一般调度过程

6. 参考文献

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-06-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 0. 前言
  • 1. 整体设计
    • 1.1 概述
      • 1.2 源码层次
        • 1.3 调度流程
          • 1.4 Predicates 和 Priorities 策略
            • 1.5 调度策略的修改
            • 2. 启动前逻辑
              • 2.1 Cobra
                • 2.1.1 简介
                • 2.1.2 具体使用
              • 2.2 Scheduler 的 main 函数
              • 3. 整体调度框架
                • 3.1 启动
                  • 3.2 一个 Pod 的基本调度流程
                    • 3.3 Schedule 算法计算合适 Node
                    • 4. 一般调度过程
                    • 6. 参考文献
                    相关产品与服务
                    命令行工具
                    腾讯云命令行工具 TCCLI 是管理腾讯云资源的统一工具。使用腾讯云命令行工具,您可以快速调用腾讯云 API 来管理您的腾讯云资源。此外,您还可以基于腾讯云的命令行工具来做自动化和脚本处理,以更多样的方式进行组合和重用。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档