专栏首页BAT的乌托邦[享学Netflix] 二十一、Hystrix指标数据收集(预热):滑动窗口算法(附代码示例)

[享学Netflix] 二十一、Hystrix指标数据收集(预热):滑动窗口算法(附代码示例)

工作不养闲人,团队不养懒人。

代码下载地址:https://github.com/f641385712/netflix-learning

前言

Hystrix能够提供熔断、限流、断路器保护等等功能,而这些功能均基于数据采集。Netflix Hystrix通过类似滑动窗口的数据结构来统计命令执行过程中的各种指标数据,进而做出对应的响应。

滑动窗口算法(Sliding Window Algorithm)是常见的一种算法:它思想简洁且功能强大,可以用来解决一些查找满足一定条件的连续区间的性质/长度的问题。由于区间连续,因此当区间发生变化时,可以通过旧有的计算结果对搜索空间进行剪枝,这样便减少了重复计算,降低了时间复杂度,它还可以将嵌套的循环问题,转换为单循环问题,同样也是降低时间复杂度。


正文

限量的应用场景非常之广泛,无论是Http请求还是RPC请求,都能看到它的身影,它是稳定性建设的有效措施。限流后的处理方式也可以从多种角度去考虑,比如常见的有两种:

  • 超出限定流量之后会拒绝多余的访问
  • 超出限定流量之后,只是发出告警或者是记录日志,访问仍然正常进行

关于限流算法,一般常见的有下面四种:

  1. 固定窗口
  2. 滑动窗口
  3. 令牌桶算法(谷歌的开源guava有实现)
  4. 漏桶算法

很明显,本文讨论的议题是滑动窗口算法。当然在这之前为了辅助理解,需要介绍下固定窗口算法。


固定窗口

这是限流中最简单、最暴力的一种算法(一般粗暴的算法的共同点是:简单)。它的规则可描述如下:我们希望某个API在一分钟内只能固定被访问N次,那么我们就可以直接统计这一分钟开始对API的访问次数,如果访问次数超过了限定值,则抛弃后续的访问。直到下一分钟开始,再开放对API的访问。


代码示例

为了方便后面演示,为所有的限流算法写一个通用接口:

public interface RateLimiter {
	// 是否要限流
    boolean isOverLimit();
	// 当前QPS总数值(也就是窗口期内的访问总量)
    int currentQPS();
	// touch一下:增加一次访问量
    boolean visit();
}

固定窗口算法使用代码实现如下(仅供参考):

/**
 * 固定窗口算法限流器
 * 实现Runnable方法:是到了下一个时段(比如下一分钟时),重置初始值
 *
 * @author yourbatman
 * @date 2020/3/1 22:17
 */
public class FixedWindowRateLimiter implements RateLimiter, Runnable {

    // 每秒最多允许放5个请求
    private static final int DEFAULT_ALLOWED_VISIT_PER_SECOND = 5;
    private final int maxVisitPerSecond;

    // 当前已接收的总数
    private AtomicInteger count;

    public FixedWindowRateLimiter() {
        this(DEFAULT_ALLOWED_VISIT_PER_SECOND);
    }

    public FixedWindowRateLimiter(int maxVisitPerSecond) {
        this.maxVisitPerSecond = maxVisitPerSecond;
        this.count = new AtomicInteger();
    }

    @Override
    public boolean isOverLimit() {
        return currentQPS() >= maxVisitPerSecond;
    }

    @Override
    public int currentQPS() {
        return count.get();
    }

    // 访问一次,次数+1就完事 并且告知是否达到满格了
    @Override
    public boolean visit() {
        count.incrementAndGet();
        return isOverLimit();
    }

    // 到了下一个窗口期,重置总访问量即可,非常简单
    @Override
    public void run() {
        System.out.println("该窗口期的累计访问总量是:" + currentQPS() + ",进入下一个窗口期...");
        count.set(0);
    }
}

书写测试代码:

public static void main(String[] args) throws InterruptedException {
    FixedWindowRateLimiter rateLimiter = new FixedWindowRateLimiter();

    // 使用定时器:1s种表示一个窗口,固定的
    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    scheduledExecutorService.scheduleAtFixedRate(rateLimiter, 1, 1, TimeUnit.SECONDS);

    // 此处我使用单线程访问,你可以改造成多线程版本
    while (true) {
        String currThreadName = Thread.currentThread().getName();
        boolean overLimit = rateLimiter.isOverLimit();
        if (overLimit) {
            System.out.printf("线程[%s]====被限流了====,因为访问次数已超过阈值[%s]\n", currThreadName, rateLimiter.currentQPS());
        } else {
            rateLimiter.visit();
            System.out.printf("线程[%s]访问成功,当前访问总数[%s]\n", currThreadName, rateLimiter.currentQPS());
        }

        Thread.sleep(100);
    }

}

运行后控制台输出:

线程[main]访问成功,当前访问总数[1]
线程[main]访问成功,当前访问总数[2]
线程[main]访问成功,当前访问总数[3]
线程[main]访问成功,当前访问总数[4]
线程[main]访问成功,当前访问总数[5]
线程[main]====被限流了====,因为访问次数已超过阈值[5]
线程[main]====被限流了====,因为访问次数已超过阈值[5]
线程[main]====被限流了====,因为访问次数已超过阈值[5]
线程[main]====被限流了====,因为访问次数已超过阈值[5]
线程[main]====被限流了====,因为访问次数已超过阈值[5]
该窗口期的累计访问总量是:5,进入下一个窗口期...
线程[main]访问成功,当前访问总数[1]
线程[main]访问成功,当前访问总数[2]
线程[main]访问成功,当前访问总数[3]
线程[main]访问成功,当前访问总数[4]
线程[main]访问成功,当前访问总数[5]
线程[main]====被限流了====,因为访问次数已超过阈值[5]
线程[main]====被限流了====,因为访问次数已超过阈值[5]
线程[main]====被限流了====,因为访问次数已超过阈值[5]
线程[main]====被限流了====,因为访问次数已超过阈值[5]
该窗口期的累计访问总量是:5,进入下一个窗口期...
...

达到了限流的目的。但是,但是,但是,它有一个非常致命的缺点:假设现在有一个恶意用户在上一分钟的最后一秒和下一分钟的第一秒疯狂的冲击你的API,按照固定窗口的限流规则,这些请求都能够访问成功不会被限制。但是这2s内的请求密集度极高,很有可能就把你的服务打垮了,这是不能接受的

固定窗口算法粗暴的好处就是实现简单,效率也高,但其实生产上几乎没有真实的使用的案例,而是更多的使用下面的改进版:滑动窗口算法。


滑动窗口

滑动窗口:滑动窗口将固定窗口再等分为多个小的窗口,每一次对一个更小的窗口进行流量控制。这种方法可以很好的解决之前的临界问题。

固定窗口就像滑动窗口的一个特例有木有

示意图如下:

对于上面例子情况:我们可以将1s划分为10个窗口,则每个窗口对应100ms。假设恶意用户还是在上一秒的最后一刻和下一秒的第一刻冲击服务,按照滑动窗口的原理,此时统计上一秒的最后100毫秒和下一秒的前100毫秒,这种滑动的方式依旧能够判判断出超出了阈值,从而触发限流机制,保护系统。


代码示例

同样的,使用滑动窗口实现限流器的代码这里也给出一份(仅供参考):

/**
 * 滑动窗口算法限流器
 * 实现Runnable方法:用于控制滑动动作,重置桶的值以及总量值
 * 它的精髓就是在滑动
 *
 * @author yourbatman
 * @date 2020/3/1 22:17
 */
public class SlidingWindowRateLimiter implements RateLimiter, Runnable {

    // 每秒钟最多允许5个请求,这是默认值  你也可以通过构造器指定
    private static final int DEFAULT_ALLOWED_VISIT_PER_SECOND = 5;
    private final long maxVisitPerSecond;

    // 默认把1s划分为10个桶,这是默认值
    private static final int DEFAULT_BUCKET = 10;
    private final int bucket;

    // 每个桶对应的当前的请求数。数组长度和bucket数量一样
    // 桶是固定的大小。但是桶里面的内容会不断变化:因为会滑动
    private final AtomicInteger[] countPerBucket;

    // 总请求数
    private AtomicInteger count;
    private volatile int index;

    // 构造器
    public SlidingWindowRateLimiter() {
        this(DEFAULT_BUCKET, DEFAULT_ALLOWED_VISIT_PER_SECOND);
    }

    public SlidingWindowRateLimiter(int bucket, long maxVisitPerSecond) {
        this.bucket = bucket;
        this.maxVisitPerSecond = maxVisitPerSecond;
        countPerBucket = new AtomicInteger[bucket];
        for (int i = 0; i < bucket; i++)
            countPerBucket[i] = new AtomicInteger();
        count = new AtomicInteger(0);
    }


    // 是否超过限制:当前QPS总数是否超过了最大值(默认每秒5个嘛)
    // 注意:这里应该是>=。因为其实如果桶内访问数量已经等于5了,就应该限制住外面的再进来
    @Override
    public boolean isOverLimit() {
        return currentQPS() >= maxVisitPerSecond;
    }

    @Override
    public int currentQPS() {
        return count.get();
    }

    // 访问一次,次数+1(只要请求进来了就+1),并且告知是否超载
    // 请注意:放在指定的桶哦
    @Override
    public boolean visit() {
        countPerBucket[index].incrementAndGet();
        count.incrementAndGet();
        return isOverLimit();
    }


    // =========模拟线程访问=========
    @Override
    public void run() {
        System.out.println("~~~~~~~~~~~~~~~~~窗口向后滑动一下~~~~~~~~~~~~~~~~~");
        // 桶内的指针向前滑动一下:表示后面的visite请求应该打到下一个桶内了
        index = (index + 1) % bucket;
        // 初始化新桶。并且拿出旧值
        int val = countPerBucket[index].getAndSet(0);
        // 这个步骤一定不要忘了:因为废弃了一个桶,所以总值要减去~~~~
        if (val == 0L) { // 这个桶等于0,说明这个时刻没有流量进来
            System.out.println("~~~~~~~~~~~~~~~~~窗口没能释放出流量,继续保持限流~~~~~~~~~~~~~~~~~");

        } else {
            count.addAndGet(-val);
            System.out.println("~~~~~~~~~~~~~~~~~窗口释放出了[" + val + "]个访问名额,你可以访问了喽~~~~~~~~~~~~~~~~~");
        }

    }
}

书写测试代码:

public static void main(String[] args) throws InterruptedException {
    SlidingWindowRateLimiter rateLimiter = new SlidingWindowRateLimiter();
    // 使用一个线程滑动定时滑动这个窗口:100ms滑动一次(一般保持个桶的跨度保持一致)
    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    scheduledExecutorService.scheduleAtFixedRate(rateLimiter, 100, 100, TimeUnit.MILLISECONDS);

    // 此处我使用单线程访问,你可以改造成多线程版本
    while (true) {
        String currThreadName = Thread.currentThread().getName();
        boolean overLimit = rateLimiter.isOverLimit();
        if (overLimit) {
            System.out.printf("线程[%s]====被限流了====,因为访问次数已超过阈值[%s]\n", currThreadName, rateLimiter.currentQPS());
        } else {
            rateLimiter.visit();
            System.out.printf("线程[%s]访问成功,当前访问总数[%s]\n", currThreadName, rateLimiter.currentQPS());
        }

        Thread.sleep(10);
    }
}

运行程序,控制台输出:

线程[main]访问成功,当前访问总数[1]
线程[main]访问成功,当前访问总数[2]
线程[main]访问成功,当前访问总数[3]
线程[main]访问成功,当前访问总数[4]
线程[main]访问成功,当前访问总数[5]
线程[main]====被限流了====,因为访问次数已超过阈值[5]
线程[main]====被限流了====,因为访问次数已超过阈值[5]
线程[main]====被限流了====,因为访问次数已超过阈值[5]
~~~~~~~~~~~~~~~~~窗口向后滑动一下~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~窗口没能释放出流量,继续保持限流~~~~~~~~~~~~~~~~~
线程[main]====被限流了====,因为访问次数已超过阈值[5]
线程[main]====被限流了====,因为访问次数已超过阈值[5]
...
~~~~~~~~~~~~~~~~~窗口向后滑动一下~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~窗口释放出了[5]个访问名额,你可以访问了喽~~~~~~~~~~~~~~~~~
线程[main]访问成功,当前访问总数[1]
线程[main]访问成功,当前访问总数[2]
线程[main]访问成功,当前访问总数[3]
线程[main]访问成功,当前访问总数[4]
线程[main]访问成功,当前访问总数[5]
线程[main]====被限流了====,因为访问次数已超过阈值[5]
线程[main]====被限流了====,因为访问次数已超过阈值[5]
线程[main]====被限流了====,因为访问次数已超过阈值[5]
线程[main]====被限流了====,因为访问次数已超过阈值[5]
~~~~~~~~~~~~~~~~~窗口向后滑动一下~~~~~~~~~~~~~~~~~
...

同样也达到了限流的目的。滑动窗口算法是个不错的算法,简单好理解且功能强大,Hystrix在限流、断路器计算上都是基于此算法来实现的。


算法题举例

滑动窗口算法它作为一个经典算法,这里也凑凑热闹来几道经典算法题玩玩,加深理解。

1、给定一组大小为n的整数数组,计算长度为k的子数组(必须连续)和的最大值。例如:数组[-1,4,7,-3,8,5,-2,6],k=2,那么最大值理应为:8+5 = 13

初级做法:遍历所有子数组,时间复杂度:O(n*k) 空间复杂度 O(n)

/**
 * 遍历所有子数组,求和并比较
 * 嵌套循环 时间复杂度:O(n*k)
 */
public static void calNormal(int[] array, int k) {
    if (array.length == 0 || k <= 0 || k > array.length) {// 非法参数不处理
        return;
    }

    int index = 0;// 记录最大子数组第1个元素的索引,目前是0
    int maxSum = 0;// 记录最大子数组和,目前是从左开始第1个子数组
    for (int i = 0; i < k; i++) {
        maxSum += array[i];
    }

    // 当前maxSum是第一个数组的值,下面将【所有的子数组】相加比较
    // 遍历所有子数组,求和并比较(因为第一个数组已经计算了,所以此处角标从1开始即可)
    for (int i = 1; i <= array.length - k; i++) {
        int curSum = 0;
        for (int j = 0; j < k; j++) {// 计算当前子数组和
            curSum += array[i + j];
        }

        // 如果大于最大和,则记录maxSum为当前值,且记录index为i
        if (curSum > maxSum) {
            maxSum = curSum;
            index = i;
        }
    }

    /**打印结果*/
    System.out.print(maxSum + " // ");// 打印最大和
    System.out.print(array[index]);// 先打印第1个值
    for (int i = 1; i < k; i++) {
        int value = array[i + index];
        System.out.print(value >= 0 ? ("+" + value) : value);// 非负数前面打印+号
    }
    System.out.println();
}

高级做法:滑动窗口。时间复杂度:O(n) 空间复杂度 O(n)

/**
 * 窗口向右滑动,通过减失效值加最新值求和并比较
 * 单层循环 时间复杂度:O(n)
 */
public static void calBySlidingWindow(int[] array, int k) {
    if (array.length == 0 || k <= 0 || k > array.length) {// 非法参数不处理
        return;
    }

    // 同上
    int index = 0;
    int maxSum = 0;
    for (int i = 0; i < k; i++) {
        maxSum += array[i];
    }

    int curWindowSum = maxSum;
    // 从下个元素开始,即窗口向右滑动
    for (int i = 1; i <= array.length - k; i++) {
        // 减去失效值,加上最新值(窗口内元素固定嘛~这就是限流的思想)
        curWindowSum = curWindowSum - array[i - 1] + array[k + i - 1];
        if (curWindowSum > maxSum) {// 如果大于最大和,则记录
            maxSum = curWindowSum;
            index = i;
        }
    }

    /**打印结果*/
    System.out.print(maxSum + " // ");// 打印最大和
    System.out.print(array[index]);// 先打印第1个值
    for (int i = 1; i < k; i++) {
        int value = array[i + index];
        System.out.print(value >= 0 ? ("+" + value) : value);// 非负数前面打印+号
    }
    System.out.println();
}

测试代码:

public static void main(String[] args) {
    calNormal(new int[]{-1, 4, 7, -3, 8, 5, -2, 6}, 2);
    System.out.println("-------------");
    calBySlidingWindow(new int[]{-1, 4, 7, -3, 8, 5, -2, 6}, 2);
}

控制台打印:

13 // 8+5
-------------
13 // 8+5

运用滑动窗口思路,遍历时不嵌套循环计算所有值;外层遍历相当于窗口向右滑动,每次减去失效值加上最新值,即为当前窗口的和,然后再比较。性能远高于普通方式,数组长度越长优势越明显。


2、问题描述:给定一个字符串,找出不含有重复字符的最长子串的长度。 初级做法:遍历所有子数组

/**
 * 普通方式:把【所有的子串】一个一个的尝试
 */
public int subStringLengthByNormal(String str) {
    int resLength = 0;
    int strLength = str.length();

    // 两个for循环 i头 j尾 能确定出所有的子串
    for (int i = 0; i < strLength; i++) {
        for (int j = i + 1; j < strLength; j++) {
            Set<String> hashSet = new HashSet<>();

            boolean isExists = false;
            // 遍历这个子串的内容,看看有木有重复的字母
            // 有的话立马break 进入到下一个子串中
            for (int z = i; z < j; z++) {
                String strChildren = str.substring(z, z + 1);
                if (hashSet.contains(strChildren)) {
                    isExists = true;
                    break;
                } else {
                    hashSet.add(strChildren);
                }
            }

            // 若最终没有重复的,那就看看这个子串的长度和resLength进行对比喽
            // 取最大值
            if (!isExists) {
                //这里是不存在相同的才给resLength赋值
                resLength = Math.max(resLength, j - i);
            }

        }
    }
    return resLength;
}

高级做法:滑动窗口

设计思路:它本质其实是个队列的问题,如字符串是“pwwkew”,刚进入这个队列满足条件的是“pw”,如果再进到下一步“w”,这时候的队列就变成了“pww”,显然已经不满足条件了,这时候就要移除队列左边的数,一直移除直到满足条件(无重复)为止:“pww”=>“pw”=>“w”。继续之前再进入队列“wke”=>“wkew”=>“kew”。一直维持这样的队列,找出队列出现最长的长度。

public int lengthOfLongestSubstring(String s) {
    if (s == null) return 0;
    if (s.length() == 1) return 1;

    int res = 0, l = 0, r = 0;
    int length = s.length();
    Set<Character> set = new HashSet<>();

    while (l < length && r < length) {
        if (!set.contains(s.charAt(r))) {
            set.add(s.charAt(r));
            r++;
            res = Math.max(res, r - l);
        } else {
            set.remove(s.charAt(l));
            l++;
        }
    }
    return res;
}

测试代码:

@Test
public void fun1() {

    String s = "abcabcbb"; // 结果:abc
    System.out.println(subStringLengthByNormal(s)); // 3
    System.out.println(lengthOfLongestSubstring(s)); // 3

    System.out.println("-------------------------");

    s = "bbbbb"; // 结果:b
    System.out.println(subStringLengthByNormal(s)); // 1
    System.out.println(lengthOfLongestSubstring(s)); // 1

    System.out.println("-------------------------");

    s = "pwwkew"; // 结果:wke
    System.out.println(subStringLengthByNormal(s)); // 3
    System.out.println(lengthOfLongestSubstring(s)); // 3
}

控制台输出:

3
3
-------------------------
1
1
-------------------------
3
3

总结

滑动窗口法可以用来解决一些查找满足一定条件连续区间的性质(长度等)问题,个人认为可以看做是一种双指针方法的特例,两个指针都起始于原点,并一前一后向终点前进(只能前进,不能后退)。还有一种双指针方法,其两个指针一始一终,并相向靠近,这种方法的内在思想和滑动窗口也非常类似~

Hystrix指标数据收集使用的便是滑动窗口算法:固定窗口大小(类似于第一个示例),所以了解本文后再观其源码会轻松不少。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 6. 二十不惑,ObjectMapper使用也不再迷惑

    各位好,我是YourBatman。从本文起,终于要和Jackson的“高级”部分打交道了,也就是数据绑定jackson-databind模块。通过接触它的高级A...

    YourBatman
  • [享学Netflix] 四十一、Ribbon核心API源码解析:ribbon-core(四)ClientException客户端异常

    代码下载地址:https://github.com/f641385712/netflix-learning

    YourBatman
  • 【小家java】Java中的线程池,你真的用对了吗?(教你用正确的姿势使用线程池,Executors使用中的坑)

    在【小家java】用 ThreadPoolExecutor/ThreadPoolTaskExecutor 线程池技术提高系统吞吐量(附带线程池参数详解和使用注意...

    YourBatman
  • 浏览器事件

    常用浏览器事件与DOM事件,包括鼠标事件、键盘事件、框架/对象事件、表单事件、剪贴板事件、打印事件、拖动事件、多媒体事件、动画事件、过渡事件。

    WindrunnerMax
  • 使用numpy解决图像维度变换问题

    在机器学习中经常会碰到各种图像数据集,有的是按照num*height*width*channel来存储的,而有的则是num*channel*height*wid...

    marsggbo
  • ApkCrack:强大且免费的APK编辑工具

    美丽应用
  • Windows Server 2008群集仲裁机制

    Quorum Disk,又名仲裁磁盘,其目的是在协调集群节点间的故障转移(Failover)Windows Server 2003年代的集群中,对于单一的仲裁设...

    张善友
  • Linux命令(32)——grep命令

    grep(Globally search a Regular Expression and Print)是GNU开发的一款免费开源的文本搜索工具。grep家族包...

    Dabelv
  • @Aspect注解

    在这篇中我们将通过@Aspect注解来创建一个切面,以此来演示@Aspect注解的基本使用。

    吉林乌拉
  • console接口是干什么用的?

    1. 交换机的console 接口:它是用来配置交换机的,所以只有网管型交换机才有。而且还要注意,并不是所有网管型交换机都有,那是因为交换机的配置方法有多种,如...

    it妹

扫码关注云+社区

领取腾讯云代金券