专栏首页四颗咖啡豆k8s-client-go源码剖析(三)
原创

k8s-client-go源码剖析(三)

云原生社区活动---Kubernetes源码剖析第一期第三周作业, 也是最后一周作业.

本文主要讲述下client-go中workqueue, 看一下client-go的一个整体数据走向.如下图:

而workqueue主要是在listener这里引用,listener使用chan获取到数据之后将数据放入到工作队列进行处理。主要是由于chan过于简单,已经无法满足K8S的场景,所以衍生出了workqueue,

特性


  1. 有序
  2. 去重
  3. 并发
  4. 延迟处理
  5. 限速

当前有三种workqueue


  1. 基本队列
  2. 延迟队列
  3. 限速队列

其中延迟队列是基于基本队列实现的,而限流队列基于延迟队列实现

基本队列


看一下基本队列的接口

// client-go源码路径util/workqueue/queue.go
type Interface interface {
	//新增元素 可以是任意对象
	Add(item interface{})
	//获取当前队列的长度
	Len() int
	// 阻塞获取头部元素(先入先出)  返回元素以及队列是否关闭
	Get() (item interface{}, shutdown bool)
	// 显示标记完成元素的处理
	Done(item interface{})
	//关闭队列
	ShutDown()
	//队列是否处于关闭状态
	ShuttingDown() bool
}

看一下基本队列的数据结构,只看三个重点处理的,其他的没有展示出来

type Type struct {
	//含有所有元素的元素的队列 保证有序
	queue []t
	//所有需要处理的元素 set是基于map以value为空struct实现的结构,保证去重
	dirty set
	//当前正在处理中的元素
	processing set
	...
}

type empty struct{}
type t interface{}
type set map[t]empty

基本队列的hello world也很简单

 wq := workqueue.New()
	wq.Add("hello")
	v, _ := wq.Get()

基本队列Add


func (q *Type) Add(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	//如果当前处于关闭状态,则不再新增元素
	if q.shuttingDown {
		return
	}
	//如果元素已经在等待处理中,则不再新增
	if q.dirty.has(item) {
		return
	}
	//添加到metrics
	q.metrics.add(item)
	//加入等待处理中
	q.dirty.insert(item)
	//如果目前正在处理该元素 就不将元素添加到队列
	if q.processing.has(item) {
		return
	}
	q.queue = append(q.queue, item)
	q.cond.Signal()
}

基本队列Get


func (q *Type) Get() (item interface{}, shutdown bool) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	//如果当前没有元素并且不处于关闭状态,则阻塞
	for len(q.queue) == 0 && !q.shuttingDown {
		q.cond.Wait()
	}
	...
	item, q.queue = q.queue[0], q.queue[1:]
	q.metrics.get(item)
	//把元素添加到正在处理队列中
	q.processing.insert(item)
	//把队列从等待处理队列中删除
	q.dirty.delete(item)
	return item, false
}

基本队列实例化


func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type {
	t := &Type{
		clock:                      c,
		dirty:                      set{},
		processing:                 set{},
		cond:                       sync.NewCond(&sync.Mutex{}),
		metrics:                    metrics,
		unfinishedWorkUpdatePeriod: updatePeriod,
	}
        //启动一个协程 定时更新metrics
	go t.updateUnfinishedWorkLoop()
	return t
}

func (q *Type) updateUnfinishedWorkLoop() {
	t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
	defer t.Stop()
	for range t.C() {
		if !func() bool {
			q.cond.L.Lock()
			defer q.cond.L.Unlock()
			if !q.shuttingDown {
				q.metrics.updateUnfinishedWork()
				return true
			}
			return false

		}() {
			return
		}
	}
}

延迟队列


延迟队列的实现思路主要是使用优先队列存放需要延迟添加的元素,每次判断最小延迟的元素书否已经达到了加入队列的要求(延迟的时间到了),如果是则判断下一个元素,直到没有元素或者元素还需要延迟为止。

看一下延迟队列的数据结构

