前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >K8S-Node自动扩容项目CA源码分析(下)

K8S-Node自动扩容项目CA源码分析(下)

原创
作者头像
kinnylee
发布2022-07-02 22:01:53
1.2K0
发布2022-07-02 22:01:53
举报
文章被收录于专栏:kinnylee钻研技术kinnylee钻研技术

三、主流程

关键代码逻辑,梳理成流程图,可以对照查看。高清图

CA源码分析.jpg
CA源码分析.jpg

3.1 main 启动入口

代码语言:go
复制
func main() {
  // 选取leader
 	leaderElection := defaultLeaderElectionConfiguration()
	leaderElection.LeaderElect = true

  go func() {
    // 注册指标、快照、监控检查接口
		pathRecorderMux := mux.NewPathRecorderMux("cluster-autoscaler")
		defaultMetricsHandler := legacyregistry.Handler().ServeHTTP
		pathRecorderMux.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) {
			defaultMetricsHandler(w, req)
		})
		if *debuggingSnapshotEnabled {
			pathRecorderMux.HandleFunc("/snapshotz", debuggingSnapshotter.ResponseHandler)
		}
		pathRecorderMux.HandleFunc("/health-check", healthCheck.ServeHTTP)

		err := http.ListenAndServe(*address, pathRecorderMux)
	}()

	if !leaderElection.LeaderElect {
		run(healthCheck, debuggingSnapshotter)
	} else {
    ...
    // 入口函数
    run(healthCheck, debuggingSnapshotter)
    ...
  }
}

3.2 run

  • 初始化 autoscaler
  • 调用 autoscaler.Start,后台刷新缓存
  • 间隔执行(默认10s)autoscaler.RunOnce 方法,实现扩缩容逻辑
代码语言:go
复制
func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) {
	metrics.RegisterAll(*emitPerNodeGroupMetrics)
	// 构造 autoscaler 对象
	autoscaler, err := buildAutoscaler(debuggingSnapshotter)
	...
  // 在后台启动 autoscaler
	if err := autoscaler.Start(); err != nil {
		klog.Fatalf("Failed to autoscaler background components: %v", err)
	}

	// Autoscale ad infinitum.
	for {
		select {
    // 默认 10s 执行一次
		case <-time.After(*scanInterval):
			{
				...
        // 开始执行
				err := autoscaler.RunOnce(loopStart)
				...
			}
		}
	}
}

3.3 autoscaler

3.3.1 buildAutoscaler
代码语言:go
复制
func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (core.Autoscaler, error) {
	...
  // Create autoscaler.
	return core.NewAutoscaler(opts)
}
3.3.2 NewAutoscaler
代码语言:go
复制
func NewAutoscaler(opts AutoscalerOptions) (Autoscaler, errors.AutoscalerError) {
	// 这个方法主要是做一下初始化工作,其中 provider 的初始化在介绍 aws provider 创建流程时介绍过
  // 内部会初始化 awsManager、asgCache,并从云厂商同步 asg信息到本地缓存
  err := initializeDefaultOptions(&opts)
	if err != nil {
		return nil, errors.ToAutoscalerError(errors.InternalError, err)
	}
  // 实例化 AutoScaler
	return NewStaticAutoscaler(
		opts.AutoscalingOptions,
		opts.PredicateChecker,
		opts.ClusterSnapshot,
		opts.AutoscalingKubeClients,
		opts.Processors,
		opts.CloudProvider,
		opts.ExpanderStrategy,
		opts.EstimatorBuilder,
		opts.Backoff,
		opts.DebuggingSnapshotter), nil
}
initializeDefaultOptions

初始化的内容有:

  • Processor:各种前置处理器
  • PredicateChecker:扩容前的预调度检查
  • CloudProvider:前面介绍过,主要用于操作 IaaS 云资源
  • Estimator:评估扩容节点
  • Expander:从多个符合扩容条件的 NodeGroup 中选择最终 Node 的策略
代码语言:go
复制
func initializeDefaultOptions(opts *AutoscalerOptions) error {
	// 初始化 processor
  if opts.Processors == nil {
		opts.Processors = ca_processors.DefaultProcessors()
	}
	if opts.AutoscalingKubeClients == nil {
		opts.AutoscalingKubeClients = context.NewAutoscalingKubeClients(opts.AutoscalingOptions, opts.KubeClient, opts.EventsKubeClient)
	}
  // 初始化前置校验
	if opts.PredicateChecker == nil {
		predicateCheckerStopChannel := make(chan struct{})
		predicateChecker, err := simulator.NewSchedulerBasedPredicateChecker(opts.KubeClient, predicateCheckerStopChannel)
		if err != nil {
			return err
		}
		opts.PredicateChecker = predicateChecker
	}
  // 初始化快照
	if opts.ClusterSnapshot == nil {
		opts.ClusterSnapshot = simulator.NewBasicClusterSnapshot()
	}
  // 初始化 provider
	if opts.CloudProvider == nil {
		opts.CloudProvider = cloudBuilder.NewCloudProvider(opts.AutoscalingOptions)
	}
  // 初始化 expander 策略
	if opts.ExpanderStrategy == nil {
		expanderStrategy, err := factory.ExpanderStrategyFromStrings(strings.Split(opts.ExpanderNames, ","), opts.CloudProvider,
			opts.AutoscalingKubeClients, opts.KubeClient, opts.ConfigNamespace, opts.GRPCExpanderCert, opts.GRPCExpanderURL)
		if err != nil {
			return err
		}
		opts.ExpanderStrategy = expanderStrategy
	}
  // 初始化 Estimate 策略
	if opts.EstimatorBuilder == nil {
		estimatorBuilder, err := estimator.NewEstimatorBuilder(opts.EstimatorName, estimator.NewThresholdBasedEstimationLimiter(opts.MaxNodesPerScaleUp, opts.MaxNodeGroupBinpackingDuration))
		if err != nil {
			return err
		}
		opts.EstimatorBuilder = estimatorBuilder
	}
  // 初始化 Backoff 策略
	if opts.Backoff == nil {
		opts.Backoff =
			backoff.NewIdBasedExponentialBackoff(opts.InitialNodeGroupBackoffDuration, opts.MaxNodeGroupBackoffDuration, opts.NodeGroupBackoffResetTimeout)
	}

	return nil
}
3.3.3 Autoscaler.Start

定时从 cloud provider 获取 node group 以及 node group 下的 instance 信息,并刷新本地缓存

代码语言:go
复制
func (a *StaticAutoscaler) Start() error {
	a.clusterStateRegistry.Start()
	return nil
}

func (csr *ClusterStateRegistry) Start() {
	csr.cloudProviderNodeInstancesCache.Start(csr.interrupt)
}

// 后台定时刷新数据
func (cache *CloudProviderNodeInstancesCache) Start(interrupt chan struct{}) {
	go wait.Until(func() {
		cache.Refresh()
	}, CloudProviderNodeInstancesCacheRefreshInterval, interrupt)
}
Refresh
代码语言:go
复制
// Refresh refreshes cache.
func (cache *CloudProviderNodeInstancesCache) Refresh() {
	// 从 cloud provider 获取 node group
  // 调用 cloud provider 的第一个扩展点
	nodeGroups := cache.cloudProvider.NodeGroups()
  // 移除不存在的 node group
	cache.removeEntriesForNonExistingNodeGroupsLocked(nodeGroups)
	for _, nodeGroup := range nodeGroups {
    // 调用 cloud provider 中 node group 接口扩展点
		nodeGroupInstances, err := nodeGroup.Nodes()
		// 更新缓存中的 node group
		cache.updateCacheEntryLocked(nodeGroup, &cloudProviderNodeInstancesCacheEntry{nodeGroupInstances, time.Now()})
	}
}
3.3.4 Autoscaler.RunOnce

