首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kubernetes controller 解析

kubernetes controller 解析

原创
作者头像
王磊-字节跳动
修改2019-10-08 19:29:08
1.7K0
修改2019-10-08 19:29:08
举报
文章被收录于专栏:01ZOO01ZOO

Controller

image
image
  • List/Watch:List是列举apiserver中对象的接口,Watch是监控apiserver资源变化的接口;
  • Reflector:实现对apiserver指定类型对象的监控,其中反射实现的就是把监控的结果实例化成具体的对象;
  • DeltaIFIFO:将Reflector监控的变化的对象形成一个FIFO队列,此处的Delta就是变化
  • LocalStore:指的就是Indexer的实现cache,这里面缓存的就是apiserver中的对象(其中有一部分可能还在DeltaFIFO中),此时使用者再查询对象的时候就直接从cache中查找,减少了apiserver的压力;
  • Callbacks:通知回调函数,Infomer感知的所有对象变化都是通过回调函数通知使用者(Listener);

参考

Controller 之 Indexer

controller内部有个内存cache,cache 一般和lister/ indexer 一起配合使用, 用一个 Indexer interface进行的包装

type fooCache struct {
	lister  listers.FooLister
	indexer cache.Indexer
}

type cache struct {
	// cacheStorage bears the burden of thread safety for the cache
	cacheStorage ThreadSafeStore
	// keyFunc is used to make the key for objects stored in and retrieved from items, and
	// should be deterministic.
	keyFunc KeyFunc
}

func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
	return &threadSafeMap{
		items:    map[string]interface{}{},
		indexers: indexers,
		indices:  indices,
	}
}

可以看出 外部使用的 fooCache -> cache.Indexer -> cache -> KeyFunc+ThreadSafeStore -> map+indexers+indices (参考 thread_safe_store.go)

实际存的时候会 使用 keyFunc 存到 map,同时更新所有的 indices

Indexer是一个 Interface,对cache 增加了 index的能力,即索引

// Indexer is a storage interface that lets you list objects using multiple indexing functions
type Indexer interface {
	Store
	// Retrieve list of objects that match on the named indexing function
	Index(indexName string, obj interface{}) ([]interface{}, error)
	// IndexKeys returns the set of keys that match on the named indexing function.
	IndexKeys(indexName, indexKey string) ([]string, error)
	// ListIndexFuncValues returns the list of generated values of an Index func
	ListIndexFuncValues(indexName string) []string
	// ByIndex lists object that match on the named indexing function with the exact key
	ByIndex(indexName, indexKey string) ([]interface{}, error)
	// GetIndexer return the indexers
	GetIndexers() Indexers

	// AddIndexers adds more indexers to this store.  If you call this after you already have data
	// in the store, the results are undefined.
	AddIndexers(newIndexers Indexers) error
}

一个 store 比如 NewThreadSafeStore,是一个内存 map实现,本来就有 index的能力,那么增加这个复杂的 indexer是为什么呢?

indexer是为了从不同维度查询内存数据更方便设计的,比如 IndexByPodNodeName, 通过他可以用 node name 查询所有的 pod.

里面有很多概念,理解几个概念就能很快理解:

  1. 索引键: index key -> object keys
  2. 对象键:object key -> object
  3. indexFunc: 从对象到索引键,用于Indexer
  4. keyFunc: 从对象到对象键,用于cache

kubernetes 里面主要用到的IndexFunc有(其实用得不多,设计通用了,复杂性增加了不少)

  1. MetaNamespaceIndexFunc:namespace作为索引键查询同namespace的所有对象
  2. indexByPodNodeName:nodename作为索引键,查询同nodeName的所有对象(Pod)

Controller 之 deltaFifo, WorkQueue

注意区别这FIFO 和 WorkQueue,在kubernetes contoller 语意里面他们处于不同的位置,FIFO 用于 cache层,连接reflector和local cache。而 worker queue则把变化提供给controller上层的 worker处理,Fifo的功能比较简单,一般都是用 deltaFifo;而worker queue有很多种,比如通用队列、限速队列、延时队列,用于满足上层的应用需求

deltaFifo

deltaFifo 是一种 fifo,fifo实现了 store + Pop/AddIfNotPresent

deltaFifo 使用 mapstringDeltas 存储数据,delta是一种object同时带 DeltaType,这样对于一个 object key下面的 object变化都会归类到同一个key下面的 Deltas

deltafifo 的 knownObjects KeyListerGetter 即Indexer,即用户可见的 Localcache 内容,deltaFifo 像是对Local cache的一个管道,需要对比 LocalCache里面的对象中的一些状态来简化、合并一些变化。

type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	// The other types are obvious. You'll get Sync deltas when:
	//  * A watch expires/errors out and a new list/watch cycle is started.
	//  * You've turned on periodic syncs.
	// (Anything that trigger's DeltaFIFO's Replace() method.)
	Sync DeltaType = "Sync"
)

