前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kubernetes 源码学习之延时队列

Kubernetes 源码学习之延时队列

作者头像
我是阳明
发布2020-09-22 15:52:27
1.2K0
发布2020-09-22 15:52:27
举报
文章被收录于专栏:k8s技术圈k8s技术圈

client-go 中的 workqueue,类似于 golang 语言中的 channel,主要用于并发程序之间的数据同步。Kubernetes 的控制器模型通过 client-go 的 informer watch 资源变化,当资源发生变化时会通过回调函数将资源写入队列,由控制器中的消费者完成业务处理。

延时队列

client-go 中实现了多种队列,包括通用队列、延时队列、限速队列,本文首先介绍延时队列的实现。延时队列是在通用队列基础上进行扩展的,因为它本质还是一个队列,只是加了一个新的函数来进行延迟,对应的接口定义如下所示:

代码语言:javascript
复制
// k8s.io/client-go/util/workqueue/queue.go

// 通用队列接口定义
type Interface interface {
 Add(item interface{})  // 向队列中添加一个元素
 Len() int  // 获取队列长度
 Get() (item interface{}, shutdown bool)  // 获取队列头部的元素,第二个返回值表示队列是否已经关闭
 Done(item interface{})  // 标记队列中元素已经处理完
 ShutDown()  // 关闭队列
 ShuttingDown() bool  // 查询队列是否正在关闭
}

// k8s.io/client-go/util/workqueue/delaying_queue.go

// DelayingInterface 是一个延时队列,可以在以后的时间来添加元素的接口
// 这使得它更容易在处理失败后重新入队列,而不至于陷入 hot-loop
type DelayingInterface interface {
  // 扩展通用队列
 Interface
 // 在指定的时间后将元素添加到工作队列中
 AddAfter(item interface{}, duration time.Duration)
}

延时队列的定义很简单,就是增加了一个函数来实现元素的延迟添加而已,接下来我们继续来查看该接口的具体实现方式:

代码语言:javascript
复制
// k8s.io/client-go/util/workqueue/delaying_queue.go

// delayingType 包装了 Interface 通用接口,并提供了延迟重新入队列
type delayingType struct {
 Interface  // 一个通用队列

 // 时钟用于跟踪延迟触发的时间
 clock clock.Clock

 // 关闭信号
 stopCh chan struct{}
 // 用来保证只发出一次关闭信号
 stopOnce sync.Once

 // 在触发之前确保我们等待的时间不超过 maxWait
 heartbeat clock.Ticker

 // waitingForAddCh 是一个 buffered channel,提供了一个缓冲通道
  // 延迟添加的元素封装成 waitFor 放到 channel 中
 waitingForAddCh chan *waitFor

 // 记录重试的次数
 metrics retryMetrics
}

// waitFor 持有要添加的数据和应该添加的时间
type waitFor struct {
 data    t  // 添加的元素数据
 readyAt time.Time  // 在什么时间添加到队列中
 index int  // 优先级队列(heap)中的索引
}

在延时队列的实现 delayingType 结构体中包含一个通用队列 Interface 的实现,然后最重要的一个属性就是 waitingForAddCh,这是一个 buffered channel,将延迟添加的元素封装成 waitFor 放到通道中,意思就是当到了指定的时间后就将元素添加到通用队列中去进行处理,还没有到时间的话就放到这个缓冲通道中。要了解是如何实现延时队列的我们还需要了解另外一个数据结构,那就是 waitForPriorityQueue

代码语言:javascript
复制
// k8s.io/client-go/util/workqueue/delaying_queue.go

// waitForPriorityQueue 为 waitFor 的元素集合实现了一个优先级队列
// 把需要延迟的元素放到一个队列中,然后在队列中按照元素的延时添加时间(readyAt)从小到大排序
// 其实这个优先级队列就是实现的 golang 中内置的 container/heap/heap.go 中的 Interface 接口
// 最终实现的队列就是 waitForPriorityQueue 这个集合是有序的,按照时间从小到大进行排列
type waitForPriorityQueue []*waitFor

