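I recently ran into a bug in the rate limiter from Go's supplementary library (golang.org/x/time/rate) and am sharing it here for discussion.
Test code: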
package main

import (
	"fmt"
	"sync/atomic"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	var succCount, failCount int64
	// One token every 100ms, i.e. 10 requests per second, with a burst of 1.
	limit := rate.Every(100 * time.Millisecond)
	burst := 1
	limiter := rate.NewLimiter(limit, burst)
	start := time.Now()
	// 5000 goroutines call Allow in a tight loop and count the outcomes.
	for i := 0; i < 5000; i++ {
		go func() {
			for {
				if limiter.Allow() {
					atomic.AddInt64(&succCount, 1)
				} else {
					atomic.AddInt64(&failCount, 1)
				}
			}
		}()
	}
	time.Sleep(2 * time.Second)
	elapsed := time.Since(start)
	fmt.Println("elapsed=", elapsed, "succCount=", atomic.LoadInt64(&succCount), "failCount=", atomic.LoadInt64(&failCount))
}
Output:
elapsed= 2.010675962s succCount= 24849 failCount= 6894827
Go version used:
go version go1.16.2 darwin/amd64
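As the example shows, the configured limit is 10 requests per second (one token every 100ms, burst of 1), so roughly 21 requests should pass in 2s. Yet with many goroutines calling concurrently, 24849 requests passed in that window. In a trpc service every request is likewise handled in its own goroutine, so the limiter would misbehave there too.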
Let's dig into the code.
In the time/rate source, Allow is just shorthand for AllowN(time.Now(), 1):
// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *Limiter) Allow() bool {
	return lim.AllowN(time.Now(), 1)
}
AllowN in turn calls reserveN:
// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate limit.
// Otherwise use Reserve or Wait.
func (lim *Limiter) AllowN(now time.Time, n int) bool {
	return lim.reserveN(now, n, 0).ok
}
The implementation of reserveN is where things get interesting:
// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()
	if lim.limit == Inf {
		lim.mu.Unlock()
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: now,
		}
	}
	now, last, tokens := lim.advance(now)
	// Calculate the remaining number of tokens resulting from the request.
	tokens -= float64(n)
	// Calculate the wait duration
	var waitDuration time.Duration
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}
	// Decide result
	ok := n <= lim.burst && waitDuration <= maxFutureReserve
	// Prepare reservation
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
	if ok {
		r.tokens = n
		r.timeToAct = now.Add(waitDuration)
	}
	// Update state
	if ok {
		lim.last = now
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	} else {
		lim.last = last
	}
	lim.mu.Unlock()
	return r
}
The important piece here is the implementation of advance:
// advance calculates and returns an updated state for lim resulting from the passage of time.
// lim is not changed.
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
	last := lim.last
	if now.Before(last) {
		last = now
	}
	// Avoid making delta overflow below when last is very old.
	maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
	elapsed := now.Sub(last)
	if elapsed > maxElapsed {
		elapsed = maxElapsed
	}
	// Calculate the new number of tokens, due to time that passed.
	delta := lim.limit.tokensFromDuration(elapsed)
	tokens := lim.tokens + delta
	if burst := float64(lim.burst); tokens > burst {
		tokens = burst
	}
	return now, last, tokens
}
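This function returns three values: newNow, newLast, and newTokens.
The first, newNow, is simply the now argument passed straight back, so this return value is not actually needed.
The second, newLast, is the time at which tokens was last updated; if the now passed in is earlier than that time, now itself is returned instead.
The third, newTokens, is the token count at now: the stored tokens plus the time elapsed since the last update converted into tokens, capped at burst.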
With the comments I have added, let's look at the logic of reserveN again:
// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
	// [author] Take the lock first; nothing surprising here.
	lim.mu.Lock()
	// [author] If limit is infinite, return ok immediately.
	if lim.limit == Inf {
		lim.mu.Unlock()
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: now,
		}
	}
	// [author] Use advance to get the number of tokens available at time now.
	now, last, tokens := lim.advance(now)
	// Calculate the remaining number of tokens resulting from the request.
	tokens -= float64(n)
	// Calculate the wait duration
	var waitDuration time.Duration
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}
	// Decide result
	ok := n <= lim.burst && waitDuration <= maxFutureReserve
	// Prepare reservation
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
	if ok {
		r.tokens = n
		r.timeToAct = now.Add(waitDuration)
	}
	// [author] State update: when ok, the current time and the related fields are updated.
	// [author] But when not ok, why does last need to be updated at all?
	// Update state
	if ok {
		lim.last = now
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	} else {
		lim.last = last
	}
	lim.mu.Unlock()
	return r
}
As the comments above point out, when the reservation is not ok, only the last field of lim is updated. Let's check how that field is defined:
type Limiter struct {
	limit Limit
	burst int
	mu    sync.Mutex
	tokens float64
	// last is the last time the limiter's tokens field was updated
	last time.Time
	// lastEvent is the latest time of a rate-limited event (past or future)
	lastEvent time.Time
}
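The comment is clear: last is the time at which the tokens field was last updated. Updating last in reserveN without touching tokens therefore looks very odd. It also explains the numbers above. Each goroutine calls time.Now() before it acquires the lock, so under heavy contention timestamps reach reserveN out of order. When a failing call carries a now that is earlier than lim.last, advance returns that earlier now as last, and the else branch rewinds lim.last while lim.tokens stays the same. The next call with a current timestamp then measures elapsed time from the rewound last and is credited again for time that has already been converted into tokens.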
To verify this, I patched the library code and changed reserveN as follows:
// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()
	if lim.limit == Inf {
		lim.mu.Unlock()
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: now,
		}
	}
	// [author] Ignore the second value returned by advance.
	now, _, tokens := lim.advance(now)
	// Calculate the remaining number of tokens resulting from the request.
	tokens -= float64(n)
	// Calculate the wait duration
	var waitDuration time.Duration
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}
	// Decide result
	ok := n <= lim.burst && waitDuration <= maxFutureReserve
	// Prepare reservation
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
	if ok {
		r.tokens = n
		r.timeToAct = now.Add(waitDuration)
	}
	// Update state
	if ok {
		lim.last = now
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	}
	// [author] When not ok, do nothing.
	lim.mu.Unlock()
	return r
}
The changes are marked in the comments. Rerunning the test code above against the patched library gives:
package main

import (
	"fmt"
	"sync/atomic"
	"time"

	//"golang.org/x/time/rate"
	rate "git.code.oa.com/gcd/go-utils/comm/trate"
)

func main() {
	var succCount, failCount int64
	limit := rate.Every(100 * time.Millisecond)
	burst := 1
	limiter := rate.NewLimiter(limit, burst)
	start := time.Now()
	for i := 0; i < 5000; i++ {
		go func() {
			for {
				if limiter.Allow() {
					atomic.AddInt64(&succCount, 1)
				} else {
					atomic.AddInt64(&failCount, 1)
				}
			}
		}()
	}
	time.Sleep(2 * time.Second)
	elapsed := time.Since(start)
	fmt.Println("elapsed=", elapsed, "succCount=", atomic.LoadInt64(&succCount), "failCount=", atomic.LoadInt64(&failCount))
}
Output:
elapsed= 2.009816654s succCount= 21 failCount= 7513617
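I filed an issue against the library on GitHub for this, and found that someone had already spotted the problem back in 2017 and proposed a fix, but it had never been merged into master.
If you use this rate limiter, watch out for this pitfall.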
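Original content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.
In case of infringement, please contact cloudcommunity@tencent.com for removal.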