前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【K8s源码品读】012:Phase 1 - kube-controller-manager - 了解控制管理中心

【K8s源码品读】012:Phase 1 - kube-controller-manager - 了解控制管理中心

作者头像
junedayday
发布2021-08-05 13:07:10
3080
发布2021-08-05 13:07:10
举报
文章被收录于专栏:Go编程点滴

聚焦目标

理解 kube-controller-manager 的运行机制

目录

  1. 运行的主函数
  2. 控制器的启动函数
  3. 引入概念ReplicaSet
  4. 查看ReplicaSetController
  5. ReplicaSet的核心实现函数
  6. 总结

Run

我们找到了对应的主函数,看看其中的内容

// Run is the main entry point of kube-controller-manager as excerpted in this
// article. It registers the component config with configz and defines a run
// closure that builds the controller context, starts all controllers and then
// starts the shared informers. Finally it either calls run directly (leader
// election disabled) or passes run to leaderelection.RunOrDie as the
// OnStartedLeading callback.
// Neither path is expected to return: run blocks forever on select {}, and the
// code after each call panics with "unreachable". The only error returned is
// from os.Hostname while building the leader-election identity; other
// failures go through klog.Fatalf.
// NOTE(review): abridged excerpt. checks is declared but never used,
// clientBuilder is never assigned, electionChecker is defined outside the
// shown code, and stopCh is not referenced here. As printed, this does not
// compile.
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
 // Register the component config with configz (already covered in the
 // kube-scheduler walkthrough). A failure is only logged.
 if cfgz, err := configz.New(ConfigzName); err == nil {
  cfgz.Set(c.ComponentConfig)
 } else {
  klog.Errorf("unable to register configz: %v", err)
 }

 // Health checks and the HTTP server are skipped in this walkthrough.
 // unsecuredMux stays nil here, so StartControllers will not register any
 // /debug/controllers/* handlers.
 var checks []healthz.HealthChecker
 var unsecuredMux *mux.PathRecorderMux

 // run starts every controller. It is invoked either directly or once this
 // instance wins leader election.
 run := func(ctx context.Context) {
  rootClientBuilder := controller.SimpleControllerClientBuilder{
   ClientConfig: c.Kubeconfig,
  }

  // Client authentication setup. NOTE(review): the code that picks a
  // clientBuilder is elided, so as printed its zero value is passed below.
  var clientBuilder controller.ControllerClientBuilder

  // Build the controller context that is passed to every controller.
  controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
  if err != nil {
   klog.Fatalf("error building controller context: %v", err)
  }
  saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController

  // Start every enabled controller; any failure is fatal.
  if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
   klog.Fatalf("error starting controllers: %v", err)
  }

  // InformerFactory here is essentially the same as the SharedInformerFactory
  // seen in kube-scheduler. Informers are started only after the controllers
  // have been set up.
  controllerContext.InformerFactory.Start(controllerContext.Stop)
  controllerContext.ObjectOrMetadataInformerFactory.Start(controllerContext.Stop)
  // Closing InformersStarted marks that both informer factories were started.
  close(controllerContext.InformersStarted)

  // Block forever; run never returns.
  select {}
 }

 // Leader election disabled: run immediately in this process.
 if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
  run(context.TODO())
  panic("unreachable")
 }

 // Build a globally unique identity for the lock: "<hostname>_<uuid>".
 id, err := os.Hostname()
 if err != nil {
  return err
 }
 id = id + "_" + string(uuid.NewUUID())

 // Create the resource lock used for leader election.
 rl, err := resourcelock.New(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
  c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
  c.ComponentConfig.Generic.LeaderElection.ResourceName,
  c.LeaderElectionClient.CoreV1(),
  c.LeaderElectionClient.CoordinationV1(),
  resourcelock.ResourceLockConfig{
   Identity:      id,
   EventRecorder: c.EventRecorder,
  })
 if err != nil {
  klog.Fatalf("error creating lock: %v", err)
 }

 // In normal operation the process stays blocked inside RunOrDie, which keeps
 // performing leader-election work.
 leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
  Lock:          rl,
  LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
  RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
  RetryPeriod:   c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
  Callbacks: leaderelection.LeaderCallbacks{
   // When this instance becomes the leader, call run.
   OnStartedLeading: run,
   // Losing leadership is fatal: klog.Fatalf terminates the process.
   OnStoppedLeading: func() {
    klog.Fatalf("leaderelection lost")
   },
  },
  WatchDog: electionChecker,
  Name:     "kube-controller-manager",
 })
 panic("unreachable")
}