// Delta is the type stored by a DeltaFIFO. It tells you what change
// happened, and the object's state after* that change.
//
// [*] Unless the change is a deletion, and then you'll get the final
//     state of the object before it was deleted.
type Delta struct {
	Type   DeltaType
	Object interface{}
}

// Deltas is a list of one or more 'Delta's to an individual object.
// The oldest delta is at index 0, the newest delta is the last one.
type Deltas []Delta

delta fifo的一个好处是,有些delta可以合并,比如:两个连续的删除

workqueue

参考 https://blog.csdn.net/weixin_42663840/article/details/81482553

Controller 之 SharedInformer

Reflector

// 代码源自client-go/tools/cache/reflector.go
type Reflector struct {
    name string                                 // 名字
    metrics *reflectorMetrics                   // 监控
    expectedType reflect.Type                   // 反射的类型,也就是要监控的对象类型,比如Pod
    store Store                                 // 存储 -> DeltaFIFO
    listerWatcher ListerWatcher                 // 从 apiserver 获取资源, 使用 rest api
    period       time.Duration                  // 反射器在List和Watch的时候理论上是死循环,只有出现错误才会退出
                                                // 这个变量用在出错后多长时间再执行List和Watch,默认值是1秒钟
    resyncPeriod time.Duration                  // 重新同步的周期,很多人肯定认为这个同步周期指的是从apiserver的同步周期
                                                // 其实这里面同步指的是shared_informer使用者需要定期同步全量对象
    ShouldResync func() bool                    // 是否需要同步
    clock clock.Clock                           // 时钟
    lastSyncResourceVersion string              // 最后一次同步的资源版本
    lastSyncResourceVersionMutex sync.RWMutex   // 最后一次同步的资源版本锁
}
  1. Reflector利用apiserver的client列举全量对象(版本为0以后的对象全部列举出来)
  2. 将全量对象采用Replace()接口同步到DeltaFIFO中,并且更新资源的版本号,这个版本号后续会用到;
  3. 开启一个协程定时执行resync,如果没有设置定时同步则不会执行,同步就是把全量对象以同步事件的方式通知出去;
  4. 通过apiserver的client监控(watch)资源,监控的当前资源版本号以后的对象,因为之前的都已经获取到了;
  5. 一旦有对象发生变化,那么就会根据变化的类型(新增、更新、删除)调用DeltaFIFO的相应接口,产生一个相应的对象Delta,同时更新当前资源的版本;

参考

核心代码

// 代码源自client-go/tools/cache/reflector.go
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    var resourceVersion string
    // 列举全部对象
    options := metav1.ListOptions{ResourceVersion: "0"}
    list, err := r.listerWatcher.List(options)
    // ...略
    items, err := meta.ExtractList(list)
    // ...略
    // 同步到DeltaFIFO中
    if err := r.syncWith(items, resourceVersion); err != nil {
        return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
    }
    // ...略
 
    // watch逻辑
    for {
        // ...略
        w, err := r.listerWatcher.Watch(options)
        // watch返回是流,apiserver会将变化的资源通过这个流发送出来,client-go最终通过chan实现的
        // 所以watchHandler()是一个需要持续从chan读取数据的流程,所以需要传入resyncerrc和stopCh
        // 用于异步通知退出或者后台同步协程错误
        if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
        // ...略
        }
    }
}

// List 之后会 syncWith
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
    // ... 略
    // 直接调用了DeltaFIFO的Replace()接口,这个接口就是用于同步全量对象的
    return r.store.Replace(found, resourceVersion)
}

// 实现从watch返回的chan中持续读取变化的资源,并转换为DeltaFIFO相应的调用
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    // 这里就开始无限循环的从chan中读取资源的变化,也可以理解为资源的增量变化,同时还要监控各种信号
loop:
    for {
        select {
        // ...略
        case event, ok := <-w.ResultChan():
            // 如果不OK,说明chan关闭了,就要重新获取,这里面我们可以推测这个chan可能会运行过程中重新创建
            // 否则就应该退出而不是继续循环
            if !ok {
                break loop
            }
            // 看来event可以作为错误的返回值,而不是通过关闭chan,这种方式可以传递错误信息
            if event.Type == watch.Error {
                return apierrs.FromObject(event.Object)
            }
            // ...略
            // 根据事件的类型做不同的DeltaFIFO的操作
            switch event.Type {
            // 向DeltaFIFO添加一个添加的Delta
            case watch.Added:
                err := r.store.Add(event.Object)
            // 更新对象,向DeltaFIFO添加一个更新的Delta
            case watch.Modified:
                err := r.store.Update(event.Object)
            }
            // 删除对象,向DeltaFIFO添加一个删除的Delta
            case watch.Deleted:
                err := r.store.Delete(event.Object)
            }
            }
            // 更新最新资源版本
            *resourceVersion = newResourceVersion
            r.setLastSyncResourceVersion(newResourceVersion)
            eventCount++
        }
    }
    // watch返回时间非常短而且没有任何事件要处理,这个属于异常现象,因为我们watch是设置了超时的
    watchDuration := r.clock.Now().Sub(start)
    if watchDuration < 1*time.Second && eventCount == 0 {
        r.metrics.numberOfShortWatches.Inc()
        return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
    }
 
    return nil
}

