List-Watch是kubernetes的核心机制。组件kubelet、kube-controller-manager、kube-scheduler需要监控各种资源(pod、service等)的变化,当这些对象发生变化时(add、delete、update),kube-apiserver会主动通知这些组件。这个过程类似一个发布-订阅系统。本文章将从代码角度探究一下list-watch的实现方式。
转载自https://zhuanlan.zhihu.com/p/33335726,有修改
**第一部分:**kube-apiserver对etcd的List-watch机制
流程示意图
构建PodStorage
- kube-apiserver针对每一类资源(pod、service、endpoint、replication controller、depolyments)都会构建Storage对象,如:PodStorage;
- PodStorage.Pod.Store封装了对etcd的操作;
- store.CompleteWithOptions会调用etcdOptions.GetRESTOptions,此方法将
调用generic.UndecoratedStorage创建无cache的Store;
或者调用genericregistry.StorageWithCacher创建带Cache的Store;
StorageWithCacher
- 调用NewCacherFromConfig,将创建Cacher对象;
创建Cacher
- 首先,创建watchCache对象和cacheListerWatcher对象,cacheListWatcher对象是ListerWatcher接口实现,实现了List()和Watch()方法;
- 构建Cacher对象,主要的数据成员:watchCache、reflector、watchers及incoming channel;
-
- watchCache是一个cache,用来存储apiserver从etcd那里watch到的对象;
- watchers是一个map,map的值类型为cacheWatcher,当kubelet、kube-scheduler需要watch某类资源时,他们会向kube-apiserver发起watch请求,kube-apiserver就会生成一个cacheWatcher,cacheWatcher负责将watch的资源通过http从apiserver传递到kubelet、kube-scheduler;
- Reflector对象,主要数据成员:ListerWatcher,ListerWatcher是接口对象,包括方法List()和Watch();listerWatcher包装了Storage,主要是将watch到的对象存到watchCache中;
- incoming channel接收watchCacheEvent;
- 协程调用cacher.dispatchEvents,watchCache将incoming channel接收watchCacheEvent添加到watchers的inputChan中;
- 协程调用cacher.startCaching;
StartCaching
- 执行cacheListerWatcher的List方法和Watch方法;
- 调用reflector的watchHandler方法;
cacheListWatcher.List/cacheListWatcher.Watch
- List方法将调用storage.List方法,这里是etcdHelper.List方法;
- Watch方法将调用storage.watch方法,这里是etcdHelper.WatchList方法;
etcdHelper.List/etcdHelper.Watch
- etcdHelper对象是Storage接口对象的实现;
- etcdHelper的List方法:
-
- 获取etcd的对象(包括resourceVersion信息);
- etcdHelper的WatchList方法:
-
- 创建etcdWatcher;
- etcdWatcher对象,实现了Watch接口;
- etcdWatcher对象,主要的数据成员是etcdIncoming channel和outgoing channel;
- 协程执行etcdWatcher.translate;
- 最后,协程运行etcdWatcher.etcdWatch;
etcdWatcher.etcdWatch
- 如果resourceVersion==0, 运行etcdGetInitialWatchState(),获取所有的pods,并将结果输入到etcdIncoming channel;
- 之后,不停的调用watcher.Next(),并将结果输入到etcdIncoming channel;
etcdWatcher.translate
- 读取etcdIncoming channel信息;
- 调用etcdWatcher.sendResult进行转化;
- 发送到outgoing channel;
reflector.watchHandler
- 读取outgoing channel信息,操作watchCache;
操作watchCache
处理事件watchCache.processEvent
- 创建watchCacheEvent
- 调用watchCache.updateCache,更新watchCache;
到此分析完kube-apiserver对etcd的watch机制,除此之外,kube-apiserver会向其他组件提供watch接口,下面将分析kube-apiserver的watch API。
第二部分:kube-apiserver的watch restful API
kube-apiserver提供watch restful API给其他组件(kubelet、kube-controller-manager、kube-scheduler、kube-proxy)。watch restful API的处理流程和PUT、DELETE、GET等REST API处理流程类似。
流程示意图
registerResourceHandlers
ListResource
1.调用rw.watch方法,这里将会调用Store.watch;
2.调用serveWatch方法;
Store.watch
- 调用Storage.Watch方法和Storage.WatchList方法,这里将调用Cacher.watch方法和Cacher.WatchList方法
Cacher.watch
- watch方法中将调用newCacheWatcher;
- newCacheWatcher方法:
- 生成一个watcher,并将watcher插入到cacher.watchers中;
- 协程调用cacheWatcher.process方法,此方法将会操作input channel的消息;
操作input channel
- 读取input channel的信息,并调用sendWatchCacheEvent方法;
sendWatchCacheEvent
- kube-apiserver的watch会带过滤功能;
- 对watchCacheEvent进行Filter,发送到cacher.result channel中;