前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >基本限流算法与GuavaRateLimiter实现

基本限流算法与GuavaRateLimiter实现

作者头像
leobhao
发布2024-08-07 14:16:40
990
发布2024-08-07 14:16:40
举报
文章被收录于专栏:涓流

限流算法

固定窗口

实现思路:

  1. 在一个时间周期内每来一次请求就将计数器+1
  2. 如果计数器超过了限制数量, 则拒绝服务
  3. 时间达到下一个时间窗口, 计数器重置

优点:

  1. 实现简单:固定窗口算法的实现相对简单,易于理解和部署
  2. 使用稳定,有较好的适用性: 可以根据需要调整时间窗口和限流速率,以适应不同的系统负载要求

缺点:

  1. 请求分布不均匀:固定窗口算法中,窗口内的请求分布可能不均匀,导致某些窗口内的请求数量超过阈值,而其他窗口内的请求较少。
  2. 无法应对突发流量:固定窗口限流不能很好地处理突发流量。如果在窗口的开始时刻有大量请求涌入,系统可能会超过预期的负载。
  3. 临界问题:在窗口切换的瞬间,可能会有一段时间窗口内的请求量达到限流阈值,而新窗口刚开始时又允许大量请求进入,这可能导致系统负载波动。

总结: 固定窗口算法适用于对请求速率有明确要求且流量相对稳定的场景,但对于突发流量和请求分布不均匀的情况,可能需要考虑其他更灵活的限流算法

滑动窗口
  1. 将时间周期设置为滑动窗口大小
  2. 当有新的请求来临时将窗口滑动到改请求来临的时刻
  3. 判断窗口内的请求数是否超过了限制, 超过则拒绝服务, 否则请求通过
  4. 丢弃滑动窗口以外的请求

这里贴一个 go 的实现:

代码语言:txt
复制
type SlidingWindowLimiter struct {
	windowSize   time.Duration // 窗口大小
	maxRequests  int           // 最大请求数
	requests     []time.Time   // 窗口内的请求时间
	requestsLock sync.Mutex    // 请求锁
}

func NewSlidingWindowLimiter(windowSize time.Duration, maxRequests int) *SlidingWindowLimiter {
	return &SlidingWindowLimiter{
		windowSize:  windowSize,
		maxRequests: maxRequests,
		requests:    make([]time.Time, 0),
	}
}

func (limiter *SlidingWindowLimiter) AllowRequest() bool {
	limiter.requestsLock.Lock()
	defer limiter.requestsLock.Unlock()

	// 移除过期的请求
	currentTime := time.Now()
	for len(limiter.requests) > 0 && currentTime.Sub(limiter.requests[0]) > limiter.windowSize {
		limiter.requests = limiter.requests[1:]
	}

	// 检查请求数是否超过阈值
	if len(limiter.requests) >= limiter.maxRequests {
		return false
	}

	limiter.requests = append(limiter.requests, currentTime)
	return true
}

优点:

  1. 灵活性:滑动窗口算法可以根据实际情况动态调整窗口的大小,以适应流量的变化。这种灵活性使得算法能够更好地应对突发流量和请求分布不均匀的情况。
  2. 实时性:由于滑动窗口算法在每个时间窗口结束时都会进行窗口滑动,它能够更及时地响应流量的变化,提供更实时的限流效果。
  3. 精度:相比于固定窗口算法,滑动窗口算法的颗粒度更小,可以提供更精确的限流控制。

缺点:

  1. 内存消耗:滑动窗口算法需要维护一个窗口内的请求时间列表,随着时间的推移,列表的长度会增长。这可能会导致较大的内存消耗,特别是在窗口大小较大或请求频率较高的情况下。
  2. 算法复杂性:相比于简单的固定窗口算法,滑动窗口算法的实现较为复杂。它需要处理窗口滑动、请求计数和过期请求的移除等逻辑,可能需要更多的代码和计算开销。

总结: 滑动窗口算法实际上是颗粒度更小的固定窗口算法,它可以在一定程度上提高限流的精度和实时性,并不能从根本上解决请求分布不均匀的问题。算法受限于窗口的大小和时间间隔,特别是在极端情况下,如突发流量过大或请求分布极不均匀的情况下,仍然可能导致限流不准确, 如下图:

漏桶限流

滑动窗口在窗口大小和时间间隔不够精细的情况下, 仍然无法准确的应对突发流量, 漏桶可以视为滑动窗口的一个改进: 维护一个固定容量的漏桶,请求以不定的速率流入漏桶,而漏桶以固定的速率流出。如果请求到达时,漏桶已满,则会触发拒绝策略

  1. 漏桶容量:确定一个固定的漏桶容量,表示漏桶可以存储的最大请求数。
  2. 漏桶速率:确定一个固定的漏桶速率,表示漏桶每秒可以处理的请求数。
  3. 请求处理:当请求到达时,生产者将请求放入漏桶中。
  4. 漏桶流出:漏桶以固定的速率从漏桶中消费请求,并处理这些请求。如果漏桶中有请求,则处理一个请求;如果漏桶为空,则不处理请求。
  5. 请求丢弃或延迟:如果漏桶已满,即漏桶中的请求数达到了容量上限,新到达的请求将被丢弃或延迟处理。

总结: 漏桶算法控制流量流速绝对均匀, 适合流量比较平滑的场景(如数据库), 分布式的实现难度较滑动窗口来说复杂一些

go 实现:

代码语言:txt
复制
type LeakyBucket struct {
	rate       float64 // 漏桶速率,单位请求数/秒
	capacity   int     // 漏桶容量,最多可存储请求数
	water      int     // 当前水量,表示当前漏桶中的请求数
	lastLeakMs int64   // 上次漏水的时间戳,单位秒
}

func NewLeakyBucket(rate float64, capacity int) *LeakyBucket {
	return &LeakyBucket{
		rate:       rate,
		capacity:   capacity,
		water:      0,
		lastLeakMs: time.Now().Unix(),
	}
}

func (lb *LeakyBucket) Allow() bool {
	now := time.Now().Unix()
	elapsed := now - lb.lastLeakMs

	// 漏水,根据时间间隔计算漏掉的水量
	leakAmount := int(float64(elapsed) / 1000 * lb.rate)
	if leakAmount > 0 {
		if leakAmount > lb.water {
			lb.water = 0
		} else {
			lb.water -= leakAmount
		}
	}

	// 判断当前水量是否超过容量
	if lb.water > lb.capacity {
		lb.water-- // 如果超过容量,减去刚刚增加的水量
		return false
	}

	// 增加水量
	lb.water++

	lb.lastLeakMs = now
	return true
}

令牌桶限流
  1. 按照一定的速率生产令牌并放入令牌桶中
  2. 如果桶中令牌已满,则丢弃令牌
  3. 请求过来时先到桶中拿令牌,拿到令牌则放行通过,否则拒绝请求。
总结
  • 固定窗口计数算法简单易实现,其缺陷是可能在中间的某一秒内通过的请求数是限流阈值的两倍,该算法仅适用于对限流准确度要求不高的应用场景。
  • 滑动窗口计数算法解决了固定窗口计数算法的缺陷,但是该算法较难实现,因为要记录每次请求所以可能出现比较占用内存比较多的情况。
  • 漏桶算法可以做到均匀平滑的限制请求,Ngixn 热 limit_req 模块也是采用此种算法。因为匀速处理请求的缘故所以该算法应对限流阈值内的突发请求无法及时处理。
  • 令牌桶算法解决了以上三个算法的所有缺陷,是一种相对比较完美的限流算法,也是限流场景中应用最为广泛的算法。使用 Redis + Lua 脚本的方式可以简单的实现
Guava RateLimiter

guava RateLimiter 作为抽象类有个子类 SmoothRateLimiter, 这是个抽象类并且又两个实现类:SmoothWarmingUpSmoothBursty

RateLimiter只有两个属性:

代码语言:txt
复制
// 用于计时,RateLimiter 把实例化的时间设置为 0 值,后续都是取相对时间,用微秒表示。
private final SleepingStopwatch stopwatch;
// 来做锁,RateLimiter 依赖于 synchronized 来控制并发
private volatile Object mutexDoNotUseDirectly;

SmoothRateLimiter

SmoothRateLimiter 作为抽象类继承于 RateLimiterSmoothRateLimiter的属性如下:

代码语言:txt
复制
// 当前还有多少 permits 没有被使用,被存下来的 permits 数量
double storedPermits;

// 最大允许缓存的 permits 数量,也就是 storedPermits 能达到的最大值
double maxPermits;

// 每隔多少时间产生一个 permit,
// 比如我们构造方法中设置每秒 5 个,也就是每隔 200ms 一个,这里单位是微秒,也就是 200,000 个
double stableIntervalMicros;

// 下一次可以获取 permits 的时间,这个时间是相对 RateLimiter 的构造时间的,是一个相对时间
private long nextFreeTicketMicros = 0L; 

nextFreeTicketMicros 是一个很关键的属性。每次获取 permits 的时候,先拿 storedPermits 的值,因为它是存货,如果够,storedPermits 减去相应的值就可以了,如果不够,那么还需要将 nextFreeTicketMicros 往前推,表示预占了接下来多少时间的量了。 那么下一个请求来的时候,如果还没到 nextFreeTicketMicros 这个时间点,需要 sleep 到这个点再返回,就要将这个值再往前推。

