专栏首页SpringCloud专栏Java简单实现滑动窗口

Java简单实现滑动窗口

由于最近有一个统计单位时间内某key的访问次数的需求,譬如每5秒访问了redis的某key超过100次,就取出该key单独处理。

这样的单位时间统计,很明显我们都知道有个边界问题,譬如5秒内100次的限制。刚好前4.99秒访问都是0,最后0.01秒来了100次,5.01秒又来了100次。也就是访问有明显的毛刺情况出现,为了弱化这个毛刺情况,我们可以采用滑动窗口。

滑动窗口

滑动窗口的主要原理比较简单,就是将这个单位时间进行拆分,譬如5秒的统计范围,我们将它划分成5个1秒。

当请求进来时,先判断当前请求属于这5个1秒的时间片中的哪个,然后将对应的时间片对应的统计值加1,再判断当前加上前4个时间片的次数总和是否已经超过了设置的阈值。

当时间已经到达第6个时间片时,就把第一个时间片给干掉,因为无论第一片是多少个统计值,它都不会再参与后续的计算了。

就这样,随着时间的推移,统计值就随着各个时间片的滚动,不断地进行统计。

具体要将单位时间拆分为多少片,要根据实际情况来决定。当然,毫无疑问的是切分的越小,毛刺现象也越少。系统统计也越准确,随之就是内存占用会越大,因为你的这个窗口的数组会更大。

代码实现思路就是定义好分片数量,每个分片都有一个独立的计数器,所有的分片合计为一个数组。当请求来时,按照分片规则,判断请求应该划分到哪个分片中去。要判断是否超过阈值,就将前N个统计值相加,对比定义的阈值即可。

代码我直接引用别人写好的了,源代码在https://www.iteye.com/blog/go12345-1744728

import java.util.concurrent.atomic.AtomicInteger;

/**
 * 滑动窗口。该窗口同样的key,都是单线程计算。
 *
 * @author wuweifeng wrote on 2019-12-04.
 */
public class SlidingWindow {
    /**
     * 循环队列,就是装多个窗口用,该数量是windowSize的2倍
     */
    private AtomicInteger[] timeSlices;
    /**
     * 队列的总长度
     */
    private int timeSliceSize;
    /**
     * 每个时间片的时长,以毫秒为单位
     */
    private int timeMillisPerSlice;
    /**
     * 共有多少个时间片(即窗口长度)
     */
    private int windowSize;
    /**
     * 在一个完整窗口期内允许通过的最大阈值
     */
    private int threshold;
    /**
     * 该滑窗的起始创建时间,也就是第一个数据
     */
    private long beginTimestamp;
    /**
     * 最后一个数据的时间戳
     */
    private long lastAddTimestamp;

