服务接口的流量控制策略之RateLimit

一、场景描述

很多做服务接口的人或多或少的遇到这样的场景,由于业务应用系统的负载能力有限,为了防止非预期的请求对系统压力过大而拖垮业务应用系统。

也就是面对大流量时,如何进行流量控制?

服务接口的流量控制策略:分流、降级、限流等。本文讨论下限流策略,虽然降低了服务接口的访问频率和并发量,却换取服务接口和业务应用系统的高可用。

实际场景中常用的限流策略:

  • Nginx前端限流

按照一定的规则如帐号、IP、系统调用逻辑等在Nginx层面做限流

  • 业务应用系统限流

1、客户端限流

2、服务端限流

  • 数据库限流

红线区,力保数据库

二、常用的限流算法

常用的限流算法由:漏桶算法和令牌桶算法。本文不具体的详细说明两种算法的原理,原理会在接下来的文章中做说明。

1、漏桶算法

漏桶(Leaky Bucket)算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水(接口有响应速率),当水流入速度过大会直接溢出(访问频率超过接口响应速率),然后就拒绝请求,可以看出漏桶算法能强行限制数据的传输速率.示意图如下:

可见这里有两个变量,一个是桶的大小,支持流量突发增多时可以存多少的水(burst),另一个是水桶漏洞的大小(rate)。

因为漏桶的漏出速率是固定的参数,所以,即使网络中不存在资源冲突(没有发生拥塞),漏桶算法也不能使流突发(burst)到端口速率.因此,漏桶算法对于存在突发特性的流量来说缺乏效率.

2、令牌桶算法

令牌桶算法(Token Bucket)和 Leaky Bucket 效果一样但方向相反的算法,更加容易理解.随着时间流逝,系统会按恒定1/QPS时间间隔(如果QPS=100,则间隔是10ms)往桶里加入Token(想象和漏洞漏水相反,有个水龙头在不断的加水),如果桶已经满了就不再加了.新请求来临时,会各自拿走一个Token,如果没有Token可拿了就阻塞或者拒绝服务.

令牌桶的另外一个好处是可以方便的改变速度. 一旦需要提高速率,则按需提高放入桶中的令牌的速率. 一般会定时(比如100毫秒)往桶中增加一定数量的令牌, 有些变种算法则实时的计算应该增加的令牌的数量.

三、基于Redis功能的实现

简陋的设计思路:假设一个用户(用IP判断)每分钟访问某一个服务接口的次数不能超过10次,那么我们可以在Redis中创建一个键,并此时我们就设置键的过期时间为60秒,每一个用户对此服务接口的访问就把键值加1,在60秒内当键值增加到10的时候,就禁止访问服务接口。在某种场景中添加访问时间间隔还是很有必要的。

1)使用Redis的incr命令,将计数器作为Lua脚本

local current
current = redis.call("incr",KEYS[1])
if tonumber(current) == 1 then
     redis.call("expire",KEYS[1],1)
 end

Lua脚本在Redis中运行,保证了incr和expire两个操作的原子性。

2)使用Reids的列表结构代替incr命令

FUNCTION LIMIT_API_CALL(ip)
current = LLEN(ip)
IF current > 10 THEN
ERROR "too many requests per second"
ELSE
    IF EXISTS(ip) == FALSE
        MULTI
            RPUSH(ip,ip)
           EXPIRE(ip,1)
        EXEC
    ELSE
        RPUSHX(ip,ip)
    END
    PERFORM_API_CALL()
END

Rate Limit使用Redis的列表作为容器,LLEN用于对访问次数的检查,一个事物中包含了RPUSH和EXPIRE两个命令,用于在第一次执行计数是创建列表并设置过期时间,

RPUSHX在后续的计数操作中进行增加操作。

四、基于令牌桶算法的实现

令牌桶算法可以很好的支撑突然额流量的变化即满令牌桶数的峰值。

import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
 
import com.google.common.base.Preconditions;
import com.netease.datastream.util.framework.LifeCycle;
 
 20 public class TokenBucket implements LifeCycle {
 
// 默认桶大小个数 即最大瞬间流量是64M
 private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64;
 
// 一个桶的单位是1字节
 private int everyTokenSize = 1;
 
// 瞬间最大流量
 private int maxFlowRate;
 
// 平均流量
 private int avgFlowRate;
 
// 队列来缓存桶数量:最大的流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 * 1024 * 64
 private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(DEFAULT_BUCKET_SIZE);
 
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
 
private volatile boolean isStart = false;
 
private ReentrantLock lock = new ReentrantLock(true);
 
private static final byte A_CHAR = 'a';
 
public TokenBucket() {
 }
 
public TokenBucket(int maxFlowRate, int avgFlowRate) {
 this.maxFlowRate = maxFlowRate;
 this.avgFlowRate = avgFlowRate;
 }
 
public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) {
 this.everyTokenSize = everyTokenSize;
 this.maxFlowRate = maxFlowRate;
 this.avgFlowRate = avgFlowRate;
 }
 
public void addTokens(Integer tokenNum) {
 
// 若是桶已经满了,就不再家如新的令牌
 for (int i = 0; i < tokenNum; i++) {
 tokenQueue.offer(Byte.valueOf(A_CHAR));
 }
 }
 
public TokenBucket build() {
 
start();
 return this;
 }
 
/**
 * 获取足够的令牌个数
 *
 * @return
 */
 public boolean getTokens(byte[] dataSize) {
 
Preconditions.checkNotNull(dataSize);
 Preconditions.checkArgument(isStart, "please invoke start method first !");
 
int needTokenNum = dataSize.length / everyTokenSize + 1;// 传输内容大小对应的桶个数
 
final ReentrantLock lock = this.lock;
 lock.lock();
 try {
 boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足够的桶数量
 if (!result) {
 return false;
 }
 
int tokenCount = 0;
 for (int i = 0; i < needTokenNum; i++) {
 Byte poll = tokenQueue.poll();
 if (poll != null) {
 tokenCount++;
 }
 }
 
return tokenCount == needTokenNum;
 } finally {
 lock.unlock();
 }
 }
 
@Override
 public void start() {
 
// 初始化桶队列大小
 if (maxFlowRate != 0) {
 tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate);
 }
 
// 初始化令牌生产者
 TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);
 scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1, TimeUnit.SECONDS);
 isStart = true;
 
}
 
@Override
 public void stop() {
 isStart = false;
 scheduledExecutorService.shutdown();
 }
 
@Override
 public boolean isStarted() {
 return isStart;
 }
 
class TokenProducer implements Runnable {
 
private int avgFlowRate;
 private TokenBucket tokenBucket;
 
public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) {
 this.avgFlowRate = avgFlowRate;
 this.tokenBucket = tokenBucket;
 }
 
@Override
 public void run() {
 tokenBucket.addTokens(avgFlowRate);
 }
 }
 
public static TokenBucket newBuilder() {
 return new TokenBucket();
 }
 
public TokenBucket everyTokenSize(int everyTokenSize) {
 this.everyTokenSize = everyTokenSize;
 return this;
 }
 
public TokenBucket maxFlowRate(int maxFlowRate) {
 this.maxFlowRate = maxFlowRate;
 return this;
 }
 
public TokenBucket avgFlowRate(int avgFlowRate) {
 this.avgFlowRate = avgFlowRate;
 return this;
 }
 
private String stringCopy(String data, int copyNum) {
 
StringBuilder sbuilder = new StringBuilder(data.length() * copyNum);
 
for (int i = 0; i < copyNum; i++) {
 sbuilder.append(data);
 }
 
return sbuilder.toString();
 
}
 
public static void main(String[] args) throws IOException, InterruptedException {
 
tokenTest();
 }
 
private static void arrayTest() {
 ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>(10);
 tokenQueue.offer(1);
 tokenQueue.offer(1);
 tokenQueue.offer(1);
 System.out.println(tokenQueue.size());
 System.out.println(tokenQueue.remainingCapacity());
 }
 
private static void tokenTest() throws InterruptedException, IOException {
 TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512).maxFlowRate(1024).build();
 
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("/tmp/ds_test")));
 String data = "xxxx";// 四个字节
 for (int i = 1; i <= 1000; i++) {
 
Random random = new Random();
 int i1 = random.nextInt(100);
 boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data, i1).getBytes());
 TimeUnit.MILLISECONDS.sleep(100);
 if (tokens) {
 bufferedWriter.write("token pass --- index:" + i1);
 System.out.println("token pass --- index:" + i1);
 } else {
 bufferedWriter.write("token rejuect --- index" + i1);
 System.out.println("token rejuect --- index" + i1);
 }
 
bufferedWriter.newLine();
 bufferedWriter.flush();
 }
 
bufferedWriter.close();
 }
 
}

五、示例

RateLimiter 使用Demo

package ratelimite;

import com.google.common.util.concurrent.RateLimiter;

public class RateLimiterDemo {

public static void main(String[] args) {

testNoRateLimiter();

testWithRateLimiter();

}

public static void testNoRateLimiter() {
Long start = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
System.out.println("call execute.." + i);
}
Long end = System.currentTimeMillis();
System.out.println(end - start);
}

public static void testWithRateLimiter() {
Long start = System.currentTimeMillis();
RateLimiter limiter = RateLimiter.create(10.0); // 每秒不超过10个任务被提交  

for (int i = 0; i < 10; i++) {

limiter.acquire(); // 请求RateLimiter, 超过permits会被阻塞  

System.out.println("call execute.." + i);
}

Long end = System.currentTimeMillis();
System.out.println(end - start);
}
}

五、 Guava并发:ListenableFuture与RateLimiter示例

概念

ListenableFuture顾名思义就是可以监听的Future,它是对Java原生Future的扩展增强。我们知道Future表示一个异步计算任务,当任务完成时可以得到计算结果。如果我们希望一旦计算完成就拿到结果展示给用户或者做另外的计算,就必须使用另一个线程不断的查询计算状态。这样做,代码复杂,而且效率低下。使用ListenableFuture Guava帮我们检测Future是否完成了,如果完成就自动调用回调函数,这样可以减少并发程序的复杂度。

