在开发高并发系统时有三把利器用来保护系统:缓存、降级和限流
缓存
缓存的目的是提升系统访问速度和增大系统处理容量降级
降级是当服务出现问题或者影响到核心流程时,需要暂时屏蔽掉,待高峰或者问题解决后再打开限流
限流的目的是通过对并发访问/请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务、排队或等待、降级等处理漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。
对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。如图所示,令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。
Google开源工具包Guava提供了限流工具类RateLimiter,该类基于令牌桶算法实现流量限制,使用十分方便,而且十分高效。
Guava有两种限流模式,一种为稳定模式(SmoothBursty:令牌生成速度恒定),一种为渐进模式(SmoothWarmingUp:令牌生成速度缓慢提升直到维持在一个稳定值) 两种模式实现思路类似,主要区别在等待时间的计算上,本篇重点介绍SmoothBursty
public static RateLimiter create(double permitsPerSecond) {
/*
* 默认的RateLimiter配置可以保存最多一秒钟的未使用许可证
*/
return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond);
}
RateLimiter是一个抽象类,SmoothBursty是其子类SmoothRateLimiter的子类,其两个构造参数含义如下
@VisibleForTesting
static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
//根据每秒向桶中放入令牌的数量来设置当前存储令牌数
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
public final void setRate(double permitsPerSecond) {
//如果每秒向桶中放入令牌的数量(permitsPerSecond)大于0且为数字,通过检查,否则抛出参数异常
checkArgument(
permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
//对每个线程进行互斥,建立互斥对象的锁定
synchronized (mutex()) {
//由各项参数更新当前存储令牌数
doSetRate(permitsPerSecond, stopwatch.readMicros());
}
}
public static void checkArgument(boolean expression, @Nullable Object errorMessage) {
if (!expression) {
throw new IllegalArgumentException(String.valueOf(errorMessage));
}
}
private volatile Object mutexDoNotUseDirectly; //线程安全的互斥对象
private Object mutex() {
Object mutex = mutexDoNotUseDirectly;
if (mutex == null) {
synchronized (this) {
mutex = mutexDoNotUseDirectly;
if (mutex == null) {
mutexDoNotUseDirectly = mutex = new Object();
}
}
}
return mutex;
}
在SmoothBursty中
@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
//若当前时间晚于nextFreeTicketMicros,则计算该段时间内可以生成多少令牌,将生成的令牌加入令牌桶中并更新数据
resync(nowMicros);
//更新添加1个令牌的时间间隔(单位微妙)为1000000微妙(1秒)除以每秒放入令牌桶中的数量
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
//将令牌桶中可以存储令牌的时间参数加上更新当前可以存储的令牌数
doSetRate(permitsPerSecond, stableIntervalMicros);
}
private long nextFreeTicketMicros = 0L; //下一次请求可以获取令牌的起始时间
double storedPermits; //当前存储令牌数
double maxPermits; //最大存储令牌数 = maxBurstSeconds * stableIntervalMicros
double stableIntervalMicros; //添加令牌时间间隔 = SECONDS.toMicros(1L) / permitsPerSecond;(1秒/每秒的令牌数)
final double maxBurstSeconds; //在RateLimiter未使用时,最多存储几秒的令牌
private void resync(long nowMicros) {
//如果当前时间大于下一次请求可以获取令牌的起始时间
if (nowMicros > nextFreeTicketMicros) {
//比较最大存储令牌数和当前存储的令牌数加上现在要增加的令牌数的大小,小的那个赋给当年存储令牌数,即增加令牌数与当前令牌数之和不能大于最大令牌数
storedPermits = min(maxPermits,
storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);
//将当前时间赋给下一次请求可以获取的起始时间
nextFreeTicketMicros = nowMicros;
}
}
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
//将最大存储令牌数存入临时副本
double oldMaxPermits = this.maxPermits;
//更新最大存储令牌数为存放令牌的秒数乘以每秒向桶中放入的令牌数
maxPermits = maxBurstSeconds * permitsPerSecond;
//如果最大存储令牌数的临时副本为正无穷大
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
//更新当前存储令牌数为最大存储令牌数
storedPermits = maxPermits;
} else { //如果最大存储令牌数的临时副本不为正无穷大
//如果最大存储令牌数的临时副本为0,则更新当前存储令牌数为0,否则
//更新当前存储令牌数为当前存储令牌数乘以最大存储令牌数除以最大存储令牌数的临时副本数
storedPermits = (oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}
我们再来看一下RateLimiter的tryAcquire方法
public boolean tryAcquire(long timeout, TimeUnit unit) {
//尝试在timeout时间内获取令牌,如果可以则挂起(睡眠)等待相应时间并返回true,否则立即返回false
return tryAcquire(1, timeout, unit);
}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
//取等待时间的微妙数与0比较取大值赋给超时时间
long timeoutMicros = max(unit.toMicros(timeout), 0);
//如果检查时间>0,通过检查,此处为1
checkPermits(permits);
long microsToWait;
//建立互斥对象加锁互斥
synchronized (mutex()) {
//获取当前时间
long nowMicros = stopwatch.readMicros();
//如果下一次请求可以获取令牌的起始时间减去等待时间大于当前时间
if (!canAcquire(nowMicros, timeoutMicros)) {
return false; //返回false
} else { //如果下一次请求可以获取令牌的起始时间减去等待时间小于等于当前时间
//获取下一次请求可以获取令牌的起始时间减去当前时间的值与0之间的大值并刷新各参数(下一次请求可以获取令牌的起始时间、当前存储令牌数)
microsToWait = reserveAndGetWaitLength(permits, nowMicros);
}
}
//线程休眠microsToWait时间
stopwatch.sleepMicrosUninterruptibly(microsToWait);
//返回true
return true;
}
private static int checkPermits(int permits) {
checkArgument(permits > 0, "Requested permits (%s) must be positive", permits);
return permits;
}
final Stopwatch stopwatch = Stopwatch.createStarted();
@Override
long readMicros() {
return stopwatch.elapsed(MICROSECONDS);
}
private boolean canAcquire(long nowMicros, long timeoutMicros) {
//返回下一次请求可以获取令牌的起始时间减去等待时间是否小于等于当前时间
return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}
final long reserveAndGetWaitLength(int permits, long nowMicros) {
//获取下一次请求可以获取令牌的起始时间并更新各参数(下一次请求可以获取令牌的起始时间、当前存储令牌数)
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
//返回下一次请求可以获取令牌的起始时间减去当前时间的值与0之间的大值
return max(momentAvailable - nowMicros, 0);
}
@Override
void sleepMicrosUninterruptibly(long micros) {
if (micros > 0) {
Uninterruptibles.sleepUninterruptibly(micros, MICROSECONDS);
}
}
在SmoothBursty中
@Override
final long queryEarliestAvailable(long nowMicros) {
//返回下一次请求可以获取令牌的起始时间
return nextFreeTicketMicros;
}
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
//若当前时间晚于nextFreeTicketMicros,则计算该段时间内可以生成多少令牌,将生成的令牌加入令牌桶中并更新数据
resync(nowMicros);
//获取下一次请求可以获取令牌的起始时间
long returnValue = nextFreeTicketMicros;
//在允许的请求数(这里为1)和当前存储令牌数间取小值赋给允许消费的存储令牌数(storedPermitsToSpend)
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
//将允许的请求数减去允许消费的存储令牌数赋给允许刷新数(freshPermits)
double freshPermits = requiredPermits - storedPermitsToSpend;
//将允许刷新数乘以添加令牌时间间隔赋给等待微妙数(waitMicros)
long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
//更新下一次请求可以获取令牌的起始时间为下一次请求可以获取令牌的起始时间加上等待微妙数
this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;
//更新当前存储令牌数为当前存储令牌数减去允许消费的存储令牌数
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
return 0L;
}
在Uninterruptibles中
public static void sleepUninterruptibly(long sleepFor, TimeUnit unit) {
//定义是否已中断为false
boolean interrupted = false;
try {
//将下一次请求可以获取令牌的起始时间减去当前时间的值转化为纳秒定义为remainingNanos
long remainingNanos = unit.toNanos(sleepFor);
//将系统的纳秒值加上该转化值为end
long end = System.nanoTime() + remainingNanos;
while (true) {
try {
//线程休眠remainingNanos时间
NANOSECONDS.sleep(remainingNanos);
return;
} catch (InterruptedException e) {
//如果发生中断异常,将是否已中断更新为true
interrupted = true;
//更新remainingNanos为end减去系统的纳秒值,并进入下一轮循环
remainingNanos = end - System.nanoTime();
}
}
} finally {
//如果发生中断异常
if (interrupted) {
//当前线程中断
Thread.currentThread().interrupt();
}
}
}
源码分析就是这些了,现在我们来看一下Guava RateLimiter的应用,在APO中拦截Controller,并进行限流
在pom中添加
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
标签
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface LxRateLimit {
/**
*
* @return
*/
String value() default "";
/**
* 每秒向桶中放入令牌的数量 默认最大即不做限流
* @return
*/
double perSecond() default Double.MAX_VALUE;
/**
* 获取令牌的等待时间 默认0
* @return
*/
int timeOut() default 0;
/**
* 超时时间单位
* @return
*/
TimeUnit timeOutUnit() default TimeUnit.MILLISECONDS;
}
AOP类
@Slf4j
@Aspect
@Component
public class LxRateLimitAspect {
private RateLimiter rateLimiter = RateLimiter.create(Double.MAX_VALUE);
/**
* 带有指定注解切入
*/
@ResponseBody
@Around(value = "@annotation(com.guanjian.annotation.LxRateLimit)")
public Object aroundNotice(ProceedingJoinPoint pjp) throws Throwable {
log.info("拦截到了{}方法...", pjp.getSignature().getName());
Signature signature = pjp.getSignature();
MethodSignature methodSignature = (MethodSignature)signature;
//获取目标方法
Method targetMethod = methodSignature.getMethod();
if (targetMethod.isAnnotationPresent(LxRateLimit.class)) {
//获取目标方法的@LxRateLimit注解
LxRateLimit lxRateLimit = targetMethod.getAnnotation(LxRateLimit.class);
rateLimiter.setRate(lxRateLimit.perSecond());
if (!rateLimiter.tryAcquire(lxRateLimit.timeOut(), lxRateLimit.timeOutUnit()))
return "服务器繁忙,请稍后再试!";
}
return pjp.proceed();
}
}
Controller
@RestController
public class AnnotationTestController {
@GetMapping("/testannotation")
@LxRateLimit(perSecond = 2000.0, timeOut = 500) //此处限速为2000qps
public String testAnnotation() {
return "get token success";
}
}
我们先在Controller中将@LxRateLimit(perSecond = 2000.0, timeOut = 500)注释掉
运行Jmeter进行压测
我们启用500线程压测
压测结果
吞吐量为7867.8qps,此时是不限速的
现在我们恢复Controller中的@LxRateLimit(perSecond = 2000.0, timeOut = 500)
吞吐量为2067.7qps
系统日志可以看到大量的拦截
2019-05-26 21:24:33.370 INFO 11092 --- [o-8080-exec-176] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.370 INFO 11092 --- [io-8080-exec-27] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.374 INFO 11092 --- [o-8080-exec-128] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.374 INFO 11092 --- [o-8080-exec-191] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.374 INFO 11092 --- [io-8080-exec-23] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.377 INFO 11092 --- [io-8080-exec-36] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.379 INFO 11092 --- [o-8080-exec-123] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.379 INFO 11092 --- [io-8080-exec-61] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.380 INFO 11092 --- [io-8080-exec-19] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.382 INFO 11092 --- [io-8080-exec-77] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.384 INFO 11092 --- [io-8080-exec-23] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法...