专栏首页Go faster系统库golang.org/x/time/rate 限频器bug
原创

系统库golang.org/x/time/rate 限频器bug

背景

最近在使用限频器时发现golang辅助系统库中的限频器有bug,分享出来与大家一起探讨一下。

测试代码:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
	"golang.org/x/time/rate"
)

func main() {
	var succCount, failCount int64
	limit := rate.Every(100 * time.Millisecond)
	burst := 1
	limiter := rate.NewLimiter(limit, burst)
	start := time.Now()
	for i := 0; i < 5000; i++ {
		go func() {
			for {
				if limiter.Allow() {
					atomic.AddInt64(&succCount, 1)
				} else {
					atomic.AddInt64(&failCount, 1)
				}
			}
		}()
	}
	time.Sleep(2 * time.Second)
	elapsed := time.Since(start)
	fmt.Println("elapsed=", elapsed, "succCount=", atomic.LoadInt64(&succCount), "failCount=", atomic.LoadInt64(&failCount))
}

输出:

elapsed= 2.010675962s succCount= 24849 failCount= 6894827

使用的go版本:

go version go1.16.2 darwin/amd64

从上例可以看出,设置的qps是每秒通过10个请求,但在多协程并发场景下2s的时间段内竟然通过了24849个请求。在trpc服务场景中使用时每个请求也都会开一个协程进行业务逻辑处理,那在这种场景下岂不是就bug了。

原因

我们深入代码看一下:

看time/rate的源码,Allow函数的实现只是AllowN(time.Now(), 1)的便捷实现:

// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *Limiter) Allow() bool {
	return lim.AllowN(time.Now(), 1)
}

AllowN又调用了reserveN方法:

// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate limit.
// Otherwise use Reserve or Wait.
func (lim *Limiter) AllowN(now time.Time, n int) bool {
	return lim.reserveN(now, n, 0).ok
}

reserveN的实现就很有意思了,

// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()

	if lim.limit == Inf {
		lim.mu.Unlock()
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: now,
		}
	}

	now, last, tokens := lim.advance(now)

	// Calculate the remaining number of tokens resulting from the request.
	tokens -= float64(n)

	// Calculate the wait duration
	var waitDuration time.Duration
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}

	// Decide result
	ok := n <= lim.burst && waitDuration <= maxFutureReserve

	// Prepare reservation
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
	if ok {
		r.tokens = n
		r.timeToAct = now.Add(waitDuration)
	}

	// Update state
	if ok {
		lim.last = now
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	} else {
		lim.last = last
	}

	lim.mu.Unlock()
	return r
}

这里面比较重要的方法是advance的实现:

// advance calculates and returns an updated state for lim resulting from the passage of time.
// lim is not changed.
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
	last := lim.last
	if now.Before(last) {
		last = now
	}

	// Avoid making delta overflow below when last is very old.
	maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
	elapsed := now.Sub(last)
	if elapsed > maxElapsed {
		elapsed = maxElapsed
	}

	// Calculate the new number of tokens, due to time that passed.
	delta := lim.limit.tokensFromDuration(elapsed)
	tokens := lim.tokens + delta
	if burst := float64(lim.burst); tokens > burst {
		tokens = burst
	}

	return now, last, tokens
}

这个函数中返回了三个参数newNow,newLast,newTokens。

从代码中可以看出第一个参数newNow其实完全就是传入的入参now直接返回了,所以这个第一个返回值其实是没有必要的;

第二个参数,是返回上次tokens被更新的时间点,如果当前传入的时间点是在上次更新的时间点之前的话同样会返回当前传入的时间点;

第三个参数newTokens是根据当前的时间点与上次更新的时间点之间的流逝时间转换成token数量进行返回。

让我们结合本人加上的中文注释再回头看一下reserveN的实现逻辑:

// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
    // 加个锁先,这个好理解
	lim.mu.Lock()
    
    // 判断limit是否为无限大,直接返回ok
	if lim.limit == Inf {
		lim.mu.Unlock()
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: now,
		}
	}
    // 通过advance函数获取到now这个时间点可以用的token数量
	now, last, tokens := lim.advance(now)
    
	// Calculate the remaining number of tokens resulting from the request.
	tokens -= float64(n)

	// Calculate the wait duration
	var waitDuration time.Duration
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}

	// Decide result
	ok := n <= lim.burst && waitDuration <= maxFutureReserve

	// Prepare reservation
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
	if ok {
		r.tokens = n
		r.timeToAct = now.Add(waitDuration)
	}
    // 更新状态这里,如果ok了就更新当前的时间点以及需要更新的字段,但如果不ok的话为什么需要更新last字段呢
	// Update state
	if ok {
		lim.last = now
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	} else {
		lim.last = last
	}

	lim.mu.Unlock()
	return r
}

上面代码注释中已经指出了如果获取不ok的话,这里只更新了lim中的last字段,我们来考证一下:

type Limiter struct {
	limit Limit
	burst int

	mu     sync.Mutex
	tokens float64
	// last is the last time the limiter's tokens field was updated
	last time.Time
	// lastEvent is the latest time of a rate-limited event (past or future)
	lastEvent time.Time
}

这里的注释写的很清楚,last是上面的tokens字段更新的时间点。所以上面reserveN中更新last字段的操作显得很迷。

所以这里改一下系统库的代码验证一下,将reserveN的方法修改一下:

// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()

	if lim.limit == Inf {
		lim.mu.Unlock()
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: now,
		}
	}
    
    // 这里忽略advance返回的第二个字段
	now, _, tokens := lim.advance(now)

	// Calculate the remaining number of tokens resulting from the request.
	tokens -= float64(n)

	// Calculate the wait duration
	var waitDuration time.Duration
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}

	// Decide result
	ok := n <= lim.burst && waitDuration <= maxFutureReserve

	// Prepare reservation
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
	if ok {
		r.tokens = n
		r.timeToAct = now.Add(waitDuration)
	}

	// Update state
	if ok {
		lim.last = now
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	}
	// 这里如果不ok的话不执行任何操作

	lim.mu.Unlock()
	return r
}

修改点在注释里已经标明了,让上面的测试代码依赖修改后的库代码重新执行可得:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
	//"golang.org/x/time/rate"

	rate "git.code.oa.com/gcd/go-utils/comm/trate"
)

func main() {
	var succCount, failCount int64
	limit := rate.Every(100 * time.Millisecond)
	burst := 1
	limiter := rate.NewLimiter(limit, burst)
	start := time.Now()
	for i := 0; i < 5000; i++ {
		go func() {
			for {
				if limiter.Allow() {
					atomic.AddInt64(&succCount, 1)
				} else {
					atomic.AddInt64(&failCount, 1)
				}
			}
		}()
	}
	time.Sleep(2 * time.Second)
	elapsed := time.Since(start)
	fmt.Println("elapsed=", elapsed, "succCount=", atomic.LoadInt64(&succCount), "failCount=", atomic.LoadInt64(&failCount))
}
elapsed= 2.009816654s succCount= 21 failCount= 7513617

进展

为此在GitHub上为系统库提了一个issue,发现早在2017年的时候就有人发现了这个问题并为此提了fix的建议,但一直没有被合并到master。

总结

