前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【深入浅出】Kubernetes控制器:云原生架构的无形守护者

【深入浅出】Kubernetes控制器:云原生架构的无形守护者

作者头像
希里安
发布2024-01-30 15:39:26
1410
发布2024-01-30 15:39:26
举报
文章被收录于专栏:希里安

在云原生的海洋中,Kubernetes如同一艘航母,它的控制器系统则是维持应用稳定运行的“自动驾驶仪”。今天,让我们一起研究下控制器,深入理解它如何精确地管理我们的容器应用。

什么是k8s控制器

Controller是Kubernetes系统中的一种控制循环机制,其基本职能是监测集群的状态,确保系统的当前状态和预期的目标状态相匹配。如果不匹配,控制器将采取措施尝试修正这种差异。

在Kubernetes中,控制器使用API server监视集群资源的状态,并作出相应的更改或响应。控制器负责运行和维护集群的核心功能,如处理故障转移,滚动更新和自动扩展等。

控制器的秘密武器:声明式API

控制器是Kubernetes的核心组件之一,负责确保集群的当前状态与用户声明的期望状态相匹配。想要理解控制器,首先我们需要了解它所依赖的声明式API是如何工作的。

什么是声明式API?

在Kubernetes中,用户不需要告诉系统要执行哪一步操作,而是声明他们想要的最终状态。例如,当你告诉Kubernetes需要一个运行中的Pod时,你不用关心启动它的具体步骤,Kubernetes会自动为你处理。

控制器循环:观察-分析-行动

控制器的运作可以描述为一个持续的控制循环:

  1. 观察(Observe) - 控制器通过API服务器监测集群的当前状态。
  2. 分析(Analyze) - 控制器比较当前状态与用户声明的期望状态。
  3. 行动(Act) - 如果两者不一致,控制器会采取必要的步骤来使当前状态转变为期望状态。

举个例子:假设你声明了一个拥有3个副本的Pod(也就是你希望有3个相同的Pod运行)。如果其中一个Pod失败了,控制器会注意到现在只有2个副本,然后它会创建一个新的Pod,恢复到期望的3个副本。

控制器的种类:

在Kubernetes中,有多种类型的控制器,包括:

  • Deployment控制器 - 管理无状态应用的多个副本
  • StatefulSet控制器 - 管理有状态应用的多个副本
  • Job控制器 - 管理短暂的一次性任务
  • DaemonSet Controller - 确保所有(或某些)节点上都运行一个Pod的副本,当有新节点加入集群时,DaemonSet也会在新节点上添加所需的Pod。
  • ...等等

核心原理

当我们深入研究Kubernetes控制器的内部工作原理时,会发现InformerWorkQueue是构成每个控制器核心逻辑的两个关键组件。下面对这两个组件进行详细解释:

Informer

  1. 作用:Informer是Kubernetes客户端库(client-go)中的一部分,它负责监视(watch)Kubernetes API服务上某种资源类型(如Pods, Services等)的变化。
  2. 原理
    • Informer利用Kubernetes API的watch机制来监听集群状态的变化。
    • 当Informer监听到资源变化时,它会将变化的信息添加到本地存储中,并且触发注册的事件处理程序。
    • 这个本地存储是一个索引器(Indexer)和本地缓存,它保存了从API服务器获取的所有对象的最新状态。
  3. 事件处理
    • Informer在本地存储中为每个资源对象建立索引,当状态变更被检测到时,Informer会调用特定的事件处理回调函数(如AddFunc, UpdateFunc, DeleteFunc)。
    • 这些回调函数通常不直接执行业务逻辑,而是将相关的工作项推送到WorkQueue中,以便控制器可以异步处理这些变更。

WorkQueue

  1. 作用:WorkQueue是一种用于存储即将被处理的工作项的数据结构,这些工作项通常是对Kubernetes资源(如Pods, Nodes等)的变更。
  2. 原理
    • WorkQueue中的项目通常是由Informer检测到的变化事件或需要重试的失败操作。
    • 控制器会从WorkQueue中取出项目,并执行相应的逻辑来处理这些变更,如创建或更新资源。
  3. 特点
    • WorkQueue能够去重,即同一个资源的多次变更可以合并为一次处理,避免了不必要的重复操作。
    • WorkQueue通常具有重试逻辑,如果控制器在执行操作时失败,可以将工作项放回队列中,以便稍后重试。

整合 Informer 和 WorkQueue

在控制器中,Informer和WorkQueue协同工作,形成了一个有效的事件驱动机制:

  • Informer将API服务器的资源变更事件转化为工作项,并将这些工作项推送到WorkQueue。
  • 控制器从WorkQueue中接收工作项,并基于当前的资源状态与期望的状态执行业务逻辑,如创建、更新或删除资源。
  • 如果某个操作失败,控制器可以决定重试,将该工作项重新放入WorkQueue。

通过这种设计,Kubernetes控制器可以异步地处理资源变更,能够有效地响应集群状态的变化,同时保证控制器逻辑的执行顺序性和幂等性。

简单的控制器源码

下面是一个简单Kubernetes控制器示例代码,在实际应用中,控制器会更加复杂,包括详细的错误处理机制、资源状态同步、事件广播等。此代码仅作为Kubernetes控制器结构和基本逻辑的简单演示。

