controller-runtime 是基于 client-go 的 K8s 控制器开发框架,帮助开发者聚焦业务代码,快速高效的开发控制器。
本文由一次工作中遇到的故障说起,重点介绍了 controller-runtime 带有缓存机制的 Client 组件及其实现原理,并总结了一些与控制器缓存相关的实践注意事项。
PS:文末附【K8s 控制器学习 Roadmap】
一次故障引发的困惑
想象这样一个场景,有一个运行在集群中的控制器,在它的 Reconcile 逻辑中会使用 K8s Client 读取一个 K8s 对象,例如一个 ReplicaSet,假如控制器使用的 ServiceAccount 缺少了对 ReplicaSet 资源的 Get/List 权限,会发生什么?如下代码,我们会看到 “Bang!” 的错误日志吗?
ctx := context.TODO()
rs := &appsv1.ReplicaSet{}
if err := c.Get(ctx, client.ObjectKey{Namespace: "foo",Name: "bar"}, rs);err!=nil{
log.Error(err, "Bang!")
}
上述就是对工作中遇到的一次控制器故障的简化描述。当时我的直观判断是,如果缺少权限,Get 时应该直接返回缺少权限相关的报错,进而结束当前 Reconcile。但实际现象却是:控制器会一直“阻塞”在 c.Get 这一步,并不会执行到 log.Error,但会不断出现类似下文的错误日志:
E0203 10:04:51.433967 1 reflector.go: 178] sigs.k8s.to/controller-runtme/pkg/cache/internal/informers_map.go:224: Failed to list apps/v1, Kind=ReplicaSet:replicasets.apps is forbidden: User "system: serviceaccount:xxx:yyy" cannot list resource "replicasets" in API group "apps" at the cluster scope
明明错因就摆在眼前,却和预期的错误姿势对不上号,这种困惑的感觉可太难受了。
鉴于我们的控制器是基于 controller-runtime 开发的,K8s Client 也是通过 controller-runtime 的框架代码自动创建的,为了解释故障现象,我认为需要深入了解一下 controller-runtime Client 的来龙去脉。
从 ControllerManager 初始化说起
注:本文使用的 controller-runtime 源码基于 v0.6.2 版本
基于 controller-runtime 开发的控制器通常都从以下代码开始:
import (
ctrl "sigs.k8s.io/controller-runtime"
)
// 这段代码创建了一个新的 ControllerManager
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
// ...
})
ControllerManager 正如其名,它会负责管理一个或多个控制器,其中一项重要工作就是初始化一个 K8s Client,上述故障中我们使用的 Client 正是由此而来。在创建 CtrMgr 的方法参数中,ctrl.Options 包含了 2 个重要配置项:新建 Cache 与 新建 Client。
// NewCache is the function that will create the cache to be used
// by the manager. If not set this will use the default new cache function.
NewCache cache.NewCacheFunc
// NewClient will create the client to be used by the manager.
// If not set this will create the default DelegatingClient that will
// use the cache for reads and the client for writes.
NewClient func(cache cache.Cache, _ *rest.Config, _ client.Options) (client.Client, error)
Cache 接口定义了 2 个功能,一是维护一组 Informer,每个 Informer 持续监听(ListAndWatch)某一类(GVK) K8s 对象的资源事件,并按照特定的索引计算方式,将对象数据存储在本地缓存中;二是提供 Get/List 方法以读取 Informers 缓存的 K8s 对象。既然提到了 Informer ,我们就得复习一下这张祖师爷卷轴:
图中展示了一个完整的基于 client-go 基础组件的控制器结构与功能流程,以贯穿全图的水平横线为界,上半部分描述了基于 List&Watch 的 Informer 机制,Informer 一方面将 K8s 对象缓存到 Indexer 中,另一方面为 K8s 资源事件注册各种 Handler,如果注册的 Handler 是将 事件的资源对象 key 存入一个队列中,那么就来到了图中的下半部分:WorkQueue + Reconciler,WorkQueue 类似一个消息队列,接收生产者(Informer 注册的 Handler)的消息(K8s objects key),并将消息持续提供给消费者(Reconciler),Reconciler 就是控制器的业务逻辑的核心所在。
controller-runtime 的 Cache 维护的一组 Informer,正是对应图中上半部分的内容,再将 Get/List 方法对接到每个 Informer 关联的 Indexer(真正保存缓存数据的对象) 上,就能实现以缓存的形式读取 K8s 资源的功能。
Cache 和 Client 有什么关系
前文提到,新建的 Cache 将被用于创建 Client ,现在我们知道 Cache 实现了 Client 中的 Get/List 接口方法,一个直观的念头是,Cache 实现直接“塞进” Client 实现中就好了,Client 的所有 Get/List 都交给 Cache 来做。但实际是这样吗?我们看 NewClient 的源码注释:如果用户在创建 ControllerManager 时没有提供 NewClientFunc 方法,将使用 DelegatingClient 作为默认的 Client 实现,那我们继续探究一下 DelegatingClient 对象。
// sigs.k8s.io/controller-runtime/pkg/manager/manager.go:374
func DefaultNewClient(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
// 创建一个直接与 api-server 交互的客户端
directC, err := client.New(config, options)
if err != nil {
return nil, err
}
return &client.DelegatingClient{
// Reader 用于 Get/List 操作
Reader: &client.DelegatingReader{
// 新建的 cache 用在了这里
CacheReader: cache,
ClientReader: directC,
},
// Writer 用于 Creat/Update/Delete 等写操作
Writer: directC,
// StatusClient 用于对 Status 资源进行 Update/Patch 操作
StatusClient: directC,
}, nil
}
由源码可见,Cache 被用作了 DelegatingClient 中 Reader 的一部分 —— CacheReader,与之并列的还有一个不使用缓存、直连 api-server 的 ClientReader,为什么这里会用 2 个 Reader 呢?继续看 DelegatingReader 的源码。
// sigs.k8s.io/controller-runtimepkg/client/split.go:40
type DelegatingReader struct {
CacheReader Reader
ClientReader Reader
}
// Get retrieves an obj for a given object key from the Kubernetes Cluster.
func (d *DelegatingReader) Get(ctx context.Context, key ObjectKey, obj runtime.Object) error {
// 对承载 Get 结果的对象做类型断言
_, isUnstructured := obj.(*unstructured.Unstructured)
// 如果是 Unstructured 类型,则使用直连 api-server 的 client
if isUnstructured {
return d.ClientReader.Get(ctx, key, obj)
}
// 否则使用带有缓存机制的 client
return d.CacheReader.Get(ctx, key, obj)
}
由源码可见,DelgatingClient 的 Get/List 操作由 DelegatingReader 实现,DelegatingReader 会根据调用方传入的对象类型(类型断言)来决定是从缓存中,还是从 api-server 读取 K8s 对象,调用示例如下:
// ctrMgr 是基于默认配置创建的 ControllerManager
c := ctrMgr.GetClient()
// 使用 typed object 方式调用 Get 时,将从缓存中获取对象
rs := &appsv1.ReplicaSet{}
c.Get(ctx, request.NamespacedName, rs)
// 使用 untyped object (Unstructured) 方式调用 Get 时,将直接请求 api-server 获取对象
rsU := &unstructured.Unstructured{}
rsU.SetAPIVersion("apps/v1")
rsU.SetKind("ReplicaSet")
c.Get(ctx, request.NamespacedName, rsU)
至此,我们基本搞清楚了 ControllerManager 使用默认方式创建的 Client 中 Cache 的作用:Cache Reader 专用在 typed object 的 Get/List 操作上,而 untyped object 的 Get/List 、所有 Object 的写(Create/Update/Delete ,etc.)操作全部使用直连 api-server 的 Client。
最终还是缓存的锅
再回到前文记录的故障现象,控制器由于一次 Get 操作就“卡”住了,而 Get 操作只是去缓存中读数据而已,缺少权限就直接报错好了,为什么会卡住呢?联系前文介绍的基于 Informer 的 Cache 机制,可以猜想,阻塞应该与 Cache 中的 Informer 有关。
经过一番源码阅读,了解了从调用 c.Get() 到 Informer Cache 创建&启动的整个流程,我们才明白了故障原因 …
1. 先看 c.Get 的具体实现源码
// pkg/cache/informer_cache.go:54
func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out runtime.Object) error {
// 从传入对象中获取 GVK
gvk, err := apiutil.GVKForObject(out, ip.Scheme)
if err != nil {
return err
}
// 基于 GVK 去查找 Cache(Informer)
started, cache, err := ip.InformersMap.Get(ctx, gvk, out)
if err != nil {
return err
}
if !started {
return &ErrCacheNotStarted{}
}
// 从找到的 Cache 中读取对象
return cache.Reader.Get(ctx, key, out)
}
2. ip.InformersMap.Get 这一步负责获取 Cache,假如 Cache 不存在应该也是由它来创建 Cache,所以看它的实现源码
// pkg/cache/internal/informers_map.go:168
func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
// Cache(Informer) 保存在一个以 GVK 为 key 的 map 中
// 如果找到匹配的 Informer,则直接返回
i, started, ok := func() (*MapEntry, bool, bool) {
ip.mu.RLock()
defer ip.mu.RUnlock()
i, ok := ip.informersByGVK[gvk]
return i, ip.started, ok
}()
// 如果找不到,则创建一个新的 Informer
if !ok {
var err error
if i, started, err = ip.addInformerToMap(gvk, obj); err != nil {
return started, nil, err
}
}
// 以下部分稍后再看,先看上面的 addInformerToMap
if started && !i.Informer.HasSynced() {
if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) {
return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0)
}
}
return started, i, nil
}
3. 至此我们可以明确一点,被缓存的数据并不是 Client 初始化时就预加载好的,而是直到 Client 首次请求该资源时才会去创建 Informer 进而加载缓存数据。addInformerToMap 方法负责创建并启动一个新的 SharedIndexInformer:
func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) {
// ... 省略一部分
// 创建 ListWatcher
var lw *cache.ListWatch
lw, err := ip.createListWatcher(gvk, ip)
if err != nil {
return nil, false, err
}
// 新建一个 client-go 封装的 SharedIndexInformer,即 Informer 最常用(几乎唯一)的实现
ni := cache.NewSharedIndexInformer(lw, obj, resyncPeriod(ip.resync)(), cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
})
i := &MapEntry{
Informer: ni,
// 把 Informer 关联的 Indexer 对接到 Get/List 方法上,从而读取其中缓存对象
Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk},
}
// 将新建的 Informer 保存到一个以 GVK 为 key 的 map 中,供后续复用
ip.informersByGVK[gvk] = i
// 启动 Informer
if ip.started {
go i.Informer.Run(ip.stop)
}
return i, ip.started, nil
}
4. 再回头看第 2 步中未看完的源码,可以定位到触发阻塞的代码
// pkg/cache/internal/informers_map.go:168
func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
// 已读省略 ...
// addInformerToMap 中启动了一个 Informer ...
if started && !i.Informer.HasSynced() {
// 这里必须等待 Informer HasSynced,否则将一直阻塞
// 后文介绍何为 HasSynced
if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) {
return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0)
}
}
return started, i, nil
}
// k8s.io/client-go@v0.18.6/tools/cache/shared_informer.go:237
// 真正导致阻塞的代码
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
err := wait.PollImmediateUntil(syncedPollPeriod,
func() (bool, error) {
// 必须所有 syncFunc 方法都返回 true 才结束阻塞
for _, syncFunc := range cacheSyncs {
if !syncFunc() {
return false, nil
}
}
return true, nil
},
stopCh)
// ...
}
阻塞的代码找到了,新的问题来了:
这里再次祭出祖师爷卷轴:
如图中注释,再结合 ListWatch/Informer 机制的原理与作用,就可以理解必须阻塞等待 HasSynced 的原因了。假如 HasSynced 没有完成,那么同步到 DeltaFIFO 中的某类资源,相对比 api-server/etcd 中存储的对象一定是不完整的或者说不一致的,name依赖 Indexer 缓存该类资源全量数据的所有下游工作,例如控制器的 Reconcile 逻辑、例如上述事故场景中的 Get 操作,都无法正常进行,所以这里必须阻塞等待 HasSynced。
HasSynced 实际是由 Informer 中的 Reflector 中的 DeltaFIFO 实现的,如果对 DeltaFIFO 的实现感兴趣,强烈安利阅读文末“好文路由”【client-go Informer 三剑客】中列出的几篇文章。
最后画图简单总结一下使用 Cache Client Get 的流程。
Cache & Client 实践总结
在某些业务场景下, K8s 控制器可能需要通过 Client 频繁的读取大量 K8s 对象资源的数据,在集群规模稍大的情况下,这可能会给 api-server 带来巨大的负载压力,使用带有 Cache 机制的 Client 可以改善这一问题。
结合工作中的一些实践经验,这里列出一些使用 controller-runtime Cache Client 的方法或注意事项。
开启对 unstructured 资源的缓存机制
如前文所述,ControllerManager 使用默认方式创建的 Client 只会对 typed 对象使用缓存,如果需要对 untyped 对象也使用缓存机制:
在 controller-runtime v0.7.0+ 的版本中可以这样初始化 CtrMgr 的 Client:
mgr, err := manager.New(cfg, manager.Options{
NewClient: func(cache cache.Cache, config *rest.Config, options client.Options, uncachedObjects ...client.Object) (client.Client, error) {
c, err := client.New(config, options)
if err != nil {
return nil, err
}
return client.NewDelegatingClient(client.NewDelegatingClientInput{
CacheReader: cache,
Client: c,
// 对这里列出的对象,无论 typed 还是 untyped,都不使用缓存
UncachedObjects: uncachedObjects,
// CacheUnstructured 控制是否对 untyped 对象使用缓存
CacheUnstructured: true,
})
},
})
在 v0.6.0 及以下版本中可以这样 hack 一下:
mgr, err := ctrlmanager.New(ctrl.GetConfigOrDie(), ctrl.Options{
NewClient: func(cache ctrlcache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
c, err := client.New(config, options)
if err != nil {
return nil, err
}
return &client.DelegatingClient{
// 对所有类型(typed or untyped)资源的 Get/List 都使用缓存
Reader: cache,
Writer: c,
StatusClient: c,
}, nil
},
})
为开启缓存的资源准备好 SA 权限
如上文事故中描述的场景,问题出现的根因就是缺少被缓存资源的 List 权限。其实不仅仅是针对缓存功能,控制器中对任意资源的操作都需要提供相对应的权限,只不过缓存功能所需要的 List&Watch 权限容易被忽视。
警惕缓存导致内存占用过高
通常使用 Cache Client 是为了降低控制器 Get/List 请求对 api-server 的负载压力,但这必然会占用更多内存空间,尤其是缓存 ConfigMap 或 Secret 等体积可能比较大的资源对象,或者在集群规模比较大的情况下,使用 Cache Client 可能导致内存飙升,进而引发控制器 Pod OOM。因此需要合理预判内存消耗情况,尤其是控制器中涉及动态创建 Informer 或 动态 Get/List 资源的场景。
所谓动态创建 Informer 或 动态 Get/List 资源,是指控制器可能在运行过程中根据集群中的资源情况或者业务逻辑,在控制器(及其 Informer)已经启动的情况下,再为新类型的资源创建 Informer,这种情况通常难以预判内存消耗情况。
除了缓存对象本身数量多、体积大造成内存占用过高,ControllerManager 默认创建的 DelgatingClient ,在同时开启了对 typed 和 untyped 资源缓存的情况下,由于对同一类资源会缓存两份,也会造成内存过高。因此在这种情况下,对同一类资源的 Get/List 尽量全程只使用一种类型(typed or untyped)执行读取请求。
只缓存(List&Watch)部分资源
在 controller-runtime v0.9.0+ 版本,为了缓解缓存导致内存过高的问题,我们可以基于具体业务需求,通过配置 LabelSelector 或 FieldSelector,将 Client 设置为只缓存某个类型资源的一部分对象:
mgr, err := ctrlmanager.New(ctrl.GetConfigOrDie(), ctrl.Options{
NewCache: cache.BuilderWithOptions(cache.Options{
Scheme: scheme,
SelectorsByObject: cache.SelectorsByObject{
&appsv1.ReplicaSet{}: {
Label: labels.SelectorFromSet(labels.Set{"foo":"bar"}),
},
&appsv1.ReplicaSet{}: {
Field: fields.SelectorFromSet(fields.Set{"metadata.name": "foo"}),
},
},
}),
})
这里需要注意,selector 虽然是出现在构造 Cache 的配置项里,但它实际是作用在 List&Watch 方法中,未被 Selecotor 对象不仅无法从缓存中读取到,更无法通过 Informer 机制触发对这些资源对象的 Reconcile!因为它们的事件压根就不会进入 DeltaFIFO 当然也不会进入 WorkQueue,所以相比对缓存功能的影响,这个配置对控制器的 Reconcile 的影响实际更大,谨慎配置。
延伸思考一个问题:ListWatch 中被 selector 过滤掉的对象一定不会触发对这些对象的 Reconcile 吗?答案:不一定。假设我有一个 ReplicaSet 对象 A,它没有被 Cache Selector 选中,所以通常它不会触发控制器的 Reconcile。但如果控制器同时还 Watch 了其他资源,例如 Pod,在 Pod 的 Event Handler 中我们可以将 Pod 的 Owner ReplicaSet 投入到 ReplicaSet 的 WorkQueue 中,进而触发 Reconcile。也就是说,某类资源的 WorkQueue 消息来源不仅仅是该资源的 Informer,也可以来自其他类型资源的 Informer。这种情况下还有一个小坑,由于 selector 从缓存中过滤掉了 A 对象,但 Pod Informer 可以触发对 A 的 Reconcile,所以此时在 Reconcile 方法中如果通过 Cache Client Get A 就有可能出现 NotFound 错误。
只缓存(List&Watch)资源的 metadata
随着 client-go 支持只 List&Watch K8s 资源的metadata,controller-runtime 在 v0.7.0+ 版本中也加入了对该功能的封装支持。在这种场景下,控制器只关心资源的 metadata,并不关心 spec 和 status。
在实践中,如果你的业务只需要确认某些资源存在,但不关心资源的完整配置或者只关心资源的 OwnerReferences,同时这些资源的完整配置如果体积巨大(例如 ConfigMap),那对这些类型的资源使用 metadata-only watch 就特别合适,可以大大缓解内存占用压力。
关于 metadata-only watch 的使用请参考:
实践样例:https://www.sobyte.net/post/2022-04/controller-runtime/#using-metadata
实现 MR:https://github.com/kubernetes-sigs/controller-runtime/pull/1174
缓存里找不到就真找不到了
常见的缓存系统,例如 web server 缓存,当请求无法从缓存数据中获取到目标对象时,会刷新缓存或尝试从数据源头查找,但是在 controller-runtime 封装的 Cache Client 中却不是这样,如果你的 Get 请求走的是缓存,在缓存中没有找到目标对象,那么 Client 不会再向 api-server 发出请求,也不存在“刷新”缓存的操作(List&Watch 机制下,可以认为一直在做刷新操作),所以在 Cache Client 的默认机制下,缓存里找不到就真的找不到了。
由于缓存的过滤机制或者ListWatch故障(概率极小),被请求的对象不在缓存中就未必代表它不存在于 K8s 集群中的,所以我们可以按照业务需要,对 Cache Client 稍作改造,当缓存中找不到时可以再尝试直接向 api-server 请求。
牢记 Cache Client under the hood
前文对 Cache Client 的各种配置,加上按照业务需求对 Cache Client 做的魔改,一个简单的 client.Get/List 背后可能会有非常复杂的逻辑路径,所以最重要的还是牢记 Cache Client 背后的原理,牢记到底哪些资源在哪些情况下会使用缓存,又有哪些资源在哪些情况下直连 api-server。