前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >​什么是限流,如何限流

​什么是限流,如何限流

作者头像
王小明_HIT
发布2021-05-20 10:40:45
3.2K0
发布2021-05-20 10:40:45
举报
文章被收录于专栏:程序员奇点程序员奇点

什么是限流

限流可以认为服务降级的一种,限流就是限制系统的输入和输出流量已达到保护系统 的目的。一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。

比如场景:

某天小明突然发现自己的接口请求突然之间涨到了原来的10倍,接口几乎不能使用,产生了一系列连锁反应,导致了整个系统崩溃。这就好比,老电闸中都安装了保险丝,一旦使用大功率设备,保险丝就会熔断,保证各个电器不被强电流烧坏,系统也同样安装保险丝,防止非预期请求过大,引起系统瘫痪。

限流方法

常用的限流算法有:计数法,滑动窗口计数法,漏桶算法和令牌桶算法。

漏桶算法思路

水(请求)进入到漏桶里,漏桶以一定的速度流出,当水流的速度过大会直接溢出, 漏桶是强行限制了数据的传输速率。

令牌桶算法

除了要能够限制数据的平均传输速率外,还需要允许某种程度的突发请求,令牌桶更为合适。

令牌桶的思路是以一个恒定的速率往桶里放令牌,如果请求需要被处理,则需要从桶里取出一个令牌,如果没有令牌可取,那么就拒绝服务。

Google开源工具包Guava提供了限流工具类RateLimiter是基于令牌桶算法来实现的。

代码语言:javascript
复制
public double acquire() {
        return acquire(1);
    }

 public double acquire(int permits) {
        checkPermits(permits);  //检查参数是否合法(是否大于0)
        long microsToWait;
        synchronized (mutex) { //应对并发情况需要同步
            microsToWait = reserveNextTicket(permits, readSafeMicros()); //获得需要等待的时间 
        }
        ticker.sleepMicrosUninterruptibly(microsToWait); //等待,当未达到限制时,microsToWait为0
        return 1.0 * microsToWait / TimeUnit.SECONDS.toMicros(1L);
    }

private long reserveNextTicket(double requiredPermits, long nowMicros) {
        resync(nowMicros); //补充令牌
        long microsToNextFreeTicket = nextFreeTicketMicros - nowMicros;
        double storedPermitsToSpend = Math.min(requiredPermits, this.storedPermits); //获取这次请求消耗的令牌数目
        double freshPermits = requiredPermits - storedPermitsToSpend;

        long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
                + (long) (freshPermits * stableIntervalMicros); 

        this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;
        this.storedPermits -= storedPermitsToSpend; // 减去消耗的令牌
        return microsToNextFreeTicket;
    }

private void resync(long nowMicros) {
        // if nextFreeTicket is in the past, resync to now
        if (nowMicros > nextFreeTicketMicros) {
            storedPermits = Math.min(maxPermits,
                    storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);
            nextFreeTicketMicros = nowMicros;
        }
    }

计数器

控制单位时间内的请求数量

代码语言:javascript
复制
import java.util.concurrent.atomic.AtomicInteger;

public class Counter {

    /**
     * 最大访问数量
     */
    private final int limit = 10;

    /**
     * 访问时间差
     */
    private final long timeout = 1000;

    /**
     * 请求时间
     */
    private long time;

    /**
     * 当前计数器
     */
    private AtomicInteger reqCount = new AtomicInteger(0);


    public boolean limit() {
        long now = System.currentTimeMillis();
        if (now < time + timeout) {
            // 单位时间内
            reqCount.getAndAdd(1);
            return reqCount.get() <= limit;
        } else {
            // 超出单位时间
            time = now;
            reqCount = new AtomicInteger(0);
            return true;
        }

    }

    public static void main(String[] args) {

    }

}
计数方式有没有问题?

假设每分钟请求数量为 60 个,每秒钟系统可以处理1个请求,用户在00:59 发送了60 个请求,然后在 1:00 发生了60个请求,此时 2 秒内有120个请求(每秒60)个请求,这样的方式并没有实现限制流量,因为每分钟可以处理60个,但是实际上这60个是一秒钟发过来的。

滑动窗口计数

滑动窗口是对计数方式对改进,增加一个时间粒度的度量单位。

把一分钟分成了若干等份,比如分成6份, 每份10s, 在一份独立计数器上,在 00:00-00:09 之间计数器累加1, 当等份数量越大,限流统计越详细。

