K8S HPA Auto-Scaling Mechanism

tunsuy · Published 2023-08-19

Introduction

The kubectl scale command can scale Pods up and down, but it is entirely manual. To cope with the various complex situations in production, we need the system to sense the workload automatically and scale accordingly. For this, Kubernetes provides a dedicated resource object: Horizontal Pod Autoscaling, or HPA for short. HPA decides whether the number of Pod replicas needs to be adjusted by monitoring and analyzing the load of all the Pods managed by a given controller.

HPA (Horizontal Pod Autoscaler) is a Kubernetes resource object that dynamically scales the number of pods in workloads such as StatefulSets, ReplicationControllers and ReplicaSets based on certain metrics, giving the services running on them a degree of self-adaptation to metric changes.

HPA currently supports four metric types: Resource, Object, External and Pods. The stable API version autoscaling/v1 only supports autoscaling on the CPU metric; the beta version autoscaling/v2beta2 supports memory and custom metrics, and those v2 fields are carried as annotations when the object is accessed through the autoscaling/v1 API. Note: horizontal pod autoscaling does not apply to objects that cannot be scaled (for example, DaemonSets).

Setup

An HPA can be created with kubectl: either with the kubectl create command and a manifest, or with kubectl autoscale. For example, kubectl autoscale rs foo --min=2 --max=5 --cpu-percent=80 creates an HPA for the ReplicaSet named foo with a target CPU utilization of 80% and a replica count between 2 and 5. If the metrics fluctuate too often, the kube-controller-manager flag --horizontal-pod-autoscaler-downscale-stabilization can be used to set the downscale stabilization window, i.e. how long after the last scale-down finished another scale-down may be performed; the default is 5m.
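
For illustration, here is a minimal client-go sketch (an assumption of this article, not part of any upstream tooling) that creates roughly the same autoscaling/v1 HPA object as the kubectl autoscale command above; the kubeconfig path and namespace are placeholders:

Code (Go):
package main

import (
	"context"

	autoscalingv1 "k8s.io/api/autoscaling/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a client from a local kubeconfig (path is a placeholder).
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(config)

	minReplicas := int32(2)
	targetCPU := int32(80)

	// Equivalent of: kubectl autoscale rs foo --min=2 --max=5 --cpu-percent=80
	hpa := &autoscalingv1.HorizontalPodAutoscaler{
		ObjectMeta: metav1.ObjectMeta{Name: "foo"},
		Spec: autoscalingv1.HorizontalPodAutoscalerSpec{
			ScaleTargetRef: autoscalingv1.CrossVersionObjectReference{
				APIVersion: "apps/v1",
				Kind:       "ReplicaSet",
				Name:       "foo",
			},
			MinReplicas:                    &minReplicas,
			MaxReplicas:                    5,
			TargetCPUUtilizationPercentage: &targetCPU,
		},
	}

	_, err = clientset.AutoscalingV1().
		HorizontalPodAutoscalers("default").
		Create(context.TODO(), hpa, metav1.CreateOptions{})
	if err != nil {
		panic(err)
	}
}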

Configuration

Code (YAML):
apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
  name: php-apache
  namespace: default
spec:
  # The scale target of the HPA; the HPA dynamically adjusts the number of pods of this object
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: php-apache
  # Minimum and maximum number of pods for the HPA
  minReplicas: 1
  maxReplicas: 10
  # The list of metrics to monitor; multiple metric types can coexist
  metrics:
  # An Object-type metric
  - type: Object
    object:
      metric:
        # Metric name
        name: requests-per-second
      # The object that the metric describes; the metric data comes from this object
      describedObject:
        apiVersion: networking.k8s.io/v1beta1
        kind: Ingress
        name: main-route
      # A Value-type target; Object metrics only support Value and AverageValue targets
      target:
        type: Value
        value: 10k
  # A Resource-type metric
  - type: Resource
    resource:
      name: cpu
      # A Utilization-type target; Resource metrics only support Utilization and AverageValue targets
      target:
        type: Utilization
        averageUtilization: 50
  # A Pods-type metric
  - type: Pods
    pods:
      metric:
        name: packets-per-second
      # An AverageValue-type target; Pods metrics only support AverageValue targets
      target:
        type: AverageValue
        averageValue: 1k
  # An External-type metric
  - type: External
    external:
      metric:
        name: queue_messages_ready
        # This selector matches the labels of the third-party metric
        selector:
          matchLabels:
            env: "stage"
            app: "myapp"
      # External metrics only support Value and AverageValue targets
      target:
        type: AverageValue
        averageValue: 30

Source Code Analysis

Conclusion first: HPA is also driven by a controller in Kubernetes. The controller loops over all HPAs at a fixed interval (15s by default) and checks whether the metrics watched by each HPA have triggered a scaling condition. Once a condition is triggered, the controller sends a request to Kubernetes to modify the replica field in the scale subresource of the scale target (StatefulSet, ReplicationController, ReplicaSet and other scalable objects). Kubernetes handles the request, updates the scale object, and refreshes the target's pod count. Once the target has been modified, its own controller naturally adds or removes pods through the list/watch mechanism, achieving dynamic scaling.
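
To make the "modify the scale subresource" step concrete, here is a minimal, hypothetical client-go sketch (not the controller's own code) that reads and updates the scale subresource of a Deployment the same way any client of the /scale endpoint would; the deployment name, namespace and kubeconfig path are placeholders:

Code (Go):
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(config)

	// Read the scale subresource of the target (names are placeholders).
	scale, err := clientset.AppsV1().Deployments("default").
		GetScale(context.TODO(), "php-apache", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println("current replicas:", scale.Spec.Replicas)

	// Change only the replica count; conceptually this is what the HPA
	// controller does when a scaling condition is triggered.
	scale.Spec.Replicas = 5
	_, err = clientset.AppsV1().Deployments("default").
		UpdateScale(context.TODO(), "php-apache", scale, metav1.UpdateOptions{})
	if err != nil {
		panic(err)
	}
}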

  • For per-pod resource metrics (such as CPU), the controller fetches the metric of every pod targeted by the HorizontalPodAutoscaler from the resource metrics API. If a target utilization is set, the controller obtains the resource usage of the containers in each pod and computes the usage ratio; if a raw value is used, the raw data is used directly to derive the desired replica count. Note that if some containers of a pod do not support resource collection, the controller will not use that pod's CPU utilization.
  • If the pod uses custom metrics, the controller works similarly to the resource-metric case, except that custom metrics only use raw values rather than utilization.
  • If the pod uses object metrics or external metrics (each metric describes a single object), the metric value is compared directly with the configured target value to produce a scaling ratio as above. In the latest autoscaling/v2beta2 API this value can also be divided by the pod count before the comparison. In general, the controller fetches metrics from a set of aggregated APIs (metrics.k8s.io, custom.metrics.k8s.io and external.metrics.k8s.io). The metrics.k8s.io API is usually provided by metrics-server, which has to be deployed separately (see the query sketch right after this list).
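
As a reference, the following is a minimal sketch of how the resource metrics API metrics.k8s.io can be queried directly with the k8s.io/metrics client, assuming metrics-server is installed; the kubeconfig path and namespace are placeholders:

Code (Go):
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/clientcmd"
	metricsclient "k8s.io/metrics/pkg/client/clientset/versioned"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	// Clientset for the metrics.k8s.io aggregated API (served by metrics-server).
	mc, err := metricsclient.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// List pod metrics in a namespace; this is the same API group the HPA
	// controller's resource-metric client talks to.
	podMetrics, err := mc.MetricsV1beta1().PodMetricses("default").
		List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	for _, pm := range podMetrics.Items {
		for _, c := range pm.Containers {
			fmt.Printf("%s/%s cpu=%s memory=%s\n",
				pm.Name, c.Name, c.Usage.Cpu().String(), c.Usage.Memory().String())
		}
	}
}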

Now let's look at the source code in detail. HPA is likewise driven by a controller, so the entry point is cmd/kube-controller-manager/app/controllermanager.go:

Code (Go):
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
  controllers := map[string]InitFunc{}
  controllers["endpoint"] = startEndpointController
  controllers["endpointslice"] = startEndpointSliceController
  controllers["endpointslicemirroring"] = startEndpointSliceMirroringController
  controllers["replicationcontroller"] = startReplicationController
  controllers["podgc"] = startPodGCController
  controllers["resourcequota"] = startResourceQuotaController
  controllers["namespace"] = startNamespaceController
  controllers["serviceaccount"] = startServiceAccountController
  controllers["garbagecollector"] = startGarbageCollectorController
  controllers["daemonset"] = startDaemonSetController
  controllers["job"] = startJobController
  controllers["deployment"] = startDeploymentController
  controllers["replicaset"] = startReplicaSetController
  controllers["horizontalpodautoscaling"] = startHPAController
}

As shown above, the HPA controller is registered in the NewControllerInitializers method just like all the other controllers, and is then started via startHPAController.

Following the start function through several layers of calls, we eventually reach the Run method of HorizontalController:

Code (Go):
// Run begins watching and syncing.
func (a *HorizontalController) Run(ctx context.Context) {
   defer utilruntime.HandleCrash()
   defer a.queue.ShutDown()

   klog.Infof("Starting HPA controller")
   defer klog.Infof("Shutting down HPA controller")

   if !cache.WaitForNamedCacheSync("HPA", ctx.Done(), a.hpaListerSynced, a.podListerSynced) {
      return
   }

   // start a single worker (we may wish to start more in the future)
   go wait.UntilWithContext(ctx, a.worker, time.Second)

   <-ctx.Done()
}

As the code shows, Run starts a single asynchronous worker goroutine that invokes the worker method once per second.
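
Before looking at that worker method, here is a minimal, self-contained sketch of the underlying pattern (a hypothetical mini-controller, not the real HPA code): worker drains a rate-limited workqueue and re-queues every key, so each HPA ends up being reconciled once per sync period (15s by default, controlled by --horizontal-pod-autoscaler-sync-period), even though the worker goroutine itself is woken up every second.

Code (Go):
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/workqueue"
)

// fixedDelay imitates a fixed-interval rate limiter: every re-queued key
// becomes visible again after exactly one sync period.
type fixedDelay struct{ d time.Duration }

func (f fixedDelay) When(item interface{}) time.Duration { return f.d }
func (f fixedDelay) Forget(item interface{})             {}
func (f fixedDelay) NumRequeues(item interface{}) int    { return 0 }

type miniController struct {
	queue workqueue.RateLimitingInterface
}

// worker loops until the queue is shut down, like the HPA worker.
func (c *miniController) worker(ctx context.Context) {
	for c.processNextWorkItem(ctx) {
	}
}

func (c *miniController) processNextWorkItem(ctx context.Context) bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(key)

	// The real controller calls reconcileKey -> reconcileAutoscaler here.
	fmt.Println(time.Now().Format("15:04:05"), "reconciling", key)

	// Re-queue the key so it is processed again after the sync period.
	c.queue.AddRateLimited(key)
	return true
}

func main() {
	c := &miniController{
		queue: workqueue.NewNamedRateLimitingQueue(fixedDelay{d: 15 * time.Second}, "demo-hpa"),
	}
	c.queue.Add("default/php-apache")

	ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
	defer cancel()

	// Same shape as the real controller: a single worker woken up every second.
	go wait.UntilWithContext(ctx, c.worker, time.Second)
	<-ctx.Done()
	c.queue.ShutDown()
}

The upstream controller builds its queue with a fixed-interval rate limiter tied to the resync period, which is what fixedDelay imitates here.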

Now let's follow the worker method; after several layers of calls it ends up in the main reconciliation logic below:

Code (Go):
func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpaShared *autoscalingv2.HorizontalPodAutoscaler, key string) error {
   // make a copy so that we never mutate the shared informer cache (conversion can mutate the object)
   hpa := hpaShared.DeepCopy()
   hpaStatusOriginal := hpa.Status.DeepCopy()

   reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name)

   targetGV, err := schema.ParseGroupVersion(hpa.Spec.ScaleTargetRef.APIVersion)
   if err != nil {
      a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
      setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
      a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa)
      return fmt.Errorf("invalid API version in scale target reference: %v", err)
   }

   targetGK := schema.GroupKind{
      Group: targetGV.Group,
      Kind: hpa.Spec.ScaleTargetRef.Kind,
   }

   mappings, err := a.mapper.RESTMappings(targetGK)
   if err != nil {
      a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
      setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
      a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa)
      return fmt.Errorf("unable to determine resource for scale target reference: %v", err)
   }

   scale, targetGR, err := a.scaleForResourceMappings(ctx, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, mappings)
   if err != nil {
      a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
      setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
      a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa)
      return fmt.Errorf("failed to query scale subresource for %s: %v", reference, err)
   }
   setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededGetScale", "the HPA controller was able to get the target's current scale")
   currentReplicas := scale.Spec.Replicas
   a.recordInitialRecommendation(currentReplicas, key)

   var (
      metricStatuses []autoscalingv2.MetricStatus
      metricDesiredReplicas int32
      metricName string
   )

   desiredReplicas := int32(0)
   rescaleReason := ""

   var minReplicas int32

   if hpa.Spec.MinReplicas != nil {
      minReplicas = *hpa.Spec.MinReplicas
   } else {
      // Default value
      minReplicas = 1
   }

   rescale := true

   if scale.Spec.Replicas == 0 && minReplicas != 0 {
      // Autoscaling is disabled for this resource
      desiredReplicas = 0
      rescale = false
      setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "ScalingDisabled", "scaling is disabled since the replica count of the target is zero")
   } else if currentReplicas > hpa.Spec.MaxReplicas {
      rescaleReason = "Current number of replicas above Spec.MaxReplicas"
      desiredReplicas = hpa.Spec.MaxReplicas
   } else if currentReplicas < minReplicas {
      rescaleReason = "Current number of replicas below Spec.MinReplicas"
      desiredReplicas = minReplicas
   } else {
      var metricTimestamp time.Time
      metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(ctx, hpa, scale, hpa.Spec.Metrics)
      if err != nil {
         a.setCurrentReplicasInStatus(hpa, currentReplicas)
         if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
            utilruntime.HandleError(err)
         }
         a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedComputeMetricsReplicas", err.Error())
         return fmt.Errorf("failed to compute desired number of replicas based on listed metrics for %s: %v", reference, err)
      }

      klog.V(4).Infof("proposing %v desired replicas (based on %s from %s) for %s", metricDesiredReplicas, metricName, metricTimestamp, reference)

      rescaleMetric := ""
      if metricDesiredReplicas > desiredReplicas {
         desiredReplicas = metricDesiredReplicas
         rescaleMetric = metricName
      }
      if desiredReplicas > currentReplicas {
         rescaleReason = fmt.Sprintf("%s above target", rescaleMetric)
      }
      if desiredReplicas < currentReplicas {
         rescaleReason = "All metrics below target"
      }
      if hpa.Spec.Behavior == nil {
         desiredReplicas = a.normalizeDesiredReplicas(hpa, key, currentReplicas, desiredReplicas, minReplicas)
      } else {
         desiredReplicas = a.normalizeDesiredReplicasWithBehaviors(hpa, key, currentReplicas, desiredReplicas, minReplicas)
      }
      rescale = desiredReplicas != currentReplicas
   }

   if rescale {
      scale.Spec.Replicas = desiredReplicas
      _, err = a.scaleNamespacer.Scales(hpa.Namespace).Update(ctx, targetGR, scale, metav1.UpdateOptions{})
      if err != nil {
         a.eventRecorder.Eventf(hpa, v1.EventTypeWarning, "FailedRescale", "New size: %d; reason: %s; error: %v", desiredReplicas, rescaleReason, err.Error())
         setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedUpdateScale", "the HPA controller was unable to update the target scale: %v", err)
         a.setCurrentReplicasInStatus(hpa, currentReplicas)
         if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
            utilruntime.HandleError(err)
         }
         return fmt.Errorf("failed to rescale %s: %v", reference, err)
      }
      setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededRescale", "the HPA controller was able to update the target scale to %d", desiredReplicas)
      a.eventRecorder.Eventf(hpa, v1.EventTypeNormal, "SuccessfulRescale", "New size: %d; reason: %s", desiredReplicas, rescaleReason)
      a.storeScaleEvent(hpa.Spec.Behavior, key, currentReplicas, desiredReplicas)
      klog.Infof("Successful rescale of %s, old size: %d, new size: %d, reason: %s",
         hpa.Name, currentReplicas, desiredReplicas, rescaleReason)
   } else {
      klog.V(4).Infof("decided not to scale %s to %v (last scale time was %s)", reference, desiredReplicas, hpa.Status.LastScaleTime)
      desiredReplicas = currentReplicas
   }

   a.setStatus(hpa, currentReplicas, desiredReplicas, metricStatuses, rescale)
   return a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa)
}

The main HPA scaling flow is as follows:

1) Check whether the current pod count lies within the range configured in the HPA. If it does not, return the minimum when it is too low or the maximum when it is too high, and end this round of scaling.

2) Determine the metric type and send the corresponding request to the API server to fetch the configured metric. Metrics generally come from one of the aggregated APIs: metrics.k8s.io, custom.metrics.k8s.io and external.metrics.k8s.io. metrics.k8s.io is normally served by the Kubernetes metrics-server and mainly provides CPU and memory usage; the other two require third-party adapters. custom.metrics.k8s.io serves custom metrics that are related to the Kubernetes cluster, for example metrics tied to specific pods. external.metrics.k8s.io also serves custom metrics, but ones that are generally unrelated to the cluster. Many well-known monitoring platforms (such as Prometheus) provide adapters that implement these APIs; the monitoring system and its adapter can be deployed in the Kubernetes cluster together, and the adapter can even replace the original metrics-server and serve all three metric APIs, allowing deeply customized monitoring data.

3) Based on the obtained metric, compute a scaling ratio and multiply it by the current pod count to obtain the desired pod count. The ratio is the current metric value divided by the target value: greater than 1 means scale out, less than 1 means scale in. Target values come in three kinds: average value (AverageValue), average utilization (Utilization) and raw value (Value), each with its own calculation. Two details to note: any fractional result is rounded up; and if the ratio is within a tolerance of 1.0 (0.1 by default), HPA considers the change too small and ignores it. A runnable sketch of this calculation follows step 4 below.

  • The HPA scale-out algorithm is fairly conservative: if a metric cannot be fetched for some pods, those pods are counted at the lowest possible value when scaling out and at the highest when scaling in. When an average must be computed and some pods are not yet ready, the not-yet-ready pods are conservatively assumed to consume 0% of the metric, further dampening a scale-up.
  • A single HPA can monitor multiple metrics: the controller iterates over all of them, computes a desired pod count for each, and takes the maximum as the final result. Kubernetes also allows one scale target to be referenced by several HPAs, but only in the sense that it does not report an error: the HPAs are unaware of each other and will keep overwriting each other's replica counts, adding pointless churn to the system. If you need several metrics, put them into one HPA as shown above.

4) Check whether the final pod count lies within the range configured in the HPA; values above the maximum or below the minimum are clamped to the maximum or minimum respectively. The controller then sends a request to Kubernetes to update the pod count in the target's scale subresource, which finishes the check for this HPA; the controller moves on to the next HPA, completing one scaling cycle.
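
To make steps 3) and 4) concrete, here is a minimal, self-contained sketch of the core calculation (a simplification for illustration, not the controller's actual implementation, which additionally handles not-yet-ready pods and missing metrics as described above): the ratio of current to target value, the default 0.1 tolerance, rounding up, and the final clamp to the min/max range.

Code (Go):
package main

import (
	"fmt"
	"math"
)

// desiredReplicas sketches the core HPA formula:
//   desired = ceil(currentReplicas * currentMetric / targetMetric)
// If the ratio is within the tolerance of 1.0 the change is ignored,
// and the result is always clamped into [minReplicas, maxReplicas].
func desiredReplicas(current, minReplicas, maxReplicas int32, currentMetric, targetMetric, tolerance float64) int32 {
	ratio := currentMetric / targetMetric
	desired := current
	if math.Abs(1.0-ratio) > tolerance {
		desired = int32(math.Ceil(ratio * float64(current)))
	}
	// step 4): clamp into the configured range
	if desired < minReplicas {
		desired = minReplicas
	}
	if desired > maxReplicas {
		desired = maxReplicas
	}
	return desired
}

func main() {
	// 4 pods at 80% average CPU against a 50% target: ceil(4*1.6) = 7
	fmt.Println(desiredReplicas(4, 1, 10, 80, 50, 0.1))
	// 4 pods at 52% against 50%: ratio 1.04 is within the 0.1 tolerance, stay at 4
	fmt.Println(desiredReplicas(4, 1, 10, 52, 50, 0.1))
	// 2 pods at 300% against 50% with maxReplicas=10: ceil(2*6)=12, clamped to 10
	fmt.Println(desiredReplicas(2, 1, 10, 300, 50, 0.1))
}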

Next, let's look at the logic that computes the replica count:

Code (Go):
func (a *HorizontalController) computeReplicasForMetric(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, spec autoscalingv2.MetricSpec,
   specReplicas, statusReplicas int32, selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, metricNameProposal string,
   timestampProposal time.Time, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {

   switch spec.Type {
   case autoscalingv2.ObjectMetricSourceType:
      metricSelector, err := metav1.LabelSelectorAsSelector(spec.Object.Metric.Selector)
      if err != nil {
         condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
         return 0, "", time.Time{}, condition, fmt.Errorf("failed to get object metric value: %v", err)
      }
      replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForObjectMetric(specReplicas, statusReplicas, spec, hpa, selector, status, metricSelector)
      if err != nil {
         return 0, "", time.Time{}, condition, fmt.Errorf("failed to get object metric value: %v", err)
      }
   case autoscalingv2.PodsMetricSourceType:
      metricSelector, err := metav1.LabelSelectorAsSelector(spec.Pods.Metric.Selector)
      if err != nil {
         condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err)
         return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
      }
      replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForPodsMetric(specReplicas, spec, hpa, selector, status, metricSelector)
      if err != nil {
         return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
      }
   case autoscalingv2.ResourceMetricSourceType:
      replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForResourceMetric(ctx, specReplicas, spec, hpa, selector, status)
      if err != nil {
         return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s resource metric value: %v", spec.Resource.Name, err)
      }
   case autoscalingv2.ContainerResourceMetricSourceType:
      replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForContainerResourceMetric(ctx, specReplicas, spec, hpa, selector, status)
      if err != nil {
         return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s container metric value: %v", spec.ContainerResource.Container, err)
      }
   case autoscalingv2.ExternalMetricSourceType:
      replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForExternalMetric(specReplicas, statusReplicas, spec, hpa, selector, status)
      if err != nil {
         return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s external metric value: %v", spec.External.Metric.Name, err)
      }
   default:
      errMsg := fmt.Sprintf("unknown metric source type %q", string(spec.Type))
      err = fmt.Errorf(errMsg)
      condition := a.getUnableComputeReplicaCountCondition(hpa, "InvalidMetricSourceType", err)
      return 0, "", time.Time{}, condition, err
   }
   return replicaCountProposal, metricNameProposal, timestampProposal, autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
}

This code dispatches on the metric types declared in the HPA manifest and calls the corresponding metrics REST client to fetch the metric from the matching API server. All of the metric sources are fetched through a single unified interface:

Code (Go):
// MetricsClient knows how to query a remote interface to retrieve container-level
// resource metrics as well as pod-level arbitrary metrics
type MetricsClient interface {
   // GetResourceMetric gets the given resource metric (and an associated oldest timestamp)
   // for all pods matching the specified selector in the given namespace
   GetResourceMetric(resource v1.ResourceName, namespace string, selector labels.Selector) (PodMetricsInfo, time.Time, error)
   // GetRawMetric gets the given metric (and an associated oldest timestamp)
   // for all pods matching the specified selector in the given namespace
   GetRawMetric(metricName string, namespace string, selector labels.Selector, metricSelector labels.Selector) (PodMetricsInfo, time.Time, error)
   // GetObjectMetric gets the given metric (and an associated timestamp) for the given
   // object in the given namespace
   GetObjectMetric(metricName string, namespace string, objectRef *autoscaling.CrossVersionObjectReference, metricSelector labels.Selector) (int64, time.Time, error)
   // GetExternalMetric gets all the values of a given external metric
   // that match the specified selector.
   GetExternalMetric(metricName string, namespace string, selector labels.Selector) ([]int64, time.Time, error)
}

GetResourceMetric queries metrics-server and is typically used to obtain the CPU/memory usage of each pod of a workload. GetRawMetric and GetObjectMetric talk to a custom-metrics API server, while GetExternalMetric talks to an external-metrics API server; each of these has a corresponding adapter that exposes the standard interface for retrieving metrics. Developers only need to implement that interface with their own logic and deploy the adapter, registering it as an APIService. Due to space limits, developing a custom-metrics-apiserver is not covered in detail here; if you are interested, see http://www.github.com/kubernetes-incubator/custom-metrics-apiserver and https://github.com/DirectXMan12/k8s-prometheus-adapter.git.
