前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >client-go 源码分析(8) - workerqueue之延时队列DelayingQueue

client-go 源码分析(8) - workerqueue之延时队列DelayingQueue

作者头像
后端云
发布2023-02-10 14:19:51
4550
发布2023-02-10 14:19:51
举报
文章被收录于专栏:后端云

延时队列DelayingQueue,从下面的接口可以看出添加的元素,有一个延迟时间,延时时间到了之后才能加入队列。

延迟队列的实现是,根据加入队列的时间,构造一个最小堆min-heap,然后到时间点后,将从最小堆pop一个元素加入queue中(因为最小堆是按照延时时间从小到大排序的)。

代码语言:javascript
复制
type DelayingInterface interface {
	Interface
	// AddAfter adds an item to the workqueue after the indicated duration has passed
	AddAfter(item interface{}, duration time.Duration)
}

type delayingType struct {
	Interface

	// clock tracks time for delayed firing
	clock clock.Clock

	// stopCh lets us signal a shutdown to the waiting loop
	stopCh chan struct{}
	// stopOnce guarantees we only signal shutdown a single time
	stopOnce sync.Once

	// heartbeat ensures we wait no more than maxWait before firing
	heartbeat clock.Ticker

	// waitingForAddCh is a buffered channel that feeds waitingForAdd
	waitingForAddCh chan *waitFor

	// metrics counts the number of retries
	metrics retryMetrics
}

延迟队列的实现用到了很多概念和数据结构,需要搞清楚这些概念和数据结构才能理解相关代码,且在判断时间点是否到达,用到了最小堆机制,心跳机制和channel机制。

waitFor结构体保存了要加入队列的数据对象,加入队列的时间,waitFor最为最小堆item的堆上的index。

代码语言:javascript
复制
type waitFor struct {
   data    t          // 准备添加到队列中的数据
   readyAt time.Time  // 应该被加入队列的时间
   index int          // 在 heap 中的索引
}

下面代码实现了堆heap的接口的5个方法。堆heap的元素item就是上面的waitFor结构体。

代码语言:javascript
复制
type waitForPriorityQueue []*waitFor

func (pq waitForPriorityQueue) Len() int {
	return len(pq)
}
func (pq waitForPriorityQueue) Less(i, j int) bool {
	return pq[i].readyAt.Before(pq[j].readyAt)
}
func (pq waitForPriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
	pq[i].index = i
	pq[j].index = j
}
func (pq *waitForPriorityQueue) Push(x interface{}) {
	n := len(*pq)
	item := x.(*waitFor)
	item.index = n
	*pq = append(*pq, item)
}
func (pq *waitForPriorityQueue) Pop() interface{} {
	n := len(*pq)
	item := (*pq)[n-1]
	item.index = -1
	*pq = (*pq)[0:(n - 1)]
	return item
}
// 返回第一个元素,非heap接口的实现方法
func (pq waitForPriorityQueue) Peek() interface{} {
    return pq[0]
}

延时队列DelayingQueue的核心就是运行 waitingLoop方法。

代码语言:javascript
复制
func newDelayingQueue(clock clock.WithTicker, q Interface, name string) *delayingType {
	ret := &delayingType{
		Interface:       q,
		clock:           clock,
		heartbeat:       clock.NewTicker(maxWait),
		stopCh:          make(chan struct{}),
		waitingForAddCh: make(chan *waitFor, 1000),
		metrics:         newRetryMetrics(name),
	}

	go ret.waitingLoop()
	return ret
}

AddAfter方法是对DelayingInterface接口的实现。AddAfter方法是在给定延迟后将给定项目添加到工作队列。具体是通过将两个入参组装成waitFor结构体 &waitFor{data: item, readyAt: q.clock.Now().Add(duration)} 放入到channel waitingForAddCh: make(chan *waitFor, 1000) 中去。(最大可以缓存1000个 &waitForm items)

代码语言:javascript
复制
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
	// don't add if we're already shutting down
	if q.ShuttingDown() {
		return
	}

	q.metrics.retry()

	// immediately add things with no delay
	if duration <= 0 {
		q.Add(item)
		return
	}

	select {
	case <-q.stopCh:
		// unblock if ShutDown() is called
	case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
	}
}

