首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >K8S-Node自动扩容项目CA源码分析(上)

K8S-Node自动扩容项目CA源码分析(上)

原创
作者头像
kinnylee
发布2022-07-02 21:58:09
9910
发布2022-07-02 21:58:09
举报

一、概述

上一篇文章介绍了 k8s 自动扩缩容的三种方式:HPA、VPA、CA,以及各自的使用场景和架构。本文针对 CA 做源码分析。

1.1 CA架构回顾

参考

CA由一下几个模块组成:

  • autoscaler:核心模块,负责整体扩缩容功能
  • Estimator:负责评估计算扩容节点
  • Simulator:负责模拟调度,计算缩容节点
  • Cloud Provider:与云上 IaaS 层交互,负责增删改查节点。云厂商需要实现相关接口。
ca
ca

1.2 仓库代码结构

源码地址

CA 代码在 k8s 官方的 autoscaler 仓库下,该仓库存放自动扩缩容相关组件,包括前文介绍的 VPA、今天的主角CA、还有一个VPA修改pod资源的插件 Addon Resizer。使用的版本是 cluster-autoscaler-release-1.21,目录结构如下

.
├── CONTRIBUTING.md
├── LICENSE
├── OWNERS
├── README.md
├── SECURITY_CONTACTS
├── addon-resizer       # addon-resizer 代码
├── builder
├── charts
├── cluster-autoscaler  # CA 代码
├── code-of-conduct.md
├── hack
└── vertical-pod-autoscaler # VPA 代码

1.3 CA 代码结构

.
├── FAQ.md															# FAQ,里面有很多关于 CA 原理和使用的说明
├── cloudprovider												# cloud provider模块,包括接口和各个云厂商的实现
│   ├── alicloud
│   ├── aws
│   ├── azure
│   ├── baiducloud
│   ├── builder
│   ├── cloud_provider.go								# cloud provider 接口,要对接自己的云,需要实现该接口操作 IaaS 资源
│   ├── gce
│   ├── huaweicloud
├── core                                # CA 核心模块
│   ├── autoscaler.go										# 定义 Autoscaler 接口
│   ├── equivalence_groups.go						# 资源不足的 pod 按扩容类型分类的处理逻辑
│   ├── filter_out_schedulable.go
│   ├── scale_down.go     							# 缩容
│   ├── scale_up.go											# 扩容
│   ├── static_autoscaler.go					  # Autoscaler 的实现类
│   └── utils
├── estimator														# Estimator 模块,评估扩容节点
│   ├── binpacking_estimator.go					# Estimator 实现类,实现首次适应背包算法(装箱算法)
│   └── estimator.go										# 定义 Estimator 接口
├── expander				
│   ├── expander.go											# expander 定义了选择多个符合条件的 NodeGroup 的策略接口
│   ├── factory													# 根据传入的不同策略名称,创建对应的实现类
│   ├── mostpods												# mostpods 策略:调度最多的 pod
│   ├── price														# price 策略:价格最低
│   ├── priority												# priority 策略:根据 NodeGroup 的优先级选择
│   ├── random													# random 策略:随机选择符合条件的 NodeGroup 中的一个
│   └── waste														# waste 策略:资源利用率最高
├── go.mod
├── go.sum															
├── main.go															# main 方法
├── metrics															# 指标采集
├── processors
│   ├── callbacks
│   ├── customresources
│   ├── nodegroupconfig
│   ├── nodegroups
│   ├── nodegroupset
│   ├── nodeinfos
│   ├── nodes
│   ├── pods
│   ├── processors.go
│   └── status
├── proposals													# 提案,设计文档信息
│   ├── balance_similar.md
│   ├── circumvent-tag-limit-aws.md
│   ├── clusterstate.md
│   ├── images
│   ├── kubemark_integration.md
│   ├── metrics.md
│   ├── min_at_zero_gcp.md
│   ├── node_autoprovisioning.md
│   ├── plugable-provider-grpc.md
│   ├── pricing.md
├── simulator														# 模拟调度模块,主要用于缩容
│   ├── basic_cluster_snapshot.go
│   ├── cluster.go
│   ├── cluster_snapshot.go
│   ├── delegating_shared_lister.go
│   ├── delta_cluster_snapshot.go
│   ├── drain.go
│   ├── nodes.go
│   ├── nodes_test.go
│   ├── predicate_error.go
│   ├── predicates_checker_interface.go
│   ├── scheduler_based_predicates_checker.go
│   ├── test_predicates_checker.go
│   ├── test_utils.go
│   ├── tracker.go

二、CloudProvider 模块

2.1 CloudProvider 接口

包含配置信息、与云厂商交互的方法。核心方法有:

  • Name():提供唯一的名称
  • Refresh():刷新云厂商资源信息
  • NodeGroups():获取所有的节点组
  • NodeGroupForNode(...):获取某个节点所属的节点组
type CloudProvider interface {
  // cloud provider 名称
	Name() string
	// 返回 cloud privider 配置的所有 node group
	NodeGroups() []NodeGroup
	// 返回给定 Node 节点所属的 NodeGroup
  // 如果节点不应该被 autoscaler 处理,应该返回 nil
	NodeGroupForNode(*apiv1.Node) (NodeGroup, error)
  // 可选实现,价格模型
	Pricing() (PricingModel, errors.AutoscalerError)
  // 可选实现,获取 cloud provider 支持的所有机器型号
	GetAvailableMachineTypes() ([]string, error)
  // 基于定义的 node,构建理论的 node group
  // 阻塞方法,直到 node group 创建出来
  // 可选实现
	NewNodeGroup(machineType string, labels map[string]string, systemLabels map[string]string,
		taints []apiv1.Taint, extraResources map[string]resource.Quantity) (NodeGroup, error)
	// 返回结构化资源限额
  GetResourceLimiter() (*ResourceLimiter, error)
	// 返回添加到 Node 上的 GPU 资源标签
	GPULabel() string
	// 返回所有可用的 GPU 类型
	GetAvailableGPUTypes() map[string]struct{}
	// 清理工作,比如:go 协程
	Cleanup() error
	// 在每次主循环之前调用,并且用于动态更新 cloud provider 状态
  // 尤其是由 NodeGroups 改变,导致返回一个 node group 列表
	Refresh() error
}

2.2 NodeGroup 节点组

NodeGroup提供相关接口,操作具有相同容量和标签的一组节点。核心方法有:

  • MaxSize():节点组允许的最大扩容数量
  • MinSize():节点组允许的最小缩容数量
  • TargetSize():节点组当前数量
  • IncreaseSize(delta int):新增 delta 个节点的方法
  • DecreaseTargetSize(delta int):减少节点的方法
  • DeleteNodes(...):删除实例的方法
  • TemplateNodeInfo():

包含配置信息和方法控制

type NodeGroup interface {
  // 返回 node group 的最大数量
	MaxSize() int
	// 返回 node group 的最小数量
	MinSize() int
  // 必须实现该方法。
  // 返回当前目标数量,必须实现该方法
  // 有可能 k8s 节点数量和这个值不相等,但是一旦一切稳定(node完成启动和注册、节点彻底删除)就应该等于 Size()
	TargetSize() (int, error)
  // 必须实现该方法。
  // 增加 node group 的数量,为了删除节点你需要显示指定名称并调用 DeleteNode 方法
  // 该方法会阻塞知道 node group 数量更新
	IncreaseSize(delta int) error
  // 必须实现该方法。
  // 从 node group 中删除节点。
  // 如果失败或者 node 不属于 node group 将会报错。
  // 该方法会阻塞知道 node group 数量更新
	DeleteNodes([]*apiv1.Node) error
  // 从 Node group 中减少目标数量
  // 该方法不允许删除任何节点,仅仅用于减少没有完全填充的新节点
  // 参数 delta 必须是负数,假定 cloud provider 在调整目标数量时,将不会删除存在的节点
	DecreaseTargetSize(delta int) error
	// 返回 node group 唯一标识
	Id() string
	// 返回调试信息
	Debug() string
  // 返回所有属于 node group 的节点列表
  // Instance 对象必须包含 id 字段,其他字段值可选
  // 列表也包含不会成为 k8s 的那些节点
	Nodes() ([]Instance, error)

	// 可选实现
  // 返回包含空 node 新的的调度结构体,将被用于扩容仿真,以预测一个新的扩容节点是什么样的
  // 返回的 NodeInfo 信息包含完整的 Node 对象信N,包括:标签、容量、能分配的 pod 信息(类似kube-proxy)
	TemplateNodeInfo() (*schedulerframework.NodeInfo, error)
  // 必须实现。返回 node group 是否存在
	Exist() bool
	// 可选实现。创建 node group
	Create() (NodeGroup, error)
  // 可选实现,删除 node group
	Delete() error
  // 是否支持自动供应
	Autoprovisioned() bool
  // 可选实现,返回配置参数
	GetOptions(defaults config.NodeGroupAutoscalingOptions) (*config.NodeGroupAutoscalingOptions, error)
}

2.3 CloudProvider 实现的厂商

大部分云厂商都实现了该接口,参考

  • 国外的:亚马逊AWS、谷歌GCE、微软Azure
  • 国内的:阿里云、华为云、腾讯云、百度云
  • 其他:......

2.4 AWS 接口实现

以 aws 为例分析实现实现逻辑,代码结构如下

.
├── auto_scaling.go
├── auto_scaling_groups.go	 # 获取 asg 信息,保存在 asgCache 缓存中
├── aws_cloud_provider.go		 # awsCloudProvider 实现 CloudProvider 接口里的方法
├── aws_manager.go					 # 根据账号密码,构建 awsManager 对象来操作 aws 资源
├── aws_util.go							 # 获取机型、可用区等信息
├── ec2.go
├── ec2_instance_types			
│   └── gen.go
├── ec2_instance_types.go		 # 默认机型列表
└── examples
    ├── cluster-autoscaler-autodiscover.yaml	# 自动发现模式部署 ca
    ├── cluster-autoscaler-multi-asg.yaml			# 多 asg 模式部署 ca
    ├── cluster-autoscaler-one-asg.yaml				# 单 asg 模式部署 ca
    └── cluster-autoscaler-run-on-control-plane.yaml
2.4.1 Name

返回 provider 的名称 aws

// AwsProviderName = "aws"
func (aws *awsCloudProvider) Name() string {
   return cloudprovider.AwsProviderName
}
2.4.2 Refresh

调用链:CloudProvider -> AwsManager -> asgCache

refresh 的功能是获取 aws 中的 asg,以及模板、实例等信息保存到缓存中

func (aws *awsCloudProvider) Refresh() error {
	// 调用 awsManager 的 Refresh 方法
  return aws.awsManager.Refresh()
}

func (m *AwsManager) Refresh() error {
	...
  // 调用 forceRefresh
	return m.forceRefresh()
}

func (m *AwsManager) forceRefresh() error {
  // 调用 asgCache 的 regenerate
	if err := m.asgCache.regenerate(); err != nil {
		...
	}
	...
	return nil
}

func (m *asgCache) regenerate() error {
	...
	newInstanceToAsgCache := make(map[AwsInstanceRef]*asg)
	newAsgToInstancesCache := make(map[AwsRef][]AwsInstanceRef)

	// Build list of knowns ASG names
  // 获取所有的 asg name
	refreshNames, err := m.buildAsgNames()

  // 根据 asg name 获取 asg 对象
  // 调用 aws-sdk-go 中 AutoScaling 的 DescribeAutoScalingGroupsPages
	groups, err := m.service.getAutoscalingGroupsByNames(refreshNames)
	// 填充 asg 启动配置的实例模板
	err = m.service.populateLaunchConfigurationInstanceTypeCache(groups)
	if err != nil {
		klog.Warningf("Failed to fully populate all launchConfigurations: %v", err)
	}

	// If currently any ASG has more Desired than running Instances, introduce placeholders
	// for the instances to come up. This is required to track Desired instances that
	// will never come up, like with Spot Request that can't be fulfilled
	groups = m.createPlaceholdersForDesiredNonStartedInstances(groups)

	// Register or update ASGs
	exists := make(map[AwsRef]bool)
	for _, group := range groups {
		asg, err := m.buildAsgFromAWS(group)
		if err != nil {
			return err
		}
		exists[asg.AwsRef] = true
		// 注册 asg
		asg = m.register(asg)

		newAsgToInstancesCache[asg.AwsRef] = make([]AwsInstanceRef, len(group.Instances))

    // 将 group 中所有的实例信息保存到缓存中
		for i, instance := range group.Instances {
      // 根据 group 的 instance 信息构造 instance
			ref := m.buildInstanceRefFromAWS(instance)
			newInstanceToAsgCache[ref] = asg
			newAsgToInstancesCache[asg.AwsRef][i] = ref
		}
	}

  // 注销不存在的 asg
	for _, asg := range m.registeredAsgs {
		if !exists[asg.AwsRef] && !m.explicitlyConfigured[asg.AwsRef] {
			m.unregister(asg)
		}
	}

  // 生成 asg -> instance 的缓存
	m.asgToInstances = newAsgToInstancesCache
  // 生成 instance -> asg 的缓存
	m.instanceToAsg = newInstanceToAsgCache
	return nil
}
2.4.3 NodeGroups

调用 awsManager 获取所有的 asg

func (aws *awsCloudProvider) NodeGroups() []cloudprovider.NodeGroup {
  // 调用 awsManager 获取所有的 asg 
  asgs := aws.awsManager.getAsgs()
   ngs := make([]cloudprovider.NodeGroup, len(asgs))
   for i, asg := range asgs {
      ngs[i] = &AwsNodeGroup{
         asg:        asg,
         awsManager: aws.awsManager,
      }
   }

   return ngs
}
2.4.4 NodeGroupForNode
  • 从 Node.Spec.ProviderID 中获取 id
  • 调用 awsManager 获取 id 对应的 asg
func (aws *awsCloudProvider) NodeGroupForNode(node *apiv1.Node) (cloudprovider.NodeGroup, error) {
   if len(node.Spec.ProviderID) == 0 {
      klog.Warningf("Node %v has no providerId", node.Name)
      return nil, nil
   }
   // 从 Node.Spec.ProviderID 中获取  id
   ref, err := AwsRefFromProviderId(node.Spec.ProviderID)
   if err != nil {
      return nil, err
   }
   // 获取 asg
   asg := aws.awsManager.GetAsgForInstance(*ref)

   if asg == nil {
      return nil, nil
   }

   return &AwsNodeGroup{
      asg:        asg,
      awsManager: aws.awsManager,
   }, nil
}
2.4.5 IncreaseSize

调用链:CloudProvider -> AwsManager -> asgCache -> aws-sdk-go

通过传入操作 aws asg 的参数,调用 aws-sdk-go 的 asg 接口,实现新增节点的效果

func (ng *AwsNodeGroup) IncreaseSize(delta int) error {
  // 增量不能小于 0
	if delta <= 0 {
		return fmt.Errorf("size increase must be positive")
	}
	size := ng.asg.curSize
  // 增加后不能超过最大值
	if size+delta > ng.asg.maxSize {
		return fmt.Errorf("size increase too large - desired:%d max:%d", size+delta, ng.asg.maxSize)
	}
  // 调用 awsManager 设置 size 为当前 size + delta
	return ng.awsManager.SetAsgSize(ng.asg, size+delta)
}

// SetAsgSize sets ASG size.
func (m *AwsManager) SetAsgSize(asg *asg, size int) error {
	return m.asgCache.SetAsgSize(asg, size)
}

// 加锁操作
func (m *asgCache) SetAsgSize(asg *asg, size int) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	return m.setAsgSizeNoLock(asg, size)
}

func (m *asgCache) setAsgSizeNoLock(asg *asg, size int) error {
  // 拼接参数:name、size、honorCooldown
	params := &autoscaling.SetDesiredCapacityInput{
		AutoScalingGroupName: aws.String(asg.Name),
		DesiredCapacity:      aws.Int64(int64(size)),
		HonorCooldown:        aws.Bool(false),
	}
	klog.V(0).Infof("Setting asg %s size to %d", asg.Name, size)
  // 调用 aws-sdk-go 操作 ASG 的 AutoScaling 接口完成操作
	_, err := m.service.SetDesiredCapacity(params)
	if err != nil {
		return err
	}

	// Proactively set the ASG size so autoscaler makes better decisions
	asg.curSize = size

	return nil
}
2.4.6 DecreaseTargetSize

执行代码同 IncreaseSize,不同的仅仅是参数 delta是负数。

func (ng *AwsNodeGroup) DecreaseTargetSize(delta int) error {
	// 增量必须为负数
  if delta >= 0 {
		return fmt.Errorf("size decrease size must be negative")
	}

	size := ng.asg.curSize
  // 查询目前 ASG 的节点数
	nodes, err := ng.awsManager.GetAsgNodes(ng.asg.AwsRef)
	if err != nil {
		return err
	}
  // 删除的数量不能超过已有数量
	if int(size)+delta < len(nodes) {
		return fmt.Errorf("attempt to delete existing nodes targetSize:%d delta:%d existingNodes: %d",
			size, delta, len(nodes))
	}
  // 方法同 IncreaseSize 中一样
	return ng.awsManager.SetAsgSize(ng.asg, size+delta)
}
2.4.7 DeleteNodes

调用链:CloudProvider -> AwsManager -> aws-sdk-go

通过 Node.Spec.ProviderID 拿到实例唯一 id,调用 SDK 时传入 id,执行删除操作

func (ng *AwsNodeGroup) DeleteNodes(nodes []*apiv1.Node) error {
	size := ng.asg.curSize
	if int(size) <= ng.MinSize() {
		return fmt.Errorf("min size reached, nodes will not be deleted")
	}
	refs := make([]*AwsInstanceRef, 0, len(nodes))
	for _, node := range nodes {
    // 判断待删除 Node 是否是当前 ASG
		belongs, err := ng.Belongs(node)
		if err != nil {
			return err
		}
		if belongs != true {
			return fmt.Errorf("%s belongs to a different asg than %s", node.Name, ng.Id())
		}
    // 获取 Node 的 aws 唯一凭证信息
    // providerID 保存在 node.Spec.ProviderID 字段中
		awsref, err := AwsRefFromProviderId(node.Spec.ProviderID)
		if err != nil {
			return err
		}
		refs = append(refs, awsref)
	}
  // 调用 AwsManager 的删除实例方法
	return ng.awsManager.DeleteInstances(refs)
}

// providerID 的格式是:aws:///zone/name
func AwsRefFromProviderId(id string) (*AwsInstanceRef, error) {
	if validAwsRefIdRegex.FindStringSubmatch(id) == nil {
		return nil, fmt.Errorf("wrong id: expected format aws:///<zone>/<name>, got %v", id)
	}
	splitted := strings.Split(id[7:], "/")
	return &AwsInstanceRef{
		ProviderID: id,
		Name:       splitted[1],
	}, nil
}
2.4.8 TemplateNodeInfo
  • getAsgTemplate:获取 template
func (ng *AwsNodeGroup) TemplateNodeInfo() (*schedulerframework.NodeInfo, error) {
	// 获取 asg 的模板信息
  template, err := ng.awsManager.getAsgTemplate(ng.asg)
	
	// 通过模板构造 Node 对象
	node, err := ng.awsManager.buildNodeFromTemplate(ng.asg, template)
	// 通过调度框架接口构造调度对象,用于 CA 模拟调度
	nodeInfo := schedulerframework.NewNodeInfo(cloudprovider.BuildKubeProxy(ng.asg.Name))
	nodeInfo.SetNode(node)
	return nodeInfo, nil
}

2.5 AwsManager 实现

通过前面的分析发现,aws接口实现中和底层IaaS操作的很多逻辑都封装到了 AwsManager 中,这里专门针对 AwsManager做分析。

2.5.1 getAsgTemplate

获取 asg 中第一个可用区的模板信息

func (m *AwsManager) getAsgTemplate(asg *asg) (*asgTemplate, error) {
	// 判断是否有可用区信息
  if len(asg.AvailabilityZones) < 1 {
		return nil, fmt.Errorf("unable to get first AvailabilityZone for ASG %q", asg.Name)
	}

  // asg可配置多个az信息, 默认选择 asg 中第一个可用 az
	az := asg.AvailabilityZones[0]
	region := az[0 : len(az)-1]

	if len(asg.AvailabilityZones) > 1 {
		klog.V(4).Infof("Found multiple availability zones for ASG %q; using %s for %s label\n", asg.Name, az, apiv1.LabelFailureDomainBetaZone)
	}
	// 获取可用机型,通过调用 aws-sdk-go 获取
	instanceTypeName, err := m.buildInstanceType(asg)
	
	if t, ok := m.instanceTypes[instanceTypeName]; ok {
		return &asgTemplate{
			InstanceType: t,
			Region:       region,
			Zone:         az,
			Tags:         asg.Tags,
		}, nil
	}
	return nil, fmt.Errorf("ASG %q uses the unknown EC2 instance type %q", asg.Name, instanceTypeName)
}
2.5.2 buildNodeFromTemplate

根据 template 构造 node 对象,预调度就是通过虚拟出来的 Node 对象,传给调度框架来实现。

Node 数据的来源包括:

  • asg 模板:提供node 的 cpu、memory、机型、az等信息
  • asg tag:提供node 的 taint、label 信息
func (m *AwsManager) buildNodeFromTemplate(asg *asg, template *asgTemplate) (*apiv1.Node, error) {
	node := apiv1.Node{}
  // 生成随机字符串,拼接上 {asgName}-asg-{rand.Int63} 作为主机名
	nodeName := fmt.Sprintf("%s-asg-%d", asg.Name, rand.Int63())

	node.ObjectMeta = metav1.ObjectMeta{
		Name:     nodeName,
		SelfLink: fmt.Sprintf("/api/v1/nodes/%s", nodeName),
		Labels:   map[string]string{},
	}
	/
	node.Status = apiv1.NodeStatus{
		Capacity: apiv1.ResourceList{},
	}

  // 将 asg 返回的机器规格信息填充到 node.Status 中,便于后续调度
	node.Status.Capacity[apiv1.ResourcePods] = *resource.NewQuantity(110, resource.DecimalSI)
  // 构造 node 的 cpu 信息
	node.Status.Capacity[apiv1.ResourceCPU] = *resource.NewQuantity(template.InstanceType.VCPU, resource.DecimalSI)
  // 构造 node 的 gpu 信息
	node.Status.Capacity[gpu.ResourceNvidiaGPU] = *resource.NewQuantity(template.InstanceType.GPU, resource.DecimalSI)
  // 构造 node 的 memory 信息
  // asg 的实例类型的 MemroyMb * 1024 * 1024 作为 node 的 memory
	node.Status.Capacity[apiv1.ResourceMemory] = *resource.NewQuantity(template.InstanceType.MemoryMb*1024*1024, resource.DecimalSI)

	resourcesFromTags := extractAllocatableResourcesFromAsg(template.Tags)
	for resourceName, val := range resourcesFromTags {
		node.Status.Capacity[apiv1.ResourceName(resourceName)] = *val
	}

	// TODO: use proper allocatable!!
	node.Status.Allocatable = node.Status.Capacity

	// 生成 node 的 generic 信息,填充到 label
  // - "kubernetes.io/arch":asg instance 的机型
  // - "kubernetes.io/os":linux
	// - "node.kubernetes.io/instance-type"
	// - "topology.kubernetes.io/region"
  // - "topology.kubernetes.io/zone"
	// - "kubernetes.io/hostname"
	node.Labels = cloudprovider.JoinStringMaps(node.Labels, buildGenericLabels(template, nodeName))

  // 填充 node.Label 信息
	// 读取所有的 k8s.io/cluster-autoscaler/node-template/label/ 前缀的 tags
	node.Labels = cloudprovider.JoinStringMaps(node.Labels, extractLabelsFromAsg(template.Tags))
	// 填充 node.Spec.Taints 信息
  // 读取所有的 k8s.io/cluster-autoscaler/node-template/taint/ 前缀
  // 且满足正则 "(.*):(?:NoSchedule|NoExecute|PreferNoSchedule)" 的 tags
	node.Spec.Taints = extractTaintsFromAsg(template.Tags)
	// 填充 node.Status.Conditions
	node.Status.Conditions = cloudprovider.BuildReadyConditions()
	return &node, nil
}

2.6 asgCache

asgCache用于缓存 aws 中所有的 asg 信息

// asgCache 保存 aws 当前所有的 asg 缓存信息
type asgCache struct {
  // 所有的 asg 列表
	registeredAsgs []*asg
  // asg -> instance 的映射
	asgToInstances map[AwsRef][]AwsInstanceRef
  // instance -> asg 的映射
	instanceToAsg  map[AwsInstanceRef]*asg
	mutex          sync.Mutex
	service        autoScalingWrapper
	interrupt      chan struct{}

	asgAutoDiscoverySpecs []asgAutoDiscoveryConfig
	explicitlyConfigured  map[AwsRef]bool
}

// asg 对应 aws 的 AutoScalingGroup
type asg struct {
	AwsRef

	minSize int
	maxSize int
	curSize int

	AvailabilityZones       []string
	LaunchConfigurationName string
	LaunchTemplate          *launchTemplate
	MixedInstancesPolicy    *mixedInstancesPolicy
	Tags                    []*autoscaling.TagDescription
}
2.6.1 regenerate

CA 配置自动发现 asg 机制后,该方法会查找所有打了响应标签的 asg,并将asg的基本信息、实例信息同步到本地内存

func (m *asgCache) regenerate() error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	newInstanceToAsgCache := make(map[AwsInstanceRef]*asg)
	newAsgToInstancesCache := make(map[AwsRef][]AwsInstanceRef)

	// 通过 CA 启动参数中配置的标签,查找符合条件的所有 asg
	refreshNames, err := m.buildAsgNames()
	
  // 根据 asg 名称,查找完整的 asg 详细信息
	groups, err := m.service.getAutoscalingGroupsByNames(refreshNames)
	
 
	for _, group := range groups {
		asg, err := m.buildAsgFromAWS(group)
		if err != nil {
			return err
		}
		exists[asg.AwsRef] = true
		// 注册 asg 到缓存
		asg = m.register(asg)

		newAsgToInstancesCache[asg.AwsRef] = make([]AwsInstanceRef, len(group.Instances))
		// 建立映射关系
		for i, instance := range group.Instances {
			ref := m.buildInstanceRefFromAWS(instance)
			newInstanceToAsgCache[ref] = asg
			newAsgToInstancesCache[asg.AwsRef][i] = ref
		}
	}

	// Unregister no longer existing auto-discovered ASGs
	for _, asg := range m.registeredAsgs {
		if !exists[asg.AwsRef] && !m.explicitlyConfigured[asg.AwsRef] {
			m.unregister(asg)
		}
	}

	m.asgToInstances = newAsgToInstancesCache
	m.instanceToAsg = newInstanceToAsgCache
	return nil
}

2.7 Aws provider 创建的流程

调用链路: NewCloudProvider -> buildCloudProvider -> BuildAWS -> CreateAwsManager -> BuildAwsCloudProvider

  • AWS账号初始化
    • 读取配置文件
    • 根据配置文件创建 AWSSDKProvider
    • 创建 Session,创建 AwsService
  • 构造 asgCache
    • 解析自动发现 asg 的tag等入参信息
    • 自动同步符合 tag 的 aws asg 到本地 asgCache
  • 初始化 awsManager
func initializeDefaultOptions(opts *AutoscalerOptions) error {
  ...
  if opts.CloudProvider == nil {
    // 创建一个 CloudProvider
		opts.CloudProvider = cloudBuilder.NewCloudProvider(opts.AutoscalingOptions)
	}
  ...
}

func NewCloudProvider(opts config.AutoscalingOptions) cloudprovider.CloudProvider {
	...
  // 根据 options参数,构建 provider
	provider := buildCloudProvider(opts, do, rl)
	if provider != nil {
		return provider
	}
}

func buildCloudProvider(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscoveryOptions, rl *cloudprovider.ResourceLimiter) cloudprovider.CloudProvider {
	switch opts.CloudProviderName {
	...
  // aws 的 provider
	case cloudprovider.AwsProviderName:
		return aws.BuildAWS(opts, do, rl)
  ...
 }
}

func BuildAWS(...) {
  ...
  // 初始化 awsManager
  manager, err := CreateAwsManager(config, do, instanceTypes)
	// 初始化 provider
	provider, err := BuildAwsCloudProvider(manager, rl)
	
	return provider
}

func CreateAwsManager(...) (*AwsManager, error) {
	return createAWSManagerInternal(configReader, discoveryOpts, nil, nil, instanceTypes)
}

func createAWSManagerInternal(
	configReader io.Reader,
	discoveryOpts cloudprovider.NodeGroupDiscoveryOptions,
	autoScalingService *autoScalingWrapper,
	ec2Service *ec2Wrapper,
	instanceTypes map[string]*InstanceType,
) (*AwsManager, error) {
	// 读取配置文件
	cfg, err := readAWSCloudConfig(configReader)
	...
  // 解析自动发现 asg 的入参信息
	specs, err := parseASGAutoDiscoverySpecs(discoveryOpts)
	...
  // 初始化 asgCache
	cache, err := newASGCache(*autoScalingService, discoveryOpts.NodeGroupSpecs, specs)

  // 初始化 awsManager
	manager := &AwsManager{
		autoScalingService: *autoScalingService,
		ec2Service:         *ec2Service,
		asgCache:           cache,
		instanceTypes:      instanceTypes,
	}
	// 执行刷新操作,将 aws 的 asg 信息同步到本地 asgCache
	if err := manager.forceRefresh(); err != nil {
		return nil, err
	}

	return manager, nil
}
  
 
2.7.1 BuildAWS
func BuildAWS(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscoveryOptions, rl *cloudprovider.ResourceLimiter) cloudprovider.CloudProvider {
	// 读取参数中配置相关的 CloudConfig 文件
  var config io.ReadCloser
	if opts.CloudConfig != "" {
		var err error
		config, err = os.Open(opts.CloudConfig)
		if err != nil {
			klog.Fatalf("Couldn't open cloud provider configuration %s: %#v", opts.CloudConfig, err)
		}
		defer config.Close()
	}

	// 获取机型
	instanceTypes, lastUpdateTime := GetStaticEC2InstanceTypes()
  // 获取静态机型,可能会过时
	if opts.AWSUseStaticInstanceList {
		klog.Warningf("Using static EC2 Instance Types, this list could be outdated. Last update time: %s", lastUpdateTime)
	} else {
    // 实时当前可用区
    // 先读取环境变量:AWS_REGION,找不到再调接口获取
		region, err := GetCurrentAwsRegion()
		...
    // 获取机型
		generatedInstanceTypes, err := GenerateEC2InstanceTypes(region)
		...
	}
	// 创建 AwsManager
  // AwsManager 用于操作 aws 资源
	manager, err := CreateAwsManager(config, do, instanceTypes)
	...
  // 创建 provider
	provider, err := BuildAwsCloudProvider(manager, rl)
	...
  // 注册指标
	RegisterMetrics()
	return provider
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、概述
    • 1.1 CA架构回顾
      • 1.2 仓库代码结构
        • 1.3 CA 代码结构
        • 二、CloudProvider 模块
          • 2.1 CloudProvider 接口
            • 2.2 NodeGroup 节点组
              • 2.3 CloudProvider 实现的厂商
                • 2.4 AWS 接口实现
                  • 2.4.1 Name
                  • 2.4.2 Refresh
                  • 2.4.3 NodeGroups
                  • 2.4.4 NodeGroupForNode
                  • 2.4.5 IncreaseSize
                  • 2.4.6 DecreaseTargetSize
                  • 2.4.7 DeleteNodes
                  • 2.4.8 TemplateNodeInfo
                • 2.5 AwsManager 实现
                  • 2.5.1 getAsgTemplate
                  • 2.5.2 buildNodeFromTemplate
                • 2.6 asgCache
                  • 2.6.1 regenerate
                • 2.7 Aws provider 创建的流程
                  • 2.7.1 BuildAWS
              相关产品与服务
              容器服务
              腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档