waitForPriorityQueue 是一个有序的 waitFor 的集合,按照添加的时间从小到大进行排列,这就形成了一个优先队列

优先队列

其实这个优先队列是 golang 中内置的 container/heap/heap.go 文件中的 Interface 接口(常说的数据结构堆)的一个实现,我们要想实现自己的队列也完全可以去实现这个接口即可:

代码语言:javascript
复制
// $GOROOT/src/container/heap/heap.go

// 堆接口定义
// 注意:这个接口中的 Push 和 Pop 函数是给 heap 包的实现调用的
// 任何实现了本接口的类型都可以用于构建小顶堆
// 小顶堆可以通过 heap.Init 建立,数据是递增顺序或者空的话也是最小堆
// 小顶堆的约束条件是:
// !h.Less(j, i) for 0 <= i < h.Len() and 2*i+1 <= j <= 2*i+2 and j < h.Len()

// 要从堆中添加和删除元素,可以直接使用 heap.Push 和 heap.Pop 函数。
type Interface interface {
 sort.Interface  // 扩展排序接口
 Push(x interface{}) 
 Pop() interface{}   
}

// Init 初始化一个堆
// 使用任何堆操作之前应先初始化,Init 函数对于堆的约束性是幂等的,并可能在任何时候堆的约束性被破坏时被调用
// 本函数复杂度为O(n),其中n等于h.Len()。
func Init(h Interface) {
 // 构建堆
 n := h.Len()
 for i := n/2 - 1; i >= 0; i-- {
  down(h, i, n)
 }
}

// Push 将元素 x 添加到堆上
// 复杂度为 O(log n),其中 n = h.Len()
func Push(h Interface, x interface{}) {
 h.Push(x)
  // 要保证堆的结构,所以添加进来的元素要重新调整
  // 元素添加到最后,然后不断上浮,因为要满足任一节点的值要小于左右子树的值
 up(h, h.Len()-1)
}

// 将元素 j 重新排到正确的位置
func up(h Interface, j int) {
 for {
  i := (j - 1) / 2 // 父节点的索引
    // 如果 j 就是父节点或者 j >= i(父节点元素 < 子节点元素)那就不用调整了
  if i == j || !h.Less(j, i) {
   break
  }
    // 父节点元素 > j 的元素,就交换二者
  h.Swap(i, j)
  j = i
 }
}

// Pop 从堆中移除并返回最小元素(根据 Less)
// 复杂度为 O(log n),其中 n = h.Len()
// Pop 相当于 Remove(h, 0)
func Pop(h Interface) interface{} {
 n := h.Len() - 1
  // 将最后一个元素填充到堆顶,Pop 是弹出堆顶的元素
 h.Swap(0, n)
  // 然后不断的下沉这个元素
 down(h, 0, n)
  // 调用外部的实现者,h.Pop() 实现中会删除最后一个元素
 return h.Pop()
}

func down(h Interface, i0, n int) bool {
 i := i0  // 父节点索引
 for {
  j1 := 2*i + 1  // 左子节点的索引
  if j1 >= n || j1 < 0 { // 下沉到最后一个节点就不处理了
   break
  }
  j := j1  // 左子节点索引
  if j2 := j1 + 1; j2 < n && h.Less(j2, j1) {
   j = j2 // = 2*i + 2  // 右子节点索引
  }
    // 子节点 >= 父节点索引,那就不用处理了
  if !h.Less(j, i) {
   break
  }
    // 子节点 < 父节点,则交换父节点和子节点
  h.Swap(i, j)
  i = j
 }
 return i > i0
}

// 重新调整结构
func Fix(h Interface, i int) {
 if !down(h, i, h.Len()) {
  up(h, i)
 }
}

// $GOROOT/src/sort/sort.go

// 排序的接口定义
type Interface interface {
 // 集合元素大小
 Len() int
 // 比较索引 i 和 j 位置的元素大小
 Less(i, j int) bool
 // 交换索引 i 和 j 位置的元素
 Swap(i, j int)
}

堆是一种经过排序的完全二叉树,其中任一非终端节点的数据值均不大于(或不小于)其左孩子和右孩子节点的值。golang 中内置的堆是小顶堆(最小堆),任一节点的值是其子树所有结点的最小值:

