限流最核心的就是“在指定的时间内指定的访问者能访问多少次”。
顺着这个思路?现在举个例子“访问者A在2秒内访问次数不能超过5次”。
那么你就需要一个数据结构来存储"访问者A",同时记录“2秒”的过期时间,同时要记录在这2秒内的访问次数。
计数+过期时间
接下来我们就手动编写一个单机版的限流器。
/**
* 简单的速率限制器
* @author hezhuofan
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Bucket {
/**
* 指定的请求
*/
private volatile String key;
/**
* 开始时间
*/
private volatile Long start;
/**
* 定时时长
*/
private volatile Long interval;
/**
* 当前次数
*/
private volatile AtomicInteger count;
/**
* 请求次数
*/
private volatile Integer limit;
public static void main(String[] args) {
Bucket bucket = Bucket.builder().key("request a").limit(5)
.interval(2000L).build();
bucket.request();
bucket.request();
bucket.request();
bucket.request();
bucket.request();
bucket.request();
bucket.request();
bucket.request();
bucket.request();
bucket.request();
bucket.request();
bucket.request();
bucket.request();
bucket.request();
bucket.request();
}
public boolean request() {
if(start==null){
start=new Date().getTime();
}
if(System.currentTimeMillis()-start<=interval){
if(count==null){
count=new AtomicInteger(0);
}
if(count.intValue()<=limit) {
count.incrementAndGet();
return true;
}else{
System.out.println(key+"被拒绝访问");
return false;
}
}else{
start=new Date().getTime();
return request();
}
}
}
就像上面分析的那样我们使用key来记录“指定的访问者”,用start记录开始时间,用interval记录指定时间段,用limit记录允许访问的次数,使用count记录实际访问次数。
然后里边有一个request方法,这个方法会返回一个请求是否有资格被放行。
然而上面的只支持一个请求。为了支持多个请求。我们需要有一个map或者数组来维护多个请求各自的状态。
接下来我们就新建一个RateLimiter类来包装这个Bucket,使得它支持多个请求。
/**
* 支持多个请求
* @author hezhuofan
*/
public class RateLimiter {
private ConcurrentHashMap<String,Bucket> buckets=new ConcurrentHashMap<>(300);//支持多个请求
public boolean getKey(String key){
Bucket bucket=buckets.get(key);
if(bucket==null){
Bucket bucket1 = Bucket.builder().key(key).limit(12)
.interval(2000L).build();
bucket=bucket1;
buckets.put(key,bucket1);
}
return bucket.request();
}
public static void main(String[] args) {
RateLimiter rateLimiter= new RateLimiter();
rateLimiter.getKey("request a");
rateLimiter.getKey("request a");
rateLimiter.getKey("request a");
rateLimiter.getKey("request a");
rateLimiter.getKey("request a");
rateLimiter.getKey("request a");
rateLimiter.getKey("request a");
rateLimiter.getKey("request a");
rateLimiter.getKey("request a");
rateLimiter.getKey("request a");
rateLimiter.getKey("request a");
rateLimiter.getKey("request a");
rateLimiter.getKey("request a");
rateLimiter.getKey("request a");
}
}
这样我们就支持了多个请求。每个请求都被存放在了map中,各自的key就是请求id本身。
好,这只是一个简单的通过计数和加过期时间的单机版的限流器。
令牌桶
事实上,限流还有令牌桶的方式。令牌桶的方式同样也是类似计数的方式。只是变为了有个地方可以发牌照,然后请求来了都去领牌照,如果拿到了则可以继续,否则就被拒绝。我们不妨来简单的实现一下:
/**
* 令牌桶
*
* 核心算法,每个请求去领取token,拿到token然后继续
* @author hezhuofan
*/
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class TokenBucket {
/**
* 唯一标识
*/
private String key;
/**
* 桶的大小
*/
private Integer limit;
/**
* 桶当前的token
*/
private volatile AtomicInteger tokens;
/**
* 加token
*/
public void addToken(){
if(tokens==null){
tokens=new AtomicInteger(0);
}
tokens.incrementAndGet();
}
/**
* 减token
*/
public void delToken(){
tokens.decrementAndGet();
}
public synchronized boolean getToken(){
if(tokens==null){
tokens=new AtomicInteger(0);
}
if(tokens.intValue()>0){
return tokens.decrementAndGet()>0;
}
return false;
}
public static void main(String[] args) {
}
}
上面我们写了一个令牌桶。用limit来表示桶的大小,用tokens来表示桶当前的容量,用key表示当前的请求ID(或者指代某个服务,比如服务ID)。
然后我们提供了对桶的基本操作。向桶中添加token,删除桶中的token,得到桶中的某个token。为了简单清晰明了,这里我们使用一个AtomicInteger的数量来表示“token们”。特别注意的是:getToken方法是通过简单的decrementAndGet方法减去1,然后判断返回结果大于0,则认为是拿到了token。
好,现在我们构建了一个基本的令牌桶。现在是时候往桶中添加token了。那么我们需要一个人按照指定速率向桶中添加token。
现在我们就去构造一个任务类。
/**
* @author hezhuofan
*/
public class TokenProducer implements Runnable {
protected TokenBucket tokenBucket;
public TokenProducer(TokenBucket tokenBucket){
this.tokenBucket=tokenBucket;
}
@Override
public void run() {
tokenBucket.addToken();
}
}
这个任务类只干一件事情:向指定的桶中添加令牌(token)。
然后我们就来构造一个管理器,这个管理器负责按照指定速率添加token。
/**
* token 生产者管理器
* @author hezhuofan
*/
public class TokenProducerManager
{
/**
* 按照指定速率添加token
* @param key 指定请求ID
* @param period 速率
* @param limit 桶的大小
*/
private static TokenBucket execute(String key,Long period,Integer limit) {
ScheduledExecutorService scheduledExecutorService= Executors.newScheduledThreadPool(1);
TokenBucket tokenBucket = TokenBucket.builder().key(key).limit(limit).build();
scheduledExecutorService.scheduleAtFixedRate(new TokenProducer(tokenBucket),0l,period, TimeUnit.SECONDS);
return tokenBucket;
}
public static TokenBucket addTokenAtFixRate(String key,Long period,Integer limit){
return execute(key,period,limit);
}
}
这里我们使用并发包里的ScheduledExecutorService来实现定时添加token。我们把刚才新建的任务类TokenProducer的实例传入,然后调用scheduleAtFixedRate来启动定时添加token的任务,然后把已实例化并且正在被定时添加token的令牌桶TokenBucket实例返回,方便后续使用。我们暴露的方法addTokenAtFixRate支持指定请求key,指定速率,指定桶的大小。
然后我们执行如下测试代码:
public static void main(String[] args) throws InterruptedException {
TokenBucket tokenBucket=TokenProducerManager.addTokenAtFixRate("request a",1l,2000);
Thread.sleep(6000L);
System.out.println(tokenBucket.getToken());
System.out.println(tokenBucket.getToken());
System.out.println(tokenBucket.getToken());
System.out.println(tokenBucket.getToken());
System.out.println(tokenBucket.getToken());
System.out.println(tokenBucket.getToken());
System.out.println(tokenBucket.getToken());
System.out.println(tokenBucket.getToken());
System.out.println(tokenBucket.getToken());
System.out.println(tokenBucket.getToken());
System.out.println(tokenBucket.getToken());
}
输出:
true
true
true
true
true
true
false
false
false
false
false
发现了没?其实本质上还是计数。当和前面的那种计数不一样的地方是,令牌桶支持动态的添加token,也就是动态改变上限。你可以控制添加令牌的速率。
漏桶
好,现在我们再来写个漏桶(leak bucket)算法的限流。漏桶算法和令牌桶有点像。但漏桶是直接把请求放进桶里,桶满了,其他放不进去的请求直接拒绝,。
漏桶核心是:请求来了以后,直接进桶,然后桶根据自己的漏洞大小慢慢往外面漏。
接下来我们就尝试来实现一下漏桶算法。实现漏桶最简单的方式就是使用一个FIFO的队列,一端负责不断的放入请求,另外一端负责吐出请求。
我们首先实现一个LeakBucket。
/**
* 漏桶
* @author hezhuofan
*/
public class LeakBucket<T> {
private volatile String key;
private volatile Integer limit=3000;
private volatile Queue<T> queue = new ArrayDeque<T>(this.limit);
public boolean flow(T request){
return queue.add(request);
}
public T leak(){
return queue.poll();
}
public void setLimit(Integer limit){
this.limit=limit;
}
public void setKey(String key){
this.key=key;
}
}
以上就是我们实现的简易漏桶。其中key依然是表示请求ID或服务ID,用来唯一标识。然后我们规定了漏桶的大小为3000,也就是说如果瞬间的高并发请求大量涌入的话,超出桶的大小就会直接被拒绝掉。然后是我们使用ArrayDeque这样一个FIFO队列来模拟漏桶。
接下来实现了两个核心方法一个是flow方法,一个是leak方法,顾名思义一个是负责流入,一个是负责漏出。其中flow方法使用队列的add方法实现,leak方法使用队列的poll方法实现。
现在漏桶已经有了。接下来就是需要有人负责往漏桶里加水(请求),有人按照指定的频率(对应于“漏洞的大小”)把请求从漏桶下面漏出。
为此,我们需要创建两个定时任务,一个任务负责加水,一个负责按照指定频率向外漏水。
public class FlowTask implements Runnable{
protected LeakBucket<String> leakBucket;
public FlowTask(LeakBucket<String> leakBucket){
this.leakBucket=leakBucket;
}
@Override
public void run() {
leakBucket.flow("request a");
}
}
public class LeakTask implements Runnable{
protected LeakBucket leakBucket;
public LeakTask(LeakBucket leakBucket){
this.leakBucket=leakBucket;
}
@Override
public void run() {
leakBucket.leak();
}
}
接下来我们像之前的算法那样编写一个manager类来启动两个定时任务,然后模拟漏桶效果。
public class LeakManager {
private static LeakBucket<String> execute(String key, Long period, Integer limit) {
ScheduledExecutorService scheduledExecutorService= Executors.newScheduledThreadPool(1);
LeakBucket<String> leakBucket=new LeakBucket<>();
leakBucket.setLimit(limit);
leakBucket.setKey(key);
ScheduledExecutorService flowScheduledExecutorService= Executors.newScheduledThreadPool(1);
flowScheduledExecutorService.scheduleAtFixedRate(new FlowTask(leakBucket),0l,period, TimeUnit.SECONDS);
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
scheduledExecutorService.scheduleAtFixedRate(new LeakTask(leakBucket),0l,period, TimeUnit.SECONDS);
return leakBucket;
}
/**
* 按照指定速率添加token
* @param key 指定请求ID
* @param period 速率
* @param limit 桶的大小
*/
public static LeakBucket<String> leakRequestAtFixRate(String key,Long period,Integer limit){
return execute(key,period,limit);
}
public static void main(String[] args) throws InterruptedException {
LeakBucket<String> leakBucket= LeakManager.leakRequestAtFixRate("request a",1l,2000);
Thread.sleep(6000L);
System.out.println(leakBucket.leak());
System.out.println(leakBucket.leak());
System.out.println(leakBucket.leak());
System.out.println(leakBucket.leak());
System.out.println(leakBucket.leak());
System.out.println(leakBucket.leak());
System.out.println(leakBucket.leak());
System.out.println(leakBucket.leak());
System.out.println(leakBucket.leak());
System.out.println(leakBucket.leak());
System.out.println(leakBucket.leak());
System.out.println(leakBucket.leak());
System.out.println(leakBucket.leak());
System.out.println(leakBucket.leak());
System.out.println(leakBucket.leak());
System.out.println(leakBucket.leak());
System.out.println(leakBucket.leak());
System.out.println(leakBucket.leak());
System.out.println(leakBucket.leak());
System.out.println(leakBucket.leak());
System.out.println(leakBucket.leak());
System.out.println(leakBucket.leak());
System.out.println(leakBucket.leak());
System.out.println(leakBucket.leak());
}
}
好,以上就是三种限流的简单实现。
分布环境下的限流
好,上面的都只是单机版的简易实现,旨在让你明白各种限流算法的核心脉络。在分布式环境下,你就需要在一个集中的地方来维护计数和队列等等这些桶了。这时候就需要用到诸如redis或zookeeper来对上面对应的变量和队列进行修改了。
现成的解决方案
以下是摘抄的一些很不错的分布式限流的现成解决思路:
1、redis incr加过期时间来限流。
int current = jedis.incr(key);
if (current + 1 > limit) //如果超出限流大小
return 0;
else if (current == 1) //只有第一次访问需要设置2秒的过期时间
jedis.expire(key, "2");
return 1
2、另外还有通过redis+lua来实现限流。
3、hystrix的线程池就类似漏桶的思路。
4、guava包中有现成的基于令牌桶的限流实现。
总结
计数+过期时间的方式就是一种粗暴的限流方式,也是常见的限流方式。但无法对流量整形。比如你规定了一分钟内请求A最多能访问60次。但如果前10秒就来了60次请求,而之后的50秒则没有请求。令牌桶的方式是请求从桶中领取token,拿到后才可继续。而漏桶则是将请求放入桶中,然后漏洞按照指定的频率漏出请求,这样就可以对突发流量进行整形,让请求总是按照漏洞的大小平稳的漏出。如果在分布式下实现限流,需要把你的计数器和漏桶队列维护到一个公共的地方,比如redis,zookeeper,数据库等。hystrix的线程池就类似漏桶的思路,guava里有现成的基于令牌桶的限流实现。