客户端限流 PK 分布式限流
客户端限流:当应用为单点应用时,只要应用进行了限流,那么应用所依赖的各种服务也都得到了保护。
分布式限流:线上业务出于各种原因考虑,多是分布式系统,单节点的限流仅能保护自身节点,但无法保护应用依赖的各种服务,并且在进行节点扩容、缩容时也无法准确控制整个服务的请求限制
Guava是一个客户端组件,在其多线程模块下提供了以RateLimiter为首的几个限流支持类。它只能对“当前”服务进行限流,即它不属于分布式限流的解决方案。
@RestController@RequestMapping("/guava")@Slf4jpublic class GuavaLimitController {
private static final Integer RATE_LIMIT = 10; RateLimiter rateLimiter = RateLimiter.create(RATE_LIMIT);
@GetMapping("/try_require_rate_limit") public String hello() { boolean limit = false; try { limit = rateLimiter.tryAcquire(); } catch (Exception e) { log.error("limit error", e); }
if (limit) { return "hello guava rate limit success..."; } throw new RuntimeException("hello guava rate limit fall..."); }
@GetMapping("/acquire_rate_limit") public String hello2() { double acquireTimeGap = rateLimiter.acquire(); return "hello guava rate limit success, acquireTimeGap: " + acquireTimeGap; }
}
Jmeter将在10秒内逐渐启动30个线程进行并发测试,每个线程执行测试计划20次,共20 * 30 = 600次。
说明:
ObjectHeader是和对象自身定义数据无关的额外存储成本,也是JVM实现轻量级锁和偏向锁的关键。
public boolean tryAcquire() { //【1】 return tryAcquire(1, 0, MICROSECONDS);}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) { long timeoutMicros = max(unit.toMicros(timeout), 0); // 【2】 checkPermits(permits); long microsToWait; // 【3】锁:线程安全下,执行申请令牌的操作 synchronized (mutex()) { long nowMicros = stopwatch.readMicros(); // 【4】判断是否可以获取到令牌 if (!canAcquire(nowMicros, timeoutMicros)) { // 【5】不能获取到,则直接返回false return false; } else { //【6】能获取到,则取出令牌并设置 下次令牌发布时间 microsToWait = reserveAndGetWaitLength(permits, nowMicros); } } // 【9】线程中断microsToWait时间 stopwatch.sleepMicrosUninterruptibly(microsToWait); return true;}
// 获取对象头作为锁对象private Object mutex() { Object mutex = mutexDoNotUseDirectly; if (mutex == null) { synchronized (this) { mutex = mutexDoNotUseDirectly; if (mutex == null) { mutexDoNotUseDirectly = mutex = new Object(); } } } return mutex;}
// 【4】是否可以获取令牌private boolean canAcquire(long nowMicros, long timeoutMicros) { // now当前时间,timeoutMicros为0,那么这里就是比较上次设置好的令牌发布时间 和 当前时间的大小而已 // nextFreeTicketMicros > nowMicros:返回false,还不到获取令牌时机 // nextFreeTicketMicros < nowMicros:返回true,已经过了,可以获取令牌时机 return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;}
final long reserveAndGetWaitLength(int permits, long nowMicros) { //【7】获取最早的令牌发布时间 long momentAvailable = reserveEarliestAvailable(permits, nowMicros); // 【8】如果<0,说明当下时刻有令牌可获得;否则>0,说明还需要等待差值(momentAvailable - nowMicros) return max(momentAvailable - nowMicros, 0);}
private static void checkPermits(int permits) { checkArgument(permits > 0, "Requested permits (%s) must be positive", permits);}
//获取最早的令牌发布时间@Overridefinal long reserveEarliestAvailable(int requiredPermits, long nowMicros) { //调用resync(nowMicros)方法,同步限流器的状态。 resync(nowMicros); //初始化returnValue为nextFreeTicketMicros,表示当前最早可用的许可时间。 long returnValue = nextFreeTicketMicros; //计算storedPermitsToSpend,即从已存储的许可中可以使用的许可数量。 //这是所需许可数量(requiredPermits)和当前已存储许可数量(this.storedPermits)中的较小值。 double storedPermitsToSpend = min(requiredPermits, this.storedPermits); //计算freshPermits,即还需要获取的新许可数量。 //这是所需许可数量(requiredPermits)减去已存储许可数量(storedPermitsToSpend)。 double freshPermits = requiredPermits - storedPermitsToSpend; // 计算waitMicros,即需要等待的时间。 //这是已存储许可的等待时间(通过调用storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)计算)加上新许可的等待时间(通过将freshPermits乘以稳定间隔时间(stableIntervalMicros)计算)。 long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) + (long) (freshPermits * stableIntervalMicros); //更新nextFreeTicketMicros,将其增加waitMicros,表示预留许可后的最早可用时间。 this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); //更新this.storedPermits,减去已使用的许可数量(storedPermitsToSpend)。 this.storedPermits -= storedPermitsToSpend; //返回returnValue,即预留许可的最早可用时间。 return returnValue;}
mutexDoNotUseDirectly
对象只被初始化一次。这种模式在多线程环境下非常有用,因为它可以减少同步的开销。
public double acquire() {
return acquire(1);
}
public double acquire(int permits) {
// 【1】校验令牌需要多久可以获取到
long microsToWait = reserve(permits);
// 【2】此处将会阻塞
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
final long reserve(int permits) {
checkPermits(permits);
// 【3】加锁
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
final long reserveAndGetWaitLength(int permits, long nowMicros) {
// 校验当前时间和下次分发令牌时间
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0);
}
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros);
long returnValue = nextFreeTicketMicros;
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
double freshPermits = requiredPermits - storedPermitsToSpend;
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
/** 阻塞逻辑 **/
abstract static class SleepingStopwatch {
protected SleepingStopwatch() {
}
protected abstract long readMicros();
protected abstract void sleepMicrosUninterruptibly(long var1);
public static SleepingStopwatch createFromSystemTimer() {
return new SleepingStopwatch() {
final Stopwatch stopwatch = Stopwatch.createStarted();
protected long readMicros() {
return this.stopwatch.elapsed(TimeUnit.MICROSECONDS);
}
protected void sleepMicrosUninterruptibly(long micros) {
if (micros > 0L) {
//Thread.currentThread().interrupt()是Java中用于中断当前线程的方法。当一个线程被中断时,它的中断状态将被设置为true。中断是一种协作机制,它允许一个线程通知另一个线程某个条件已经发生,从而允许接收中断的线程采取适当的行动。
Uninterruptibles.sleepUninterruptibly(micros, TimeUnit.MICROSECONDS);
}
}
};
}
}
源码解析:
acquire()
和 tryAcquire()
是 RateLimiter
类中的两个主要方法,它们的区别如下:
acquire()
方法时,如果当前请求速率未超过限制,则请求会被立即处理,并且方法会立即返回。acquire()
方法会使当前线程阻塞,直到有可用的请求配额为止。tryAcquire()
方法时,如果当前请求速率未超过限制,则请求会被立即处理,并且方法会立即返回 true
。tryAcquire()
方法不会阻塞当前线程,而是立即返回 false
。总结一下,当使用客户端限流时,两种方法:acquire()
和 tryAcquire()
的主要区别在于是否阻塞当前线程。acquire()
会阻塞线程直到有可用的请求配额,而 tryAcquire()
不会阻塞线程,而是立即返回结果。
在实际应用中,我们可以根据需求和场景选择合适的方法。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。