type delayingType struct {
	Interface
        ...
	//放置延迟添加的元素
	waitingForAddCh chan *waitFor
       ...
}

主要是使用chan来保存延迟添加的元素,而具体实现是通过一个实现了一个AddAfter方法,看一下具体的内容

//延迟队列的接口
type DelayingInterface interface {
	Interface
	// AddAfter adds an item to the workqueue after the indicated duration has passed
	AddAfter(item interface{}, duration time.Duration)
}

func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
	...
	//如果延迟实现小于等于0 直接添加到队列
	if duration <= 0 {
		q.Add(item)
		return
	}
	select {
	case <-q.stopCh:
	//添加到chan,下面会讲一下这个chan的处理
	case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
	}
}

延迟元素的处理

func (q *delayingType) waitingLoop() {

	defer utilruntime.HandleCrash()

	never := make(<-chan time.Time)

	var nextReadyAtTimer clock.Timer

	waitingForQueue := &waitForPriorityQueue{}
	//这里是初始化一个优先队列 具体实现有兴趣的同学可以研究下
	heap.Init(waitingForQueue)

	waitingEntryByData := map[t]*waitFor{}

	for {
		if q.Interface.ShuttingDown() {
			return
		}

		now := q.clock.Now()

		// Add ready entries
		for waitingForQueue.Len() > 0 {
			entry := waitingForQueue.Peek().(*waitFor)
			//看一下第一个元素是否已经到达延迟的时间了
			if entry.readyAt.After(now) {
				break
			}

			//时间到了,将元素添加到工作的队列,并且从延迟的元素中移除
			entry = heap.Pop(waitingForQueue).(*waitFor)
			q.Add(entry.data)
			delete(waitingEntryByData, entry.data)
		}

		// Set up a wait for the first item's readyAt (if one exists)
		nextReadyAt := never
		if waitingForQueue.Len() > 0 {
			if nextReadyAtTimer != nil {
				nextReadyAtTimer.Stop()
			}
			//如果还有需要延迟的元素,计算第一个元素的延迟时间(最小延迟的元素)
			entry := waitingForQueue.Peek().(*waitFor)
			nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
			nextReadyAt = nextReadyAtTimer.C()
		}

		select {
		case <-q.stopCh:
			return
		case <-q.heartbeat.C():
			//定时检查下是否有元素达到延迟的时间
		case <-nextReadyAt:
			//这里是上面计算出来的时间,时间到了,处理到达延迟时间的元素
		case waitEntry := <-q.waitingForAddCh:
			//检查是否需要延迟,如果需要延迟就加入到延迟等待
			if waitEntry.readyAt.After(q.clock.Now()) {
				insert(waitingForQueue, waitingEntryByData, waitEntry)
			} else {
				//如果不需要延迟就直接添加到队列
				q.Add(waitEntry.data)
			}

			drained := false
			for !drained {
				select {
				case waitEntry := <-q.waitingForAddCh: 

上面waitingLoop 是在实例化延迟队列的时候调用的,看一下实例化时候的逻辑

func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface {
	//实例化一个数据结构
	ret := &delayingType{
		Interface:       NewNamed(name),
		clock:           clock,
		heartbeat:       clock.NewTicker(maxWait),
		stopCh:          make(chan struct{}),
		waitingForAddCh: make(chan *waitFor, 1000),
		metrics:         newRetryMetrics(name),
	}

	//放到一个协程中处理延迟元素
	go ret.waitingLoop()

	return ret
}

限速队列


当前限速队列支持4中限速模式

  1. 令牌桶算法限速
  2. 排队指数限速
  3. 计数器模式
  4. 混合模式(多种限速算法同时使用)

限速队列的底层实际上还是通过延迟队列来进行限速,通过计算出元素的限速时间作为延迟时间

来看一下限速接口

type RateLimiter interface {
	//
	When(item interface{}) time.Duration
	// Forget indicates that an item is finished being retried.  Doesn't matter whether its for perm failing
	// or for success, we'll stop tracking it
	Forget(item interface{})
	// NumRequeues returns back how many failures the item has had
	NumRequeues(item interface{}) int
}

看一下限速队列的数据结构

// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
	DelayingInterface

	//实际上底层还是调用的延迟队列,通过计算出元素的延迟时间 进行限速
	AddRateLimited(item interface{})

	// Forget indicates that an item is finished being retried.  Doesn't matter whether it's for perm failing
	// or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you
	// still have to call `Done` on the queue.
	Forget(item interface{})

	// NumRequeues returns back how many times the item was requeued
	NumRequeues(item interface{}) int
}

func (q *rateLimitingType) AddRateLimited(item interface{}) {
         //通过when方法计算延迟加入队列的时间
	q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}

令牌桶算法


client-go中的令牌桶限速是通过 golang.org/x/time/rat包来实现的

可以通过 flowcontrol.NewTokenBucketRateLimiter(qps float32, burst int) 来使用令牌桶限速算法,其中第一个参数qps表示每秒补充多少token,burst表示总token上限为多少。

排队指数算法


排队指数可以通过 workqueue.NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) 来使用。

这个算法有两个参数:

  1. baseDelay 基础限速时间
  2. maxDelay 最大限速时间

举个例子来理解一下这个算法,例如快速插入5个相同元素,baseDelay设置为1秒,maxDelay设置为10秒,都在同一个限速期内。第一个元素会在1秒后加入到队列,第二个元素会在2秒后加入到队列,第三个元素会在4秒后加入到队列,第四个元素会在8秒后加入到队列,第五个元素会在10秒后加入到队列(指数计算的结果为16,但是最大值设置了10秒)。

来看一下源码的计算

func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	//第一次为0
	exp := r.failures[item]
	//累加1
	r.failures[item] = r.failures[item] + 1

	//通过当前计数和baseDelay计算指数结果  baseDelay*(2的exp次方)
	backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
	if backoff > math.MaxInt64 {
		return r.maxDelay
	}

	calculated := time.Duration(backoff)
	if calculated > r.maxDelay {
		return r.maxDelay
	}

	return calculated
}

计数器模式


计数器模式可以通过 workqueue.NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int)来使用,有三个参数

  1. fastDelay 快限速时间
  2. slowDelay 慢限速时间
  3. maxFastAttempts 快限速元素个数

原理是这样的,假设fastDelay设置为1秒,slowDelay设置为10秒,maxFastAttempts设置为3,同样在一个限速周期内快速插入5个相同的元素。前三个元素都是以1秒的限速时间加入到队列,添加第四个元素时开始使用slowDelay限速时间,也就是10秒后加入到队列,后面的元素都将以10秒的限速时间加入到队列,直到限速周期结束。

来看一下源码

func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()
	//添加一次就计数一次
	r.failures[item] = r.failures[item] + 1
	//计数小于maxFastAttempts都以fastDelay为限速时间,否则以slowDelay为限速时间
	if r.failures[item] <= r.maxFastAttempts {
		return r.fastDelay
	}
	return r.slowDelay
}