关键逻辑:

  • 获取现有集群所有 Node,以及Node上运行的Pod信息
  • 经过几个 Processor 模块处理,将 Node 和 Pod 做分类规整到所属 asgCache 中的 各个asg 下,保存在 nodeInfosForGroups 中
  • 获取所有资源不足导致 pending 的 pod
  • 经过几个 Processor 模块处理,将未调度 pod 做处理,保存在 unschedulablePodsToHelp 中
  • 根据 unschedulablePodsToHelp 判断当前是否需要执行 ScaleUp 进行扩容
  • 根据是否配置了缩容参数ScaleDownEnabled,判断是否要进行缩容

核心逻辑伪代码

代码语言:go
复制
func (a *StaticAutoscaler) RunOnce() {
  // 获取节点信息
  allNodes, readyNodes := a.obtainNodeLists(a.CloudProvider)
  // 将 Node 信息按照 asg 的id做分类规整
  nodeInfosForGroups := a.processors.TemplateNodeInfoProvider.Process(...)
  // 获取未调度的 pod
  unschedulablePods, err := unschedulablePodLister.List()
  // pod按调度类型分类
  unschedulablePodsToHelp := a.processors.PodListProcessor.Process(unschedulablePods)
  
  if len(unschedulablePodsToHelp) == 0 {
		// 不需要扩容
	} else if a.MaxNodesTotal > 0 && len(readyNodes) >= a.MaxNodesTotal {
		// 扩容达到上限
	} else if allPodsAreNew(unschedulablePodsToHelp, currentTime) {
		// Node 扩容过程中,pod 新创建,等待一定冷却周期再尝试扩容
	} else {
    // 启动扩容
		ScaleUp()
  }
  
  // 如果开启缩容
  if a.ScaleDownEnabled {
    // 缩容逻辑
  }
}

RunOnce 实现细节如下:

代码语言:go
复制
func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError {
 	...
  // 初始化获取未调度 pod的对象
  unschedulablePodLister := a.UnschedulablePodLister()

  // 获取所有的 node、ready 的node
  allNodes, readyNodes, typedErr := a.obtainNodeLists(a.CloudProvider)
  originalScheduledPods, err := scheduledPodLister.List()
  // 计算集群资源,获取 node.Status.Capacity[resource] 的值
  coresTotal, memoryTotal := calculateCoresMemoryTotal(allNodes, currentTime)

  // 获取 ds 相关pod,后期加入调度器参与模拟调度
	daemonsets, err := a.ListerRegistry.DaemonSetLister().List(labels.Everything())
	// 手动刷新云资源,保持与本地缓存同步
  err = a.AutoscalingContext.CloudProvider.Refresh()
  
  // 找到 pod.Spec.Priority 值小于设定值 ExpendablePodsPriorityCutoff 的 pod
  // 这些 pod 优先级高,不可以被 expend
  nonExpendableScheduledPods := core_utils.FilterOutExpendablePods(originalScheduledPods, a.ExpendablePodsPriorityCutoff)

  // TemplateNodeInfo
  // 将所有运行的 pod,按照不同的 node分类存储好,构造出 NodeInfo 对象,为后续调度准备数据
  // 取出 pod.Spec.NodeName, 依次存储好
  // 依次调用了 
  // 1. MixedTemplateNodeInfoProvider
  // 2. AnnotationNodeInfoProvider
  nodeInfosForGroups, autoscalerError := a.processors.TemplateNodeInfoProvider.Process(autoscalingContext, readyNodes, daemonsets, a.ignoredTaints, currentTime)

  // NodeInfoProcessor
	nodeInfosForGroups, err = a.processors.NodeInfoProcessor.Process(autoscalingContext, nodeInfosForGroups)

  // 获取未注册的 node(在 CA node group 中,但是未注册到 k8s)
  unregisteredNodes := a.clusterStateRegistry.GetUnregisteredNodes()
  if len(unregisteredNodes) > 0 {
		// 删除这些 node
		removedAny, err := removeOldUnregisteredNodes(unregisteredNodes, autoscalingContext,
			a.clusterStateRegistry, currentTime, autoscalingContext.LogRecorder)
	}
  danglingNodes, err := a.deleteCreatedNodesWithErrors()
  
  // 调整 node group size
  fixedSomething, err := fixNodeGroupSize(autoscalingContext, a.clusterStateRegistry, currentTime)

  // 获取未调度 pod
  // 未调度 pod 的排查规则:
  // selector := fields.ParseSelectorOrDie("spec.nodeName==" + "" + ",status.phase!=" +
	//	string(apiv1.PodSucceeded) + ",status.phase!=" + string(apiv1.PodFailed))
  unschedulablePods, err := unschedulablePodLister.List()

  unschedulablePodsToHelp, _ := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePods)
	unschedulablePodsToHelp = a.filterOutYoungPods(unschedulablePodsToHelp, currentTime)
  
  // 根据未调度的 pod 数量,判断是否需要扩容
	if len(unschedulablePodsToHelp) == 0 {
    // 没有未调度的 pod,不需要扩容
		scaleUpStatus.Result = status.ScaleUpNotNeeded
		klog.V(1).Info("No unschedulable pods")
	} else if a.MaxNodesTotal > 0 && len(readyNodes) >= a.MaxNodesTotal {
    /// 已经达到扩容上限
		scaleUpStatus.Result = status.ScaleUpNoOptionsAvailable
		klog.V(1).Info("Max total nodes in cluster reached")
	} else if allPodsAreNew(unschedulablePodsToHelp, currentTime) {
    // 大量 pod 同时创建,一段时间内不再触发扩容,处于冷却期
		a.processorCallbacks.DisableScaleDownForLoop()
		scaleUpStatus.Result = status.ScaleUpInCooldown
		klog.V(1).Info("Unschedulable pods are very new, waiting one iteration for more")
	} else {
		scaleUpStart := time.Now()
		metrics.UpdateLastTime(metrics.ScaleUp, scaleUpStart)
		// 真正执行扩容动作
		scaleUpStatus, typedErr = ScaleUp(autoscalingContext, a.processors, a.clusterStateRegistry, unschedulablePodsToHelp, readyNodes, daemonsets, nodeInfosForGroups, a.ignoredTaints)
	}
}
MixedTemplateNodeInfoProvider

将所有的 Node,以及 Node 上运行的 pod,按照 asg 的 id归类保存

