前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >K8s源码分析(26)-Queue组件和DeltaFIFO组件

K8s源码分析(26)-Queue组件和DeltaFIFO组件

作者头像
TA码字
发布2022-10-30 13:27:15
3320
发布2022-10-30 13:27:15
举报
文章被收录于专栏:TA码字TA码字

上一篇文章里,我们主要介绍了和对象存储相关的组件 Store 接口以及它的实现结构体 cache,本质上说该接口和它的实现是对以前文章中介绍的 ThreadSafeStore 接口和它具体实现的更高级抽象,即 ThreadSafeStore 接口的操作需要针对资源对象以及对象的 key, 而 Store 接口有能力获取资源对象的 key, 所以该接口只针对资源对象操作。当然,两种组件针对资源对象的操作在底层上都是并发安全的。本篇文章中我们主要来介绍 Queue 和 DeltaFIFO 组件 ,也是资源对象存储组件。

Queue 接口

Queue 是接口,图解和源码如下:

代码语言:javascript
复制
//staging/src/k8s.io/client-go/tools/cache/fifo.go
type Queue interface {
  Store

  Pop(PopProcessFunc) (interface{}, error)

  AddIfNotPresent(interface{}) error

  HasSynced() bool

  Close()
}
  • 该接口是对于上一篇文章中介绍的 Store 接口的扩展,当然本质上也是并发安全的,因为 Store 就是并发安全的。
  • 该接口中扩展了一系列额外方法,例如 Pop/AddIfNotPresent/HasSynced/Close 等等,其中的 Pop 方法就是典型的 Queue 数据结构相关操作。

Delta 结构体

Delta 结构体定义资源对象的创建,更新,删除等操作的元数据信息,图解和源码如下:

代码语言:javascript
复制
//src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaType string

type Delta struct {
  Type   DeltaType
  Object interface{}
}

type Deltas []Delta

const (
  Added   DeltaType = "Added"
  Updated DeltaType = "Updated"
  Deleted DeltaType = "Deleted"
  Replaced DeltaType = "Replaced"
  Sync DeltaType = "Sync"
)
  • Delta 结构体定义中封装了 DelteType 和 Object,代表针对资源对象的操作类型以及资源对象本身。
  • Added/Updated/Deleted/Replaced/Sync 等针对资源的不同操作类型被定义在 DelteType 中,其本质上就是 String 的别名。
  • 从上面图解中看,一个资源对象会有其对应的 key,而针对同一个资源对象的操作可能会有多个,例如创建,更新,删除等等, 所以一个 key 会对应多个 Delta 对象。

DeltaFIFO 结构体

DeltaFIFI 结构体实现了上面介绍的 Queue 接口,针对的元素都是 Delta 类型的对象,其图解和源码如下:

代码语言:javascript
复制
//src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaFIFO struct {
  lock sync.RWMutex
  cond sync.Cond

  items map[string]Deltas

  queue []string

  populated bool

  initialPopulationCount int

  keyFunc KeyFunc

  knownObjects KeyListerGetter

  closed bool

  emitDeltaTypeReplaced bool
}

func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error){...}
func (f *DeltaFIFO) Add(obj interface{}) error{...}
func (f *DeltaFIFO) Update(obj interface{}) error{...}
func (f *DeltaFIFO) Delete(obj interface{}){...}
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error{...} 
......//other methods impl defined in Store interface
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
  f.lock.Lock()
  defer f.lock.Unlock()
  for {
    for len(f.queue) == 0 {
      if f.closed {
        return nil, ErrFIFOClosed
      }

      f.cond.Wait()
    }
    id := f.queue[0]
    f.queue = f.queue[1:]
    depth := len(f.queue)
    if f.initialPopulationCount > 0 {
      f.initialPopulationCount--
    }
    item, ok := f.items[id]
    if !ok {
      klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
      continue
    }
    delete(f.items, id)
    if depth > 10 {
      trace := utiltrace.New("DeltaFIFO Pop Process",
        utiltrace.Field{Key: "ID", Value: id},
        utiltrace.Field{Key: "Depth", Value: depth},
        utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
      defer trace.LogIfLong(100 * time.Millisecond)
    }
    err := process(item)
    if e, ok := err.(ErrRequeue); ok {
      f.addIfNotPresent(id, item)
      err = e.Err
    }
    return item, err
  }
}
  • 该结构体中有 map[string]Deltas 类型的属性,针对的元素都是 Deltas 类型,所以本质上该结构体就是存储资源对象操作元数据的一个队列。
  • 该结构体实现了上面介绍的 Queue 接口。
  • 在 Pop 方法的实现中,我们看到的就是典型队列的列操作定义,取出第一个元素,然后用传来的 PopProcessFunc 处理该元素。

DeltaFIFO 结构体的创建

DeltaFIFI 结构体创建的源码如下:

代码语言:javascript
复制
//src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaFIFOOptions struct {

  KeyFunction KeyFunc

  KnownObjects KeyListerGetter

  EmitDeltaTypeReplaced bool
}

func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
  return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
    KeyFunction:  keyFunc,
    KnownObjects: knownObjects,
  })
}

func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
  if opts.KeyFunction == nil {
    opts.KeyFunction = MetaNamespaceKeyFunc
  }

  f := &DeltaFIFO{
    items:        map[string]Deltas{},
    queue:        []string{},
    keyFunc:      opts.KeyFunction,
    knownObjects: opts.KnownObjects,

    emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
  }
  f.cond.L = &f.lock
  return f
}

// k8s.io/client-go/tools/cache/store.go
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
  if key, ok := obj.(ExplicitKey); ok {
    return string(key), nil
  }
  meta, err := meta.Accessor(obj)
  if err != nil {
    return "", fmt.Errorf("object has no meta: %v", err)
  }
  if len(meta.GetNamespace()) > 0 {
    return meta.GetNamespace() + "/" + meta.GetName(), nil
  }
  return meta.GetName(), nil
}
  • 结构体 DeltaFIFOOptions 封装定义了创建 DeltaFIFO 对象所需要的一些关键参数,例如 KeyFunc 等。
  • DeltaFIFO 可以通过 DeltaFIFOOptions 来创建,也可以直接用相关参数进行,例如直接传入 KeyFunc 和 KeyListerGetter。
  • 对于没有指定 KeyFunc 的时候,默认会使用 MetaNamespaceKeyFunc 来作为资源对象的 key 生成函数。
  • 在 MetaNamespaceKeyFunc 的定义中,如果资源是基于 namespace 的,那么 key 为 {namespace}/{resource-name} 。如果资源对象不是基于 namespace 的,那么 key 的值为

目前我们先写到这里,在下一篇文章中我们继续来介绍 kubernetes 资源对象的 list and watch 机制。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-09-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 TA码字 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档