堆又被称为优先队列,尽管名为优先队列,但堆并不是队列。因为队列中允许的操作是先进先出(FIFO),在队尾插入元素,在队头取出元素。而堆虽然在堆底插入元素,在堆顶取出元素,但是堆中元素的排列不是按照到来的先后顺序,而是按照一定的优先顺序排列的。

下图是插入一个元素的示意图:

下图是从堆中删除一个元素的示意图:

延时队列实现

接下来我们来看下 waitForPriorityQueue 是如何实现这个优先队列的:

代码语言:javascript
复制
// k8s.io/client-go/util/workqueue/delaying_queue.go

// 获取队列长度,pq 就是一个 waitFor 集合,直接返回长度即可
func (pq waitForPriorityQueue) Len() int {
 return len(pq)
}

// 判断索引 i 和 j 上的元素大小
func (pq waitForPriorityQueue) Less(i, j int) bool {
  // 根据时间先后顺序来决定先后顺序
  // i 位置的元素时间在 j 之前,则证明索引 i 的元素小于索引 j 的元素
 return pq[i].readyAt.Before(pq[j].readyAt)
}

// 交换索引 i 和 j 的元素
func (pq waitForPriorityQueue) Swap(i, j int) {
  // 交换元素
 pq[i], pq[j] = pq[j], pq[i]
  // 更新元素里面的索引信息
 pq[i].index = i
 pq[j].index = j
}

// 添加元素到队列中
// 要注意不应该直接调用 Push 函数,而应该使用 `heap.Push`
func (pq *waitForPriorityQueue) Push(x interface{}) {
 n := len(*pq)
 item := x.(*waitFor)
 item.index = n
 *pq = append(*pq, item)
}

// 从队列中弹出最后一个元素
// 要注意不应该直接调用 Pop 函数,而应该使用 `heap.Pop`
func (pq *waitForPriorityQueue) Pop() interface{} {
 n := len(*pq)
 item := (*pq)[n-1]
 item.index = -1
 *pq = (*pq)[0:(n - 1)]
 return item
}

// 直接获取队列开头的元素,不会删除元素或改变队列
func (pq waitForPriorityQueue) Peek() interface{} {
 return pq[0]
}

上面就是 waitForPriorityQueue 这个优先队列的实现,接下来我们就来分析延时队列的具体实现了,因为延时队列集成通用队列,所以这里只对新增的函数做说明:

代码语言:javascript
复制
// k8s.io/client-go/util/workqueue/delaying_queue.go

// 在指定的延迟时间之后将元素 item 添加到队列中
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
 // 如果队列关闭了则直接退出
 if q.ShuttingDown() {
  return
 }

 q.metrics.retry()

 // 如果延迟的时间<=0,则相当于通用队列一样添加元素
 if duration <= 0 {
  q.Add(item)
  return
 }
 
  // select 没有 default case,所以可能会被阻塞
 select {
  // 如果调用了 ShutDown() 则解除阻塞
 case <-q.stopCh:
  // 把元素封装成 waitFor 传给 waitingForAddCh
 case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
 }
}

AddAfter 的函数实现比较简单,就是把元素和添加的时间封装成一个 waitFor 对象,然后发送给 waitingForAddCh 通道,所以具体怎么添加的元素需要查看如何从这个通道消费数据的地方,也就是 waitingLoop 函数,这个函数在实例化 DelayingInterface 后就用一个单独的协程启动了:

代码语言:javascript
复制
// k8s.io/client-go/util/workqueue/delaying_queue.go

