前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【K8s源码品读】013:Phase 1 - kubelet - 节点上控制容器生命周期的管理者

【K8s源码品读】013:Phase 1 - kubelet - 节点上控制容器生命周期的管理者

作者头像
junedayday
发布2021-08-05 14:43:56
5270
发布2021-08-05 14:43:56
举报
文章被收录于专栏:Go编程点滴

聚焦目标

理解 kubelet 的运行机制

目录

  1. 运行的主函数
  2. 运行kubelet
  3. 核心数据管理Kubelet
  4. 同步循环
  5. 处理pod的同步工作
  6. 总结

Run

从主函数找到run函数,代码较长,我精简了一下

代码语言:go
复制
// run prepares the kubelet's dependencies (configz registration, node identity,
// cgroup roots, ContainerManager, OOM score, runtime service), starts the
// kubelet via RunKubelet, notifies systemd, and then blocks until done is
// closed or ctx is cancelled.
// This is an abridged excerpt: large parts of the upstream function are omitted.
func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
 // (A long series of configuration initialization and validation steps is omitted here.)

 // done is meant to signal that the run has finished.
 // NOTE(review): nothing in this excerpt closes done, so only ctx.Done()
 // can end the final select here — confirm against the full source.
 done := make(chan struct{})
 
 // Register the kubelet configuration with the configz module; a failure is
 // only logged, not returned.
 err = initConfigz(&s.KubeletConfiguration)
 if err != nil {
  klog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)
 }

 // Resolve this node's hostname (honouring the override) and node name.
 hostName, err := nodeutil.GetHostname(s.HostnameOverride)
 if err != nil {
  return err
 }
 nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
 if err != nil {
  return err
 }

 switch {
 // Standalone mode.
 case standaloneMode:
 // Initialize the API clients when any of them is missing.
 case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
 }

 // Collect cgroup roots: the node-allocatable root always, plus the kubelet,
 // container-runtime and system cgroups when they are known. Failing to look
 // up the kubelet/runtime cgroup is only a warning.
 var cgroupRoots []string
 nodeAllocatableRoot := cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupsPerQOS, s.CgroupDriver)
 cgroupRoots = append(cgroupRoots, nodeAllocatableRoot)
 kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
 if err != nil {
  klog.Warningf("failed to get the kubelet's cgroup: %v.  Kubelet system container metrics may be missing.", err)
 } else if kubeletCgroup != "" {
  cgroupRoots = append(cgroupRoots, kubeletCgroup)
 }

 runtimeCgroup, err := cm.GetRuntimeContainer(s.ContainerRuntime, s.RuntimeCgroups)
 if err != nil {
  klog.Warningf("failed to get the container runtime's cgroup: %v. Runtime system container metrics may be missing.", err)
 } else if runtimeCgroup != "" {
  cgroupRoots = append(cgroupRoots, runtimeCgroup)
 }

 if s.SystemCgroups != "" {
  cgroupRoots = append(cgroupRoots, s.SystemCgroups)
 }

 // The following block initializes the ContainerManager, but only when one
 // has not already been provided in kubeDeps.
 if kubeDeps.ContainerManager == nil {
  if s.CgroupsPerQOS && s.CgroupRoot == "" {
   klog.Info("--cgroups-per-qos enabled, but --cgroup-root was not specified.  defaulting to /")
   s.CgroupRoot = "/"
  }
    
  // CPU-related settings (reserved system CPUs).
  var reservedSystemCPUs cpuset.CPUSet

  // Instantiate the ContainerManager.
  kubeDeps.ContainerManager, err = cm.NewContainerManager(
   kubeDeps.Mounter,
   kubeDeps.CAdvisorInterface,
   // Node-related configuration (fields omitted in this excerpt).
   cm.NodeConfig{},
   s.FailSwapOn,
   devicePluginEnabled,
   kubeDeps.Recorder)

  if err != nil {
   return err
  }
 }

 // Apply the configured OOM score adjustment to this process (pid 0 = self);
 // a failure is only logged as a warning.
 oomAdjuster := kubeDeps.OOMAdjuster
 if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
  klog.Warning(err)
 }

 // Pre-initialize the container runtime service.
 err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration,
  kubeDeps, &s.ContainerRuntimeOptions,
  s.ContainerRuntime,
  s.RuntimeCgroups,
  s.RemoteRuntimeEndpoint,
  s.RemoteImageEndpoint,
  s.NonMasqueradeCIDR)
 if err != nil {
  return err
 }

 // Run the kubelet.
 if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
  return err
 }

 // Tell systemd that the daemon is ready (sd_notify "READY=1").
 go daemon.SdNotify(false, "READY=1")

 // Block until done is closed or ctx is cancelled.
 select {
 case <-done:
  break
 case <-ctx.Done():
  break
 }

 return nil
}

RunKubelet

代码语言:go
复制
// RunKubelet resolves the node identity, creates and initializes the kubelet,
// and starts it: either a single pass via RunOnce (runOnce == true) or the
// long-running mode via startKubelet.
//
// Abridged excerpt: podCfg, and the uses of nodeName / hostnameOverridden,
// come from code omitted here.
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
	// Resolve node information.
	hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
	if err != nil {
		return err
	}
	nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
	if err != nil {
		return err
	}
	hostnameOverridden := len(kubeServer.HostnameOverride) > 0

	// Create and initialize the kubelet.
	k, err := createAndInitKubelet()
	if err != nil {
		// %w keeps the underlying cause inspectable with errors.Is / errors.As;
		// the message text is unchanged.
		return fmt.Errorf("failed to create kubelet: %w", err)
	}

	if runOnce {
		if _, err := k.RunOnce(podCfg.Updates()); err != nil {
			return fmt.Errorf("runonce failed: %w", err)
		}
		klog.Info("Started kubelet as runonce")
	} else {
		// Start the kubelet; startKubelet launches its work in goroutines.
		startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)
		klog.Info("Started kubelet")
	}
	return nil
}

// startKubelet launches the kubelet's long-running work. Every piece is started
// with its own `go` statement, so this function returns without waiting:
//   - the main Run loop, fed by podCfg's update channel;
//   - the main HTTP server, when enableServer is set;
//   - the read-only server, when kubeCfg.ReadOnlyPort is positive;
//   - the pod-resources endpoint, when the KubeletPodResources feature is on.
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {
	updates := podCfg.Updates()
	go k.Run(updates)

	if enableServer {
		serveIP := net.ParseIP(kubeCfg.Address)
		go k.ListenAndServe(
			serveIP,
			uint(kubeCfg.Port),
			kubeDeps.TLSOptions,
			kubeDeps.Auth,
			enableCAdvisorJSONEndpoints,
			kubeCfg.EnableDebuggingHandlers,
			kubeCfg.EnableContentionProfiling,
			kubeCfg.EnableSystemLogHandler,
		)
	}

	if roPort := kubeCfg.ReadOnlyPort; roPort > 0 {
		roIP := net.ParseIP(kubeCfg.Address)
		go k.ListenAndServeReadOnly(roIP, uint(roPort), enableCAdvisorJSONEndpoints)
	}

	podResourcesOn := utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources)
	if podResourcesOn {
		go k.ListenAndServePodResources()
	}
}

Kubelet

代码语言:go
复制
// Bootstrap is the interface type of k in RunKubelet and startKubelet: the
// set of operations needed to start up and run a kubelet. The value stored in
// it is the *Kubelet returned by NewMainKubelet (see createAndInitKubelet).
type Bootstrap interface {
	// Configuration and start-up hooks.
	GetConfiguration() kubeletconfiginternal.KubeletConfiguration
	BirthCry()
	StartGarbageCollection()

	// HTTP endpoints, each launched in its own goroutine by startKubelet.
	ListenAndServe(
		address net.IP,
		port uint,
		tlsOptions *server.TLSOptions,
		auth server.AuthInterface,
		enableCAdvisorJSONEndpoints, enableDebuggingHandlers, enableContentionProfiling, enableSystemLogHandler bool,
	)
	ListenAndServeReadOnly(address net.IP, port uint, enableCAdvisorJSONEndpoints bool)
	ListenAndServePodResources()

	// Run consumes pod updates in long-running mode; RunOnce is used instead
	// when RunKubelet is called with runOnce set.
	Run(<-chan kubetypes.PodUpdate)
	RunOnce(<-chan kubetypes.PodUpdate) ([]RunPodResult, error)
}

// createAndInitKubelet constructs the main Kubelet and returns it through the
// Bootstrap interface. (The construction parameters are omitted in this excerpt.)
//
// On failure it returns a nil Bootstrap together with the error.
func createAndInitKubelet() (k kubelet.Bootstrap, err error) {
	k, err = kubelet.NewMainKubelet()
	if err != nil {
		// Propagate the construction error. Returning (k, nil) would hide the
		// failure from RunKubelet and hand it an interface wrapping whatever
		// pointer NewMainKubelet returned (typically a nil *Kubelet), which is
		// a non-nil interface and would fail on first use.
		return nil, err
	}
	return k, nil
}

// NewMainKubelet builds the Kubelet value. In this excerpt both the parameter
// handling and the code that fills in the Kubelet's many fields are omitted,
// so it simply returns a freshly allocated, zero-valued Kubelet and no error.
func NewMainKubelet() (*Kubelet, error) {
	// (Parameter initialization omitted.)

	klet := new(Kubelet)

	// (Population of klet's fields omitted.)

	return klet, nil
}

// Run initializes the kubelet's internal modules, starts its background
// workers, and then enters the main sync loop with kl as the SyncHandler.
// A failure to initialize the modules is fatal (klog.Fatal).
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	// Initialize internal modules.
	if err := kl.initializeModules(); err != nil {
		// Pass the error as an argument rather than as the format string, so a
		// '%' in the error text is not interpreted as a formatting verb.
		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, "%v", err)
		klog.Fatal(err)
	}

	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

	if kl.kubeClient != nil {
		// Keep the node status in sync with kube-apiserver.
		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
		go kl.fastStatusUpdateOnce()
		go kl.nodeLeaseController.Run(wait.NeverStop)
	}
	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

	if kl.makeIPTablesUtilChains {
		kl.initNetworkUtil()
	}

	// Goroutine that performs pod-killing work once per second.
	go wait.Until(kl.podKiller.PerformPodKillingWork, 1*time.Second, wait.NeverStop)

	kl.statusManager.Start()
	kl.probeManager.Start()

	if kl.runtimeClassManager != nil {
		kl.runtimeClassManager.Start(wait.NeverStop)
	}

	kl.pleg.Start()
	// Main synchronization loop; kl itself handles the dispatched events.
	kl.syncLoop(updates, kl)
}

syncLoop

代码语言:go
复制
// syncLoop is the kubelet's main loop. It repeatedly calls syncLoopIteration,
// which waits on the config, PLEG, sync and housekeeping channels, and stops
// once an iteration returns false. syncLoopMonitor is stamped with the current
// time before and after each iteration.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	klog.Info("Starting kubelet main sync loop.")

	// syncTicker fires once per second; each tick checks for pods due to sync.
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	// housekeepingTicker drives periodic pod cleanup.
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
	// Pod lifecycle events from the PLEG (started in Run).
	// NOTE(review): assumes the PLEG exposes Watch() returning its event
	// channel, as in upstream kubelet — confirm.
	plegCh := kl.pleg.Watch()

	for {
		kl.syncLoopMonitor.Store(kl.clock.Now())
		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}
}

// syncLoopIteration waits for a single event on any of its input channels and
// dispatches it to handler. It returns false only when configCh has been
// closed, which tells syncLoop to exit; otherwise it returns true.
//
// Channels:
//   - configCh: pod configuration updates (ADD/UPDATE/REMOVE/RECONCILE/DELETE)
//   - plegCh: pod lifecycle events
//   - syncCh: periodic tick that triggers syncing of pods that are due
//   - housekeepingCh: periodic tick that triggers pod cleanup
//
// The liveness manager's update channel is watched as well.
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		// The config channel was closed: stop the sync loop.
		if !open {
			klog.Errorf("Update channel is closed. Exiting the sync loop.")
			return false
		}
		// Dispatch according to the operation type.
		switch u.Op {
		case kubetypes.ADD:
			klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodReconcile(u.Pods)
		case kubetypes.DELETE:
			klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.SET:
			klog.Errorf("Kubelet does not support snapshot update")
		default:
			klog.Errorf("Invalid event type received: %d.", u.Op)
		}

		kl.sourcesReady.AddSource(u.Source)

	case <-plegCh:
		// PLEG event handling is omitted from this excerpt. The event is not
		// bound to a variable because an unused select-case variable does not
		// compile in Go.

	case <-syncCh:
		// Collect the pods that are due for syncing (details not covered here).
		// In the article's example, this is where the new nginx pod arrives.
		podsToSync := kl.getPodsToSync()
		if len(podsToSync) == 0 {
			break
		}
		klog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
		// Hand them to the handler.
		handler.HandlePodSyncs(podsToSync)

	case <-kl.livenessManager.Updates():
		// Liveness update handling is omitted from this excerpt (value not
		// bound for the same reason as above).

	case <-housekeepingCh:
		if !kl.sourcesReady.AllReady() {
			// Sources are not all ready yet: skip this cleanup round.
			klog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
		} else {
			// Clean up pods.
			klog.V(4).Infof("SyncLoop (housekeeping)")
			if err := handler.HandlePodCleanups(); err != nil {
				klog.Errorf("Failed cleaning pods: %v", err)
			}
		}
	}
	return true
}

handler

往前查找代码可以发现,handler 就是 Kubelet 本身:Run 的最后一行调用的是 kl.syncLoop(updates, kl),把 kl 作为 SyncHandler 传了进去。

代码语言:go
复制
// HandlePodSyncs is the SyncHandler callback invoked for the periodic sync tick.
// For every pod it looks up the pod's mirror pod and dispatches a SyncPodSync
// work item. All pods in the batch share one start timestamp.
func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
	batchStart := kl.clock.Now()
	for i := range pods {
		p := pods[i]
		// The second result (whether a mirror pod exists) is not needed; the
		// mirror pod value is forwarded exactly as returned.
		mp, _ := kl.podManager.GetMirrorPodByPod(p)
		kl.dispatchWork(p, kubetypes.SyncPodSync, mp, batchStart)
	}
}

// dispatchWork hands pod (with its mirror pod) to the pod workers as an
// asynchronous work item of the given syncType. The attached completion
// callback records PodWorkerDuration, measured from start, but only when the
// sync finished with an error.
//
// NOTE(review): recording a duration metric only on failure looks surprising;
// confirm whether successful syncs should be observed too.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	onComplete := func(err error) {
		if err == nil {
			return
		}
		metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start))
	}

	work := &UpdatePodOptions{
		Pod:            pod,
		MirrorPod:      mirrorPod,
		UpdateType:     syncType,
		OnCompleteFunc: onComplete,
	}
	kl.podWorkers.UpdatePod(work)
}

// UpdatePod delivers an update for a pod to that pod's worker goroutine,
// creating the worker (and its 1-slot update channel) the first time the pod's
// UID is seen. The pod workers are created where the kubelet is built:
//
//	klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
//
// If the pod's worker is idle, it is marked busy and the update is sent
// immediately. Otherwise the update is stored as the pod's last undelivered
// update, replacing any earlier one unless that earlier one is a SyncPodKill.
// All bookkeeping happens while holding p.podLock.
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
	uid := options.Pod.UID

	p.podLock.Lock()
	defer p.podLock.Unlock()

	// First update for this pod (e.g. a newly created pod): start its worker.
	ch, known := p.podUpdates[uid]
	if !known {
		ch = make(chan UpdatePodOptions, 1)
		p.podUpdates[uid] = ch
		go func() {
			defer runtime.HandleCrash()
			p.managePodLoop(ch)
		}()
	}

	if p.isWorking[uid] {
		// A sync is already in progress: keep only the newest update, but
		// never let it replace a pending kill.
		if pending, found := p.lastUndeliveredWorkUpdate[uid]; found && pending.UpdateType == kubetypes.SyncPodKill {
			return
		}
		p.lastUndeliveredWorkUpdate[uid] = *options
		return
	}

	p.isWorking[uid] = true
	ch <- *options
}

// managePodLoop is the per-pod worker loop. It processes updates from
// podUpdates one at a time until the channel is closed. For each update it:
//  1. fetches the pod's status from the pod cache,
//  2. calls syncPodFn with that status,
//  3. reports the result to the update's OnCompleteFunc, if one was supplied,
//  4. calls wrapUp with the result.
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
	var lastSyncTime time.Time
	for update := range podUpdates {
		err := func() error {
			// Ask the cache for a status newer than the last sync, so this sync
			// does not act on state the previous one already handled.
			// NOTE(review): assumes podCache exposes GetNewerThan(uid, minTime)
			// returning (status, error), as in upstream kubelet — confirm.
			status, err := p.podCache.GetNewerThan(update.Pod.UID, lastSyncTime)
			if err != nil {
				return err
			}
			// Run the pod sync function (Kubelet.syncPod).
			err = p.syncPodFn(syncPodOptions{
				mirrorPod:      update.MirrorPod,
				pod:            update.Pod,
				podStatus:      status,
				killPodOptions: update.KillPodOptions,
				updateType:     update.UpdateType,
			})
			lastSyncTime = time.Now()
			return err
		}()
		// Report the outcome to the callback supplied with the update;
		// dispatchWork uses it to record PodWorkerDuration.
		if update.OnCompleteFunc != nil {
			update.OnCompleteFunc(err)
		}
		p.wrapUp(update.Pod.UID, err)
	}
}

// syncPod is the function the pod workers call as syncPodFn (it is passed to
// newPodWorkers as klet.syncPod). The long preparatory logic is omitted here.
// Only the core step is shown: handing the pod to the container runtime. Below
// that call, everything is container-level work.
//
// NOTE(review): pod, podStatus and pullSecrets come from the omitted logic
// (presumably derived from o) — confirm against upstream.
func (kl *Kubelet) syncPod(o syncPodOptions) error {
	syncResult := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
	kl.reasonCache.Update(pod.UID, syncResult)

	syncErr := syncResult.Error()
	if syncErr == nil {
		return nil
	}
	// The error is suppressed only when every entry in SyncResults carries a
	// crash-loop or image-pull back-off error. Any other entry makes the
	// error be returned, including one whose Error is nil.
	// NOTE(review): counting nil-error entries as "other" may be unintended.
	for _, r := range syncResult.SyncResults {
		if r.Error == kubecontainer.ErrCrashLoopBackOff || r.Error == images.ErrImagePullBackOff {
			continue
		}
		return syncErr
	}
	return nil
}

Summary

  1. kubelet是kubernetes的Node节点上的管理者
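  2. kubelet 通过 configCh 接收来自 kube-apiserver 等来源的 pod 变更消息并立即分发处理;同时借助 syncTicker(每秒一次)、housekeepingTicker 等 Ticker 周期性地触发 pod 同步与清理
  3. kubelet 通过 podWorkers 为每个 pod 启动独立的 goroutine,异步地完成同步工作,最终调用容器运行时接口(Container Runtime Interface, CRI)来管理容器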
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-12-31,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Go编程点滴 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 聚焦目标
  • 目录
  • Run
  • RunKubelet
  • Kubelet
  • syncLoop
  • handler
  • Summary
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档