前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Guava RateLimiter限流源码解析和实例应用

Guava RateLimiter限流源码解析和实例应用

作者头像
算法之名
发布2019-08-20 10:38:41
7530
发布2019-08-20 10:38:41
举报
文章被收录于专栏:算法之名算法之名

在开发高并发系统时有三把利器用来保护系统:缓存、降级和限流

  • 缓存 缓存的目的是提升系统访问速度和增大系统处理容量
  • 降级 降级是当服务出现问题或者影响到核心流程时,需要暂时屏蔽掉,待高峰或者问题解决后再打开
  • 限流 限流的目的是通过对并发访问/请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务、排队或等待、降级等处理

常用的限流算法

漏桶算法

漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。

令牌桶算法

对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。如图所示,令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。

RateLimiter使用以及源码解析

Google开源工具包Guava提供了限流工具类RateLimiter,该类基于令牌桶算法实现流量限制,使用十分方便,而且十分高效。

Guava有两种限流模式,一种为稳定模式(SmoothBursty:令牌生成速度恒定),一种为渐进模式(SmoothWarmingUp:令牌生成速度缓慢提升直到维持在一个稳定值) 两种模式实现思路类似,主要区别在等待时间的计算上,本篇重点介绍SmoothBursty

代码语言:javascript
复制
public static RateLimiter create(double permitsPerSecond) {
  /*
   * 默认的RateLimiter配置可以保存最多一秒钟的未使用许可证
   */
  return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond);
}

RateLimiter是一个抽象类,SmoothBursty是其子类SmoothRateLimiter的子类,其两个构造参数含义如下

  • SleepingStopwatch:guava中的一个时钟类实例,会通过这个来计算时间及令牌
  • maxBurstSeconds:官方解释,在ReteLimiter未使用时,最多保存几秒的令牌,默认是1
代码语言:javascript
复制
@VisibleForTesting
static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond) {
  RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
  //根据每秒向桶中放入令牌的数量来设置当前存储令牌数
  rateLimiter.setRate(permitsPerSecond);
  return rateLimiter;
}
代码语言:javascript
复制
public final void setRate(double permitsPerSecond) {
  //如果每秒向桶中放入令牌的数量(permitsPerSecond)大于0且为数字,通过检查,否则抛出参数异常
  checkArgument(
      permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
  //对每个线程进行互斥,建立互斥对象的锁定
  synchronized (mutex()) {
    //由各项参数更新当前存储令牌数
    doSetRate(permitsPerSecond, stopwatch.readMicros());
  }
}
代码语言:javascript
复制
public static void checkArgument(boolean expression, @Nullable Object errorMessage) {
  if (!expression) {
    throw new IllegalArgumentException(String.valueOf(errorMessage));
  }
}
代码语言:javascript
复制
private volatile Object mutexDoNotUseDirectly; //线程安全的互斥对象
代码语言:javascript
复制
private Object mutex() {
  Object mutex = mutexDoNotUseDirectly;
  if (mutex == null) {
    synchronized (this) {
      mutex = mutexDoNotUseDirectly;
      if (mutex == null) {
        mutexDoNotUseDirectly = mutex = new Object();
      }
    }
  }
  return mutex;
}

在SmoothBursty中

代码语言:javascript
复制
@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
  //若当前时间晚于nextFreeTicketMicros,则计算该段时间内可以生成多少令牌,将生成的令牌加入令牌桶中并更新数据 
  resync(nowMicros);
  //更新添加1个令牌的时间间隔(单位微妙)为1000000微妙(1秒)除以每秒放入令牌桶中的数量
  double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
  this.stableIntervalMicros = stableIntervalMicros;
  //将令牌桶中可以存储令牌的时间参数加上更新当前可以存储的令牌数
  doSetRate(permitsPerSecond, stableIntervalMicros);
}
代码语言:javascript
复制
private long nextFreeTicketMicros = 0L; //下一次请求可以获取令牌的起始时间
代码语言:javascript
复制
double storedPermits; //当前存储令牌数
代码语言:javascript
复制
double maxPermits; //最大存储令牌数 = maxBurstSeconds * stableIntervalMicros
代码语言:javascript
复制
double stableIntervalMicros; //添加令牌时间间隔 = SECONDS.toMicros(1L) / permitsPerSecond;(1秒/每秒的令牌数)
代码语言:javascript
复制
final double maxBurstSeconds; //在RateLimiter未使用时,最多存储几秒的令牌
代码语言:javascript
复制
private void resync(long nowMicros) {
  //如果当前时间大于下一次请求可以获取令牌的起始时间
  if (nowMicros > nextFreeTicketMicros) {
    //比较最大存储令牌数和当前存储的令牌数加上现在要增加的令牌数的大小,小的那个赋给当年存储令牌数,即增加令牌数与当前令牌数之和不能大于最大令牌数
    storedPermits = min(maxPermits,
        storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);
    //将当前时间赋给下一次请求可以获取的起始时间
    nextFreeTicketMicros = nowMicros;
  }
}
代码语言:javascript
复制
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
  //将最大存储令牌数存入临时副本
  double oldMaxPermits = this.maxPermits;
  //更新最大存储令牌数为存放令牌的秒数乘以每秒向桶中放入的令牌数
  maxPermits = maxBurstSeconds * permitsPerSecond;
  //如果最大存储令牌数的临时副本为正无穷大
  if (oldMaxPermits == Double.POSITIVE_INFINITY) {
    //更新当前存储令牌数为最大存储令牌数
    storedPermits = maxPermits;
  } else { //如果最大存储令牌数的临时副本不为正无穷大
    //如果最大存储令牌数的临时副本为0,则更新当前存储令牌数为0,否则
    //更新当前存储令牌数为当前存储令牌数乘以最大存储令牌数除以最大存储令牌数的临时副本数
    storedPermits = (oldMaxPermits == 0.0)
        ? 0.0 // initial state
        : storedPermits * maxPermits / oldMaxPermits;
  }
}

