在 Kubernetes 中,调度 (scheduling) 指的是确保 Pod 匹配到合适的节点, 以便 kubelet 能够运行它们。 调度的工作由调度器和控制器协调完成。
调度器通过 Kubernetes 的监测(Watch)机制来发现集群中新创建且尚未被调度到节点上的 Pod。 调度器会将所发现的每一个未调度的 Pod 调度到一个合适的节点上来运行。 调度器会依据下文的调度原则来做出调度选择。控制器则会将调度写入 Kubernetes 的API Server中。
kube-scheduler 是 Kubernetes 集群的默认调度器,并且是集群 控制面 的一部分。对每一个新创建的 Pod 或者是未被调度的 Pod,kube-scheduler 会选择一个最优的节点去运行这个 Pod。
在一个集群中,满足一个 Pod 调度请求的所有节点称之为 可调度节点。 如果没有任何一个节点能满足 Pod 的资源请求, 那么这个 Pod 将一直停留在未调度状态直到调度器能够找到合适的 Node。
调度器先在集群中找到一个 Pod 的所有可调度节点,然后根据一系列函数对这些可调度节点打分, 选出其中得分最高的节点来运行 Pod。之后,调度器将这个调度决定通知给 kube-apiserver,这个过程叫做 绑定(bind)。
在做调度决定时需要考虑的因素包括:单独和整体的资源请求、硬件/软件/策略限制、 亲和以及反亲和要求、数据局部性、负载间的干扰等等。
kube-scheduler 给一个 Pod 做调度选择时包含两个步骤:
Pod 内的每一个容器对资源都有不同的需求, 而且 Pod 本身也有不同的需求。因此,Pod 在被调度到节点上之前, 根据这些特定的调度需求,需要对集群中的节点进行一次过滤。
过滤阶段会将所有满足 Pod 调度需求的节点选出来。例如, PodFitsResources 过滤函数会检查候选节点的可用资源能否满足 Pod 的资源请求。 在过滤之后,得出一个节点列表,里面包含了所有可调度节点;通常情况下, 这个节点列表包含不止一个节点。如果这个列表是空的,代表这个 Pod 不可调度。
在打分阶段,调度器会为 Pod 从所有可调度节点中选取一个最合适的节点。 根据当前启用的打分规则,调度器会给每一个可调度节点进行打分。最后, kube-scheduler 会将 Pod 调度到得分最高的节点上。 如果存在多个得分最高的节点, kube-scheduler 会从中随机选取一个。
你可以通过修改配置文件 (KubeSchedulerConfiguration) 中的调度策略 (Scheduling Policies) 和调度配置 (Scheduling Profiles) ,可以定义自己的配置调度器的过滤和打分行为。调度策略(Scheduling Policies) 允许你配置过滤所用的 断言 (Predicates) 和打分所用的 优先级 (Priorities);调度配置(Scheduling Profiles)允许你配置实现不同调度阶段的插件,包括:QueueSort、Filter、Score、Bind、Reserve、Permit 等等。
Kubernetes 调度器中大多数的调度功能,通过调度框架 (framework) 这一插件架构中一个一个具体的调度插件实现。它通过向现有的调度器添加了一组新的“插件” API,编译过程中插件编与调度器打包。
调度框架 (framework) 定义了一些扩展点。调度器插件注册后在一个或多个扩展点处被调用。 这些插件中的一些可以改变调度决策,而另一些仅用于提供信息。
每次调度一个 Pod 的尝试都分为两个阶段,调度周期和绑定周期。
调度周期为 Pod 选择一个节点,绑定周期将该决策应用于集群。 调度周期和绑定周期一起被称为“调度上下文”。
调度周期是串行运行的,而绑定周期可能是同时运行的。
如果确定 Pod 不可调度或者存在内部错误,则可以终止调度周期或绑定周期。 Pod 将返回队列并重试。
下图显示了一个 Pod 的调度上下文以及调度框架公开的扩展点。 在此图片中,“过滤器”等同于“断言”,“评分”相当于“优先级函数”。
一个插件可以在多个扩展点处注册,以执行更复杂或有状态的任务。
例如,假设一个 BlinkingLightScorer 插件基于具有的闪烁指示灯数量来对节点进行排名。
func ScoreNode(_ *v1.pod, n *v1.Node) (int, error) {
return getBlinkingLightCount(n)
}
然而,最大的闪烁灯个数值可能比 NodeScoreMax 小。要解决这个问题, BlinkingLightScorer 插件还应该注册该扩展点。
func NormalizeScores(scores map[string]int) {
highest := 0
for _, score := range scores {
highest = max(highest, score)
}
for node, score := range scores {
scores[node] = score*NodeScoreMax/highest
}
}
如果任何 NormalizeScore 插件返回错误,则调度阶段将终止。
说明: 希望执行“预保留”工作的插件应该使用 NormalizeScore 扩展点。
这个是调度周期的最后一步。 一旦 Pod 处于保留状态,它将在绑定周期结束时触发 Unreserve 插件 (失败时)或 PostBind 插件(成功时)。
如果任何 PreBind 插件返回错误,则 Pod 将被 拒绝 并且 退回到调度队列中。
使用此扩展点的插件通常也使用 Reserve。
插件 API 分为两个步骤。首先,插件必须完成注册并配置,然后才能使用扩展点接口。 扩展点接口具有以下形式。
type Plugin interface {
Name() string
}
type QueueSortPlugin interface {
Plugin
Less(*v1.pod, *v1.pod) bool
}
type PreFilterPlugin interface {
Plugin
PreFilter(context.Context, *framework.CycleState, *v1.pod) error
}
// ...
下面分析一下 kube-scheduler 中自带的 nodeports 插件的代码,该插件负责筛选 Pod 所需端口还未被占用的节点。这里只贴一下 nodeports.go 文件中的代码, PreFilter 和 Filter 插件在kube-scheduler被调用的代码有兴趣的小伙伴可以自己去扒一下。
// kubernetes/pkg/scheduler/framework/plugins/nodeports/nodeports.go
package nodeports
import (
"context"
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
)
type NodePorts struct{}
// 实现了NodePorts结构实现了 framework.PreFilterPlugin、framework.FilterPlugin、framework.EnqueueExtensions三个接口
var _ framework.PreFilterPlugin = &NodePorts{}
var _ framework.FilterPlugin = &NodePorts{}
var _ framework.EnqueueExtensions = &NodePorts{}
const (
Name = names.NodePorts
// 将插件名放入preFilterStateKey 防止与其他插件命名冲突
preFilterStateKey = "PreFilter" + Name
// 错误提示
ErrReason = "node(s) didn't have free ports for the requested pod ports"
)
type preFilterState []*v1.ContainerPort
// 复制 prefilter 的状态.
func (s preFilterState) Clone() framework.StateData {
// The state is not impacted by adding/removing existing pods, hence we don't need to make a deep copy.
return s
}
// Name函数返回插件名
func (pl *NodePorts) Name() string {
return Name
}
// 查询并返回所有 Pods 中用到的端口
func getContainerPorts(pods ...*v1.Pod) []*v1.ContainerPort {
ports := []*v1.ContainerPort{}
for _, pod := range pods {
for j := range pod.Spec.Containers {
container := &pod.Spec.Containers[j]
for k := range container.Ports {
ports = append(ports, &container.Ports[k])
}
}
}
return ports
}
// PreFilter 扩展点
func (pl *NodePorts) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
s := getContainerPorts(pod)
cycleState.Write(preFilterStateKey, preFilterState(s))
return nil, nil
}
// 未定义 PreFilterExtensions 扩展点
func (pl *NodePorts) PreFilterExtensions() framework.PreFilterExtensions {
return nil
}
func getPreFilterState(cycleState *framework.CycleState) (preFilterState, error) {
c, err := cycleState.Read(preFilterStateKey)
if err != nil {
// 如果 cycleState 中 preFilterState 不存在, 很有可能是因为 PreFilter 还未被调用
return nil, fmt.Errorf("reading %q from cycleState: %w", preFilterStateKey, err)
}
s, ok := c.(preFilterState)
if !ok {
return nil, fmt.Errorf("%+v convert to nodeports.preFilterState error", c)
}
return s, nil
}
// 反馈 一个 Pod 调度失败可能的错误原因
func (pl *NodePorts) EventsToRegister() []framework.ClusterEvent {
return []framework.ClusterEvent{
// Due to immutable fields `spec.containers[*].ports`, pod update events are ignored.
{Resource: framework.Pod, ActionType: framework.Delete},
{Resource: framework.Node, ActionType: framework.Add | framework.Update},
}
}
// Filter 扩展点
func (pl *NodePorts) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
wantPorts, err := getPreFilterState(cycleState)
if err != nil {
return framework.AsStatus(err)
}
fits := fitsPorts(wantPorts, nodeInfo)
if !fits {
return framework.NewStatus(framework.Unschedulable, ErrReason)
}
return nil
}
// 检查 Pod 是否能被调度到节点上(端口是否被占用)
func Fits(pod *v1.Pod, nodeInfo *framework.NodeInfo) bool {
return fitsPorts(getContainerPorts(pod), nodeInfo)
}
func fitsPorts(wantPorts []*v1.ContainerPort, nodeInfo *framework.NodeInfo) bool {
// existingPorts 和 wantPorts 存在冲突,则节点不适合被调度
existingPorts := nodeInfo.UsedPorts
for _, cp := range wantPorts {
if existingPorts.CheckConflict(cp.HostIP, string(cp.Protocol), cp.HostPort) {
return false
}
}
return true
}
// 插件初始化
func New(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
return &NodePorts{}, nil
}
你可以在调度器配置中启用或禁用插件。 如果你在使用 Kubernetes v1.18 或更高版本,大部分调度插件都在使用中且默认启用。
kube-scheduler的主流程如下图, kube-scheduler 初始化一个 informer 队列,存放未调度的 pod ;初始化一个 informer 缓存队列,存放拥有调度中间状态的pod、node等对象。实际执行调度任务的是sched.scheduleOne方法,它每次从未调度队列中娶取出一个 Pod ,经过预选与优选算法,最终选出一个最优 node ,上述步骤都成功则更新缓存队列并异步执行 bind 操作,也就是更新 pod 的 nodeName 字段,失败则进入抢占逻辑,至此一个 pod 的调度工作完成。
看一下 kube-scheduler 入口函数的代码:
// kubernetes/cmd/kube-scheduler/app/server.go
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
klog.InfoS("Starting Kubernetes Scheduler", "version", version.Get())
klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
if cz, err := configz.New("componentconfig"); err == nil {
cz.Set(cc.ComponentConfig)
} else {
return fmt.Errorf("unable to register configz: %s", err)
}
// 启动事件处理流程
cc.EventBroadcaster.StartRecordingToSink(ctx.Done())
defer cc.EventBroadcaster.Shutdown()
// 启动健康检查
var checks []healthz.HealthChecker
if cc.ComponentConfig.LeaderElection.LeaderElect {
checks = append(checks, cc.LeaderElection.WatchDog)
}
waitingForLeader := make(chan struct{})
isLeader := func() bool {
select {
case _, ok := <-waitingForLeader:
// if channel is closed, we are leading
return !ok
default:
// channel is open, we are waiting for a leader
return false
}
}
// 启动健康检查服务
if cc.SecureServing != nil {
handler := buildHandlerChain(newHealthzAndMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
// TODO: handle stoppedCh and listenerStoppedCh returned by c.SecureServing.Serve
if _, _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
// fail early for secure handlers, removing the old error loop from above
return fmt.Errorf("failed to start secure server: %v", err)
}
}
// 启动所有Informer
cc.InformerFactory.Start(ctx.Done())
if cc.DynInformerFactory != nil {
cc.DynInformerFactory.Start(ctx.Done())
}
// 等待调度完成后,更新缓存队列
cc.InformerFactory.WaitForCacheSync(ctx.Done())
// DynInformerFactory can be nil in tests.
if cc.DynInformerFactory != nil {
cc.DynInformerFactory.WaitForCacheSync(ctx.Done())
}
// 如果开启了选主选,则进入选主流程。选主结束后开始 Pod 调度
if cc.LeaderElection != nil {
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
close(waitingForLeader)
sched.Run(ctx)
},
OnStoppedLeading: func() {
select {
case <-ctx.Done():
klog.InfoS("Requested to terminate, exiting")
os.Exit(0)
default:
klog.ErrorS(nil, "Leaderelection lost")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
},
}
leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
if err != nil {
return fmt.Errorf("couldn't create leader elector: %v", err)
}
leaderElector.Run(ctx)
return fmt.Errorf("lost lease")
}
// 没有开启选主,则直接进入 Pod 调度流程。
close(waitingForLeader)
sched.Run(ctx)
return fmt.Errorf("finished without leader elect")
}
调度器会先执行schedule函数,从未调度队列中取出一个 Pod 后,带着这个 Pod 的相关信息进入 scheduleOne 函数开始对这个 Pod 进行调度:
// kubernetes/pkg/scheduler/schedule_one.go
func (sched *Scheduler) scheduleOne(ctx context.Context) {
podInfo := sched.NextPod()
// 没有找到一下一个 Pod 时,结束本次调度
if podInfo == nil || podInfo.Pod == nil {
return
}
pod := podInfo.Pod
fwk, err := sched.frameworkForPod(pod)
if err != nil {
// This shouldn't happen, because we only accept for scheduling the pods
// which specify a scheduler name that matches one of the profiles.
klog.ErrorS(err, "Error occurred")
return
}
if sched.skipPodSchedule(fwk, pod) {
return
}
klog.V(3).InfoS("Attempting to schedule pod", "pod", klog.KObj(pod))
// Synchronously attempt to find a fit for the pod.
start := time.Now()
state := framework.NewCycleState()
state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
// Initialize an empty podsToActivate struct, which will be filled up by plugins or stay empty.
podsToActivate := framework.NewPodsToActivate()
state.Write(framework.PodsToActivateKey, podsToActivate)
schedulingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
scheduleResult, assumedPodInfo := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, podsToActivate, start)
if scheduleResult.FeasibleNodes == 0 {
return
}
// 异步执行 Pod Bind 任务
go func() {
bindingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Inc()
defer metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Dec()
metrics.Goroutines.WithLabelValues(metrics.Binding).Inc()
defer metrics.Goroutines.WithLabelValues(metrics.Binding).Dec()
sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, podsToActivate, start)
}()
}
schedulingCycle 中封装了整个调度过程的逻辑主体,调度框架的插件扩展点函数大多数在这里被调度器调度,下面看 chedulingCycle 函数的代码:
func (sched *Scheduler) schedulingCycle(ctx context.Context, state *framework.CycleState, fwk framework.Framework, podInfo *framework.QueuedPodInfo, podsToActivate *framework.PodsToActivate, start time.Time) (ScheduleResult, *framework.QueuedPodInfo) {
pod := podInfo.Pod
scheduleResult, err := sched.SchedulePod(ctx, fwk, state, pod)
if err != nil {
// 如果调取器无法为 Pod 分配一个适合调度的节点, 则尝试进入抢占逻辑,
// 抢占逻辑试图找到一个节点, 在该节点中删除一个或多个优先级较低的 Pod,如果找到这样的节点,这些优先级较低的 Pod 会被从节点中驱逐。
var nominatingInfo *framework.NominatingInfo
reason := v1.PodReasonUnschedulable
if fitError, ok := err.(*framework.FitError); ok {
if !fwk.HasPostFilterPlugins() {
klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")
} else {
// 调用 PostFilter 插件
result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
if status.Code() == framework.Error {
klog.ErrorS(nil, "Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
} else {
fitError.Diagnosis.PostFilterMsg = status.Message()
klog.V(5).InfoS("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
}
if result != nil {
nominatingInfo = result.NominatingInfo
}
}
// Pod did not fit anywhere, so it is counted as a failure. If preemption
// succeeds, the pod should get counted as a success the next time we try to
// schedule it. (hopefully)
metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
} else if err == ErrNoNodesAvailable {
nominatingInfo = clearNominatedNode
// No nodes available is counted as unschedulable rather than an error.
metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
} else {
nominatingInfo = clearNominatedNode
klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
reason = v1.PodReasonSchedulerError
}
sched.FailureHandler(ctx, fwk, podInfo, err, reason, nominatingInfo)
return ScheduleResult{}, nil
}
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
// 为了让我们不用等待 binding 事件完成,先将调度完成的状态写入缓存队列中
assumedPodInfo := podInfo.DeepCopy()
assumedPod := assumedPodInfo.Pod
// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
if err != nil {
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
// 返回错误
sched.FailureHandler(ctx, fwk, assumedPodInfo, err, v1.PodReasonSchedulerError, clearNominatedNode)
return ScheduleResult{}, nil
}
// 调用 reserve 插件
if sts := fwk.RunReservePluginsReserve(ctx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
// trigger un-reserve to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
}
sched.FailureHandler(ctx, fwk, assumedPodInfo, sts.AsError(), v1.PodReasonSchedulerError, clearNominatedNode)
return ScheduleResult{}, nil
}
// 调用 permit 插件
runPermitStatus := fwk.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() {
var reason string
if runPermitStatus.IsUnschedulable() {
metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
reason = v1.PodReasonUnschedulable
} else {
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
reason = v1.PodReasonSchedulerError
}
// One of the plugins returned status different from success or wait.
fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
}
sched.FailureHandler(ctx, fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode)
return ScheduleResult{}, nil
}
// 完成调度
if len(podsToActivate.Map) != 0 {
sched.SchedulingQueue.Activate(podsToActivate.Map)
// Clear the entries after activation.
podsToActivate.Map = make(map[string]*v1.Pod)
}
return scheduleResult, assumedPodInfo
}
在配置文件中,对每个扩展点,你可以禁用默认插件或者是启用自己的插件,例如:
apiVersion: kubescheduler.config.k8s.io/v1
kind: KubeSchedulerConfiguration
profiles:
- plugins:
score:
disabled:
- name: PodTopologySpread
enabled:
- name: MyCustomPluginA
weight: 2
- name: MyCustomPluginB
weight: 1
本文系转载,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文系转载,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。