在分析uber/ratelimit组件的设计之前,我们需要知道这个组件的功能。ratelimit 见名思义就是用来做"频次控制"的, 是Leaky Bucket 算法的一个实现。
我目前用的版本是v.2.0版本, 下面的案例也是官方给出的一个案例。
func TestExample(t *testing.T) {
// 初始化一个QPS=100的限流器
rl := ratelimit.New(100) // per second
prev := time.Now()
for i := 0; i < 10; i++ {
now := rl.Take()
if i > 0 {
fmt.Println(i, now.Sub(prev))
}
prev = now
}
}
输出结果:
=== RUN TestExample
1 10ms
2 10ms
3 10ms
4 10ms
5 10ms
6 10ms
7 10ms
8 10ms
9 10ms
--- PASS: TestExample2 (0.09s)
PASS
要实现以一个固定的速率处理请求的效果, 我们只需要通过在 ratelimit 的 New 函数中, 传入的参数rate表示的是每秒允许请求量 (RPS)。简单换算一下就知道每个请求之间的时间间隔。
limiter.perRequest = time.Second / time.Duration(rate)
现在假设我们设置的rate=100, 那么经过计算可以得知perRequest=10ms。 当请求 R1 处理结束后, 我们记录下请求 R1 的处理完成的时刻, 记为 last。稍后请求 R2 到来, 如果此刻的时间last相比并没有达到perRequest 的间隔大小,那么 sleep 一段时间即可。
其次传统的漏桶算法有个明显的缺点就是无法适应突发流量的场景, uber-go为了缓解这个问题引入了松弛量 (maxSlack) 的概念,赋予了漏桶一定的令牌存储的功能。 个人觉得这个思路是从令牌桶算法的引入过来的。
maxSlack变量决定了在服务器空闲的时候可以存储一定数量的令牌,当遇到突发流量时可以把这些令牌用于应急。我们来看一下 ratelimit 默认允许存多少个令牌。
// buildConfig combines defaults with options.
func buildConfig(opts []Option) config {
c := config{
clock: clock.New(),
slack: 10,
per: time.Second,
}
...
}
我们可以看到默认可以存储10个令牌,只要服务器空闲了超过了100ms, 那么就算突然同时来了10个请求,系统也能处理。
最后一个问题就是如何处理高并发场景下“竞态问题”了, 如果10ms内就是多个请求,那么这几个请求势必会产生竞争,只有竞争成功的亲求,才有资格进入sleep阶段后获得执行,其他的请求都会进行Waiting。这一处在我这个版本能看到两个实现:
一个是基于lock的方式,这种方式会导致请求对应的goroutine进入block 状态;
// limiter_mutexbased.go 里面的Toke()实现
func (t *mutexLimiter) Take() time.Time {
t.Lock()
defer t.Unlock()
...
return t.last
}
一种方式是使用for和atomic.CompareAndSwapPointer配合进行自旋,这种方式不会导致请求对应的goroutine进入block ,但是会消耗少量的内存资源。Uber现在默认采用第二种方法解决“竞态问题”。
// limiter_atomic.go 里面的Toke()实现
func (t *atomicLimiter) Take() time.Time {
...
// 竞态环境,如果没有获得执行权就会进行自旋
for !taken {
...
// 这里会进行CAS尝试获取执行权
taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
}
t.clock.Sleep(interval)
return newState.last
}
如果你看了uber/ratelimit的源码,那一定能看到其实还有一个clock配置字段,默认使用的是go原生的time clock, 精度在ns级别,如果需要更加精细的时钟,可以在初始化时使用WithClock进行切换。
type MyClock struct{}
func (*MyClock) Now() time.Time {
return time.Now()
}
func (*MyClock) Sleep(d time.Duration) {
time.Sleep(d)
}
func TestExample(t *testing.T) {
// 使用自定义的clock
myClock := &MyClock{}
rl := ratelimit.New(100, ratelimit.WithClock(myClock)) // per second
prev := time.Now()
for i := 0; i < 10; i++ {
now := rl.Take()
if i > 0 {
fmt.Println(i, now.Sub(prev))
}
prev = now
}
}
限流是一个大的主题,漏桶算法只是其发展过程中的一个产物,随着技术的演进业务对限流组件的诉求显然已经不是简单的设置一个阈值那么简单,最终肯定会向自适应算法演进。因此你会发现目前主流的限流组件Hystrix,Sentinel基本上都提供了动态的自适应的限流方案。而且云原生时代已经到来,未来限流组件势必会作为一个标准化的基础功能开放给开发者来选择,我们要做的就是保持一颗不断探索的初心。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。