我们再来看一下RateLimiter的tryAcquire方法

代码语言:javascript
复制
public boolean tryAcquire(long timeout, TimeUnit unit) {
  //尝试在timeout时间内获取令牌,如果可以则挂起(睡眠)等待相应时间并返回true,否则立即返回false 
  return tryAcquire(1, timeout, unit);
}
代码语言:javascript
复制
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
  //取等待时间的微妙数与0比较取大值赋给超时时间
  long timeoutMicros = max(unit.toMicros(timeout), 0);
  //如果检查时间>0,通过检查,此处为1
  checkPermits(permits);
  long microsToWait;
  //建立互斥对象加锁互斥
  synchronized (mutex()) {
    //获取当前时间
    long nowMicros = stopwatch.readMicros();
    //如果下一次请求可以获取令牌的起始时间减去等待时间大于当前时间
    if (!canAcquire(nowMicros, timeoutMicros)) {
      return false; //返回false
    } else { //如果下一次请求可以获取令牌的起始时间减去等待时间小于等于当前时间
      //获取下一次请求可以获取令牌的起始时间减去当前时间的值与0之间的大值并刷新各参数(下一次请求可以获取令牌的起始时间、当前存储令牌数)
      microsToWait = reserveAndGetWaitLength(permits, nowMicros);
    }
  }
  //线程休眠microsToWait时间
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  //返回true
  return true;
}
代码语言:javascript
复制
private static int checkPermits(int permits) {
  checkArgument(permits > 0, "Requested permits (%s) must be positive", permits);
  return permits;
}
代码语言:javascript
复制
final Stopwatch stopwatch = Stopwatch.createStarted();
代码语言:javascript
复制
@Override
long readMicros() {
  return stopwatch.elapsed(MICROSECONDS);
}
代码语言:javascript
复制
private boolean canAcquire(long nowMicros, long timeoutMicros) {
  //返回下一次请求可以获取令牌的起始时间减去等待时间是否小于等于当前时间
  return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}
代码语言:javascript
复制
final long reserveAndGetWaitLength(int permits, long nowMicros) {
  //获取下一次请求可以获取令牌的起始时间并更新各参数(下一次请求可以获取令牌的起始时间、当前存储令牌数)
  long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
  //返回下一次请求可以获取令牌的起始时间减去当前时间的值与0之间的大值
  return max(momentAvailable - nowMicros, 0);
}
代码语言:javascript
复制
@Override
void sleepMicrosUninterruptibly(long micros) {
  if (micros > 0) {
    Uninterruptibles.sleepUninterruptibly(micros, MICROSECONDS);
  }
}

在SmoothBursty中

代码语言:javascript
复制
@Override
final long queryEarliestAvailable(long nowMicros) {
  //返回下一次请求可以获取令牌的起始时间
  return nextFreeTicketMicros;
}
代码语言:javascript
复制
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
  //若当前时间晚于nextFreeTicketMicros,则计算该段时间内可以生成多少令牌,将生成的令牌加入令牌桶中并更新数据
  resync(nowMicros);
  //获取下一次请求可以获取令牌的起始时间
  long returnValue = nextFreeTicketMicros;
  //在允许的请求数(这里为1)和当前存储令牌数间取小值赋给允许消费的存储令牌数(storedPermitsToSpend)
  double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
  //将允许的请求数减去允许消费的存储令牌数赋给允许刷新数(freshPermits)
  double freshPermits = requiredPermits - storedPermitsToSpend;
  //将允许刷新数乘以添加令牌时间间隔赋给等待微妙数(waitMicros)
  long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
      + (long) (freshPermits * stableIntervalMicros);
  //更新下一次请求可以获取令牌的起始时间为下一次请求可以获取令牌的起始时间加上等待微妙数
  this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;
  //更新当前存储令牌数为当前存储令牌数减去允许消费的存储令牌数
  this.storedPermits -= storedPermitsToSpend;
  return returnValue;
}
代码语言:javascript
复制
@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
  return 0L;
}

