In 2015, Google's paper on Borg, "Large-scale cluster management at Google with Borg", describes Equivalence Class as follows:
Equivalence classes: Tasks in a Borg job usually have identical requirements and constraints, so rather than determining feasibility for every pending task on every machine, and scoring all the feasible machines, Borg only does feasibility and scoring for one task per equivalence class – a group of tasks with identical requirements.
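Equivalence Class is currently used in the Kubernetes Scheduler to speed up Predicates and improve scheduler throughput. The Kubernetes scheduler keeps the Equivalence Cache up to date. When certain events occur (such as a node being deleted or a pod being bound), the related entries in the Equivalence Cache must be invalidated immediately.
An Equivalence Class is the set of information describing a group of Pods that share the same Requirements and Constraints. During the Predicate phase, the Scheduler only needs to run Predicates for one Pod in the Equivalence Class. It stores the result in the Equivalence Cache so that the other Pods in that class (called Equivalent Pods) can reuse it. The normal Predicate flow runs only when the Equivalence Cache has no reusable Predicate Result.
Which Pods belong to the same Equivalence Class? By definition, Pods that share certain fields, such as resource requirements, labels, and affinity, are Equivalent Pods and belong to the same Equivalence Class. However, users can modify a Pod's fields at any time. The Scheduler would then have to update the Pod's Equivalence Class promptly, and any Predicate already in progress would have to detect the change and react to it, which makes the problem very complex. Therefore, the Scheduler currently groups into one Equivalence Class only Pods that share the same OwnerReference (RC, RS, Job, or StatefulSet). For example, if an RS defines N replicas, those N replica Pods form one Equivalence Class. The Scheduler computes a uint64 EquivalenceHash for the Equivalent Pods of each Equivalence Class.
Note that as of Kubernetes 1.10, two RSs with identical Pod Templates still map to two different Equivalence Classes.
To use Equivalence Class, you must enable the EnableEquivalenceClassCache Feature Gate. As of Kubernetes 1.10, this feature is still Alpha.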
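My earlier scheduler blog posts analyzed Predicates. As they explained, all successfully registered Predicate Policies are checked during scheduler.findNodesThatFit(pod, nodes, predicateFuncs ...). That function runs in parallel across nodes, calling scheduler.podFitsOnNode(pod, node, predicateFuncs ...) on each node to run the registered Predicate Policy checks.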
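podFitsOnNode takes a pod, a node, and the set of registered predicateFuncs, and checks whether the node satisfies the pod's predicates. Adding Equivalence Class changes the predicate phase as follows. findNodesThatFit first computes the pod's EquivalenceHash. Then, for each predicate, podFitsOnNode looks up a cached result in the Equivalence Cache. It runs the predicate itself, and writes the result back, only when the entry is missing or invalid.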
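The Equivalence Cache stores the Predicate Results of each node in a three-level map: node name → predicateKey → equivalenceHash → HostPredicate (Fit plus FailReasons). For example, algorithmCache[$nodeName].predicatesCache.Get($predicateKey)[$equivalenceHash] records whether the Pods with hash $equivalenceHash passed predicate $predicateKey on node $nodeName.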
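Below is a minimal Go sketch of this three-level layout. It assumes plain maps in place of the LRU cache that the real AlgorithmCache uses, and plain strings in place of algorithm.PredicateFailureReason. The type and method names (equivCache, lookup, store) are hypothetical and are not part of the scheduler:

```go
package main

import "fmt"

// HostPredicate is a simplified copy of the cached value: whether the pod
// fits, plus failure reasons (plain strings instead of
// algorithm.PredicateFailureReason).
type HostPredicate struct {
	Fit         bool
	FailReasons []string
}

// equivCache is a hypothetical stand-in for EquivalenceCache:
// nodeName -> predicateKey -> equivalenceHash -> HostPredicate.
// The real second level is an LRU cache; a plain map is used here.
type equivCache map[string]map[string]map[uint64]HostPredicate

// lookup returns the cached result for (node, predicateKey, hash), if any.
// Indexing a missing (nil) inner map is safe in Go and simply yields a miss.
func (c equivCache) lookup(node, predicateKey string, hash uint64) (HostPredicate, bool) {
	res, ok := c[node][predicateKey][hash]
	return res, ok
}

// store records a result, creating the inner maps on first use.
func (c equivCache) store(node, predicateKey string, hash uint64, res HostPredicate) {
	if c[node] == nil {
		c[node] = make(map[string]map[uint64]HostPredicate)
	}
	if c[node][predicateKey] == nil {
		c[node][predicateKey] = make(map[uint64]HostPredicate)
	}
	c[node][predicateKey][hash] = res
}

func main() {
	c := equivCache{}
	c.store("node-1", "GeneralPredicates", 42, HostPredicate{Fit: true})

	res, ok := c.lookup("node-1", "GeneralPredicates", 42)
	fmt.Println(res.Fit, ok) // true true

	_, ok = c.lookup("node-2", "GeneralPredicates", 42)
	fmt.Println(ok) // false
}
```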
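As of Kubernetes 1.10, 20 predicateKeys are supported.
Note that even when a Pod has a matching Equivalence Class, the Equivalence Cache may hold no usable Predicate Result, or the Predicate Result may have been invalidated. In that case the normal Predicate runs, and its Result is written to the Equivalence Cache.
How is the Equivalence Cache maintained and updated? Frequently refreshing a node's entire Equivalence Cache would defeat the purpose of the Equivalence Cache and would not make Predicates any faster.
As mentioned above, the Equivalence Cache is a three-level map whose second-level key is predicateKey. The Scheduler can therefore invalidate a single Predicate Result instead of blindly invalidating the node's entire algorithmCache.
The Scheduler watches Add/Update/Delete events of the relevant API objects and invalidates the corresponding Equivalence Cache data according to the relevant policies. See the source code analysis below for the detailed logic.
The Equivalence Cache is defined as follows: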
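Per-predicate invalidation can be sketched as follows. A hypothetical nested-map cache stands in for EquivalenceCache, and all names here are invented for illustration. The three helpers correspond roughly to InvalidateCachedPredicateItem, InvalidateCachedPredicateItemOfAllNodes, and InvalidateAllCachedPredicateItemOfNode:

```go
package main

import "fmt"

// cache is a hypothetical nested map:
// nodeName -> predicateKey -> equivalenceHash -> fit.
type cache map[string]map[string]map[uint64]bool

// invalidatePredicate drops one predicateKey on one node and keeps the node's
// other cached predicates (compare InvalidateCachedPredicateItem).
// delete on a nil map is a no-op, so unknown nodes are fine.
func (c cache) invalidatePredicate(node, predicateKey string) {
	delete(c[node], predicateKey)
}

// invalidatePredicateOnAllNodes drops one predicateKey on every node
// (compare InvalidateCachedPredicateItemOfAllNodes).
func (c cache) invalidatePredicateOnAllNodes(predicateKey string) {
	for _, byPred := range c {
		delete(byPred, predicateKey)
	}
}

// invalidateNode drops everything cached for a node
// (compare InvalidateAllCachedPredicateItemOfNode).
func (c cache) invalidateNode(node string) {
	delete(c, node)
}

func main() {
	c := cache{
		"node-1": {"GeneralPredicates": {42: true}, "NoDiskConflict": {42: true}},
		"node-2": {"GeneralPredicates": {42: true}},
	}
	c.invalidatePredicate("node-1", "GeneralPredicates")
	fmt.Println(len(c["node-1"]), len(c["node-2"])) // 1 1

	c.invalidatePredicateOnAllNodes("GeneralPredicates")
	fmt.Println(len(c["node-2"])) // 0

	c.invalidateNode("node-1")
	_, ok := c["node-1"]
	fmt.Println(ok) // false
}
```

Dropping only the affected predicateKey leaves every other cached result on the node reusable, which is the reason the second level is keyed by predicate.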
// EquivalenceCache holds:
// 1. a map of AlgorithmCache with node name as key
// 2. function to get equivalence pod
type EquivalenceCache struct {
	sync.RWMutex
	getEquivalencePod algorithm.GetEquivalencePodFunc
	algorithmCache    map[string]AlgorithmCache
}

// The AlgorithmCache stores PredicateMap with predicate name as key
type AlgorithmCache struct {
	// Only consider predicates for now
	predicatesCache *lru.Cache
}
func (ec *EquivalenceCache) InvalidateCachedPredicateItem(nodeName string, predicateKeys sets.String) {
	...
	if algorithmCache, exist := ec.algorithmCache[nodeName]; exist {
		for predicateKey := range predicateKeys {
			algorithmCache.predicatesCache.Remove(predicateKey)
		}
	}
	...
}

func (ec *EquivalenceCache) InvalidateCachedPredicateItemOfAllNodes(predicateKeys sets.String) {
	...
	// algorithmCache uses nodeName as key, so we just iterate it and invalid given predicates
	for _, algorithmCache := range ec.algorithmCache {
		for predicateKey := range predicateKeys {
			// just use keys is enough
			algorithmCache.predicatesCache.Remove(predicateKey)
		}
	}
	...
}
// PredicateWithECache returns:
// 1. if fit
// 2. reasons if not fit
// 3. if this cache is invalid
// based on cached predicate results
func (ec *EquivalenceCache) PredicateWithECache(
	podName, nodeName, predicateKey string,
	equivalenceHash uint64, needLock bool,
) (bool, []algorithm.PredicateFailureReason, bool) {
	...
	if algorithmCache, exist := ec.algorithmCache[nodeName]; exist {
		if cachePredicate, exist := algorithmCache.predicatesCache.Get(predicateKey); exist {
			predicateMap := cachePredicate.(PredicateMap)
			// TODO(resouer) Is it possible a race that cache failed to update immediately?
			if hostPredicate, ok := predicateMap[equivalenceHash]; ok {
				if hostPredicate.Fit {
					return true, []algorithm.PredicateFailureReason{}, false
				}
				return false, hostPredicate.FailReasons, false
			}
			// is invalid
			return false, []algorithm.PredicateFailureReason{}, true
		}
	}
	return false, []algorithm.PredicateFailureReason{}, true
}
// UpdateCachedPredicateItem updates pod predicate for equivalence class
func (ec *EquivalenceCache) UpdateCachedPredicateItem(
	podName, nodeName, predicateKey string,
	fit bool,
	reasons []algorithm.PredicateFailureReason,
	equivalenceHash uint64,
	needLock bool,
) {
	...
	if _, exist := ec.algorithmCache[nodeName]; !exist {
		ec.algorithmCache[nodeName] = newAlgorithmCache()
	}
	predicateItem := HostPredicate{
		Fit:         fit,
		FailReasons: reasons,
	}
	// if cached predicate map already exists, just update the predicate by key
	if v, ok := ec.algorithmCache[nodeName].predicatesCache.Get(predicateKey); ok {
		predicateMap := v.(PredicateMap)
		// maps in golang are references, no need to add them back
		predicateMap[equivalenceHash] = predicateItem
	} else {
		ec.algorithmCache[nodeName].predicatesCache.Add(predicateKey,
			PredicateMap{
				equivalenceHash: predicateItem,
			})
	}
}
When Kubernetes registers predicates, priorities, and scheduler extenders, it also initializes the Equivalence Cache and passes it into the scheduler config.
// Creates a scheduler from a set of registered fit predicate keys and priority keys.
func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) {
	...
	// Init equivalence class cache
	if c.enableEquivalenceClassCache && getEquivalencePodFuncFactory != nil {
		pluginArgs, err := c.getPluginArgs()
		if err != nil {
			return nil, err
		}
		c.equivalencePodCache = core.NewEquivalenceCache(
			getEquivalencePodFuncFactory(*pluginArgs),
		)
		glog.Info("Created equivalence class cache")
	}
	...
}

// NewEquivalenceCache creates a EquivalenceCache object.
func NewEquivalenceCache(getEquivalencePodFunc algorithm.GetEquivalencePodFunc) *EquivalenceCache {
	return &EquivalenceCache{
		getEquivalencePod: getEquivalencePodFunc,
		algorithmCache:    make(map[string]AlgorithmCache),
	}
}
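NewEquivalenceCache initializes the Equivalence Cache, so where is getEquivalencePod registered? GetEquivalencePodFunc is registered when the default algorithm provider is initialized. (Does that mean only the default provider can use it, and a provider defined in a config file cannot?) Note that only PVCInfo from factory.PluginFactoryArgs is passed in here.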
GetEquivalencePodFunc is a function that gets a EquivalencePod from a pod.
pkg/scheduler/algorithmprovider/defaults/defaults.go:38
func init() {
	...
	// Use equivalence class to speed up heavy predicates phase.
	factory.RegisterGetEquivalencePodFunction(
		func(args factory.PluginFactoryArgs) algorithm.GetEquivalencePodFunc {
			return predicates.NewEquivalencePodGenerator(args.PVCInfo)
		},
	)
	...
}
Why is only PVCInfo passed in, and why is PVCInfo needed at all? To answer this, let's first look at the definitions of EquivalencePod and getEquivalencePod.
// EquivalencePod is a group of pod attributes which can be reused as equivalence to schedule other pods.
type EquivalencePod struct {
	ControllerRef metav1.OwnerReference
	PVCSet        sets.String
}
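EquivalencePod defines which shared attributes make Pods Equivalent Pods. The Equivalence Hash is computed from the two attributes in a Pod's EquivalencePod: ControllerRef, the Pod's controlling OwnerReference, and PVCSet, the set of PVCs the Pod references (identified by PVC UID, which is why PVCInfo is needed to resolve them). Therefore, two Pods are considered Equivalent Pods, and share one Equivalence Hash, only if they belong to the same Controller and reference the same PVC objects.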
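The equivalence rule can be illustrated with a short sketch. It uses hypothetical, trimmed-down Pod and OwnerRef types in place of the real v1.Pod and metav1.OwnerReference, and builds a comparable key from the controller's UID plus the deduplicated, sorted PVC UID set. Two simplifications apply: the real EquivalencePod holds the whole OwnerReference, not just its UID, and the real getEquivalencePod returns nil when the PVC lookup fails, which this sketch does not model:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// OwnerRef and Pod are hypothetical stand-ins; only the fields that matter
// for equivalence are kept.
type OwnerRef struct {
	UID        string
	Controller bool
}

type Pod struct {
	OwnerRefs []OwnerRef
	PVCUIDs   []string // UIDs of referenced PVCs (resolved via PVCInfo in the real code)
}

// equivalenceKey follows the rule of getEquivalencePod: a pod has an
// equivalence class only if it has a controlling owner, and the class is
// identified by (controller, set of PVC UIDs).
func equivalenceKey(p Pod) (string, bool) {
	for _, ref := range p.OwnerRefs {
		if ref.Controller {
			set := map[string]bool{}
			for _, uid := range p.PVCUIDs {
				set[uid] = true // deduplicate, like sets.String
			}
			uids := make([]string, 0, len(set))
			for uid := range set {
				uids = append(uids, uid)
			}
			sort.Strings(uids) // a set: order must not matter
			return ref.UID + "/" + strings.Join(uids, ","), true
		}
	}
	return "", false
}

func main() {
	a := Pod{OwnerRefs: []OwnerRef{{UID: "rs-1", Controller: true}}, PVCUIDs: []string{"pvc-a"}}
	b := Pod{OwnerRefs: []OwnerRef{{UID: "rs-1", Controller: true}}, PVCUIDs: []string{"pvc-a"}}
	c := Pod{OwnerRefs: []OwnerRef{{UID: "rs-1", Controller: true}}, PVCUIDs: []string{"pvc-b"}}
	ka, _ := equivalenceKey(a)
	kb, _ := equivalenceKey(b)
	kc, _ := equivalenceKey(c)
	fmt.Println(ka == kb, ka == kc) // true false

	_, ok := equivalenceKey(Pod{})
	fmt.Println(ok) // false
}
```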
getEquivalencePod builds the EquivalencePod that a Pod belongs to from the OwnerReference and PVC information in the Pod object.
func (e *EquivalencePodGenerator) getEquivalencePod(pod *v1.Pod) interface{} {
	for _, ref := range pod.OwnerReferences {
		if ref.Controller != nil && *ref.Controller {
			pvcSet, err := e.getPVCSet(pod)
			if err == nil {
				// A pod can only belongs to one controller, so let's return.
				return &EquivalencePod{
					ControllerRef: ref,
					PVCSet:        pvcSet,
				}
			}
			return nil
		}
	}
	return nil
}
The entry point of the predicate phase is findNodesThatFit. It calls getEquivalenceClassInfo to compute the Pod's EquivalenceHash, then passes the hash into podFitsOnNode for the rest of the Equivalence Class processing.
func findNodesThatFit(
	pod *v1.Pod,
	nodeNameToInfo map[string]*schedulercache.NodeInfo,
	nodes []*v1.Node,
	predicateFuncs map[string]algorithm.FitPredicate,
	extenders []algorithm.SchedulerExtender,
	metadataProducer algorithm.PredicateMetadataProducer,
	ecache *EquivalenceCache,
	schedulingQueue SchedulingQueue,
	alwaysCheckAllPredicates bool,
) ([]*v1.Node, FailedPredicateMap, error) {
	...
	var equivCacheInfo *equivalenceClassInfo
	if ecache != nil {
		// getEquivalenceClassInfo will return immediately if no equivalence pod found
		equivCacheInfo = ecache.getEquivalenceClassInfo(pod)
	}
	checkNode := func(i int) {
		nodeName := nodes[i].Name
		fits, failedPredicates, err := podFitsOnNode(
			pod,
			meta,
			nodeNameToInfo[nodeName],
			predicateFuncs,
			ecache,
			schedulingQueue,
			alwaysCheckAllPredicates,
			equivCacheInfo,
		)
		...
	}
	...
}
getEquivalenceClassInfo computes the Pod's EquivalenceHash as follows:
// getEquivalenceClassInfo returns the equivalence class of given pod.
func (ec *EquivalenceCache) getEquivalenceClassInfo(pod *v1.Pod) *equivalenceClassInfo {
	equivalencePod := ec.getEquivalencePod(pod)
	if equivalencePod != nil {
		hash := fnv.New32a()
		hashutil.DeepHashObject(hash, equivalencePod)
		return &equivalenceClassInfo{
			hash: uint64(hash.Sum32()),
		}
	}
	return nil
}
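As shown, the EquivalenceHash is an FNV hash (32-bit FNV-1a via fnv.New32a, widened to uint64) of the object returned by getEquivalencePod.
Let's first look at the relevant part of podFitsOnNode: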
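The hashing step can be sketched with the standard library's hash/fnv. One assumption matters here: the real code feeds the whole EquivalencePod to hashutil.DeepHashObject, while this sketch hashes a hand-written encoding of a controller UID plus sorted PVC UIDs. The values it produces will therefore not match Kubernetes', and equivalenceHash is a hypothetical name:

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// equivalenceHash hashes a controller UID plus a set of PVC UIDs with 32-bit
// FNV-1a and widens the result to uint64, as getEquivalenceClassInfo does.
// The encoding is illustrative only; it is not DeepHashObject's encoding.
func equivalenceHash(controllerUID string, pvcUIDs []string) uint64 {
	sorted := append([]string(nil), pvcUIDs...)
	sort.Strings(sorted) // the PVC set is unordered, so sort before hashing
	h := fnv.New32a()
	h.Write([]byte(controllerUID))
	for _, uid := range sorted {
		h.Write([]byte{0}) // separator between fields
		h.Write([]byte(uid))
	}
	return uint64(h.Sum32())
}

func main() {
	a := equivalenceHash("rs-1", []string{"pvc-a", "pvc-b"})
	b := equivalenceHash("rs-1", []string{"pvc-b", "pvc-a"})
	c := equivalenceHash("rs-2", []string{"pvc-a", "pvc-b"})
	fmt.Println(a == b, a == c) // true false
}
```

Because the result comes from Sum32, every EquivalenceHash fits in 32 bits even though it is stored as a uint64, so distinct equivalence classes can in principle collide.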
// podFitsOnNode checks whether a node given by NodeInfo satisfies the given predicate functions.
// For given pod, podFitsOnNode will check if any equivalent pod exists and try to reuse its cached
// predicate results as possible.
// This function is called from two different places: Schedule and Preempt.
// When it is called from Schedule, we want to test whether the pod is schedulable
// on the node with all the existing pods on the node plus higher and equal priority
// pods nominated to run on the node.
// When it is called from Preempt, we should remove the victims of preemption and
// add the nominated pods. Removal of the victims is done by SelectVictimsOnNode().
// It removes victims from meta and NodeInfo before calling this function.
func podFitsOnNode(
	pod *v1.Pod,
	meta algorithm.PredicateMetadata,
	info *schedulercache.NodeInfo,
	predicateFuncs map[string]algorithm.FitPredicate,
	ecache *EquivalenceCache,
	queue SchedulingQueue,
	alwaysCheckAllPredicates bool,
	equivCacheInfo *equivalenceClassInfo,
) (bool, []algorithm.PredicateFailureReason, error) {
	...
	if predicate, exist := predicateFuncs[predicateKey]; exist {
		// Use an in-line function to guarantee invocation of ecache.Unlock()
		// when the in-line function returns.
		func() {
			var invalid bool
			if eCacheAvailable {
				// Lock ecache here to avoid a race condition against cache invalidation invoked
				// in event handlers. This race has existed despite locks in equivClassCache implementation.
				ecache.Lock()
				defer ecache.Unlock()
				// PredicateWithECache will return its cached predicate results.
				fit, reasons, invalid = ecache.PredicateWithECache(
					pod.GetName(), info.Node().GetName(),
					predicateKey, equivCacheInfo.hash, false)
			}
			if !eCacheAvailable || invalid {
				// we need to execute predicate functions since equivalence cache does not work
				fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
				if err != nil {
					return
				}
				if eCacheAvailable {
					// Store data to update equivClassCache after this loop.
					if res, exists := predicateResults[predicateKey]; exists {
						res.Fit = res.Fit && fit
						res.FailReasons = append(res.FailReasons, reasons...)
						predicateResults[predicateKey] = res
					} else {
						predicateResults[predicateKey] = HostPredicate{Fit: fit, FailReasons: reasons}
					}
					result := predicateResults[predicateKey]
					ecache.UpdateCachedPredicateItem(
						pod.GetName(), info.Node().GetName(),
						predicateKey, result.Fit, result.FailReasons, equivCacheInfo.hash, false)
				}
			}
		}()
		...
	}
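podFitsOnNode first calls PredicateWithECache to check for a hit in the Equivalence Cache. On a hit, the cached Fit and FailReasons are used directly and the predicate function is skipped. If the entry is missing or invalid, the predicate function runs and its result is written back with UpdateCachedPredicateItem. Because ecache stays locked until the in-line function returns, the lookup and the write-back cannot race with invalidations issued by the event handlers.
Now let's go back to the Scheduler Config Factory. We'll see how the EventHandlers registered on podInformer, nodeInformer, serviceInformer, pvcInformer, and the other informers act on the Equivalence Cache.
After a pod has been scheduled, and before it is bound to its node, the scheduler assumes the Pod. The Assume step also modifies the Equivalence Cache.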
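The cache-first flow of podFitsOnNode can be sketched as below. This is a simplified model under several assumptions. The cache is a single flat map keyed by a hypothetical (node, predicate, hash) struct. There is no locking. Predicates run in an order supplied by the caller, and evaluation stops at the first failure, as it does when alwaysCheckAllPredicates is false. The evals counter exists only to show that a second Equivalent Pod on the same node triggers no predicate evaluations:

```go
package main

import "fmt"

// result is a simplified HostPredicate.
type result struct {
	fit     bool
	reasons []string
}

// predicateFunc is a simplified FitPredicate that only sees pod and node names.
type predicateFunc func(pod, node string) result

// cacheKey flattens the three-level lookup into one hypothetical struct key.
type cacheKey struct {
	node, predicate string
	hash            uint64
}

// fitsOnNode reuses a cached result when one is present; otherwise it runs the
// predicate and stores the result. evals counts real predicate evaluations.
func fitsOnNode(cache map[cacheKey]result, pod, node string, hash uint64,
	preds map[string]predicateFunc, order []string, evals *int) bool {
	for _, name := range order {
		k := cacheKey{node: node, predicate: name, hash: hash}
		res, hit := cache[k]
		if !hit {
			res = preds[name](pod, node)
			*evals++
			cache[k] = res
		}
		if !res.fit {
			return false // stop at the first failing predicate
		}
	}
	return true
}

func main() {
	alwaysFit := func(pod, node string) result { return result{fit: true} }
	preds := map[string]predicateFunc{"GeneralPredicates": alwaysFit, "NoDiskConflict": alwaysFit}
	order := []string{"GeneralPredicates", "NoDiskConflict"}
	cache := map[cacheKey]result{}
	evals := 0

	// Two replicas of the same controller share equivalence hash 42.
	ok := fitsOnNode(cache, "web-0", "node-1", 42, preds, order, &evals)
	fmt.Println(ok, evals) // true 2
	ok = fitsOnNode(cache, "web-1", "node-1", 42, preds, order, &evals)
	fmt.Println(ok, evals) // true 2
}
```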
// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
// assume modifies `assumed`.
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
	...
	// Optimistically assume that the binding will succeed, so we need to invalidate affected
	// predicates in equivalence cache.
	// If the binding fails, these invalidated item will not break anything.
	if sched.config.Ecache != nil {
		sched.config.Ecache.InvalidateCachedPredicateItemForPodAdd(assumed, host)
	}
	return nil
}
When a Pod is assumed, InvalidateCachedPredicateItemForPodAdd is called to update the Equivalence Cache.
func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) {
	// GeneralPredicates: will always be affected by adding a new pod
	invalidPredicates := sets.NewString("GeneralPredicates")
	// MaxPDVolumeCountPredicate: we check the volumes of pod to make decision.
	for _, vol := range pod.Spec.Volumes {
		if vol.PersistentVolumeClaim != nil {
			invalidPredicates.Insert("MaxEBSVolumeCount", "MaxGCEPDVolumeCount", "MaxAzureDiskVolumeCount")
		} else {
			if vol.AWSElasticBlockStore != nil {
				invalidPredicates.Insert("MaxEBSVolumeCount")
			}
			if vol.GCEPersistentDisk != nil {
				invalidPredicates.Insert("MaxGCEPDVolumeCount")
			}
			if vol.AzureDisk != nil {
				invalidPredicates.Insert("MaxAzureDiskVolumeCount")
			}
		}
	}
	ec.InvalidateCachedPredicateItem(nodeName, invalidPredicates)
}
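InvalidateCachedPredicateItemForPodAdd shows that assuming a Pod removes the predicateCache entries for the following predicateKeys on that node. GeneralPredicates is always removed. MaxEBSVolumeCount, MaxGCEPDVolumeCount, and MaxAzureDiskVolumeCount are removed depending on the Pod's volumes: all three if the Pod uses a PVC, otherwise only the one matching each AWS EBS, GCE PD, or Azure Disk volume.
When the scheduler runs NewConfigFactory, it registers updatePodInCache as the Update event handler for assignedNonTerminatedPod.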
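The volume-to-predicate mapping in InvalidateCachedPredicateItemForPodAdd can be isolated into a pure function for testing. The Volume type below is a hypothetical stand-in for v1.Volume, with one flag per volume source. predicatesToInvalidateOnPodAdd is likewise an invented name:

```go
package main

import (
	"fmt"
	"sort"
)

// Volume is a hypothetical, trimmed-down v1.Volume: each flag says whether
// the corresponding volume source is set.
type Volume struct {
	PVC, AWSEBS, GCEPD, AzureDisk bool
}

// predicatesToInvalidateOnPodAdd follows InvalidateCachedPredicateItemForPodAdd
// and returns the predicate keys to drop on the pod's node, sorted.
func predicatesToInvalidateOnPodAdd(vols []Volume) []string {
	set := map[string]bool{"GeneralPredicates": true} // always affected
	for _, v := range vols {
		if v.PVC {
			// The code does not look up which PV the claim is bound to,
			// so all three volume-count predicates are invalidated.
			set["MaxEBSVolumeCount"] = true
			set["MaxGCEPDVolumeCount"] = true
			set["MaxAzureDiskVolumeCount"] = true
			continue
		}
		if v.AWSEBS {
			set["MaxEBSVolumeCount"] = true
		}
		if v.GCEPD {
			set["MaxGCEPDVolumeCount"] = true
		}
		if v.AzureDisk {
			set["MaxAzureDiskVolumeCount"] = true
		}
	}
	keys := make([]string, 0, len(set))
	for k := range set {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	fmt.Println(predicatesToInvalidateOnPodAdd([]Volume{{GCEPD: true}}))
	// [GeneralPredicates MaxGCEPDVolumeCount]
}
```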
func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) {
	...
	c.invalidateCachedPredicatesOnUpdatePod(newPod, oldPod)
	c.podQueue.AssignedPodUpdated(newPod)
}

func (c *configFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, oldPod *v1.Pod) {
	if c.enableEquivalenceClassCache {
		// if the pod does not have bound node, updating equivalence cache is meaningless;
		// if pod's bound node has been changed, that case should be handled by pod add & delete.
		if len(newPod.Spec.NodeName) != 0 && newPod.Spec.NodeName == oldPod.Spec.NodeName {
			if !reflect.DeepEqual(oldPod.GetLabels(), newPod.GetLabels()) {
				// MatchInterPodAffinity need to be reconsidered for this node,
				// as well as all nodes in its same failure domain.
				c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(
					matchInterPodAffinitySet)
			}
			// if requested container resource changed, invalidate GeneralPredicates of this node
			if !reflect.DeepEqual(predicates.GetResourceRequest(newPod),
				predicates.GetResourceRequest(oldPod)) {
				c.equivalencePodCache.InvalidateCachedPredicateItem(
					newPod.Spec.NodeName, generalPredicatesSets)
			}
		}
	}
}
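updatePodInCache calls invalidateCachedPredicatesOnUpdatePod, which acts only when the Pod is bound and its node has not changed. In that case it updates the Equivalence Cache as follows. If the Pod's labels changed, MatchInterPodAffinity is invalidated on all nodes. If the Pod's container resource requests changed, GeneralPredicates is invalidated on the Pod's node.
Likewise, when an assignedNonTerminatedPod is deleted, invalidateCachedPredicatesOnDeletePod is called to update the Equivalence Cache.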
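The decision made by invalidateCachedPredicatesOnUpdatePod can be sketched as a pure function. It uses a hypothetical PodState struct in which resource requests are reduced to a name-to-quantity map. The real code compares the results of predicates.GetResourceRequest instead:

```go
package main

import (
	"fmt"
	"reflect"
)

// PodState is a hypothetical, trimmed-down pod: bound node, labels, and
// resource requests as a name -> quantity map.
type PodState struct {
	NodeName string
	Labels   map[string]string
	Requests map[string]int64
}

// invalidationOnPodUpdate follows invalidateCachedPredicatesOnUpdatePod and
// reports whether MatchInterPodAffinity should be invalidated on all nodes and
// whether GeneralPredicates should be invalidated on the pod's node.
func invalidationOnPodUpdate(oldPod, newPod PodState) (affinityAllNodes, generalOnNode bool) {
	// Unbound pods, or pods whose node changed, are handled by add/delete.
	if newPod.NodeName == "" || newPod.NodeName != oldPod.NodeName {
		return false, false
	}
	affinityAllNodes = !reflect.DeepEqual(oldPod.Labels, newPod.Labels)
	generalOnNode = !reflect.DeepEqual(oldPod.Requests, newPod.Requests)
	return affinityAllNodes, generalOnNode
}

func main() {
	oldPod := PodState{NodeName: "node-1", Labels: map[string]string{"app": "web"}, Requests: map[string]int64{"cpu": 100}}
	newPod := PodState{NodeName: "node-1", Labels: map[string]string{"app": "web"}, Requests: map[string]int64{"cpu": 200}}
	fmt.Println(invalidationOnPodUpdate(oldPod, newPod)) // false true
}
```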
func (c *configFactory) invalidateCachedPredicatesOnDeletePod(pod *v1.Pod) {
	if c.enableEquivalenceClassCache {
		// part of this case is the same as pod add.
		c.equivalencePodCache.InvalidateCachedPredicateItemForPodAdd(pod, pod.Spec.NodeName)
		// MatchInterPodAffinity need to be reconsidered for this node,
		// as well as all nodes in its same failure domain.
		// TODO(resouer) can we just do this for nodes in the same failure domain
		c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(
			matchInterPodAffinitySet)
		// if this pod have these PV, cached result of disk conflict will become invalid.
		for _, volume := range pod.Spec.Volumes {
			if volume.GCEPersistentDisk != nil || volume.AWSElasticBlockStore != nil ||
				volume.RBD != nil || volume.ISCSI != nil {
				c.equivalencePodCache.InvalidateCachedPredicateItem(
					pod.Spec.NodeName, noDiskConflictSet)
			}
		}
	}
}
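invalidateCachedPredicatesOnDeletePod updates the Equivalence Cache in three steps. First, it invalidates the same predicates on the Pod's node as a Pod add does: GeneralPredicates and the relevant Max*VolumeCount predicates. Second, it invalidates MatchInterPodAffinity (matchInterPodAffinitySet) on all nodes. Third, if the Pod uses GCE PD, AWS EBS, RBD, or ISCSI volumes, it invalidates the disk-conflict predicate (noDiskConflictSet) on the Pod's node.
When a node update event occurs, invalidateCachedPredicatesOnNodeUpdate is called to update the Equivalence Cache.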
func (c *configFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node, oldNode *v1.Node) {
	if c.enableEquivalenceClassCache {
		// Begin to update equivalence cache based on node update
		// TODO(resouer): think about lazily initialize this set
		invalidPredicates := sets.NewString()
		if !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable) {
			invalidPredicates.Insert(predicates.GeneralPred) // "PodFitsResources"
		}
		if !reflect.DeepEqual(oldNode.GetLabels(), newNode.GetLabels()) {
			invalidPredicates.Insert(predicates.GeneralPred, predicates.CheckServiceAffinityPred) // "PodSelectorMatches"
			for k, v := range oldNode.GetLabels() {
				// any label can be topology key of pod, we have to invalidate in all cases
				if v != newNode.GetLabels()[k] {
					invalidPredicates.Insert(predicates.MatchInterPodAffinityPred)
				}
				// NoVolumeZoneConflict will only be affected by zone related label change
				if isZoneRegionLabel(k) {
					if v != newNode.GetLabels()[k] {
						invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
					}
				}
			}
		}
		oldTaints, oldErr := helper.GetTaintsFromNodeAnnotations(oldNode.GetAnnotations())
		if oldErr != nil {
			glog.Errorf("Failed to get taints from old node annotation for equivalence cache")
		}
		newTaints, newErr := helper.GetTaintsFromNodeAnnotations(newNode.GetAnnotations())
		if newErr != nil {
			glog.Errorf("Failed to get taints from new node annotation for equivalence cache")
		}
		if !reflect.DeepEqual(oldTaints, newTaints) ||
			!reflect.DeepEqual(oldNode.Spec.Taints, newNode.Spec.Taints) {
			invalidPredicates.Insert(predicates.PodToleratesNodeTaintsPred)
		}
		if !reflect.DeepEqual(oldNode.Status.Conditions, newNode.Status.Conditions) {
			oldConditions := make(map[v1.NodeConditionType]v1.ConditionStatus)
			newConditions := make(map[v1.NodeConditionType]v1.ConditionStatus)
			for _, cond := range oldNode.Status.Conditions {
				oldConditions[cond.Type] = cond.Status
			}
			for _, cond := range newNode.Status.Conditions {
				newConditions[cond.Type] = cond.Status
			}
			if oldConditions[v1.NodeMemoryPressure] != newConditions[v1.NodeMemoryPressure] {
				invalidPredicates.Insert(predicates.CheckNodeMemoryPressurePred)
			}
			if oldConditions[v1.NodeDiskPressure] != newConditions[v1.NodeDiskPressure] {
				invalidPredicates.Insert(predicates.CheckNodeDiskPressurePred)
			}
			if oldConditions[v1.NodeReady] != newConditions[v1.NodeReady] ||
				oldConditions[v1.NodeOutOfDisk] != newConditions[v1.NodeOutOfDisk] ||
				oldConditions[v1.NodeNetworkUnavailable] != newConditions[v1.NodeNetworkUnavailable] {
				invalidPredicates.Insert(predicates.CheckNodeConditionPred)
			}
		}
		if newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable {
			invalidPredicates.Insert(predicates.CheckNodeConditionPred)
		}
		c.equivalencePodCache.InvalidateCachedPredicateItem(newNode.GetName(), invalidPredicates)
	}
}
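Therefore, on a node update, the following PredicateKeys are removed from that node's Equivalence Cache:
- GeneralPred, if Allocatable changed.
- GeneralPred and CheckServiceAffinityPred, if the labels changed. In addition, MatchInterPodAffinityPred is removed if any existing label was changed or removed, and NoVolumeZoneConflictPred if a zone/region label changed.
- PodToleratesNodeTaintsPred, if the taints changed.
- CheckNodeMemoryPressurePred or CheckNodeDiskPressurePred, if the corresponding condition changed.
- CheckNodeConditionPred, if the Ready, OutOfDisk, or NetworkUnavailable condition changed, or if Spec.Unschedulable changed.

When a node delete event occurs, InvalidateAllCachedPredicateItemOfNode is called to update the Equivalence Cache.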
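The condition-comparison step is the least obvious part of invalidateCachedPredicatesOnNodeUpdate. It flattens each node's condition list into a type → status map and compares only the condition types each predicate depends on. The sketch below uses a simplified, hypothetical Condition type. It also assumes that the predicate-key strings match the values of the corresponding predicates.*Pred constants:

```go
package main

import "fmt"

// Condition is a hypothetical, trimmed-down v1.NodeCondition.
type Condition struct {
	Type, Status string
}

// changedConditionPredicates follows the condition-handling part of
// invalidateCachedPredicatesOnNodeUpdate and returns the keys to drop.
func changedConditionPredicates(oldConds, newConds []Condition) []string {
	toMap := func(conds []Condition) map[string]string {
		m := make(map[string]string, len(conds))
		for _, c := range conds {
			m[c.Type] = c.Status
		}
		return m
	}
	o, n := toMap(oldConds), toMap(newConds)
	var keys []string
	if o["MemoryPressure"] != n["MemoryPressure"] {
		keys = append(keys, "CheckNodeMemoryPressure")
	}
	if o["DiskPressure"] != n["DiskPressure"] {
		keys = append(keys, "CheckNodeDiskPressure")
	}
	if o["Ready"] != n["Ready"] || o["OutOfDisk"] != n["OutOfDisk"] ||
		o["NetworkUnavailable"] != n["NetworkUnavailable"] {
		keys = append(keys, "CheckNodeCondition")
	}
	return keys
}

func main() {
	oldConds := []Condition{{"Ready", "True"}, {"MemoryPressure", "False"}}
	newConds := []Condition{{"Ready", "True"}, {"MemoryPressure", "True"}}
	fmt.Println(changedConditionPredicates(oldConds, newConds)) // [CheckNodeMemoryPressure]
}
```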
// InvalidateAllCachedPredicateItemOfNode marks all cached items on given node as invalid
func (ec *EquivalenceCache) InvalidateAllCachedPredicateItemOfNode(nodeName string) {
	ec.Lock()
	defer ec.Unlock()
	delete(ec.algorithmCache, nodeName)
	glog.V(5).Infof("Done invalidating all cached predicates on node: %s", nodeName)
}
Therefore, when a node is deleted, the node's entire algorithmCache is removed from the Equivalence Cache.
When a PV add or delete event occurs, invalidatePredicatesForPv is called to update the Equivalence Cache.
func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) {
	// You could have a PVC that points to a PV, but the PV object doesn't exist.
	// So when the PV object gets added, we can recount.
	invalidPredicates := sets.NewString()
	// PV types which impact MaxPDVolumeCountPredicate
	if pv.Spec.AWSElasticBlockStore != nil {
		invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred)
	}
	if pv.Spec.GCEPersistentDisk != nil {
		invalidPredicates.Insert(predicates.MaxGCEPDVolumeCountPred)
	}
	if pv.Spec.AzureDisk != nil {
		invalidPredicates.Insert(predicates.MaxAzureDiskVolumeCountPred)
	}
	// If PV contains zone related label, it may impact cached NoVolumeZoneConflict
	for k := range pv.Labels {
		if isZoneRegionLabel(k) {
			invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
			break
		}
	}
	if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
		// Add/delete impacts the available PVs to choose from
		invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
	}
	c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}
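Therefore, when a PV is added or deleted, the following predicateKeys are removed from the PredicateCache of all nodes:
- MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred, or MaxAzureDiskVolumeCountPred, depending on the PV type.
- NoVolumeZoneConflictPred, if the PV carries a zone/region label.
- CheckVolumeBindingPred, if the VolumeScheduling feature gate is enabled.

When a PV update event occurs, invalidatePredicatesForPvUpdate is called to update the Equivalence Cache.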
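invalidatePredicatesForPv can be sketched as a pure function, with a hypothetical PV type and the VolumeScheduling feature gate passed in as a boolean. isZoneRegionLabel here is a hypothetical reimplementation. It assumes the zone and region label keys were failure-domain.beta.kubernetes.io/zone and failure-domain.beta.kubernetes.io/region. The predicate-key strings are likewise assumed:

```go
package main

import "fmt"

// PV is a hypothetical, trimmed-down PersistentVolume.
type PV struct {
	AWSEBS, GCEPD, AzureDisk bool
	Labels                   map[string]string
}

// isZoneRegionLabel is a hypothetical reimplementation that assumes the
// zone/region label keys in use around Kubernetes 1.10.
func isZoneRegionLabel(k string) bool {
	return k == "failure-domain.beta.kubernetes.io/zone" ||
		k == "failure-domain.beta.kubernetes.io/region"
}

// predicatesForPVAddDelete follows invalidatePredicatesForPv; the returned
// keys would be invalidated on all nodes.
func predicatesForPVAddDelete(pv PV, volumeScheduling bool) []string {
	var keys []string
	if pv.AWSEBS {
		keys = append(keys, "MaxEBSVolumeCount")
	}
	if pv.GCEPD {
		keys = append(keys, "MaxGCEPDVolumeCount")
	}
	if pv.AzureDisk {
		keys = append(keys, "MaxAzureDiskVolumeCount")
	}
	for k := range pv.Labels {
		if isZoneRegionLabel(k) {
			keys = append(keys, "NoVolumeZoneConflict")
			break // one zone/region label is enough
		}
	}
	if volumeScheduling {
		keys = append(keys, "CheckVolumeBinding")
	}
	return keys
}

func main() {
	pv := PV{GCEPD: true, Labels: map[string]string{"failure-domain.beta.kubernetes.io/zone": "us-central1-a"}}
	fmt.Println(predicatesForPVAddDelete(pv, true))
	// [MaxGCEPDVolumeCount NoVolumeZoneConflict CheckVolumeBinding]
}
```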
func (c *configFactory) invalidatePredicatesForPvUpdate(oldPV, newPV *v1.PersistentVolume) {
	invalidPredicates := sets.NewString()
	for k, v := range newPV.Labels {
		// If PV update modifies the zone/region labels.
		if isZoneRegionLabel(k) && !reflect.DeepEqual(v, oldPV.Labels[k]) {
			invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
			break
		}
	}
	c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}
Therefore, when a PV is updated, NoVolumeZoneConflictPred is removed from the PredicateCache of all nodes, but only if the update modifies a zone/region label.
func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim) {
	// We need to do this here because the ecache uses PVC uid as part of equivalence hash of pod
	// The bound volume type may change
	invalidPredicates := sets.NewString(maxPDVolumeCountPredicateKeys...)
	// The bound volume's label may change
	invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
	if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
		// Add/delete impacts the available PVs to choose from
		invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
	}
	c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}
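When a PVC add or delete event occurs, invalidatePredicatesForPvc removes the following predicateKeys from the PredicateCache of all nodes: the volume-count predicates in maxPDVolumeCountPredicateKeys, NoVolumeZoneConflictPred, and, if the VolumeScheduling feature gate is enabled, CheckVolumeBindingPred.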
func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.PersistentVolumeClaim) {
	invalidPredicates := sets.NewString()
	if old.Spec.VolumeName != new.Spec.VolumeName {
		if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
			// PVC volume binding has changed
			invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
		}
		// The bound volume type may change
		invalidPredicates.Insert(maxPDVolumeCountPredicateKeys...)
	}
	c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}
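When a PVC update event occurs, invalidatePredicatesForPvcUpdate acts only if the PVC's bound volume (Spec.VolumeName) changed. In that case it removes the predicates in maxPDVolumeCountPredicateKeys from the PredicateCache of all nodes, plus CheckVolumeBindingPred if the VolumeScheduling feature gate is enabled.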
func (c *configFactory) onServiceAdd(obj interface{}) {
	if c.enableEquivalenceClassCache {
		c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
	}
	c.podQueue.MoveAllToActiveQueue()
}

func (c *configFactory) onServiceDelete(obj interface{}) {
	if c.enableEquivalenceClassCache {
		c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
	}
	c.podQueue.MoveAllToActiveQueue()
}
When a Service Add or Delete event occurs, the predicates in serviceAffinitySet are removed from the PredicateCache of all nodes.
func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) {
	if c.enableEquivalenceClassCache {
		// TODO(resouer) We may need to invalidate this for specified group of pods only
		oldService := oldObj.(*v1.Service)
		newService := newObj.(*v1.Service)
		if !reflect.DeepEqual(oldService.Spec.Selector, newService.Spec.Selector) {
			c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
		}
	}
	c.podQueue.MoveAllToActiveQueue()
}
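When a Service Update event occurs, the predicates in serviceAffinitySet are removed from the PredicateCache of all nodes, but only if the Service's Spec.Selector changed.
Equivalence Class speeds up Predicates in the Kubernetes Scheduler and thereby improves its throughput. Most users do not need to care about the Equivalence Class feature, because current scheduler performance is already sufficient for them. Users running large-scale workloads, such as AI training, may want to keep an eye on it.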
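The three Service handlers differ only in when they invalidate. The compact sketch below uses a hypothetical ServiceEvent enum in place of the separate informer callbacks. In the real handlers, podQueue.MoveAllToActiveQueue() is called regardless of the outcome:

```go
package main

import (
	"fmt"
	"reflect"
)

// ServiceEvent is a hypothetical enum standing in for the separate
// onServiceAdd / onServiceUpdate / onServiceDelete callbacks.
type ServiceEvent int

const (
	ServiceAdd ServiceEvent = iota
	ServiceUpdate
	ServiceDelete
)

// invalidatesServiceAffinity reports whether the serviceAffinitySet predicates
// would be invalidated on all nodes for the given event.
func invalidatesServiceAffinity(ev ServiceEvent, oldSelector, newSelector map[string]string) bool {
	switch ev {
	case ServiceAdd, ServiceDelete:
		return true
	case ServiceUpdate:
		// Only a selector change can alter which pods a Service matches.
		return !reflect.DeepEqual(oldSelector, newSelector)
	default:
		return false
	}
}

func main() {
	sel := map[string]string{"app": "web"}
	fmt.Println(invalidatesServiceAffinity(ServiceUpdate, sel, map[string]string{"app": "web"})) // false
	fmt.Println(invalidatesServiceAffinity(ServiceDelete, nil, nil))                            // true
}
```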