
Kubernetes Node Controller Source Code Analysis: Execution


Author: xidianwangtao@gmail.com

Running the Node Controller

The Node Controller's Run method is shown below; it is the entry point for all of the Node Controller's real processing logic.

pkg/controller/node/nodecontroller.go:550

// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *NodeController) Run() {
	go func() {
		defer utilruntime.HandleCrash()

		if !cache.WaitForCacheSync(wait.NeverStop, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
			utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
			return
		}

		// Incorporate the results of node status pushed from kubelet to master.
		go wait.Until(func() {
			if err := nc.monitorNodeStatus(); err != nil {
				glog.Errorf("Error monitoring node status: %v", err)
			}
		}, nc.nodeMonitorPeriod, wait.NeverStop)

		if nc.runTaintManager {
			go nc.taintManager.Run(wait.NeverStop)
		}

		if nc.useTaintBasedEvictions {
			// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
			// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
			go wait.Until(nc.doTaintingPass, nodeEvictionPeriod, wait.NeverStop)
		} else {
			// Managing eviction of nodes:
			// When we delete pods off a node, if the node was not empty at the time we then
			// queue an eviction watcher. If we hit an error, retry deletion.
			go wait.Until(nc.doEvictionPass, nodeEvictionPeriod, wait.NeverStop)
		}
	}()
}

WaitForCacheSync(wait.NeverStop, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced)

  • The Node Controller first calls WaitForCacheSync and waits until HasSynced returns true for the PodInformer, NodeInformer, and DaemonSetInformer, i.e. until all three API objects have finished syncing.
vendor/k8s.io/client-go/tools/cache/shared_informer.go:100

// WaitForCacheSync waits for caches to populate.  It returns true if it was successful, false
// if the contoller should shutdown
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
    
    // Every 100ms (syncedPollPeriod), run through the InformerSynced functions in cacheSyncs.
    // Once every one of them returns true, i.e. all required caches have synced,
    // WaitForCacheSync returns true; otherwise keep polling.
	err := wait.PollUntil(syncedPollPeriod,
		func() (bool, error) {
			for _, syncFunc := range cacheSyncs {
				if !syncFunc() {
					return false, nil
				}
			}
			return true, nil
		},
		stopCh)
	if err != nil {
		glog.V(2).Infof("stop requested")
		return false
	}

	glog.V(4).Infof("caches populated")
	return true
}

The logic of WaitForCacheSync is:

  • Every 100ms it runs through the InformerSynced functions in cacheSyncs; once every one of them returns true, meaning all required caches have synced, WaitForCacheSync returns true.
  • Otherwise it keeps polling on the 100ms period until it either returns true or receives the stop signal.
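
As a quick illustration of how this gate is normally used, here is a minimal sketch of the "wait for caches, then start workers" pattern. Only cache.WaitForCacheSync and the apimachinery runtime helpers are real APIs; the function and parameter names are made up for the example.

package example

import (
	"fmt"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/tools/cache"
)

// startController blocks until every given informer reports HasSynced,
// then (in a real controller) would start its worker loops.
func startController(stopCh <-chan struct{}, synced ...cache.InformerSynced) {
	defer utilruntime.HandleCrash()

	if !cache.WaitForCacheSync(stopCh, synced...) {
		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
		return
	}

	// Caches are warm; it is now safe to read from the listers and start processing.
}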

Start a goroutine that runs monitorNodeStatus every 5s (nodeMonitorPeriod) to monitor node status

pkg/controller/node/nodecontroller.go:586

// monitorNodeStatus verifies node status are constantly updated by kubelet, and if not,
// post "NodeReady==ConditionUnknown". It also evicts all pods if node is not ready or
// not reachable for a long period of time.
func (nc *NodeController) monitorNodeStatus() error {

	// We are listing nodes from local cache as we can tolerate some small delays
	// comparing to state from etcd and there is eventual consistency anyway.
	nodes, err := nc.nodeLister.List(labels.Everything())
	if err != nil {
		return err
	}
	
	// Compare knownNodeSet with the listed nodes to get the added and deleted node lists
	added, deleted := nc.checkForNodeAddedDeleted(nodes)
	
	// Iterate over the added nodes: each one is a node the Node Controller newly observes joining the cluster
	for i := range added {
		...
		
		// Add the node to knownNodeSet
		nc.knownNodeSet[added[i].Name] = added[i]
		
		// When adding new Nodes we need to check if new zone appeared, and if so add new evictor.
		zone := utilnode.GetZoneKey(added[i])
		if _, found := nc.zoneStates[zone]; !found {
		
		   // Mark the node's new zone state as stateInitial
			nc.zoneStates[zone] = stateInitial
			
			// If useTaintBasedEvictions is false (set via --feature-gates, default TaintBasedEvictions=false),
			// create the zone's zonePodEvictor queue with evictionLimiterQPS (--node-eviction-rate, default 0.1)
			if !nc.useTaintBasedEvictions {
				nc.zonePodEvictor[zone] =
					NewRateLimitedTimedQueue(
						flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
			} else {
			 
			    // If useTaintBasedEvictions is true,
			    // create the zone's zoneNotReadyOrUnreachableTainer queue with evictionLimiterQPS
				nc.zoneNotReadyOrUnreachableTainer[zone] =
					NewRateLimitedTimedQueue(
						flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
			}
			...
		}
		
		// If useTaintBasedEvictions is true, call RemoveTaintOffNode to clear the node's
		// node.alpha.kubernetes.io/notReady and node.alpha.kubernetes.io/unreachable taints,
		// and remove it from the zoneNotReadyOrUnreachableTainer queue (if it is queued there)
		if nc.useTaintBasedEvictions {
			nc.markNodeAsHealthy(added[i])
		} else {
		
		// If useTaintBasedEvictions is false, i.e. zonePodEvictor is used,
		// remove the node from the zone's zonePodEvictor queue
			nc.cancelPodEviction(added[i])
		}
	}

	// Iterate over the deleted nodes and remove them from knownNodeSet
	for i := range deleted {
	   ...
		delete(nc.knownNodeSet, deleted[i].Name)
	}
    
    
	zoneToNodeConditions := map[string][]*v1.NodeCondition{}
	for i := range nodes {
		...
		// PollImmediate tries a condition func until it returns true, an error, or the timeout is reached.
		// retrySleepTime is 20ms, so the timeout is 20ms * nodeStatusUpdateRetry (5) = 100ms.
		if err := wait.PollImmediate(retrySleepTime, retrySleepTime*nodeStatusUpdateRetry, func() (bool, error) {
		
		  // nc.tryUpdateNodeStatus - For a given node checks its conditions and tries to update it. 
		  // Returns grace period to which given node is entitled, state of current and last observed Ready Condition, and an error if it occurred.
			gracePeriod, observedReadyCondition, currentReadyCondition, err = nc.tryUpdateNodeStatus(node)
			if err == nil {
				return true, nil
			}
			...
		}); 
        ...
		}

		// For non-master nodes, append the node's current ready condition to the zoneToNodeConditions map.
		// We do not treat a master node as a part of the cluster for network disruption checking.
		if !system.IsMasterNode(node.Name) {
			zoneToNodeConditions[utilnode.GetZoneKey(node)] = append(zoneToNodeConditions[utilnode.GetZoneKey(node)], currentReadyCondition)
		}

        
		decisionTimestamp := nc.now()
		if currentReadyCondition != nil {
			
			// When the observed ready condition is NotReady (False), branch on useTaintBasedEvictions
			// Check eviction timeout against decisionTimestamp
			if observedReadyCondition.Status == v1.ConditionFalse {
			     
			     // With useTaintBasedEvictions == true:
				if nc.useTaintBasedEvictions {
					
					// If the node already carries the Unreachable taint, swap it for the NotReady taint
					// We want to update the taint straight away if Node is already tainted with the UnreachableTaint
					if v1.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) {
						taintToAdd := *NotReadyTaintTemplate
						if !swapNodeControllerTaint(nc.kubeClient, &taintToAdd, UnreachableTaintTemplate, node) {
							...
						}
					
					// Otherwise add the node to the Tainter queue, to be processed by doTaintingPass
					} else if nc.markNodeForTainting(node) {
						...
					}
					
				// With useTaintBasedEvictions == false, pod eviction is used instead.
				} else {
				
				    // Evict only once readyTransitionTimestamp + podEvictionTimeout (default 5min) is earlier than decisionTimestamp (now)
					if decisionTimestamp.After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
					
					   // Add the node to the zone's pod evictor queue, to be processed by doEvictionPass
						if nc.evictPods(node) {
							...
						}
					}
				}
			}
			
			// Similarly, when the observed ready condition is Unknown, branch on useTaintBasedEvictions
			if observedReadyCondition.Status == v1.ConditionUnknown {
			
			    // With useTaintBasedEvictions == true:
				if nc.useTaintBasedEvictions {
				
				    // If the node already carries the NotReady taint, swap it for the Unreachable taint
					// We want to update the taint straight away if Node is already tainted with the UnreachableTaint
					if v1.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) {
						taintToAdd := *UnreachableTaintTemplate
						if !swapNodeControllerTaint(nc.kubeClient, &taintToAdd, NotReadyTaintTemplate, node) {
                        ...
						}
						
					// Otherwise add the node to the Tainter queue, to be processed by doTaintingPass
					} else if nc.markNodeForTainting(node) {
						...
					}
				
				// With useTaintBasedEvictions == false, pod eviction is used instead.
				} else {
				
				    // Evict only once probeTimestamp + podEvictionTimeout (default 5min) is earlier than decisionTimestamp (now)
					if decisionTimestamp.After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout)) {
						
						// Add the node to the zone's pod evictor queue, to be processed by doEvictionPass
						if nc.evictPods(node) {
							...
						}
					}
				}
			}
			
			// Similarly, when the observed ready condition is True, branch on useTaintBasedEvictions
			if observedReadyCondition.Status == v1.ConditionTrue {
			
				// With useTaintBasedEvictions == true:
				if nc.useTaintBasedEvictions {
				
				    // Clear the controller's taints and remove the node from the zoneNotReadyOrUnreachableTainer queue (if queued)
					removed, err := nc.markNodeAsHealthy(node)
					...
				} else {
				
				    // With useTaintBasedEvictions == false, remove the node from the zone's zonePodEvictor queue
					if nc.cancelPodEviction(node) {
						...
					}
				}
			}

			// If the node went from Ready to NotReady, mark all pods on that node as NotReady
			// Report node event.
			if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue {
				recordNodeStatusChange(nc.recorder, node, "NodeNotReady")
				if err = markAllPodsNotReady(nc.kubeClient, node); err != nil {
					utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err))
				}
			}
			

			// Check with the cloud provider to see if the node still exists. If it
			// doesn't, delete the node immediately.
			...
		}
	}
	
	// Handle zone disruption state
	nc.handleDisruption(zoneToNodeConditions, nodes)

	return nil
}
  • Compare knownNodeSet with the listed nodes to get the added and deleted node lists.
  • Iterate over the added nodes; each one is a node the Node Controller observes joining the cluster:
    • Add the node to knownNodeSet.
    • When adding new Nodes we need to check if a new zone appeared, and if so add a new evictor. For a new zone:
      • Set the zone's state to stateInitial.
      • If useTaintBasedEvictions is false (set via --feature-gates, default TaintBasedEvictions=false), create the zone's zonePodEvictor queue with evictionLimiterQPS (--node-eviction-rate, default 0.1).
      • If useTaintBasedEvictions is true, create the zone's zoneNotReadyOrUnreachableTainer queue with evictionLimiterQPS.
    • If useTaintBasedEvictions is true, call RemoveTaintOffNode to clear the node's taints (node.alpha.kubernetes.io/notReady and node.alpha.kubernetes.io/unreachable) and remove the node from the zoneNotReadyOrUnreachableTainer queue (if it is queued there).
    • If useTaintBasedEvictions is false, i.e. zonePodEvictor is used, remove the node from the zone's zonePodEvictor queue.
  • Iterate over the deleted nodes and remove them from knownNodeSet.
  • Iterate over all nodes:
    • Update the node status via tryUpdateNodeStatus, obtaining the previously observed and the current ready condition.
    • For non-master nodes, append the node's ready condition to the zoneToNodeConditions map.
    • When the observed condition is NotReady, branch on useTaintBasedEvictions:
      • With useTaintBasedEvictions == true:
        • If the node already carries the Unreachable taint, swap it for the NotReady taint.
        • Otherwise add the node to the Tainter queue, to be processed by doTaintingPass.
      • With useTaintBasedEvictions == false, pod eviction is used:
        • Evict only once readyTransitionTimestamp + podEvictionTimeout (default 5min) is earlier than decisionTimestamp (now); see the sketch after this list.
        • Add the node to the zone's pod evictor queue, to be processed by doEvictionPass.
    • Similarly, when the observed condition is Unknown:
      • With useTaintBasedEvictions == true:
        • If the node already carries the NotReady taint, swap it for the Unreachable taint.
        • Otherwise add the node to the Tainter queue.
      • With useTaintBasedEvictions == false, pod eviction is used:
        • Evict only once probeTimestamp + podEvictionTimeout (default 5min) is earlier than decisionTimestamp (now).
        • Add the node to the zone's pod evictor queue.
    • Similarly, when the observed condition is True:
      • With useTaintBasedEvictions == true, mark the node as healthy and remove it from the zoneNotReadyOrUnreachableTainer queue (if queued).
      • With useTaintBasedEvictions == false, remove the node from the zone's zonePodEvictor queue.
    • If the node status changed from Ready to NotReady, mark all pods on that node as NotReady.
  • Call handleDisruption.
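
To make the eviction timeout check concrete, here is a small, self-contained sketch of the readyTransitionTimestamp + podEvictionTimeout comparison; the names and values are illustrative, not the controller's actual types.

package example

import (
	"fmt"
	"time"
)

// shouldEvict mirrors the check above: a node only becomes an eviction candidate
// once it has been NotReady/Unknown for longer than podEvictionTimeout
// (--pod-eviction-timeout, 5 minutes by default).
func shouldEvict(readyTransitionTimestamp, decisionTimestamp time.Time, podEvictionTimeout time.Duration) bool {
	return decisionTimestamp.After(readyTransitionTimestamp.Add(podEvictionTimeout))
}

func demo() {
	lastTransition := time.Now().Add(-6 * time.Minute) // node went NotReady 6 minutes ago
	fmt.Println(shouldEvict(lastTransition, time.Now(), 5*time.Minute)) // true: 6m > 5m
}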

Next, let's look at the logic of handleDisruption:

pkg/controller/node/nodecontroller.go:772


func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1.NodeCondition, nodes []*v1.Node) {
	newZoneStates := map[string]zoneState{}
	
	// From zoneToNodeConditions, compute allAreFullyDisrupted: based on the node conditions observed
	// in this pass, are all zones in the cluster currently in the FullDisruption state?
	allAreFullyDisrupted := true
	for k, v := range zoneToNodeConditions {
		ZoneSize.WithLabelValues(k).Set(float64(len(v)))
		unhealthy, newState := nc.computeZoneStateFunc(v)
		ZoneHealth.WithLabelValues(k).Set(float64(100*(len(v)-unhealthy)) / float64(len(v)))
		UnhealthyNodes.WithLabelValues(k).Set(float64(unhealthy))
		if newState != stateFullDisruption {
			allAreFullyDisrupted = false
		}
		newZoneStates[k] = newState
		if _, had := nc.zoneStates[k]; !had {
			glog.Errorf("Setting initial state for unseen zone: %v", k)
			nc.zoneStates[k] = stateInitial
		}
	}

    // From nc.zoneStates, compute allWasFullyDisrupted: based on the previous pass,
	// were all zones already in the FullDisruption state?
	allWasFullyDisrupted := true
	for k, v := range nc.zoneStates {
		if _, have := zoneToNodeConditions[k]; !have {
			ZoneSize.WithLabelValues(k).Set(0)
			ZoneHealth.WithLabelValues(k).Set(100)
			UnhealthyNodes.WithLabelValues(k).Set(0)
			delete(nc.zoneStates, k)
			continue
		}
		if v != stateFullDisruption {
			allWasFullyDisrupted = false
			break
		}
	}

	// At least one node was responding in previous pass or in the current pass. Semantics is as follows:
	// - if the new state is "partialDisruption" we call a user defined function that returns a new limiter to use,
	// - if the new state is "normal" we resume normal operation (go back to default limiter settings),
	// - if new state is "fullDisruption" we restore normal eviction rate,
	//   - unless all zones in the cluster are in "fullDisruption" - in that case we stop all evictions.
	if !allAreFullyDisrupted || !allWasFullyDisrupted {
	
	   // If allAreFullyDisrupted is true and allWasFullyDisrupted is false, the cluster just switched
	   // from non-FullDisruption to FullDisruption; iterate over all nodes:
		// We're switching to full disruption mode
		if allAreFullyDisrupted {
			glog.V(0).Info("NodeController detected that all Nodes are not-Ready. Entering master disruption mode.")
			for i := range nodes {
			     // If useTaintBasedEvictions is true, mark the node as healthy (remove the controller's
			     // taints and remove the node from the Tainter queue)
				if nc.useTaintBasedEvictions {
					_, err := nc.markNodeAsHealthy(nodes[i])
					if err != nil {
						glog.Errorf("Failed to remove taints from Node %v", nodes[i].Name)
					}
				} else {
				    // If useTaintBasedEvictions is false, cancel pod eviction for the node
				    // (by removing it from the zone's pod evictor queue)
					nc.cancelPodEviction(nodes[i])
				}
			}
			
			// Set the rate limiter of every zone's Tainter queue or pod evictor queue to 0,
			// i.e. stop all evictions.
			// We stop all evictions.
			for k := range nc.zoneStates {
				if nc.useTaintBasedEvictions {
					nc.zoneNotReadyOrUnreachableTainer[k].SwapLimiter(0)
				} else {
					nc.zonePodEvictor[k].SwapLimiter(0)
				}
			}
			
			// Set every zone's state (nc.zoneStates) to FullDisruption
			for k := range nc.zoneStates {
				nc.zoneStates[k] = stateFullDisruption
			}
			// All rate limiters are updated, so we can return early here.
			return
		}
		
		// If allWasFullyDisrupted is true and allAreFullyDisrupted is false, the cluster just switched
		// from FullDisruption back to non-FullDisruption; iterate over all nodes:
		// We're exiting full disruption mode
		if allWasFullyDisrupted {
			glog.V(0).Info("NodeController detected that some Nodes are Ready. Exiting master disruption mode.")
			
			// When exiting disruption mode update probe timestamps on all Nodes.
			now := nc.now()
			for i := range nodes {
				v := nc.nodeStatusMap[nodes[i].Name]
				v.probeTimestamp = now
				v.readyTransitionTimestamp = now
				nc.nodeStatusMap[nodes[i].Name] = v
			}
			
			
			// Reset each zone's rate limiter according to its size and new state.
			// We reset all rate limiters to settings appropriate for the given state.
			for k := range nc.zoneStates {
				nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newZoneStates[k])
				nc.zoneStates[k] = newZoneStates[k]
			}
			return
		}
		
		
		// If both allWasFullyDisrupted and allAreFullyDisrupted are false, the cluster stays
		// non-FullDisruption; reset each zone's rate limiter according to its size and state.
		// We know that there's at least one not-fully disrupted so,
		// we can use default behavior for rate limiters
		for k, v := range nc.zoneStates {
			newState := newZoneStates[k]
			if v == newState {
				continue
			}
			glog.V(0).Infof("NodeController detected that zone %v is now in state %v.", k, newState)
			nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newState)
			nc.zoneStates[k] = newState
		}
	}
}

So the logic of handleDisruption is:

  • From zoneToNodeConditions, compute allAreFullyDisrupted: based on the node conditions observed in this pass, are all zones in the cluster currently in the FullDisruption state? (A small sketch of this computation follows this list.)
  • From nc.zoneStates, compute allWasFullyDisrupted: based on the previous pass, were all zones already in the FullDisruption state?
  • If allAreFullyDisrupted is true and allWasFullyDisrupted is false, the cluster switched from non-FullDisruption to FullDisruption (entering full disruption mode); iterate over all nodes:
    • If useTaintBasedEvictions is true, mark the node as healthy (remove the controller's taints and remove the node from the Tainter queue).
    • If useTaintBasedEvictions is false, cancel pod eviction for the node (by removing it from the zone's pod evictor queue).
    • Set the rate limiter of every zone's Tainter queue or pod evictor queue to 0, i.e. stop all evictions.
    • Set every zone's state (nc.zoneStates) to FullDisruption.
  • If allWasFullyDisrupted is true and allAreFullyDisrupted is false, the cluster switched from FullDisruption back to non-FullDisruption (exiting disruption mode); iterate over all nodes:
    • Update the probe timestamps on all nodes.
    • Call setLimiterInZone to reset each zone's rate limiter according to its size and new state.
  • If both allWasFullyDisrupted and allAreFullyDisrupted are false, the cluster stays non-FullDisruption; call setLimiterInZone to reset each zone's rate limiter according to its size and state.
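
These two booleans drive all of the branching above. Here is a stripped-down sketch of how they could be computed; the zoneState type and state names are assumptions for the example, and the metrics updates are omitted.

package example

type zoneState string

const (
	stateNormal            zoneState = "Normal"
	statePartialDisruption zoneState = "PartialDisruption"
	stateFullDisruption    zoneState = "FullDisruption"
)

// computeDisruptionFlags returns (allAreFullyDisrupted, allWasFullyDisrupted):
// the first is based on the zone states computed in this pass, the second on
// the states remembered from the previous pass.
func computeDisruptionFlags(newZoneStates, oldZoneStates map[string]zoneState) (bool, bool) {
	allAre := true
	for _, s := range newZoneStates {
		if s != stateFullDisruption {
			allAre = false
			break
		}
	}
	allWas := true
	for _, s := range oldZoneStates {
		if s != stateFullDisruption {
			allWas = false
			break
		}
	}
	return allAre, allWas
}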

Next, let's look at setLimiterInZone and see how it maps zone size and zone state to different rate limiters.

pkg/controller/node/nodecontroller.go:870

func (nc *NodeController) setLimiterInZone(zone string, zoneSize int, state zoneState) {
	switch state {
	
	// If the zone state is Normal, set the rate limiter to evictionLimiterQPS (default 0.1)
	case stateNormal:
		if nc.useTaintBasedEvictions {
			nc.zoneNotReadyOrUnreachableTainer[zone].SwapLimiter(nc.evictionLimiterQPS)
		} else {
			nc.zonePodEvictor[zone].SwapLimiter(nc.evictionLimiterQPS)
		}
		
	// If the zone state is PartialDisruption, use nc.enterPartialDisruptionFunc to pick the rate limiter.
	case statePartialDisruption:
		if nc.useTaintBasedEvictions {
			nc.zoneNotReadyOrUnreachableTainer[zone].SwapLimiter(
				nc.enterPartialDisruptionFunc(zoneSize))
		} else {
			nc.zonePodEvictor[zone].SwapLimiter(
				nc.enterPartialDisruptionFunc(zoneSize))
		}
		
	// If the zone state is FullDisruption, use nc.enterFullDisruptionFunc to pick the rate limiter.
	case stateFullDisruption:
		if nc.useTaintBasedEvictions {
			nc.zoneNotReadyOrUnreachableTainer[zone].SwapLimiter(
				nc.enterFullDisruptionFunc(zoneSize))
		} else {
			nc.zonePodEvictor[zone].SwapLimiter(
				nc.enterFullDisruptionFunc(zoneSize))
		}
	}
}

nc.enterFullDisruptionFunc and nc.enterPartialDisruptionFunc are assigned when the Node Controller is created in NewNodeController:

pkg/controller/node/nodecontroller.go:270
func NewNodeController(...) (*NodeController, error) {
    ...
    nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
    nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
    ...
}    

pkg/controller/node/nodecontroller.go:1132
// Default value for cluster eviction rate - we take nodeNum for consistency with ReducedQPSFunc.
func (nc *NodeController) HealthyQPSFunc(nodeNum int) float32 {
	return nc.evictionLimiterQPS
}

// If the cluster is large make evictions slower, if they're small stop evictions altogether.
func (nc *NodeController) ReducedQPSFunc(nodeNum int) float32 {
	if int32(nodeNum) > nc.largeClusterThreshold {
		return nc.secondaryEvictionLimiterQPS
	}
	return 0
}

So the logic of setLimiterInZone is:

  • When the zone state is PartialDisruption, set the Tainter queue or pod evictor queue rate limiter to:
    • secondaryEvictionLimiterQPS (default 0.01) if the zone size is greater than nc.largeClusterThreshold (default 50),
    • 0 otherwise.
  • When the zone state is FullDisruption, set the Tainter queue or pod evictor queue rate limiter to evictionLimiterQPS (default 0.1).
  • When the zone state is Normal, set the Tainter queue or pod evictor queue rate limiter to evictionLimiterQPS (default 0.1) as well. (A numeric sketch follows.)
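
Putting the numbers together, the effective per-zone eviction rate can be summarized by the sketch below. The state names and the default flag values (0.1 / 0.01 / 50) are taken from the text above; treat it as an illustration rather than the controller's own code.

package example

// effectiveEvictionQPS reproduces the rate selection in setLimiterInZone for illustration only.
func effectiveEvictionQPS(state string, zoneSize int) float32 {
	const (
		evictionLimiterQPS          float32 = 0.1  // --node-eviction-rate default
		secondaryEvictionLimiterQPS float32 = 0.01 // --secondary-node-eviction-rate default
		largeClusterThreshold               = 50   // --large-cluster-size-threshold default
	)
	switch state {
	case "PartialDisruption":
		if zoneSize > largeClusterThreshold {
			return secondaryEvictionLimiterQPS // large zone: slow evictions down
		}
		return 0 // small zone: stop evictions entirely
	default: // "Normal" and "FullDisruption" both use the normal rate
		return evictionLimiterQPS
	}
}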

Run TaintManager

Back in the Node Controller's Run method, the Taint Manager is started next:

if nc.runTaintManager {
	go nc.taintManager.Run(wait.NeverStop)
}

nc.runTaintManager is set via --enable-taint-manager and defaults to true, so the Taint Manager is started by default.

Next, let's see what the Taint Manager's Run method does.

pkg/controller/node/taint_controller.go:179

// Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.
func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
	glog.V(0).Infof("Starting NoExecuteTaintManager")
	// Functions that are responsible for taking work items out of the workqueues and putting them
	// into channels.
	go func(stopCh <-chan struct{}) {
		for {
			item, shutdown := tc.nodeUpdateQueue.Get()
			if shutdown {
				break
			}
			nodeUpdate := item.(*nodeUpdateItem)
			select {
			case <-stopCh:
				break
			case tc.nodeUpdateChannel <- nodeUpdate:
			}
		}
	}(stopCh)

	go func(stopCh <-chan struct{}) {
		for {
			item, shutdown := tc.podUpdateQueue.Get()
			if shutdown {
				break
			}
			podUpdate := item.(*podUpdateItem)
			select {
			case <-stopCh:
				break
			case tc.podUpdateChannel <- podUpdate:
			}
		}
	}(stopCh)

	// When processing events we want to prioritize Node updates over Pod updates,
	// as NodeUpdates that interest NoExecuteTaintManager should be handled as soon as possible -
	// we don't want user (or system) to wait until PodUpdate queue is drained before it can
	// start evicting Pods from tainted Nodes.
	for {
		select {
		case <-stopCh:
			break
		case nodeUpdate := <-tc.nodeUpdateChannel:
			tc.handleNodeUpdate(nodeUpdate)
		case podUpdate := <-tc.podUpdateChannel:
			// If we found a Pod update we need to empty Node queue first.
		priority:
			for {
				select {
				case nodeUpdate := <-tc.nodeUpdateChannel:
					tc.handleNodeUpdate(nodeUpdate)
				default:
					break priority
				}
			}
			// After Node queue is emptied we process podUpdate.
			tc.handlePodUpdate(podUpdate)
		}
	}
}
  • Start a goroutine that takes nodeUpdate items from the NoExecuteTaintManager's nodeUpdateQueue and pushes them into tc.nodeUpdateChannel.
  • Start a goroutine that takes podUpdate items from the podUpdateQueue and pushes them into tc.podUpdateChannel.
  • In the main loop, all pending nodeUpdates are handled with tc.handleNodeUpdate before tc.handlePodUpdate is called for a podUpdate, i.e. node updates are prioritized over pod updates (see the sketch after this list).
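
The "drain the node channel before touching a pod update" trick is a general Go pattern worth knowing on its own. Here is a self-contained sketch with hypothetical string channels in place of the manager's update types.

package example

import "fmt"

// processWithPriority handles everything on the high channel before taking one item
// from the low channel, mirroring how NoExecuteTaintManager prefers node updates.
func processWithPriority(high, low <-chan string, stopCh <-chan struct{}) {
	for {
		select {
		case <-stopCh:
			return
		case h := <-high:
			fmt.Println("high-priority item:", h)
		case l := <-low:
			// Before handling the low-priority item, drain the high-priority channel.
		priority:
			for {
				select {
				case h := <-high:
					fmt.Println("high-priority item:", h)
				default:
					break priority
				}
			}
			fmt.Println("low-priority item:", l)
		}
	}
}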

I will analyze NoExecuteTaintManager's handleNodeUpdate and handlePodUpdate in a separate post dedicated to the Taint Manager, so I won't go deeper into them here.

doTaintingPass

If useTaintBasedEvictions is true, i.e. TaintBasedEvictions=true is set via --feature-gates (the default is TaintBasedEvictions=false), doTaintingPass is invoked every 100ms (nodeEvictionPeriod).

doTaintingPass checks whether a node's ready condition is NotReady or Unknown and calls the apiserver to apply the corresponding taint to the node: node.alpha.kubernetes.io/notReady or node.alpha.kubernetes.io/unreachable, respectively.

if nc.useTaintBasedEvictions {
	// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
	// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
	go wait.Until(nc.doTaintingPass, nodeEvictionPeriod, wait.NeverStop)
}
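
For reference, here is a hedged sketch of what those two NoExecute taints look like as objects, written against the current k8s.io/api/core/v1 package; the keys are the ones quoted in this code version (later Kubernetes releases renamed them to node.kubernetes.io/not-ready and node.kubernetes.io/unreachable).

package example

import v1 "k8s.io/api/core/v1"

// Illustrative only: the NoExecute taints applied for NotReady and Unknown node conditions.
var (
	notReadyTaint = v1.Taint{
		Key:    "node.alpha.kubernetes.io/notReady",
		Effect: v1.TaintEffectNoExecute,
	}
	unreachableTaint = v1.Taint{
		Key:    "node.alpha.kubernetes.io/unreachable",
		Effect: v1.TaintEffectNoExecute,
	}
)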

doEvictionPass

If useTaintBasedEvictions is false (the default, since TaintBasedEvictions=false), doEvictionPass is invoked every 100ms instead.

else {
	// Managing eviction of nodes:
	// When we delete pods off a node, if the node was not empty at the time we then
	// queue an eviction watcher. If we hit an error, retry deletion.
	go wait.Until(nc.doEvictionPass, nodeEvictionPeriod, wait.NeverStop)
}

Let's look at the code of doEvictionPass:

pkg/controller/node/nodecontroller.go:481

func (nc *NodeController) doEvictionPass() {
	...
	
	// Iterate over every zone's pod evictor, take node names from its queue, and call deletePods
	// to delete all pods on that node (except DaemonSet pods)
	for k := range nc.zonePodEvictor {
		// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
		nc.zonePodEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
			node, err := nc.nodeLister.Get(value.Value)
			...
			nodeUid, _ := value.UID.(string)
			remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nodeUid, nc.daemonSetStore)
			...
			return true, 0
		})
	}
}

The logic of doEvictionPass:

  • Iterate over every zone's pod evictor and take node names from the pod evictor queue.
  • Call deletePods to delete all pods on that node (pods belonging to DaemonSets are excluded).

The code of deletePods is as follows:

pkg/controller/node/controller_utils.go:49

// deletePods will delete all pods from master running on given node, and return true
// if any pods were deleted, or were found pending deletion.
func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore extensionslisters.DaemonSetLister) (bool, error) {
	...
	// List all pod objects from the apiserver.
	pods, err := kubeClient.Core().Pods(metav1.NamespaceAll).List(options)
	...

    // Iterate over the pods and keep only those scheduled on this node
	for _, pod := range pods.Items {
		// Defensive check, also needed for tests.
		if pod.Spec.NodeName != nodeName {
			continue
		}

		...
		
		// if the pod has already been marked for deletion, we still return true that there are remaining pods.
		if pod.DeletionGracePeriodSeconds != nil {
			remaining = true
			continue
		}
		
		// if the pod is managed by a daemonset, ignore it
		_, err := daemonStore.GetPodDaemonSets(&pod)
		if err == nil { // No error means at least one daemonset was found
			continue
		}

		...
		
		// Call the apiserver to delete the pod
		if err := kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
			return false, err
		}
		remaining = true
	}

	...
	return remaining, nil
}

The main logic of deletePods:

  • List all pod objects from the apiserver.
  • Iterate over them and keep only the pods on the given node.
  • If a pod is already marked for deletion (pod.DeletionGracePeriodSeconds != nil), skip it.
  • If a pod belongs to a DaemonSet, skip it.
  • Otherwise, call the apiserver to delete the pod (a comparable flow against a recent client-go is sketched after this list).
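
For comparison, the same "delete every non-DaemonSet pod on a node" flow written against a recent client-go might look roughly like the sketch below; the CoreV1 signatures are those of current client-go, and ownership checks and error handling are simplified for illustration.

package example

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// evictAllPodsOnNode lists pods bound to nodeName and deletes those that are
// not owned by a DaemonSet and not already terminating.
func evictAllPodsOnNode(ctx context.Context, cs kubernetes.Interface, nodeName string) error {
	pods, err := cs.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{
		FieldSelector: "spec.nodeName=" + nodeName,
	})
	if err != nil {
		return err
	}
	for i := range pods.Items {
		pod := &pods.Items[i]
		if pod.DeletionGracePeriodSeconds != nil {
			continue // already being deleted
		}
		if isDaemonSetPod(pod) {
			continue // DaemonSet pods are left alone, as in deletePods above
		}
		if err := cs.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil {
			return fmt.Errorf("deleting %s/%s: %v", pod.Namespace, pod.Name, err)
		}
	}
	return nil
}

// isDaemonSetPod checks the pod's owner references instead of querying the DaemonSet lister.
func isDaemonSetPod(pod *corev1.Pod) bool {
	for _, ref := range pod.OwnerReferences {
		if ref.Kind == "DaemonSet" {
			return true
		}
	}
	return false
}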

That covers all of the main logic in the Node Controller's Run method.

The Taint Manager's Run internals were not analyzed in depth in this post; I will dedicate a follow-up post to the Taint Manager.

Other posts about the Node Controller:

  • Kubernetes Node Controller Source Code Analysis: Execution
  • Kubernetes Node Controller Source Code Analysis: Creation
  • Kubernetes Node Controller Source Code Analysis: Configuration