waitingLoop方法,随着delayingType实例的创建而启动,并一直运行下去直到workqueue被shutdown。waitingLoop方法一直在做的事情就是 不停的将上面的 AddAfter方法 放进 q.waitingForAddCh channel的item取出来,根据item的时间是早于现在还是晚于现在,早于现在就加入工作队列,晚于现在就放到heap上。并不断的从heap pop出第一个item,检测item的到期时间,根据item的时间是早于现在还是晚于现在,早于现在就加入工作队列,晚于现在啥也不做,item继续保留在heap上。

代码语言:javascript
复制
func (q *delayingType) waitingLoop() {
	defer utilruntime.HandleCrash()

	// Make a placeholder channel to use when there are no items in our list
	never := make(<-chan time.Time)

	// Make a timer that expires when the item at the head of the waiting queue is ready
	var nextReadyAtTimer clock.Timer

	waitingForQueue := &waitForPriorityQueue{}
	heap.Init(waitingForQueue)

	waitingEntryByData := map[t]*waitFor{}

	for {
		if q.Interface.ShuttingDown() {
			return
		}

		now := q.clock.Now()

		// Add ready entries
		for waitingForQueue.Len() > 0 {
			entry := waitingForQueue.Peek().(*waitFor)
			if entry.readyAt.After(now) {
				break
			}

			entry = heap.Pop(waitingForQueue).(*waitFor)
			q.Add(entry.data)
			delete(waitingEntryByData, entry.data)
		}

		// Set up a wait for the first item's readyAt (if one exists)
		nextReadyAt := never
		if waitingForQueue.Len() > 0 {
			if nextReadyAtTimer != nil {
				nextReadyAtTimer.Stop()
			}
			entry := waitingForQueue.Peek().(*waitFor)
			nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
			nextReadyAt = nextReadyAtTimer.C()
		}

		select {
		case <-q.stopCh:
			return

		case <-q.heartbeat.C():
			// continue the loop, which will add ready items

		case <-nextReadyAt:
			// continue the loop, which will add ready items

		case waitEntry := <-q.waitingForAddCh:
			if waitEntry.readyAt.After(q.clock.Now()) {
				insert(waitingForQueue, waitingEntryByData, waitEntry)
			} else {
				q.Add(waitEntry.data)
			}

			drained := false
			for !drained {
				select {
				case waitEntry := <-q.waitingForAddCh:
					if waitEntry.readyAt.After(q.clock.Now()) {
						insert(waitingForQueue, waitingEntryByData, waitEntry)
					} else {
						q.Add(waitEntry.data)
					}
				default:
					drained = true
				}
			}
		}
	}
}

上面的代码中的select方法,满足心跳时间 或者 pop后的heap的第一个元素的时间已经到了 或者q.waitingForAddCh channel有数据,就进入下一次的for循环。

其中,从q.waitingForAddCh取出数据后,根据item的到期时间,决定是放入堆中(item的到期时间晚于现在的时间),还是放入工作队列(item的到期时间早于现在的时间)。本次的放入工作队列不同于上面几行的放入工作队列的代码,区别是上次是从堆里拿出的item,这次是从channel拿出的item跳过了放入堆的过程直接放入工作队列。(因为item的到期时间已经晚于现在的时间,没必要放投入堆里进行排序了,提高执行效率,避免做无用功)

insert方法往heap添加元素,分两种情况。若元素存在则update,若不存在,push该元素到heap中,并将入参的 knownEntries(即waitingLoop方法的waitingEntryByData) set集合添加该元素的值,为了保证不重复。

代码语言:javascript
复制
func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
	// if the entry already exists, update the time only if it would cause the item to be queued sooner
	existing, exists := knownEntries[entry.data]
	if exists {
		if existing.readyAt.After(entry.readyAt) {
			existing.readyAt = entry.readyAt
			heap.Fix(q, existing.index)
		}

		return
	}

	heap.Push(q, entry)
	knownEntries[entry.data] = entry
}

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-12-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端云 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档