前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Go微服务--令牌桶实现原理

Go微服务--令牌桶实现原理

作者头像
冬夜先生
修改2021-09-03 10:55:59
3860
修改2021-09-03 10:55:59
举报
文章被收录于专栏:csico

1. 前言

在上一篇文章 Go微服务: 令牌桶 当中简单的介绍了令牌桶实现的原理,然后利用 /x/time/rate 这个库 10 行代码写了一个基于 ip 的 gin 限流中间件,那这个功能是怎么实现的呢?接下来我们就从源码层面来了解一下这个库的实现。这个实现很有意思,并没有真正的使用一个定时器不断的生成令牌,而是靠计算的方式来完成

2.rate/limt

golang.org/x/time/rate库中 使用限速器的时候我们需要调用 NewLimiter 方法,然后 Limiter 提供了三组限速的方法,这三组方法其实都是通过调用 reserveN 实现的 reserveN 返回一个 *Reservation 指针,先来看一下这两个结构体。

2.1 Limiter

代码语言:javascript
复制
type Limiter struct {    // 互斥锁	mu     sync.Mutex     // 每秒产生 token 的速度, 其实是 float64 的一个别名	limit  Limit     // 桶的大小	burst  int     // 当前时间节点拥有的 tokens 数量	tokens float64 	// 上次更新 token 的时间	last time.Time 	// 上次限速的时间,这个时间可能是过去的某个时间也可能是将来的某个时间	lastEvent time.Time}

2.2 Reservation

预定,表示预约某个时间的 token

代码语言:javascript
复制
type Reservation struct {    // 是否能预约上	ok        bool    // limter	lim       *Limiter    // 预约的 token 数量	tokens    int    // token 实际使用的时间	timeToAct time.Time	// 保存一下速率,因为 lim 的速率是可以被动态调整的,所以不能直接用	limit Limit}

这个库并没有使用定时器来发放 token 而是用了 lazyload 的方式,等需要消费 token 的时候才通过时间去计算然后更新 token 的数量,下面我们先通过一个例子来看一下这个流程是怎么跑的

如上图所示,假设我们有一个限速器,它的 token 生成速度为 1,也就是一秒一个,桶的大小为 10,每个格子表示一秒的时间间隔

  • last表示上一次更新 token时还有 2 个token
  • 现在我们有一个请求竟来, 总共需要7个 token才能完成请求
  • now表示我现在进来的时间,距离last 已经过去了2s, 那么现在就有4个token(每秒生成一个token)
  • 所以,如果需要 7 个 token 那么也就还需要等待 3s 中才真的有 7 个,所以这就是 timeToAct 所在的时间节点
  • 预约成功之后更新 last = now 、token = -3 因为 token 已经被预约出去了所以现在剩下的就是负数了

2.3 消费 token

总共有三种消费 token 的方法 AllowN, ReserveN, WaitN最终都是调用的reserveN 这个方法

代码语言:javascript
复制
// now: 需要消费 token 的时间点// n: 需要多少个 token// maxFutureReserve: 能够等待的最长时间func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {	lim.mu.Lock()     // 如果发放令牌的速度无穷大的话,那么直接返回就行了,要多少可以给多少	if lim.limit == Inf {		lim.mu.Unlock()		return Reservation{			ok:        true,			lim:       lim,			tokens:    n,			timeToAct: now,		}	}     // advance 方法会去计算当前有多少个 token    // 后面会讲到,now 其实就是传入的时间,但是 last 可能会变	now, last, tokens := lim.advance(now) 	// 发放 token 之后还剩多少	tokens -= float64(n) 	// 根据 token 数量计算需要等待的时间	var waitDuration time.Duration	if tokens < 0 {		waitDuration = lim.limit.durationFromTokens(-tokens)	} 	// 计算是否可以发放,如果需要的量比桶的容量还大肯定是不行的    // 然后就是看需要能否容忍需要等待的时间	ok := n <= lim.burst && waitDuration <= maxFutureReserve 	// Prepare reservation	r := Reservation{		ok:    ok,		lim:   lim,		limit: lim.limit,	}    // 如果可以的话,就把 token 分配给预约者	if ok {		r.tokens = n		r.timeToAct = now.Add(waitDuration)	} 	// 更新各个字段的状态	if ok {		lim.last = now		lim.tokens = tokens		lim.lastEvent = r.timeToAct	} else {        // 为什么不 ok 也要更新 last 呢?因为 last 可能会改变		lim.last = last	} 	lim.mu.Unlock()	return r}

advance 方法用于计算 token 的数量

代码语言:javascript
复制
// now 是传入的当前的时间点,返回的 newNow 其实就是传入的参数,没有任何改变// newLast 是更新 token 的时间// newTokens 是 token 的数量func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {	// 如果当前时间比上次更新 token 的时间还要早,那么就重置一下 last    last := lim.last	if now.Before(last) {		last = now	} 	// 这里为了防止溢出,先计算了将桶填满需要花费的最大时间	maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)    // 计算时间差,如果大于最大时间的话,就取最大值	elapsed := now.Sub(last)	if elapsed > maxElapsed {		elapsed = maxElapsed	} 	// 计算这段时间生成的 token 数量,如果大于桶的容量,就取桶的容量	delta := lim.limit.tokensFromDuration(elapsed)	tokens := lim.tokens + delta	if burst := float64(lim.burst); tokens > burst {		tokens = burst	} 	return now, last, tokens}

这个比较有意思的是先去计算了时间的最大值,因为初始化的时候没为 last 赋值,所以 now.Before(last) 出来的结果可能是一个很大的值,再去计算 tokens 数量很可能溢出

durationFromTokens 根据 tokens 的数量计算需要花费的时间

代码语言:javascript
复制
func (limit Limit) durationFromTokens(tokens float64) time.Duration {	seconds := tokens / float64(limit)	return time.Nanosecond * time.Duration(1e9*seconds)}

tokensFromDuration根据时间计算 tokens 的数量

代码语言:javascript
复制
func (limit Limit) tokensFromDuration(d time.Duration) float64 {	// 这里通过拆分整数和小数部分可以减少时间上的误差	sec := float64(d/time.Second) * float64(limit)	nsec := float64(d%time.Second) * float64(limit)	return sec + nsec/1e9}

2.4 消费token的总结

消费 token 的逻辑就讲完了,大概总结一下

  • 需要消费的时候, 先去计算一下,从过去到现在可以生成多少个token
  • 然后通过需要的 token 减去现在拥有的token数量,就得到了需要预约的token数量
  • 再通过token数量 转换成时间,就可以得到需要等待的时间长度,以及是否可以消费
  • 然后再通过不同的消费方式进行消费

2.5 WaitN

代码语言:javascript
复制
// ctx 用于控制超时, n 是需要消费的 token 数量,如果 context 的 Deadline 早于要等待的时间就会直接返回失败func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {	lim.mu.Lock()	burst := lim.burst	limit := lim.limit	lim.mu.Unlock()     // 先看一下是不是已经超出消费极限了	if n > burst && limit != Inf {		return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)	}     // 如果 ctx 已经结束了也不用等了	select {	case <-ctx.Done():		return ctx.Err()	default:	} 	// 计算一下可以等待的时间	now := time.Now()	waitLimit := InfDuration	if deadline, ok := ctx.Deadline(); ok {		waitLimit = deadline.Sub(now)	} 	// 调用 reserveN 得到预约数据	r := lim.reserveN(now, n, waitLimit)     // 如果不 ok 说明预约不到	if !r.ok {		return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)	} 	// 如果可以预约到,计算一下需要等多久	delay := r.DelayFrom(now)	if delay == 0 {		return nil	}     // 启动一个 timer 进行定时	t := time.NewTimer(delay)	defer t.Stop()	select {	case <-t.C:		// We can proceed.		return nil	case <-ctx.Done():		// 如果 context 主动取消了,那么之前预约的 token 数量需要归还		r.Cancel()		return ctx.Err()	}}

2.5 取消消费

WaitN 当中如果预约上了,但是 Context 取消了,会调用 CancelAt 归还 tokens, 实现原理如下

代码语言:javascript
复制
func (r *Reservation) CancelAt(now time.Time) {    // 不 ok 说明没有预约上,直接返回就行了	if !r.ok {		return	} 	r.lim.mu.Lock()	defer r.lim.mu.Unlock()     // 如果没有速率限制,或者没有消费 token 或 token 已经被消费了,都不用还了	if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) {		return	} 	// 计算需要还的 token 数量    // 这里说是需要减去已经预支的 token 数量,但是我发现应该是个 bug,感觉这里减重复了	restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))	if restoreTokens <= 0 {		return	}     // 计算当前拥有的 tokens 数量	now, _, tokens := r.lim.advance(now) 	// 当前拥有的加上需要归还的就是现有的,但是不能大于桶的容量	tokens += restoreTokens	if burst := float64(r.lim.burst); tokens > burst {		tokens = burst	} 	// 更新 tokens 数量	r.lim.last = now	r.lim.tokens = tokens     // 如果相等说明后面没有新的 token 消费,所以将状态重置到上一次	if r.timeToAct == r.lim.lastEvent {		prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))		if !prevEvent.Before(now) {			r.lim.lastEvent = prevEvent		}	} 	return}

3. 存在的问题

除了上面提到的感觉 cancelAt 可能有一个 bug 外,云神的博客还提到了一个问题,就是如果我们 cancel 了的话,后面已经在等待的任务是不会重新调整的,举个例子

代码语言:javascript
复制
func wait() {	l := rate.NewLimiter(10, 10)	t := time.Now()	l.ReserveN(t, 10) 	var wg sync.WaitGroup 	ctx, cancel := context.WithTimeout(context.TODO(), time.Hour)	defer cancel()     // 注释掉下面这段就不会提前 cancel	wg.Add(1)	go func() {		defer wg.Done()		// 模拟出现问题, 200ms就取消了		time.Sleep(200 * time.Millisecond)		cancel()	}() 	wg.Add(2)	go func() {		defer wg.Done()		// 如果要等,这个要等 1s 才能执行,但是我们的 ctx 200ms 就会取消		l.WaitN(ctx, 10)		fmt.Printf("[1] cost: %s\n", time.Since(t))	}() 	time.Sleep(100 * time.Millisecond) 	go func() {		defer wg.Done()		// 正常情况下,这个要等 1.2 s 才能执行,但是我们前面都取消了		// 这个是不是应该就只需要等 200ms 就执行了		ctx, cancel := context.WithTimeout(context.Background(), time.Hour)		defer cancel()		l.WaitN(ctx, 2)		fmt.Printf("[2] cost: %s\n", time.Since(t))	}() 	wg.Wait()}

先看一下不提前 cancel 的结果

代码语言:javascript
复制
[1] cost: 1.0002113s[2] cost: 1.2007347s

再看看提前 cancel 的结果

代码语言:javascript
复制
[1] cost: 200.8268ms[2] cost: 1.201066s

可以看到就是 1 有变化,从 1s -> 200ms 但是 2 一直都要等 1.2s

3.1 关于可能存在的bug

代码语言:javascript
复制
restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))

在取消的时候,会减掉一个预约的时间,但是我发现这里其实应该是重复减了一次

测试代码

代码语言:javascript
复制
func main() {	t0 := time.Now()	t1 := time.Now().Add(100 * time.Millisecond)	t2 := time.Now().Add(200 * time.Millisecond)	t3 := time.Now().Add(300 * time.Millisecond) 	l := rate.NewLimiter(10, 20)	l.ReserveN(t0, 15) // 桶里还剩 5 个 token	fmt.Printf("%+v\n", l) 	r := l.ReserveN(t1, 10) // 桶还有 -4 个,	fmt.Printf("%+v\n", l) 	// 注释掉下面两行,最后结果还剩 8 个 token	l.ReserveN(t2, 2) // 桶里还有 -5 个	fmt.Printf("%+v\n", l) 	r.CancelAt(t3)	fmt.Printf("%+v\n", l)	// 归还之前借的,运行结果 桶里还有 4 个	// 但是这里不应该剩下 6 个么,本来有 5 个,300ms 生成了 3 个,后面又预支出去 2 个	// 而且我发现如果我注释掉预支两个的代码,结果和我预期的一致,剩余 8 个token}

4. 总结

这一节主要是看了源码,但是其中还是有很多的细节没有深入的去了解,后面再去细看吧

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 前言
  • 2.rate/limt
    • 2.1 Limiter
      • 2.2 Reservation
        • 2.3 消费 token
          • 2.4 消费token的总结
            • 2.5 WaitN
              • 2.5 取消消费
              • 3. 存在的问题
                • 3.1 关于可能存在的bug
                • 4. 总结
                相关产品与服务
                消息队列 TDMQ
                消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档