代码语言:javascript
复制
package ratelimit;


import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.IntStream;

public class TimeWindow {
    private ConcurrentLinkedQueue<Long> queue = new ConcurrentLinkedQueue<Long>();

    /**a
     * 间隔秒数
     */
    private int seconds;

    /**
     * 最大限流
     */
    private int max;

    public TimeWindow(int max, int seconds) {
        this.seconds = seconds;
        this.max = max;

        new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep((seconds - 1) * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                clean();
            }
        }).start();
    }

    public static void main(String[] args) {
        final TimeWindow timeWindow = new TimeWindow(10, 1);
        IntStream.range(0, 3).forEach((i) -> {
            new Thread(() -> {
                try {
                    while (true) {
                        Thread.sleep(new Random().nextInt(20) * 100);

                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                timeWindow.take();
            }).start();
        });
    }

    public void take() {
        long start = System.currentTimeMillis();
        int size = sizeOfValid();
        if (size > max) {
            System.out.println("超限");
        }
        synchronized (queue) {
            if (sizeOfValid() > max) {
                System.err.println("超限");
                System.err.println("queue 中有:" + queue.size() + "最大数量:" + max);
            }
            this.queue.offer(System.currentTimeMillis());
        }
        System.err.println("queue 中有:" + queue.size() + "最大数量:" + max);

    }

    private int sizeOfValid() {
        Iterator<Long> it = queue.iterator();
        Long ms = System.currentTimeMillis() - seconds * 1000;
        int count = 0;
        while (it.hasNext()) {
            long t = it.next();
            if (t > ms) {
                //在时间窗口范围内
                count++;
            }
        }
        return count;
    }

    public void clean() {
        Long c = System.currentTimeMillis() - seconds * 1000;
        Long t1 = null;
        while ((t1 = queue.peek()) != null && t1 < c) {
            System.out.println("数据清理");
            queue.poll();
        }
    }
}

令牌桶问题

令牌桶规定固定容量的桶,令牌 token 以固定速度往桶内填充,当桶填满时 token 不会继续放入,每过来一个请求把 token 从桶中移除,当没有 token 可以获取时,拒绝请求。

令牌桶算法

当网络设备衡量流量是否超过额定带宽时,需要查看令牌桶,而令牌桶中会放置一定数量的令牌,一个令牌允许接口发送或接收1bit数据(有时是1 Byte数据),当接口通过1bit数据后,同时也要从桶中移除一个令牌。当桶里没有令牌的时候,任何流量都被视为超过额定带宽,只有当桶中有令牌时,数据才可以通过接口。令牌桶中的令牌不仅仅可以被移除,同样也可以往里添加,所以为了保证接口随时有数据通过,就必须不停地往桶里加令牌,由此可见,往桶里加令牌的速度,就决定了数据通过接口的速度。 因此,我们通过控制往令牌桶里加令牌的速度从而控制用户流量的带宽。而设置的这个用户传输数据的速率被称为承诺信息速率(CIR),通常以秒为单位。比如我们设置用户的带宽为1000 bit每秒,只要保证每秒钟往桶里添加1000个令牌即可。

令牌桶可以用来保护自己,主要用来对调用者频率进行限流,为的是不让自己的系统垮掉。

令牌桶算法代码

代码语言:javascript
复制
package com.netease.datastream.util.flowcontrol;

import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;


/**
 * <pre>
 * Created by inter12 on 15-3-18.
 * </pre>
 */
public class TokenBucket {

    // 默认桶大小个数 即最大瞬间流量是64M
    private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64;

    // 一个桶的单位是1字节
    private int everyTokenSize = 1;

    // 瞬间最大流量
    private int maxFlowRate;

    // 平均流量
    private int avgFlowRate;

    // 队列来缓存桶数量:最大的流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 *
    // 1024 * 64
    private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(
            DEFAULT_BUCKET_SIZE);

    private ScheduledExecutorService scheduledExecutorService = Executors
            .newSingleThreadScheduledExecutor();

    private volatile boolean isStart = false;

    private ReentrantLock lock = new ReentrantLock(true);

    private static final byte A_CHAR = 'a';

    public TokenBucket() {
    }

    public TokenBucket(int maxFlowRate, int avgFlowRate) {
        this.maxFlowRate = maxFlowRate;
        this.avgFlowRate = avgFlowRate;
    }

    public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) {
        this.everyTokenSize = everyTokenSize;
        this.maxFlowRate = maxFlowRate;
        this.avgFlowRate = avgFlowRate;
    }

    public void addTokens(Integer tokenNum) {

        // 若是桶已经满了,就不再家如新的令牌
        for (int i = 0; i < tokenNum; i++) {
            tokenQueue.offer(Byte.valueOf(A_CHAR));
        }
    }

    public TokenBucket build() {

        start();
        return this;
    }

    /**
     * 获取足够的令牌个数
     * 
     * @return
     */
    public boolean getTokens(byte[] dataSize) {

//        Preconditions.checkNotNull(dataSize);
//        Preconditions.checkArgument(isStart,
//                "please invoke start method first !");

        int needTokenNum = dataSize.length / everyTokenSize + 1;// 传输内容大小对应的桶个数

        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足够的桶数量
            if (!result) {
                return false;
            }

            int tokenCount = 0;
            for (int i = 0; i < needTokenNum; i++) {
                Byte poll = tokenQueue.poll();
                if (poll != null) {
                    tokenCount++;
                }
            }

            return tokenCount == needTokenNum;
        } finally {
            lock.unlock();
        }
    }

    public void start() {

        // 初始化桶队列大小
        if (maxFlowRate != 0) {
            tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate);
        }

        // 初始化令牌生产者
        TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);
        scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1,
                TimeUnit.SECONDS);
        isStart = true;

    }

    public void stop() {
        isStart = false;
        scheduledExecutorService.shutdown();
    }

    public boolean isStarted() {
        return isStart;
    }

    class TokenProducer implements Runnable {

        private int avgFlowRate;
        private TokenBucket tokenBucket;

        public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) {
            this.avgFlowRate = avgFlowRate;
            this.tokenBucket = tokenBucket;
        }

        @Override
        public void run() {
            tokenBucket.addTokens(avgFlowRate);
        }
    }

    public static TokenBucket newBuilder() {
        return new TokenBucket();
    }

    public TokenBucket everyTokenSize(int everyTokenSize) {
        this.everyTokenSize = everyTokenSize;
        return this;
    }

    public TokenBucket maxFlowRate(int maxFlowRate) {
        this.maxFlowRate = maxFlowRate;
        return this;
    }

    public TokenBucket avgFlowRate(int avgFlowRate) {
        this.avgFlowRate = avgFlowRate;
        return this;
    }

    private String stringCopy(String data, int copyNum) {

        StringBuilder sbuilder = new StringBuilder(data.length() * copyNum);

        for (int i = 0; i < copyNum; i++) {
            sbuilder.append(data);
        }

        return sbuilder.toString();

    }

    public static void main(String[] args) throws IOException,
            InterruptedException {

        tokenTest();
    }

    private static void arrayTest() {
        ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>(
                10);
        tokenQueue.offer(1);
        tokenQueue.offer(1);
        tokenQueue.offer(1);
        System.out.println(tokenQueue.size());
        System.out.println(tokenQueue.remainingCapacity());
    }

    private static void tokenTest() throws InterruptedException, IOException {
        TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512)
                .maxFlowRate(1024).build();

        BufferedWriter bufferedWriter = new BufferedWriter(
                new OutputStreamWriter(new FileOutputStream("D:/ds_test")));
        String data = "xxxx";// 四个字节
        for (int i = 1; i <= 1000; i++) {

            Random random = new Random();
            int i1 = random.nextInt(100);
            boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data,
                    i1).getBytes());
            TimeUnit.MILLISECONDS.sleep(100);
            if (tokens) {
                bufferedWriter.write("token pass --- index:" + i1);
                System.out.println("token pass --- index:" + i1);
            } else {
                bufferedWriter.write("token rejuect --- index" + i1);
                System.out.println("token rejuect --- index" + i1);
            }

            bufferedWriter.newLine();
            bufferedWriter.flush();
        }

        bufferedWriter.close();
    }

}
令牌桶和漏桶的选择问题

如果要让自己的系统不被打垮,用令牌桶,如果保证别人的系统不被打垮,用漏桶算法。

微信号:程序员开发者社区

博客:CSDN 王小明

关注我们,了解更多

参考资料
  • https://www.cnblogs.com/xuwc/p/9123078.html
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-05-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序员奇点 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 什么是限流
  • 限流方法
    • 漏桶算法思路
      • 令牌桶算法
        • 计数器
          • 滑动窗口计数
            • 令牌桶问题
              • 令牌桶算法
              • 参考资料
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档