StartControllers

// StartControllers is the key loop: it walks controllers (controller name ->
// init function) and starts each enabled one.
//
// Disabled controllers are skipped with a warning. Before each enabled
// controller it sleeps for a jittered ControllerStartInterval, then calls the
// init function. An init error is logged and returned immediately. An init
// that reports started == false is skipped with a warning. If the controller
// returned a debug handler and unsecuredMux is non-nil, the handler is mounted
// under /debug/controllers/<name>.
//
// Go map iteration order is unspecified, so controllers start in no fixed
// order. NOTE(review): startSATokenController is not referenced in this
// excerpt; upstream presumably starts it before the loop. Confirm.
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error {
	for name, start := range controllers {
		if enabled := ctx.IsControllerEnabled(name); !enabled {
			klog.Warningf("%q is disabled", name)
			continue
		}

		pause := wait.Jitter(ctx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter)
		time.Sleep(pause)
		klog.V(1).Infof("Starting %q", name)

		handler, started, err := start(ctx)
		switch {
		case err != nil:
			klog.Errorf("Error starting %q", name)
			return err
		case !started:
			klog.Warningf("Skipping %q", name)
			continue
		}

		// Expose the controller's debug handler, if it has one and a mux exists.
		if handler != nil && unsecuredMux != nil {
			prefix := "/debug/controllers/" + name
			unsecuredMux.UnlistedHandle(prefix, http.StripPrefix(prefix, handler))
			unsecuredMux.UnlistedHandlePrefix(prefix+"/", http.StripPrefix(prefix, handler))
		}
		klog.Infof("Started %q", name)
	}
	return nil
}

// NewControllerInitializers builds the controllers map that Run hands to
// StartControllers: controller name -> InitFunc. It shows which controllers
// exist. Many are familiar Kubernetes concepts, but this walkthrough does not
// go through them one by one.
//
// The "service", "route" and "cloud-node-lifecycle" controllers are
// registered only when loopMode == IncludeCloudLoops.
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
	controllers := map[string]InitFunc{
		"endpoint":                  startEndpointController,
		"endpointslice":             startEndpointSliceController,
		"endpointslicemirroring":    startEndpointSliceMirroringController,
		"replicationcontroller":     startReplicationController,
		"podgc":                     startPodGCController,
		"resourcequota":             startResourceQuotaController,
		"namespace":                 startNamespaceController,
		"serviceaccount":            startServiceAccountController,
		"garbagecollector":          startGarbageCollectorController,
		"daemonset":                 startDaemonSetController,
		"job":                       startJobController,
		"deployment":                startDeploymentController,
		"replicaset":                startReplicaSetController,
		"horizontalpodautoscaling":  startHPAController,
		"disruption":                startDisruptionController,
		"statefulset":               startStatefulSetController,
		"cronjob":                   startCronJobController,
		"csrsigning":                startCSRSigningController,
		"csrapproving":              startCSRApprovingController,
		"csrcleaner":                startCSRCleanerController,
		"ttl":                       startTTLController,
		"bootstrapsigner":           startBootstrapSignerController,
		"tokencleaner":              startTokenCleanerController,
		"nodeipam":                  startNodeIpamController,
		"nodelifecycle":             startNodeLifecycleController,
		"persistentvolume-binder":   startPersistentVolumeBinderController,
		"attachdetach":              startAttachDetachController,
		"persistentvolume-expander": startVolumeExpandController,
		"clusterrole-aggregation":   startClusterRoleAggregrationController,
		"pvc-protection":            startPVCProtectionController,
		"pv-protection":             startPVProtectionController,
		"ttl-after-finished":        startTTLAfterFinishedController,
		"root-ca-cert-publisher":    startRootCACertPublisher,
		"ephemeral-volume":          startEphemeralVolumeController,
	}

	// Controllers gated on the cloud loop mode.
	if loopMode == IncludeCloudLoops {
		controllers["service"] = startServiceController
		controllers["route"] = startRouteController
		controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
	}

	return controllers
}

ReplicaSet

由于我们的示例是创建一个nginx的pod,涉及到kube-controller-manager的内容很少。

但是,为了加深大家对 kube-controller-manager 的认识,我们引入一个新的概念 - ReplicaSet,下面是官方说明:

A ReplicaSet's purpose is to maintain a stable set of replica Pods running at any given time. As such, it is often used to guarantee the availability of a specified number of identical Pods.

ReplicaSet 的目的是维护一组在任何时候都处于运行状态的 Pod 副本的稳定集合。因此,它通常用来保证给定数量的、完全相同的 Pod 的可用性。

简单来说,ReplicaSet 就是用来维持指定数量的 Pod 副本:Pod 少了就创建,多了就删除。

ReplicaSetController

// startReplicaSetController starts the ReplicaSet controller, provided the
// apps/v1 "replicasets" resource is listed in ctx.AvailableResources.
// Otherwise it reports "not started".
//
// The controller is built from both the ReplicaSet and the Pod informers. It
// has to watch both resources because it keeps the replica count a ReplicaSet
// declares in line with the Pods actually running. Its Run loop is launched in
// a separate goroutine with ConcurrentRSSyncs workers. The function never
// returns a debug handler or an error; the bool reports whether the
// controller was started.
func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
	replicaSetsGVR := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}
	if available := ctx.AvailableResources[replicaSetsGVR]; !available {
		return nil, false, nil
	}

	// Construct synchronously in this goroutine; only Run is asynchronous.
	rsController := replicaset.NewReplicaSetController(
		ctx.InformerFactory.Apps().V1().ReplicaSets(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
		replicaset.BurstReplicas,
	)
	workers := int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs)
	go rsController.Run(workers, ctx.Stop)

	return nil, true, nil
}

// Run is the ReplicaSet controller's run loop, and it blocks until stopCh is
// closed.
//
// It first waits for the Pod and ReplicaSet informer caches to sync, and
// returns early if that wait fails. It then launches `workers` goroutines,
// each driving rsc.worker through wait.Until with a one-second period. When
// Run returns, it logs the shutdown, shuts the work queue down, and runs the
// deferred utilruntime.HandleCrash.
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer rsc.queue.ShutDown()

	name := strings.ToLower(rsc.Kind)
	klog.Infof("Starting %v controller", name)
	defer klog.Infof("Shutting down %v controller", name)

	if synced := cache.WaitForNamedCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced); !synced {
		return
	}

	// Start the worker goroutines that process queued keys.
	for remaining := workers; remaining > 0; remaining-- {
		go wait.Until(rsc.worker, time.Second, stopCh)
	}

	<-stopCh
}

// worker keeps calling processNextWorkItem, which holds the real logic. It
// returns as soon as processNextWorkItem reports false, which happens only
// when the work queue is shutting down.
func (rsc *ReplicaSetController) worker() {
	for {
		if more := rsc.processNextWorkItem(); !more {
			return
		}
	}
}

// processNextWorkItem takes one key off the work queue and passes it to
// rsc.syncHandler.
//
// The queue is comparable to the one in kube-scheduler. The difference is that
// it is a workqueue.RateLimitingInterface, which is rate limited; its
// internals are not covered here.
//
// On success the key is passed to queue.Forget. On failure the error is
// reported via utilruntime.HandleError and the key is re-added with
// AddRateLimited. In both cases queue.Done is called for the key. The function
// returns false only when queue.Get reports shutdown, which tells worker to
// stop.
func (rsc *ReplicaSetController) processNextWorkItem() bool {
	item, shuttingDown := rsc.queue.Get()
	if shuttingDown {
		return false
	}
	defer rsc.queue.Done(item)

	if err := rsc.syncHandler(item.(string)); err != nil {
		utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", item, err))
		rsc.queue.AddRateLimited(item)
		return true
	}

	rsc.queue.Forget(item)
	return true
}

// NewBaseController builds the ReplicaSetController. The walkthrough comes
// back to the constructor to find the concrete implementation of syncHandler.
// It is set to rsc.syncReplicaSet, so every key that processNextWorkItem takes
// off the queue is reconciled by syncReplicaSet.
// NOTE(review): abridged excerpt. The code that creates rsc and uses the
// parameters is elided, so rsc is undefined as printed.
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
 gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
 // ... construction of rsc elided in this excerpt ...

 // Route every work-queue key to syncReplicaSet.
 rsc.syncHandler = rsc.syncReplicaSet

 return rsc
}

syncReplicaSet

// syncReplicaSet reconciles one ReplicaSet, identified by the "namespace/name"
// key taken from the work queue. It proceeds in these steps:
//
//  1. Split the key.
//  2. Fetch the ReplicaSet from the lister. If it is gone, drop its
//     expectations and return nil.
//  3. List every Pod in its namespace, keep the active ones, and pass them
//     through rsc.claimPods.
//  4. Call manageReplicas when rsNeedsSync is true and the ReplicaSet is not
//     being deleted.
//  5. Compute the new status on a deep copy and write it with
//     updateReplicaSetStatus.
//
// If MinReadySeconds > 0 and every replica is ready but not all are available
// yet, the key is re-queued after MinReadySeconds.
//
// Errors from key parsing, the listers, claimPods and the status update are
// returned directly. Otherwise the function returns manageReplicas' error, or
// nil on success. An unparsable selector is only reported via
// utilruntime.HandleError, and nil is returned, so processNextWorkItem will
// not retry the key.
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
 startTime := time.Now()
 defer func() {
  klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
 }()

 // Split the key into namespace and name.
 namespace, name, err := cache.SplitMetaNamespaceKey(key)
 if err != nil {
  return err
 }

 // Look up the ReplicaSet by namespace/name in the lister cache.
 rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
 if errors.IsNotFound(err) {
  klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)
  rsc.expectations.DeleteExpectations(key)
  return nil
 }
 if err != nil {
  return err
 }

 // rsNeedsSync gates manageReplicas below. NOTE(review): this is presumably
 // true once the creates/deletes this controller issued earlier have been
 // observed. Confirm in the expectations implementation.
 rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
 // Build the selector. Kubernetes matches Pods to a ReplicaSet through the
 // labels in the ReplicaSet's selector.
 selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
 if err != nil {
  utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err))
  return nil
 }

 // List ALL Pods in the ReplicaSet's namespace. This uses labels.Everything(),
 // not the selector. NOTE(review): presumably this lets claimPods also see
 // Pods that no longer match the selector but still reference this
 // ReplicaSet. Confirm.
 allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
 if err != nil {
  return err
 }

 // Keep only active Pods.
 filteredPods := controller.FilterActivePods(allPods)

 // Narrow the list to the Pods this ReplicaSet claims, using its selector.
 // NOTE(review): claimPods is not shown here; it may also adopt or release
 // Pods. Confirm.
 filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
 if err != nil {
  return err
 }

 var manageReplicasErr error
 if rsNeedsSync && rs.DeletionTimestamp == nil {
  // Create or delete Pods to reach the desired replica count
  // (see manageReplicas).
  manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
 }
 // rs came from the lister cache, so work on a copy before building the
 // status update.
 rs = rs.DeepCopy()
 newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)

 // Write the recalculated status back through the API client.
 updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
 if err != nil {
  return err
 }
 // All replicas are ready but not all are available yet. Re-queue after
 // MinReadySeconds so availability is checked again.
 if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
  updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
  updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
  rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
 }
 return manageReplicasErr
}

// manageReplicas shows how the controller reacts when the number of Pods
// differs from the replica count declared in the ReplicaSet.
// diff = len(filteredPods) - *rs.Spec.Replicas. A negative diff means Pods
// must be created (scale up). A positive diff means Pods must be deleted
// (scale down). If the ReplicaSet's key cannot be computed, the error is
// reported via utilruntime.HandleError and nil is returned.
// NOTE(review): abridged excerpt. Both branch bodies are elided, and rsKey is
// computed but unused here (declared-and-unused, so this does not compile as
// printed). As shown, the function always returns nil. rs.Spec.Replicas is
// dereferenced without a nil check; presumably it is always defaulted.
// Confirm.
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
 // diff = current Pod count - desired Pod count
   diff := len(filteredPods) - int(*(rs.Spec.Replicas))
 rsKey, err := controller.KeyFunc(rs)
 if err != nil {
  utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
  return nil
 }

 // diff < 0: fewer Pods than desired, so scale up by creating Pods.
 if diff < 0 {

  // Implementation not covered in this walkthrough.

  // diff > 0: more Pods than desired, so scale down by deleting Pods.
 } else if diff > 0 {

 }

 return nil
}

Summary

kube-controller-manager 的核心思想是:对比资源的期望状态与当前状态,并据此管理Kubernetes中的资源,使当前状态不断趋近期望状态。

以ReplicaSet为例,它对比定义中声明的Pod数与当前集群中满足条件的Pod数,并进行相应的扩缩容。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-12-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Go编程点滴 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 聚焦目标
    • 目录
      • Run
        • StartControllers
          • ReplicaSet
            • ReplicaSetController
              • syncReplicaSet
                • Summary
                相关产品与服务
                容器服务
                腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档