摘要: 原创出处 http://www.iocoder.cn/Eureka/rate-limiter/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文主要基于 Eureka 1.8.X 版本
本文主要分享 RateLimiter 的代码实现和 RateLimiter 在 Eureka 中的应用。
推荐 Spring Cloud 书籍:
推荐 Spring Cloud 视频:
com.netflix.discovery.util.RateLimiter
,基于Token Bucket Algorithm ( 令牌桶算法 )的速率限制器。
FROM 《接口限流实践》 令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。
RateLimiter 目前支持分钟级和秒级两种速率限制。构造方法如下:
public class RateLimiter {
/**
* 速率单位转换成毫秒
*/
private final long rateToMsConversion;
public RateLimiter(TimeUnit averageRateUnit) {
switch (averageRateUnit) {
case SECONDS: // 秒级
rateToMsConversion = 1000;
break;
case MINUTES: // 分钟级
rateToMsConversion = 60 * 1000;
break;
default:
throw new IllegalArgumentException("TimeUnit of " + averageRateUnit + " is not supported");
}
}
}
averageRateUnit
参数,速率单位。构造方法里将 averageRateUnit
转换成 rateToMsConversion
。调用 #acquire(...)
方法,获取令牌,并返回是否获取成功
// RateLimiter.java
/**
* 获取令牌( Token )
*
* @param burstSize 令牌桶上限
* @param averageRate 令牌再装平均速率
* @return 是否获取成功
*/
public boolean acquire(int burstSize, long averageRate) {
return acquire(burstSize, averageRate, System.currentTimeMillis());
}
public boolean acquire(int burstSize, long averageRate, long currentTimeMillis) {
if (burstSize <= 0 || averageRate <= 0) { // Instead of throwing exception, we just let all the traffic go
return true;
}
// 填充 令牌
refillToken(burstSize, averageRate, currentTimeMillis);
// 消费 令牌
return consumeToken(burstSize);
}
burstSize
参数 :令牌桶上限。averageRate
参数 :令牌填充平均速率。averageRateUnit = SECONDS
averageRate = 2000
burstSize = 10
2000
个令牌。例如,每秒允许请求 2000
次。2000 / 1000 = 2
个消耗的令牌。10
个令牌。例如,每毫秒允许请求上限为 10
次,并且请求消耗掉的令牌,需要逐步填充。这里要注意下,虽然每毫秒允许请求上限为 10
次,这是在没有任何令牌被消耗的情况下,实际每秒允许请求依然是 2000
次。#acquire(...)
分成两部分,我们分别解析,整体如下图:
调用 #refillToken(...)
方法,填充已消耗的令牌。可能很多同学开始和我想的一样,一个后台每毫秒执行填充。为什么不适合这样呢?一方面,实际项目里每个接口都会有相应的 RateLimiter ,导致太多执行频率极高的后台任务;另一方面,获取令牌时才计算,多次令牌填充可以合并成一次,减少冗余和无效的计算。
代码如下:
1: /**
2: * 速率单位转换成毫秒
3: */
4: private final long rateToMsConversion;
5:
6: /**
7: * 消耗令牌数
8: */
9: private final AtomicInteger consumedTokens = new AtomicInteger();
10: /**
11: * 最后填充令牌的时间
12: */
13: private final AtomicLong lastRefillTime = new AtomicLong(0);
14:
15: private void refillToken(int burstSize, long averageRate, long currentTimeMillis) {
16: // 获得 最后填充令牌的时间
17: long refillTime = lastRefillTime.get();
18: // 获得 过去多少毫秒
19: long timeDelta = currentTimeMillis - refillTime;
20:
21: // 计算 可填充最大令牌数量
22: long newTokens = timeDelta * averageRate / rateToMsConversion;
23: if (newTokens > 0) {
24: // 计算 新的填充令牌的时间
25: long newRefillTime = refillTime == 0
26: ? currentTimeMillis
27: : refillTime + newTokens * rateToMsConversion / averageRate;
28: // CAS 保证有且仅有一个线程进入填充
29: if (lastRefillTime.compareAndSet(refillTime, newRefillTime)) {
30: while (true) { // 死循环,直到成功
31: // 计算 填充令牌后的已消耗令牌数量
32: int currentLevel = consumedTokens.get();
33: int adjustedLevel = Math.min(currentLevel, burstSize); // In case burstSize decreased
34: int newLevel = (int) Math.max(0, adjustedLevel - newTokens);
35: // CAS 避免和正在消费令牌的线程冲突
36: if (consumedTokens.compareAndSet(currentLevel, newLevel)) {
37: return;
38: }
39: }
40: }
41: }
42: }
refillTime
) 。每次填充令牌,会设置 currentTimeMillis
到 refillTime
。timeDelta
),用于计算需要填充的令牌数。newTokens
)。newTokens
可能超过 burstSize
,所以下面会有逻辑调整 newTokens
。averageRate = 500 && averageRateUnit = SECONDS
时, 每 2 毫秒才填充一个令牌,如果设置 currentTimeMillis
,会导致不足以填充一个令牌的时长被吞了。用 #refillToken(...)
方法,填充消耗( 获取 )的令牌。
代码如下 :
1: private boolean consumeToken(int burstSize) {
2: while (true) { // 死循环,直到没有令牌,或者获取令牌成功
3: // 没有令牌
4: int currentLevel = consumedTokens.get();
5: if (currentLevel >= burstSize) {
6: return false;
7: }
8: // CAS 避免和正在消费令牌或者填充令牌的线程冲突
9: if (consumedTokens.compareAndSet(currentLevel, currentLevel + 1)) {
10: return true;
11: }
12: }
13: }
com.netflix.eureka.RateLimitingFilter
,Eureka-Server 限流过滤器。使用 RateLimiting ,保证 Eureka-Server 稳定性。
#doFilter(...)
方法,代码如下:
1: @Override
2: public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
3: // 获得 Target
4: Target target = getTarget(request);
5:
6: // Other Target ,不做限流
7: if (target == Target.Other) {
8: chain.doFilter(request, response);
9: return;
10: }
11:
12: HttpServletRequest httpRequest = (HttpServletRequest) request;
13: // 判断是否被限流
14: if (isRateLimited(httpRequest, target)) {
15: // TODO[0012]:监控相关,跳过
16: incrementStats(target);
17: // 如果开启限流,返回 503 状态码
18: if (serverConfig.isRateLimiterEnabled()) {
19: ((HttpServletResponse) response).setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
20: return;
21: }
22: }
23: chain.doFilter(request, response);
24: }
#getTarget()
方法,获取 Target。RateLimitingFilter 只对符合正在表达式 ^./apps(/[^/])?$
的接口做限流,其中不包含 Eureka-Server 集群批量同步接口。#getTarget(…)
方法代码。#isRateLimited(...)
方法,判断是否被限流。代码如下:
1: private boolean isRateLimited(HttpServletRequest request, Target target) { 2: // 判断是否特权应用 3: if (isPrivileged(request)) { 4: logger.debug("Privileged {} request", target); 5: return false; 6: } 7: // 判断是否被超载( 限流 ) 8: if (isOverloaded(target)) { 9: logger.debug("Overloaded {} request; discarding it", target); 10: return true; 11: } 12: logger.debug("{} request admitted", target); 13: return false; 14: }#isPrivileged()
方法,判断是否为特权应用,对特权应用不开启限流逻辑。代码如下:
private boolean isPrivileged(HttpServletRequest request) { // 是否对标准客户端开启限流 if (serverConfig.isRateLimiterThrottleStandardClients()) { return false; } // 以请求头( "DiscoveryIdentity-Name" ) 判断是否在标准客户端名集合内 Set<String> privilegedClients = serverConfig.getRateLimiterPrivilegedClients(); String clientName = request.getHeader(AbstractEurekaIdentity.AUTH_NAME_HEADER_KEY); return privilegedClients.contains(clientName) || DEFAULT_PRIVILEGED_CLIENTS.contains(clientName); }#isOverloaded(...)
方法,判断是否超载( 限流 )。代码如下:
/** * Includes both full and delta fetches. */ private static final RateLimiter registryFetchRateLimiter = new RateLimiter(TimeUnit.SECONDS); /** * Only full registry fetches. */ private static final RateLimiter registryFullFetchRateLimiter = new RateLimiter(TimeUnit.SECONDS); private boolean isOverloaded(Target target) { int maxInWindow = serverConfig.getRateLimiterBurstSize(); // 10 int fetchWindowSize = serverConfig.getRateLimiterRegistryFetchAverageRate(); // 500 boolean overloaded = !registryFetchRateLimiter.acquire(maxInWindow, fetchWindowSize); if (target == Target.FullFetch) { int fullFetchWindowSize = serverConfig.getRateLimiterFullFetchAverageRate(); // 100 overloaded |= !registryFullFetchRateLimiter.acquire(maxInWindow, fullFetchWindowSize); } return overloaded; }eureka.rateLimiter.enabled = true
( 默认值 :false
,可配 ),返回 503 状态码。com.netflix.discovery.InstanceInfoReplicator
,Eureka-Client 应用实例复制器。在 《Eureka 源码解析 —— 应用实例注册发现(一)之注册》「2.1 应用实例信息复制器」 有详细解析。
应用实例状态发生变化时,调用 #onDemandUpdate()
方法,向 Eureka-Server 发起注册,同步应用实例信息。InstanceInfoReplicator 使用 RateLimiter ,避免状态频繁发生变化,向 Eureka-Server 频繁同步。代码如下:
class InstanceInfoReplicator implements Runnable {
/**
* RateLimiter
*/
private final RateLimiter rateLimiter;
/**
* 令牌桶上限,默认:2
*/
private final int burstSize;
/**
* 令牌再装平均速率,默认:60 * 2 / 30 = 4
*/
private final int allowedRatePerMinute;
InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {
// ... 省略其他代码
this.rateLimiter = new RateLimiter(TimeUnit.MINUTES);
this.replicationIntervalSeconds = replicationIntervalSeconds;
this.burstSize = burstSize;
this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;
logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute);
}
public boolean onDemandUpdate() {
if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) { // 限流
scheduler.submit(new Runnable() {
@Override
public void run() {
logger.debug("Executing on-demand update of local InstanceInfo");
// 取消任务
Future latestPeriodic = scheduledPeriodicRef.get();
if (latestPeriodic != null && !latestPeriodic.isDone()) {
logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
latestPeriodic.cancel(false);
}
// 再次调用
InstanceInfoReplicator.this.run();
}
});
return true;
} else {
logger.warn("Ignoring onDemand update due to rate limiter");
return false;
}
}
}
#onDemandUpdate()
方法,调用 RateLimiter#acquire(…)
方法,获取令牌。扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有