聊聊Guava的RateLimiter

本文主要研究一下Guava的RateLimiter

RateLimiter

guava-26.0-jre-sources.jar!/com/google/common/util/concurrent/RateLimiter.java

@Beta
@GwtIncompatible
public abstract class RateLimiter {

    //......
 /**
   * Acquires the given number of permits from this {@code RateLimiter}, blocking until the request
   * can be granted. Tells the amount of time slept, if any.
   *
   * @param permits the number of permits to acquire
   * @return time spent sleeping to enforce rate, in seconds; 0.0 if not rate-limited
   * @throws IllegalArgumentException if the requested number of permits is negative or zero
   * @since 16.0 (present in 13.0 with {@code void} return type})
   */
  @CanIgnoreReturnValue
  public double acquire(int permits) {
    long microsToWait = reserve(permits);
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return 1.0 * microsToWait / SECONDS.toMicros(1L);
  }

  /**
   * Reserves the given number of permits from this {@code RateLimiter} for future use, returning
   * the number of microseconds until the reservation can be consumed.
   *
   * @return time in microseconds to wait until the resource can be acquired, never negative
   */
  final long reserve(int permits) {
    checkPermits(permits);
    synchronized (mutex()) {
      return reserveAndGetWaitLength(permits, stopwatch.readMicros());
    }
  }

  private static void checkPermits(int permits) {
    checkArgument(permits > 0, "Requested permits (%s) must be positive", permits);
  }

  /**
   * Reserves next ticket and returns the wait time that the caller must wait for.
   *
   * @return the required wait time, never negative
   */
  final long reserveAndGetWaitLength(int permits, long nowMicros) {
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    return max(momentAvailable - nowMicros, 0);
  }

  public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
    long timeoutMicros = max(unit.toMicros(timeout), 0);
    checkPermits(permits);
    long microsToWait;
    synchronized (mutex()) {
      long nowMicros = stopwatch.readMicros();
      if (!canAcquire(nowMicros, timeoutMicros)) {
        return false;
      } else {
        microsToWait = reserveAndGetWaitLength(permits, nowMicros);
      }
    }
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return true;
  }

  private boolean canAcquire(long nowMicros, long timeoutMicros) {
    return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
  }

  /**
   * Reserves next ticket and returns the wait time that the caller must wait for.
   *
   * @return the required wait time, never negative
   */
  final long reserveAndGetWaitLength(int permits, long nowMicros) {
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    return max(momentAvailable - nowMicros, 0);
  }

  /**
   * Returns the earliest time that permits are available (with one caveat).
   *
   * @return the time that permits are available, or, if permits are available immediately, an
   *     arbitrary past or present time
   */
  abstract long queryEarliestAvailable(long nowMicros);

  /**
   * Reserves the requested number of permits and returns the time that those permits can be used
   * (with one caveat).
   *
   * @return the time that the permits may be used, or, if the permits may be used immediately, an
   *     arbitrary past or present time
   */
  abstract long reserveEarliestAvailable(int permits, long nowMicros);

  //......
}
  • 这里主要看acquire以及tryAcquire方法
  • acquire主要依赖reserve方法,先调用reserveAndGetWaitLength,最后是调用reserveEarliestAvailable方法
  • tryAcquire也会调用reserveAndGetWaitLength,最后也是调用reserveEarliestAvailable方法
  • reserveEarliestAvailable是抽象方法,由子类去实现

SmoothRateLimiter

guava-26.0-jre-sources.jar!/com/google/common/util/concurrent/SmoothRateLimiter.java

@GwtIncompatible
abstract class SmoothRateLimiter extends RateLimiter {
  //......
  @Override
  final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    resync(nowMicros);
    long returnValue = nextFreeTicketMicros;
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    double freshPermits = requiredPermits - storedPermitsToSpend;
    long waitMicros =
        storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
            + (long) (freshPermits * stableIntervalMicros);

