Spring Cloud Gateway 结合配置中心限流

前言

上篇文章《Spring Cloud Gateway 限流操作》我讲过复杂的限流场景可以通过扩展RedisRateLimiter来实现自己的限流策略。

假设你领导给你安排了一个任务,具体需求如下:

  • 针对具体的接口做限流
  • 不同接口限流的力度可以不同
  • 可以动态调整限流配置,实时生效

如果你接到上面的任务,你会怎么去设计+实现呢?

每个人看待问题的角度不同,自然思考出来的方案也不同,正所谓条条大路通罗马,能到达亩的地的路那就是一条好路。

如何分析需求

下面我给出我的实现方式,仅供各位参考,大牛请忽略。

具体问题具体分析,针对需求点,分别去做分析。

需求一 “如何针对具体的接口做限流” 这个在上篇文章中也有讲过,只需要让KeyResolver返回的是接口的URI即可,这样限流的维度那就是对这个接口进行限流。

需求二 “不同接口限流的力度可以不同” 这个通过配置的方式明显实现不了,配置中的replenishRate和burstCapacity都是配置死的,如果要做成动态的那么必须的自己通过扩展RedisRateLimiter来实现。

前提是必须有一个配置列表,这个配置列表就是每个接口对应的限流数值。有了这个配置我们就可以通过请求的接口获取这个接口对应的限流值。

需求三“可以动态调整限流配置,实时生效” 这个的话也比较容易,无论你是存文件,存数据库,存缓存只要每次都去读取,必然是实时生效的,但是性能问题我们不得不考虑啊。

存文件,读取文件,耗IO,主要是不方便修改 存数据库,可以通过web界面去修改,也可以直接改数据库,每次都要查询,性能不行 存分布式缓存(redis),性能比数据库有提高

对比下来肯定是缓存是最优的方案,还有更好的方案吗? 有,结合配置中心来做,我这边用自己的配置中心(https://github.com/yinjihuan/smconf)来讲解,换成其他的配置中心也是一样的思路。

配置中心的优点在于它本来就是用来存储配置的,配置在项目启动时加载完毕,当有修改时推送更新,每次读取都在本地对象中,性能好。

具体方案有了之后我们就可以开始撸代码了,但是你有想过这么多接口的限流值怎么初始化吗?手动一个个去加?

不同的服务维护的小组不同,当然也有可能是一个小组维护,从设计者的角度来思考,应该把设置的权利交给用户,交给我们的接口开发者,每个接口能够承受多少并发让用户来定,你的职责就是在网关进行限流。当然在公司中具体的限制量也不一定会由开发人员来定哈,这个得根据压测结果,做最好的调整。

话不多说-开始撸码

首先我们定义自己的RedisRateLimiter,复制源码稍微改造下即可, 这边只贴核心代码。

public class CustomRedisRateLimiter extends AbstractRateLimiter<CustomRedisRateLimiter.Config>
        implements ApplicationContextAware {

    public static final String CONFIGURATION_PROPERTY_NAME = "custom-redis-rate-limiter";
    public static final String REDIS_SCRIPT_NAME = "redisRequestRateLimiterScript";
    public static final String REMAINING_HEADER = "X-RateLimit-Remaining";
    public static final String REPLENISH_RATE_HEADER = "X-RateLimit-Replenish-Rate";
    public static final String BURST_CAPACITY_HEADER = "X-RateLimit-Burst-Capacity";

    public CustomRedisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate, RedisScript<List<Long>> script,
            Validator validator) {
        super(Config.class, CONFIGURATION_PROPERTY_NAME, validator);
        this.redisTemplate = redisTemplate;
        this.script = script;
        initialized.compareAndSet(false, true);
    }

    public CustomRedisRateLimiter(int defaultReplenishRate, int defaultBurstCapacity) {
        super(Config.class, CONFIGURATION_PROPERTY_NAME, null);
        this.defaultConfig = new Config().setReplenishRate(defaultReplenishRate).setBurstCapacity(defaultBurstCapacity);
    }

    // 限流配置
    private RateLimitConf rateLimitConf;

    @Override
    @SuppressWarnings("unchecked")
    public void setApplicationContext(ApplicationContext context) throws BeansException {
        // 加载配置
        this.rateLimitConf = context.getBean(RateLimitConf.class);  
    }


    /**
     * This uses a basic token bucket algorithm and relies on the fact that
     * Redis scripts execute atomically. No other operations can run between
     * fetching the count and writing the new count.
     */
    @Override
    @SuppressWarnings("unchecked")
    public Mono<Response> isAllowed(String routeId, String id) {
        if (!this.initialized.get()) {
            throw new IllegalStateException("RedisRateLimiter is not initialized");
        }

        //Config routeConfig = getConfig().getOrDefault(routeId, defaultConfig);

        if (rateLimitConf == null) {
            throw new IllegalArgumentException("No Configuration found for route " + routeId);
        }
        Map<String,Integer> routeConfig = rateLimitConf.getLimitMap();

        // Key的格式:服务名称.接口URI.类型
        String replenishRateKey = routeId + "." + id + ".replenishRate";
        int replenishRate = routeConfig.get(replenishRateKey) == null ? routeConfig.get("default.replenishRate") : routeConfig.get(replenishRateKey);

        String burstCapacityKey = routeId + "." + id + ".burstCapacity";
        int burstCapacity = routeConfig.get(burstCapacityKey) == null ? routeConfig.get("default.burstCapacity") : routeConfig.get(burstCapacityKey);

        try {
            List<String> keys = getKeys(id);

            // The arguments to the LUA script. time() returns unixtime in
            // seconds.
            List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "",
                    Instant.now().getEpochSecond() + "", "1");
            // allowed, tokens_left = redis.eval(SCRIPT, keys, args)
            Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
            // .log("redisratelimiter", Level.FINER);
            return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
                    .reduce(new ArrayList<Long>(), (longs, l) -> {
                        longs.addAll(l);
                        return longs;
                    }).map(results -> {
                        boolean allowed = results.get(0) == 1L;
                        Long tokensLeft = results.get(1);

                        Response response = new Response(allowed, getHeaders(replenishRate, burstCapacity, tokensLeft));

                        if (log.isDebugEnabled()) {
                            log.debug("response: " + response);
                        }
                        return response;
                    });
        } catch (Exception e) {
            /*
             * We don't want a hard dependency on Redis to allow traffic. Make
             * sure to set an alert so you know if this is happening too much.
             * Stripe's observed failure rate is 0.01%.
             */
            log.error("Error determining if user allowed from redis", e);
        }
        return Mono.just(new Response(true, getHeaders(replenishRate, burstCapacity, -1L)));
    }

    public HashMap<String, String> getHeaders(Integer replenishRate, Integer burstCapacity, Long tokensLeft) {
        HashMap<String, String> headers = new HashMap<>();
        headers.put(this.remainingHeader, tokensLeft.toString());
        headers.put(this.replenishRateHeader, String.valueOf(replenishRate));
        headers.put(this.burstCapacityHeader, String.valueOf(burstCapacity));
        return headers;
    }

}

需要在setApplicationContext中加载我们的配置类,配置类的定义如下:

@CxytianDiConf(system="fangjia-gateway")
public class RateLimitConf {
    // 限流配置
    @ConfField(value = "limitMap")
    private Map<String, Integer> limitMap = new HashMap<String, Integer>(){{
        put("default.replenishRate", 100);
        put("default.burstCapacity", 1000);
    }};
    public void setLimitMap(Map<String, Integer> limitMap) {
        this.limitMap = limitMap;
    }
    public Map<String, Integer> getLimitMap() {
        return limitMap;
    }
}

所有的接口对应的限流信息都在map中,有默认值,如果没有对应的配置就用默认的值对接口进行限流。

isAllowed方法中通过‘服务名称.接口URI.类型’组成一个Key, 通过这个Key去Map中获取对应的值。

类型的作用主要是用来区分replenishRate和burstCapacity两个值。

接下来就是配置CustomRedisRateLimiter:

@Bean
@Primary
public CustomRedisRateLimiter customRedisRateLimiter(
                  ReactiveRedisTemplate<String, String> redisTemplate,     
                  @Qualifier(CustomRedisRateLimiter.REDIS_SCRIPT_NAME)  RedisScript<List<Long>> redisScript,
                  Validator validator) {
    return new CustomRedisRateLimiter(redisTemplate, redisScript, validator);
}

网关这边的逻辑已经实现好了,接下来就是需要在具体的服务中自定义注解,然后将限流的参数初始化到我们的配置中心就可以了。

定义注解

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ApiRateLimit {

    /**
     * 速率
     * @return
     */
    int replenishRate() default 100;

    /**
     * 容积
     * @return
     */
    int burstCapacity() default 1000;

}

启动监听器,读取注解,初始化配置

/**
 * 初始化API网关需要进行并发限制的API
 * @author yinjihuan
 *
 */
public class InitGatewayApiLimitRateListener implements ApplicationListener<ApplicationReadyEvent> {

    // Controller包路径
    private String controllerPath;

    private RateLimitConf rateLimitConf;

    private ConfInit confInit;

    private String applicationName;

    public InitGatewayApiLimitRateListener(String controllerPath) {
        this.controllerPath = controllerPath;
    }

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        this.rateLimitConf = event.getApplicationContext().getBean(RateLimitConf.class);
        this.confInit = event.getApplicationContext().getBean(ConfInit.class);
        this.applicationName = event.getApplicationContext().getEnvironment().getProperty("spring.application.name");
        try {
            initLimitRateAPI();
        } catch (Exception e) {
            throw new RuntimeException("初始化需要进行并发限制的API异常", e);
        }
    }

    /**
     * 初始化需要进行并发限制的API
     * @throws IOException
     * @throws ClassNotFoundException
     */
    private void initLimitRateAPI() throws IOException, ClassNotFoundException {
        Map<String, Integer> limitMap = rateLimitConf.getLimitMap();
        ClasspathPackageScannerUtils scan = new ClasspathPackageScannerUtils(this.controllerPath);
        List<String> classList = scan.getFullyQualifiedClassNameList();
        for (String clazz : classList) {
            Class<?> clz = Class.forName(clazz);
            if (!clz.isAnnotationPresent(RestController.class)) {
                continue;
            }
            Method[] methods = clz.getDeclaredMethods();
            for (Method method : methods) {
                if (method.isAnnotationPresent(ApiRateLimit.class)) {
                    ApiRateLimit apiRateLimit = method.getAnnotation(ApiRateLimit.class);
                    String replenishRateKey = applicationName + "." + getApiUri(clz, method) + ".replenishRate";
                    String burstCapacityKey = applicationName + "." + getApiUri(clz, method) + ".burstCapacity";
                    limitMap.put(replenishRateKey, apiRateLimit.replenishRate());
                    limitMap.put(burstCapacityKey, apiRateLimit.burstCapacity());
                }
            }
        }
        rateLimitConf.setLimitMap(limitMap);
        // 初始化值到配置中心
        confInit.init(rateLimitConf);
    }

     private String getApiUri(Class<?> clz, Method method) {
            StringBuilder uri = new StringBuilder();
            uri.append(clz.getAnnotation(RequestMapping.class).value()[0]);
            if (method.isAnnotationPresent(GetMapping.class)) {
                uri.append(method.getAnnotation(GetMapping.class).value()[0]);
            } else if (method.isAnnotationPresent(PostMapping.class)) {
                uri.append(method.getAnnotation(PostMapping.class).value()[0]);
            } else if (method.isAnnotationPresent(RequestMapping.class)) {
                uri.append(method.getAnnotation(RequestMapping.class).value()[0]);
            }
            return uri.toString();
     }
}

配置监听器

SpringApplication application = new SpringApplication(FshHouseServiceApplication.class);
application.addListeners(new InitGatewayApiLimitRateListener("com.fangjia.fsh.house.controller"));
context = application.run(args);

最后使用就很简单了,只需要增加注解就可以了

@ApiRateLimit(replenishRate=10, burstCapacity=100)
@GetMapping("/data")
public HouseInfo getData(@RequestParam("name") String name) {
    return new HouseInfo(1L, "上海", "虹口", "东体小区");
}

我这边只是给大家提供一种去实现的思路,也许大家还有更好的方案。

我觉得只要不让每个开发都去关心这种非业务性质的功能,那就可以了,都在框架层面处理掉。当然实现原理可以跟大家分享下,会用很好,既会用又了解原理那就更好了。

原文发布于微信公众号 - 猿天地(cxytiandi)

原文发表时间:2018-08-20

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏DT乱“码”

JSP+ajax+springMVC+MayBatis处理excel上传导入

jsp <div class="subtext1"> <div cla...

2819
来自专栏一个会写诗的程序员的博客

6.3 Spring Boot集成mongodb开发小结

本章我们通过SpringBoot集成mongodb,Java,Kotlin开发一个极简社区文章博客系统。

1823
来自专栏IT笔记

SpringBoot开发案例之整合mongoDB

JDK1.7、Maven、Eclipse、SpringBoot1.5.2、mongodb3.4,Robomongo(可视化工具)

6626
来自专栏IT笔记

SpringBoot开发案例之整合mongoDB

? mongodb.jpg 开始前,建议大家去了解以下文章,当然不看也没问题: MongoDB从入门到“精通”之简介和如何安装 MongoDB从入门到“精通”...

5588
来自专栏Jerry的SAP技术分享

ABAP和Java的destination和JNDI

3573
来自专栏ACM小冰成长之路

HDU-5559-Frog and String

ACM模版 描述 ? 题解 丧心病狂的构造题!!! ? Ps. 截图来自 JeraKrs’s blog。 代码 #include <cstdio> #inclu...

1825
来自专栏Android中高级开发

Android开发之漫漫长途 XI——从I到X的小结

该文章是一个系列文章,是本人在Android开发的漫漫长途上的一点感想和记录,我会尽量按照先易后难的顺序进行编写该系列。该系列引用了《Android开发艺术探索...

1172
来自专栏Ryan Miao

Date, TimeZone, MongoDB, java中date的时区问题

打印new Date(),Fri Aug 12 13:37:51 CST 2016. 显示Asia/Shanghai的时区,但是date toString 的时...

5608
来自专栏Brian

AWK 深入浅出教程

---- 概述 awk是一门解释性文本处理语言,它在文本处理领域中非常强大和方便。awk有三个主要的类型是: AWK - 原先来源于 AT & T 实验室的的A...

5056
来自专栏后端沉思录

mybatis拦截器分表

mybatis提供了拦截器插件用来处理被拦截的方法的某些逻辑.下面会通过创建8张表,当用户注册时,根据对手机号取余入不同的表.

6113

扫码关注云+社区

领取腾讯云代金券