Dissecting the Kubernetes Equivalence Class

The Equivalence Class Concept and Its Significance

In 2015, Google's Borg paper, "Large-scale cluster management at Google with Borg", described Equivalence Classes as follows:

Equivalence classes: Tasks in a Borg job usually have identical requirements and constraints, so rather than determining feasibility for every pending task on every machine, and scoring all the feasible machines, Borg only does feasibility and scoring for one task per equivalence class – a group of tasks with identical requirements.

The Equivalence Class is currently used to speed up the Predicate phase of the Kubernetes Scheduler and thereby improve its throughput. The scheduler keeps the Equivalence Cache data up to date: when certain events occur (such as deleting a node or binding a pod), the affected entries in the Equivalence Cache must be invalidated immediately.

An Equivalence Class groups the information of a set of Pods that share the same requirements and constraints. During the Predicate phase, the scheduler only needs to run the predicates for one Pod of an Equivalence Class and store the result in the Equivalence Cache, so that the other Pods of the class (called Equivalent Pods) can reuse it. Only when no reusable Predicate Result exists in the Equivalence Cache does the normal predicate flow run.

Which Pods end up in the same Equivalence Class? By the definition above, any Pods sharing certain fields, such as resource requirements, labels, or affinity, could be considered Equivalent Pods belonging to one Equivalence Class. But users can modify a Pod's fields at any time, which would force the scheduler to promptly move the Pod between Equivalence Classes and make any in-flight predicate aware of the change, greatly complicating the problem. Therefore, the scheduler currently only groups Pods with the same OwnerReference (an RC, RS, Job, or StatefulSet) into one Equivalence Class. For example, an RS with N replicas maps to exactly one Equivalence Class. The scheduler computes a uint64 EquivalenceHash for the Equivalent Pods of each Equivalence Class.

Note that as of Kubernetes 1.10, two RSes with identical Pod Templates still map to two distinct Equivalence Classes.

How Equivalence Class Works

Using the Equivalence Class feature requires enabling the EnableEquivalenceClassCache Feature Gate; as of Kubernetes 1.10, this feature is still Alpha.

As analyzed in my earlier scheduler blog posts, every successfully registered Predicate Policy is checked in scheduler.findNodesThatFit(pod, nodes, predicateFuncs ...), which calls scheduler.podFitsOnNode(pod, node, predicateFuncs ...) for each node with a certain degree of parallelism.

podFitsOnNode takes a pod, a node, and the registered predicateFuncs, and checks whether the node satisfies the pod's predicates. With Equivalence Class enabled, the predicate phase changes as follows:

  • Before running predicates, check whether the pod has a corresponding Equivalence Class.
  • If it does, check whether the Equivalence Cache holds a usable Predicate Result; otherwise, trigger the full, normal predicate flow.
  • If a usable Predicate Result exists, complete the predicate phase directly with the cached result; otherwise, trigger the full, normal predicate flow.

The Equivalence Cache stores each node's Predicate Results in a three-level map:

  • The first-level key is the node name;
  • The second-level key is the predicateKey, i.e. the predicate policy; a node's algorithmCache therefore holds at most as many entries as there are registered Predicate Policies, which bounds the cache size and keeps Equivalence Cache lookups fast;
  • The third-level key is the Equivalence Hash described above.

For example, algorithmCache[$nodeName].predicatesCache.Get($predicateKey)[$equivalenceHash] indicates whether the Pods with $equivalenceHash pass the $predicateKey predicate on node $nodeName.
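To make the three-level layout concrete, here is a minimal, self-contained Go sketch (not the actual scheduler code; names like fakeCache and hostPredicate are invented for illustration) that stores and looks up a predicate result by node name, predicate key, and equivalence hash:

```go
package main

import "fmt"

// hostPredicate mirrors the cached result shape: fit or not, plus reasons.
type hostPredicate struct {
	Fit         bool
	FailReasons []string
}

// three-level map: nodeName -> predicateKey -> equivalenceHash -> result
type fakeCache map[string]map[string]map[uint64]hostPredicate

func (c fakeCache) update(node, predKey string, hash uint64, result hostPredicate) {
	if c[node] == nil {
		c[node] = map[string]map[uint64]hostPredicate{}
	}
	if c[node][predKey] == nil {
		c[node][predKey] = map[uint64]hostPredicate{}
	}
	c[node][predKey][hash] = result
}

func (c fakeCache) lookup(node, predKey string, hash uint64) (hostPredicate, bool) {
	// indexing missing keys on nil maps safely yields the zero value
	result, ok := c[node][predKey][hash]
	return result, ok
}

func main() {
	cache := fakeCache{}
	cache.update("node-1", "PodFitsResources", 42, hostPredicate{Fit: true})
	if r, ok := cache.lookup("node-1", "PodFitsResources", 42); ok {
		fmt.Printf("cache hit: fit=%v\n", r.Fit)
	}
	if _, ok := cache.lookup("node-2", "PodFitsResources", 42); !ok {
		fmt.Println("cache miss for node-2")
	}
}
```

The real scheduler replaces the middle level with an LRU cache bounded at 100 entries per node, as described below.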

As of Kubernetes 1.10, the supported predicateKeys are the following (20 in total):

  • MatchInterPodAffinity
  • CheckVolumeBinding
  • CheckNodeCondition
  • GeneralPredicates
  • HostName
  • PodFitsHostPorts
  • MatchNodeSelector
  • PodFitsResources
  • NoDiskConflict
  • PodToleratesNodeTaints
  • CheckNodeUnschedulable
  • PodToleratesNodeNoExecuteTaints
  • CheckNodeLabelPresence
  • CheckServiceAffinity
  • MaxEBSVolumeCount
  • MaxGCEPDVolumeCount
  • MaxAzureDiskVolumeCount
  • NoVolumeZoneConflict
  • CheckNodeMemoryPressure
  • CheckNodeDiskPressure

Note that even if a Pod has a matching Equivalence Class, the Equivalence Cache may hold no usable Predicate Result, or the cached result may already be invalid. In that case, the normal predicate flow runs and its result is written back into the Equivalence Cache.

How is the Equivalence Cache maintained and updated? Frequently invalidating a node's entire Equivalence Cache would defeat the purpose of the cache and bring no predicate speedup.

Thanks to the three-level map design described above, with predicateKey as the second-level key, the scheduler can invalidate a single Predicate Result instead of blindly invalidating the node's entire algorithmCache.

The scheduler watches Add/Update/Delete events of the relevant API objects and invalidates the corresponding Equivalence Cache entries according to specific rules; see the source code analysis below for the details.

Equivalence Class Source Code Analysis

Equivalence Cache Data Structures

The Equivalence Cache is defined as follows:

// EquivalenceCache holds:
// 1. a map of AlgorithmCache with node name as key
// 2. function to get equivalence pod
type EquivalenceCache struct {
	sync.RWMutex
	getEquivalencePod algorithm.GetEquivalencePodFunc
	algorithmCache    map[string]AlgorithmCache
}

// The AlgorithmCache stores PredicateMap with predicate name as key
type AlgorithmCache struct {
	// Only consider predicates for now
	predicatesCache *lru.Cache
}
  • The actual cached data of the Equivalence Cache lives in the algorithmCache map, keyed by nodeName.
  • Each node's Predicate Results are stored in AlgorithmCache.predicatesCache, an LRU (Least Recently Used) cache that holds a bounded number of entries; Kubernetes caps it at 100 (Kubernetes 1.10 ships 20 default Predicate Funcs). LRU is a cache replacement policy: when the cache is full (no free slot), the least recently used entry is evicted, and the most recently accessed entry sits at the front. By the principle of locality, recently used entries are the most likely to be accessed again, which improves performance.
  • predicatesCache is also a key-value store, with predicateKey as key and PredicateMap as value.
  • PredicateMap maps the uint64 Equivalence Hash to a HostPredicate.
  • HostPredicate represents the result of matching a Pod against a node under one Predicate Policy:

// HostPredicate is the cached predicate result
type HostPredicate struct {
	Fit         bool
	FailReasons []algorithm.PredicateFailureReason
}
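The per-node predicatesCache is a groupcache-style LRU capped at 100 entries. The sketch below (standard library only; lruCache and its methods are invented names, and it stores bare keys rather than PredicateMaps for brevity) demonstrates just the eviction behavior of such a cache:

```go
package main

import (
	"container/list"
	"fmt"
)

// lruCache evicts the least recently used entry once capacity is exceeded.
type lruCache struct {
	cap   int
	order *list.List               // front = most recently used
	items map[string]*list.Element // key -> element whose Value is the key
}

func newLRU(cap int) *lruCache {
	return &lruCache{cap: cap, order: list.New(), items: map[string]*list.Element{}}
}

func (c *lruCache) Add(key string) {
	if el, ok := c.items[key]; ok {
		c.order.MoveToFront(el) // touch: mark as most recently used
		return
	}
	c.items[key] = c.order.PushFront(key)
	if c.order.Len() > c.cap {
		oldest := c.order.Back() // least recently used entry
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(string))
	}
}

func (c *lruCache) Contains(key string) bool {
	_, ok := c.items[key]
	return ok
}

func main() {
	c := newLRU(2)
	c.Add("GeneralPredicates")
	c.Add("PodFitsResources")
	c.Add("GeneralPredicates")     // touched: now most recently used
	c.Add("MatchInterPodAffinity") // evicts PodFitsResources, the LRU entry
	fmt.Println(c.Contains("GeneralPredicates")) // true
	fmt.Println(c.Contains("PodFitsResources"))  // false
}
```

With the cap at 100 and only 20 predicate keys registered by default, eviction should rarely trigger in practice; the bound mainly guards against unbounded growth.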

Core Operations on the Equivalence Cache

  • InvalidateCachedPredicateItem: removes from the Equivalence Cache, for one node, the cached Predicate Results of all EquivalenceHashes (i.e. all Equivalent Pods) under the given predicate policies.
func (ec *EquivalenceCache) InvalidateCachedPredicateItem(nodeName string, predicateKeys sets.String) {
	...
	if algorithmCache, exist := ec.algorithmCache[nodeName]; exist {
		for predicateKey := range predicateKeys {
			algorithmCache.predicatesCache.Remove(predicateKey)
		}
	}
	...
}
  • InvalidateCachedPredicateItemOfAllNodes: removes, across all nodes, the cached Predicate Results of all EquivalenceHashes (i.e. all Equivalent Pods) under the given set of predicate policies.
func (ec *EquivalenceCache) InvalidateCachedPredicateItemOfAllNodes(predicateKeys sets.String) {
	...
	// algorithmCache uses nodeName as key, so we just iterate it and invalid given predicates
	for _, algorithmCache := range ec.algorithmCache {
		for predicateKey := range predicateKeys {
			// just use keys is enough
			algorithmCache.predicatesCache.Remove(predicateKey)
		}
	}
	...
}
  • PredicateWithECache: checks whether the Equivalence Cache holds a usable cached Predicate Result; on a hit, the cached result is returned directly as the predicate outcome for this pod on this node under this predicate policy. On a miss, it returns false along with an invalid flag, signaling that the cache cannot be used.
// PredicateWithECache returns:
// 1. if fit
// 2. reasons if not fit
// 3. if this cache is invalid
// based on cached predicate results
func (ec *EquivalenceCache) PredicateWithECache(
	podName, nodeName, predicateKey string,
	equivalenceHash uint64, needLock bool,
) (bool, []algorithm.PredicateFailureReason, bool) {
	...
	if algorithmCache, exist := ec.algorithmCache[nodeName]; exist {
		if cachePredicate, exist := algorithmCache.predicatesCache.Get(predicateKey); exist {
			predicateMap := cachePredicate.(PredicateMap)
			// TODO(resouer) Is it possible a race that cache failed to update immediately?
			if hostPredicate, ok := predicateMap[equivalenceHash]; ok {
				if hostPredicate.Fit {
					return true, []algorithm.PredicateFailureReason{}, false
				}
				return false, hostPredicate.FailReasons, false
			}
			// is invalid
			return false, []algorithm.PredicateFailureReason{}, true
		}
	}
	return false, []algorithm.PredicateFailureReason{}, true
}
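The three return values of PredicateWithECache form a tri-state contract: (fit, failReasons, invalid), where invalid=true means "no usable cache entry, run the real predicate". A minimal sketch of that contract over a plain map (invented names; the real code also takes a lock and is keyed per node):

```go
package main

import "fmt"

type hostPredicate struct {
	Fit         bool
	FailReasons []string
}

// lookup mirrors PredicateWithECache's contract for a single node:
// it returns (fit, failReasons, invalid).
func lookup(cache map[string]map[uint64]hostPredicate, predKey string, hash uint64) (bool, []string, bool) {
	if predicateMap, ok := cache[predKey]; ok {
		if hp, ok := predicateMap[hash]; ok {
			if hp.Fit {
				return true, nil, false
			}
			return false, hp.FailReasons, false // cached failure is still a valid answer
		}
	}
	return false, nil, true // no entry: caller must run the predicate
}

func main() {
	cache := map[string]map[uint64]hostPredicate{
		"PodFitsResources": {7: {Fit: false, FailReasons: []string{"insufficient cpu"}}},
	}
	fit, reasons, invalid := lookup(cache, "PodFitsResources", 7)
	fmt.Println(fit, reasons, invalid) // a cached *failure* is a hit, not an invalid entry
	_, _, invalid = lookup(cache, "PodFitsResources", 8)
	fmt.Println(invalid) // true: cache miss, caller must run the predicate
}
```

Note the subtlety: a cached "does not fit" result is a valid cache hit; only the absence of an entry reports invalid.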
  • UpdateCachedPredicateItem: when PredicateWithECache misses, the scheduler invokes the corresponding Predicate Funcs to run the real predicate logic, then calls UpdateCachedPredicateItem to write the fresh result into the Equivalence Cache. Each node's predicatesCache is also initialized here.
// UpdateCachedPredicateItem updates pod predicate for equivalence class
func (ec *EquivalenceCache) UpdateCachedPredicateItem(
	podName, nodeName, predicateKey string,
	fit bool,
	reasons []algorithm.PredicateFailureReason,
	equivalenceHash uint64,
	needLock bool,
) {
	...
	if _, exist := ec.algorithmCache[nodeName]; !exist {
		ec.algorithmCache[nodeName] = newAlgorithmCache()
	}
	predicateItem := HostPredicate{
		Fit:         fit,
		FailReasons: reasons,
	}
	// if cached predicate map already exists, just update the predicate by key
	if v, ok := ec.algorithmCache[nodeName].predicatesCache.Get(predicateKey); ok {
		predicateMap := v.(PredicateMap)
		// maps in golang are references, no need to add them back
		predicateMap[equivalenceHash] = predicateItem
	} else {
		ec.algorithmCache[nodeName].predicatesCache.Add(predicateKey,
			PredicateMap{
				equivalenceHash: predicateItem,
			})
	}
}

Initializing the Equivalence Cache

When Kubernetes registers predicates, priorities, and scheduler extenders, it also initializes the Equivalence Cache and passes it into the scheduler config.

// Creates a scheduler from a set of registered fit predicate keys and priority keys.
func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) {
	...
	// Init equivalence class cache
	if c.enableEquivalenceClassCache && getEquivalencePodFuncFactory != nil {
		pluginArgs, err := c.getPluginArgs()
		if err != nil {
			return nil, err
		}
		c.equivalencePodCache = core.NewEquivalenceCache(
			getEquivalencePodFuncFactory(*pluginArgs),
		)
		glog.Info("Created equivalence class cache")
	}
	...
}

// NewEquivalenceCache creates a EquivalenceCache object.
func NewEquivalenceCache(getEquivalencePodFunc algorithm.GetEquivalencePodFunc) *EquivalenceCache {
	return &EquivalenceCache{
		getEquivalencePod: getEquivalencePodFunc,
		algorithmCache:    make(map[string]AlgorithmCache),
	}
}

NewEquivalenceCache performs the Equivalence Cache initialization, but where is getEquivalencePod registered? The default algorithm provider registers GetEquivalencePodFunc at init time (so is this only available with the default provider, and not via a config file?). Note that factory.PluginFactoryArgs only passes in PVCInfo here.

GetEquivalencePodFunc is a function that gets a EquivalencePod from a pod.

pkg/scheduler/algorithmprovider/defaults/defaults.go:38

func init() {
	...
	// Use equivalence class to speed up heavy predicates phase.
	factory.RegisterGetEquivalencePodFunction(
		func(args factory.PluginFactoryArgs) algorithm.GetEquivalencePodFunc {
			return predicates.NewEquivalencePodGenerator(args.PVCInfo)
		},
	)
	...
}	

Why is only PVCInfo passed in, and why is PVCInfo needed at all? To answer that, let's look at the definitions of EquivalencePod and getEquivalencePod.

// EquivalencePod is a group of pod attributes which can be reused as equivalence to schedule other pods.
type EquivalencePod struct {
	ControllerRef metav1.OwnerReference
	PVCSet        sets.String
}

EquivalencePod defines which shared attributes make Pods equivalent; the Equivalence Hash is computed from exactly these two attributes of a Pod:

  • ControllerRef: the Pod's meta.OwnerReference, i.e. the controller object (one of RS, RC, Job, StatefulSet) that owns the Pod.
  • PVCSet: the set of IDs of all PVCs referenced by the Pod.

Therefore, two Pods are considered Equivalent Pods, sharing one Equivalence Hash, only if they belong to the same controller and reference the same PVC objects.

getEquivalencePod builds the Pod's EquivalencePod object from the OwnerReference and PVC information in the Pod object.

func (e *EquivalencePodGenerator) getEquivalencePod(pod *v1.Pod) interface{} {
	for _, ref := range pod.OwnerReferences {
		if ref.Controller != nil && *ref.Controller {
			pvcSet, err := e.getPVCSet(pod)
			if err == nil {
				// A pod can only belongs to one controller, so let's return.
				return &EquivalencePod{
					ControllerRef: ref,
					PVCSet:        pvcSet,
				}
			}
			return nil
		}
	}
	return nil
}

When a Pod's Equivalence Hash Is Computed

The entry point of the predicate phase is findNodesThatFit; it calls getEquivalenceClassInfo to compute the Pod's EquivalenceHash and then passes the hash into podFitsOnNode for the rest of the Equivalence Class machinery.

func findNodesThatFit(
	pod *v1.Pod,
	nodeNameToInfo map[string]*schedulercache.NodeInfo,
	nodes []*v1.Node,
	predicateFuncs map[string]algorithm.FitPredicate,
	extenders []algorithm.SchedulerExtender,
	metadataProducer algorithm.PredicateMetadataProducer,
	ecache *EquivalenceCache,
	schedulingQueue SchedulingQueue,
	alwaysCheckAllPredicates bool,
) ([]*v1.Node, FailedPredicateMap, error) {
	...

		var equivCacheInfo *equivalenceClassInfo
		if ecache != nil {
			// getEquivalenceClassInfo will return immediately if no equivalence pod found
			equivCacheInfo = ecache.getEquivalenceClassInfo(pod)
		}

		checkNode := func(i int) {
			nodeName := nodes[i].Name
			fits, failedPredicates, err := podFitsOnNode(
				pod,
				meta,
				nodeNameToInfo[nodeName],
				predicateFuncs,
				ecache,
				schedulingQueue,
				alwaysCheckAllPredicates,
				equivCacheInfo,
			)
			...
		}
		...
	}

getEquivalenceClassInfo computes a pod's EquivalenceHash as follows:

// getEquivalenceClassInfo returns the equivalence class of given pod.
func (ec *EquivalenceCache) getEquivalenceClassInfo(pod *v1.Pod) *equivalenceClassInfo {
	equivalencePod := ec.getEquivalencePod(pod)
	if equivalencePod != nil {
		hash := fnv.New32a()
		hashutil.DeepHashObject(hash, equivalencePod)
		return &equivalenceClassInfo{
			hash: uint64(hash.Sum32()),
		}
	}
	return nil
}

As shown, the EquivalenceHash is simply an FNV hash of the object returned by getEquivalencePod.
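The hashing idea can be sketched with the standard library. This is a simplification, not the scheduler's code: equivPod and hashOf are invented names, the serialization stands in for hashutil.DeepHashObject, and a bare controller UID stands in for the full metav1.OwnerReference. What it demonstrates is the contract: same controller plus same PVC set yields the same hash, regardless of PVC ordering.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
	"strings"
)

// equivPod mimics the two fields that feed the hash.
type equivPod struct {
	ControllerUID string   // stand-in for the full metav1.OwnerReference
	PVCs          []string // PVC identifiers, mimicking sets.String
}

// hashOf writes a deterministic serialization of the struct into an
// FNV-32a hash and widens it to uint64, like getEquivalenceClassInfo does.
func hashOf(p equivPod) uint64 {
	sort.Strings(p.PVCs) // set semantics: order must not matter
	h := fnv.New32a()
	fmt.Fprintf(h, "%s/%s", p.ControllerUID, strings.Join(p.PVCs, ","))
	return uint64(h.Sum32())
}

func main() {
	a := hashOf(equivPod{"rs-1234", []string{"pvc-a", "pvc-b"}})
	b := hashOf(equivPod{"rs-1234", []string{"pvc-b", "pvc-a"}})
	c := hashOf(equivPod{"rs-1234", []string{"pvc-a"}})
	fmt.Println(a == b) // true: same controller, same PVC set
	fmt.Println(a == c) // false: PVC sets differ
}
```

This also makes the note above concrete: two RSes with identical templates still differ in their OwnerReference (UID), so their pods hash to different classes.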

When Equivalent Pods' Predicate Results Enter the PredicateCache

Let's look at the relevant parts of podFitsOnNode:

// podFitsOnNode checks whether a node given by NodeInfo satisfies the given predicate functions.
// For given pod, podFitsOnNode will check if any equivalent pod exists and try to reuse its cached
// predicate results as possible.
// This function is called from two different places: Schedule and Preempt.
// When it is called from Schedule, we want to test whether the pod is schedulable
// on the node with all the existing pods on the node plus higher and equal priority
// pods nominated to run on the node.
// When it is called from Preempt, we should remove the victims of preemption and
// add the nominated pods. Removal of the victims is done by SelectVictimsOnNode().
// It removes victims from meta and NodeInfo before calling this function.
func podFitsOnNode(
	pod *v1.Pod,
	meta algorithm.PredicateMetadata,
	info *schedulercache.NodeInfo,
	predicateFuncs map[string]algorithm.FitPredicate,
	ecache *EquivalenceCache,
	queue SchedulingQueue,
	alwaysCheckAllPredicates bool,
	equivCacheInfo *equivalenceClassInfo,
) (bool, []algorithm.PredicateFailureReason, error) {
	...
			if predicate, exist := predicateFuncs[predicateKey]; exist {
				// Use an in-line function to guarantee invocation of ecache.Unlock()
				// when the in-line function returns.
				func() {
					var invalid bool
					if eCacheAvailable {
						// Lock ecache here to avoid a race condition against cache invalidation invoked
						// in event handlers. This race has existed despite locks in equivClassCache implementation.
						ecache.Lock()
						defer ecache.Unlock()
						// PredicateWithECache will return its cached predicate results.
						fit, reasons, invalid = ecache.PredicateWithECache(
							pod.GetName(), info.Node().GetName(),
							predicateKey, equivCacheInfo.hash, false)
					}

					if !eCacheAvailable || invalid {
						// we need to execute predicate functions since equivalence cache does not work
						fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
						if err != nil {
							return
						}

						if eCacheAvailable {
							// Store data to update equivClassCache after this loop.
							if res, exists := predicateResults[predicateKey]; exists {
								res.Fit = res.Fit && fit
								res.FailReasons = append(res.FailReasons, reasons...)
								predicateResults[predicateKey] = res
							} else {
								predicateResults[predicateKey] = HostPredicate{Fit: fit, FailReasons: reasons}
							}
							result := predicateResults[predicateKey]
							ecache.UpdateCachedPredicateItem(
								pod.GetName(), info.Node().GetName(),
								predicateKey, result.Fit, result.FailReasons, equivCacheInfo.hash, false)
						}
					}
				}()

				...
}

podFitsOnNode first calls PredicateWithECache to check for a hit in the Equivalence Cache:

  • On a hit, the corresponding Predicate Policy is considered done.
  • On a miss, the predicate itself is invoked, and its result is added to or updated in the cache via UpdateCachedPredicateItem.
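This hit-or-compute flow is a classic look-aside cache pattern. A simplified single-node sketch (invented names; the real code additionally handles locking, the invalid flag, and alwaysCheckAllPredicates), with a call counter to show that the expensive predicate runs only once per equivalence class:

```go
package main

import "fmt"

type result struct {
	Fit     bool
	Reasons []string
}

// cache: predicateKey -> equivalenceHash -> result (single node, for brevity)
type ecache map[string]map[uint64]result

// predicateFn is the signature of a (stubbed) predicate check.
type predicateFn func() result

// runPredicate consults the cache first and only invokes the expensive
// predicate on a miss, storing the fresh result for later equivalent pods.
func runPredicate(c ecache, predKey string, hash uint64, pred predicateFn, calls *int) result {
	if r, ok := c[predKey][hash]; ok {
		return r // cache hit: predicate not executed
	}
	*calls++
	r := pred()
	if c[predKey] == nil {
		c[predKey] = map[uint64]result{}
	}
	c[predKey][hash] = r
	return r
}

func main() {
	c := ecache{}
	calls := 0
	pred := func() result { return result{Fit: true} }
	runPredicate(c, "PodFitsResources", 7, pred, &calls) // miss: runs predicate
	runPredicate(c, "PodFitsResources", 7, pred, &calls) // hit: reuses result
	fmt.Println("predicate executions:", calls) // 1
}
```

For an RS with N replicas, this is exactly where the speedup comes from: one real predicate run, N-1 cache hits per node and predicate key.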

Maintaining the Equivalence Cache

Back in the Scheduler Config Factory, let's look at the Equivalence Cache operations in the EventHandlers registered on the scheduler's podInformer, nodeInformer, serviceInformer, pvcInformer, and so on.

Assume Pod

After a pod is scheduled but before it is bound to a node, the pod is Assumed, and the Assume step touches the Equivalence Cache.

// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
// assume modifies `assumed`.
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
	...

	// Optimistically assume that the binding will succeed, so we need to invalidate affected
	// predicates in equivalence cache.
	// If the binding fails, these invalidated item will not break anything.
	if sched.config.Ecache != nil {
		sched.config.Ecache.InvalidateCachedPredicateItemForPodAdd(assumed, host)
	}
	return nil
}

Assuming a Pod calls InvalidateCachedPredicateItemForPodAdd to operate on the Equivalence Cache.

func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) {
	// GeneralPredicates: will always be affected by adding a new pod
	invalidPredicates := sets.NewString("GeneralPredicates")

	// MaxPDVolumeCountPredicate: we check the volumes of pod to make decision.
	for _, vol := range pod.Spec.Volumes {
		if vol.PersistentVolumeClaim != nil {
			invalidPredicates.Insert("MaxEBSVolumeCount", "MaxGCEPDVolumeCount", "MaxAzureDiskVolumeCount")
		} else {
			if vol.AWSElasticBlockStore != nil {
				invalidPredicates.Insert("MaxEBSVolumeCount")
			}
			if vol.GCEPersistentDisk != nil {
				invalidPredicates.Insert("MaxGCEPDVolumeCount")
			}
			if vol.AzureDisk != nil {
				invalidPredicates.Insert("MaxAzureDiskVolumeCount")
			}
		}
	}
	ec.InvalidateCachedPredicateItem(nodeName, invalidPredicates)
}

As InvalidateCachedPredicateItemForPodAdd shows, Assuming a Pod removes the predicateCaches for the following predicateKeys on that node:

  • GeneralPredicates;
  • if the pod references PVCs, the "MaxEBSVolumeCount", "MaxGCEPDVolumeCount", and "MaxAzureDiskVolumeCount" PredicateCaches;
  • if a pod volume uses AWSElasticBlockStore, the MaxEBSVolumeCount PredicateCache;
  • if a pod volume uses GCEPersistentDisk, the MaxGCEPDVolumeCount PredicateCache;
  • if a pod volume uses AzureDisk, the MaxAzureDiskVolumeCount PredicateCache.
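The decision logic above can be distilled into a pure function over a pod's volume types. This sketch uses a simplified volume representation (the real code inspects v1.Volume fields); vol and keysToInvalidate are invented names:

```go
package main

import (
	"fmt"
	"sort"
)

// vol is a simplified stand-in for v1.Volume: at most one field is set.
type vol struct {
	PVC, AWSEBS, GCEPD, AzureDisk bool
}

// keysToInvalidate mirrors InvalidateCachedPredicateItemForPodAdd:
// GeneralPredicates is always invalidated; volume types add more keys.
func keysToInvalidate(volumes []vol) []string {
	set := map[string]bool{"GeneralPredicates": true}
	for _, v := range volumes {
		switch {
		case v.PVC:
			// the bound PV's type is unknown, so all three count predicates go
			set["MaxEBSVolumeCount"] = true
			set["MaxGCEPDVolumeCount"] = true
			set["MaxAzureDiskVolumeCount"] = true
		case v.AWSEBS:
			set["MaxEBSVolumeCount"] = true
		case v.GCEPD:
			set["MaxGCEPDVolumeCount"] = true
		case v.AzureDisk:
			set["MaxAzureDiskVolumeCount"] = true
		}
	}
	keys := make([]string, 0, len(set))
	for k := range set {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	fmt.Println(keysToInvalidate(nil))                  // [GeneralPredicates]
	fmt.Println(keysToInvalidate([]vol{{GCEPD: true}})) // [GeneralPredicates MaxGCEPDVolumeCount]
}
```

Note the asymmetry the code encodes: a PVC volume invalidates all three count predicates because the scheduler cannot know the bound PV's type from the pod alone.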

Update Pod in Scheduled Pod Cache

When the scheduler runs NewConfigFactory, it registers updatePodInCache as the Update event handler for assignedNonTerminatedPods.

func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) {
	...
	c.invalidateCachedPredicatesOnUpdatePod(newPod, oldPod)
	c.podQueue.AssignedPodUpdated(newPod)
}

func (c *configFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, oldPod *v1.Pod) {
	if c.enableEquivalenceClassCache {
		// if the pod does not have bound node, updating equivalence cache is meaningless;
		// if pod's bound node has been changed, that case should be handled by pod add & delete.
		if len(newPod.Spec.NodeName) != 0 && newPod.Spec.NodeName == oldPod.Spec.NodeName {
			if !reflect.DeepEqual(oldPod.GetLabels(), newPod.GetLabels()) {
				// MatchInterPodAffinity need to be reconsidered for this node,
				// as well as all nodes in its same failure domain.
				c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(
					matchInterPodAffinitySet)
			}
			// if requested container resource changed, invalidate GeneralPredicates of this node
			if !reflect.DeepEqual(predicates.GetResourceRequest(newPod),
				predicates.GetResourceRequest(oldPod)) {
				c.equivalencePodCache.InvalidateCachedPredicateItem(
					newPod.Spec.NodeName, generalPredicatesSets)
			}
		}
	}
}

updatePodInCache calls invalidateCachedPredicatesOnUpdatePod, which treats the Equivalence Cache as follows:

  • if the pod's labels changed, the MatchInterPodAffinity PredicateCache is removed from the Equivalence Cache on all nodes;
  • if the pod's resource request changed, the GeneralPredicates PredicateCache is removed from the Equivalence Cache on that node.

Delete Pod in Scheduled Pod Cache

Likewise, when an assignedNonTerminatedPod is deleted, invalidateCachedPredicatesOnDeletePod is called to update the Equivalence Cache.

func (c *configFactory) invalidateCachedPredicatesOnDeletePod(pod *v1.Pod) {
	if c.enableEquivalenceClassCache {
		// part of this case is the same as pod add.
		c.equivalencePodCache.InvalidateCachedPredicateItemForPodAdd(pod, pod.Spec.NodeName)
		// MatchInterPodAffinity need to be reconsidered for this node,
		// as well as all nodes in its same failure domain.
		// TODO(resouer) can we just do this for nodes in the same failure domain
		c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(
			matchInterPodAffinitySet)

		// if this pod have these PV, cached result of disk conflict will become invalid.
		for _, volume := range pod.Spec.Volumes {
			if volume.GCEPersistentDisk != nil || volume.AWSElasticBlockStore != nil ||
				volume.RBD != nil || volume.ISCSI != nil {
				c.equivalencePodCache.InvalidateCachedPredicateItem(
					pod.Spec.NodeName, noDiskConflictSet)
			}
		}
	}
}

The Equivalence Cache updates performed by invalidateCachedPredicatesOnDeletePod can be summarized as:

  • remove the GeneralPredicates PredicateCache on that node;
  • if the pod references PVCs, remove the "MaxEBSVolumeCount", "MaxGCEPDVolumeCount", and "MaxAzureDiskVolumeCount" PredicateCaches on that node;
  • if a pod volume uses AWSElasticBlockStore, remove the MaxEBSVolumeCount PredicateCache on that node;
  • if a pod volume uses GCEPersistentDisk, remove the MaxGCEPDVolumeCount PredicateCache on that node;
  • if a pod volume uses AzureDisk, remove the MaxAzureDiskVolumeCount PredicateCache on that node;
  • remove the MatchInterPodAffinity PredicateCache on all nodes;
  • if a pod volume references one of GCEPersistentDisk, AWSElasticBlockStore, RBD, or ISCSI, remove the NoDiskConflict PredicateCache on that node.

Update Node

On a node update event, invalidateCachedPredicatesOnNodeUpdate is called to update the Equivalence Cache.

func (c *configFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node, oldNode *v1.Node) {
	if c.enableEquivalenceClassCache {
		// Begin to update equivalence cache based on node update
		// TODO(resouer): think about lazily initialize this set
		invalidPredicates := sets.NewString()

		if !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable) {
			invalidPredicates.Insert(predicates.GeneralPred) // "PodFitsResources"
		}
		if !reflect.DeepEqual(oldNode.GetLabels(), newNode.GetLabels()) {
			invalidPredicates.Insert(predicates.GeneralPred, predicates.CheckServiceAffinityPred) // "PodSelectorMatches"
			for k, v := range oldNode.GetLabels() {
				// any label can be topology key of pod, we have to invalidate in all cases
				if v != newNode.GetLabels()[k] {
					invalidPredicates.Insert(predicates.MatchInterPodAffinityPred)
				}
				// NoVolumeZoneConflict will only be affected by zone related label change
				if isZoneRegionLabel(k) {
					if v != newNode.GetLabels()[k] {
						invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
					}
				}
			}
		}

		oldTaints, oldErr := helper.GetTaintsFromNodeAnnotations(oldNode.GetAnnotations())
		if oldErr != nil {
			glog.Errorf("Failed to get taints from old node annotation for equivalence cache")
		}
		newTaints, newErr := helper.GetTaintsFromNodeAnnotations(newNode.GetAnnotations())
		if newErr != nil {
			glog.Errorf("Failed to get taints from new node annotation for equivalence cache")
		}
		if !reflect.DeepEqual(oldTaints, newTaints) ||
			!reflect.DeepEqual(oldNode.Spec.Taints, newNode.Spec.Taints) {
			invalidPredicates.Insert(predicates.PodToleratesNodeTaintsPred)
		}

		if !reflect.DeepEqual(oldNode.Status.Conditions, newNode.Status.Conditions) {
			oldConditions := make(map[v1.NodeConditionType]v1.ConditionStatus)
			newConditions := make(map[v1.NodeConditionType]v1.ConditionStatus)
			for _, cond := range oldNode.Status.Conditions {
				oldConditions[cond.Type] = cond.Status
			}
			for _, cond := range newNode.Status.Conditions {
				newConditions[cond.Type] = cond.Status
			}
			if oldConditions[v1.NodeMemoryPressure] != newConditions[v1.NodeMemoryPressure] {
				invalidPredicates.Insert(predicates.CheckNodeMemoryPressurePred)
			}
			if oldConditions[v1.NodeDiskPressure] != newConditions[v1.NodeDiskPressure] {
				invalidPredicates.Insert(predicates.CheckNodeDiskPressurePred)
			}
			if oldConditions[v1.NodeReady] != newConditions[v1.NodeReady] ||
				oldConditions[v1.NodeOutOfDisk] != newConditions[v1.NodeOutOfDisk] ||
				oldConditions[v1.NodeNetworkUnavailable] != newConditions[v1.NodeNetworkUnavailable] {
				invalidPredicates.Insert(predicates.CheckNodeConditionPred)
			}
		}
		if newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable {
			invalidPredicates.Insert(predicates.CheckNodeConditionPred)
		}
		c.equivalencePodCache.InvalidateCachedPredicateItem(newNode.GetName(), invalidPredicates)
	}
}

So on a node update, the PredicateCaches for the following PredicateKeys are removed from that node's Equivalence Cache:

  • GeneralPredicates, if node.Status.Allocatable or the node's labels changed;
  • CheckServiceAffinity, if the node's labels changed;
  • MatchInterPodAffinity, if the node's labels changed;
  • NoVolumeZoneConflict, if the failure-domain.beta.kubernetes.io/zone or failure-domain.beta.kubernetes.io/region label changed;
  • PodToleratesNodeTaints, if the node's taints (Spec.Taints, or the scheduler.alpha.kubernetes.io/taints annotation) changed;
  • CheckNodeMemoryPressure, CheckNodeDiskPressure, CheckNodeCondition, if the corresponding node conditions (or Spec.Unschedulable) changed.
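The shape of this handler, diffing old and new node state and mapping each changed field to the predicate caches it can affect, can be sketched as a pure function. This is a simplification (nodeState and diffInvalidations are invented; only three of the real comparisons are shown):

```go
package main

import (
	"fmt"
	"reflect"
	"sort"
)

// nodeState is a simplified stand-in for the fields the real handler compares.
type nodeState struct {
	Allocatable   map[string]string
	Labels        map[string]string
	Unschedulable bool
}

// diffInvalidations mirrors invalidateCachedPredicatesOnNodeUpdate:
// each changed field maps to the predicate caches it invalidates.
func diffInvalidations(oldN, newN nodeState) []string {
	set := map[string]bool{}
	if !reflect.DeepEqual(oldN.Allocatable, newN.Allocatable) {
		set["GeneralPredicates"] = true
	}
	if !reflect.DeepEqual(oldN.Labels, newN.Labels) {
		set["GeneralPredicates"] = true
		set["CheckServiceAffinity"] = true
		set["MatchInterPodAffinity"] = true // any label may be a topology key
	}
	if oldN.Unschedulable != newN.Unschedulable {
		set["CheckNodeCondition"] = true
	}
	keys := make([]string, 0, len(set))
	for k := range set {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	oldN := nodeState{Labels: map[string]string{"zone": "a"}}
	newN := nodeState{Labels: map[string]string{"zone": "b"}}
	fmt.Println(diffInvalidations(oldN, newN))
}
```

The point of the pattern is minimality: an unchanged node yields an empty invalidation set, so the cache survives no-op resyncs untouched.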

Delete Node

On a node delete event, InvalidateAllCachedPredicateItemOfNode is called to update the Equivalence Cache.

// InvalidateAllCachedPredicateItemOfNode marks all cached items on given node as invalid
func (ec *EquivalenceCache) InvalidateAllCachedPredicateItemOfNode(nodeName string) {
	ec.Lock()
	defer ec.Unlock()
	delete(ec.algorithmCache, nodeName)
	glog.V(5).Infof("Done invalidating all cached predicates on node: %s", nodeName)
}

So when a node is deleted, the node's entire algorithmCache entry is removed from the Equivalence Cache.

Add or Delete PV

On a PV add or delete event, invalidatePredicatesForPv is called to update the Equivalence Cache.

func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) {
	// You could have a PVC that points to a PV, but the PV object doesn't exist.
	// So when the PV object gets added, we can recount.
	invalidPredicates := sets.NewString()

	// PV types which impact MaxPDVolumeCountPredicate
	if pv.Spec.AWSElasticBlockStore != nil {
		invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred)
	}
	if pv.Spec.GCEPersistentDisk != nil {
		invalidPredicates.Insert(predicates.MaxGCEPDVolumeCountPred)
	}
	if pv.Spec.AzureDisk != nil {
		invalidPredicates.Insert(predicates.MaxAzureDiskVolumeCountPred)
	}

	// If PV contains zone related label, it may impact cached NoVolumeZoneConflict
	for k := range pv.Labels {
		if isZoneRegionLabel(k) {
			invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
			break
		}
	}

	if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
		// Add/delete impacts the available PVs to choose from
		invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
	}

	c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}

So when a PV is added or deleted, the PredicateCaches for the following predicateKeys are removed from the Equivalence Cache on all nodes:

  • MaxEBSVolumeCount, MaxGCEPDVolumeCount, or MaxAzureDiskVolumeCount, if the PV is of the corresponding type;
  • NoVolumeZoneConflict, if the PV carries a zone/region label;
  • CheckVolumeBinding, if the VolumeScheduling Feature Gate is enabled.

Update PV

On a PV update event, invalidatePredicatesForPvUpdate is called to update the Equivalence Cache.

func (c *configFactory) invalidatePredicatesForPvUpdate(oldPV, newPV *v1.PersistentVolume) {
	invalidPredicates := sets.NewString()
	for k, v := range newPV.Labels {
		// If PV update modifies the zone/region labels.
		if isZoneRegionLabel(k) && !reflect.DeepEqual(v, oldPV.Labels[k]) {
			invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
			break
		}
	}
	c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}

So when a PV is updated, the PredicateCache for the following predicateKey is removed from the Equivalence Cache on all nodes:

  • NoVolumeZoneConflict, if the PV's failure-domain.beta.kubernetes.io/zone or failure-domain.beta.kubernetes.io/region label changed.

Add or Delete PVC

func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim) {
	// We need to do this here because the ecache uses PVC uid as part of equivalence hash of pod

	// The bound volume type may change
	invalidPredicates := sets.NewString(maxPDVolumeCountPredicateKeys...)

	// The bound volume's label may change
	invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)

	if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
		// Add/delete impacts the available PVs to choose from
		invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
	}
	c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}

On a PVC add or delete event, the PredicateCaches for the following predicateKeys are removed from the Equivalence Cache on all nodes:

  • the "MaxEBSVolumeCount", "MaxGCEPDVolumeCount", and "MaxAzureDiskVolumeCount" PredicateCaches;
  • the NoVolumeZoneConflict PredicateCache;
  • CheckVolumeBinding, if the VolumeScheduling Feature Gate is enabled.

Update PVC

func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.PersistentVolumeClaim) {
	invalidPredicates := sets.NewString()

	if old.Spec.VolumeName != new.Spec.VolumeName {
		if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
			// PVC volume binding has changed
			invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
		}
		// The bound volume type may change
		invalidPredicates.Insert(maxPDVolumeCountPredicateKeys...)
	}

	c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}

On a PVC update event, the PredicateCaches for the following predicateKeys are removed from the Equivalence Cache on all nodes:

  • CheckVolumeBinding, if the VolumeScheduling Feature Gate is enabled and the PVC's bound PV changed;
  • the "MaxEBSVolumeCount", "MaxGCEPDVolumeCount", and "MaxAzureDiskVolumeCount" PredicateCaches, if the PVC's bound PV changed.

Add or Delete Service

func (c *configFactory) onServiceAdd(obj interface{}) {
	if c.enableEquivalenceClassCache {
		c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
	}
	c.podQueue.MoveAllToActiveQueue()
}

func (c *configFactory) onServiceDelete(obj interface{}) {
	if c.enableEquivalenceClassCache {
		c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
	}
	c.podQueue.MoveAllToActiveQueue()
}

On a Service add or delete event, the PredicateCache for the following predicateKey is removed from the Equivalence Cache on all nodes:

  • CheckServiceAffinity.

Update Service

func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) {
	if c.enableEquivalenceClassCache {
		// TODO(resouer) We may need to invalidate this for specified group of pods only
		oldService := oldObj.(*v1.Service)
		newService := newObj.(*v1.Service)
		if !reflect.DeepEqual(oldService.Spec.Selector, newService.Spec.Selector) {
			c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
		}
	}
	c.podQueue.MoveAllToActiveQueue()
}

On a Service update event, the PredicateCache for the following predicateKey is removed from the Equivalence Cache on all nodes:

  • CheckServiceAffinity, if the Service's Selector changed.

Shortcomings of Equivalence Class

  • The hardest part of the Equivalence Class feature is maintaining and updating the Equivalence Cache optimally, so that every update is minimal in scope yet still correct; this still needs work.
  • The Equivalence Cache only caches Predicate Results; it does not cache or maintain Priority Results (the community is working on a Map-Reduce-style optimization for this). Priority Funcs are usually more complex than Predicate Funcs, so caching them would be even more valuable.
  • Equivalence Hashing is currently based only on a Pod's OwnerReference and PVC information. If the OwnerReference requirement were dropped in favor of hashing the core fields of the Pod spec, such as resource requests, Labels, and Affinity, cache hit rates could be much higher and predicate performance would improve more significantly.

Summary

Equivalence Class speeds up the Predicate phase of the Kubernetes Scheduler and thereby improves its throughput. Ordinary users need not pay attention to the Equivalence Class feature, since current scheduler performance is already sufficient for most of them; but users running large-scale AI training workloads may want to keep an eye on it.

This article is part of the Tencent Cloud self-media sharing plan.