    this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
  }

  /** Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time. */
  void resync(long nowMicros) {
    // if nextFreeTicket is in the past, resync to now
    if (nowMicros > nextFreeTicketMicros) {
      double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
      storedPermits = min(maxPermits, storedPermits + newPermits);
      nextFreeTicketMicros = nowMicros;
    }
  }

  /**
   * Translates a specified portion of our currently stored permits which we want to spend/acquire,
   * into a throttling time. Conceptually, this evaluates the integral of the underlying function we
   * use, for the range of [(storedPermits - permitsToTake), storedPermits].
   *
   * <p>This always holds: {@code 0 <= permitsToTake <= storedPermits}
   */
  abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);

  /**
   * Returns the number of microseconds during cool down that we have to wait to get a new permit.
   */
  abstract double coolDownIntervalMicros();

  //......
}
  • SmoothRateLimiter是RateLimiter的抽象子类,是平滑限流实现类的抽象父类
  • 这里首先调用resync方法(用于处理根据速率添加token的逻辑),然后再去计算permits扣减以及等待时间的计算
  • 这里调用了两个抽象方法,分别是coolDownIntervalMicros以及storedPermitsToWaitTime

SmoothRateLimiter的两个子类

SmoothRateLimiter有两个内部静态子类,分别是SmoothBursty以及SmoothWarmingUp

SmoothBursty

  /**
   * This implements a "bursty" RateLimiter, where storedPermits are translated to zero throttling.
   * The maximum number of permits that can be saved (when the RateLimiter is unused) is defined in
   * terms of time, in this sense: if a RateLimiter is 2qps, and this time is specified as 10
   * seconds, we can save up to 2 * 10 = 20 permits.
   */
  static final class SmoothBursty extends SmoothRateLimiter {
    /** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */
    final double maxBurstSeconds;

    SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
      super(stopwatch);
      this.maxBurstSeconds = maxBurstSeconds;
    }

    @Override
    void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
      double oldMaxPermits = this.maxPermits;
      maxPermits = maxBurstSeconds * permitsPerSecond;
      if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        // if we don't special-case this, we would get storedPermits == NaN, below
        storedPermits = maxPermits;
      } else {
        storedPermits =
            (oldMaxPermits == 0.0)
                ? 0.0 // initial state
                : storedPermits * maxPermits / oldMaxPermits;
      }
    }

    @Override
    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
      return 0L;
    }

    @Override
    double coolDownIntervalMicros() {
      return stableIntervalMicros;
    }
  }
  • SmoothBursty是一个zero throttling的”bursty” RateLimiter
  • coolDownIntervalMicros返回的是stableIntervalMicros,而storedPermitsToWaitTime返回的为0

SmoothWarmingUp

  static final class SmoothWarmingUp extends SmoothRateLimiter {
    private final long warmupPeriodMicros;
    /**
     * The slope of the line from the stable interval (when permits == 0), to the cold interval
     * (when permits == maxPermits)
     */
    private double slope;

    private double thresholdPermits;
    private double coldFactor;

    SmoothWarmingUp(
        SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) {
      super(stopwatch);
      this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod);
      this.coldFactor = coldFactor;
    }

    @Override
    void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
      double oldMaxPermits = maxPermits;
      double coldIntervalMicros = stableIntervalMicros * coldFactor;
      thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
      maxPermits =
          thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
      slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
      if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        // if we don't special-case this, we would get storedPermits == NaN, below
        storedPermits = 0.0;
      } else {
        storedPermits =
            (oldMaxPermits == 0.0)
                ? maxPermits // initial state is cold
                : storedPermits * maxPermits / oldMaxPermits;
      }
    }

    @Override
    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
      double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
      long micros = 0;
      // measuring the integral on the right part of the function (the climbing line)
      if (availablePermitsAboveThreshold > 0.0) {
        double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
        // TODO(cpovirk): Figure out a good name for this variable.
        double length =
            permitsToTime(availablePermitsAboveThreshold)
                + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
        micros = (long) (permitsAboveThresholdToTake * length / 2.0);
        permitsToTake -= permitsAboveThresholdToTake;
      }
      // measuring the integral on the left part of the function (the horizontal line)
      micros += (long) (stableIntervalMicros * permitsToTake);
      return micros;
    }

    private double permitsToTime(double permits) {
      return stableIntervalMicros + permits * slope;
    }

    @Override
    double coolDownIntervalMicros() {
      return warmupPeriodMicros / maxPermits;
    }
  }
  • coolDownIntervalMicros返回的是warmupPeriodMicros / maxPermits,而storedPermitsToWaitTime的计算相对复杂一些
  • SmoothBursty是基于token bucket算法,允许一定量的bursty流量,但是有些场景需要bursty流量更平滑些,这就需要使用SmoothWarmingUp
  • SmoothWarmingUp有一个warmup period,为thresholdPermits到maxPermits的这段范围 * <pre> * ^ throttling * | * cold + / * interval | /. * | / . * | / . ← "warmup period" is the area of the trapezoid between * | / . thresholdPermits and maxPermits * | / . * | / . * | / . * stable +----------/ WARM . * interval | . UP . * | . PERIOD. * | . . * 0 +----------+-------+--------------→ storedPermits * 0 thresholdPermits maxPermits * </pre>