// waitingLoop 一直运行直到工作队列关闭为止
// 并对要添加的元素列表进行检查
func (q *delayingType) waitingLoop() {
 defer utilruntime.HandleCrash()

 // 创建一个占位符通道,当列表中没有元素的时候利用这个变量实现长时间等待
 never := make(<-chan time.Time)

 // 构造一个定时器,当等待队列头部的元素准备好时,该定时器就会失效
 var nextReadyAtTimer clock.Timer
 
  // 构造一个优先级队列
 waitingForQueue := &waitForPriorityQueue{}
  // 构造小顶堆结构
 heap.Init(waitingForQueue)
 
  // 用来避免元素重复添加,如果重复添加了就只更新时间
 waitingEntryByData := map[t]*waitFor{}
  
  // 死循环
 for {
    // 队列如果关闭了,则直接退出
  if q.Interface.ShuttingDown() {
   return
  }
    
    // 获取当前时间
  now := q.clock.Now()

  // 如果优先队列中有元素的话
  for waitingForQueue.Len() > 0 {
      // 获取第一个元素
   entry := waitingForQueue.Peek().(*waitFor)
      // 如果第一个元素指定的时间还没到时间,则跳出循环
      // 因为第一个元素是时间最小的
   if entry.readyAt.After(now) {
    break
   }

      // 时间已经过了,那就把它从优先队列中拿出来放入通用队列中
      // 同时要把元素从上面提到的 map 中删除,因为不用再判断重复添加了
   entry = heap.Pop(waitingForQueue).(*waitFor)
   q.Add(entry.data)
   delete(waitingEntryByData, entry.data)
  }

  nextReadyAt := never
    // 如果优先队列中还有元素,那就用第一个元素指定的时间减去当前时间作为等待时间
    // 因为优先队列是用时间排序的,后面的元素需要等待的时间更长,所以先处理排序靠前面的元素
  if waitingForQueue.Len() > 0 {
   if nextReadyAtTimer != nil {
    nextReadyAtTimer.Stop()
   }
      // 获取第一个元素
   entry := waitingForQueue.Peek().(*waitFor)
      // 第一个元素的时间减去当前时间作为等待时间
   nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
   nextReadyAt = nextReadyAtTimer.C()
  }

  select {
    // 退出信号
  case <-q.stopCh:
   return

    // 定时器,每过一段时间没有任何数据,那就再执行一次大循环
  case <-q.heartbeat.C():

    // 上面的等待时间信号,时间到了就有信号
    // 激活这个case,然后继续循环,添加准备好了的元素
  case <-nextReadyAt:
    
    // AddAfter 函数中放入到通道中的元素,这里从通道中获取数据
  case waitEntry := <-q.waitingForAddCh:
      // 如果时间已经过了就直接放入通用队列,没过就插入到有序队列
   if waitEntry.readyAt.After(q.clock.Now()) {
    insert(waitingForQueue, waitingEntryByData, waitEntry)
   } else {
        // 放入通用队列
    q.Add(waitEntry.data)
   }
      // 下面就是把channel里面的元素全部取出来
      // 如果没有数据了就直接退出
   drained := false
   for !drained {
    select {
    case waitEntry := <-q.waitingForAddCh:
     if waitEntry.readyAt.After(q.clock.Now()) {
      insert(waitingForQueue, waitingEntryByData, waitEntry)
     } else {
      q.Add(waitEntry.data)
     }
    default:
     drained = true
    }
   }
  }
 }
}

// 插入元素到有序队列,如果已经存在了则更新时间
func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
 // 查看元素是否已经存在
 existing, exists := knownEntries[entry.data]
 if exists {
    // 元素存在,就比较谁的时间靠后就用谁的时间
  if existing.readyAt.After(entry.readyAt) {
   existing.readyAt = entry.readyAt
      // 时间变了需要重新调整优先级队列
   heap.Fix(q, existing.index)
  }
  return
 }
  
  // 把元素放入有序队列中
 heap.Push(q, entry)
  // 并记录在上面的 map 里面,用于判断是否存在
 knownEntries[entry.data] = entry
}

到这里延时队列核心代码就分析完了,其实实现的原理很简单,既然是延时队列那肯定就有元素执行的时间,根据这个时间的先后顺序来构造一个优先级队列,时间到了的话就把这个元素放到通用队列中去进行正常的处理就行。

所以核心重点就是优先队列的实现,而这里使用的优先队列是 golang 内置的 heap 接口实现,所以归根结底底层都是数据结构与算法的运用


本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-09-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 k8s技术圈 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 延时队列
  • 优先队列
  • 延时队列实现
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档