实现思路:
优点:
缺点:
总结: 固定窗口算法适用于对请求速率有明确要求且流量相对稳定的场景,但对于突发流量和请求分布不均匀的情况,可能需要考虑其他更灵活的限流算法。
这里贴一个 go 的实现:
type SlidingWindowLimiter struct {
windowSize time.Duration // 窗口大小
maxRequests int // 最大请求数
requests []time.Time // 窗口内的请求时间
requestsLock sync.Mutex // 请求锁
}
func NewSlidingWindowLimiter(windowSize time.Duration, maxRequests int) *SlidingWindowLimiter {
return &SlidingWindowLimiter{
windowSize: windowSize,
maxRequests: maxRequests,
requests: make([]time.Time, 0),
}
}
func (limiter *SlidingWindowLimiter) AllowRequest() bool {
limiter.requestsLock.Lock()
defer limiter.requestsLock.Unlock()
// 移除过期的请求
currentTime := time.Now()
for len(limiter.requests) > 0 && currentTime.Sub(limiter.requests[0]) > limiter.windowSize {
limiter.requests = limiter.requests[1:]
}
// 检查请求数是否超过阈值
if len(limiter.requests) >= limiter.maxRequests {
return false
}
limiter.requests = append(limiter.requests, currentTime)
return true
}
优点:
缺点:
总结: 滑动窗口算法实际上是颗粒度更小的固定窗口算法,它可以在一定程度上提高限流的精度和实时性,并不能从根本上解决请求分布不均匀的问题。算法受限于窗口的大小和时间间隔,特别是在极端情况下,如突发流量过大或请求分布极不均匀的情况下,仍然可能导致限流不准确, 如下图:
滑动窗口在窗口大小和时间间隔不够精细的情况下, 仍然无法准确的应对突发流量, 漏桶可以视为滑动窗口的一个改进: 维护一个固定容量的漏桶,请求以不定的速率流入漏桶,而漏桶以固定的速率流出。如果请求到达时,漏桶已满,则会触发拒绝策略
总结: 漏桶算法控制流量流速绝对均匀, 适合流量比较平滑的场景(如数据库), 分布式的实现难度较滑动窗口来说复杂一些
go 实现:
type LeakyBucket struct {
rate float64 // 漏桶速率,单位请求数/秒
capacity int // 漏桶容量,最多可存储请求数
water int // 当前水量,表示当前漏桶中的请求数
lastLeakMs int64 // 上次漏水的时间戳,单位秒
}
func NewLeakyBucket(rate float64, capacity int) *LeakyBucket {
return &LeakyBucket{
rate: rate,
capacity: capacity,
water: 0,
lastLeakMs: time.Now().Unix(),
}
}
func (lb *LeakyBucket) Allow() bool {
now := time.Now().Unix()
elapsed := now - lb.lastLeakMs
// 漏水,根据时间间隔计算漏掉的水量
leakAmount := int(float64(elapsed) / 1000 * lb.rate)
if leakAmount > 0 {
if leakAmount > lb.water {
lb.water = 0
} else {
lb.water -= leakAmount
}
}
// 判断当前水量是否超过容量
if lb.water > lb.capacity {
lb.water-- // 如果超过容量,减去刚刚增加的水量
return false
}
// 增加水量
lb.water++
lb.lastLeakMs = now
return true
}
guava RateLimiter
作为抽象类有个子类 SmoothRateLimiter
, 这是个抽象类并且又两个实现类:SmoothWarmingUp
和SmoothBursty
。
RateLimiter
只有两个属性:
// 用于计时,RateLimiter 把实例化的时间设置为 0 值,后续都是取相对时间,用微秒表示。
private final SleepingStopwatch stopwatch;
// 来做锁,RateLimiter 依赖于 synchronized 来控制并发
private volatile Object mutexDoNotUseDirectly;
SmoothRateLimiter
作为抽象类继承于 RateLimiter
。SmoothRateLimiter
的属性如下:
// 当前还有多少 permits 没有被使用,被存下来的 permits 数量
double storedPermits;
// 最大允许缓存的 permits 数量,也就是 storedPermits 能达到的最大值
double maxPermits;
// 每隔多少时间产生一个 permit,
// 比如我们构造方法中设置每秒 5 个,也就是每隔 200ms 一个,这里单位是微秒,也就是 200,000 个
double stableIntervalMicros;
// 下一次可以获取 permits 的时间,这个时间是相对 RateLimiter 的构造时间的,是一个相对时间
private long nextFreeTicketMicros = 0L;
nextFreeTicketMicros
是一个很关键的属性。每次获取 permits 的时候,先拿 storedPermits 的值,因为它是存货,如果够,storedPermits 减去相应的值就可以了,如果不够,那么还需要将 nextFreeTicketMicros 往前推,表示预占了接下来多少时间的量了。
那么下一个请求来的时候,如果还没到 nextFreeTicketMicros 这个时间点,需要 sleep 到这个点再返回,就要将这个值再往前推。
构造 SmoothBursty
:
public static RateLimiter create(double permitsPerSecond) {
return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
// 这里实例化就一个属性 maxBurstSeconds 为 1.0, 代表最多缓存 1s
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
继续看 setRate
分析
public final void setRate(double permitsPerSecond) {
// 检查表达式是否有效
checkArgument(
permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
// synchronized 控制并发
synchronized (mutex()) {
doSetRate(permitsPerSecond, stopwatch.readMicros());
}
}
// doSetRate
final void doSetRate(double permitsPerSecond, long nowMicros) {
// 同步
resync(nowMicros);
// 计算属性 stableIntervalMicros
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}
resync
用来更新 storedPermits
和 nextFreeTicketMicros
, 避免长时间不调用 acquire 导致不准确:
void resync(long nowMicros) {
// nextFreeTicket 已经过过期了,比如很长时间没有再次调用 limiter.acquire()
// 需要将 nextFreeTicket 设置为当前时间,重新计算 storedPermits
if (nowMicros > nextFreeTicketMicros) {
// coolDownIntervalMicros 直接返回了 stableIntervalMicros, 也就是生产一个 permits 的时间长度
// 计算一下, 这段时间产生了多少 permits
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
// 如果超过了 maxtPermits, 则使用 maxPermits
storedPermits = min(maxPermits, storedPermits + newPermits);
// 将 nextFreeTicketMicros 更新为现在
nextFreeTicketMicros = nowMicros;
}
}
设置好了stableIntervalMicros
、storedPermits
和nextFreeTicketMicros
, doSetRate 的实现:
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
// 这里计算了新的 maxPermits 为 1 秒产生的 permits
// 原来的值是初始化的, 现在要重新调整频率, 所以需要重新计算 maxPermits
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = maxPermits;
} else {
// maxPermits 来说,是重新计算,而对于 storedPermits 来说,是做等比例的缩放
storedPermits =
(oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}
public double acquire() {
return acquire(1);
}
public double acquire(int permits) {
// 预约,如果当前不能直接获取到 permits,需要等待
// 返回值代表需要 sleep 多久
long microsToWait = reserve(permits);
// sleep
stopwatch.sleepMicrosUninterruptibly(microsToWait);
// 返回 sleep 的时长
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
reserve
预定 permits:
final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
final long reserveAndGetWaitLength(int permits, long nowMicros) {
// 返回 nextFreeTicketMicros
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
// 计算时长
return max(momentAvailable - nowMicros, 0);
}
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
// 这里做一次同步,更新 storedPermits 和 nextFreeTicketMicros (如果需要)
resync(nowMicros);
// 返回值就是 nextFreeTicketMicros,注意刚刚已经做了 resync 了,此时它是最新的正确的值
long returnValue = nextFreeTicketMicros;
// storedPermits 中可以使用多少个 permits
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
// storedPermits 中不够的部分
double freshPermits = requiredPermits - storedPermitsToSpend;
// 为了这个不够的部分,需要等待多久时间
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) // 这部分固定返回 0
+ (long) (freshPermits * stableIntervalMicros);
// 将 nextFreeTicketMicros 往前推
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
// storedPermits 减去被拿走的部分
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
从 reserve 的流程可看到,获取 permits 的时候,其实是获取了两部分,一部分来自于存量 storedPermits,存量不够的话,另一部分来自于预占未来的 freshPermits。