Controller 和 IndexerInformer/Informer

这里 Controller 和 IndexerInformer是一回事,NewIndexerInformer/NewInformer 返回的就是一个 Contoller

Controller的作用很简单,就是连接 Reflector 和 Handller, IndexerInformer/Informer 连接的同时还会在 clientState 做同步,clientState 就是LocalCache =》 clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)

核心代码

func newInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
	clientState Store,
) Controller {
	// This will hold incoming changes. Note how we pass clientState in as a
	// KeyLister, that way resync operations will result in the correct set
	// of update/delete deltas.
	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState)

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    lw,
		ObjectType:       objType,
		FullResyncPeriod: resyncPeriod,
		RetryOnError:     false,

		Process: func(obj interface{}) error {
			// from oldest to newest
			for _, d := range obj.(Deltas) {
				switch d.Type {
				case Sync, Added, Updated:
					if old, exists, err := clientState.Get(d.Object); err == nil && exists {
						if err := clientState.Update(d.Object); err != nil {
							return err
						}
						h.OnUpdate(old, d.Object)
					} else {
						if err := clientState.Add(d.Object); err != nil {
							return err
						}
						h.OnAdd(d.Object)
					}
				case Deleted:
					if err := clientState.Delete(d.Object); err != nil {
						return err
					}
					h.OnDelete(d.Object)
				}
			}
			return nil
		},
	}
	return New(cfg)
}

SharedInformer

有Informer已经够用了,但是如果client新建了很多 Informer,那么client state也就是 local cache就会存很多重复数据,这样是一种浪费,SharedInformer 是来解决这个问题的, 解决办法是 共享index存储,同时事件对 listener多次分发。(client 直接使用SharedInformer效率更高)

// SharedInformer has a shared data cache and is capable of distributing notifications for changes
// to the cache to multiple listeners who registered via AddEventHandler. If you use this, there is
// one behavior change compared to a standard Informer.  When you receive a notification, the cache
// will be AT LEAST as fresh as the notification, but it MAY be more fresh.  You should NOT depend
// on the contents of the cache exactly matching the notification you've received in handler
// functions.  If there was a create, followed by a delete, the cache may NOT have your item.  This
// has advantages over the broadcaster since it allows us to share a common cache across many
// controllers. Extending the broadcaster would have required us keep duplicate caches for each
// watch.
type SharedInformer interface {
	// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
	// period.  Events to a single handler are delivered sequentially, but there is no coordination
	// between different handlers.
	AddEventHandler(handler ResourceEventHandler)
	// AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the
	// specified resync period.  Events to a single handler are delivered sequentially, but there is
	// no coordination between different handlers.
	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
	// GetStore returns the Store.
	GetStore() Store
	// GetController gives back a synthetic interface that "votes" to start the informer
	GetController() Controller
	// Run starts the shared informer, which will be stopped when stopCh is closed.
	Run(stopCh <-chan struct{})
	// HasSynced returns true if the shared informer's store has synced.
	HasSynced() bool
	// LastSyncResourceVersion is the resource version observed when last synced with the underlying
	// store. The value returned is not synchronized with access to the underlying store and is not
	// thread-safe.
	LastSyncResourceVersion() string
}

核心代码

// 代码源自client-go/tools/cache/shared_informer.go
// 分发,写放大
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
    // ... 略
 
    if sync {
        for _, listener := range p.syncingListeners {
            listener.add(obj)
        }
    } else {
        for _, listener := range p.listeners {
            listener.add(obj)
        }
    }
}

