随着微服务的流行,服务之间的依赖性和调用关系变得越来越复杂,服务的稳定性变得尤为重要。业务场景中经常会涉及到瞬时流量冲击,可能会导致请求响应超时,甚至服务器被压垮、宕机不可用。出于对系统本身和上下游服务的保护,我们通常会对请求进行限流处理,快速拒绝超出配置上限的请求,保证系统或上下游服务系统的稳定。合理策略能有效应对流量冲击,确保系统可用性和性能。本文详细介绍了几种限流算法,比较各个算法的优缺点,给出了限流算法选型的一些建议,同时对业务上常用的分布式限流也提出一些解决方案。
保护高并发服务稳定主要有三把利器:缓存、降级和限流。
这三把"利器"各有其特点,通常会结合使用,以达到最佳的效果。例如,可以通过缓存来减少数据库的访问,通过降级来应对系统故障,通过限流来防止系统过载。在设计高并发系统时,需要根据系统的具体需求和特点,合理地使用这些技术。接下来本文会主要介绍一些业界常用的限流方法。
限流是一种对请求或并发数进行限制的关键技术手段,旨在保障系统的正常运行。当服务资源有限、处理能力有限时,限流可以对调用服务的上游请求进行限制,以防止自身服务因资源耗尽而停止服务。
在限流中,有两个重要的概念需要了解:
通过合理设置阈值和选择适当的拒绝策略,限流技术可以帮助系统应对突发的请求量激增、恶意用户访问或请求频率过高的情况,保障系统的稳定性和可用性。限流方案根据限流范围,可分为 单机限流和分布式限流 ;其中单机限流依据算法,又可分为 固定窗口、滑动窗口、漏桶和令牌桶限流等常见四种 。本文将对上述限流方案进行详细介绍。
固定窗口算法是一种简单直观的限流算法,其原理是将时间划分为固定大小的窗口,在每个窗口内限制请求的数量或速率。具体实现时,可以使用一个计数器来记录当前窗口内的请求数,并与预设的阈值进行比较。固定窗口算法的原理如下:
type FixedWindowLimiter struct {
windowSize time.Duration // 窗口大小
maxRequests int // 最大请求数
requests int // 当前窗口内的请求数
lastReset int64 // 上次窗口重置时间(秒级时间戳)
resetMutex sync.Mutex // 重置锁
}
func NewFixedWindowLimiter(windowSize time.Duration, maxRequests int) *FixedWindowLimiter {
return &FixedWindowLimiter{
windowSize: windowSize,
maxRequests: maxRequests,
lastReset: time.Now().Unix(),
}
}
func (limiter *FixedWindowLimiter) AllowRequest() bool {
limiter.resetMutex.Lock()
defer limiter.resetMutex.Unlock()
// 检查是否需要重置窗口
if time.Now().Unix()-limiter.lastReset >= int64(limiter.windowSize.Seconds()) {
limiter.requests = 0
limiter.lastReset = time.Now().Unix()
}
// 检查请求数是否超过阈值
if limiter.requests >= limiter.maxRequests {
return false
}
limiter.requests++
return true
}
func main() {
limiter := NewFixedWindowLimiter(1*time.Second, 3) // 每秒最多允许3个请求
for i := 0; i < 15; i++ {
now := time.Now().Format("15:04:05")
if limiter.AllowRequest() {
fmt.Println(now + " 请求通过")
} else {
fmt.Println(now + " 请求被限流")
}
time.Sleep(100 * time.Millisecond)
}
}
执行结果:
优点:
缺点:
固定窗口算法适用于对请求速率有明确要求且流量相对稳定的场景,但对于突发流量和请求分布不均匀的情况,可能需要考虑其他更灵活的限流算法。
上文已经说明当遇到时间窗口的临界突变时,固定窗口算法可能无法灵活地应对流量的变化。例如,在一个时间窗口结束时,如果突然出现大量请求,固定窗口算法可能会导致请求被拒绝,即使在下一个时间窗口内的请求并不多。这种情况下,我们需要一种更灵活的算法来应对突发流量和请求分布不均匀的情况—滑动窗口算法。该算法是对固定窗口算法的一种改进,它通过动态调整窗口的大小来更好地适应流量的变化。与固定窗口算法不同,滑动窗口算法可以在遇到下一个时间窗口之前调整窗口的大小,以便更好地控制请求的速率。算法的原理如下:
package main
import (
"fmt"
"sync"
"time"
)
type SlidingWindowLimiter struct {
windowSize time.Duration // 窗口大小
maxRequests int // 最大请求数
requests []time.Time // 窗口内的请求时间
requestsLock sync.Mutex // 请求锁
}
func NewSlidingWindowLimiter(windowSize time.Duration, maxRequests int) *SlidingWindowLimiter {
return &SlidingWindowLimiter{
windowSize: windowSize,
maxRequests: maxRequests,
requests: make([]time.Time, 0),
}
}
func (limiter *SlidingWindowLimiter) AllowRequest() bool {
limiter.requestsLock.Lock()
defer limiter.requestsLock.Unlock()
// 移除过期的请求
currentTime := time.Now()
for len(limiter.requests) > 0 && currentTime.Sub(limiter.requests[0]) > limiter.windowSize {
limiter.requests = limiter.requests[1:]
}
// 检查请求数是否超过阈值
if len(limiter.requests) >= limiter.maxRequests {
return false
}
limiter.requests = append(limiter.requests, currentTime)
return true
}
func main() {
limiter := NewSlidingWindowLimiter(500*time.Millisecond, 2) // 每秒最多允许4个请求
for i := 0; i < 15; i++ {
now := time.Now().Format("15:04:05")
if limiter.AllowRequest() {
fmt.Println(now + " 请求通过")
} else {
fmt.Println(now + " 请求被限流")
}
time.Sleep(100 * time.Millisecond)
}
}
执行结果:
优点:
缺点:
从代码的角度可以发现,滑动窗口算法实际上是颗粒度更小的固定窗口算法,它可以在一定程度上提高限流的精度和实时性,并不能从根本上解决请求分布不均匀的问题。算法受限于窗口的大小和时间间隔,特别是在极端情况下,如突发流量过大或请求分布极不均匀的情况下,仍然可能导致限流不准确。因此,在实际应用中,要采用更复杂的算法或策略来进一步优化限流效果。
尽管滑动窗口算法可以提供一定的限流效果,但它仍然受限于窗口的大小和时间间隔。在某些情况下,突发流量可能会导致窗口内的请求数超过限制。为了更好地平滑请求的流量,漏桶限流算法可以作为滑动窗口算法的改进。算法的原理很简单:它维护一个固定容量的漏桶,请求以不定的速率流入漏桶,而漏桶以固定的速率流出。如果请求到达时,漏桶已满,则会触发拒绝策略。可以从生产者-消费者模式去理解这个算法,请求充当生产者,每个请求就像一滴水,当请求到达时,它被放入一个队列(漏桶)中。而漏桶则充当消费者,以固定的速率从队列中消费请求,就像从桶底的孔洞中不断漏出水滴。消费的速率等于限流阈值,例如每秒处理2个请求,即500毫秒消费一个请求。漏桶的容量就像队列的容量,当请求堆积超过指定容量时,会触发拒绝策略,即新到达的请求将被丢弃或延迟处理。算法的实现如下:
package main
import (
"fmt"
"time"
)
type LeakyBucket struct {
rate float64 // 漏桶速率,单位请求数/秒
capacity int // 漏桶容量,最多可存储请求数
water int // 当前水量,表示当前漏桶中的请求数
lastLeakMs int64 // 上次漏水的时间戳,单位秒
}
func NewLeakyBucket(rate float64, capacity int) *LeakyBucket {
return &LeakyBucket{
rate: rate,
capacity: capacity,
water: 0,
lastLeakMs: time.Now().Unix(),
}
}
func (lb *LeakyBucket) Allow() bool {
now := time.Now().Unix()
elapsed := now - lb.lastLeakMs
// 漏水,根据时间间隔计算漏掉的水量
leakAmount := int(float64(elapsed) / 1000 * lb.rate)
if leakAmount > 0 {
if leakAmount > lb.water {
lb.water = 0
} else {
lb.water -= leakAmount
}
}
// 判断当前水量是否超过容量
if lb.water > lb.capacity {
lb.water-- // 如果超过容量,减去刚刚增加的水量
return false
}
// 增加水量
lb.water++
lb.lastLeakMs = now
return true
}
func main() {
// 创建一个漏桶,速率为每秒处理3个请求,容量为4个请求
leakyBucket := NewLeakyBucket(3, 4)
// 模拟请求
for i := 1; i <= 15; i++ {
now := time.Now().Format("15:04:05")
if leakyBucket.Allow() {
fmt.Printf(now+" 第 %d 个请求通过\n", i)
} else {
fmt.Printf(now+" 第 %d 个请求被限流\n", i)
}
time.Sleep(200 * time.Millisecond) // 模拟请求间隔
}
}
执行结果:
优点:
缺点:
令牌桶算法是实现限流是一种常见的思路,用于限制请求的速率。它可以确保系统在高负载情况下仍能提供稳定的服务,并防止突发流量对系统造成过载。最为常用的 Google 的 Java 开发工具包 Guava 中的限流工具类 RateLimiter 就是令牌桶的一个实现。令牌桶算法基于一个令牌桶的概念,其中令牌以固定的速率产生,并放入桶中。每个令牌代表一个请求的许可。当请求到达时,需要从令牌桶中获取一个令牌才能通过。如果令牌桶中没有足够的令牌,则请求被限制或丢弃。 令牌桶算法的实现步骤如下:
package main
import (
"fmt"
"sync"
"time"
)
// TokenBucket 表示一个令牌桶。
type TokenBucket struct {
rate float64 // 令牌添加到桶中的速率。
capacity float64 // 桶的最大容量。
tokens float64 // 当前桶中的令牌数量。
lastUpdate time.Time // 上次更新令牌数量的时间。
mu sync.Mutex // 互斥锁,确保线程安全。
}
// NewTokenBucket 创建一个新的令牌桶,给定令牌添加速率和桶的容量。
func NewTokenBucket(rate float64, capacity float64) *TokenBucket {
return &TokenBucket{
rate: rate,
capacity: capacity,
tokens: capacity, // 初始时,桶是满的。
lastUpdate: time.Now(),
}
}
// Allow 检查是否可以从桶中移除一个令牌。如果可以,它移除一个令牌并返回 true。
// 如果不可以,它返回 false。
func (tb *TokenBucket) Allow() bool {
tb.mu.Lock()
defer tb.mu.Unlock()
// 根据经过的时间计算要添加的令牌数量。
now := time.Now()
elapsed := now.Sub(tb.lastUpdate).Seconds()
tokensToAdd := elapsed * tb.rate
tb.tokens += tokensToAdd
if tb.tokens > tb.capacity {
tb.tokens = tb.capacity // 确保令牌数量不超过桶的容量。
}
if tb.tokens >= 1.0 {
tb.tokens--
tb.lastUpdate = now
return true
}
return false
}
func main() {
tokenBucket := NewTokenBucket(2.0, 3.0)
for i := 1; i <= 10; i++ {
now := time.Now().Format("15:04:05")
if tokenBucket.Allow() {
fmt.Printf(now+" 第 %d 个请求通过\n", i)
} else { // 如果不能移除一个令牌,请求被拒绝。
fmt.Printf(now+" 第 %d 个请求被限流\n", i)
}
time.Sleep(200 * time.Millisecond)
}
}
执行结果:
优点:
缺点:
单机限流指针对单一服务器的情况,通过限制单台服务器在单位时间内处理的请求数量,防止服务器过载。常见的限流算法上文已介绍,其优点在于实现简单,效率高,效果明显。随着微服务架构的普及,系统的服务通常会部署在多台服务器上,此时就需要分布式限流来保证整个系统的稳定性。接下本文会介绍几种常见的分布式限流技术方案:
通过一个中心化的限流器来控制所有服务器的请求。实现方式:
package main
import (
"context"
"fmt"
"go.uber.org/atomic"
"sync"
"git.code.oa.com/pcg-csd/trpc-ext/redis"
)
type RedisClient interface {
Do(ctx context.Context, cmd string, args ...interface{}) (interface{}, error)
}
// Client 数据库
type Client struct {
client RedisClient // redis 操作
script string // lua脚本
}
// NewBucketClient 创建redis令牌桶
func NewBucketClient(redis RedisClient) *Client {
helper := redis
return &Client{
client: helper,
script: `
-- 令牌桶限流脚本
-- KEYS[1]: 桶的名称
-- ARGV[1]: 桶的容量
-- ARGV[2]: 令牌产生速率
local bucket = KEYS[1]
local capacity = tonumber(ARGV[1])
local tokenRate = tonumber(ARGV[2])
local redisTime = redis.call('TIME')
local now = tonumber(redisTime[1])
local tokens, lastRefill = unpack(redis.call('hmget', bucket, 'tokens', 'lastRefill'))
tokens = tonumber(tokens)
lastRefill = tonumber(lastRefill)
if not tokens or not lastRefill then
tokens = capacity
lastRefill = now
else
local intervalsSinceLast = (now - lastRefill) * tokenRate
tokens = math.min(capacity, tokens + intervalsSinceLast)
end
if tokens < 1 then
return 0
else
redis.call('hmset', bucket, 'tokens', tokens - 1, 'lastRefill', now)
return 1
end
`,
}
}
// 获取令牌,获取成功则立即返回true,否则返回false
func (c *Client) isAllowed(ctx context.Context, key string, capacity int64, tokenRate int64) (bool, error) {
result, err := redis.Int(c.client.Do(ctx, "eval", c.script, 1, key, capacity, tokenRate))
if err != nil {
fmt.Println("Redis 执行错误:", err)
return false, err
}
return result == 1, nil
}
// 调用检测
func main() {
c := NewBucketClient(redis.GetPoolByName("redis://127.0.0.1:6379"))
gw := sync.WaitGroup{}
gw.Add(120)
count := atomic.Int64{}
for i := 0; i < 120; i++ {
go func(i int) {
defer gw.Done()
status, err := c.isAllowed(context.Background(), "test", 100, 10)
if status {
count.Add(1)
}
fmt.Printf("go %d status:%v error: %v\n", i, status, err)
}(i)
}
gw.Wait()
fmt.Printf("allow %d\n\n", count.Load())
}
执行结果:
可以看到中心化限流方案的存在较高的单点故障风险,且带宽瓶颈比较严重。在这个基础上本文结合本地缓存单机限流和负载均衡设计了一个新的分布式限流方案。具体方案如下:
使用ZooKeeper或者etcd等分布式协调服务来实现限流。每台服务器都会向分布式协调服务申请令牌,只有获取到令牌的请求才能被处理。基本方案:
这个方案的优点是可以实现精确的全局限流,并且可以避免单点故障。但是,这个方案的缺点是实现复杂,且对ZooKeeper的性能有较高的要求。如果ZooKeeper无法处理大量的令牌申请和释放操作,可能会成为系统的瓶颈。
总之,没有最好的方案,只有合适的方案。在选择合适的限流方案时,我们需要考虑多种因素,包括系统的需求、现有的技术栈、系统的负载情况以及底层系统的性能等。理解每种方案的工作原理和特性,以便在实际应用中做出最佳的选择。
一个好的限流设计必须要考虑到业务的特性和需求,同时具备以下六点:
限流是保证系统稳定和高效运行的重要手段,但它并不是唯一的解决方案。我们还需要考虑其他的系统设计和优化手段,例如负载均衡、缓存、异步处理等(面对爆量,扩容永远是最好的方式,除了贵!)。这些手段相互配合,才能构建出一个既能应对高并发请求,又能保证服务质量的系统。