代码语言:go
复制
func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext, nodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, ignoredTaints taints.TaintKeySet, now time.Time) (map[string]*schedulerframework.NodeInfo, errors.AutoscalerError) {
	...
  // 获取 node 上运行的所有的 pod,key是 node-name,value 是 pod 列表
	podsForNodes, err := getPodsForNodes(ctx.ListerRegistry)
  
  // 定义一个回调函数,处理 node 节点
	processNode := func(node *apiv1.Node) (bool, string, errors.AutoscalerError) {
    // 根据 node信息,调用 clouder provider 扩展点,获取 node group 信息
    // aws: 根据 node.Spec.ProviderID 调用 aws-sdk 获取
		nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node)
		// 得到 node group 的 id
		id := nodeGroup.Id()
		if _, found := result[id]; !found {
			// 根据给定的node,构造节点信息,确认是否是需要创建的 node
      // getRequiredPodsForNode:将 node 上的 dameonset pod 单独取出来(新节点也必须要运行这些 pod)
      // schedulerframework.NewNodeInfo: 构造新的 node 信息,都是调度框架的函数
      //    pInfo.Update(pod):计算 pod 的亲和性信息
      //    n.AddPodInfo(...):计算 cpu、memory、端口占用、pvc 引用等信息
			nodeInfo, err := simulator.BuildNodeInfoForNode(node, podsForNodes)
			if err != nil {
				return false, "", err
			}
      // 修改从 template 中生成的 node 信息,避免使用了重复的主机名
      //    sanitizeTemplateNode:根据 node group 规则自动生成主机名、新增 trait 信息
      //    schedulerframework.NewNodeInfo:更新完主机信息后,再次调用调度框架
			sanitizedNodeInfo, err := utils.SanitizeNodeInfo(nodeInfo, id, ignoredTaints)
			
			result[id] = sanitizedNodeInfo
			return true, id, nil
		}
		return false, "", nil
	}
  
  // 从 Node Group 中扩展新的节点
	for _, nodeGroup := range ctx.CloudProvider.NodeGroups() {
    // nodeGroup.TemplateNodeInfo() 获取 节点模板
    // daemonset.GetDaemonSetPodsForNode: 根据 ds 和 node 返回 pod
    //  schedulerframework.NewNodeInfo:构造完整的 pod
  	nodeInfo, err := utils.GetNodeInfoFromTemplate(nodeGroup, daemonsets, ctx.PredicateChecker, ignoredTaints)
		result[id] = nodeInfo
  }
}
AnnotationNodeInfoProvider

从 asg 中获取注解信息,并追加到 NodeInfo 中,便于后续参与调度

代码语言:go
复制
func (p *AnnotationNodeInfoProvider) Process(ctx *context.AutoscalingContext, nodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, ignoredTaints taints.TaintKeySet, currentTime time.Time) (map[string]*schedulerframework.NodeInfo, errors.AutoscalerError) {
  // 先经过 mixedTemplateNodeInfoProvider 处理
	nodeInfos, err := p.mixedTemplateNodeInfoProvider.Process(ctx, nodes, daemonsets, ignoredTaints, currentTime)
	
	for _, nodeInfo := range nodeInfos {
    // 拿到所有的 node group
		nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(nodeInfo.Node())
		// 获取  node  group 模板
		template, err := nodeGroup.TemplateNodeInfo()
		// 获取模板 Annotation 信息
		for key, val := range template.Node().Annotations {
			if _, ok := nodeInfo.Node().Annotations[key]; !ok {
        // 将模板 annotation 添加到 node 上
				nodeInfo.Node().Annotations[key] = val
			}
		}
	}
	return nodeInfos, nil
}

3.4 ScaleUp

  • 找到 cloud provider 所有可用的 node group
  • 把不可调度的 pod 按照扩容需求进行分组
  • 调用
  • 将前两步得到的数据作为输入,传给 estimator 模块的装箱算法,得到候选的 pod、node 分配方案
  • 将上一步得到的结果,传给 expander 模块,得到最优的分配方案。默认提供好几种策略

伪代码实现关键步骤:

代码语言:go
复制
// 前面的动作,将集群所有 Node 和 Pod 做规整,构造 NodeInfo 信息,按 nodeGroupId 建立索引
nodeInfosForGroups, autoscalerError := a.processors.TemplateNodeInfoProvider.Process(...)

func ScaleUp(...) {
  // 获取所有可用的 node group
 	nodeGroups := context.CloudProvider.NodeGroups()
  // 将所有待调度的 pod 按照调度属性分类
  podEquivalenceGroups := buildPodEquivalenceGroups(unschedulablePods)
  expansionOptions := make(map[string]expander.Option, 0)
  // 遍历所有的 node group
  for _, nodeGroup := range nodeGroups {
    // 获取当前 node group 的 nodeInfo 信息
    nodeInfo, found := nodeInfos[nodeGroup.Id()]
    // 计算当前 asg 能够提供的cpu、memory 资源量,确认是否超过限额
    scaleUpResourcesDelta := computeScaleUpResourcesDelta(nodeInfo, nodeGroup, resourceLimiter)
    // 调用 Extimate 模块背包算法,计算出当前 node group 下需要扩展几台 Node,能满足哪些 pod 调度,保存在 option 变量中 
    option, err := computeExpansionOption(podEquivalenceGroups, nodeGroup, nodeInfo, upcomingNodes)
    // 将该 NodeGroup 扩容的情况保存起来
    expansionOptions[nodeGroup.Id()] = option
  }
  // 计算结果重命名为 options
  // 此时有多个满足条件的结果
  options := expansionOptions
  // 调用 Expander 模块,根据启动传入的策略参数,从多个选项中选择最终一个结果
  bestOption := context.ExpanderStrategy.BestOption(options, nodeInfos)
  
  // 如果 NodeGroup 不存在,创建 NodeGroup
  processors.NodeGroupManager.CreateNodeGroup(context, bestOption.NodeGroup)
  
  // 负载均衡策略计算多个 NodeGroup中各自需要扩容的信息
  scaleUpInfos, typedErr := processors.NodeGroupSetProcessor.BalanceScaleUpBetweenGroups()
  for _, info := range scaleUpInfos {
    // 调用 provider 执行扩容
    executeScaleUp(...)
    // 负载均衡
    clusterStateRegistry.Recalculate()
  }
}

ScaleUp实现细节如下:

代码语言:go
复制
func ScaleUp(...) {
  ...
  // 第一步:找到 cloud provider 所有可用的 node group
  // 返回 node 列表中不在 node group 中的 node 子集
  nodesFromNotAutoscaledGroups, err := utils.FilterOutNodesFromNotAutoscaledGroups(nodes, context.CloudProvider)
  
  // 计算扩容可用的剩余资源
  //  calculateScaleUpCoresMemoryTotal:计算 node group 和非 Node group 所有的资源
  //  sum(nodeGroup.targetSize * nodeInfo.cpu(memory) )
  // computeBelowMax(totalCores, max):根据 CA 配置的资源限额 - 目前所有已使用资源 = 可扩容的资源余量
	scaleUpResourcesLeft, errLimits := computeScaleUpResourcesLeftLimits(context, processors, nodeGroups, nodeInfos, nodesFromNotAutoscaledGroups, resourceLimiter)

  // Node在NodeGroup中但是没有Registed在kubenetes集群
  // 数量为 newNodes := ar.CurrentTarget - (readiness.Ready + readiness.Unready + readiness.LongUnregistered)
  upcomingNodes := make([]*schedulerframework.NodeInfo, 0)
	for nodeGroup, numberOfNodes := range clusterStateRegistry.GetUpcomingNodes() {
    
  }
  
  processors.NodeGroupListProcessor.Process(...)
  
  // 第二步:将所有待调度 pod 按照扩容需求分类
  podEquivalenceGroups := buildPodEquivalenceGroups(unschedulablePods)
  
  // 评估所有的 node group,哪些不可用跳过,哪些可用
  skippedNodeGroups := map[string]status.Reasons{}
  
  // 外层循环,遍历所有的 NodeGroup
	for _, nodeGroup := range nodeGroups {
    if nodeGroup.Exist() && !clusterStateRegistry.IsNodeGroupSafeToScaleUp(nodeGroup, now) {
      ...
    }
    // 取出当前大小
    currentTargetSize, err := nodeGroup.TargetSize()
    
    // 计算扩容需要的增量资源
    // 取出 node group 下对应的 cpu、memory 信息
    scaleUpResourcesDelta, err := computeScaleUpResourcesDelta(context, processors, nodeInfo, nodeGroup, resourceLimiter)
    
    // 校验是否超过限额,对比可扩容量和待扩容量
    checkResult := scaleUpResourcesLeft.checkScaleUpDeltaWithinLimits(scaleUpResourcesDelta)

    // 第三步:将前两步得到的数据作为输入,传给 estimator 模块的装箱算法,得到候选的 pod、node 分配方案
    option, err := computeExpansionOption(context, podEquivalenceGroups, nodeGroup, nodeInfo, upcomingNodes)
  }

  // 第四步:将上一步得到的结果,传给 expander 模块,得到最优的分配方案
  //  根据expansion (random ,mostpods, price,waste)配置获取最佳伸缩组
  // expander 是表示选择 node group 的策略
  bestOption := context.ExpanderStrategy.BestOption(options, nodeInfos)
  if bestOption != nil && bestOption.NodeCount > 0 {
    
    // 得到需要扩容的节点数
    newNodes := bestOption.NodeCount
    // 判断是否达到扩容上限
    if context.MaxNodesTotal > 0 && len(nodes)+newNodes+len(upcomingNodes) > context.MaxNodesTotal {
    }
    
    // 不存在 node group,创建新的
    if !bestOption.NodeGroup.Exist() {
      // 创建的 ng 包括主的、扩展的
      createNodeGroupResult, err := processors.NodeGroupManager.CreateNodeGroup(context, bestOption.NodeGroup)
      
      // 根据主 ng 创建 nodeinfo
      // 将 daemonset 追加到到 node group 模板创建出来的 node节点 pod 列表中
      // trait 信息追加到新创建 node 的 Spec.Trait 中
      // 填充 node name
      mainCreatedNodeInfo, err := utils.GetNodeInfoFromTemplate(createNodeGroupResult.MainCreatedNodeGroup, daemonSets, context.PredicateChecker, ignoredTaints)
      // 依次创建多个扩展的 ng
			for _, nodeGroup := range createNodeGroupResult.ExtraCreatedNodeGroups {
        option, err2 := computeExpansionOption(context, podEquivalenceGroups, nodeGroup, nodeInfo, upcomingNodes)

      }
      
      // 重新计算缓存中节点信息
      clusterStateRegistry.Recalculate()
      
       // 计算出要扩容的资源
  		newNodes, err = applyScaleUpResourcesLimits(context, processors, newNodes, scaleUpResourcesLeft, nodeInfo, bestOption.NodeGroup, resourceLimiter)
    
      if context.BalanceSimilarNodeGroups {
        // 找到相似的 ng
				similarNodeGroups, typedErr := processors.NodeGroupSetProcessor.FindSimilarNodeGroups(context, bestOption.NodeGroup, nodeInfos)
      }
      
      // 平衡多个 ng
      scaleUpInfos, typedErr := processors.NodeGroupSetProcessor.BalanceScaleUpBetweenGroups(
			context, targetNodeGroups, newNodes)
      
      // 依次遍历所有待扩容的机器,执行扩容操作
      for _, info := range scaleUpInfos {
        // executeScaleUp 会执行 clouder provider 中的 IncreaseSize 方法
			typedErr := executeScaleUp(context, clusterStateRegistry, info, gpu.GetGpuTypeForMetrics(gpuLabel, availableGPUTypes, nodeInfo.Node(), nil), now)
			}
      
     	clusterStateRegistry.Recalculate()
      // 返回扩容成功
			return &status.ScaleUpStatus{
        Result:                  status.ScaleUpSuccessful,
        ScaleUpInfos:            scaleUpInfos,
        PodsRemainUnschedulable: getRemainingPods(podEquivalenceGroups, skippedNodeGroups),
        ConsideredNodeGroups:    nodeGroups,
        CreateNodeGroupResults:  createNodeGroupResults,
        PodsTriggeredScaleUp:    bestOption.Pods,
        PodsAwaitEvaluation:     getPodsAwaitingEvaluation(podEquivalenceGroups, bestOption.NodeGroup.Id()),
      }, nil
    }
    // 返回不需要扩容
    return &status.ScaleUpStatus{
      Result:                  status.ScaleUpNoOptionsAvailable,
      PodsRemainUnschedulable: getRemainingPods(podEquivalenceGroups, skippedNodeGroups),
      ConsideredNodeGroups:    nodeGroups,
    }, nil
}
3.4.1 ScaleUpInfo

计算出来的 ScaleUpInfo 会传给 Clouder Provider,用于操作云资源

需要开通的资源数量 delta = ScaleUpInfo.NewSize - ScaleUpInfo.CurrentSize

代码语言:go
复制
type ScaleUpInfo struct {
	// Group is the group to be scaled-up
	Group cloudprovider.NodeGroup
	// CurrentSize is the current size of the Group
	CurrentSize int
	// NewSize is the size the Group will be scaled-up to
	NewSize int
	// MaxSize is the maximum allowed size of the Group
	MaxSize int
}
3.4.2 computeExpansionOption

这里分为两大块逻辑:预检查 + 背包计算

预检查:

  • 遍历所有待调度pod
  • 每个 pod 依次去和当前 Node 做预调度,确认一个 Node扩容后可以让 该pod 调度成功
  • 将初步筛选出来满足条件的 pod 列表,

背包计算:

通过前面的计算:所有的 pod中,如果扩容一个Node,一定能满足调度条件;但是到底要创建最少多少个 Node,能满足所有的 pod 调度需求,就要经过 Estimate 模块的背包算法了

  • 通过给定多个 Pod 和多个 Node,计算出最优的 Node 和 Pod 数量
代码语言:go
复制
	func computeExpansionOption(...) {
    // 遍历每个待调度的 pod
    // 用每个 pod 去匹配 node,做模拟调度
		for _, eg := range podEquivalenceGroups {
      // 校验调度
      // 内部调用调度框架
      // p.framework.RunPreFilterPlugins(context.TODO(), state, pod)
      // 	filterStatuses := p.framework.RunFilterPlugins(context.TODO(), state, pod, nodeInfo)
      // 确认调度状态是否正确
    	if err := context.PredicateChecker.CheckPredicates(context.ClusterSnapshot, samplePod, nodeInfo.Node().Name); err == nil {
			// 返回可以调度的 pod
			option.Pods = append(option.Pods, eg.pods...)
      // 标记 pod 理论上可以调度成功
			eg.schedulable = true
    }
      
    // 可以成功调度 pod > 0,开始仿真调度
    // 计算需要的 node 数,和可以成功调度的 pod 数
    if len(option.Pods) > 0 {
			estimator := context.EstimatorBuilder(context.PredicateChecker, context.ClusterSnapshot)
      // 调用 Estimate 模块,后面单独介绍
			option.NodeCount, option.Pods = estimator.Estimate(option.Pods, nodeInfo, option.NodeGroup)
		}

		return option, nil
  }

3.5 Estimate

前面介绍过,通过上述计算后,所有的 pod中,如果扩容一个Node,一定能满足调度条件;但是到底要创建最少多少个 Node,能满足所有的 pod 调度需求,就要经过 Estimate 模块的背包算法了

3.5.1 优化目标

通过给定多个 Pod 和多个 Node,计算出最优的 Node 和 Pod 数量

输入
  • 待调度 pod 列表
  • nodeinfo
  • NodeGroup
接口
代码语言:go
复制
type Estimator interface {
	Estimate([]*apiv1.Pod, *schedulerframework.NodeInfo, cloudprovider.NodeGroup) (int, []*apiv1.Pod)
}
输出
  • 节点数量
  • pod列表
3.5.2 实现分析
  • 将这组 pod 所需的cpu、memory 资源 / Node节点能提供的资源,计算出每个 pod 的得分
  • 按照得分从高到底排序
  • 按照由高到低得分顺序,依次遍历每个 pod,去匹配 Node
  • 新创建的 Node 保存到 newNodeNames 数组中
  • 如果没有找到,就去创建一个新的 Node。直到所有的 pod 都处理完。
代码语言:go
复制
func (e *BinpackingNodeEstimator) Estimate(
  // 计算 pod 的得分
	podInfos := calculatePodScore(pods, nodeTemplate)
	// 按照得分排序
	sort.Slice(podInfos, func(i, j int) bool { return podInfos[i].score > podInfos[j].score })
	// 遍历所有的 pod
  for _, podInfo := range podInfos {
		found := false
		// 确认给定的 pod 是否能调度到 给定的 node 上
    // 内部调用调度框架的 preFilter 依次跟每个 node 过滤一遍, p.framework.RunPreFilterPlugins
		nodeName, err := e.predicateChecker.FitsAnyNodeMatching(e.clusterSnapshot, podInfo.pod, func(nodeInfo *schedulerframework.NodeInfo) bool {
			return newNodeNames[nodeInfo.Node().Name]
		})
		if err == nil {
      // 为 pod 找到合适的 node
			found = true
			scheduledPods = append(scheduledPods, podInfo.pod)
			newNodesWithPods[nodeName] = true
		}

		if !found {
			if lastNodeName != "" && !newNodesWithPods[lastNodeName] {
				continue
			}

      // 添加一个新的 node 进来
			newNodeName, err := e.addNewNodeToSnapshot(nodeTemplate, newNodeNameIndex)
			
			newNodeNameIndex++
			newNodeNames[newNodeName] = true
			lastNodeName = newNodeName

      // 再次尝试调度
      // 如果还是失败:比如设置了 pod 拓扑分布,这种情况无法解决 pending 问题,尝试移除这类 pod
			if err := e.predicateChecker.CheckPredicates(e.clusterSnapshot, podInfo.pod, newNodeName); err != nil {
				continue
			}
			if err := e.clusterSnapshot.AddPod(podInfo.pod, newNodeName); err != nil {
				klog.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", podInfo.pod.Namespace, podInfo.pod.Name, newNodeName, err)
				return 0, nil
			}
			newNodesWithPods[newNodeName] = true
			scheduledPods = append(scheduledPods, podInfo.pod)
		}
	}
	return len(newNodesWithPods), scheduledPods
}

3.6 Expander 策略

3.6.1 概述

通过前面的处理,会返回多个 Option 对象,即有多个可选的组合可以满足此次调度需求(可能只是部分 pod,不是全部pod),

Expander 提供多种策略,在这一组答案中最终选择一个作为最终答案。

选择要扩展的节点组提供的不同策略,通过 --expander=least-waste 参数指定,可以多个策略组合

输入

给定多个 option,选择一个最合适的 option。每个 option 对应一个 NodeGroup、需要调度的 pod、Node数量

代码语言:go
复制
// Option describes an option to expand the cluster.
type Option struct {
	NodeGroup cloudprovider.NodeGroup
	NodeCount int
	Debug     string
	Pods      []*apiv1.Pod
}
接口
代码语言:go
复制
// Strategy describes an interface for selecting the best option when scaling up
type Strategy interface {
	BestOption(options []Option, nodeInfo map[string]*schedulerframework.NodeInfo) *Option
}
输出

最终选定的 Option,即:扩容哪个 NodeGroup、扩容该 NodeGroup 的几台机器

3.6.2 实现分析
策略初始化
代码语言:go
复制
func ExpanderStrategyFromStrings(...) {
		...
    // 根据不同的策略,使用不同的 Filter
		switch expanderFlag {
		case expander.RandomExpanderName:
			filters = append(filters, random.NewFilter())
		case expander.MostPodsExpanderName:
			filters = append(filters, mostpods.NewFilter())
		case expander.LeastWasteExpanderName:
			filters = append(filters, waste.NewFilter())
		case expander.PriceBasedExpanderName:
			if _, err := cloudProvider.Pricing(); err != nil {
				return nil, err
			}
			filters = append(filters, price.NewFilter(cloudProvider,
				price.NewSimplePreferredNodeProvider(autoscalingKubeClients.AllNodeLister()),
				price.SimpleNodeUnfitness))
		case expander.PriorityBasedExpanderName:
			// It seems other listers do the same here - they never receive the termination msg on the ch.
			// This should be currently OK.
			stopChannel := make(chan struct{})
			lister := kubernetes.NewConfigMapListerForNamespace(kubeClient, stopChannel, configNamespace)
			filters = append(filters, priority.NewFilter(lister.ConfigMaps(configNamespace), autoscalingKubeClients.Recorder))
		case expander.GRPCExpanderName:
			filters = append(filters, grpcplugin.NewFilter(GRPCExpanderCert, GRPCExpanderURL))
		default:
			return nil, errors.NewAutoscalerError(errors.InternalError, "Expander %s not supported", expanderFlag)
		}
		if _, ok := filters[len(filters)-1].(expander.Strategy); ok {
			strategySeen = true
		}
	}
	// 最后追加一个 random 的 fallback
	return newChainStrategy(filters, random.NewStrategy()), nil
}
调用策略
代码语言:go
复制
func ScaleUp() {
  ...
  bestOption := context.ExpanderStrategy.BestOption(options, nodeInfos)
  ...
}

// 执行策略
func (c *chainStrategy) BestOption(options []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) *expander.Option {
	filteredOptions := options
  // 依次执行所有的 Filter
	for _, filter := range c.filters {
		filteredOptions = filter.BestOptions(filteredOptions, nodeInfo)
		if len(filteredOptions) == 1 {
			return &filteredOptions[0]
		}
	}
	return c.fallback.BestOption(filteredOptions, nodeInfo)
}
3.6.3 Filter 接口

Expander 策略是通过多个 Filter 实现的,Filter 定义了统一的接口,和多种实现

接口定义

代码语言:go
复制
type Filter interface {
	BestOptions(options []Option, nodeInfo map[string]*schedulerframework.NodeInfo) []Option
}
leastwaste 实现
  • 将所需资源与可用资源计算差值,得到分数
  • 取出分数最小的值
代码语言:go
复制
func (l *leastwaste) BestOptions(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) []expander.Option {
	var leastWastedScore float64
	var leastWastedOptions []expander.Option

	for _, option := range expansionOptions {
    // 计算所有 pod 总共需要的 cpu、memory 资源
		requestedCPU, requestedMemory := resourcesForPods(option.Pods)
    // 确认当前的 node group 是否存在
		node, found := nodeInfo[option.NodeGroup.Id()]
		if !found {
      // 不存在就匹配下一个 node group
			klog.Errorf("No node info for: %s", option.NodeGroup.Id())
			continue
		}
		// 找到 Node 能够提供的 cpu、memory 资源
    // cpu = node.Status.Capacity[cpu]
    // memory = node.Status.Capacity[memory]
		nodeCPU, nodeMemory := resourcesForNode(node.Node())
    // 可用资源 = 单节点资源 * nodeGroup数量
		availCPU := nodeCPU.MilliValue() * int64(option.NodeCount)
		availMemory := nodeMemory.Value() * int64(option.NodeCount)
    // 浪费资源 = (可用资源 - 所需资源)/ 可用资源
		wastedCPU := float64(availCPU-requestedCPU.MilliValue()) / float64(availCPU)
		wastedMemory := float64(availMemory-requestedMemory.Value()) / float64(availMemory)
    // 浪费资源数 = cpu浪费 + memory 浪费
		wastedScore := wastedCPU + wastedMemory

		klog.V(1).Infof("Expanding Node Group %s would waste %0.2f%% CPU, %0.2f%% Memory, %0.2f%% Blended\n", option.NodeGroup.Id(), wastedCPU*100.0, wastedMemory*100.0, wastedScore*50.0)

		if wastedScore == leastWastedScore {
			leastWastedOptions = append(leastWastedOptions, option)
		}
		// 取浪费分数最小的选项
		if leastWastedOptions == nil || wastedScore < leastWastedScore {
			leastWastedScore = wastedScore
			leastWastedOptions = []expander.Option{option}
		}
	}

	if len(leastWastedOptions) == 0 {
		return nil
	}

	return leastWastedOptions
}
mostpods
代码语言:go
复制
func (m *mostpods) BestOptions(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) []expander.Option {
	var maxPods int
	var maxOptions []expander.Option

  // 遍历所有的 option
	for _, option := range expansionOptions {
		if len(option.Pods) == maxPods {
			maxOptions = append(maxOptions, option)
		}

    // 取 pod 数量最大的那个选项
		if len(option.Pods) > maxPods {
			maxPods = len(option.Pods)
			maxOptions = []expander.Option{option}
		}
	}

	if len(maxOptions) == 0 {
		return nil
	}

	return maxOptions
}
random
代码语言:go
复制
func (r *random) BestOptions(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) []expander.Option {
	// 调用 BestOption
  best := r.BestOption(expansionOptions, nodeInfo)
	if best == nil {
		return nil
	}
	return []expander.Option{*best}
}

func (r *random) BestOption(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) *expander.Option {
	if len(expansionOptions) <= 0 {
		return nil
	}
	// 从所有备选 option 中随机选择一个
	pos := rand.Int31n(int32(len(expansionOptions)))
	return &expansionOptions[pos]
}
priority
代码语言:go
复制
func (p *priority) BestOptions(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) []expander.Option {
	
	// 读取名为 cluster-autoscaler-priority-expander,key 为 priorities 的 configmap
  // 将yaml数据转换为 type priorities map[int][]*regexp.Regexp 对象
	priorities, cm, err := p.reloadConfigMap()
	
  // 遍历所有 option
	for _, option := range expansionOptions {
		// 获取 node group 的 id
    id := option.NodeGroup.Id()
		found := false
    // 遍历所有的优先级
		for prio, nameRegexpList := range priorities {
      // 优先级列表中匹配当前的 node group id
      // 匹配不到就跳过
			if !p.groupIDMatchesList(id, nameRegexpList) {
				continue
			}
			found = true
      // 当前优先级低就跳过
			if prio < maxPrio {
				continue
			}
      // 找到优先级最高那个
			if prio > maxPrio {
				maxPrio = prio
				best = nil
			}
			best = append(best, option)

		}
		if !found {
			msg := fmt.Sprintf("Priority expander: node group %s not found in priority expander configuration. "+
				"The group won't be used.", id)
			p.logConfigWarning(cm, "PriorityConfigMapNotMatchedGroup", msg)
		}
	}
	// 优先级失效
	if len(best) == 0 {
		msg := "Priority expander: no priorities info found for any of the expansion options. No options filtered."
		p.logConfigWarning(cm, "PriorityConfigMapNoGroupMatched", msg)
		return expansionOptions
	}

	for _, opt := range best {
		klog.V(2).Infof("priority expander: %s chosen as the highest available", opt.NodeGroup.Id())
	}
	return best
}
price

选择成本最小的,依赖 cloud provider 的价格模型,aws cloud provider 没有实现,可以不用考虑

代码语言:go
复制
// BestOption selects option based on cost and preferred node type.
func (p *priceBased) BestOptions(expansionOptions []expander.Option, nodeInfos map[string]*schedulerframework.NodeInfo) []expander.Option {
	var bestOptions []expander.Option
	....
}
grpc
代码语言:go
复制
func (g *grpcclientstrategy) BestOptions(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) []expander.Option {
	// 判断 grpcClient 参数是否传入
  if g.grpcClient == nil {
		klog.Errorf("Incorrect gRPC client config, filtering no options")
		return expansionOptions
	}
	
  // 调用 grpc 请求
	bestOptionsResponse, err := g.grpcClient.BestOptions(ctx, &protos.BestOptionsRequest{Options: grpcOptionsSlice, NodeMap: grpcNodeMap})
	...
	return options
}

func (c *expanderClient) BestOptions(ctx context.Context, in *BestOptionsRequest, opts ...grpc.CallOption) (*BestOptionsResponse, error) {
	out := new(BestOptionsResponse)
	err := c.cc.Invoke(ctx, "/grpcplugin.Expander/BestOptions", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

grpc对应 proto 文件

代码语言:text
复制
// Interface for Expander
service Expander {

  rpc BestOptions (BestOptionsRequest)
    returns (BestOptionsResponse) {}
}

3.7 缩容实现

3.7.1 缩容概述

缩容和扩容都在同一个定时器中,即默认10s一个检查循环。满足以下所有条件会触发缩容:

  • 在改节点上运行的所有 pod 的 cpu、memory的总和 < 节点可分配总额的 50%。(所有参数都可定制)
  • 节点上运行的所有 pod(除Daemonset),都可以移动到其他节点(特殊pod可以添加注解禁止CA调度到其他Node)
  • Node 没有添加禁用缩减的 Annotation

缩容其他注意事项:

  • 一个Node从检查出空闲,持续10min时间内依然空闲,才会被真正移除
  • 缩容操作一次之后缩一个,避免不可预期的错误

关键源码实现:

代码语言:go
复制
func (a *StaticAutoscaler) RunOnce(...){
  // 缩容逻辑
  if a.ScaleDownEnabled {
    // 获取可能将被删除的 Node 列表,只是初步判断 Node 所在 ASG 实例数是否缩容到最小了
    scaleDownCandidates := GetScaleDownCandidates()
    // 返回某个 Node 被删除后,可能容纳node上 pod 的Node,默认返回所有 nodes
    podDestinations := GetPodDestinationCandidates()
    
    // 关键方法,通过各个维度统计出不再需要的 Node
    // 更新不再需要的 Node 信息,保存在 scaleDown.unneededNodes 中
    scaleDown.UpdateUnneededNodes(podDestinations, scaleDownCandidates)
    
    if scaleDownInCooldown {
      // 缩容冷却中
			scaleDownStatus.Result = status.ScaleDownInCooldown
		} else if scaleDown.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress() {
      // 正在进行缩容过程中
			scaleDownStatus.Result = status.ScaleDownInProgress
		} else {
    	// 可以开始缩容
      scaleDownStatus := scaleDown.TryToScaleDown(currentTime, pdbs)
    } 
  }
}
3.7.2 源码分析
代码语言:go
复制
func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError {
  // 扩容逻辑,前面已分析
  ...
  // 缩容逻辑
  if a.ScaleDownEnabled {
    // 特殊处理的 pod
		pdbs, err := pdbLister.List()
		
    // 计算不再需要的 node
    // 保存待缩容的候选 Node
		var scaleDownCandidates []*apiv1.Node
    // 保存可以存放被删除 Node上pod的节点
		var podDestinations []*apiv1.Node

		if a.processors == nil || a.processors.ScaleDownNodeProcessor == nil {
			scaleDownCandidates = allNodes
			podDestinations = allNodes
		} else {
			var err errors.AutoscalerError
      // 初步筛选处理
			scaleDownCandidates, err = a.processors.ScaleDownNodeProcessor.GetScaleDownCandidates(
				autoscalingContext, allNodes)
			// pod选择新的 node
			podDestinations, err = a.processors.ScaleDownNodeProcessor.GetPodDestinationCandidates(autoscalingContext, allNodes)
		}

		// We use scheduledPods (not originalScheduledPods) here, so artificial scheduled pods introduced by processors
		// (e.g unscheduled pods with nominated node name) can block scaledown of given node.
		if typedErr := scaleDown.UpdateUnneededNodes(podDestinations, scaleDownCandidates, currentTime, pdbs); typedErr != nil {
			scaleDownStatus.Result = status.ScaleDownError
			klog.Errorf("Failed to scale down: %v", typedErr)
			return typedErr
		}

		metrics.UpdateDurationFromStart(metrics.FindUnneeded, unneededStart)

		if klog.V(4).Enabled() {
			for key, val := range scaleDown.unneededNodes {
				klog.Infof("%s is unneeded since %s duration %s", key, val.String(), currentTime.Sub(val).String())
			}
		}

		scaleDownInCooldown := a.processorCallbacks.disableScaleDownForLoop ||
			a.lastScaleUpTime.Add(a.ScaleDownDelayAfterAdd).After(currentTime) ||
			a.lastScaleDownFailTime.Add(a.ScaleDownDelayAfterFailure).After(currentTime) ||
			a.lastScaleDownDeleteTime.Add(a.ScaleDownDelayAfterDelete).After(currentTime)
		// In dry run only utilization is updated
		calculateUnneededOnly := scaleDownInCooldown || scaleDown.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress()

		klog.V(4).Infof("Scale down status: unneededOnly=%v lastScaleUpTime=%s "+
			"lastScaleDownDeleteTime=%v lastScaleDownFailTime=%s scaleDownForbidden=%v "+
			"isDeleteInProgress=%v scaleDownInCooldown=%v",
			calculateUnneededOnly, a.lastScaleUpTime,
			a.lastScaleDownDeleteTime, a.lastScaleDownFailTime, a.processorCallbacks.disableScaleDownForLoop,
			scaleDown.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress(), scaleDownInCooldown)
		metrics.UpdateScaleDownInCooldown(scaleDownInCooldown)

		if scaleDownInCooldown {
			scaleDownStatus.Result = status.ScaleDownInCooldown
		} else if scaleDown.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress() {
			scaleDownStatus.Result = status.ScaleDownInProgress
		} else {
			klog.V(4).Infof("Starting scale down")

			// We want to delete unneeded Node Groups only if there was no recent scale up,
			// and there is no current delete in progress and there was no recent errors.
			removedNodeGroups, err := a.processors.NodeGroupManager.RemoveUnneededNodeGroups(autoscalingContext)
			if err != nil {
				klog.Errorf("Error while removing unneeded node groups: %v", err)
			}

			scaleDownStart := time.Now()
			metrics.UpdateLastTime(metrics.ScaleDown, scaleDownStart)
      // 开始尝试缩容
			scaleDownStatus, typedErr := scaleDown.TryToScaleDown(currentTime, pdbs)
			metrics.UpdateDurationFromStart(metrics.ScaleDown, scaleDownStart)
			metrics.UpdateUnremovableNodesCount(scaleDown.getUnremovableNodesCount())

			scaleDownStatus.RemovedNodeGroups = removedNodeGroups

			if scaleDownStatus.Result == status.ScaleDownNodeDeleteStarted {
				a.lastScaleDownDeleteTime = currentTime
				a.clusterStateRegistry.Recalculate()
			}

			if (scaleDownStatus.Result == status.ScaleDownNoNodeDeleted ||
				scaleDownStatus.Result == status.ScaleDownNoUnneeded) &&
				a.AutoscalingContext.AutoscalingOptions.MaxBulkSoftTaintCount != 0 {
        // 
				scaleDown.SoftTaintUnneededNodes(allNodes)
			}

			if a.processors != nil && a.processors.ScaleDownStatusProcessor != nil {
				scaleDownStatus.SetUnremovableNodesInfo(scaleDown.unremovableNodeReasons, scaleDown.nodeUtilizationMap, scaleDown.context.CloudProvider)
				a.processors.ScaleDownStatusProcessor.Process(autoscalingContext, scaleDownStatus)
				scaleDownStatusProcessorAlreadyCalled = true
			}

			if typedErr != nil {
				klog.Errorf("Failed to scale down: %v", typedErr)
				a.lastScaleDownFailTime = currentTime
				return typedErr
			}
		}
	}
	return nil
}
GetScaleDownCandidates

这一步只是判断哪些 Node 节点所在的 ASG 符合要求

代码语言:go
复制
func (n *PreFilteringScaleDownNodeProcessor) GetScaleDownCandidates(ctx *context.AutoscalingContext,
	nodes []*apiv1.Node) ([]*apiv1.Node, errors.AutoscalerError) {
	result := make([]*apiv1.Node, 0, len(nodes))

  // 获取每个 asg 当前的实例个数,保存为 map
	nodeGroupSize := utils.GetNodeGroupSizeMap(ctx.CloudProvider)

  // 遍历所有 node
	for _, node := range nodes {
    // 获取当前 node 所属的 asg
		nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node)
		// 获取当前 asg 的实例数
		size, found := nodeGroupSize[nodeGroup.Id()]
		// 获取 asg 的最小实例数。当前实例数已经最小了,就跳过不再缩容
		if size <= nodeGroup.MinSize() {
			klog.V(1).Infof("Skipping %s - node group min size reached", node.Name)
			continue
		}
    // 追加到结果中
		result = append(result, node)
	}
	return result, nil
}
GetPodDestinationCandidates

默认返回所有的 Node

代码语言:go
复制
func (n *PreFilteringScaleDownNodeProcessor) GetPodDestinationCandidates(ctx *context.AutoscalingContext,
	nodes []*apiv1.Node) ([]*apiv1.Node, errors.AutoscalerError) {
	return nodes, nil
}
UpdateUnneededNodes

计算不再需要的node,从以下维度逐一排查:

  • 所有的 pod 可以被调度到其他节点
  • 资源使用率低于某个阈值
  • 其他判断

找到可以移除的节点,放到 unneededNodes 数组中,便于后面移除

代码语言:go
复制
// destinationNodes:可以用来安置由于缩容导致被驱逐的pod的节点
// scaleDownCandidates:可以考虑缩容的节点
func (sd *ScaleDown) UpdateUnneededNodes(
	destinationNodes []*apiv1.Node,
	scaleDownCandidates []*apiv1.Node,
	timestamp time.Time,
	pdbs []*policyv1.PodDisruptionBudget,
) errors.AutoscalerError {

  // 第一步:计算节点资源利用率(只计算被管理的节点)
	for _, node := range scaleDownCandidates {
		// 获取节点信息
    nodeInfo, err := sd.context.ClusterSnapshot.NodeInfos().Get(node.Name)
		// 检查节点情况,是否满足缩容
    // 1. 节点是否最近已经被标记为删除,这种节点打了 ToBeDeletedByClusterAutoscaler 的 taint
    // 2. 节点是否有 cluster-autoscaler.kubernetes.io/scale-down-disabled 这个禁止缩容的标签
    // 3. CalculateUtilization 计算资源使用率:累加所有 pod 上容器设置的 cpu、memroy request值
    // 4. isNodeBelowUtilizationThreshold 判断资源使用是否达到阈值(可启动时配置)
		reason, utilInfo := sd.checkNodeUtilization(timestamp, node, nodeInfo)
		
		// 保存可以被删除的节点
		currentlyUnneededNodeNames = append(currentlyUnneededNodeNames, node.Name)
	}

	// 第二步:将候选缩容节点和其他节点分开
	currentCandidates, currentNonCandidates := sd.chooseCandidates(currentlyUnneededNonEmptyNodes)

  // 找到新节点,用于移除候选节点
	nodesToRemove, unremovable, newHints, simulatorErr := simulator.FindNodesToRemove(
		currentCandidates,
		destinations,
		nil,
		sd.context.ClusterSnapshot,
		sd.context.PredicateChecker,
		len(currentCandidates),
		true,
		sd.podLocationHints,
		sd.usageTracker,
		timestamp,
		pdbs)

  //  additionalCandidatesCount 表示用于缩容额外的备选节点数量
	additionalCandidatesCount := sd.context.ScaleDownNonEmptyCandidatesCount - len(nodesToRemove)
	if additionalCandidatesCount > len(currentNonCandidates) {
		additionalCandidatesCount = len(currentNonCandidates)
	}

  // 限制并发缩容数量
	additionalCandidatesPoolSize := int(math.Ceil(float64(len(allNodeInfos)) * sd.context.ScaleDownCandidatesPoolRatio))
	if additionalCandidatesPoolSize < sd.context.ScaleDownCandidatesPoolMinCount {
		additionalCandidatesPoolSize = sd.context.ScaleDownCandidatesPoolMinCount
	}
	if additionalCandidatesPoolSize > len(currentNonCandidates) {
		additionalCandidatesPoolSize = len(currentNonCandidates)
	}
	if additionalCandidatesCount > 0 {
	
    // 找到新节点,用于移除候选节点
		additionalNodesToRemove, additionalUnremovable, additionalNewHints, simulatorErr :=
			simulator.FindNodesToRemove(
				currentNonCandidates[:additionalCandidatesPoolSize],
				destinations,
				nil,
				sd.context.ClusterSnapshot,
				sd.context.PredicateChecker,
				additionalCandidatesCount,
				true,
				sd.podLocationHints,
				sd.usageTracker,
				timestamp,
				pdbs)
	}
  // 将待移除节点保存到 unneededNodes 数组中
  for _, node := range nodesToRemove {
		name := node.Node.Name
		unneededNodesList = append(unneededNodesList, node.Node)
		if val, found := sd.unneededNodes[name]; !found {
			result[name] = timestamp
		} else {
			result[name] = val
		}
	}
	...
}
TryToScaleDown
代码语言:go
复制
func (sd *ScaleDown) TryToScaleDown(
	currentTime time.Time,
	pdbs []*policyv1.PodDisruptionBudget,
) (*status.ScaleDownStatus, errors.AutoscalerError) {

	...
  // 遍历待删除 node 列表
	for nodeName, unneededSince := range sd.unneededNodes {
		
		// 获取 nodeinfo、node 信息
    nodeInfo, err := sd.context.ClusterSnapshot.NodeInfos().Get(nodeName)
		node := nodeInfo.Node()
		// 检查 node 是否打上了禁止删除的 annotation
		if hasNoScaleDownAnnotation(node) {
			klog.V(4).Infof("Skipping %s - scale down disabled annotation found", node.Name)
			sd.addUnremovableNodeReason(node, simulator.ScaleDownDisabledAnnotation)
			continue
		}
		// 获取 node 状态,根据状态做一些处理
		ready, _, _ := kube_util.GetReadinessState(node)
		
    // 计算缩容资源
		scaleDownResourcesDelta, err := sd.computeScaleDownResourcesDelta(sd.context.CloudProvider, node, nodeGroup, resourcesWithLimits)
		// 检查资源限制
		checkResult := scaleDownResourcesLeft.checkScaleDownDeltaWithinLimits(scaleDownResourcesDelta)
		...
		candidateNames = append(candidateNames, node.Name)
		candidateNodeGroups[node.Name] = nodeGroup
	}

  // 寻找一个待移除节点
	nodesToRemove, unremovable, _, err := simulator.FindNodesToRemove(
		candidateNames,
		nodesWithoutMasterNames,
		sd.context.ListerRegistry,
		sd.context.ClusterSnapshot,
		sd.context.PredicateChecker,
		1,
		false,
		sd.podLocationHints,
		sd.usageTracker,
		time.Now(),
		pdbs)

  // 计算时差
	nodeDeletionDuration = time.Now().Sub(nodeDeletionStart)
	sd.nodeDeletionTracker.SetNonEmptyNodeDeleteInProgress(true)

	go func() {
		...
    // 删除节点
		result = sd.deleteNode(toRemove.Node, toRemove.PodsToReschedule, toRemove.DaemonSetPods, nodeGroup)
  }
}

四、CA 使用注意

aws官方说明

4.1 asg 自动发现参数配置

  • 为 AutoScaling 设置两个标签,便于 CA 自动发现
  • 关于跨可用区:
  • 可以设置多个 AutoScaling 组,每个组一个可用区,通过开启--balance-similar-node-groups` 功能。注意:需要为不同的组设置相同的一批标签
  • 也可以设置同一个 AutoScaling 组,但是必须将组设置可跨多个可用区
  • 更推荐使用多个 AutoScaling 组

4.2 优化节点组:

  • 节点组中的每个节点必须具有相同的调度属性,包括标签、污点和资源
    • 策略中指定的第一个实例类型模拟调度。
    • 如果您的策略具有拥有更多资源的其他实例类型,则在横向扩展后可能会浪费资源。
    • 如果您的策略具有其他实例类型,其资源比原始实例类型少,则 Pod 在实例上调度可能失败。
  • 请使用较多节点配置较少数量的节点组,因为相反的配置可能会对可扩展性产生不利影响。

4.3 AutoScaling

  • 混合实例策略:支持多个实例类型,配置时推荐使用相似的资源类型:比如:M4M5M5a,M5n
  • 可以通过 configmap 设置不同 AutoScaling 的优先级
  • AutoScaling 的机型也支持权重
  • 支持启动配置、启动模板两种模式
  • 启动模板里面指定机型
  • 启动模板覆盖项支持配置多个机型

4.4 Expander 策略

选择要扩展的节点组提供的不同策略,通过 --expander=least-waste 参数指定

可选参数包括:

  • random:随机选择
  • most-pods:能满足最多 pod 调度的
  • Least-waste:最少 cpu 和 memroy
  • Price:成本最小
  • priority:按用户指定的优先级
  • grpc:调用外部 grpc 服务选择扩容节点

4.5 超额配置

  • 通过配置一个空的Deployment,占用资源,如果资源不足优先驱逐,达到尽快扩容的目的

4.6 防止pod被驱逐

配置 `cluster-autoscaler.kubernetes.io/safe-to-evict=false 注解,可以确保 pod不被驱逐,pod所在 node 不被缩减

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 三、主流程
    • 3.1 main 启动入口
      • 3.2 run
        • 3.3 autoscaler
          • 3.3.1 buildAutoscaler
          • 3.3.2 NewAutoscaler
          • 3.3.3 Autoscaler.Start
          • 3.3.4 Autoscaler.RunOnce
        • 3.4 ScaleUp
          • 3.4.1 ScaleUpInfo
          • 3.4.2 computeExpansionOption
        • 3.5 Estimate
          • 3.5.1 优化目标
          • 3.5.2 实现分析
        • 3.6 Expander 策略
          • 3.6.1 概述
          • 3.6.2 实现分析
          • 3.6.3 Filter 接口
        • 3.7 缩容实现
          • 3.7.1 缩容概述
          • 3.7.2 源码分析
      • 四、CA 使用注意
        • 4.1 asg 自动发现参数配置
          • 4.2 优化节点组:
            • 4.3 AutoScaling
              • 4.4 Expander 策略
                • 4.5 超额配置
                  • 4.6 防止pod被驱逐
                  相关产品与服务
                  容器服务
                  腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档