这种算法很好实现, 但是会出现限流不准确问题。比如每秒通过 5 个请求,时间窗口的大小为 1 秒,当前时间窗口周期内的后半秒正常通过了 5 个请求,下一个时间窗口周期内的前半秒正常通过了 5 个请求,在这两个窗口内都没有超过限制。 但是在这两个窗口的中间那一秒实际上通过了 10 个请求,显然不满足每秒 5 个请求的限制。
这种算法解决了固定窗口计数器出现的通过请求数是限制数两倍的缺陷,但是需要记录窗口周期内的请求,如果限流阈值设置过大,窗口周期内记录的请求就会很多,就会比较占用内存
漏桶算法控制流量流速绝对均匀, 适合流量比较平滑的场景(如数据库), 分布式的实现难度较滑动窗口来说复杂一些
guava RateLimiter
作为抽象类有个子类 SmoothRateLimiter
, 这是个抽象类并且又两个实现类:SmoothWarmingUp
和SmoothBursty
。
RateLimiter
只有两个属性:
// 用于计时,RateLimiter 把实例化的时间设置为 0 值,后续都是取相对时间,用微秒表示。
private final SleepingStopwatch stopwatch;
// 来做锁,RateLimiter 依赖于 synchronized 来控制并发
private volatile Object mutexDoNotUseDirectly;
SmoothRateLimiter
作为抽象类继承于 RateLimiter
。SmoothRateLimiter
的属性如下:
// 当前还有多少 permits 没有被使用,被存下来的 permits 数量
double storedPermits;
// 最大允许缓存的 permits 数量,也就是 storedPermits 能达到的最大值
double maxPermits;
// 每隔多少时间产生一个 permit,
// 比如我们构造方法中设置每秒 5 个,也就是每隔 200ms 一个,这里单位是微秒,也就是 200,000 个
double stableIntervalMicros;
// 下一次可以获取 permits 的时间,这个时间是相对 RateLimiter 的构造时间的,是一个相对时间
private long nextFreeTicketMicros = 0L;
nextFreeTicketMicros
是一个很关键的属性。每次获取 permits 的时候,先拿 storedPermits 的值,因为它是存货,如果够,storedPermits 减去相应的值就可以了,如果不够,那么还需要将 nextFreeTicketMicros 往前推,表示预占了接下来多少时间的量了。
那么下一个请求来的时候,如果还没到 nextFreeTicketMicros 这个时间点,需要 sleep 到这个点再返回,就要将这个值再往前推。
构造 SmoothBursty
:
public static RateLimiter create(double permitsPerSecond) {
return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
// 这里实例化就一个属性 maxBurstSeconds 为 1.0, 代表最多缓存 1s
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
继续看 setRate
分析
public final void setRate(double permitsPerSecond) {
// 检查表达式是否有效
checkArgument(
permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
// synchronized 控制并发
synchronized (mutex()) {
doSetRate(permitsPerSecond, stopwatch.readMicros());
}
}
// doSetRate
final void doSetRate(double permitsPerSecond, long nowMicros) {
// 同步
resync(nowMicros);
// 计算属性 stableIntervalMicros
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}
resync
用来更新 storedPermits
和 nextFreeTicketMicros
, 避免长时间不调用 acquire 导致不准确:
void resync(long nowMicros) {
// nextFreeTicket 已经过过期了,比如很长时间没有再次调用 limiter.acquire()
// 需要将 nextFreeTicket 设置为当前时间,重新计算 storedPermits
if (nowMicros > nextFreeTicketMicros) {
// coolDownIntervalMicros 直接返回了 stableIntervalMicros, 也就是生产一个 permits 的时间长度
// 计算一下, 这段时间产生了多少 permits
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
// 如果超过了 maxtPermits, 则使用 maxPermits
storedPermits = min(maxPermits, storedPermits + newPermits);
// 将 nextFreeTicketMicros 更新为现在
nextFreeTicketMicros = nowMicros;
}
}
设置好了stableIntervalMicros
、storedPermits
和nextFreeTicketMicros
, doSetRate 的实现:
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
// 这里计算了新的 maxPermits 为 1 秒产生的 permits
// 原来的值是初始化的, 现在要重新调整频率, 所以需要重新计算 maxPermits
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = maxPermits;
} else {
// maxPermits 来说,是重新计算,而对于 storedPermits 来说,是做等比例的缩放
storedPermits =
(oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}
public double acquire() {
return acquire(1);
}
public double acquire(int permits) {
// 预约,如果当前不能直接获取到 permits,需要等待
// 返回值代表需要 sleep 多久
long microsToWait = reserve(permits);
// sleep
stopwatch.sleepMicrosUninterruptibly(microsToWait);
// 返回 sleep 的时长
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
reserve
预定 permits:
final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
final long reserveAndGetWaitLength(int permits, long nowMicros) {
// 返回 nextFreeTicketMicros
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
// 计算时长
return max(momentAvailable - nowMicros, 0);
}
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
// 这里做一次同步,更新 storedPermits 和 nextFreeTicketMicros (如果需要)
resync(nowMicros);
// 返回值就是 nextFreeTicketMicros,注意刚刚已经做了 resync 了,此时它是最新的正确的值
long returnValue = nextFreeTicketMicros;
// storedPermits 中可以使用多少个 permits
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
// storedPermits 中不够的部分
double freshPermits = requiredPermits - storedPermitsToSpend;
// 为了这个不够的部分,需要等待多久时间
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) // 这部分固定返回 0
+ (long) (freshPermits * stableIntervalMicros);
// 将 nextFreeTicketMicros 往前推
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
// storedPermits 减去被拿走的部分
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
从 reserve 的流程可看到,获取 permits 的时候,其实是获取了两部分,一部分来自于存量 storedPermits,存量不够的话,另一部分来自于预占未来的 freshPermits。