代码语言:javascript
复制
package main

import (
    "context"
    "fmt"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
    "k8s.io/apimachinery/pkg/fields"
    "k8s.io/apimachinery/pkg/watch"
    corev1 "k8s.io/api/core/v1"
)

// Controller 代表管理Pod生命周期的控制器
type Controller struct {
    clientset     kubernetes.Interface
    queue         workqueue.RateLimitingInterface
    informer      cache.SharedIndexInformer
}

// NewController 创建一个新的Controller
func NewController() *Controller {
    // 使用Kubernetes client-go的in-cluster配置创建一个新配置
    // In-cluster配置会使用kubernetes为pods提供的服务账户来连接到集群
    config, err := rest.InClusterConfig()
    if err != nil {
        panic(err.Error())
    }

    // 创建一个新的clientset,包括所有不同API组的rest客户端
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err.Error())
    }

    // 创建一个pod informer来观察pod变化
    podInformer := informers.NewSharedInformerFactoryWithOptions(
        clientset, 
        time.Minute*10,
        informers.WithTweakListOptions(func(options *metav1.ListOptions) {
            options.FieldSelector = fields.OneTermEqualSelector("status.phase", string(corev1.PodRunning)).String()
        }),
    ).Core().V1().Pods().Informer()

    // 为控制器创建一个限速的工作队列
    queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

    // 定义增加/更新/删除事件的处理方法
    podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            key, err := cache.MetaNamespaceKeyFunc(obj)
            if err == nil {
                queue.Add(key)
            }
        },
        UpdateFunc: func(old, new interface{}) {
            key, err := cache.MetaNamespaceKeyFunc(new)
            if err == nil {
                queue.Add(key)
            }
        },
    })

    // 返回一个新的Controller
    return &Controller{
        clientset: clientset,
        informer:  podInformer,
        queue:     queue,
    }
}

// Run 启动控制器
func (c *Controller) Run(stopCh <-chan struct{}) {
    // 运行informer以开始监听更新事件
    go c.informer.Run(stopCh)

    // 等待缓存同步完成后再开始工作
    if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
        runtime.HandleError(fmt.Errorf("缓存同步错误"))
        return
    }

    // 开始一个工作循环来处理工作队列
    go wait.Until(c.runWorker, time.Second, stopCh)

    // 阻塞直到stop通道被关闭
    <-stopCh
}

// runWorker 是一个长时间运行的函数,不断调用processNextItem函数
// 以便读取并处理工作队列上的消息
func (c *Controller) runWorker() {
    for c.processNextItem() {
        // 继续处理
    }
}
// processNextItem 从工作队列中读取单个工作项并尝试处理它
func (c *Controller) processNextItem() bool {
    // 从工作队列中获取下一个项目
    key, quit := c.queue.Get()
    if quit {
        return false
    }

    // 告诉队列我们已经完成了这个键的处理
    defer c.queue.Done(key)

    // 处理键
    err := c.syncHandler(key.(string))
    if err == nil {
        // 没有错误,重置速率限制计数器
        c.queue.Forget(key)
    } else if c.queue.NumRequeues(key) < 5 {
        // 重试处理
        c.queue.AddRateLimited(key)
    } else {
        // 重试太多次,放弃项目
        c.queue.Forget(key)
        runtime.HandleError(err)
    }

    return true
}

// syncHandler 处理键并执行业务逻辑
func (c *Controller) syncHandler(key string) error {
    // 将键分割为命名空间和对象名
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }

    // 从集群中获取这个命名空间/名字的Pod资源
    pod, err := c.clientset.CoreV1().Pods(namespace).Get(context.Background(), name, metav1.GetOptions{})
    if err != nil {
        return err
    }

    // 在这里写你的控制器逻辑,例如,打印Pod名称和它的状态
    fmt.Printf("正在处理Pod: %s, 状态: %s\n", pod.GetName(), pod.Status.Phase)

    // 在这个例子中,我们只是返回nil
    return nil
}

func main() {
    // 构建我们的控制器并运行它
    controller := NewController()

    // 创建一个信号通道来指示我们何时想要停止控制器
    stop := make(chan struct{})

    // 运行控制器
    controller.Run(stop)
}

总结

控制器是Kubernetes自我修复能力的关键,它确保了即使在出现故障的情况下,用户的应用也能快速恢复正常。

如果你对Kubernetes控制器的工作原理还有疑问,或者想要探讨更多容器技术的话题,请在后台留言,我们一起进步!

参考:

【1】以上图片源自文章:https://able8.medium.com/how-to-write-a-kubernetes-custom-controller-622841d1d3f6

【2】控制器示例代码来源:ChatGPT多次调试结果

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-01-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 希里安 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 什么是k8s控制器
  • 控制器的秘密武器:声明式API
    • 什么是声明式API?
      • 控制器循环:观察-分析-行动
        • 控制器的种类:
        • 核心原理
          • Informer
            • WorkQueue
              • 整合 Informer 和 WorkQueue
                • 简单的控制器源码
                  • 总结
                  相关产品与服务
                  容器服务
                  腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档