前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【笔记】Operator课程(1-6)

【笔记】Operator课程(1-6)

作者头像
Yuyy
发布2023-03-27 09:12:38
2760
发布2023-03-27 09:12:38
举报

白丁云原生:k8s编程-operator篇【已完结】

clientgo架构

image-20230326150144483
image-20230326150144483
  1. 以前以为client-go只是通过restclient去调用api server,其实不然,它还具备controller的逻辑,以实现更强大的k8s操作。

RESTClient原理

  1. 创建了所有版本的client,例如corev1,体现k8s向后兼容,只要正式版发布的接口不会删除,beta预发布的会在几个迭代后删除
  2. restclient就是httpclient,可用来直接调用k8s api的url
  3. clientset封装了对k8s资源的操作

Reflector原理

通过listwatch从api server获取资源列表,并监听其变化

通过debug定位监听阻塞的地方,在go标准库里,json stream。具体方法未能定位到

获取变更事件后,调用了store接口,保存数据。例如delta fifo queue

代码语言:javascript
复制
switch event.Type {
        case watch.Added:
            err := store.Add(event.Object)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
            }
        case watch.Modified:
            err := store.Update(event.Object)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))
            }
        case watch.Deleted:
            // TODO: Will any consumers need access to the "last known
            // state", which is passed in event.Object? If so, may need
            // to change this.
            err := store.Delete(event.Object)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))
            }

resource version标识k8s资源版本,体现变更,watch的时候用到

DeltaFIFO原理

实现store接口,记录历史操作

代码语言:javascript
复制
type Store interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
Replace([]interface{}, string) error
Resync() error
}

其他实现类

ExpirationCache

  1. 过期原理:保存的结构体里包括时间戳
代码语言:javascript
复制
c.cacheStorage.Add(key, &TimestampedEntry{obj, c.clock.Now(), key})
  1. 相关知识:redis清理过期key

UndeltaStore

  1. 数据变更时,推送保存的所有数据
代码语言:javascript
复制
func (u *UndeltaStore) Add(obj interface{}) error {
    if err := u.Store.Add(obj); err != nil {
        return err
    }
    u.PushFunc(u.Store.List())
    return nil
}

func (u *UndeltaStore) Update(obj interface{}) error {
    if err := u.Store.Update(obj); err != nil {
        return err
    }
    u.PushFunc(u.Store.List())
    return nil
}
  1. 代理模式

FIFO

  1. 和delta fifo的区别是没有保存delta(历史操作)

使用数组加map,实现有序+O(1)读取,空间换时间

代码语言:javascript
复制
type DeltaFIFO struct {
lock sync.RWMutex
cond sync.Cond
items map[string]Deltas
queue []string
...
}

map保存了delta数组,也就是操作记录(命令模式)

代码语言:javascript
复制
type DeltaType string

const (
Added   DeltaType = "Added"
Updated DeltaType = "Updated"
Deleted DeltaType = "Deleted"
Replaced DeltaType = "Replaced"
Sync DeltaType = "Sync"
)

type Delta struct {
Type   DeltaType
Object interface{}
}

sync.Cond实现阻塞队列

记录历史操作

代码语言:javascript
复制
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
    return KeyError{obj, err}
}
...

oldDeltas := f.items[id]
newDeltas := append(oldDeltas, Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)

if len(newDeltas) > 0 {
    if _, exists := f.items[id]; !exists {
        f.queue = append(f.queue, id)
    }
    f.items[id] = newDeltas
    f.cond.Broadcast()
} 
...

取出历史记录并处理

代码语言:javascript
复制
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    f.lock.Lock()
    defer f.lock.Unlock()
    for {
        for len(f.queue) == 0 {
            // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
            // When Close() is called, the f.closed is set and the condition is broadcasted.
            // Which causes this loop to continue and return from the Pop().
            if f.closed {
                return nil, ErrFIFOClosed
            }

      // 阻塞获取
            f.cond.Wait()
        }
        isInInitialList := !f.hasSynced_locked()
        id := f.queue[0]
        f.queue = f.queue[1:]
        depth := len(f.queue)
        if f.initialPopulationCount > 0 {
            f.initialPopulationCount--
        }
        item, ok := f.items[id]
        if !ok {
            // This should never happen
            klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
            continue
        }
        delete(f.items, id)
        // Only log traces if the queue depth is greater than 10 and it takes more than
        // 100 milliseconds to process one item from the queue.
        // Queue depth never goes high because processing an item is locking the queue,
        // and new items can't be added until processing finish.
        // https://github.com/kubernetes/kubernetes/issues/103789
        if depth > 10 {
            trace := utiltrace.New("DeltaFIFO Pop Process",
                utiltrace.Field{Key: "ID", Value: id},
                utiltrace.Field{Key: "Depth", Value: depth},
                utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
            defer trace.LogIfLong(100 * time.Millisecond)
        }

    // 处理
        err := process(item, isInInitialList)
        if e, ok := err.(ErrRequeue); ok {
            f.addIfNotPresent(id, item)
            err = e.Err
        }
        // Don't need to copyDeltas here, because we're transferring
        // ownership to the caller.
        return item, err
    }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-3-26 1,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • clientgo架构
  • RESTClient原理
  • Reflector原理
  • DeltaFIFO原理
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档