Author: xidianwangtao@gmail.com
Node Controller的Run方法如下,这是所有Node Controller真正处理逻辑的入口。
pkg/controller/node/nodecontroller.go:550
// Run starts an asynchronous loop that monitors the status of cluster nodes.
//
// Run itself returns immediately: all work happens in the goroutine started below.
// Every loop is driven with wait.NeverStop, so this method offers no way to stop them.
func (nc *NodeController) Run() {
go func() {
defer utilruntime.HandleCrash()
// Block until the node, pod and daemon set informer caches all report synced.
if !cache.WaitForCacheSync(wait.NeverStop, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return
}
// Incorporate the results of node status pushed from kubelet to master.
// monitorNodeStatus is invoked every nc.nodeMonitorPeriod; its error is only logged.
go wait.Until(func() {
if err := nc.monitorNodeStatus(); err != nil {
glog.Errorf("Error monitoring node status: %v", err)
}
}, nc.nodeMonitorPeriod, wait.NeverStop)
// The taint manager is started only when nc.runTaintManager is set.
if nc.runTaintManager {
go nc.taintManager.Run(wait.NeverStop)
}
// Exactly one of the two periodic passes below is started, each every nodeEvictionPeriod.
if nc.useTaintBasedEvictions {
// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
go wait.Until(nc.doTaintingPass, nodeEvictionPeriod, wait.NeverStop)
} else {
// Managing eviction of nodes:
// When we delete pods off a node, if the node was not empty at the time we then
// queue an eviction watcher. If we hit an error, retry deletion.
go wait.Until(nc.doEvictionPass, nodeEvictionPeriod, wait.NeverStop)
}
}()
}
vendor/k8s.io/client-go/tools/cache/shared_informer.go:100
// WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
// if the controller should shutdown.
//
// Every syncedPollPeriod (100ms according to the article) all supplied InformerSynced
// functions are polled. Once every one of them reports true the caches are considered
// synced and true is returned; otherwise polling continues until wait.PollUntil
// returns an error, which is reported as "stop requested".
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
	// allSynced is the poll condition: it fails fast on the first cache that is not ready.
	allSynced := func() (bool, error) {
		for i := range cacheSyncs {
			if synced := cacheSyncs[i](); !synced {
				return false, nil
			}
		}
		return true, nil
	}

	if pollErr := wait.PollUntil(syncedPollPeriod, allSynced, stopCh); pollErr != nil {
		glog.V(2).Infof("stop requested")
		return false
	}

	glog.V(4).Infof("caches populated")
	return true
}
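WaitForCacheSync的实现逻辑是:每隔syncedPollPeriod(100ms)轮询一次cacheSyncs中所有的InformerSynced方法,当它们全部返回true时,WaitForCacheSync返回true;如果在此之前stopCh被关闭,则返回false。
接下来看monitorNodeStatus的逻辑: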
pkg/controller/node/nodecontroller.go:586
// monitorNodeStatus verifies node status are constantly updated by kubelet, and if not,
// post "NodeReady==ConditionUnknown". It also evicts all pods if node is not ready or
// not reachable for a long period of time.
//
// NOTE: this listing is abridged; a line consisting of "..." marks code elided by the article.
func (nc *NodeController) monitorNodeStatus() error {
// We are listing nodes from local cache as we can tolerate some small delays
// comparing to state from etcd and there is eventual consistency anyway.
nodes, err := nc.nodeLister.List(labels.Everything())
if err != nil {
return err
}
// Compare knownNodeSet against the listed nodes to obtain the added and deleted nodes.
added, deleted := nc.checkForNodeAddedDeleted(nodes)
// Walk the added nodes: each one is a node the Node Controller has just observed joining the cluster.
for i := range added {
...
// Record the added node in knownNodeSet.
nc.knownNodeSet[added[i].Name] = added[i]
// When adding new Nodes we need to check if new zone appeared, and if so add new evictor.
zone := utilnode.GetZoneKey(added[i])
if _, found := nc.zoneStates[zone]; !found {
// First time this zone is seen: start it in the "Initial" state.
nc.zoneStates[zone] = stateInitial
// When useTaintBasedEvictions is false (per the article: chosen via --feature-gates, TaintBasedEvictions defaults to false),
// create this zone's zonePodEvictor queue, rate limited at evictionLimiterQPS (per the article: set by --node-eviction-rate, default 0.1).
if !nc.useTaintBasedEvictions {
nc.zonePodEvictor[zone] =
NewRateLimitedTimedQueue(
flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
} else {
// When useTaintBasedEvictions is true, create this zone's
// zoneNotReadyOrUnreachableTainer queue instead, with the same evictionLimiterQPS rate limit.
nc.zoneNotReadyOrUnreachableTainer[zone] =
NewRateLimitedTimedQueue(
flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
}
...
}
// With taint-based evictions, mark the new node healthy. Per the article, markNodeAsHealthy removes the
// node.alpha.kubernetes.io/notReady and node.alpha.kubernetes.io/unreachable taints from the node
// and removes the node from the zoneNotReadyOrUnreachableTainer queue if it is queued there.
if nc.useTaintBasedEvictions {
nc.markNodeAsHealthy(added[i])
} else {
// Without taint-based evictions (zonePodEvictor in use), cancel any pending eviction;
// per the article this removes the node from its zone's zonePodEvictor queue.
nc.cancelPodEviction(added[i])
}
}
// Walk the deleted nodes and drop each one from knownNodeSet.
for i := range deleted {
...
delete(nc.knownNodeSet, deleted[i].Name)
}
zoneToNodeConditions := map[string][]*v1.NodeCondition{}
for i := range nodes {
...
// PollImmediate tries a condition func until it returns true, an error, or the timeout is reached.
// Per the article, retrySleepTime is 20ms, which makes the timeout here 100ms.
if err := wait.PollImmediate(retrySleepTime, retrySleepTime*nodeStatusUpdateRetry, func() (bool, error) {
// nc.tryUpdateNodeStatus - For a given node checks its conditions and tries to update it.
// Returns grace period to which given node is entitled, state of current and last observed Ready Condition, and an error if it occurred.
gracePeriod, observedReadyCondition, currentReadyCondition, err = nc.tryUpdateNodeStatus(node)
if err == nil {
return true, nil
}
...
});
...
}
// For non-master nodes, append the node's current Ready condition to zoneToNodeConditions under its zone key.
// We do not treat a master node as a part of the cluster for network disruption checking.
if !system.IsMasterNode(node.Name) {
zoneToNodeConditions[utilnode.GetZoneKey(node)] = append(zoneToNodeConditions[utilnode.GetZoneKey(node)], currentReadyCondition)
}
decisionTimestamp := nc.now()
if currentReadyCondition != nil {
// Observed Ready condition is False (node NotReady): handle according to useTaintBasedEvictions.
// Check eviction timeout against decisionTimestamp
if observedReadyCondition.Status == v1.ConditionFalse {
// Taint-based evictions enabled.
if nc.useTaintBasedEvictions {
// If the node already carries the Unreachable taint, swap it for the NotReady taint.
// We want to update the taint straight away if Node is already tainted with the UnreachableTaint
if v1.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) {
taintToAdd := *NotReadyTaintTemplate
if !swapNodeControllerTaint(nc.kubeClient, &taintToAdd, UnreachableTaintTemplate, node) {
...
}
// Otherwise put the node on the tainter queue so the taint is added later.
} else if nc.markNodeForTainting(node) {
...
}
// Taint-based evictions disabled: fall back to direct pod eviction.
} else {
// Evict only once decisionTimestamp (now) is later than
// readyTransitionTimestamp + podEvictionTimeout (per the article: default 5min).
if decisionTimestamp.After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
// Put the node on the pod evictor queue.
if nc.evictPods(node) {
...
}
}
}
}
// Likewise, observed Ready condition is Unknown: handle according to useTaintBasedEvictions.
if observedReadyCondition.Status == v1.ConditionUnknown {
// Taint-based evictions enabled.
if nc.useTaintBasedEvictions {
// If the node already carries the NotReady taint, swap it for the Unreachable taint.
// We want to update the taint straight away if Node is already tainted with the NotReadyTaint
if v1.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) {
taintToAdd := *UnreachableTaintTemplate
if !swapNodeControllerTaint(nc.kubeClient, &taintToAdd, NotReadyTaintTemplate, node) {
...
}
// Otherwise put the node on the tainter queue so the taint is added later.
} else if nc.markNodeForTainting(node) {
...
}
// Taint-based evictions disabled: fall back to direct pod eviction.
} else {
// Evict only once decisionTimestamp (now) is later than
// probeTimestamp + podEvictionTimeout (per the article: default 5min).
if decisionTimestamp.After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout)) {
// Put the node on the pod evictor queue.
if nc.evictPods(node) {
...
}
}
}
}
// Likewise, observed Ready condition is True: undo any pending tainting or eviction.
if observedReadyCondition.Status == v1.ConditionTrue {
// Taint-based evictions enabled.
if nc.useTaintBasedEvictions {
// Mark the node healthy; per the article this also removes it from the
// zoneNotReadyOrUnreachableTainer queue if it is queued there.
removed, err := nc.markNodeAsHealthy(node)
...
} else {
// Taint-based evictions disabled: cancel eviction (per the article, removes the node from its zonePodEvictor queue).
if nc.cancelPodEviction(node) {
...
}
}
}
// When the Ready condition went from True (observed) to something else (current),
// record the event and mark every pod on the node as NotReady.
// Report node event.
if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue {
recordNodeStatusChange(nc.recorder, node, "NodeNotReady")
if err = markAllPodsNotReady(nc.kubeClient, node); err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err))
}
}
// Check with the cloud provider to see if the node still exists. If it
// doesn't, delete the node immediately.
...
}
}
// Adjust per-zone eviction rate limiters based on the conditions gathered above.
nc.handleDisruption(zoneToNodeConditions, nodes)
return nil
}
下面我们接着看handleDisruption的逻辑:
pkg/controller/node/nodecontroller.go:772
// handleDisruption recomputes each zone's disruption state from the Ready conditions
// gathered in the current pass (zoneToNodeConditions), compares it with the state
// recorded by the previous pass (nc.zoneStates), and adjusts the per-zone eviction
// rate limiters accordingly. It also publishes the ZoneSize, ZoneHealth and
// UnhealthyNodes metrics for every zone it touches.
func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1.NodeCondition, nodes []*v1.Node) {
newZoneStates := map[string]zoneState{}
// From zoneToNodeConditions, work out allAreFullyDisrupted: whether, judging by the node
// conditions observed in this pass, every zone is currently in the FullDisruption state.
allAreFullyDisrupted := true
for k, v := range zoneToNodeConditions {
ZoneSize.WithLabelValues(k).Set(float64(len(v)))
unhealthy, newState := nc.computeZoneStateFunc(v)
ZoneHealth.WithLabelValues(k).Set(float64(100*(len(v)-unhealthy)) / float64(len(v)))
UnhealthyNodes.WithLabelValues(k).Set(float64(unhealthy))
if newState != stateFullDisruption {
allAreFullyDisrupted = false
}
newZoneStates[k] = newState
if _, had := nc.zoneStates[k]; !had {
glog.Errorf("Setting initial state for unseen zone: %v", k)
nc.zoneStates[k] = stateInitial
}
}
// From nc.zoneStates (the states recorded by the previous pass), work out allWasFullyDisrupted:
// whether every zone was in the FullDisruption state last time.
allWasFullyDisrupted := true
for k, v := range nc.zoneStates {
// A zone with no nodes in this pass is reset in the metrics and forgotten.
if _, have := zoneToNodeConditions[k]; !have {
ZoneSize.WithLabelValues(k).Set(0)
ZoneHealth.WithLabelValues(k).Set(100)
UnhealthyNodes.WithLabelValues(k).Set(0)
delete(nc.zoneStates, k)
continue
}
if v != stateFullDisruption {
allWasFullyDisrupted = false
break
}
}
// At least one node was responding in previous pass or in the current pass. Semantics is as follows:
// - if the new state is "partialDisruption" we call a user defined function that returns a new limiter to use,
// - if the new state is "normal" we resume normal operation (go back to default limiter settings),
// - if new state is "fullDisruption" we restore normal eviction rate,
// - unless all zones in the cluster are in "fullDisruption" - in that case we stop all evictions.
if !allAreFullyDisrupted || !allWasFullyDisrupted {
// allAreFullyDisrupted is true and allWasFullyDisrupted is false: the cluster has just
// moved from not-fully-disrupted to fully disrupted, so walk every node.
// We're switching to full disruption mode
if allAreFullyDisrupted {
glog.V(0).Info("NodeController detected that all Nodes are not-Ready. Entering master disruption mode.")
for i := range nodes {
// With taint-based evictions, mark the node healthy (per the article: removes the taint
// from the node and removes the node from the tainter queue).
if nc.useTaintBasedEvictions {
_, err := nc.markNodeAsHealthy(nodes[i])
if err != nil {
glog.Errorf("Failed to remove taints from Node %v", nodes[i].Name)
}
} else {
// Without taint-based evictions, cancel pod eviction for the node (per the article:
// by removing the node from its zone's pod evictor queue).
nc.cancelPodEviction(nodes[i])
}
}
// Swap every zone's tainter / pod evictor rate limiter to 0, i.e. stop all evictions.
// We stop all evictions.
for k := range nc.zoneStates {
if nc.useTaintBasedEvictions {
nc.zoneNotReadyOrUnreachableTainer[k].SwapLimiter(0)
} else {
nc.zonePodEvictor[k].SwapLimiter(0)
}
}
// Record every zone as FullDisruption in nc.zoneStates.
for k := range nc.zoneStates {
nc.zoneStates[k] = stateFullDisruption
}
// All rate limiters are updated, so we can return early here.
return
}
// allWasFullyDisrupted is true and allAreFullyDisrupted is false: the cluster is moving
// from fully disrupted back to not-fully-disrupted, so walk every node.
// We're exiting full disruption mode
if allWasFullyDisrupted {
glog.V(0).Info("NodeController detected that some Nodes are Ready. Exiting master disruption mode.")
// When exiting disruption mode update probe timestamps on all Nodes.
now := nc.now()
for i := range nodes {
v := nc.nodeStatusMap[nodes[i].Name]
v.probeTimestamp = now
v.readyTransitionTimestamp = now
nc.nodeStatusMap[nodes[i].Name] = v
}
// Reset each zone's rate limiter according to its size and new state.
// We reset all rate limiters to settings appropriate for the given state.
for k := range nc.zoneStates {
nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newZoneStates[k])
nc.zoneStates[k] = newZoneStates[k]
}
return
}
// Neither this pass nor the previous one was fully disrupted: for each zone whose state
// changed, reset its rate limiter according to zone size and new state.
// We know that there's at least one not-fully disrupted so,
// we can use default behavior for rate limiters
for k, v := range nc.zoneStates {
newState := newZoneStates[k]
if v == newState {
continue
}
glog.V(0).Infof("NodeController detected that zone %v is now in state %v.", k, newState)
nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newState)
nc.zoneStates[k] = newState
}
}
}
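因此,handleDisruption的逻辑为:
- 如果cluster状态从非FullDisruption变为FullDisruption,则取消所有node上的eviction(useTaintBasedEvictions为true时则是将node标记为Healthy),把所有zone的rate limiter设置为0以停止所有evictions,并将所有zone的状态更新为FullDisruption。
- 如果cluster状态从FullDisruption变为非FullDisruption,则将所有node的probeTimestamp和readyTransitionTimestamp更新为当前时间,然后根据zone size和zone state,调用**setLimiterInZone**重新设置对应的Disruption rate limiter的值。
- 如果cluster状态保持为非FullDisruption,则对状态发生变化的zone,根据zone size和zone state,调用**setLimiterInZone**重新设置对应的Disruption rate limiter的值。

下面接着来看setLimiterInZone的逻辑,它是如何根据zone size和zone state对应到不同的rate limiter的。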
pkg/controller/node/nodecontroller.go:870
// setLimiterInZone swaps the rate limiter of the given zone's queue to the QPS that
// matches the zone's state:
//   - stateNormal: nc.evictionLimiterQPS (0.1 by default according to the article),
//   - statePartialDisruption: the value returned by nc.enterPartialDisruptionFunc(zoneSize),
//   - stateFullDisruption: the value returned by nc.enterFullDisruptionFunc(zoneSize).
//
// Any other state leaves the limiter untouched. The queue that is updated is the
// zone's tainter queue when taint based evictions are in use, and the zone's pod
// evictor queue otherwise.
func (nc *NodeController) setLimiterInZone(zone string, zoneSize int, state zoneState) {
	// First decide the target QPS for the state...
	var qps float32
	switch state {
	case stateNormal:
		qps = nc.evictionLimiterQPS
	case statePartialDisruption:
		qps = nc.enterPartialDisruptionFunc(zoneSize)
	case stateFullDisruption:
		qps = nc.enterFullDisruptionFunc(zoneSize)
	default:
		return
	}

	// ...then apply it to whichever queue is active for this controller.
	if nc.useTaintBasedEvictions {
		nc.zoneNotReadyOrUnreachableTainer[zone].SwapLimiter(qps)
		return
	}
	nc.zonePodEvictor[zone].SwapLimiter(qps)
}
nc.enterFullDisruptionFunc和nc.enterPartialDisruptionFunc是在调用NewNodeController创建Node Controller的时候赋值注册的:
pkg/controller/node/nodecontroller.go:270
// NewNodeController (signature and body abridged by the article) registers the two
// callbacks that supply the eviction rate for a zone in partial or full disruption.
func NewNodeController(...) (*NodeController, error) {
...
// Partial disruption: the rate comes from ReducedQPSFunc.
nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
// Full disruption: the rate comes from HealthyQPSFunc.
nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
...
}
pkg/controller/node/nodecontroller.go:1132
// HealthyQPSFunc returns the default value for cluster eviction rate, nc.evictionLimiterQPS.
// nodeNum is not used; we take it for consistency with ReducedQPSFunc.
func (nc *NodeController) HealthyQPSFunc(nodeNum int) float32 {
return nc.evictionLimiterQPS
}
// ReducedQPSFunc returns the eviction rate to use for a partially disrupted zone of
// nodeNum nodes: zones no larger than nc.largeClusterThreshold stop evictions
// altogether (rate 0), larger ones evict at the slower nc.secondaryEvictionLimiterQPS.
func (nc *NodeController) ReducedQPSFunc(nodeNum int) float32 {
	if int32(nodeNum) <= nc.largeClusterThreshold {
		return 0
	}
	return nc.secondaryEvictionLimiterQPS
}
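因此setLimiterInZone的逻辑为:
- 当zone state为PartialDisruption时:如果当前zone size大于nc.largeClusterThreshold(默认为50),则设置为secondaryEvictionLimiterQPS(默认为0.01);否则设置为0。
- 当zone state为FullDisruption时,设置为evictionLimiterQPS(默认0.1)。
- 当zone state为Normal时,设置为evictionLimiterQPS(默认为0.1)。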
在Node Controller的Run方法中,接着会启动Tainter Manager:
if nc.runTaintManager {
go nc.taintManager.Run(wait.NeverStop)
}
nc.runTaintManager通过--enable-taint-manager设置,默认为true,因此默认情况下都会启动Taint Manager。
接下来,我们看看Taint Manager Run方法中都做了些什么。
pkg/controller/node/taint_controller.go:179
// Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.
//
// Two feeder goroutines move items from the node and pod work queues onto
// tc.nodeUpdateChannel and tc.podUpdateChannel. The calling goroutine then consumes
// both channels, always draining pending node updates before handling a pod update.
//
// A bare `break` inside a `select` only leaves the select statement, not the
// enclosing `for`, so every stop case uses `return`; otherwise the loops would keep
// running (and the main loop would busy-spin) after stopCh is closed. A feeder that is
// blocked in Get() still only exits once its queue reports shutdown.
func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
	glog.V(0).Infof("Starting NoExecuteTaintManager")
	// Functions that are responsible for taking work items out of the workqueues and putting them
	// into channels.
	go func(stopCh <-chan struct{}) {
		for {
			item, shutdown := tc.nodeUpdateQueue.Get()
			if shutdown {
				return
			}
			nodeUpdate := item.(*nodeUpdateItem)
			select {
			case <-stopCh:
				return
			case tc.nodeUpdateChannel <- nodeUpdate:
			}
		}
	}(stopCh)
	go func(stopCh <-chan struct{}) {
		for {
			item, shutdown := tc.podUpdateQueue.Get()
			if shutdown {
				return
			}
			podUpdate := item.(*podUpdateItem)
			select {
			case <-stopCh:
				return
			case tc.podUpdateChannel <- podUpdate:
			}
		}
	}(stopCh)
	// When processing events we want to prioritize Node updates over Pod updates,
	// as NodeUpdates that interest NoExecuteTaintManager should be handled as soon as possible -
	// we don't want user (or system) to wait until PodUpdate queue is drained before it can
	// start evicting Pods from tainted Nodes.
	for {
		select {
		case <-stopCh:
			return
		case nodeUpdate := <-tc.nodeUpdateChannel:
			tc.handleNodeUpdate(nodeUpdate)
		case podUpdate := <-tc.podUpdateChannel:
			// If we found a Pod update we need to empty Node queue first.
		priority:
			for {
				select {
				case nodeUpdate := <-tc.nodeUpdateChannel:
					tc.handleNodeUpdate(nodeUpdate)
				default:
					break priority
				}
			}
			// After Node queue is emptied we process podUpdate.
			tc.handlePodUpdate(podUpdate)
		}
	}
}
Taint Manager会优先处理Node Update:调用tc.handleNodeUpdate处理完所有nodeUpdate后,才会去调用tc.handlePodUpdate开始处理podUpdate。关于NoExecuteTaintManager的handleNodeUpdate和handlePodUpdate,我会在之后专门对Taint Manager写一篇博客进行分析,在此就不会再深入下去。
如果useTaintBasedEvictions为true,即--feature-gates中指定TaintBasedEvictions为true(默认TaintBasedEvictions=false),则每隔100ms调用一次doTaintingPass。
doTaintingPass就是根据Node Condition是NotReady或者Unknown,调用apiserver,分别给node打上对应的Taint:node.alpha.kubernetes.io/notReady和node.alpha.kubernetes.io/unreachable。
if nc.useTaintBasedEvictions {
// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
go wait.Until(nc.doTaintingPass, nodeEvictionPeriod, wait.NeverStop)
}
如果useTaintBasedEvictions为false(默认TaintBasedEvictions=false)则每隔100ms调用一次doEvictionPass。
else {
// Managing eviction of nodes:
// When we delete pods off a node, if the node was not empty at the time we then
// queue an eviction watcher. If we hit an error, retry deletion.
go wait.Until(nc.doEvictionPass, nodeEvictionPeriod, wait.NeverStop)
}
我们接着来看doEvictionPass的代码:
pkg/controller/node/nodecontroller.go:481
// doEvictionPass runs one eviction pass over every zone's pod evictor queue
// (listing abridged by the article; "..." marks elided code).
func (nc *NodeController) doEvictionPass() {
...
// Walk every zone's pod evictor: take node names off the pod evictor queue and call deletePods
// to delete all pods on that node (pods belonging to a DaemonSet are left alone).
for k := range nc.zonePodEvictor {
// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
nc.zonePodEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
// value.Value is used as the node name for both the lister lookup and deletePods.
node, err := nc.nodeLister.Get(value.Value)
...
// A UID that is not a string yields "" here; the failed assertion is deliberately ignored.
nodeUid, _ := value.UID.(string)
remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nodeUid, nc.daemonSetStore)
...
return true, 0
})
}
}
doEvictionPass的逻辑:
- 遍历所有zone的pod Evictor,从pod Evictor queue中获取node name;
- 调用deletePods删除node上的所有pods(DaemonSet对应的Pod除外)。

deletePods的代码如下:
pkg/controller/node/controller_utils.go:49
// deletePods will delete all pods from master running on given node, and return true
// if any pods were deleted, or were found pending deletion.
//
// NOTE: this listing is abridged; a line consisting of "..." marks code elided by the article.
func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore extensionslisters.DaemonSetLister) (bool, error) {
...
// List pods from the apiserver across all namespaces (the list options are built in elided code).
pods, err := kubeClient.Core().Pods(metav1.NamespaceAll).List(options)
...
// Walk the returned pods one by one, keeping only those scheduled on this node.
for _, pod := range pods.Items {
// Defensive check, also needed for tests.
if pod.Spec.NodeName != nodeName {
continue
}
...
// if the pod has already been marked for deletion, we still return true that there are remaining pods.
if pod.DeletionGracePeriodSeconds != nil {
remaining = true
continue
}
// if the pod is managed by a daemonset, ignore it
_, err := daemonStore.GetPodDaemonSets(&pod)
if err == nil { // No error means at least one daemonset was found
continue
}
...
// Ask the apiserver to delete the pod; the first failure aborts the whole pass.
if err := kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
return false, err
}
remaining = true
}
...
return remaining, nil
}
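deletePods的主要逻辑如下:
- 从apiserver中获取所有的pods对象;
- 逐个遍历pods,筛选出该node上的pods;
- 如果pod已经被标记为删除(DeletionGracePeriodSeconds不为nil),则跳过该pod,但仍返回remaining为true;
- 如果pod由DaemonSet管理,则忽略该pod;
- 否则调用apiserver接口删除该pod。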
至此,Node Controller Run方法的所有主要逻辑都已分析完成。
其中涉及到Taint Manager Run的相关逻辑,在该博文没有深入进去分析,我将在后续博文中对Taint Manager做一次专门的分析。
关于Node Controller的其他博文: