前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >源码解析:Guava客户端限流

源码解析:Guava客户端限流

原创
作者头像
后台技术汇
发布2024-09-15 08:00:05
1040
发布2024-09-15 08:00:05
举报
文章被收录于专栏:后台技术汇

1、背景

客户端限流 PK 分布式限流

客户端限流:当应用为单点应用时,只要应用进行了限流,那么应用所依赖的各种服务也都得到了保护。

分布式限流:线上业务出于各种原因考虑,多是分布式系统,单节点的限流仅能保护自身节点,但无法保护应用依赖的各种服务,并且在进行节点扩容、缩容时也无法准确控制整个服务的请求限制

  1. 网关层限流 服务网关,作为整个分布式链路中的第一道关卡,承接了所有用户来访请求。我们在网关层进行限流,就可以达到了整体限流的目的了。目前,主流的网关层有以软件为代表的Nginx,还有Spring Cloud中的Gateway和Zuul这类网关层组件,也有以硬件为代表的F5。
  2. 中间件限流
    1. 将限流信息存储在分布式环境中某个中间件里(比如Redis缓存),每个组件都可以从这里获取到当前时刻的流量统计,从而决定是拒绝服务还是放行流量。
  3. 限流组件
目前也有一些开源组件提供了限流的功能,比如Sentinel就是一个不错的选择。Sentinel是阿里出品的开源组件,并且包含在了Spring Cloud Alibaba组件库中。Hystrix也具有限流的功能。

2、客户端应用案例

Guava是一个客户端组件,在其多线程模块下提供了以RateLimiter为首的几个限流支持类。它只能对“当前”服务进行限流,即它不属于分布式限流的解决方案。

代码语言:javascript
复制
@RestController@RequestMapping("/guava")@Slf4jpublic class GuavaLimitController {
    private static final Integer RATE_LIMIT = 10;    RateLimiter rateLimiter = RateLimiter.create(RATE_LIMIT);
    @GetMapping("/try_require_rate_limit")    public String hello() {        boolean limit = false;        try {            limit = rateLimiter.tryAcquire();        } catch (Exception e) {            log.error("limit error", e);        }
        if (limit) {            return "hello guava rate limit success...";        }        throw new RuntimeException("hello guava rate limit fall...");    }
    @GetMapping("/acquire_rate_limit")    public String hello2() {        double acquireTimeGap = rateLimiter.acquire();        return "hello guava rate limit success, acquireTimeGap: " + acquireTimeGap;    }
}

3、Jmeter测试效果

Jmeter将在10秒内逐渐启动30个线程进行并发测试,每个线程执行测试计划20次,共20 * 30 = 600次。

3.1 非阻塞限流

3.2 阻塞限流

说明:

  • 在10s的600次请求下(QPS约为60)
    • 阻塞请求出现20%概率请求因为获取不到令牌而被限流
    • 非阻塞请求则因为阻塞超时而失败,大部分请求都会阻塞2s左右,然后根据获取到令牌情况成功或者失败
  • 由上可以得出Guava客户端限流,acquire与tryAcquire确实发挥了不同效果的作用。

3.3 源码分析

轻量级锁:(参考:JVM的知识点-对象头ObjectHeader,超链接)

ObjectHeader是和对象自身定义数据无关的额外存储成本,也是JVM实现轻量级锁和偏向锁的关键。

(1)tryAcquire源码

代码语言:javascript
复制
public boolean tryAcquire() {    //【1】  return tryAcquire(1, 0, MICROSECONDS);}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {  long timeoutMicros = max(unit.toMicros(timeout), 0);  // 【2】  checkPermits(permits);  long microsToWait;  // 【3】锁:线程安全下,执行申请令牌的操作  synchronized (mutex()) {    long nowMicros = stopwatch.readMicros();    // 【4】判断是否可以获取到令牌    if (!canAcquire(nowMicros, timeoutMicros)) {        // 【5】不能获取到,则直接返回false      return false;    } else {        //【6】能获取到,则取出令牌并设置 下次令牌发布时间      microsToWait = reserveAndGetWaitLength(permits, nowMicros);    }  }  // 【9】线程中断microsToWait时间  stopwatch.sleepMicrosUninterruptibly(microsToWait);  return true;}
// 获取对象头作为锁对象private Object mutex() {    Object mutex = mutexDoNotUseDirectly;    if (mutex == null) {        synchronized (this) {            mutex = mutexDoNotUseDirectly;            if (mutex == null) {            mutexDoNotUseDirectly = mutex = new Object();            }        }    }    return mutex;}
// 【4】是否可以获取令牌private boolean canAcquire(long nowMicros, long timeoutMicros) {    // now当前时间,timeoutMicros为0,那么这里就是比较上次设置好的令牌发布时间 和 当前时间的大小而已    // nextFreeTicketMicros > nowMicros:返回false,还不到获取令牌时机    // nextFreeTicketMicros < nowMicros:返回true,已经过了,可以获取令牌时机    return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;}
final long reserveAndGetWaitLength(int permits, long nowMicros) {    //【7】获取最早的令牌发布时间    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);    // 【8】如果<0,说明当下时刻有令牌可获得;否则>0,说明还需要等待差值(momentAvailable - nowMicros)    return max(momentAvailable - nowMicros, 0);}
private static void checkPermits(int permits) {    checkArgument(permits > 0, "Requested permits (%s) must be positive", permits);}
//获取最早的令牌发布时间@Overridefinal long reserveEarliestAvailable(int requiredPermits, long nowMicros) {    //调用resync(nowMicros)方法,同步限流器的状态。    resync(nowMicros);    //初始化returnValue为nextFreeTicketMicros,表示当前最早可用的许可时间。    long returnValue = nextFreeTicketMicros;    //计算storedPermitsToSpend,即从已存储的许可中可以使用的许可数量。    //这是所需许可数量(requiredPermits)和当前已存储许可数量(this.storedPermits)中的较小值。    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);    //计算freshPermits,即还需要获取的新许可数量。    //这是所需许可数量(requiredPermits)减去已存储许可数量(storedPermitsToSpend)。    double freshPermits = requiredPermits - storedPermitsToSpend;    // 计算waitMicros,即需要等待的时间。    //这是已存储许可的等待时间(通过调用storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)计算)加上新许可的等待时间(通过将freshPermits乘以稳定间隔时间(stableIntervalMicros)计算)。    long waitMicros =      storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)          + (long) (freshPermits * stableIntervalMicros);    //更新nextFreeTicketMicros,将其增加waitMicros,表示预留许可后的最早可用时间。    this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);    //更新this.storedPermits,减去已使用的许可数量(storedPermitsToSpend)。    this.storedPermits -= storedPermitsToSpend;    //返回returnValue,即预留许可的最早可用时间。    return returnValue;}
  1. 【1】申请一个令牌,并且是非阻塞立刻返回结果
  2. 【2】检查申请的令牌数是否为正数,否则抛异常
  3. 【3】mutex()方法代码是一个双重检查锁定(Double-Checked Locking)模式的实现,用于确保mutexDoNotUseDirectly对象只被初始化一次。这种模式在多线程环境下非常有用,因为它可以减少同步的开销。
  4. 【4】是否可以获取令牌:比较上次设置好的令牌发布时间 和 当前时间的大小而已
    1. nextFreeTicketMicros > nowMicros:返回false,还不到获取令牌时机
    2. nextFreeTicketMicros < nowMicros:返回true,已经过了,可以获取令牌时机
  5. 判断能否获取令牌
    1. 【7】获取最早的令牌发布时间,其核心算法是reserveEarliestAvailable
    2. 【8】如果<0,说明当下时刻有令牌可获得;否则>0,说明还需要等待差值(momentAvailable - nowMicros)
    3. 【5】不能获取到,则直接返回false这是tryRequire和require的区别!!)
    4. 【6】能获取到,则取出令牌并设置 下次令牌发布时间
  6. 【9】线程中断microsToWait时间,由于上面的第【4】步骤canAcquire判断为true,那么能走到第【9】步骤,已经说明是不用等的了。
    1. 没到领牌时间就等一下
    2. 到了就直接返回不中断;

(2)acquire源码

代码语言:javascript
复制

public double acquire() {
  return acquire(1);
}

public double acquire(int permits) {
    // 【1】校验令牌需要多久可以获取到
    long microsToWait = reserve(permits);
    // 【2】此处将会阻塞
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

final long reserve(int permits) {
    checkPermits(permits);
    // 【3】加锁
    synchronized (mutex()) {
        return reserveAndGetWaitLength(permits, stopwatch.readMicros());
    }
}

final long reserveAndGetWaitLength(int permits, long nowMicros) {
    // 校验当前时间和下次分发令牌时间
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    return max(momentAvailable - nowMicros, 0);
}

final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    resync(nowMicros);
    long returnValue = nextFreeTicketMicros;
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    double freshPermits = requiredPermits - storedPermitsToSpend;
    long waitMicros =
      storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
          + (long) (freshPermits * stableIntervalMicros);
    
    this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
}

/** 阻塞逻辑 **/
abstract static class SleepingStopwatch {
    protected SleepingStopwatch() {
    }

    protected abstract long readMicros();

    protected abstract void sleepMicrosUninterruptibly(long var1);

    public static SleepingStopwatch createFromSystemTimer() {
        return new SleepingStopwatch() {
            final Stopwatch stopwatch = Stopwatch.createStarted();

            protected long readMicros() {
                return this.stopwatch.elapsed(TimeUnit.MICROSECONDS);
            }

            protected void sleepMicrosUninterruptibly(long micros) {
                if (micros > 0L) {
                //Thread.currentThread().interrupt()是Java中用于中断当前线程的方法。当一个线程被中断时,它的中断状态将被设置为true。中断是一种协作机制,它允许一个线程通知另一个线程某个条件已经发生,从而允许接收中断的线程采取适当的行动。
                    Uninterruptibles.sleepUninterruptibly(micros, TimeUnit.MICROSECONDS);
                }

            }
        };
    }
}

源码解析:

  1. acquire的逻辑基本等于tryAcquire,区别只在于acquire少了一个提前返回的操作(见tryAcquire源码的第【5】步骤)
  2. 其余的方法源码的逻辑,是跟tryAcquire一样的,此处不再赘述

总结:acquire与tryAcquire

acquire()tryAcquire()RateLimiter 类中的两个主要方法,它们的区别如下:

  1. acquire() 方法:
    1. 当调用 acquire() 方法时,如果当前请求速率未超过限制,则请求会被立即处理,并且方法会立即返回。
    2. 如果当前请求速率超过了限制,acquire() 方法会使当前线程阻塞,直到有可用的请求配额为止。
    3. 这意味着,如果请求速率超过了限制,线程将等待直到可以继续执行。
  2. tryAcquire() 方法:
    1. 当调用 tryAcquire() 方法时,如果当前请求速率未超过限制,则请求会被立即处理,并且方法会立即返回 true
    2. 如果当前请求速率超过了限制,tryAcquire() 方法不会阻塞当前线程,而是立即返回 false
    3. 这意味着,如果请求速率超过了限制,线程将立即知道无法继续执行,并可以根据需要采取其他措施(例如重试、记录日志等)。

4、总结

总结一下,当使用客户端限流时,两种方法:acquire()tryAcquire() 的主要区别在于是否阻塞当前线程。acquire() 会阻塞线程直到有可用的请求配额,而 tryAcquire() 不会阻塞线程,而是立即返回结果。

在实际应用中,我们可以根据需求和场景选择合适的方法。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、背景
  • 2、客户端应用案例
  • 3、Jmeter测试效果
    • 3.1 非阻塞限流
      • 3.2 阻塞限流
        • 3.3 源码分析
          • 轻量级锁:(参考:JVM的知识点-对象头ObjectHeader,超链接)
          • (1)tryAcquire源码
          • (2)acquire源码
        • 总结:acquire与tryAcquire
        • 4、总结
        相关产品与服务
        消息队列 TDMQ
        消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档