在云原生的海洋中,Kubernetes如同一艘航母,它的控制器系统则是维持应用稳定运行的“自动驾驶仪”。今天,让我们一起研究下控制器,深入理解它如何精确地管理我们的容器应用。
Controller是Kubernetes系统中的一种控制循环机制,其基本职能是监测集群的状态,确保系统的当前状态和预期的目标状态相匹配。如果不匹配,控制器将采取措施尝试修正这种差异。
在Kubernetes中,控制器使用API server监视集群资源的状态,并作出相应的更改或响应。控制器负责运行和维护集群的核心功能,如处理故障转移,滚动更新和自动扩展等。
控制器是Kubernetes的核心组件之一,负责确保集群的当前状态与用户声明的期望状态相匹配。想要理解控制器,首先我们需要了解它所依赖的声明式API是如何工作的。
在Kubernetes中,用户不需要告诉系统要执行哪一步操作,而是声明他们想要的最终状态。例如,当你告诉Kubernetes需要一个运行中的Pod时,你不用关心启动它的具体步骤,Kubernetes会自动为你处理。
控制器的运作可以描述为一个持续的控制循环:
举个例子:假设你声明了一个拥有3个副本的Pod(也就是你希望有3个相同的Pod运行)。如果其中一个Pod失败了,控制器会注意到现在只有2个副本,然后它会创建一个新的Pod,恢复到期望的3个副本。
在Kubernetes中,有多种类型的控制器,包括:
当我们深入研究Kubernetes控制器的内部工作原理时,会发现Informer
和WorkQueue
是构成每个控制器核心逻辑的两个关键组件。下面对这两个组件进行详细解释:
在控制器中,Informer和WorkQueue协同工作,形成了一个有效的事件驱动机制:
通过这种设计,Kubernetes控制器可以异步地处理资源变更,能够有效地响应集群状态的变化,同时保证控制器逻辑的执行顺序性和幂等性。
下面是一个简单Kubernetes控制器示例代码,在实际应用中,控制器会更加复杂,包括详细的错误处理机制、资源状态同步、事件广播等。此代码仅作为Kubernetes控制器结构和基本逻辑的简单演示。
package main
import (
"context"
"fmt"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/watch"
corev1 "k8s.io/api/core/v1"
)
// Controller 代表管理Pod生命周期的控制器
type Controller struct {
clientset kubernetes.Interface
queue workqueue.RateLimitingInterface
informer cache.SharedIndexInformer
}
// NewController 创建一个新的Controller
func NewController() *Controller {
// 使用Kubernetes client-go的in-cluster配置创建一个新配置
// In-cluster配置会使用kubernetes为pods提供的服务账户来连接到集群
config, err := rest.InClusterConfig()
if err != nil {
panic(err.Error())
}
// 创建一个新的clientset,包括所有不同API组的rest客户端
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
// 创建一个pod informer来观察pod变化
podInformer := informers.NewSharedInformerFactoryWithOptions(
clientset,
time.Minute*10,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector("status.phase", string(corev1.PodRunning)).String()
}),
).Core().V1().Pods().Informer()
// 为控制器创建一个限速的工作队列
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
// 定义增加/更新/删除事件的处理方法
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
UpdateFunc: func(old, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
queue.Add(key)
}
},
})
// 返回一个新的Controller
return &Controller{
clientset: clientset,
informer: podInformer,
queue: queue,
}
}
// Run 启动控制器
func (c *Controller) Run(stopCh <-chan struct{}) {
// 运行informer以开始监听更新事件
go c.informer.Run(stopCh)
// 等待缓存同步完成后再开始工作
if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
runtime.HandleError(fmt.Errorf("缓存同步错误"))
return
}
// 开始一个工作循环来处理工作队列
go wait.Until(c.runWorker, time.Second, stopCh)
// 阻塞直到stop通道被关闭
<-stopCh
}
// runWorker 是一个长时间运行的函数,不断调用processNextItem函数
// 以便读取并处理工作队列上的消息
func (c *Controller) runWorker() {
for c.processNextItem() {
// 继续处理
}
}
// processNextItem 从工作队列中读取单个工作项并尝试处理它
func (c *Controller) processNextItem() bool {
// 从工作队列中获取下一个项目
key, quit := c.queue.Get()
if quit {
return false
}
// 告诉队列我们已经完成了这个键的处理
defer c.queue.Done(key)
// 处理键
err := c.syncHandler(key.(string))
if err == nil {
// 没有错误,重置速率限制计数器
c.queue.Forget(key)
} else if c.queue.NumRequeues(key) < 5 {
// 重试处理
c.queue.AddRateLimited(key)
} else {
// 重试太多次,放弃项目
c.queue.Forget(key)
runtime.HandleError(err)
}
return true
}
// syncHandler 处理键并执行业务逻辑
func (c *Controller) syncHandler(key string) error {
// 将键分割为命名空间和对象名
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
// 从集群中获取这个命名空间/名字的Pod资源
pod, err := c.clientset.CoreV1().Pods(namespace).Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
return err
}
// 在这里写你的控制器逻辑,例如,打印Pod名称和它的状态
fmt.Printf("正在处理Pod: %s, 状态: %s\n", pod.GetName(), pod.Status.Phase)
// 在这个例子中,我们只是返回nil
return nil
}
func main() {
// 构建我们的控制器并运行它
controller := NewController()
// 创建一个信号通道来指示我们何时想要停止控制器
stop := make(chan struct{})
// 运行控制器
controller.Run(stop)
}
控制器是Kubernetes自我修复能力的关键,它确保了即使在出现故障的情况下,用户的应用也能快速恢复正常。
如果你对Kubernetes控制器的工作原理还有疑问,或者想要探讨更多容器技术的话题,请在后台留言,我们一起进步!
参考:
【1】以上图片源自文章:https://able8.medium.com/how-to-write-a-kubernetes-custom-controller-622841d1d3f6
【2】控制器示例代码来源:ChatGPT多次调试结果