主要涉及如下几个公式

coldInterval = coldFactor * stableInterval.
thresholdPermits = 0.5 * warmupPeriod / stableInterval
maxPermits = thresholdPermits + 2 * warmupPeriod / (stableInterval + coldInterval)
  • coldFactor默认是3
  • stableInterval代码以毫秒计算,即stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond

小结

  • Guava的RateLimiter(SmoothRateLimiter)基于token bucket算法实现,具体有两个实现类,分别是SmoothBursty以及SmoothWarmingUp
  • SmoothBursty初始化的storedPermits为0,可以支持burst到maxPermits
  • SmoothWarmingUp初始化的storedPermits为maxPermits(thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros)),也支持burst,但是总体相对平滑

doc

  • RateLimiter.java
  • RateLimiter.html
  • Rate-Limiter

原文发布于微信公众号 - 码匠的流水账(geek_luandun)

原文发表时间:2018-08-31

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏一棹烟波

OpenGL进行简单的通用计算实例

博主作为OpenGL新手,最近要用OpenGL进行并行的数据计算,突然发现这样的资料还是很少的,大部分资料和参考书都是讲用OpenGL进行渲染的。好不容易找到一...

2837
来自专栏图形学与OpenGL

3.6.2 编程实例-河南地图绘制

#include <iostream> #include <fstream> #include<vector> #include <GL/glut.h> usi...

1171
来自专栏编程微刊

【前端图表】echarts实现散点图x轴时间轴

5143
来自专栏生信技能树

使用ESTIMATE来对转录组表达数据根据stromal和immune细胞比例估算肿瘤纯度

ESTIMATE (Estimation of STromal and Immune cells in MAlignant Tumor tissues usin...

4832
来自专栏智能计算时代

IBM Watson提供的认知计算服务介绍

Cognitive Service Introduction Twitter:@huiwenhan Weibo:@huiwenhan Agenda Wats...

3578
来自专栏小特工作室

基于iTextSharp的PDF文档操作

  公司是跨境电商,需要和各种物流打交道,需要把东西交给物流,让他们发到世界各地。其中需要物流公司提供一个运单号,来追踪货物到达哪里?!   最近在和DHL物流...

26910
来自专栏GIS讲堂

ArcGIS Image Server简介以及OL2中的加载

本文讲述Arcgis Image Server相关以及在OL2中如何加载Arcgis Server发布的影像服务。

1142
来自专栏CreateAMind

Ray RLlib: Scalable Reinforcement Learning

1612
来自专栏IT派

开源|人脸检测的C / C ++源代

人脸检测的C/C++源代码,曾发表于 OPENCV 的 MAILING LIST,主要是对OPENCV 3.1 版本发布的代码做了一些速度上的优化,并且解决了内...

3935
来自专栏CreateAMind

https://github.com/CPFL/Autoware 自动驾驶框架比较齐全

Integrated open-source software for urban autonomous driving, maintained by Tier...

3782

扫码关注云+社区

领取腾讯云代金券