在早期的版本中 NodeController 只有一种,v1.16 版本中 NodeController 已经分为了 NodeIpamController 与 NodeLifecycleController,本文主要介绍 NodeLifecycleController。
NodeLifecycleController 主要功能是定期监控 node 的状态并根据 node 的 condition 添加对应的 taint 标签或者直接驱逐 node 上的 pod。
在介绍 NodeLifecycleController 的源码前有必要先介绍一下 taint 的作用,因为 NodeLifecycleController 功能最终的结果有很大一部分都体现在 node taint 上。
taint 使用效果(Effect):
PreferNoSchedule
:调度器尽量避免把 pod 调度到具有该污点的节点上,如果不能避免(如其他节点资源不足),pod 也能调度到这个污点节点上,已存在于此节点上的 pod 不会被驱逐;NoSchedule
:不容忍该污点的 pod 不会被调度到该节点上,通过 kubelet 管理的 pod(static pod)不受限制,之前没有设置污点的 pod 如果已运行在此节点(有污点的节点)上,可以继续运行;NoExecute
:不容忍该污点的 pod 不会被调度到该节点上,同时会将已调度到该节点上但不容忍 node 污点的 pod 驱逐掉;在 NodeLifecycleController 用到了多个 feature-gates,此处先进行解释下:
NodeLease
:该特性在 v1.12 引入,v1.14 为 Beta 版本并默认启用,在 v1.17 中 GA;NodeDisruptionExclusion
:该特性在 v1.16 引入,Alpha 版本,默认为 false,其功能是当 node 存在 node.kubernetes.io/exclude-disruption
标签时,当 node 网络中断时其节点上的 pod 不会被驱逐掉;LegacyNodeRoleBehavior
:该特性在 v1.16 中引入,Alpha 版本且默认为 true,在创建 load balancers 以及中断处理时不会忽略具有 node-role.kubernetes.io/master
label 的 node,该功能在 v1.19 中将被移除;TaintBasedEvictions
:该特性从 v1.13 开始为 Beta 版本,默认为 true。其功能是当 node 处于 NodeNotReady
、NodeUnreachable
状态时为 node 添加对应的 taint,TaintBasedEvictions
添加的 taint effect 为 NoExecute
,即会驱逐 node 上对应的 pod;TaintNodesByCondition
:该特性从 v1.12 开始为 Beta 版本,默认为 true,v1.17 为 GA 版本。其功能是基于节点状态添加 taint,当节点处于 NetworkUnavailable
、MemoryPressure
、PIDPressure
、DiskPressure
状态时会添加对应的 taint,TaintNodesByCondition
添加的 taint effect 仅为NoSchedule
,即仅仅不会让新创建的 pod 调度到该 node 上; NodeLease
:该特性在 v1.12 引入,v 1.14 为 Beta 版本且默认启用,v 1.17 GA,主要功能是减少 node 的心跳请求以减轻 apiserver 的负担; kubernetes 版本:v1.16
首先还是看 NodeLifecycleController 的启动方法 startNodeLifecycleController
,在 startNodeLifecycleController
中主要调用了 lifecyclecontroller.NewNodeLifecycleController
对 lifecycleController 进行初始化,在该方法中传入了组件的多个参数以及 TaintBasedEvictions
和 TaintNodesByCondition
两个 feature-gates,然后调用了 lifecycleController.Run
启动 lifecycleController,可以看到 NodeLifecycleController 主要监听 lease、pods、nodes、daemonSets 四种对象。
其中在启动时指定的几个参数默认值分别为:
NodeMonitorPeriod
:通过--node-monitor-period
设置,默认为 5s,表示在 NodeController 中同步NodeStatus 的周期;NodeStartupGracePeriod
:--node-startup-grace-period
默认 60s,在 node 启动完成前标记节点为unhealthy 的允许无响应时间;NodeMonitorGracePeriod
:通过--node-monitor-grace-period
设置,默认 40s,表示在标记某个 node为 unhealthy 前,允许 40s 内该 node 无响应;PodEvictionTimeout
:通过--pod-eviction-timeout
设置,默认 5 分钟,表示在强制删除 node 上的 pod 时,容忍 pod 时间;NodeEvictionRate
:通过--node-eviction-rate
设置, 默认 0.1,表示当集群下某个 zone 为 unhealthy 时,每秒应该剔除的 node 数量,默认即每 10s 剔除1个 node;SecondaryNodeEvictionRate
:通过 --secondary-node-eviction-rate
设置,默认为 0.01,表示如果某个 zone 下的 unhealthy 节点的百分比超过 --unhealthy-zone-threshold
(默认为 0.55)时,驱逐速率将会减小,如果集群较小(小于等于 --large-cluster-size-threshold
个 节点 - 默认为 50),驱逐操作将会停止,否则驱逐速率将降为每秒 --secondary-node-eviction-rate
个(默认为 0.01);LargeClusterSizeThreshold
:通过--large-cluster-size-threshold
设置,默认为 50,当该 zone 的节点超过该阈值时,则认为该 zone 是一个大集群;UnhealthyZoneThreshold
:通过--unhealthy-zone-threshold
设置,默认为 0.55,不健康 zone 阈值,会影响什么时候开启二级驱赶速率,即当该 zone 中节点宕机数目超过 55%,认为该 zone 不健康;EnableTaintManager
:--enable-taint-manager
默认为 true,Beta feature,如果为 true,则表示NodeController 将会启动 TaintManager,当已经调度到该 node 上的 pod 不能容忍 node 的 taint 时,由 TaintManager 负责驱逐此类 pod,若不开启该特性则已调度到该 node 上的 pod 会继续存在;TaintBasedEvictions
:默认为 true;TaintNodesByCondition
:默认为 true;k8s.io/kubernetes/cmd/kube-controller-manager/app/core.go:163
func startNodeLifecycleController(ctx ControllerContext) (http.Handler, bool, error) {
lifecycleController, err := lifecyclecontroller.NewNodeLifecycleController(
ctx.InformerFactory.Coordination().V1beta1().Leases(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Nodes(),
ctx.InformerFactory.Apps().V1().DaemonSets(),
ctx.ClientBuilder.ClientOrDie("node-controller"),
ctx.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration,
ctx.ComponentConfig.NodeLifecycleController.NodeStartupGracePeriod.Duration,
ctx.ComponentConfig.NodeLifecycleController.NodeMonitorGracePeriod.Duration,
ctx.ComponentConfig.NodeLifecycleController.PodEvictionTimeout.Duration,
ctx.ComponentConfig.NodeLifecycleController.NodeEvictionRate,
ctx.ComponentConfig.NodeLifecycleController.SecondaryNodeEvictionRate,
ctx.ComponentConfig.NodeLifecycleController.LargeClusterSizeThreshold,
ctx.ComponentConfig.NodeLifecycleController.UnhealthyZoneThreshold,
ctx.ComponentConfig.NodeLifecycleController.EnableTaintManager,
utilfeature.DefaultFeatureGate.Enabled(features.TaintBasedEvictions),
utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition),
)
if err != nil {
return nil, true, err
}
go lifecycleController.Run(ctx.Stop)
return nil, true, nil
}
首先有必要说明一下 NodeLifecycleController 对象中部分字段的意义,其结构体如下所示:
type Controller struct {
taintManager *scheduler.NoExecuteTaintManager
podInformerSynced cache.InformerSynced
kubeClient clientset.Interface
now func() metav1.Time
// 计算 zone 下 node 驱逐速率
enterPartialDisruptionFunc func(nodeNum int) float32
enterFullDisruptionFunc func(nodeNum int) float32
// 计算 zone 状态
computeZoneStateFunc func(nodeConditions []*v1.NodeCondition) (int, ZoneState)
// 用来记录NodeController observed节点的集合
knownNodeSet map[string]*v1.Node
// 记录 node 最近一次状态的集合
nodeHealthMap map[string]*nodeHealthData
evictorLock sync.Mutex
// 需要驱逐节点上 pod 的 node 队列
zonePodEvictor map[string]*scheduler.RateLimitedTimedQueue
// 需要打 taint 标签的 node 队列
zoneNoExecuteTainter map[string]*scheduler.RateLimitedTimedQueue
// 将 node 划分为不同的 zone
zoneStates map[string]ZoneState
daemonSetStore appsv1listers.DaemonSetLister
daemonSetInformerSynced cache.InformerSynced
leaseLister coordlisters.LeaseLister
leaseInformerSynced cache.InformerSynced
nodeLister corelisters.NodeLister
nodeInformerSynced cache.InformerSynced
getPodsAssignedToNode func(nodeName string) ([]v1.Pod, error)
recorder record.EventRecorder
// kube-controller-manager 启动时指定的几个参数
nodeMonitorPeriod time.Duration
nodeStartupGracePeriod time.Duration
nodeMonitorGracePeriod time.Duration
podEvictionTimeout time.Duration
evictionLimiterQPS float32
secondaryEvictionLimiterQPS float32
largeClusterThreshold int32
unhealthyZoneThreshold float32
// 启动时默认开启的几个 feature-gates
runTaintManager bool
useTaintBasedEvictions bool
taintNodeByCondition bool
nodeUpdateQueue workqueue.Interface
}
NewNodeLifecycleController
的主要逻辑为:
taintManager
相关的 EventHandler;taintManager
相关的 EventHandler;NodeLease
feature-gates;由以上逻辑可以看出,taintManager
以及 NodeLifecycleController 都会 watch node 的变化并进行不同的处理。
k8s.io/kubernetes/pkg/controller/nodelifecycle/node_lifecycle_controller.go:268
func NewNodeLifecycleController(......) (*Controller, error) {
......
// 1、初始化 controller 对象
nc := &Controller{
......
}
......
// 2、注册计算 node 驱逐速率以及 zone 状态的方法
nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
nc.computeZoneStateFunc = nc.ComputeZoneState
// 3、为 podInformer 注册 EventHandler,监听到的对象会被放到 nc.taintManager.PodUpdated 中
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*v1.Pod)
if nc.taintManager != nil {
nc.taintManager.PodUpdated(nil, pod)
}
},
UpdateFunc: func(prev, obj interface{}) {
prevPod := prev.(*v1.Pod)
newPod := obj.(*v1.Pod)
if nc.taintManager != nil {
nc.taintManager.PodUpdated(prevPod, newPod)
}
},
DeleteFunc: func(obj interface{}) {
pod, isPod := obj.(*v1.Pod)
if !isPod {
deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return
}
pod, ok = deletedState.Obj.(*v1.Pod)
if !ok {
return
}
}
if nc.taintManager != nil {
nc.taintManager.PodUpdated(pod, nil)
}
},
})
nc.podInformerSynced = podInformer.Informer().HasSynced
podInformer.Informer().AddIndexers(cache.Indexers{
nodeNameKeyIndex: func(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return []string{}, nil
}
if len(pod.Spec.NodeName) == 0 {
return []string{}, nil
}
return []string{pod.Spec.NodeName}, nil
},
})
podIndexer := podInformer.Informer().GetIndexer()
nc.getPodsAssignedToNode = func(nodeName string) ([]v1.Pod, error) {
objs, err := podIndexer.ByIndex(nodeNameKeyIndex, nodeName)
if err != nil {
return nil, err
}
pods := make([]v1.Pod, 0, len(objs))
for _, obj := range objs {
pod, ok := obj.(*v1.Pod)
if !ok {
continue
}
pods = append(pods, *pod)
}
return pods, nil
}
// 4、初始化 TaintManager,为 nodeInformer 注册 EventHandler
// 监听到的对象会被放到 nc.taintManager.NodeUpdated 中
if nc.runTaintManager {
podLister := podInformer.Lister()
podGetter := func(name, namespace string) (*v1.Pod, error) { return podLister.Pods(namespace).Get(name) }
nodeLister := nodeInformer.Lister()
nodeGetter := func(name string) (*v1.Node, error) { return nodeLister.Get(name) }
// 5、初始化 taintManager
nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient, podGetter, nodeGetter, nc.getPodsAssignedToNode)
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
nc.taintManager.NodeUpdated(nil, node)
return nil
}),
UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
nc.taintManager.NodeUpdated(oldNode, newNode)
return nil
}),
DeleteFunc: nodeutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
nc.taintManager.NodeUpdated(node, nil)
return nil
}),
})
}
// 6、为 NodeLifecycleController 注册 nodeInformer
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
nc.nodeUpdateQueue.Add(node.Name)
return nil
}),
UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
nc.nodeUpdateQueue.Add(newNode.Name)
return nil
}),
})
......
// 7、检查是否启用了 NodeLease feature-gates
nc.leaseLister = leaseInformer.Lister()
if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
nc.leaseInformerSynced = leaseInformer.Informer().HasSynced
} else {
nc.leaseInformerSynced = func() bool { return true }
}
nc.nodeLister = nodeInformer.Lister()
nc.nodeInformerSynced = nodeInformer.Informer().HasSynced
nc.daemonSetStore = daemonSetInformer.Lister()
nc.daemonSetInformerSynced = daemonSetInformer.Informer().HasSynced
return nc, nil
}
Run
方法是 NodeLifecycleController 的启动方法,其中会启动多个 goroutine 完成 controller 的功能,主要逻辑为:
nc.taintManager.Run
启动 taintManager;nc.doNodeProcessingPassWorker
处理 nc.nodeUpdateQueue
队列中的 node;TaintBasedEvictions
特性则启动一个 goroutine 调用 nc.doNoExecuteTaintingPass
处理 nc.zoneNoExecuteTainter
队列中的 node,否则调用 nc.doEvictionPass
处理 nc.zonePodEvictor
队列中的 node; nc.monitorNodeHealth
定期监控 node 的状态;k8s.io/kubernetes/pkg/controller/nodelifecycle/node_lifecycle_controller.go:455
func (nc *Controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer klog.Infof("Shutting down node controller")
if !cache.WaitForNamedCacheSync("taint", stopCh, nc.leaseInformerSynced, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) { return
}
// 1、启动 taintManager
if nc.runTaintManager {
go nc.taintManager.Run(stopCh)
}
defer nc.nodeUpdateQueue.ShutDown()
// 2、执行 nc.doNodeProcessingPassWorker
for i := 0; i < scheduler.UpdateWorkerSize; i++ {
go wait.Until(nc.doNodeProcessingPassWorker, time.Second, stopCh)
}
// 3、根据是否启用 TaintBasedEvictions 执行不同的处理逻辑
if nc.useTaintBasedEvictions {
go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, stopCh)
} else {
go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, stopCh)
}
// 4、执行 nc.monitorNodeHealth
go wait.Until(func() {
if err := nc.monitorNodeHealth(); err != nil {
klog.Errorf("Error monitoring node health: %v", err)
}
}, nc.nodeMonitorPeriod, stopCh)
<-stopCh
}
Run
方法中主要调用了 5 个方法来完成其核心功能:
nc.taintManager.Run
:处理 taintManager
中 nodeUpdateQueue 和 podUpdateQueue 中的 pod 以及 node,若 pod 不能容忍 node 上的 taint 则将其加入到 taintEvictionQueue
中并最终会删除;nc.doNodeProcessingPassWorker
:从 NodeLifecycleController 的 nodeUpdateQueue 取出 node,(1)若启用 taintNodeByCondition
特性时根据 node condition 以及 node 是否调度为 node 添加对应的 NoSchedule
taint 标签;(2)调用 nc.reconcileNodeLabels
为 node 添加默认的 label;nc.doNoExecuteTaintingPass
:处理 nc.zoneNoExecuteTainter
队列中的数据,根据 node 的 NodeReadyCondition 添加或移除对应的 taint;nc.doEvictionPass
:处理 nc.zonePodEvictor
队列中的 node,将 node 上的 pod 进行驱逐;nc.monitorNodeHealth
:持续监控 node 的状态,当 node 处于异常状态时更新 node 的 taint 以及 node 上 pod 的状态或者直接驱逐 node 上的 pod,此外还会为集群下的所有 node 划分 zoneStates 并为每个 zoneStates 设置对应的驱逐速率;下文会详细分析以上 5 种方法的具体实现。
当组件启动时设置 --enable-taint-manager
参数为 true 时(默认为 true),该功能会启用,其主要作用是当该 node 上的 pod 不容忍 node taint 时将 pod 进行驱逐,若不开启该功能则已调度到该 node 上的 pod 会继续存在,新创建的 pod 需要容忍 node 的 taint 才会调度至该 node 上。
主要逻辑为:
tc.worker
处理 nodeUpdateChannels 和 podUpdateChannels 中的数据;k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler/taint_manager.go:185
func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
for i := 0; i < UpdateWorkerSize; i++ {
tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize))
tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize))
}
go func(stopCh <-chan struct{}) {
for {
item, shutdown := tc.nodeUpdateQueue.Get()
if shutdown {
break
}
nodeUpdate := item.(nodeUpdateItem)
hash := hash(nodeUpdate.nodeName, UpdateWorkerSize)
select {
case <-stopCh:
tc.nodeUpdateQueue.Done(item)
return
case tc.nodeUpdateChannels[hash] <- nodeUpdate:
}
}
}(stopCh)
go func(stopCh <-chan struct{}) {
for {
item, shutdown := tc.podUpdateQueue.Get()
if shutdown {
break
}
podUpdate := item.(podUpdateItem)
hash := hash(podUpdate.nodeName, UpdateWorkerSize)
select {
case <-stopCh:
tc.podUpdateQueue.Done(item)
return
case tc.podUpdateChannels[hash] <- podUpdate:
}
}
}(stopCh)
wg := sync.WaitGroup{}
wg.Add(UpdateWorkerSize)
for i := 0; i < UpdateWorkerSize; i++ {
go tc.worker(i, wg.Done, stopCh)
}
wg.Wait()
}
tc.worker
主要功能是调用 tc.handleNodeUpdate
和 tc.handlePodUpdate
处理 tc.nodeUpdateChannels
和 tc.podUpdateChannels
两个 channel 中的数据,但会优先处理 nodeUpdateChannels 中的数据。
k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler/taint_manager.go:243
func (tc *NoExecuteTaintManager) worker(worker int, done func(), stopCh <-chan struct{}) {
defer done()
for {
select {
case <-stopCh:
return
case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
tc.handleNodeUpdate(nodeUpdate)
tc.nodeUpdateQueue.Done(nodeUpdate)
case podUpdate := <-tc.podUpdateChannels[worker]:
// 优先处理 nodeUpdateChannels
priority:
for {
select {
case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
tc.handleNodeUpdate(nodeUpdate)
tc.nodeUpdateQueue.Done(nodeUpdate)
default:
break priority
}
}
tc.handlePodUpdate(podUpdate)
tc.podUpdateQueue.Done(podUpdate)
}
}
}
tc.handleNodeUpdate
的主要逻辑为:
NoExecute
的 taints;tc.getPodsAssignedToNode
获取该 node 上的所有 pods;tc.processPodOnNode
检查 pod 是否要被驱逐;k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler/taint_manager.go:417
func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate nodeUpdateItem) {
node, err := tc.getNode(nodeUpdate.nodeName)
if err != nil {
......
}
// 1、获取 node 的 taints
taints := getNoExecuteTaints(node.Spec.Taints)
func() {
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
if len(taints) == 0 {
delete(tc.taintedNodes, node.Name)
} else {
tc.taintedNodes[node.Name] = taints
}
}()
// 2、获取 node 上的所有 pod
pods, err := tc.getPodsAssignedToNode(node.Name)
if err != nil {
klog.Errorf(err.Error())
return
}
if len(pods) == 0 {
return
}
// 3、若不存在 taints,则取消所有的驱逐操作
if len(taints) == 0 {
for i := range pods {
tc.cancelWorkWithEvent(types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name})
}
return
}
now := time.Now()
for i := range pods {
pod := &pods[i]
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
// 4、调用 tc.processPodOnNode 进行处理
tc.processPodOnNode(podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now)
}
}
主要逻辑为:
tc.processPodOnNode
进行处理;k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler/taint_manager.go:377
func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate podUpdateItem) {
pod, err := tc.getPod(podUpdate.podName, podUpdate.podNamespace)
if err != nil {
......
}
if pod.Spec.NodeName != podUpdate.nodeName {
return
}
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
nodeName := pod.Spec.NodeName
if nodeName == "" {
return
}
taints, ok := func() ([]v1.Taint, bool) {
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
taints, ok := tc.taintedNodes[nodeName]
return taints, ok
}()
if !ok {
return
}
// 调用 tc.processPodOnNode 进行处理
tc.processPodOnNode(podNamespacedName, nodeName, pod.Spec.Tolerations, taints, time.Now())
}
tc.handlePodUpdate
和 tc.handleNodeUpdate
最终都是调用 tc.processPodOnNode
检查 pod 是否容忍 node 的 taints,tc.processPodOnNode
首先检查 pod 的 tolerations 是否能匹配 node 上所有的 taints,若无法完全匹配则将 pod 加入到 taintEvictionQueue 然后被删除,若能匹配首先获取 pod tolerations 中的最小容忍时间,如果 tolerations 未设置容忍时间说明会一直容忍则直接返回,否则加入到 taintEvictionQueue 的延迟队列中,当达到最小容忍时间时 pod 会被加入到 taintEvictionQueue 中并驱逐。
通常情况下,如果给一个节点添加了一个 effect 值为 NoExecute
的 taint,则任何不能忍受这个 taint 的 pod 都会马上被驱逐,任何可以忍受这个 taint 的 pod 都不会被驱逐。但是,如果 pod 存在一个 effect 值为 NoExecute
的 toleration 指定了可选属性 tolerationSeconds
的值,则表示在给节点添加了上述 taint 之后,pod 还能继续在节点上运行的时间。例如,
tolerations:
- key: "key1"
operator: "Equal"
value: "value1"
effect: "NoExecute"
tolerationSeconds: 3600
这表示如果这个 pod 正在运行,然后一个匹配的 taint 被添加到其所在的节点,那么 pod 还将继续在节点上运行 3600 秒,然后被驱逐。如果在此之前上述 taint 被删除了,则 pod 不会被驱逐。
k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler/taint_manager.go:339
func (tc *NoExecuteTaintManager) processPodOnNode(......) {
if len(taints) == 0 {
tc.cancelWorkWithEvent(podNamespacedName)
}
// 1、检查 pod 的 tolerations 是否匹配所有 taints
allTolerated, usedTolerations := v1helper.GetMatchingTolerations(taints, tolerations)
if !allTolerated {
tc.cancelWorkWithEvent(podNamespacedName)
tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), time.Now(), time.Now())
return
}
// 2、获取最小容忍时间
minTolerationTime := getMinTolerationTime(usedTolerations)
if minTolerationTime < 0 {
return
}
// 3、若存在最小容忍时间则将其加入到延时队列中
startTime := now
triggerTime := startTime.Add(minTolerationTime)
scheduledEviction := tc.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String())
if scheduledEviction != nil {
startTime = scheduledEviction.CreatedAt
if startTime.Add(minTolerationTime).Before(triggerTime) {
return
}
tc.cancelWorkWithEvent(podNamespacedName)
}
tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
}
NodeLifecycleController 中 nodeInformer 监听到 node 变化时会将其添加到 nodeUpdateQueue 中,nc.doNodeProcessingPassWorker
主要是处理 nodeUpdateQueue 中的 node,为其添加合适的 NoSchedule
taint 以及 label,其主要逻辑为:
nc.nodeUpdateQueue
中取出 node;TaintNodeByCondition
feature-gates,调用nc.doNoScheduleTaintingPass
检查该 node 是否需要添加对应的 NoSchedule
taint;
nc.doNoScheduleTaintingPass
中的主要逻辑为:NoSchedule
taint;Unschedulable
状态,若为 Unschedulable
也添加对应的 NoSchedule
taint;nodeutil.SwapNodeControllerTaint
为 node 添加不存在的 taints 并删除不需要的 taints;nc.reconcileNodeLabels
检查 node 是否存在以下 label,若不存在则为其添加;
labels: beta.kubernetes.io/arch: amd64 beta.kubernetes.io/os: linux kubernetes.io/arch: amd64 kubernetes.io/os: linux k8s.io/kubernetes/pkg/controller/nodelifecycle/node_lifecycle_controller.go:502
func (nc *Controller) doNodeProcessingPassWorker() {
for {
obj, shutdown := nc.nodeUpdateQueue.Get()
if shutdown {
return
}
nodeName := obj.(string)
if nc.taintNodeByCondition {
if err := nc.doNoScheduleTaintingPass(nodeName); err != nil {
......
}
}
if err := nc.reconcileNodeLabels(nodeName); err != nil {
......
}
nc.nodeUpdateQueue.Done(nodeName)
}
}
func (nc *Controller) doNoScheduleTaintingPass(nodeName string) error {
// 1、获取 node 对象
node, err := nc.nodeLister.Get(nodeName)
if err != nil {
......
}
// 2、若 node 存在对应的 condition 则为其添加对应的 taint
var taints []v1.Taint
for _, condition := range node.Status.Conditions {
if taintMap, found := nodeConditionToTaintKeyStatusMap[condition.Type]; found {
if taintKey, found := taintMap[condition.Status]; found {
taints = append(taints, v1.Taint{
Key: taintKey,
Effect: v1.TaintEffectNoSchedule,
})
}
}
}
// 3、判断是否为 Unschedulable
if node.Spec.Unschedulable {
taints = append(taints, v1.Taint{
Key: schedulerapi.TaintNodeUnschedulable,
Effect: v1.TaintEffectNoSchedule,
})
}
nodeTaints := taintutils.TaintSetFilter(node.Spec.Taints, func(t *v1.Taint) bool {
if t.Effect != v1.TaintEffectNoSchedule {
return false
}
if t.Key == schedulerapi.TaintNodeUnschedulable {
return true
}
_, found := taintKeyToNodeConditionMap[t.Key]
return found
})
// 4、对比 node 已有 taints 和需要添加的 taints 得到 taintsToAdd, taintsToDel
taintsToAdd, taintsToDel := taintutils.TaintSetDiff(taints, nodeTaints)
if len(taintsToAdd) == 0 && len(taintsToDel) == 0 {
return nil
}
// 5、更新 node 的 taints
if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) {
return fmt.Errorf("failed to swap taints of node %+v", node)
}
return nil
}
当启用了 TaintBasedEvictions
特性时,通过 nc.monitorNodeHealth
检测到 node 异常时会将其加入到 nc.zoneNoExecuteTainter
队列中,nc.doNoExecuteTaintingPass
会处理 nc.zoneNoExecuteTainter
队列中的 node,并且会按一定的速率进行,此时会根据 node 实际的 NodeCondition 为 node 添加对应的 taint,当 node 存在 taint 时,taintManager 会驱逐 node 上的 pod。此过程中为 node 添加 taint 时进行了限速避免一次性驱逐过多 pod,在驱逐 node 上的 pod 时不会限速。
nc.doNoExecuteTaintingPass
的主要逻辑为:
node.kubernetes.io/not-ready:NoExecute
的 taint 且保证 node 不存在 node.kubernetes.io/unreachable:NoExecute
的 taintnode.kubernetes.io/unreachable:NoExecute
的 taint 且保证 node 不存在 node.kubernetes.io/not-ready:NoExecute
的 taint;
“unreachable” 和 “not ready” 两个 taint 是互斥的,只能存在一个; nodeutil.SwapNodeControllerTaint
更新 node 的 taint;k8s.io/kubernetes/pkg/controller/nodelifecycle/node_lifecycle_controller.go:582
func (nc *Controller) doNoExecuteTaintingPass() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
for k := range nc.zoneNoExecuteTainter {
nc.zoneNoExecuteTainter[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
// 1、获取 node 对象
node, err := nc.nodeLister.Get(value.Value)
if apierrors.IsNotFound(err) {
return true, 0
} else if err != nil {
return false, 50 * time.Millisecond
}
// 2、获取 node 的 NodeReadyCondition
_, condition := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
taintToAdd := v1.Taint{}
oppositeTaint := v1.Taint{}
// 3、判断 Condition 状态,并为其添加对应的 taint
switch condition.Status {
case v1.ConditionFalse:
taintToAdd = *NotReadyTaintTemplate
oppositeTaint = *UnreachableTaintTemplate
case v1.ConditionUnknown:
taintToAdd = *UnreachableTaintTemplate
oppositeTaint = *NotReadyTaintTemplate
default:
return true, 0
}
// 4、更新 node 的 taint
result := nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{&oppositeTaint}, node)
if result {
zone := utilnode.GetZoneKey(node)
evictionsNumber.WithLabelValues(zone).Inc()
}
return result, 0
})
}
}
若未启用 TaintBasedEvictions
特性,此时通过 nc.monitorNodeHealth
检测到 node 异常时会将其加入到 nc.zonePodEvictor
队列中,nc.doEvictionPass
会将 nc.zonePodEvictor
队列中 node 上的 pod 驱逐掉。
nc.doEvictionPass
的主要逻辑为:
nodeutil.DeletePods
删除该 node 上的所有 pod,在 nodeutil.DeletePods
中首先通过从 apiserver 获取该 node 上所有的 pod,逐个删除 pod,若该 pod 为 daemonset 所管理的 pod 则忽略;k8s.io/kubernetes/pkg/controller/nodelifecycle/node_lifecycle_controller.go:626
func (nc *Controller) doEvictionPass() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
for k := range nc.zonePodEvictor {
nc.zonePodEvictor[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
node, err := nc.nodeLister.Get(value.Value)
......
nodeUID, _ := value.UID.(string)
remaining, err := nodeutil.DeletePods(nc.kubeClient, nc.recorder, value.Value, nodeUID, nc.daemonSetStore)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
return false, 0
}
......
if node != nil {
zone := utilnode.GetZoneKey(node)
evictionsNumber.WithLabelValues(zone).Inc()
}
return true, 0
})
}
}
上面已经介绍了无论是否启用了 TaintBasedEvictions
特性,需要打 taint 或者驱逐 pod 的 node 都会被放在 zoneNoExecuteTainter 或者 zonePodEvictor 队列中,而 nc.monitorNodeHealth
就是这两个队列中数据的生产者。nc.monitorNodeHealth
的主要功能是持续监控 node 的状态,当 node 处于异常状态时更新 node 的 taint 以及 node 上 pod 的状态或者直接驱逐 node 上的 pod,此外还会为集群下的所有 node 划分 zoneStates 并为每个 zoneStates 设置对应的驱逐速率。
nc.monitorNodeHealth
主要逻辑为:
nc.classifyNodes
将 node 分为三类:added、deleted、newZoneRepresentatives,added 表示新增的,deleted 表示被删除的,newZoneRepresentatives 代表该 node 不存在 zoneStates,NodeLifecycleController 会为每一个 node 划分一个 zoneStates,zoneStates 有 Initial、Normal、FullDisruption、PartialDisruption 四种,新增加的 node 默认的 zoneStates 为 Initial,其余的几个 zoneStates 分别对应着不同的驱逐速率;nc.addPodEvictorForNewZone
将 node 添加到对应的的 zoneStates 中,然后根据是否启用了 TaintBasedEvictions
特性将 node 分别加入到 zonePodEvictor 或 zoneNoExecuteTainter 列表中,若启用了则加入到 zoneNoExecuteTainter 列表中否则加入到 zonePodEvictor 中;nc.addPodEvictorForNewZone
将该 node 添加到对应的 zoneStates 中,判断是否启用了 TaintBasedEvictions
特性,若启用了则调用 nc.markNodeAsReachable
移除该 node 上的 UnreachableTaint
和 NotReadyTaint
,并从 zoneNoExecuteTainter 中移除该 node,表示为该 node 进行一次初始化,若未启用 TaintBasedEvictions
特性则调用 nc.cancelPodEviction
将该 node 从 zonePodEvictor 中删除;nc.tryUpdateNodeHealth
获取该 node 的 gracePeriod、observedReadyCondition、currentReadyCondition,observedReadyCondition 可以理解为 node 上一次的状态, currentReadyCondition 为本次的状态;LegacyNodeRoleBehavior
或 NodeDisruptionExclusion
特性时,node 是否存在对应的标签,如果该 node 没有被排除,则将其对应的 zone 加入到 zoneToNodeConditions 中;TaintBasedEvictions
时,为其添加 NotReadyTaint
并且确保 node 不存在 UnreachableTaint
。若未启用 TaintBasedEvictions
则判断距 node 上一次 readyTransitionTimestamp 的时间是否超过了 podEvictionTimeout
(默认 5 分钟),若超过则将 node 加入到 zonePodEvictor 队列中,最终会驱逐 node 上的所有 pod;TaintBasedEvictions
时,则为 node 添加 UnreachableTaint
并且确保 node 不会有 NotReadyTaint
。若未启用 TaintBasedEvictions
则判断距 node 上一次 probeTimestamp 的时间是否超过了 podEvictionTimeout
(默认 5 分钟),若超过则将 node 加入到 zonePodEvictor 队列中,最终会驱逐 node 上的所有 pod;TaintBasedEvictions
时,调用 nc.markNodeAsReachable
移除 node 上的 NotReadyTaint
和 UnreachableTaint
,若未启用 TaintBasedEvictions
则将 node 从 zonePodEvictor 队列中移除;
此处主要是判断是否启用了 TaintBasedEvictions
特性,然后根据 node 的 ReadyCondition 判断是否直接驱逐 node 上的 pod 还是为 node 打 taint 等待 taintManager 驱逐 node 上的 pod; nodeutil.MarkAllPodsNotReady
将该node 上的所有 pod 标记为 notReady;nc.handleDisruption
处理中断情况,为不同 zoneState 设置驱逐的速度;k8s.io/kubernetes/pkg/controller/nodelifecycle/node_lifecycle_controller.go:664
func (nc *Controller) monitorNodeHealth() error {
// 1、从 nodeLister 获取所有 node
nodes, err := nc.nodeLister.List(labels.Everything())
if err != nil {
return err
}
// 2、根据 controller knownNodeSet 中的记录将 node 分为三类
added, deleted, newZoneRepresentatives := nc.classifyNodes(nodes)
// 3、为没有 zone 的 node 添加对应的 zone
for i := range newZoneRepresentatives {
nc.addPodEvictorForNewZone(newZoneRepresentatives[i])
}
// 4、将新增加的 node 添加到 knownNodeSet 中并且对 node 进行初始化
for i := range added {
......
nc.knownNodeSet[added[i].Name] = added[i]
nc.addPodEvictorForNewZone(added[i])
if nc.useTaintBasedEvictions {
nc.markNodeAsReachable(added[i])
} else {
nc.cancelPodEviction(added[i])
}
}
// 5、将 deleted 列表中的 node 从 knownNodeSet 中删除
for i := range deleted {
......
delete(nc.knownNodeSet, deleted[i].Name)
}
zoneToNodeConditions := map[string][]*v1.NodeCondition{}
for i := range nodes {
var gracePeriod time.Duration
var observedReadyCondition v1.NodeCondition
var currentReadyCondition *v1.NodeCondition
node := nodes[i].DeepCopy()
// 6、获取 node 的 gracePeriod, observedReadyCondition, currentReadyCondition
if err := wait.PollImmediate(retrySleepTime, retrySleepTime*scheduler.NodeHealthUpdateRetry, func() (bool, error) {
gracePeriod, observedReadyCondition, currentReadyCondition, err = nc.tryUpdateNodeHealth(node)
if err == nil {
return true, nil
}
name := node.Name
node, err = nc.kubeClient.CoreV1().Nodes().Get(name, metav1.GetOptions{})
if err != nil {
return false, err
}
return false, nil
}); err != nil {
......
}
// 7、若 node 没有被排除则加入到 zoneToNodeConditions 列表中
if !isNodeExcludedFromDisruptionChecks(node) {
zoneToNodeConditions[utilnode.GetZoneKey(node)] = append(zoneToNodeConditions[utilnode.GetZoneKey(node)], currentReadyCondition)
}
decisionTimestamp := nc.now()
// 8、根据 observedReadyCondition 为 node 添加不同的 taint
if currentReadyCondition != nil {
switch observedReadyCondition.Status {
case v1.ConditionFalse:
// 9、false 状态添加 NotReady taint
if nc.useTaintBasedEvictions {
if taintutils.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) {
taintToAdd := *NotReadyTaintTemplate
if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{UnreachableTaintTemplate}, node) {
......
}
} else if nc.markNodeForTainting(node) {
......
}
// 10、或者当超过 podEvictionTimeout 后直接驱逐 node 上的 pod
} else {
if decisionTimestamp.After(nc.nodeHealthMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
if nc.evictPods(node) {
......
}
}
}
case v1.ConditionUnknown:
// 11、unknown 状态时添加 UnreachableTaint
if nc.useTaintBasedEvictions {
if taintutils.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) {
taintToAdd := *UnreachableTaintTemplate
if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{NotReadyTaintTemplate}, node) {
......
}
} else if nc.markNodeForTainting(node) {
......
}
} else {
if decisionTimestamp.After(nc.nodeHealthMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout)) {
if nc.evictPods(node) {
......
}
}
}
case v1.ConditionTrue:
// 12、true 状态时移除所有 UnreachableTaint 和 NotReadyTaint
if nc.useTaintBasedEvictions {
removed, err := nc.markNodeAsReachable(node)
if err != nil {
......
}
// 13、从 PodEviction 队列中移除
} else {
if nc.cancelPodEviction(node) {
......
}
}
}
// 14、ReadyCondition 由 true 变为 false 时标记 node 上的 pod 为 notready
if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue {
nodeutil.RecordNodeStatusChange(nc.recorder, node, "NodeNotReady")
if err = nodeutil.MarkAllPodsNotReady(nc.kubeClient, node); err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err))
}
}
}
}
// 15、处理中断情况
nc.handleDisruption(zoneToNodeConditions, nodes)
return nil
}
nc.tryUpdateNodeHealth
会根据当前获取的 node status 更新 nc.nodeHealthMap
中的数据,nc.nodeHealthMap
保存 node 最近一次的状态,并会根据 nc.nodeHealthMap
判断 node 是否已经处于 unknown 状态。
nc.tryUpdateNodeHealth
的主要逻辑为:
nc.nodeHealthMap
中更新 node 的 Status;nc.nodeHealthMap
中获取 node 的 condition 和 lease 信息,更新 savedNodeHealth 中 status、probeTimestamp、readyTransitionTimestamp,若启用了 NodeLease
特性也会更新 NodeHealth 中的 lease 以及 probeTimestamp,最后将当前计算出 savedNodeHealth 保存到 nc.nodeHealthMap
中;k8s.io/kubernetes/pkg/controller/nodelifecycle/node_lifecycle_controller.go:851
func (nc *Controller) tryUpdateNodeHealth(node *v1.Node) (time.Duration, v1.NodeCondition, *v1.NodeCondition, error) {
var gracePeriod time.Duration
var observedReadyCondition v1.NodeCondition
_, currentReadyCondition := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
// 1、若 currentReadyCondition 为 nil 则 fake 一个 observedReadyCondition
if currentReadyCondition == nil {
observedReadyCondition = v1.NodeCondition{
Type: v1.NodeReady,
Status: v1.ConditionUnknown,
LastHeartbeatTime: node.CreationTimestamp,
LastTransitionTime: node.CreationTimestamp,
}
gracePeriod = nc.nodeStartupGracePeriod
if _, found := nc.nodeHealthMap[node.Name]; found {
nc.nodeHealthMap[node.Name].status = &node.Status
} else {
nc.nodeHealthMap[node.Name] = &nodeHealthData{
status: &node.Status,
probeTimestamp: node.CreationTimestamp,
readyTransitionTimestamp: node.CreationTimestamp,
}
}
} else {
observedReadyCondition = *currentReadyCondition
gracePeriod = nc.nodeMonitorGracePeriod
}
// 2、savedNodeHealth 中保存 node 最近的一次状态
savedNodeHealth, found := nc.nodeHealthMap[node.Name]
var savedCondition *v1.NodeCondition
var savedLease *coordv1beta1.Lease
if found {
_, savedCondition = nodeutil.GetNodeCondition(savedNodeHealth.status, v1.NodeReady)
savedLease = savedNodeHealth.lease
}
// 3、根据 savedCondition 以及 currentReadyCondition 更新 savedNodeHealth 中的数据
if !found {
savedNodeHealth = &nodeHealthData{
status: &node.Status,
probeTimestamp: nc.now(),
readyTransitionTimestamp: nc.now(),
}
} else if savedCondition == nil && currentReadyCondition != nil {
savedNodeHealth = &nodeHealthData{
status: &node.Status,
probeTimestamp: nc.now(),
readyTransitionTimestamp: nc.now(),
}
} else if savedCondition != nil && currentReadyCondition == nil {
savedNodeHealth = &nodeHealthData{
status: &node.Status,
probeTimestamp: nc.now(),
readyTransitionTimestamp: nc.now(),
}
} else if savedCondition != nil && currentReadyCondition != nil && savedCondition.LastHeartbeatTime != currentReadyCondition.LastHeartbeatTime {
var transitionTime metav1.Time
if savedCondition.LastTransitionTime != currentReadyCondition.LastTransitionTime {
transitionTime = nc.now()
} else {
transitionTime = savedNodeHealth.readyTransitionTimestamp
}
savedNodeHealth = &nodeHealthData{
status: &node.Status,
probeTimestamp: nc.now(),
readyTransitionTimestamp: transitionTime,
}
}
// 4、判断是否启用了 nodeLease 功能
var observedLease *coordv1beta1.Lease
if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
observedLease, _ = nc.leaseLister.Leases(v1.NamespaceNodeLease).Get(node.Name)
if observedLease != nil && (savedLease == nil || savedLease.Spec.RenewTime.Before(observedLease.Spec.RenewTime)) {
savedNodeHealth.lease = observedLease
savedNodeHealth.probeTimestamp = nc.now()
}
}
nc.nodeHealthMap[node.Name] = savedNodeHealth
// 5、检查 node 是否已经超过 gracePeriod 时间没有上报状态了
if nc.now().After(savedNodeHealth.probeTimestamp.Add(gracePeriod)) {
nodeConditionTypes := []v1.NodeConditionType{
v1.NodeReady,
v1.NodeMemoryPressure,
v1.NodeDiskPressure,
v1.NodePIDPressure,
}
nowTimestamp := nc.now()
// 6、若 node 超过 gracePeriod 时间没有上报状态将其所有 Condition 设置 unknown
for _, nodeConditionType := range nodeConditionTypes {
_, currentCondition := nodeutil.GetNodeCondition(&node.Status, nodeConditionType)
if currentCondition == nil {
node.Status.Conditions = append(node.Status.Conditions, v1.NodeCondition{
Type: nodeConditionType,
Status: v1.ConditionUnknown,
Reason: "NodeStatusNeverUpdated",
Message: "Kubelet never posted node status.",
LastHeartbeatTime: node.CreationTimestamp,
LastTransitionTime: nowTimestamp,
})
} else {
if currentCondition.Status != v1.ConditionUnknown {
currentCondition.Status = v1.ConditionUnknown
currentCondition.Reason = "NodeStatusUnknown"
currentCondition.Message = "Kubelet stopped posting node status."
currentCondition.LastTransitionTime = nowTimestamp
}
}
}
// 7、更新 node 最新状态至 apiserver 并更新 nodeHealthMap 中的数据
_, currentReadyCondition = nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
if !apiequality.Semantic.DeepEqual(currentReadyCondition, &observedReadyCondition) {
if _, err := nc.kubeClient.CoreV1().Nodes().UpdateStatus(node); err != nil {
return gracePeriod, observedReadyCondition, currentReadyCondition, err
}
nc.nodeHealthMap[node.Name] = &nodeHealthData{
status: &node.Status,
probeTimestamp: nc.nodeHealthMap[node.Name].probeTimestamp,
readyTransitionTimestamp: nc.now(),
lease: observedLease,
}
return gracePeriod, observedReadyCondition, currentReadyCondition, nil
}
}
return gracePeriod, observedReadyCondition, currentReadyCondition, nil
}
monitorNodeHealth
中会为每个 node 划分 zone 并设置 zoneState,nc.handleDisruption
的目的是当集群中不同 zone 下出现多个 unhealthy node 时会 zone 设置不同的驱逐速率。
nc.handleDisruption
主要逻辑为:
nc.computeZoneStateFunc
计算每个 zone 的状态,分为三种 fullyDisrupted
(zone 下所有 node 都处于 notReady 状态)、partiallyDisrupted
(notReady node 占比 >= unhealthyZoneThreshold 的值且 node 数超过三个)、normal
(以上两种情况之外)。若 newState 不为 stateFullDisruption
将 allAreFullyDisrupted 设为 false,将 newState 保存在 newZoneStates 中,FullDisruption
状态;FullyDisrupted
直接停止所有的驱逐工作,因为此时可能处于网络中断的状态;FullyDisrupted
切换到了 FullyDisrupted
模式,此时需要停止所有 node 的驱逐工作,首先去掉 node 上的 taint 并设置所有zone的对应 zoneNoExecuteTainter 或者 zonePodEvictor 的 Rate Limeter 为0,最后更新所有 zone 的状态为 FullDisruption
;FullyDisrupted
变为 非 FullyDisrupted
模式,此时首先更新 nc.nodeHealthMap
中所有 node 的 probeTimestamp 和 readyTransitionTimestamp 为当前时间,然后调用 nc.setLimiterInZone
重置每个 zone 的驱逐速率;FullDisruption
时,此时根据 zone 的 state 为每个 zone 设置默认的驱逐速率;k8s.io/kubernetes/pkg/controller/nodelifecycle/node_lifecycle_controller.go:1017
func (nc *Controller) handleDisruption(zoneToNodeConditions map[string][]*v1.NodeCondition, nodes []*v1.Node) {
newZoneStates := map[string]ZoneState{}
allAreFullyDisrupted := true
// 1、判断当前所有 zone 是否都为 FullDisruption 状态
for k, v := range zoneToNodeConditions {
zoneSize.WithLabelValues(k).Set(float64(len(v)))
// 2、计算 zone state 以及 unhealthy node
unhealthy, newState := nc.computeZoneStateFunc(v)
zoneHealth.WithLabelValues(k).Set(float64(100*(len(v)-unhealthy)) / float64(len(v)))
unhealthyNodes.WithLabelValues(k).Set(float64(unhealthy))
if newState != stateFullDisruption {
allAreFullyDisrupted = false
}
newZoneStates[k] = newState
if _, had := nc.zoneStates[k]; !had {
nc.zoneStates[k] = stateInitial
}
}
// 3、判断上一次观察到的所有 zone 是否都为 FullDisruption 状态
allWasFullyDisrupted := true
for k, v := range nc.zoneStates {
if _, have := zoneToNodeConditions[k]; !have {
zoneSize.WithLabelValues(k).Set(0)
zoneHealth.WithLabelValues(k).Set(100)
unhealthyNodes.WithLabelValues(k).Set(0)
delete(nc.zoneStates, k)
continue
}
if v != stateFullDisruption {
allWasFullyDisrupted = false
break
}
}
// 4、若存在一个不为 FullyDisrupted
if !allAreFullyDisrupted || !allWasFullyDisrupted {
// 5、如果 allAreFullyDisrupted 为 true,则 allWasFullyDisrupted 为 false
// 说明从非 FullyDisrupted 切换到了 FullyDisrupted 模式
if allAreFullyDisrupted {
for i := range nodes {
if nc.useTaintBasedEvictions {
_, err := nc.markNodeAsReachable(nodes[i])
if err != nil {
klog.Errorf("Failed to remove taints from Node %v", nodes[i].Name)
}
} else {
nc.cancelPodEviction(nodes[i])
}
}
for k := range nc.zoneStates {
if nc.useTaintBasedEvictions {
nc.zoneNoExecuteTainter[k].SwapLimiter(0)
} else {
nc.zonePodEvictor[k].SwapLimiter(0)
}
}
for k := range nc.zoneStates {
nc.zoneStates[k] = stateFullDisruption
}
return
}
// 6、如果 allWasFullyDisrupted 为 true,则 allAreFullyDisrupted 为 false
// 说明 cluster 从 FullyDisrupted 切换为非 FullyDisrupted 模式
if allWasFullyDisrupted {
now := nc.now()
for i := range nodes {
v := nc.nodeHealthMap[nodes[i].Name]
v.probeTimestamp = now
v.readyTransitionTimestamp = now
nc.nodeHealthMap[nodes[i].Name] = v
}
for k := range nc.zoneStates {
nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newZoneStates[k])
nc.zoneStates[k] = newZoneStates[k]
}
return
}
// 7、根据 zoneState 为每个 zone 设置驱逐速率
for k, v := range nc.zoneStates {
newState := newZoneStates[k]
if v == newState {
continue
}
nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newState)
nc.zoneStates[k] = newState
}
}
}
nc.computeZoneStateFunc
是计算 zone state 的方法,该方法会计算每个 zone 下 notReady 的 node 并将 zone 分为三种:
fullyDisrupted
:zone 下所有 node 都处于 notReady 状态;partiallyDisrupted
:notReady node 占比 >= unhealthyZoneThreshold 的值(默认为0.55,通过--unhealthy-zone-threshold
设置)且 notReady node 数超过2个;normal
:以上两种情况之外的;k8s.io/kubernetes/pkg/controller/nodelifecycle/node_lifecycle_controller.go:1262
func (nc *Controller) ComputeZoneState(nodeReadyConditions []*v1.NodeCondition) (int, ZoneState) {
readyNodes := 0
notReadyNodes := 0
for i := range nodeReadyConditions {
if nodeReadyConditions[i] != nil && nodeReadyConditions[i].Status == v1.ConditionTrue {
readyNodes++
} else {
notReadyNodes++
}
}
switch {
case readyNodes == 0 && notReadyNodes > 0:
return notReadyNodes, stateFullDisruption
case notReadyNodes > 2 && float32(notReadyNodes)/float32(notReadyNodes+readyNodes) >= nc.unhealthyZoneThreshold:
return notReadyNodes, statePartialDisruption
default:
return notReadyNodes, stateNormal
}
}
nc.setLimiterInZone
方法会根据不同的 zoneState 设置对应的驱逐速率:
stateNormal
:驱逐速率为 evictionLimiterQPS(默认为0.1,可以通过 --node-eviction-rate
参数指定)的值,即每隔 10s 清空一个节点;statePartialDisruption
:如果当前 zone size 大于 nc.largeClusterThreshold
(默认为 50,通过--large-cluster-size-threshold
设置),则设置为 secondaryEvictionLimiterQPS(默认为 0.01,可以通过 --secondary-node-eviction-rate
指定),否则设置为 0;stateFullDisruption
:为 evictionLimiterQPS(默认为0.1,可以通过 --node-eviction-rate
参数指定)的值;k8s.io/kubernetes/pkg/controller/nodelifecycle/node_lifecycle_controller.go:1115
func (nc *Controller) setLimiterInZone(zone string, zoneSize int, state ZoneState) {
switch state {
case stateNormal:
if nc.useTaintBasedEvictions {
nc.zoneNoExecuteTainter[zone].SwapLimiter(nc.evictionLimiterQPS)
} else {
nc.zonePodEvictor[zone].SwapLimiter(nc.evictionLimiterQPS)
}
case statePartialDisruption:
if nc.useTaintBasedEvictions {
nc.zoneNoExecuteTainter[zone].SwapLimiter(
nc.enterPartialDisruptionFunc(zoneSize))
} else {
nc.zonePodEvictor[zone].SwapLimiter(
nc.enterPartialDisruptionFunc(zoneSize))
}
case stateFullDisruption:
if nc.useTaintBasedEvictions {
nc.zoneNoExecuteTainter[zone].SwapLimiter(
nc.enterFullDisruptionFunc(zoneSize))
} else {
nc.zonePodEvictor[zone].SwapLimiter(
nc.enterFullDisruptionFunc(zoneSize))
}
}
}
monitorNodeHealth 中的主要流程如下所示:
monitorNodeHealth
|
|
useTaintBasedEvictions
|
|
---------------------------------------------
yes | | no
| |
v v
addPodEvictorForNewZone evictPods
| |
| |
v v
zoneNoExecuteTainter zonePodEvictor
(RateLimitedTimedQueue) (RateLimitedTimedQueue)
| |
| |
| |
v v
doNoExecuteTaintingPass doEvictionPass
(consumer) (consumer)
NodeLifecycleController 中三个核心组件之间的交互流程如下所示:
monitorNodeHealth
|
|
| 为 node 添加 NoExecute taint
|
|
v 为 node 添加
watch nodeList NoSchedule taint
taintManager ------> APIServer <----------- nc.doNodeProcessingPassWorker
|
|
|
v
驱逐 node 上不容忍
node taint 的 pod
至此,NodeLifecycleController 的核心代码已经分析完。
本文主要分析了 NodeLifecycleController 的设计与实现,NodeLifecycleController 主要是监控 node 状态,当 node 异常时驱逐 node 上的 pod,其行为与其他组件有一定关系,node 的状态由 kubelet 上报,node 异常时为 node 添加 taint 标签后,scheduler 调度 pod 也会有对应的行为。为了保证由于网络等问题引起的 pod 驱逐行为,NodeLifecycleController 会为 node 进行分区并会为每个区设置不同的驱逐速率,即实际上会以 rate-limited 的方式添加 taint,在某些情况下可以避免 pod 被大量驱逐。
此外,NodeLifecycleController 还会对外暴露多个 metrics,包括 zoneHealth、zoneSize、unhealthyNodes、evictionsNumber 等,便于用户查看集群下 node 的状态。
参考:
https://kubernetes.io/zh/docs/concepts/configuration/taint-and-toleration/
https://kubernetes.io/docs/reference/command-line-tools-reference/feature-gates/