混合模式


最后一种是混合模式,可以组合使用不同的限速算法实例化限速队列

func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
	return &MaxOfRateLimiter{limiters: limiters}
}

总结


在k8s-client-go的源码中可以看到,大量的接口组合运用,将各种功能拆分成各个细小的库,是一种非常值得学习的代码风格以及思路。

始发于 四颗咖啡豆,转载请声明出处.

关注公粽号->四颗咖啡豆 获取最新内容

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • k8s-client-go源码剖析(二)

    本周是K8S源码研习社第一期第二周,学习内容是学习Informer机制,本文以这个课题进行展开。

    用户2672162
  • k8s-client-go源码剖析(一)

    有幸参与云原生社区举办的Kubernetes源码剖析活动,活动主要以书籍《Kubernetes源码剖析》为主要思路进行展开,提出在看书过程中遇到的问题,和社区成...

    用户2672162
  • k8s中Apimachinery、Api、Client-go库之间的关系

    使用 k8s 相关 sdk 做二次开发时,经常用到 apimachinery、api、client-go 这三个库,一直对他们的职责不是很清楚,网上也没有找到合...

    kinnylee
  • k8s代码走读---client-go编程交互基础

    代码 clone 地址:https://github.com/kubernetes/client-go。实际上在 kubernetes 的源码中也包含了这部分代...

    黑光技术
  • 使用 client-go 对 Kubernetes 进行自定义开发及源码分析

    注意:这里 Kubernetes 集群搭建使用 Minikube 来完成,Minikube 启动的单节点 k8s Node 实例是需要运行在本机的 VM 虚拟机...

    哎_小羊
  • istio 庖丁解牛(三) galley

    今天我们来解析istio控制面组件Galley. Galley Pod是一个单容器单进程组件, 没有sidecar, 结构独立, 职责明确.

    钟华
  • k8s源码分析------第三方库etcd client分析

    第一时间获取文章,可以关注本人公众号 月牙寂道长 yueyajidaozhang

    月牙寂道长
  • client-go客户端自定义开发Kubernetes及源码分析

    client-go 是一种能够与 Kubernetes 集群通信的客户端,通过它可以对 Kubernetes 集群中各资源类型进行 CRUD 操作,它有三大 c...

    程序员同行者
  • python-k8sclient开发K8S

    Client-go是kubernetes官方发布的调用K8S API的golang语言包,可以用来开发K8S的管理服务、监控服务,配合前端展示,就可以开发出一款...

    菲宇
  • Go Channel 源码剖析

    0. 引言 这篇文章介绍一下 Golang channel 的内部实现,包括 channel 的数据结构以及相关操作的代码实现。代码版本 go1.9rc1,部分...

    李海彬
  • client-go 之 Reflector 源码分析

    前面我们说了 Informer 通过对 APIServer 的资源对象执行 List 和 Watch 操作,把获取到的数据存储在本地的缓存中,其中实现这个的核心...

    我是阳明
  • 【转】Go Interface 源码剖析

    源网址:http://legendtkl.com/2017/07/01/golang-interface-implement/

    lpxxn
  • 英国Monzo银行如何用K8s管理1600个微服务?

    英国数字银行Monzo两位资深工程师Matt Heath和Suhail Patel在伦敦一场研讨会上,分享了如何管理1600个后端微服务的经验。这间设立超过5...

    灵雀云
  • k8s源码分析------第三方库go-restful分析

    第一时间获取文章,可以关注本人公众号 月牙寂道长 yueyajidaozhang

    月牙寂道长
  • 从源码打造云原生时代的「Linux」——Kubernetes

    作为目前云计算领域活跃度最高的项目——Kubernetes在其GitHub上已经提供了如何编译Kubernetes的方法,README中的第一句话是:

    nevermosby
  • k8s master机器文件系统故障的一次恢复过程

    研发反馈他们那边一套集群有台master文件系统损坏无法开机,他们是三台openstack上的虚机,是虚拟化宿主机故障导致的虚机文件系统损坏。三台机器是mast...

    程序猿Damon
  • kubernetes 中 informer 的使用

    在实际开发过程中,若想要获取 kubernetes 中某个资源(比如 pod)的所有对象,可以使用 kubectl、k8s REST API、client-go...

    田飞雨
  • kubernetes 中 informer 的使用

    在实际开发过程中,若想要获取 kubernetes 中某个资源(比如 pod)的所有对象,可以使用 kubectl、k8s REST API、client-go...

    田飞雨
  • 除了MySQL,大牛DBA还会啥?

    写在前面:想要流畅阅读本文,需要读者——对K8s的架构有简单了解,理解API Server扮演的角色;具有阅读简单golang源码的能力,包括函数/类方法定义...

    腾讯云数据库 TencentDB

扫码关注云+社区

领取腾讯云代金券