SmoothBursty

构造 SmoothBursty:

代码语言:txt
复制
public static RateLimiter create(double permitsPerSecond) {
    return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}

static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
    // 这里实例化就一个属性 maxBurstSeconds 为 1.0, 代表最多缓存 1s
    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
}

setRate

继续看 setRate 分析

代码语言:txt
复制
public final void setRate(double permitsPerSecond) {
  // 检查表达式是否有效
  checkArgument(
      permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
  // synchronized 控制并发
  synchronized (mutex()) {
    doSetRate(permitsPerSecond, stopwatch.readMicros());
  }
}


// doSetRate
final void doSetRate(double permitsPerSecond, long nowMicros) {
    // 同步
    resync(nowMicros);
    // 计算属性 stableIntervalMicros
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
}

resync 用来更新 storedPermitsnextFreeTicketMicros, 避免长时间不调用 acquire 导致不准确:

代码语言:txt
复制
void resync(long nowMicros) {
  // nextFreeTicket 已经过过期了,比如很长时间没有再次调用 limiter.acquire() 
  // 需要将 nextFreeTicket 设置为当前时间,重新计算 storedPermits
  if (nowMicros > nextFreeTicketMicros) {
    // coolDownIntervalMicros 直接返回了 stableIntervalMicros, 也就是生产一个 permits 的时间长度
    // 计算一下, 这段时间产生了多少 permits
    double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
    // 如果超过了 maxtPermits, 则使用 maxPermits
    storedPermits = min(maxPermits, storedPermits + newPermits);
    // 将 nextFreeTicketMicros 更新为现在
    nextFreeTicketMicros = nowMicros;
  }
}

设置好了stableIntervalMicrosstoredPermitsnextFreeTicketMicros, doSetRate 的实现:

代码语言:txt
复制
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
  double oldMaxPermits = this.maxPermits;
  // 这里计算了新的 maxPermits 为 1 秒产生的 permits
  // 原来的值是初始化的, 现在要重新调整频率, 所以需要重新计算 maxPermits
  maxPermits = maxBurstSeconds * permitsPerSecond;
  if (oldMaxPermits == Double.POSITIVE_INFINITY) {
    // if we don't special-case this, we would get storedPermits == NaN, below
    storedPermits = maxPermits;
  } else {
    // maxPermits 来说,是重新计算,而对于 storedPermits 来说,是做等比例的缩放
    storedPermits =
        (oldMaxPermits == 0.0)
            ? 0.0 // initial state
            : storedPermits * maxPermits / oldMaxPermits;
  }
}

acquire
代码语言:txt
复制
public double acquire() {
  return acquire(1);
}

public double acquire(int permits) {
  // 预约,如果当前不能直接获取到 permits,需要等待
  // 返回值代表需要 sleep 多久
  long microsToWait = reserve(permits);
  // sleep
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  // 返回 sleep 的时长
  return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

reserve 预定 permits:

代码语言:txt
复制
final long reserve(int permits) {
  checkPermits(permits);
  synchronized (mutex()) {
    return reserveAndGetWaitLength(permits, stopwatch.readMicros());
  }
}

final long reserveAndGetWaitLength(int permits, long nowMicros) {
  // 返回 nextFreeTicketMicros
  long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
  // 计算时长
  return max(momentAvailable - nowMicros, 0);
}


final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
  // 这里做一次同步,更新 storedPermits 和 nextFreeTicketMicros (如果需要)
  resync(nowMicros);
  // 返回值就是 nextFreeTicketMicros,注意刚刚已经做了 resync 了,此时它是最新的正确的值
  long returnValue = nextFreeTicketMicros;
  // storedPermits 中可以使用多少个 permits
  double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
  // storedPermits 中不够的部分
  double freshPermits = requiredPermits - storedPermitsToSpend;
  // 为了这个不够的部分,需要等待多久时间
  long waitMicros =
      storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) // 这部分固定返回 0
          + (long) (freshPermits * stableIntervalMicros);
  // 将 nextFreeTicketMicros 往前推
  this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
  // storedPermits 减去被拿走的部分
  this.storedPermits -= storedPermitsToSpend;
  return returnValue;
}

从 reserve 的流程可看到,获取 permits 的时候,其实是获取了两部分,一部分来自于存量 storedPermits,存量不够的话,另一部分来自于预占未来的 freshPermits。

参考资料

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-7-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 限流算法
    • 固定窗口
      • 滑动窗口
        • 漏桶限流
          • 令牌桶限流
            • 总结
              • Guava RateLimiter
              • SmoothRateLimiter
              • SmoothBursty
                • setRate
                  • acquire
                  • 参考资料
                  相关产品与服务
                  云数据库 Redis
                  腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档