In-Depth Analysis of Kubernetes Local Persistent Volume (Part 2)

Author: xidianwangtao@gmail.com

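Summary: The previous post, "In-Depth Analysis of Kubernetes Local Persistent Volume (Part 1)", covered the basic principles of local volumes and the caveats of using them. This post moves on to the source code. It covers the relevant parts of the scheduler and the PV controller, and aims to dissect the internal mechanisms behind delayed scheduling (delayed binding) of local volumes and PV node affinity.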

VolumeBinder in Scheduler

VolumeBinder is a module of the Kubernetes default scheduler.

pkg/scheduler/volumebinder/volume_binder.go:33

// VolumeBinder sets up the volume binding library and manages
// the volume binding operations with a queue.
type VolumeBinder struct {
	Binder    persistentvolume.SchedulerVolumeBinder
	BindQueue *workqueue.Type
}
  • It maintains a FIFO BindQueue that holds the Pods waiting for volume binding.
  • Binder (persistentvolume.SchedulerVolumeBinder) is a functional submodule of the PV Controller. The scheduler uses it to handle PV/PVC binding and dynamic provisioning while scheduling.

SchedulerVolumeBinder

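SchedulerVolumeBinder brings volume binding into scheduling decisions. The chosen node must then satisfy the NodeAffinity of the PVs the Pod needs, in addition to resource requests and the other predicate policies. The mechanism is driven by the StorageClass's VolumeBindingMode: when it is WaitForFirstConsumer, PV binding is delayed until a Pod that uses the PVC is scheduled. During the scheduler's predicate phase, the binder looks for PVs on each node that satisfy the Pod's PVCs. The scheduler issues the Bind API call that binds the Pod to a node only after all of the Pod's PVCs have been bound to suitable PVs.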

pkg/controller/volume/persistentvolume/scheduler_binder.go:58

// SchedulerVolumeBinder is used by the scheduler to handle PVC/PV binding
// and dynamic provisioning.  The binding decisions are integrated into the pod scheduling
// workflow so that the PV NodeAffinity is also considered along with the pod's other
// scheduling requirements.
//
// This integrates into the existing default scheduler workflow as follows:
// 1. The scheduler takes a Pod off the scheduler queue and processes it serially:
//    a. Invokes all predicate functions, parallelized across nodes.  FindPodVolumes() is invoked here.
//    b. Invokes all priority functions.  Future/TBD
//    c. Selects the best node for the Pod.
//    d. Cache the node selection for the Pod. (Assume phase)
//       i.  If PVC binding is required, cache in-memory only:
//           * Updated PV objects for prebinding to the corresponding PVCs.
//           * For the pod, which PVs need API updates.
//           AssumePodVolumes() is invoked here.  Then BindPodVolumes() is called asynchronously by the
//           scheduler.  After BindPodVolumes() is complete, the Pod is added back to the scheduler queue
//           to be processed again until all PVCs are bound.
//       ii. If PVC binding is not required, cache the Pod->Node binding in the scheduler's pod cache,
//           and asynchronously bind the Pod to the Node.  This is handled in the scheduler and not here.
// 2. Once the assume operation is done, the scheduler processes the next Pod in the scheduler queue
//    while the actual binding operation occurs in the background.
type SchedulerVolumeBinder interface {
	// FindPodVolumes checks if all of a Pod's PVCs can be satisfied by the node.
	//
	// If a PVC is bound, it checks if the PV's NodeAffinity matches the Node.
	// Otherwise, it tries to find an available PV to bind to the PVC.
	//
	// It returns true if all of the Pod's PVCs have matching PVs or can be dynamic provisioned,
	// and returns true if bound volumes satisfy the PV NodeAffinity.
	//
	// This function is called by the volume binding scheduler predicate and can be called in parallel
	FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisified, boundVolumesSatisfied bool, err error)

	// AssumePodVolumes will:
	// 1. Take the PV matches for unbound PVCs and update the PV cache assuming
	// that the PV is prebound to the PVC.
	// 2. Take the PVCs that need provisioning and update the PVC cache with related
	// annotations set.
	//
	// It returns true if all volumes are fully bound, and returns true if any volume binding/provisioning
	// API operation needs to be done afterwards.
	//
	// This function will modify assumedPod with the node name.
	// This function is called serially.
	AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (allFullyBound bool, bindingRequired bool, err error)

	// BindPodVolumes will:
	// 1. Initiate the volume binding by making the API call to prebind the PV
	// to its matching PVC.
	// 2. Trigger the volume provisioning by making the API call to set related
	// annotations on the PVC
	//
	// This function can be called in parallel.
	BindPodVolumes(assumedPod *v1.Pod) error

	// GetBindingsCache returns the cache used (if any) to store volume binding decisions.
	GetBindingsCache() PodBindingCache
}

The SchedulerVolumeBinder interface contains the following four methods:

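  • FindPodVolumes: called when the scheduler runs the VolumeBindingChecker predicate policy. It checks whether the node can satisfy all of the Pod's PVCs. For a PVC that is already bound, it checks whether the NodeAffinity of the corresponding PV matches the node. For a PVC that is not yet bound, it looks in the PV cache for a suitable PV to bind to it. Its return values are:
      - unboundVolumesSatisified (bool): true if every unbound PVC of the Pod has a matching PV or can be dynamically provisioned (local volumes currently support only static provisioning); otherwise false.
      - boundVolumesSatisfied (bool): true if the already-bound volumes satisfy their PVs' NodeAffinity.
  • AssumePodVolumes: called after the scheduler has finished its predicate and priority phases. It takes the PVs matched to the Pod's unbound PVCs on the chosen node, updates the PV cache, and prebinds those PVs to their PVCs. Where needed, it sets the annotation "pv.kubernetes.io/bound-by-controller" on the PV. For PVCs that need dynamic provisioning, it updates the PVC cache with the annotation "volume.alpha.kubernetes.io/selected-node=$nodeName", which serves the same purpose as prebinding. Its return values are:
      - allFullyBound (bool): true if all of the Pod's PVCs are already bound; otherwise false.
      - bindingRequired (bool): true if some volume binding/provisioning API operations still need to be performed; otherwise false.
  • BindPodVolumes: uses the information in podBindingCache to call the API. It prebinds the PVs to their PVCs and sets the provisioning annotations on the PVCs. The PV Controller observes these updates through its watch and completes the actual binding.
  • GetBindingsCache: returns the PodBindingCache used to store volume binding decisions.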

In the scheduler, VolumeBinder is initialized by volumebinder.NewVolumeBinder.

pkg/scheduler/volumebinder/volume_binder.go:39

// NewVolumeBinder sets up the volume binding library and binding queue
func NewVolumeBinder(
	client clientset.Interface,
	pvcInformer coreinformers.PersistentVolumeClaimInformer,
	pvInformer coreinformers.PersistentVolumeInformer,
	storageClassInformer storageinformers.StorageClassInformer) *VolumeBinder {

	return &VolumeBinder{
		Binder:    persistentvolume.NewVolumeBinder(client, pvcInformer, pvInformer, storageClassInformer),
		BindQueue: workqueue.NewNamed("podsToBind"),
	}
}

The scheduler's volumebinder.NewVolumeBinder is responsible for:

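  • Calling persistentvolume.NewVolumeBinder to initialize the Binder object. This call needs pvcInformer, pvInformer and storageClassInformer.
  • Creating BindQueue, a FIFO work queue named "podsToBind" that holds the Pods waiting for volume binding.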

The scheduler's NewConfigFactory calls volumebinder.NewVolumeBinder to perform this initialization. NewConfigFactory receives pvcInformer, pvInformer and storageClassInformer as arguments and passes them to persistentvolume.NewVolumeBinder, which creates the Binder.

pkg/scheduler/factory/factory.go:145

// NewConfigFactory initializes the default implementation of a Configurator To encourage eventual privatization of the struct type, we only
// return the interface.
func NewConfigFactory(
	schedulerName string,
	client clientset.Interface,
	nodeInformer coreinformers.NodeInformer,
	podInformer coreinformers.PodInformer,
	pvInformer coreinformers.PersistentVolumeInformer,
	pvcInformer coreinformers.PersistentVolumeClaimInformer,
	replicationControllerInformer coreinformers.ReplicationControllerInformer,
	replicaSetInformer extensionsinformers.ReplicaSetInformer,
	statefulSetInformer appsinformers.StatefulSetInformer,
	serviceInformer coreinformers.ServiceInformer,
	pdbInformer policyinformers.PodDisruptionBudgetInformer,
	storageClassInformer storageinformers.StorageClassInformer,
	hardPodAffinitySymmetricWeight int32,
	enableEquivalenceClassCache bool,
	disablePreemption bool,
) scheduler.Configurator {
	stopEverything := make(chan struct{})
	schedulerCache := schedulercache.New(30*time.Second, stopEverything)

	// storageClassInformer is only enabled through VolumeScheduling feature gate
	var storageClassLister storagelisters.StorageClassLister
	if storageClassInformer != nil {
		storageClassLister = storageClassInformer.Lister()
	}

	...
	
	// On add and delete of PVs, it will affect equivalence cache items
	// related to persistent volume
	pvInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			// MaxPDVolumeCountPredicate: since it relies on the counts of PV.
			AddFunc:    c.onPvAdd,
			UpdateFunc: c.onPvUpdate,
			DeleteFunc: c.onPvDelete,
		},
	)
	c.pVLister = pvInformer.Lister()

	// This is for MaxPDVolumeCountPredicate: add/delete PVC will affect counts of PV when it is bound.
	pvcInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    c.onPvcAdd,
			UpdateFunc: c.onPvcUpdate,
			DeleteFunc: c.onPvcDelete,
		},
	)
	c.pVCLister = pvcInformer.Lister()

	...

	if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
		// Setup volume binder
		c.volumeBinder = volumebinder.NewVolumeBinder(client, pvcInformer, pvInformer, storageClassInformer)

		storageClassInformer.Informer().AddEventHandler(
			cache.ResourceEventHandlerFuncs{
				AddFunc:    c.onStorageClassAdd,
				DeleteFunc: c.onStorageClassDelete,
			},
		)
	}

	...

	return c
}

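volumebinder.NewVolumeBinder is called only when the VolumeScheduling feature gate is enabled. In that case NewConfigFactory also registers add and delete handlers for StorageClass objects.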

volumeBinder in PV Controller

As mentioned above, the scheduler's volumebinder.NewVolumeBinder initializes the Binder through persistentvolume.NewVolumeBinder. This section therefore analyzes persistentvolume.volumeBinder.

The volumeBinder in the PV Controller implements the SchedulerVolumeBinder interface described above, including FindPodVolumes, AssumePodVolumes, BindPodVolumes and GetBindingsCache.

pkg/controller/volume/persistentvolume/scheduler_binder.go:96

type volumeBinder struct {
	ctrl *PersistentVolumeController

	pvcCache PVCAssumeCache
	pvCache  PVAssumeCache

	// Stores binding decisions that were made in FindPodVolumes for use in AssumePodVolumes.
	// AssumePodVolumes modifies the bindings again for use in BindPodVolumes.
	podBindingCache PodBindingCache
}

pkg/controller/volume/persistentvolume/scheduler_binder.go:108

// NewVolumeBinder sets up all the caches needed for the scheduler to make volume binding decisions.
func NewVolumeBinder(
	kubeClient clientset.Interface,
	pvcInformer coreinformers.PersistentVolumeClaimInformer,
	pvInformer coreinformers.PersistentVolumeInformer,
	storageClassInformer storageinformers.StorageClassInformer) SchedulerVolumeBinder {

	// TODO: find better way...
	ctrl := &PersistentVolumeController{
		kubeClient:  kubeClient,
		classLister: storageClassInformer.Lister(),
	}

	b := &volumeBinder{
		ctrl:            ctrl,
		pvcCache:        NewPVCAssumeCache(pvcInformer.Informer()),
		pvCache:         NewPVAssumeCache(pvInformer.Informer()),
		podBindingCache: NewPodBindingCache(),
	}

	return b
}

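The volumeBinder struct mainly contains a PersistentVolumeController instance, pvcCache, pvCache and podBindingCache. The controller instance is minimal and holds only kubeClient and classLister.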

The podBindingCache struct deserves a closer look:

pkg/controller/volume/persistentvolume/scheduler_binder_cache.go:48

type podBindingCache struct {
	mutex sync.Mutex

	// Key = pod name
	// Value = nodeDecisions
	bindingDecisions map[string]nodeDecisions
}

// Key = nodeName
// Value = bindings & provisioned PVCs of the node
type nodeDecisions map[string]nodeDecision

// A decision includes bindingInfo and provisioned PVCs of the node
type nodeDecision struct {
	bindings      []*bindingInfo
	provisionings []*v1.PersistentVolumeClaim
}

type bindingInfo struct {
	// Claim that needs to be bound
	pvc *v1.PersistentVolumeClaim

	// Proposed PV to bind to this claim
	pv *v1.PersistentVolume
}
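The following is a minimal Go sketch of podBindingCache, modeled on the struct definitions above. It is not the Kubernetes implementation. Pods, PVCs and PVs are plain name strings here, and the method bodies (UpdateBindings, GetBindings, DeleteBindings) are a simplified reading of how this article uses the cache.

```go
package main

import (
	"fmt"
	"sync"
)

// bindingInfo pairs a claim with the PV proposed for it. In this sketch both
// are identified by name only (the real struct holds *v1 objects).
type bindingInfo struct {
	pvc string
	pv  string
}

// nodeDecision holds the bindings and the claims to provision for one node.
type nodeDecision struct {
	bindings      []*bindingInfo
	provisionings []string
}

// nodeDecisions maps a node name to the decision made for that node.
type nodeDecisions map[string]nodeDecision

// podBindingCache maps a pod key ("namespace/name") to its per-node decisions.
// The mutex matters because FindPodVolumes runs in parallel across nodes.
type podBindingCache struct {
	mutex            sync.Mutex
	bindingDecisions map[string]nodeDecisions
}

func newPodBindingCache() *podBindingCache {
	return &podBindingCache{bindingDecisions: map[string]nodeDecisions{}}
}

// UpdateBindings replaces the bindings stored for (pod, node) and keeps any
// provisionings already stored for that node.
func (c *podBindingCache) UpdateBindings(pod, node string, bindings []*bindingInfo) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	decisions, ok := c.bindingDecisions[pod]
	if !ok {
		decisions = nodeDecisions{}
		c.bindingDecisions[pod] = decisions
	}
	decision := decisions[node] // zero value if the node has no entry yet
	decision.bindings = bindings
	decisions[node] = decision
}

// GetBindings returns the bindings stored for (pod, node), or nil.
func (c *podBindingCache) GetBindings(pod, node string) []*bindingInfo {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	return c.bindingDecisions[pod][node].bindings
}

// DeleteBindings drops every decision for the pod, e.g. once it is assigned.
func (c *podBindingCache) DeleteBindings(pod string) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	delete(c.bindingDecisions, pod)
}

func main() {
	c := newPodBindingCache()
	c.UpdateBindings("default/web-0", "node-1", []*bindingInfo{{pvc: "data-web-0", pv: "local-pv-a"}})
	fmt.Println(len(c.GetBindings("default/web-0", "node-1"))) // 1
	c.DeleteBindings("default/web-0")
	fmt.Println(len(c.GetBindings("default/web-0", "node-1"))) // 0
}
```

The cache is keyed first by pod and then by node. FindPodVolumes runs per node, so it can record a separate decision for every candidate node. AssumePodVolumes later reads only the entry for the node that was actually chosen.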

VolumeBindingChecker Predicate

The VolumeBinder is created in the scheduler's NewConfigFactory, and the CheckVolumeBinding predicate policy is registered with the default scheduler. The default predicate policies run in a fixed order:

predicatesOrdering = []string{CheckNodeConditionPred, CheckNodeUnschedulablePred,
		GeneralPred, HostNamePred, PodFitsHostPortsPred,
		MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred,
		PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred, CheckNodeLabelPresencePred,
		CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred,
		MaxAzureDiskVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred,
		CheckNodeMemoryPressurePred, CheckNodePIDPressurePred, CheckNodeDiskPressurePred, MatchInterPodAffinityPred}

VolumeBindingChecker.predicate implements this predicate.

pkg/scheduler/algorithm/predicates/predicates.go:1680

func (c *VolumeBindingChecker) predicate(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
	if !utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
		return true, nil, nil
	}

	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}

	unboundSatisfied, boundSatisfied, err := c.binder.Binder.FindPodVolumes(pod, node)
	if err != nil {
		return false, nil, err
	}

	failReasons := []algorithm.PredicateFailureReason{}
	if !boundSatisfied {
		glog.V(5).Infof("Bound PVs not satisfied for pod %v/%v, node %q", pod.Namespace, pod.Name, node.Name)
		failReasons = append(failReasons, ErrVolumeNodeConflict)
	}

	if !unboundSatisfied {
		glog.V(5).Infof("Couldn't find matching PVs for pod %v/%v, node %q", pod.Namespace, pod.Name, node.Name)
		failReasons = append(failReasons, ErrVolumeBindConflict)
	}

	if len(failReasons) > 0 {
		return false, failReasons, nil
	}

	// All volumes bound or matching PVs found for all unbound PVCs
	glog.V(5).Infof("All PVCs found matches for pod %v/%v, node %q", pod.Namespace, pod.Name, node.Name)
	return true, nil, nil
}
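
  • It first checks whether the VolumeScheduling feature gate is enabled. If the gate is off, the predicate passes unconditionally.
  • It calls volumeBinder.FindPodVolumes to check whether the node can satisfy all of the Pod's PVCs. If bound PVs do not satisfy the node affinity, it reports ErrVolumeNodeConflict. If no matching PVs are found for unbound PVCs, it reports ErrVolumeBindConflict.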
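The following stand-alone Go sketch shows how the predicate turns the two booleans from FindPodVolumes into failure reasons. The error values are hypothetical stand-ins for ErrVolumeNodeConflict and ErrVolumeBindConflict, and their messages are illustrative. The sketch omits the error path of FindPodVolumes.

```go
package main

import (
	"errors"
	"fmt"
)

// Illustrative stand-ins for the predicate failure reasons; the message
// text is not copied from the scheduler.
var (
	errVolumeNodeConflict = errors.New("volume node affinity conflict")
	errVolumeBindConflict = errors.New("no available persistent volumes to bind")
)

// volumeBindingPredicate mirrors the control flow of
// VolumeBindingChecker.predicate once FindPodVolumes has returned without error.
func volumeBindingPredicate(featureEnabled, unboundSatisfied, boundSatisfied bool) (bool, []error) {
	if !featureEnabled {
		return true, nil // the predicate is a no-op without VolumeScheduling
	}
	var reasons []error
	if !boundSatisfied {
		reasons = append(reasons, errVolumeNodeConflict)
	}
	if !unboundSatisfied {
		reasons = append(reasons, errVolumeBindConflict)
	}
	return len(reasons) == 0, reasons
}

func main() {
	cases := []struct{ unbound, bound bool }{{true, true}, {false, true}, {true, false}, {false, false}}
	for _, c := range cases {
		fit, reasons := volumeBindingPredicate(true, c.unbound, c.bound)
		fmt.Printf("unboundSatisfied=%v boundSatisfied=%v -> fit=%v reasons=%v\n", c.unbound, c.bound, fit, reasons)
	}
}
```

Both reasons can be reported at once, which tells the user whether the node conflicts with already-bound PVs, lacks PVs for unbound claims, or both.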

What happens when VolumeBindingChecker fails during scheduling

What happens if VolumeBindingChecker.predicate fails? Readers familiar with the scheduler will know that a scheduling failure triggers the error function built by MakeDefaultErrorFunc.

func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue core.SchedulingQueue) func(pod *v1.Pod, err error) {
	return func(pod *v1.Pod, err error) {
		
		...

		backoff.Gc()
		// Retry asynchronously.
		// Note that this is extremely rudimentary and we need a more real error handling path.
		go func() {
			defer runtime.HandleCrash()
			podID := types.NamespacedName{
				Namespace: pod.Namespace,
				Name:      pod.Name,
			}
			origPod := pod

			// When pod priority is enabled, we would like to place an unschedulable
			// pod in the unschedulable queue. This ensures that if the pod is nominated
			// to run on a node, scheduler takes the pod into account when running
			// predicates for the node.
			if !util.PodPriorityEnabled() {
				entry := backoff.GetEntry(podID)
				if !entry.TryWait(backoff.MaxDuration()) {
					glog.Warningf("Request for pod %v already in flight, abandoning", podID)
					return
				}
			}
			// Get the pod again; it may have changed/been scheduled already.
			getBackoff := initialGetBackoff
			for {
				pod, err := c.client.CoreV1().Pods(podID.Namespace).Get(podID.Name, metav1.GetOptions{})
				if err == nil {
					if len(pod.Spec.NodeName) == 0 {
						podQueue.AddUnschedulableIfNotPresent(pod)
					} else {
						if c.volumeBinder != nil {
							// Volume binder only wants to keep unassigned pods
							c.volumeBinder.DeletePodBindings(pod)
						}
					}
					break
				}
				if errors.IsNotFound(err) {
					glog.Warningf("A pod %v no longer exists", podID)

					if c.volumeBinder != nil {
						// Volume binder only wants to keep unassigned pods
						c.volumeBinder.DeletePodBindings(origPod)
					}
					return
				}
				glog.Errorf("Error getting pod %v for retry: %v; retrying...", podID, err)
				if getBackoff = getBackoff * 2; getBackoff > maximalGetBackoff {
					getBackoff = maximalGetBackoff
				}
				time.Sleep(getBackoff)
			}
		}()
	}
}

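The function returned by MakeDefaultErrorFunc retries the failed Pod asynchronously. It re-fetches the Pod from the API server, retrying with exponential backoff on errors, and then: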

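  • If pod.Spec.NodeName is empty, it puts the Pod back into the scheduling queue with podQueue.AddUnschedulableIfNotPresent.
  • If pod.Spec.NodeName is not empty and volumeBinder is non-nil (the VolumeScheduling feature gate is enabled), it calls volumeBinder.DeletePodBindings. This removes the Pod's bindingDecisions from podBindingCache via podBindingCache.DeleteBindings, because the volume binder keeps binding decisions only for unassigned Pods.
  • If the Pod has already been deleted through the API and volumeBinder is non-nil, it likewise calls volumeBinder.DeletePodBindings to remove the Pod's bindingDecisions from podBindingCache, for the same reason.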
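The retry goroutine's GET loop and its final decision can be modeled as follows. This is a sketch. The values of initialGetBackoff and maximalGetBackoff are assumptions, so check the scheduler factory code of your Kubernetes version. nextAction and its action constants are hypothetical names for the branches in the code above.

```go
package main

import (
	"fmt"
	"time"
)

// Assumed values; check pkg/scheduler/factory for your Kubernetes version.
const (
	initialGetBackoff = 100 * time.Millisecond
	maximalGetBackoff = time.Minute
)

// retrySleeps returns the sleeps taken after n consecutive failed Pod GETs:
// as in the loop above, the backoff is doubled before each sleep and capped.
func retrySleeps(initial, limit time.Duration, n int) []time.Duration {
	sleeps := make([]time.Duration, 0, n)
	backoff := initial
	for i := 0; i < n; i++ {
		if backoff = backoff * 2; backoff > limit {
			backoff = limit
		}
		sleeps = append(sleeps, backoff)
	}
	return sleeps
}

// action names what happens once the GET settles (hypothetical names).
type action int

const (
	requeueUnschedulable action = iota // pod still has no NodeName
	deletePodBindings                  // pod assigned or gone; only if volumeBinder != nil
)

// nextAction mirrors the branches taken after the GET loop ends.
func nextAction(found bool, nodeName string) action {
	if found && nodeName == "" {
		return requeueUnschedulable
	}
	return deletePodBindings
}

func main() {
	fmt.Println(retrySleeps(initialGetBackoff, maximalGetBackoff, 10))
	// [200ms 400ms 800ms 1.6s 3.2s 6.4s 12.8s 25.6s 51.2s 1m0s]
	fmt.Println(nextAction(true, "") == requeueUnschedulable) // true
}
```

Because the backoff is doubled before the first sleep, the first wait is twice initialGetBackoff. The wait then stays at maximalGetBackoff once the cap is reached.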

What happens when VolumeBindingChecker succeeds during scheduling?

NewConfigFactory registers an event handler, deletePodFromSchedulingQueue, that removes a Pod from the unscheduled pod queue. This happens when the Pod has been scheduled successfully.

pkg/scheduler/factory/factory.go:745

func (c *configFactory) deletePodFromSchedulingQueue(obj interface{}) {
	var pod *v1.Pod
	...
	if err := c.podQueue.Delete(pod); err != nil {
		runtime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err))
	}
	if c.volumeBinder != nil {
		// Volume binder only wants to keep unassigned pods
		c.volumeBinder.DeletePodBindings(pod)
	}
}

deletePodFromSchedulingQueue first removes the Pod from podQueue. If volumeBinder is non-nil (the VolumeScheduling feature gate is enabled), it then does what MakeDefaultErrorFunc does: it calls volumeBinder.DeletePodBindings to remove the Pod's bindingDecisions from podBindingCache, because the volume binder keeps binding decisions only for unassigned Pods.

volumeBinder

Next, let's look at how each volumeBinder method is implemented and when it is called.

FindPodVolumes

As shown in the analysis of the VolumeBindingChecker predicate, the predicate calls volumeBinder.FindPodVolumes.

FindPodVolumes checks whether the given node can satisfy all of the Pod's PVCs:

  • If a PVC is already bound, it checks whether the NodeAffinity of the corresponding PV matches the node.
  • If a PVC is not yet bound, it looks in the PV cache for a suitable PV to bind to it.

Its return values are:

  • unboundVolumesSatisfied (bool): true if every unbound PVC of the Pod has a matching PV or can be dynamically provisioned (local volumes currently support only static provisioning); otherwise false.
  • boundVolumesSatisfied (bool): true if the already-bound volumes satisfy their PVs' NodeAffinity for this node.

pkg/controller/volume/persistentvolume/scheduler_binder.go:135

// FindPodVolumes caches the matching PVs and PVCs to provision per node in podBindingCache
func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisfied, boundVolumesSatisfied bool, err error) {
	podName := getPodName(pod)

	// Warning: Below log needs high verbosity as it can be printed several times (#60933).
	glog.V(5).Infof("FindPodVolumes for pod %q, node %q", podName, node.Name)

	// Initialize to true for pods that don't have volumes
	unboundVolumesSatisfied = true
	boundVolumesSatisfied = true

	// The pod's volumes need to be processed in one call to avoid the race condition where
	// volumes can get bound/provisioned in between calls.
	boundClaims, claimsToBind, unboundClaimsImmediate, err := b.getPodVolumes(pod)
	if err != nil {
		return false, false, err
	}

	// Immediate claims should be bound
	if len(unboundClaimsImmediate) > 0 {
		return false, false, fmt.Errorf("pod has unbound immediate PersistentVolumeClaims")
	}

	// Check PV node affinity on bound volumes
	if len(boundClaims) > 0 {
		boundVolumesSatisfied, err = b.checkBoundClaims(boundClaims, node, podName)
		if err != nil {
			return false, false, err
		}
	}

	if len(claimsToBind) > 0 {
		var claimsToProvision []*v1.PersistentVolumeClaim
		unboundVolumesSatisfied, claimsToProvision, err = b.findMatchingVolumes(pod, claimsToBind, node)
		if err != nil {
			return false, false, err
		}

		if utilfeature.DefaultFeatureGate.Enabled(features.DynamicProvisioningScheduling) {
			// Try to provision for unbound volumes
			if !unboundVolumesSatisfied {
				unboundVolumesSatisfied, err = b.checkVolumeProvisions(pod, claimsToProvision, node)
				if err != nil {
					return false, false, err
				}
			}
		}
	}

	return unboundVolumesSatisfied, boundVolumesSatisfied, nil
}

FindPodVolumes relies on three key methods:

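  • getPodVolumes: splits the Pod's PVCs into boundClaims, unboundClaims (received by FindPodVolumes as claimsToBind) and unboundClaimsImmediate. If there are any unboundClaimsImmediate, FindPodVolumes returns an error, because PVCs with immediate binding should already have been bound by the PV controller.
  • checkBoundClaims: if boundClaims is non-empty, checks whether the NodeAffinity of each bound PV matches the node's labels. boundVolumesSatisfied is true only if they all match.
  • findMatchingVolumes: if claimsToBind is non-empty, picks from pvCache the smallest PV that satisfies each claim. If some claims are left without a match and the DynamicProvisioningScheduling feature gate is enabled, checkVolumeProvisions then checks whether those claims can be dynamically provisioned.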

The rest of this section focuses on getPodVolumes, findMatchingVolumes and checkVolumeProvisions.

getPodVolumes

pkg/controller/volume/persistentvolume/scheduler_binder.go:359

// getPodVolumes returns a pod's PVCs separated into bound (including prebound), unbound with delayed binding,
// and unbound with immediate binding
func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentVolumeClaim, unboundClaims []*bindingInfo, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error) {
	boundClaims = []*v1.PersistentVolumeClaim{}
	unboundClaimsImmediate = []*v1.PersistentVolumeClaim{}
	unboundClaims = []*bindingInfo{}

	for _, vol := range pod.Spec.Volumes {
		volumeBound, pvc, err := b.isVolumeBound(pod.Namespace, &vol, false)
		if err != nil {
			return nil, nil, nil, err
		}
		if pvc == nil {
			continue
		}
		if volumeBound {
			boundClaims = append(boundClaims, pvc)
		} else {
			delayBinding, err := b.ctrl.shouldDelayBinding(pvc)
			if err != nil {
				return nil, nil, nil, err
			}
			if delayBinding {
				// Scheduler path
				unboundClaims = append(unboundClaims, &bindingInfo{pvc: pvc})
			} else {
				// Immediate binding should have already been bound
				unboundClaimsImmediate = append(unboundClaimsImmediate, pvc)
			}
		}
	}
	return boundClaims, unboundClaims, unboundClaimsImmediate, nil
}

getPodVolumes splits the Pod's PVCs into three groups:

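  • boundClaims: PVCs that are already bound, including prebound ones;
  • unboundClaims: unbound PVCs that use delayed binding (handled on the scheduler path);
  • unboundClaimsImmediate: unbound PVCs that use immediate binding (the PV controller should already have bound these).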
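The three-way split can be sketched in Go as follows. The claim type is hypothetical. Its bound and delayBinding fields stand in for the results of isVolumeBound and shouldDelayBinding, which the sketch does not evaluate.

```go
package main

import "fmt"

// claim is a hypothetical, simplified view of a PVC referenced by a pod.
type claim struct {
	name         string
	bound        bool // bound or prebound to a PV
	delayBinding bool // result of shouldDelayBinding for this claim
}

// splitClaims follows the shape of getPodVolumes: bound claims, unbound
// claims with delayed binding (handled by the scheduler), and unbound claims
// with immediate binding (which should already be bound by the PV controller).
func splitClaims(claims []claim) (bound, delayed, immediate []string) {
	for _, c := range claims {
		switch {
		case c.bound:
			bound = append(bound, c.name)
		case c.delayBinding:
			delayed = append(delayed, c.name)
		default:
			immediate = append(immediate, c.name)
		}
	}
	return bound, delayed, immediate
}

func main() {
	b, d, i := splitClaims([]claim{
		{name: "logs", bound: true},
		{name: "data", delayBinding: true},
		{name: "cache"},
	})
	fmt.Println(b, d, i) // [logs] [data] [cache]
	if len(i) > 0 {
		fmt.Println("FindPodVolumes would fail: pod has unbound immediate PersistentVolumeClaims")
	}
}
```

A bound claim always lands in the first group, regardless of its binding mode. This matches getPodVolumes, which calls shouldDelayBinding only for unbound claims.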

So which PVCs use delayed binding? shouldDelayBinding decides:

func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentVolumeClaim) (bool, error) {
	if !utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
		return false, nil
	}

	if utilfeature.DefaultFeatureGate.Enabled(features.DynamicProvisioningScheduling) {
		// When feature DynamicProvisioningScheduling enabled,
		// Scheduler signal to the PV controller to start dynamic
		// provisioning by setting the "annSelectedNode" annotation
		// in the PVC
		if _, ok := claim.Annotations[annSelectedNode]; ok {
			return false, nil
		}
	}

	className := v1helper.GetPersistentVolumeClaimClass(claim)
	if className == "" {
		return false, nil
	}

	class, err := ctrl.classLister.Get(className)
	if err != nil {
		return false, nil
	}

	if class.VolumeBindingMode == nil {
		return false, fmt.Errorf("VolumeBindingMode not set for StorageClass %q", className)
	}

	return *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer, nil
}
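
  • If the VolumeScheduling feature gate is disabled, no PVC uses delayed binding.
  • If the DynamicProvisioningScheduling feature gate is enabled, it checks whether the PVC carries the annotation "volume.alpha.kubernetes.io/selected-node". If it does, the scheduler has already selected a node for provisioning, and the PVC no longer uses delayed binding.
  • If the PVC's StorageClass name is empty, or the StorageClass cannot be found, the PVC does not use delayed binding.
  • If the StorageClass exists but its VolumeBindingMode is not set, shouldDelayBinding returns false together with an error.
  • Only when the StorageClass exists and its VolumeBindingMode is WaitForFirstConsumer does the PVC use delayed binding.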
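The decision order above can be checked with a small stand-alone Go sketch. The storageClass and gates types are hypothetical reductions of the real StorageClass object and feature gates. A plain map stands in for classLister, and a missing key models a lister miss.

```go
package main

import "fmt"

// storageClass is a hypothetical reduction of a StorageClass object.
type storageClass struct {
	volumeBindingMode *string // nil models an unset field
}

const waitForFirstConsumer = "WaitForFirstConsumer"

// gates holds the two feature gates that shouldDelayBinding consults.
type gates struct {
	volumeScheduling, dynamicProvisioningScheduling bool
}

// shouldDelay follows the decision order of shouldDelayBinding.
func shouldDelay(g gates, annotations map[string]string, className string, classes map[string]storageClass) (bool, error) {
	if !g.volumeScheduling {
		return false, nil
	}
	if g.dynamicProvisioningScheduling {
		// The scheduler already picked a node: let the PV controller provision.
		if _, ok := annotations["volume.alpha.kubernetes.io/selected-node"]; ok {
			return false, nil
		}
	}
	if className == "" {
		return false, nil
	}
	class, ok := classes[className]
	if !ok {
		return false, nil // lookup failure: not delayed, and no error returned
	}
	if class.volumeBindingMode == nil {
		return false, fmt.Errorf("VolumeBindingMode not set for StorageClass %q", className)
	}
	return *class.volumeBindingMode == waitForFirstConsumer, nil
}

func main() {
	wffc, immediate := waitForFirstConsumer, "Immediate"
	classes := map[string]storageClass{
		"local-storage": {volumeBindingMode: &wffc},
		"standard":      {volumeBindingMode: &immediate},
		"legacy":        {},
	}
	g := gates{volumeScheduling: true}
	for _, name := range []string{"local-storage", "standard", "legacy", "missing", ""} {
		delay, err := shouldDelay(g, nil, name, classes)
		fmt.Printf("%q: delay=%v err=%v\n", name, delay, err)
	}
}
```

In practice, this is why a StorageClass for local PVs is normally created with VolumeBindingMode set to WaitForFirstConsumer. Only that mode sends the PVC down the scheduler path.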

findMatchingVolumes

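If the claimsToBind returned by getPodVolumes is non-empty, FindPodVolumes calls findMatchingVolumes. This function sorts the claims by increasing requested size. For each claim, it picks from pvCache the smallest matching PV that has not already been chosen for another claim of the same Pod. It then records the matches in podBindingCache for this node. Claims without a match are returned, so that checkVolumeProvisions can check whether they can be dynamically provisioned.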

pkg/controller/volume/persistentvolume/scheduler_binder.go:413

// findMatchingVolumes tries to find matching volumes for given claims,
// and return unbound claims for further provision.
func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingInfo, node *v1.Node) (foundMatches bool, unboundClaims []*v1.PersistentVolumeClaim, err error) {
	podName := getPodName(pod)
	// Sort all the claims by increasing size request to get the smallest fits
	sort.Sort(byPVCSize(claimsToBind))

	chosenPVs := map[string]*v1.PersistentVolume{}

	foundMatches = true
	matchedClaims := []*bindingInfo{}

	for _, bindingInfo := range claimsToBind {
		// Get storage class name from each PVC
		storageClassName := ""
		storageClass := bindingInfo.pvc.Spec.StorageClassName
		if storageClass != nil {
			storageClassName = *storageClass
		}
		allPVs := b.pvCache.ListPVs(storageClassName)

		// Find a matching PV
		bindingInfo.pv, err = findMatchingVolume(bindingInfo.pvc, allPVs, node, chosenPVs, true)
		if err != nil {
			return false, nil, err
		}
		if bindingInfo.pv == nil {
			glog.V(4).Infof("No matching volumes for Pod %q, PVC %q on node %q", podName, getPVCName(bindingInfo.pvc), node.Name)
			unboundClaims = append(unboundClaims, bindingInfo.pvc)
			foundMatches = false
			continue
		}

		// matching PV needs to be excluded so we don't select it again
		chosenPVs[bindingInfo.pv.Name] = bindingInfo.pv
		matchedClaims = append(matchedClaims, bindingInfo)
		glog.V(5).Infof("Found matching PV %q for PVC %q on node %q for pod %q", bindingInfo.pv.Name, getPVCName(bindingInfo.pvc), node.Name, podName)
	}

	// Mark cache with all the matches for each PVC for this node
	if len(matchedClaims) > 0 {
		b.podBindingCache.UpdateBindings(pod, node.Name, matchedClaims)
	}

	if foundMatches {
		glog.V(4).Infof("Found matching volumes for pod %q on node %q", podName, node.Name)
	}

	return
}
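The smallest-fit strategy of findMatchingVolumes can be modeled in Go as follows. The model is heavily simplified. The pv and claim types are hypothetical, sizes are in GiB, and node affinity is reduced to one node name, as with local PVs. The real findMatchingVolume also checks storage class, access modes and label selectors, which this sketch ignores.

```go
package main

import (
	"fmt"
	"sort"
)

// pv and claim are hypothetical, simplified types.
type pv struct {
	name     string
	capacity int
	node     string
}

type claim struct {
	name    string
	request int
}

// matchClaims sorts claims by increasing request (as findMatchingVolumes does
// with byPVCSize) and gives each claim the smallest PV on the node that is big
// enough and not already chosen for an earlier claim of the same pod. Claims
// left unmatched are returned as candidates for dynamic provisioning.
func matchClaims(claims []claim, pvs []pv, node string) (map[string]string, []string) {
	sorted := append([]claim(nil), claims...)
	sort.SliceStable(sorted, func(i, j int) bool { return sorted[i].request < sorted[j].request })

	chosen := map[string]bool{} // plays the role of chosenPVs
	matches := map[string]string{}
	var unmatched []string
	for _, c := range sorted {
		best := -1
		for i, v := range pvs {
			if chosen[v.name] || v.node != node || v.capacity < c.request {
				continue
			}
			if best == -1 || v.capacity < pvs[best].capacity {
				best = i
			}
		}
		if best == -1 {
			unmatched = append(unmatched, c.name)
			continue
		}
		chosen[pvs[best].name] = true
		matches[c.name] = pvs[best].name
	}
	return matches, unmatched
}

func main() {
	pvs := []pv{{"pv-a", 10, "node-1"}, {"pv-b", 50, "node-1"}, {"pv-c", 100, "node-1"}, {"pv-d", 20, "node-2"}}
	claims := []claim{{"big", 60}, {"small", 5}, {"mid", 40}, {"huge", 200}}
	matches, unmatched := matchClaims(claims, pvs, "node-1")
	fmt.Println(matches, unmatched) // map[big:pv-c mid:pv-b small:pv-a] [huge]
}
```

The chosen set matters when one Pod has several claims. Without it, two claims could both pick the same PV on the node, and only one of the prebinds could succeed.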

checkVolumeProvisions

pkg/controller/volume/persistentvolume/scheduler_binder.go:465

// checkVolumeProvisions checks given unbound claims (the claims have gone through func
// findMatchingVolumes, and do not have matching volumes for binding), and return true
// if all of the claims are eligible for dynamic provision.
func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied bool, err error) {
	podName := getPodName(pod)
	provisionedClaims := []*v1.PersistentVolumeClaim{}

	for _, claim := range claimsToProvision {
		className := v1helper.GetPersistentVolumeClaimClass(claim)
		if className == "" {
			return false, fmt.Errorf("no class for claim %q", getPVCName(claim))
		}

		class, err := b.ctrl.classLister.Get(className)
		if err != nil {
			return false, fmt.Errorf("failed to find storage class %q", className)
		}
		provisioner := class.Provisioner
		if provisioner == "" || provisioner == notSupportedProvisioner {
			glog.V(4).Infof("storage class %q of claim %q does not support dynamic provisioning", className, getPVCName(claim))
			return false, nil
		}

		// Check if the node can satisfy the topology requirement in the class
		if !v1helper.MatchTopologySelectorTerms(class.AllowedTopologies, labels.Set(node.Labels)) {
			glog.V(4).Infof("Node %q cannot satisfy provisioning topology requirements of claim %q", node.Name, getPVCName(claim))
			return false, nil
		}

		// TODO: Check if capacity of the node domain in the storage class
		// can satisfy resource requirement of given claim

		provisionedClaims = append(provisionedClaims, claim)

	}
	glog.V(4).Infof("Provisioning for claims of pod %q that has no matching volumes on node %q ...", podName, node.Name)

	// Mark cache with all the PVCs that need provisioning for this node
	b.podBindingCache.UpdateProvisionedPVCs(pod, node.Name, provisionedClaims)

	return true, nil
}
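
  • For each claim, checkVolumeProvisions first checks that the claim has a StorageClass whose provisioner supports dynamic provisioning, meaning the provisioner is neither empty nor notSupportedProvisioner. It then checks whether the StorageClass's AllowedTopologies (TopologySelectorTerms) match the node's labels.
  • If every claim passes, it calls UpdateProvisionedPVCs to record these claims in podBindingCache's bindingDecisions for this node.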
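The topology check can be sketched in Go as shown below. The rules follow the documented semantics of v1helper.MatchTopologySelectorTerms as I understand them. An empty term list matches every node. Terms are ORed. An empty term matches nothing. Within a term, the label requirements are ANDed, and each requirement means "label value is in the list". The types and function names are hypothetical, so verify these rules against your Kubernetes version.

```go
package main

import "fmt"

// requirement models one topology label requirement: key In values.
type requirement struct {
	key    string
	values []string
}

// term models one TopologySelectorTerm.
type term struct {
	matchLabelExpressions []requirement
}

// matchTopology reports whether node labels satisfy the allowed topologies.
func matchTopology(terms []term, nodeLabels map[string]string) bool {
	if len(terms) == 0 {
		return true // no AllowedTopologies: any node qualifies
	}
	for _, t := range terms {
		if len(t.matchLabelExpressions) == 0 {
			continue // an empty term selects nothing
		}
		if termMatches(t, nodeLabels) {
			return true // terms are ORed
		}
	}
	return false
}

// termMatches requires every requirement in the term to hold (AND).
func termMatches(t term, nodeLabels map[string]string) bool {
	for _, r := range t.matchLabelExpressions {
		v, ok := nodeLabels[r.key]
		if !ok || !contains(r.values, v) {
			return false
		}
	}
	return true
}

func contains(values []string, v string) bool {
	for _, x := range values {
		if x == v {
			return true
		}
	}
	return false
}

func main() {
	node := map[string]string{"zone": "z1", "kubernetes.io/hostname": "node-1"}
	allowed := []term{{matchLabelExpressions: []requirement{{key: "zone", values: []string{"z1", "z2"}}}}}
	fmt.Println(matchTopology(allowed, node)) // true
	fmt.Println(matchTopology(nil, node))     // true
}
```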

AssumePodVolumes

When is volumeBinder.AssumePodVolumes called? The relevant part of scheduleOne is:

scheduleOne invokes assumeAndBindVolumes

pkg/scheduler/scheduler.go:439

// scheduleOne does the entire scheduling workflow for a single pod.  It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne() {
	pod := sched.config.NextPod()
	
	...
	
	suggestedHost, err := sched.schedule(pod)
	
	...
	
	// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
	// This allows us to keep scheduling without waiting on binding to occur.
	assumedPod := pod.DeepCopy()

	// Assume volumes first before assuming the pod.
	//
	// If no volumes need binding, then nil is returned, and continue to assume the pod.
	//
	// Otherwise, error is returned and volume binding is started asynchronously for all of the pod's volumes.
	// scheduleOne() returns immediately on error, so that it doesn't continue to assume the pod.
	//
	// After the asynchronous volume binding updates are made, it will send the pod back through the scheduler for
	// subsequent passes until all volumes are fully bound.
	//
	// This function modifies 'assumedPod' if volume binding is required.
	err = sched.assumeAndBindVolumes(assumedPod, suggestedHost)
	if err != nil {
		return
	}

	// assume modifies `assumedPod` by setting NodeName=suggestedHost
	err = sched.assume(assumedPod, suggestedHost)
	...
	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
	go func() {
		err := sched.bind(assumedPod, &v1.Binding{
			...
		})
	}()
}

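After sched.schedule(pod) has run the predicates and priorities for the Pod, scheduleOne calls sched.assumeAndBindVolumes first. It then calls sched.assume to assume the Pod onto the node, and finally calls sched.bind asynchronously to bind the Pod. If assumeAndBindVolumes returns an error, scheduleOne returns immediately without assuming the Pod.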

assumeAndBindVolumes adds the assumed Pod to BindQueue

pkg/scheduler/scheduler.go:268

// assumeAndBindVolumes will update the volume cache and then asynchronously bind volumes if required.
//
// If volume binding is required, then the bind volumes routine will update the pod to send it back through
// the scheduler.
//
// Otherwise, return nil error and continue to assume the pod.
//
// This function modifies assumed if volume binding is required.
func (sched *Scheduler) assumeAndBindVolumes(assumed *v1.Pod, host string) error {
	if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
		allBound, bindingRequired, err := sched.config.VolumeBinder.Binder.AssumePodVolumes(assumed, host)
		if err != nil {
			sched.config.Error(assumed, err)
			sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "AssumePodVolumes failed: %v", err)
			sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
				Type:    v1.PodScheduled,
				Status:  v1.ConditionFalse,
				Reason:  "SchedulerError",
				Message: err.Error(),
			})
			return err
		}
		if !allBound {
			err = fmt.Errorf("Volume binding started, waiting for completion")
			if bindingRequired {
				if sched.config.Ecache != nil {
					invalidPredicates := sets.NewString(predicates.CheckVolumeBindingPred)
					sched.config.Ecache.InvalidatePredicates(invalidPredicates)
				}

				// bindVolumesWorker() will update the Pod object to put it back in the scheduler queue
				sched.config.VolumeBinder.BindQueue.Add(assumed)
			} else {
				// We are just waiting for PV controller to finish binding, put it back in the
				// scheduler queue
				sched.config.Error(assumed, err)
				sched.config.Recorder.Eventf(assumed, v1.EventTypeNormal, "FailedScheduling", "%v", err)
				sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
					Type:   v1.PodScheduled,
					Status: v1.ConditionFalse,
					Reason: "VolumeBindingWaiting",
				})
			}
			return err
		}
	}
	return nil
}

assumeAndBindVolumes calls volumeBinder.AssumePodVolumes:

pkg/controller/volume/persistentvolume/scheduler_binder.go:191

// AssumePodVolumes will take the cached matching PVs and PVCs to provision
// in podBindingCache for the chosen node, and:
// 1. Update the pvCache with the new prebound PV.
// 2. Update the pvcCache with the new PVCs with annotations set
// It will update podBindingCache again with the PVs and PVCs that need an API update.
func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (allFullyBound, bindingRequired bool, err error) {
	podName := getPodName(assumedPod)

	glog.V(4).Infof("AssumePodVolumes for pod %q, node %q", podName, nodeName)

	if allBound := b.arePodVolumesBound(assumedPod); allBound {
		glog.V(4).Infof("AssumePodVolumes for pod %q, node %q: all PVCs bound and nothing to do", podName, nodeName)
		return true, false, nil
	}

	assumedPod.Spec.NodeName = nodeName
	// Assume PV
	claimsToBind := b.podBindingCache.GetBindings(assumedPod, nodeName)
	newBindings := []*bindingInfo{}

	for _, binding := range claimsToBind {
		newPV, dirty, err := b.ctrl.getBindVolumeToClaim(binding.pv, binding.pvc)
		glog.V(5).Infof("AssumePodVolumes: getBindVolumeToClaim for pod %q, PV %q, PVC %q.  newPV %p, dirty %v, err: %v",
			podName,
			binding.pv.Name,
			binding.pvc.Name,
			newPV,
			dirty,
			err)
		if err != nil {
			b.revertAssumedPVs(newBindings)
			return false, true, err
		}
		if dirty {
			err = b.pvCache.Assume(newPV)
			if err != nil {
				b.revertAssumedPVs(newBindings)
				return false, true, err
			}

			newBindings = append(newBindings, &bindingInfo{pv: newPV, pvc: binding.pvc})
		}
	}

	// Don't update cached bindings if no API updates are needed.  This can happen if we
	// previously updated the PV object and are waiting for the PV controller to finish binding.
	if len(newBindings) != 0 {
		bindingRequired = true
		b.podBindingCache.UpdateBindings(assumedPod, nodeName, newBindings)
	}

	// Assume PVCs
	claimsToProvision := b.podBindingCache.GetProvisionedPVCs(assumedPod, nodeName)

	newProvisionedPVCs := []*v1.PersistentVolumeClaim{}
	for _, claim := range claimsToProvision {
		// The claims from method args can be pointing to watcher cache. We must not
		// modify these, therefore create a copy.
		claimClone := claim.DeepCopy()
		metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, annSelectedNode, nodeName)
		err = b.pvcCache.Assume(claimClone)
		if err != nil {
			b.revertAssumedPVs(newBindings)
			b.revertAssumedPVCs(newProvisionedPVCs)
			return
		}

		newProvisionedPVCs = append(newProvisionedPVCs, claimClone)
	}

	if len(newProvisionedPVCs) != 0 {
		bindingRequired = true
		b.podBindingCache.UpdateProvisionedPVCs(assumedPod, nodeName, newProvisionedPVCs)
	}

	return
}

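The main logic of volumeBinder.AssumePodVolumes:

  • For each of the Pod's PVCs that is not yet bound, take the matching PV cached for the chosen node and update the PV cache, which prebinds the PV to the PVC in memory. getBindVolumeToClaim sets the PV's ClaimRef and, where needed, the PV annotation "pv.kubernetes.io/bound-by-controller".
  • For PVCs that need Dynamic Provisioning, update their copies in the PVC cache with the annotation "volume.alpha.kubernetes.io/selected-node=$nodeName". This is also a prebinding step.
  • The two return values mean:
    - allFullyBound (bool): true if all of the Pod's PVCs are already bound, otherwise false.
    - bindingRequired (bool): true if volume binding/provisioning API updates still have to be made, otherwise false.
  • If allFullyBound is false and bindingRequired is true, the caller (assumeAndBindVolumes) adds the Pod to the volumeBinder's BindQueue.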
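To make the assume/revert pattern and the two return values concrete, here is a minimal sketch in Go. It is not the Kubernetes code. The types `pvCache` and `binding` and the functions `assumePodVolumes` and `nextStep` are hypothetical simplifications that only cover the PV side, not provisioned PVCs. The third branch of `nextStep` reflects the tail of assumeAndBindVolumes shown above, where the Pod is sent straight back with reason VolumeBindingWaiting when no further API update is needed.

```go
package main

import "fmt"

// pvCache is a HYPOTHETICAL, simplified stand-in for the scheduler's PV assume
// cache: it maps a PV name to the PVC it is (pre)bound to, in memory only.
type pvCache struct {
	claimRef map[string]string
}

// Assume prebinds pv to pvc in memory. dirty reports whether anything changed
// (an already-matching binding needs no API update), mirroring the dirty flag
// returned by getBindVolumeToClaim.
func (c *pvCache) Assume(pv, pvc string) (dirty bool, err error) {
	if cur, ok := c.claimRef[pv]; ok {
		if cur == pvc {
			return false, nil
		}
		return false, fmt.Errorf("PV %q already bound to %q", pv, cur)
	}
	c.claimRef[pv] = pvc
	return true, nil
}

// Restore drops an in-memory prebinding made by Assume.
func (c *pvCache) Restore(pv string) { delete(c.claimRef, pv) }

// binding pairs a PV with the PVC it should be bound to (hypothetical type).
type binding struct{ pv, pvc string }

// assumePodVolumes follows the shape of AssumePodVolumes for PVs only:
// assume every binding, revert what was assumed if one fails, and report
// whether API updates are still required.
func assumePodVolumes(c *pvCache, allBound bool, bindings []binding) (allFullyBound, bindingRequired bool, err error) {
	if allBound {
		return true, false, nil // all PVCs bound, nothing to do
	}
	var assumed []binding
	for _, b := range bindings {
		dirty, aerr := c.Assume(b.pv, b.pvc)
		if aerr != nil {
			for _, a := range assumed { // like revertAssumedPVs(newBindings)
				c.Restore(a.pv)
			}
			return false, true, aerr
		}
		if dirty {
			assumed = append(assumed, b)
		}
	}
	return false, len(assumed) != 0, nil
}

// nextStep shows how the caller reacts to the two return values.
func nextStep(allFullyBound, bindingRequired bool) string {
	switch {
	case allFullyBound:
		return "bind pod to node"
	case bindingRequired:
		return "add pod to BindQueue"
	default:
		return "requeue pod (VolumeBindingWaiting)"
	}
}

func main() {
	// pv-b is already bound to claim-b, so only pv-a is newly assumed.
	c := &pvCache{claimRef: map[string]string{"pv-b": "claim-b"}}
	all, req, err := assumePodVolumes(c, false, []binding{{"pv-a", "claim-a"}, {"pv-b", "claim-b"}})
	fmt.Println(all, req, err, nextStep(all, req))

	// pv-b conflicts with claim-x, so the earlier assumption for pv-c is reverted.
	_, _, err = assumePodVolumes(c, false, []binding{{"pv-c", "claim-c"}, {"pv-b", "claim-x"}})
	_, stillAssumed := c.claimRef["pv-c"]
	fmt.Println(err, stillAssumed)
}
```

Running it prints `false true <nil> add pod to BindQueue`, then the conflict error followed by `false`, because pv-c was reverted. Only PVs whose cache entry actually changed ("dirty") are tracked for revert. An already-matching binding is left alone.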

The Pods in the BindQueue are processed one at a time by bindVolumesWorker. It calls volumeBinder.BindPodVolumes to perform the volume binding operations. The next section looks at what bindVolumesWorker does.

bindVolumesWorker

bindVolumesWorker loops over the Pods in the volumeBinder's BindQueue and performs volume binding for each of them. First, we need to know where bindVolumesWorker is started.

pkg/scheduler/scheduler.go:174

// Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately.
func (sched *Scheduler) Run() {
	if !sched.config.WaitForCacheSync() {
		return
	}

	if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
		go sched.config.VolumeBinder.Run(sched.bindVolumesWorker, sched.config.StopEverything)
	}

	go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}

When the default scheduler starts, it launches the bindVolumesWorker goroutine if the VolumeScheduling feature gate is enabled.

pkg/scheduler/scheduler.go:312

// bindVolumesWorker() processes pods queued in assumeAndBindVolumes() and tries to
// make the API update for volume binding.
// This function runs forever until the volume BindQueue is closed.
func (sched *Scheduler) bindVolumesWorker() {
	workFunc := func() bool {
		keyObj, quit := sched.config.VolumeBinder.BindQueue.Get()
		if quit {
			return true
		}
		defer sched.config.VolumeBinder.BindQueue.Done(keyObj)

		assumed, ok := keyObj.(*v1.Pod)
		if !ok {
			glog.V(4).Infof("Object is not a *v1.Pod")
			return false
		}

		// TODO: add metrics
		var reason string
		var eventType string

		glog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)

		// The Pod is always sent back to the scheduler afterwards.
		err := sched.config.VolumeBinder.Binder.BindPodVolumes(assumed)
		if err != nil {
			glog.V(1).Infof("Failed to bind volumes for pod \"%v/%v\": %v", assumed.Namespace, assumed.Name, err)
			reason = "VolumeBindingFailed"
			eventType = v1.EventTypeWarning
		} else {
			glog.V(4).Infof("Successfully bound volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
			reason = "VolumeBindingWaiting"
			eventType = v1.EventTypeNormal
			err = fmt.Errorf("Volume binding started, waiting for completion")
		}

		// Always fail scheduling regardless of binding success.
		// The Pod needs to be sent back through the scheduler to:
		// * Retry volume binding if it fails.
		// * Retry volume binding if dynamic provisioning fails.
		// * Bind the Pod to the Node once all volumes are bound.
		sched.config.Error(assumed, err)
		sched.config.Recorder.Eventf(assumed, eventType, "FailedScheduling", "%v", err)
		sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
			Type:   v1.PodScheduled,
			Status: v1.ConditionFalse,
			Reason: reason,
		})
		return false
	}

	for {
		if quit := workFunc(); quit {
			glog.V(4).Infof("bindVolumesWorker shutting down")
			break
		}
	}
}

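bindVolumesWorker calls volumeBinder.BindPodVolumes to carry out the volume binding operations recorded in podBindingCache. Whether or not binding succeeds, the worker reports the Pod as FailedScheduling and sends it back to the scheduler. The Pod is bound to a Node only in a later scheduling cycle, once all of its volumes are bound.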

BindPodVolumes

pkg/controller/volume/persistentvolume/scheduler_binder.go:266

// BindPodVolumes gets the cached bindings and PVCs to provision in podBindingCache
// and makes the API update for those PVs/PVCs.
func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod) error {
	podName := getPodName(assumedPod)
	glog.V(4).Infof("BindPodVolumes for pod %q", podName)

	bindings := b.podBindingCache.GetBindings(assumedPod, assumedPod.Spec.NodeName)
	claimsToProvision := b.podBindingCache.GetProvisionedPVCs(assumedPod, assumedPod.Spec.NodeName)

	// Do the actual prebinding. Let the PV controller take care of the rest
	// There is no API rollback if the actual binding fails
	for i, bindingInfo := range bindings {
		glog.V(5).Infof("BindPodVolumes: Pod %q, binding PV %q to PVC %q", podName, bindingInfo.pv.Name, bindingInfo.pvc.Name)
		_, err := b.ctrl.updateBindVolumeToClaim(bindingInfo.pv, bindingInfo.pvc, false)
		if err != nil {
			// only revert assumed cached updates for volumes we haven't successfully bound
			b.revertAssumedPVs(bindings[i:])
			// Revert all of the assumed cached updates for claims,
			// since no actual API update will be done
			b.revertAssumedPVCs(claimsToProvision)
			return err
		}
	}

	// Update claims objects to trigger volume provisioning. Let the PV controller take care of the rest
	// PV controller is expect to signal back by removing related annotations if actual provisioning fails
	for i, claim := range claimsToProvision {
		if _, err := b.ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claim); err != nil {
			glog.V(4).Infof("updating PersistentVolumeClaim[%s] failed: %v", getPVCName(claim), err)
			// only revert assumed cached updates for claims we haven't successfully updated
			b.revertAssumedPVCs(claimsToProvision[i:])
			return err
		}
	}

	return nil
}
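
BindPodVolumes does two things:

  • Using the bindings saved in podBindingCache, it calls the API to prebind each PV to its PVC (updateBindVolumeToClaim). The PV Controller watches this change and performs the actual volume binding.
  • Using the claimsToProvision saved in podBindingCache, it calls the API to update those PVCs. The PV Controller watches this change and performs Dynamic Volume Provisioning.

If an API update fails, only the assumed cache entries that have not yet been written are reverted. There is no API rollback for objects that were already updated.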

Key Flow

Summary

This article analyzed the source code of VolumeBinder in the scheduler and volumeBinder in the PV Controller. It showed how local volume scheduling implements delayed scheduling and PV node affinity.

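Original content: this article was published on the Tencent Cloud+ Community with the author's authorization and may not be reproduced without permission.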
