前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >K8s 系列(四) - 浅谈 Informer

K8s 系列(四) - 浅谈 Informer

作者头像
astraw99
发布2021-09-22 12:11:40
1.3K1
发布2021-09-22 12:11:40
举报
文章被收录于专栏:K8s 系列K8s 系列

1. 概述

进入 K8s 的世界,会发现有很多的 Controller,它们都是为了完成某类资源(如 pod 是通过 DeploymentController, ReplicaSetController 进行管理)的调谐,目标是保持用户期望的状态。

K8s 中有几十种类型的资源,如何能让 K8s 内部以及外部用户方便、高效的获取某类资源的变化,就是本文 Informer 要实现的。本文将从 Reflector(反射器)、DeltaFIFO(增量队列)、Indexer(索引器)、Controller(控制器)、SharedInformer(共享资源通知器)、processorListener(事件监听处理器)、workqueue(事件处理工作队列) 等方面进行解析。

本文及后续相关文章都基于 K8s v1.22

K8s-informer

2. 从 Reflector 说起

Reflector 的主要职责是从 apiserver 拉取并持续监听(ListAndWatch) 相关资源类型的增删改(Add/Update/Delete)事件,存储在由 DeltaFIFO 实现的本地缓存(local Store) 中。

首先看一下 Reflector 结构体定义:

代码语言:javascript
复制

从结构体定义可以看到,通过指定目标资源类型进行 ListAndWatch,并可进行分页相关设置。

第一次拉取全量资源(目标资源类型) 后通过 syncWith 函数全量替换(Replace) 到 DeltaFIFO queue/items 中,之后通过持续监听 Watch(目标资源类型) 增量事件,并去重更新到 DeltaFIFO queue/items 中,等待被消费。

watch 目标类型通过 Go reflect 反射实现如下:

代码语言:javascript
复制

•通过反射确认目标资源类型,所以命名为 Reflector 还是比较贴切的;•List/Watch 的目标资源类型在 NewSharedIndexInformer.ListerWatcher 进行了确定,但 Watch 还会在 watchHandler 中再次比较一下目标类型;

3. 认识 DeltaFIFO

还是先看下 DeltaFIFO 结构体定义:

代码语言:javascript
复制

DeltaType 可分为以下类型:

代码语言:javascript
复制

通过上面的 Reflector 分析可以知道,DeltaFIFO 的职责是通过队列加锁处理(queueActionLocked)、去重(dedupDeltas)、存储在由 DeltaFIFO 实现的本地缓存(local Store) 中,包括 queue(仅存 objKeys) 和 items(存 objKeys 和对应的 Deltas 增量变化),并通过 Pop 不断消费,通过 Process(item) 处理相关逻辑。

K8s-DeltaFIFO

4. 索引 Indexer

上一步 ListAndWatch 到的资源已经存储到 DeltaFIFO 中,接着调用 Pop 从队列进行消费。实际使用中,Process 处理函数由 sharedIndexInformer.HandleDeltas 进行实现。HandleDeltas 函数根据上面不同的 DeltaType 分别进行 Add/Update/Delete,并同时创建、更新、删除对应的索引。

具体索引实现如下:

代码语言:javascript
复制

索引函数(IndexFunc):就是计算索引的函数,这样允许扩展多种不同的索引计算函数。默认也是最常用的索引函数是:MetaNamespaceIndexFunc。索引值(indexedValue):有些地方叫 indexKey,表示由索引函数(IndexFunc) 计算出来的索引值(如 ns1)。对象键(objKey):对象 obj 的 唯一 key(如 ns1/pod1),与某个资源对象一一对应。

K8s-indexer

可以看到,Indexer 由 ThreadSafeStore 接口集成,最终由 threadSafeMap 实现。

•索引函数 IndexFunc(如 MetaNamespaceIndexFunc)、KeyFunc(如 MetaNamespaceKeyFunc) 区别:前者表示如何计算索引,后者表示如何获取对象键(objKey);•索引键(indexKey,有些地方是 indexedValue)、对象键(objKey) 区别:前者表示由索引函数(IndexFunc) 计算出来的索引键(如 ns1),后者则是 obj 的 唯一 key(如 ns1/pod1);

5. 总管家 Controller

Controller 作为核心中枢,集成了上面的组件 Reflector、DeltaFIFO、Indexer、Store,成为连接下游消费者的桥梁。

Controller 由 controller 结构体进行具体实现:

在 K8s 中约定俗成:大写定义的 interface 接口,由对应小写定义的结构体进行实现。

代码语言:javascript
复制

Controller 中以 goroutine 协程方式启动 Run 方法,会启动 Reflector 的 ListAndWatch(),用于从 apiserver 拉取全量和监听增量资源,存储到 DeltaFIFO。接着,启动 processLoop 不断从 DeltaFIFO Pop 进行消费。在 sharedIndexInformer 中 Pop 出来进行处理的函数是 HandleDeltas,一方面维护 Indexer 的 Add/Update/Delete,另一方面调用下游 sharedProcessor 进行 handler 处理。

6. 启动 SharedInformer

SharedInformer 接口由 SharedIndexInformer 进行集成,由 sharedIndexInformer(这里看到了吧,又是大写定义的 interface 接口,由对应小写定义的结构体进行实现) 进行实现。

看一下结构体定义:

代码语言:javascript
复制