    public static void main(String[] args) {
        //1秒一个时间片,窗口共5个
        SlidingWindow window = new SlidingWindow(100, 4, 8);
        for (int i = 0; i < 100; i++) {
            System.out.println(window.addCount(2));

            window.print();
            System.out.println("--------------------------");
            try {
                Thread.sleep(102);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public SlidingWindow(int duration, int threshold) {
        //超过10分钟的按10分钟
        if (duration > 600) {
            duration = 600;
        }
        //要求5秒内探测出来的,
        if (duration <= 5) {
            this.windowSize = 5;
            this.timeMillisPerSlice = duration * 200;
        } else {
            this.windowSize = 10;
            this.timeMillisPerSlice = duration * 100;
        }
        this.threshold = threshold;
        // 保证存储在至少两个window
        this.timeSliceSize = windowSize * 2;

        reset();
    }

    public SlidingWindow(int timeMillisPerSlice, int windowSize, int threshold) {
        this.timeMillisPerSlice = timeMillisPerSlice;
        this.windowSize = windowSize;
        this.threshold = threshold;
        // 保证存储在至少两个window
        this.timeSliceSize = windowSize * 2;

        reset();
    }

    /**
     * 初始化
     */
    private void reset() {
        beginTimestamp = SystemClock.now();
        //窗口个数
        AtomicInteger[] localTimeSlices = new AtomicInteger[timeSliceSize];
        for (int i = 0; i < timeSliceSize; i++) {
            localTimeSlices[i] = new AtomicInteger(0);
        }
        timeSlices = localTimeSlices;
    }

    private void print() {
        for (AtomicInteger integer : timeSlices) {
            System.out.print(integer + "-");
        }
    }

    /**
     * 计算当前所在的时间片的位置
     */
    private int locationIndex() {
        long now = SystemClock.now();
        //如果当前的key已经超出一整个时间片了,那么就直接初始化就行了,不用去计算了
        if (now - lastAddTimestamp > timeMillisPerSlice * windowSize) {
            reset();
        }

        return (int) (((now - beginTimestamp) / timeMillisPerSlice) % timeSliceSize);
    }

    /**
     * 增加count个数量
     */
    public boolean addCount(int count) {
        //当前自己所在的位置,是哪个小时间窗
        int index = locationIndex();
//        System.out.println("index:" + index);
        //然后清空自己前面windowSize到2*windowSize之间的数据格的数据
        //譬如1秒分4个窗口,那么数组共计8个窗口
        //当前index为5时,就清空6、7、8、1。然后把2、3、4、5的加起来就是该窗口内的总和
        clearFromIndex(index);

        int sum = 0;
        // 在当前时间片里继续+1
        sum += timeSlices[index].addAndGet(count);
        //加上前面几个时间片
        for (int i = 1; i < windowSize; i++) {
            sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
        }
        System.out.println(sum + "---" + threshold);

        lastAddTimestamp = SystemClock.now();

        return sum >= threshold;
    }

    private void clearFromIndex(int index) {
        for (int i = 1; i <= windowSize; i++) {
            int j = index + i;
            if (j >= windowSize * 2) {
                j -= windowSize * 2;
            }
            timeSlices[j].set(0);
        }
    }

}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 用于解决高并发下System.currentTimeMillis卡顿
 * @author lry
 */
public class SystemClock {

    private final int period;

    private final AtomicLong now;

    private static class InstanceHolder {
        private static final SystemClock INSTANCE = new SystemClock(1);
    }

    private SystemClock(int period) {
        this.period = period;
        this.now = new AtomicLong(System.currentTimeMillis());
        scheduleClockUpdating();
    }

    private static SystemClock instance() {
        return InstanceHolder.INSTANCE;
    }

    private void scheduleClockUpdating() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable runnable) {
                Thread thread = new Thread(runnable, "System Clock");
                thread.setDaemon(true);
                return thread;
            }
        });
        scheduler.scheduleAtFixedRate(() -> now.set(System.currentTimeMillis()), period, period, TimeUnit.MILLISECONDS);
    }

    private long currentTimeMillis() {
        return now.get();
    }

    /**
     * 用来替换原来的System.currentTimeMillis()
     */
    public static long now() {
        return instance().currentTimeMillis();
    }
}

参照代码main方法,通过修改每个时间片的时间,窗口数量,阈值,来进行测试。

这就是简单实现了。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 回溯算法解八皇后问题(java版)

        八皇后问题是学习回溯算法时不得不提的一个问题,用回溯算法解决该问题逻辑比较简单。

    天涯泪小武
  • SpringCloud重试机制配置

    SpringCloud重试retry是一个很赞的功能,能够有效的处理单点故障的问题。主要功能是当请求一个服务的某个实例时,譬如你的User服务启动了2个,它们都...

    天涯泪小武
  • 回溯算法解迷宫问题(java版)

        以一个M×N的长方阵表示迷宫,0和1分别表示迷宫中的通路和障碍。设计程序,对任意设定的迷宫,求出从入口到出口的所有通路。

    天涯泪小武
  • 【POJ 1679】The Unique MST(次小生成树)

    找出最小生成树,同时用Max[i][j]记录i到j的唯一路径上最大边权。然后用不在最小生成树里的边i-j来替换,看看是否差值为0。

    饶文津
  • 【手绘漫画】图解LeetCode之最长上升子序列(LeetCode300题),贪心算法 + 二分查找

    今天是第五期,争取每天一期,最多两天一期,欢迎大家监督我。。。公众号监督最好!!!

    我是管小亮
  • Go远超Python,机器学习人才极度稀缺,全球16,655位程序员告诉你这些真相

    几年前,滴滴的工程师曾告诉我,在滴滴 Go 已经得到了非常广泛的应用,去年,在腾讯内部使用的编程语言排行榜中,Go 已经排到了第 3 的位置。而今天,由专业开发...

    AI科技大本营
  • 小程序 – 笔记

    code_horse
  • SSM框架——实现分页和搜索分页

    分页是Java Web项目常用的功能,昨天在Spring MVC中实现了简单的分页操作和搜索分页,在此记录一下。使用的框架为(MyBatis+SpringMVC...

    Java团长
  • 图的遍历(下)——邻接表

    在我的上一篇博客:图的遍历(上)——邻接矩阵 中主要介绍了邻接矩阵的BFS和递归的DFS与非递归的DFS这3种遍历算法。在这篇博客我将主要叙述邻接表的以上3中遍...

    AI那点小事
  • 半小时入门Thrift

          当一个单体软件产品体量达到一定程序,都会想到拆分为不同的模块(当今这么流行微服务)。拆分后一定会存在进程之间的交互(简称:PRC),那么thrift...

    sam dragon

扫码关注云+社区

领取腾讯云代金券