// 这个函数是通过sharedProcessor利用wait.Group启动的
func (p *processorListener) pop() {
    defer utilruntime.HandleCrash()
    // nextCh是在这里,函数退出前析构的
    defer close(p.nextCh)
    // 临时变量,下面会用到
    var nextCh chan<- interface{}
    var notification interface{}
    // 进入死循环啦
    for {
        select {
        // 有两种情况,nextCh还没有初始化,这个语句就会被阻塞,这个我在《深入浅出golang之chan》说过
        // nextChan后面会赋值为p.nextCh,因为p.nextCh也是无缓冲的chan,数据不发送成功就阻塞               
        
        case nextCh <- notification:
            // 如果发送成功了,那就从缓冲中再取一个事件出来
            var ok bool
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok {
                // 如果没有事件,那就把nextCh再次设置为nil,接下来对于nextCh操作还会被阻塞
                nextCh = nil
            }
        // 从p.addCh读取一个事件出来,这回看到消费p.addCh的地方了
        case notificationToAdd, ok := <-p.addCh:
            // 说明p.addCh关闭了,只能退出
            if !ok {
                return
            }
            // notification为空说明当前还没发送任何事件给处理器
            if notification == nil {
                // 那就把刚刚获取的事件通过p.nextCh发送个处理器
                notification = notificationToAdd
                nextCh = p.nextCh
            } else {
                // 上一个事件还没有发送成功,那就先放到缓存中
                // pendingNotifications可以想象为一个slice,这样方便理解,是一个动态的缓存,
                p.pendingNotifications.WriteOne(notificationToAdd)
            }
        }
    }
}

这里面对逻辑有点绕,简单来说 就是 reflector -> deltafifo -> processor.distribute (indexer也同步一份) -> listeners.add -> processorListener.addCh -> processorListener.nextCh -> p.handler 处理

addCh, nextCh两个chan的作用是保证 addCh 不阻塞

总结

  • 利用apiserver的api实现资源的列举和监控(Reflector实现);
  • 利用cache存储apiserver中的部分对象,通过对象类型进行制定,并在cache中采用Namespace做对象的索引
  • 先通过apiserver的api将对象的全量列举出来存储在cache中,然后再watch资源,一旦有变化就更新cache中;
  • 更新到cache中的过程通过DeltaFIFO实现的有顺序的更新,因为资源状态是通过全量+增量方式实现同步的,所以顺序错误会造成状态不一致;
  • 使用者可以注册回调函数(类似挂钩子),在更新到cache的同时通知使用者处理,为了保证回调处理不被某一个处理器阻塞,SharedInformer实现了processorListener异步缓冲处理;
  • 真个过程是Controller是发动机,驱动整个流程运转;
image
image

图片来源

Controller 之 SharedInformerFactory

SharedInformerFactory 是 SharedInformer的工厂类,用于简化生产各种SharedInformer

具体的 SharedIndexInformer 构建一般都是代码自动生成

// 代码源自client-go/informers/internalinterfaces/factory_interfaces.go 
type SharedInformerFactory interface {
    // 核心逻辑函数,类似于很多类的Run()函数
    Start(stopCh <-chan struct{})
    // 这个很关键,通过对象类型,返回SharedIndexInformer,这个SharedIndexInformer管理的就是指定的对象
    // NewInformerFunc用于当SharedInformerFactory没有这个类型的Informer的时候创建使用
    InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
}
// 创建Informer的函数定义,这个函数需要apiserver的客户端以及同步周期,这个同步周期在SharedInformers反复提到
type NewInformerFunc func(kubernetes.Interface, time.Duration) cache.SharedIndexInformer


// 代码源自client-go/informers/factory.go
// 其实sharedInformerFactory的Start()函数就是启动所有具体类型的Informer的过程
// 因为每个类型的Informer都是SharedIndexInformer,需要需要把每个SharedIndexInformer都要启动起来
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    // 加锁操作
    f.lock.Lock()
    defer f.lock.Unlock()
    // 遍历informers这个map
    for informerType, informer := range f.informers {
        // 看看这个Informer是否已经启动过
        if !f.startedInformers[informerType] {
            // 如果没启动过,那就启动一个协程执行SharedIndexInformer的Run()函数,我们在分析SharedIndexInformer的时候
            // 我们知道知道Run()是整个Informer的启动入口点,看了《深入浅出kubernetes之client-go的SharedInformer》
            // 的同学应该会想Run()是谁调用的呢?这里面应该给你们答案了吧?
            go informer.Run(stopCh)
            // 设置Informer已经启动的标记
            f.startedInformers[informerType] = true
        }
    }
}

SharedInformerFactory一共有两类用户参考自

  • 通过Informer获取信息的人,比如kube-controller-manager,这类用户通过调用Core()、Events()、Storage()这类的接口获取各个Informer分组,使用者通过Informer就可以获取信息,这个后面会有章节介绍;
  • 向SharedInformerFactory里面注册Informer的人,比如PodInformer,这类用户是通过调用类似Core()这些接口而被动触发形成的,他们肯定知道怎么创建自己,由他们负责把自己注册到SharedInformerFactory里面;

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Controller
    • Controller 之 Indexer
      • Controller 之 deltaFifo, WorkQueue
        • deltaFifo
        • workqueue
      • Controller 之 SharedInformer
        • Reflector
        • Controller 和 IndexerInformer/Informer
        • SharedInformer
      • Controller 之 SharedInformerFactory
      相关产品与服务
      容器服务
      腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档