从结构体定义可以看到,通过集成的 controller(上面已分析) 进行 Reflector ListAndWatch,并存储到 DeltaFIFO,并启动 Pop 消费队列,在 sharedIndexInformer 中 Pop 出来进行处理的函数是 HandleDeltas。

所有的 listeners 通过 sharedIndexInformer.AddEventHandler 加入到 processorListener 数组切片中,并通过判断当前 controller 是否已启动做不同处理如下:

代码语言:javascript
复制

接着,在 HandleDeltas 中,根据 obj 的 Delta 类型(Added/Updated/Deleted/Replaced/Sync) 调用 sharedProcessor.distribute 给所有监听 listeners 处理。

7. 注册 SharedInformerFactory

SharedInformerFactory 作为使用 SharedInformer 的工厂类,提供了高内聚低耦合的工厂类设计模式,其结构体定义如下:

代码语言:javascript
复制

以 PodInformer 为例,说明使用者如何构建自己的 Informer,PodInformer 定义如下:

代码语言:javascript
复制

由使用者传入目标类型(&corev1.Pod{})、构造函数(defaultInformer),调用 SharedInformerFactory.InformerFor 实现目标 Informer 的注册,然后调用 SharedInformerFactory.Start 进行 Run,就启动了上面分析的 SharedIndexedInformer -> Controller -> Reflector -> DeltaFIFO 流程。

通过使用者自己传入目标类型、构造函数进行 Informer 注册,实现了 SharedInformerFactory 高内聚低耦合的设计模式。

8. 回调 processorListener

所有的 listerners 由 processorListener 实现,分为两组:listeners, syncingListeners,分别遍历所属组全部 listeners,将数据投递到 processorListener 进行处理。

•因为各 listeners 设置的 resyncPeriod 可能不一致,所以将没有设置(resyncPeriod = 0) 的归为 listeners 组,将设置了 resyncPeriod 的归到 syncingListeners 组;•如果某个 listener 在多个地方(sharedIndexInformer.resyncCheckPeriod, sharedIndexInformer.AddEventHandlerWithResyncPeriod)都设置了 resyncPeriod,则取最小值 minimumResyncPeriod;

代码语言:javascript
复制

从代码可以看到 processorListener 巧妙地使用了两个 channel(addCh, nextCh) 和一个 pendingNotifications(由 slice 实现的滚动 Ring) 进行 buffer 缓冲,默认的 initialBufferSize = 1024。既做到了高效传递数据,又不阻塞上下游处理,值得学习。

K8s-processorListener

9. workqueue 忙起来

通过上一步 processorListener 回调函数,交给内部 ResourceEventHandler 进行真正的增删改(CUD) 处理,分别调用 OnAdd/OnUpdate/OnDelete 注册函数进行处理。

为了快速处理而不阻塞 processorListener 回调函数,一般使用 workqueue 进行异步化解耦合处理,其实现如下:

K8s-workqueue.png

从图中可以看到,workqueue.RateLimitingInterface 集成了 DelayingInterface,DelayingInterface 集成了 Interface,最终由 rateLimitingType 进行实现,提供了 rateLimit 限速、delay 延时入队(由优先级队列通过小顶堆实现)、queue 队列处理 三大核心能力。

另外,在代码中可看到 K8s 实现了三种 RateLimiter:BucketRateLimiter, ItemExponentialFailureRateLimiter, ItemFastSlowRateLimiter,Controller 默认采用了前两种如下:

代码语言:javascript
复制

这样,在用户侧可以通过调用 workqueue 相关方法进行灵活的队列处理,比如失败多少次就不再重试,失败了延时入队的时间控制,队列的限速控制(QPS)等,实现非阻塞异步化逻辑处理。

10. 小结

本文通过分析 K8s 中 Reflector(反射器)、DeltaFIFO(增量队列)、Indexer(索引器)、Controller(控制器)、SharedInformer(共享资源通知器)、processorListener(事件监听处理器)、workqueue(事件处理工作队列) 等组件,对 Informer 实现机制进行了解析,通过源码、图文方式说明了相关流程处理,以期更好的理解 K8s Informer 运行流程。

可以看到,K8s 为了实现高效、非阻塞的核心流程,大量采用了 goroutine 协程、channel 通道、queue 队列、index 索引、map 去重等方式;并通过良好的接口设计模式,给使用者开放了很多扩展能力;采用了统一的接口与实现的命名方式等,这些都值得深入学习与借鉴。

PS: 更多内容请关注 k8s-club[1]

参考资料

1.Kubernetes 官方文档[2]2.Kubernetes 源码[3]3.Kubernetes Architectural Roadmap[4]

相关链接

[1] k8s-club: https://github.com/k8s-club/k8s-club [2] Kubernetes 官方文档: https://kubernetes.io/ [3] Kubernetes 源码: https://github.com/kubernetes/kubernetes [4] Kubernetes Architectural Roadmap: https://github.com/kubernetes/community/blob/master/contributors/design-proposals/architecture/architectural-roadmap.md

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-09-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 稻草人生 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 概述
  • 2. 从 Reflector 说起
  • 3. 认识 DeltaFIFO
  • 4. 索引 Indexer
  • 5. 总管家 Controller
  • 6. 启动 SharedInformer
  • 7. 注册 SharedInformerFactory
  • 8. 回调 processorListener
  • 9. workqueue 忙起来
  • 10. 小结
    • 参考资料
      • 相关链接
      相关产品与服务
      容器服务
      腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档