在Uninterruptibles中

代码语言:javascript
复制
public static void sleepUninterruptibly(long sleepFor, TimeUnit unit) {
  //定义是否已中断为false
  boolean interrupted = false;
  try {
    //将下一次请求可以获取令牌的起始时间减去当前时间的值转化为纳秒定义为remainingNanos
    long remainingNanos = unit.toNanos(sleepFor);
    //将系统的纳秒值加上该转化值为end
    long end = System.nanoTime() + remainingNanos;
    while (true) {
      try {
        //线程休眠remainingNanos时间
        NANOSECONDS.sleep(remainingNanos);
        return;
      } catch (InterruptedException e) {
        //如果发生中断异常,将是否已中断更新为true
        interrupted = true;
        //更新remainingNanos为end减去系统的纳秒值,并进入下一轮循环
        remainingNanos = end - System.nanoTime();
      }
    }
  } finally {
    //如果发生中断异常
    if (interrupted) {
      //当前线程中断
      Thread.currentThread().interrupt();
    }
  }
}

源码分析就是这些了,现在我们来看一下Guava RateLimiter的应用,在APO中拦截Controller,并进行限流

在pom中添加

代码语言:javascript
复制
<dependency>
   <groupId>com.google.guava</groupId>
   <artifactId>guava</artifactId>
   <version>18.0</version>
</dependency>
代码语言:javascript
复制
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-aop</artifactId>
</dependency>

标签

代码语言:javascript
复制
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface LxRateLimit {
    /**
     *
     * @return
     */
    String value() default "";

    /**
     * 每秒向桶中放入令牌的数量   默认最大即不做限流
     * @return
     */
    double perSecond() default Double.MAX_VALUE;

    /**
     * 获取令牌的等待时间  默认0
     * @return
     */
    int timeOut() default 0;

    /**
     * 超时时间单位
     * @return
     */
    TimeUnit timeOutUnit() default TimeUnit.MILLISECONDS;
}

AOP类

代码语言:javascript
复制
@Slf4j
@Aspect
@Component
public class LxRateLimitAspect {
    private RateLimiter rateLimiter = RateLimiter.create(Double.MAX_VALUE);

    /**
     * 带有指定注解切入
     */
    @ResponseBody
    @Around(value = "@annotation(com.guanjian.annotation.LxRateLimit)")
    public Object aroundNotice(ProceedingJoinPoint pjp) throws Throwable {
        log.info("拦截到了{}方法...", pjp.getSignature().getName());
        Signature signature = pjp.getSignature();
        MethodSignature methodSignature = (MethodSignature)signature;
        //获取目标方法
        Method targetMethod = methodSignature.getMethod();
        if (targetMethod.isAnnotationPresent(LxRateLimit.class)) {
            //获取目标方法的@LxRateLimit注解
            LxRateLimit lxRateLimit = targetMethod.getAnnotation(LxRateLimit.class);
            rateLimiter.setRate(lxRateLimit.perSecond());
            if (!rateLimiter.tryAcquire(lxRateLimit.timeOut(), lxRateLimit.timeOutUnit()))
                return "服务器繁忙,请稍后再试!";
        }
        return pjp.proceed();
    }
}

Controller

代码语言:javascript
复制
@RestController
public class AnnotationTestController {
    @GetMapping("/testannotation")
    @LxRateLimit(perSecond = 2000.0, timeOut = 500) //此处限速为2000qps
    public String testAnnotation() {
        return "get token success";
    }
}

我们先在Controller中将@LxRateLimit(perSecond = 2000.0, timeOut = 500)注释掉

运行Jmeter进行压测

我们启用500线程压测

压测结果

吞吐量为7867.8qps,此时是不限速的

现在我们恢复Controller中的@LxRateLimit(perSecond = 2000.0, timeOut = 500)

吞吐量为2067.7qps

系统日志可以看到大量的拦截

2019-05-26 21:24:33.370 INFO 11092 --- [o-8080-exec-176] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.370 INFO 11092 --- [io-8080-exec-27] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.374 INFO 11092 --- [o-8080-exec-128] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.374 INFO 11092 --- [o-8080-exec-191] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.374 INFO 11092 --- [io-8080-exec-23] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.377 INFO 11092 --- [io-8080-exec-36] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.379 INFO 11092 --- [o-8080-exec-123] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.379 INFO 11092 --- [io-8080-exec-61] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.380 INFO 11092 --- [io-8080-exec-19] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.382 INFO 11092 --- [io-8080-exec-77] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.384 INFO 11092 --- [io-8080-exec-23] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法...

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 常用的限流算法
    • 漏桶算法
      • 令牌桶算法
      • RateLimiter使用以及源码解析
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档