前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >源码分析Dubbo tps过滤器器实现原理

源码分析Dubbo tps过滤器器实现原理

作者头像
丁威
发布2019-06-10 17:17:37
6150
发布2019-06-10 17:17:37
举报
文章被收录于专栏:中间件兴趣圈中间件兴趣圈

微信公众号:[中间件兴趣圈] 作者简介:《RocketMQ技术内幕》作者

本文将重点分析一下dubbo限流的另外一个方式,tps过滤器。 @Activate(group = Constants.PROVIDER, value = Constants.TPS_LIMIT_RATE_KEY)

  • 过滤器作用 服务调用tps过滤器
  • 使用场景 对Dubbo服务提供者实现限流(tps)。
  • 阻断条件 当服务调用者超过其TPS时,直接返回rpc exception。

接下来从源码的角度分析Tps过滤器的实现机制。

代码语言:javascript
复制
 1public class TpsLimitFilter implements Filter {
 2    private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();
 3    @Override
 4    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
 5        if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {  
 6            throw new RpcException(
 7                    "Failed to invoke service "  +  invoker.getInterface().getName() +  "." + invocation.getMethodName() + " because exceed max service tps.");
 8        }
 9        return invoker.invoke(invocation);
10    }
11}

tps limit 生效的条件是,服务提供者的url中包含了tps=""这个属性,默认TPS统计时长为1分钟,表示如果在1分钟之内的调用次数超过配置的tps,则阻断本次RPC服务调用。

其TPS控制代码主要由DefaultTPSLimiter实现。

代码语言:javascript
复制
 1public class DefaultTPSLimiter implements TPSLimiter {
 2    private final ConcurrentMap<String, StatItem> stats = new ConcurrentHashMap<String, StatItem>();
 3    @Override
 4    public boolean isAllowable(URL url, Invocation invocation) {
 5        int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1);    // @1
 6        long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,
 7                Constants.DEFAULT_TPS_LIMIT_INTERVAL);                         
 8        String serviceKey = url.getServiceKey();                                            // @2
 9        if (rate > 0) {
10            StatItem statItem = stats.get(serviceKey);
11            if (statItem == null) {
12                stats.putIfAbsent(serviceKey,
13                        new StatItem(serviceKey, rate, interval));
14                statItem = stats.get(serviceKey);
15            }
16            return statItem.isAllowable();                                                     // @3
17        } else {
18            StatItem statItem = stats.get(serviceKey);
19            if (statItem != null) {
20                stats.remove(serviceKey);
21            }
22        }
23
24        return true;
25    }
26}

代码@1:获取服务提供者url中的参数tps、tps.interval属性。

代码@2:获取服务key,并创建或获取对应的StatItem。

代码@3:调用StatItem的isAllowable()方法来判断是否可用。

StatItem#isAllowable
代码语言:javascript
复制
 1public boolean isAllowable() {
 2        long now = System.currentTimeMillis();
 3        if (now > lastResetTime + interval) {    // @1
 4            token.set(rate);
 5            lastResetTime = now;
 6        }
 7
 8        int value = token.get();   
 9        boolean flag = false;
10        while (value > 0 && !flag) {     // @2
11            flag = token.compareAndSet(value, value - 1);  
12            value = token.get();
13        }
14        return flag;
15    }

该类的核心思想:是漏桶算法。 代码@1:如果当前时间大于(上一次刷新时间+统计间隔),重新复位token为rate,表示重新生成一批token。

代码@2:每使用一次,消耗一个token,如果能成功消耗一个token则返回true,如果没有可消耗的token,则直接返回false。

Tps过滤器的实现原理其实比较简单,大家可以从这里体会到ConcurrentHashMap、漏桶算法的简易实现。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-04-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 中间件兴趣圈 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • StatItem#isAllowable
相关产品与服务
消息队列 TDMQ
消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档