大家如果使用了这个限频器的话,注意避坑。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 常用限流算法的应用场景和实现原理

    在高并发业务场景下,保护系统时,常用的"三板斧"有:"熔断、降级和限流"。今天和大家谈谈常用的限流算法的几种实现方式,这里所说的限流并非是网关层面的限流,而是业...

    KevinYan
  • X射线:通过最大程度地减少对所学占用分布的支持来机械搜索被遮挡的对象(CS RO)

    对于电子商务,仓库,医疗保健和家庭服务中的应用程序,通常需要机器人搜索对象堆以掌握特定的目标对象。对于机械搜索,我们介绍了X射线,这是一种基于学习到的占用分布的...

    时代在召唤
  • Android开发高级进阶——传感器

    SensorManager提供的注册传感器的方法为registerListener(SensorEventListener listener, Sensor s...

    trampcr
  • Prometheus 如何做到“活学活用”,大牛总结的避坑指南

    监控系统的历史悠久,是一个很成熟的方向,而 Prometheus 作为新生代的开源监控系统,慢慢成为了云原生体系的事实标准,也证明了其设计很受欢迎。

    民工哥
  • 安卓测试常用的 ADB 命令

        adb的工作方式比较特殊采用监听Socket TCP 5554等端口的方式让IDE和Qemu通讯,默认情况下adb会daemon相关的网络端口,所以当我...

    测试之道
  • 为什么这条异常没有上报? HTTP 429

    刚开始碰到 Sentry 中未收到报错 (Event) 时,一直在尝试去找 Sentry 服务器端的 Inbound Filter 设置以及 Sentry 客户...

    山月
  • Shell 命令执行可视化和告警工具

    链接:https://www.freebuf.com/sectool/212820.html

    释然
  • 重定向的九种方案及性能比较

    最近计划对于之前的短链接服务进行升级改造。在改造前,对于常见 Web 语言,如 Java、PHP、Python、Node、Ruby、Go和服务工具 Nginx、...

    soulteary
  • nginx防止DDOS攻击配置(二)

    我们用的高防服务器只防流量攻击不防CC,现在的攻击多数都是混合型的,而且CC攻击很多,防CC只能自己搞了,按照第一篇的配置,在实际的使用中效果并不理想。限制每秒...

    后端技术探索
  • ROS学习记录①:安装、起步和IDE工具

    依次打开 File -> Setting -> Editor -> File and Code Templates -> Python Script,添加

    小黑鸭
  • Python 播放音频与录音

    os.system(file) 调用系统应用来打开文件,file 可为图片或者音频文件。

    Python进阶者
  • 技术干货|eBay对流量控制说“so easy”!

    流量控制对于保证Web服务的安全性和可靠性至关重要。在安全性方面,需要阻止黑客频繁访问某些API而获取大量信息。在可靠性方面,任何服务在有限资源...

    Spark学习技巧
  • chrony软件使用说明

    1.1.1 chrony简介   Chrony是一个开源的自由软件,它能保持系统时钟与时钟服务器(NTP)同步,让时间保持精确。   它由两个程序组成:chro...

    惨绿少年
  • 高可用 Prometheus 的常见问题

    监控系统的历史悠久,是一个很成熟的方向,而 Prometheus 作为新生代的开源监控系统,慢慢成为了云原生体系的事实标准,也证明了其设计很受欢迎。本文主要分享...

    米开朗基杨
  • Html 5 video/audio 格式转换 ogg

    Html5 开始支持video和audio标签,但是各个浏览器支持的格式不一样,见下图 Codec support in modern desktop brow...

    JadePeng
  • 【Rust日报】2020-08-29 生产环境 Rust 序列化库的选择

    特别是 Serde 在 Rust 1.0.0 发布之前就已经可用,其背后的理念是使用 trait 解耦对象,并从序列化格式中进行序列化/反序列化,这是一个非常强...

    MikeLoveRust
  • 监控神器Prometheus用不对,也就是把新手村的剑

    监控系统的历史悠久,是一个很成熟的方向,而 Prometheus 作为新生代的开源监控系统,慢慢成为了云原生体系的事实标准,也证明了其设计很受欢迎。

    lyb-geek
  • 分布式存储系统可靠性:系统量化估算

    可用性指的是系统服务的可用性。一般按全年可用时间除以全年时间来衡量可用性的好坏,平常我们说的 SLA指标就是可用性指标,这里就不展开细说。

    用户8639654
  • requestAnimationFrame 的 Bug?

    今天有一位同学,在群里问了这一个问题:requestAnimationFrame 的执行机制如何

    落落落洛克

扫码关注云+社区

领取腾讯云代金券