推荐使用第二种方法,因为第二种方法可以直接得到Future的返回值,或者处理错误情况。本质上第二种方法是通过调动第一种方法实现的,做了进一步的封装。

另外ListenableFuture还有其他几种内置实现:

  1. SettableFuture:不需要实现一个方法来计算返回值,而只需要返回一个固定值来做为返回值,可以通过程序设置此Future的返回值或者异常信息
  2. CheckedFuture: 这是一个继承自ListenableFuture接口,他提供了checkedGet()方法,此方法在Future执行发生异常时,可以抛出指定类型的异常。

示例

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.RateLimiter;

public class ListenableFutureDemo {
 
    public static void main(String[] args) {
        testRateLimiter();
        testListenableFuture();
    }
 
    /**
     * RateLimiter类似于JDK的信号量Semphore,他用来限制对资源并发访问的线程数
     */
 
    public static void testRateLimiter() {
        ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
        RateLimiter limiter = RateLimiter.create(5.0); // 每秒不超过4个任务被提交

        for (int i = 0; i < 10; i++) {
 
            limiter.acquire(); // 请求RateLimiter, 超过permits会被阻塞
 
            final ListenableFuture<Integer> listenableFuture = executorService
 
                    .submit(new Task("is "+ i));
 
        }
 
    }
 
    public static void testListenableFuture() {
 
        ListeningExecutorService executorService = MoreExecutors
 
                .listeningDecorator(Executors.newCachedThreadPool());
 
        final ListenableFuture<Integer> listenableFuture = executorService.submit(new Task("testListenableFuture"));
                
        //同步获取调用结果
        try {
 
            System.out.println(listenableFuture.get());
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        } catch (ExecutionException e1) {
            e1.printStackTrace();
 
        }
 

        //第一种方式
 
        listenableFuture.addListener(new Runnable() {
 
            @Override
 
            public void run() {
                try {
                    System.out.println("get listenable future's result " + listenableFuture.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        }, executorService);

        //第二种方式
 
        Futures.addCallback(listenableFuture, new FutureCallback<Integer>() {
            @Override
            public void onSuccess(Integer result) {
                System.out .println("get listenable future's result with callback " + result);
            }

            @Override
            public void onFailure(Throwable t) {
                t.printStackTrace();
            }
 
        });
    }
}

class Task implements Callable<Integer> {
 
    String str;
 
    public Task(String str){
 
        this.str = str;
 
    }
    @Override
 
    public Integer call() throws Exception {
 
        System.out.println("call execute.." + str);
 
        TimeUnit.SECONDS.sleep(1);
 
        return 7;
 
    }
 
}

guava版本

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>14.0.1</version>
</dependency>

转载自:

https://blog.csdn.net/lovelichao12/article/details/73973929

原文发布于微信公众号 - 我的小碗汤(mysmallsoup)

原文发表时间:2018-10-31

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏向治洪

android 加载图片oom若干方案小结

本文根据网上提供的一些技术方案加上自己实际开发中遇到的情况小结。 众所周知,每个Android应用程序在运行时都有一定的内存限制,限制大小一般为16MB或2...

2008
来自专栏Phoenix的Android之旅

Dagger2 Android应用:@Component和@Module

这部分会介绍一下DI的主要概念,包括Component,Module,但不涉及和Android有关的具体代码。

1342
来自专栏JackieZheng

把玩爬虫框架Gecco

如果你现在接到一个任务,获取某某行业下的分类。 作为一个非该领域专家,没有深厚的运营经验功底,要提供一套摆的上台面且让人信服的行业分类,恐怕不那么简单。 找不到...

5714
来自专栏wblearn

我的Web开发实战总结(二)

这篇是继我的Web开发实战总结(一)的第二篇文章,在此篇里,我主要总结一下如何把Web页面上的报表或列表数据转换成pdf文件下载到本地。其中涉及到的知识我也会提...

1141
来自专栏点滴积累

使用 python 处理 nc 数据

6624
来自专栏祝威廉

Spark Tungsten-sort Based Shuffle 分析

看这篇文章前,建议你先简单看看Spark Sort Based Shuffle内存分析。

1392
来自专栏cmazxiaoma的架构师之路

C#学习之路(1)--数据库技术

2094
来自专栏木东居士的专栏

漫谈并发编程:Future模型(Java、Clojure、Scala多语言角度分析)

3253
来自专栏Windows Community

Windows Community Toolkit 3.0 - Gaze Interaction

Gaze Input & Tracking - 也就是视觉输入和跟踪,是一种和鼠标/触摸屏输入非常不一样的交互方式,利用人类眼球的识别和眼球方向角度的跟踪,来判...

1283
来自专栏扎心了老铁

python分布式环境下的限流器

项目中用到了限流,受限于一些实现方式上的东西,手撕了一个简单的服务端限流器。 服务端限流和客户端限流的区别,简单来说就是: 1)服务端限流 对接口请求进行限流,...

5228

扫码关注云+社区

领取腾讯云代金券