在开发高并发系统时,有很多手段来保护系统,如缓存、降级、限流等。缓存可以提升系统的访问速度,降级可以暂时屏蔽掉非核心业务,使得核心业务不受影响。限流的目的是通过对并发访问进行限速,一旦达到一定的速率就可以拒绝服务(定向到错误页或告知资源没有了)、排队等待(如秒杀、评论、下单等)、降级(直接返回兜底数据,如商品库存默认有货)。
目前主要有以下几种限流方式:
信号量实际上就是限制系统的并发量,来达到限流的目的。
常见的用法是:创建Semaphore,指定permit的数量。在方法开始时,调用Semaphore.acquire()或者Semaphore.tryAcquire()来获取permit,并在方法返回前,调用Semaphore.release()来返还permit。
示例如下:
public class RateLimiter {
private Semaphore semaphore;
public RateLimiter(int permits) {
semaphore = new Semaphore(permits);
}
public void acquire(int permits) throws InterruptedException {
semaphore.acquire(permits);
}
public void release(int permits) {
semaphore.release(permits);
}
}
使用示例如下:
public class Action {
private static final Logger logger = LoggerFactory.getLogger(Action.class);
private RateLimiter rateLimiter;
public Action() {
rateLimiter = new RateLimiter(3);
}
public void doSomething() throws InterruptedException {
try {
rateLimiter.acquire(1);
logger.info("do something");
Thread.sleep(1000);
} finally {
rateLimiter.release(1);
}
}
public static void main(String[] args) throws InterruptedException {
Action action = new Action();
List<Thread> threadList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
threadList.add(new Thread(new Runnable() {
@Override
public void run() {
try {
action.doSomething();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}));
}
for (Thread thread : threadList) {
thread.start();
}
for (Thread thread : threadList) {
thread.join();
}
}
}
执行结果如下:
可以看到,使用信号量可以限制方法调用的并发数,从而达到限流的目的。
信号量主要存在以下问题:
计数器的方案比较简单。比如限制1秒钟内请求数最多为10个,每当进来一个请求,则计数器+1。当计数器达到上限时,则触发限流。时间每经过1秒,则重置计数器。
示例如下:
public class RateLimiter {
private long updateTimeStamp;
private int intervalMilliSeconds;
private int maxPermits;
private long storedPermits;
public RateLimiter(int maxPermits) {
this(maxPermits, 1);
}
public RateLimiter(int maxPermits, int intervalSeconds) {
this.maxPermits = maxPermits;
this.intervalMilliSeconds = intervalSeconds * 1000;
}
public synchronized Boolean acquire(int permits) {
while (true) {
long now = System.currentTimeMillis();
if (now < updateTimeStamp + intervalMilliSeconds) {
if (storedPermits + permits <= maxPermits) {
storedPermits += permits;
updateTimeStamp = now;
return true;
} else {
return false;
}
} else {
storedPermits = 0;
updateTimeStamp = now;
}
}
}
}
计数器主要存在以下问题:
滑动窗口本质上也是一种计数器,只不过它的粒度更细。比如限制qps为1000,设定窗口大小为10,则每个窗口的时间间隔为100ms。每次窗口滑动时,重置的是前1s至900ms之间内的计数,而不是完整的1s。
示例如下:
public class RateLimiter {
private LinkedList<Integer> deque = new LinkedList<>();
private int windowSize;
private int windowIntervalMilliSeconds;
private int currentWindowPermits;
private long updateTimeStamp;
private int intervalMilliSeconds;
private int maxPermits;
private long storedPermits;
public RateLimiter(int maxPermits, int windowSize) {
this(maxPermits, 1, windowSize);
}
public RateLimiter(int maxPermits, int intervalSeconds, int windowSize) {
this.maxPermits = maxPermits;
this.intervalMilliSeconds = intervalSeconds * 1000;
this.windowSize = windowSize;
this.windowIntervalMilliSeconds = intervalMilliSeconds / windowSize;
}
public synchronized Boolean acquire(int permits) {
while (true) {
long now = System.currentTimeMillis();
if (now < updateTimeStamp + windowIntervalMilliSeconds) {
if (storedPermits + permits + currentWindowPermits <= maxPermits) {
currentWindowPermits += permits;
updateTimeStamp = now;
return true;
} else {
return false;
}
} else {
updateTimeStamp = now;
deque.offerLast(currentWindowPermits);
storedPermits += currentWindowPermits;
currentWindowPermits = 0;
while (deque.size() > windowSize) {
storedPermits -= deque.removeFirst();
}
}
}
}
}
滑动窗口仍然没有解决计数器的问题。
漏桶(Leaky Bucket)算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水(接口有响应速率),当水流入速度过大会直接溢出(访问频率超过接口响应速率),然后就拒绝请求,可以看出漏桶算法能强行限制数据的传输速率。
实例如下:
public class RateLimiter {
private int capacity;
private int rate;
private int water;
private int intervalMilliSeconds;
private Semaphore exit;
public RateLimiter(int capacity, int rate) {
this(capacity, rate, 1);
}
public RateLimiter(int capacity, int rate, int intervalSeconds) {
this.exit = new Semaphore(1);
this.capacity = capacity;
this.rate = rate;
this.intervalMilliSeconds = intervalSeconds * 1000;
}
public Boolean acquire(int permits) throws InterruptedException {
if (water + permits <= capacity) {
water += permits;
while (!exit.tryAcquire(permits, intervalMilliSeconds / rate, TimeUnit.MILLISECONDS)) {}
Thread.sleep(intervalMilliSeconds / rate);
water -= permits;
exit.release(permits);
return true;
} else {
return false;
}
}
}
漏桶算法的主要问题是:
令牌桶算法的原理是,系统以固定的速率往令牌桶中放入令牌;当请求进来时,则从桶中取走令牌;当桶中令牌为空时,触发限流。
示例如下:
public class RateLimiter {
private long updateTimeStamp;
private int capacity;
private int rate;
private int tokens;
private int intervalMilliSeconds;
public RateLimiter(int capacity, int rate) {
this(capacity, rate, 1);
}
public RateLimiter(int capacity, int rate, int intervalSeconds) {
this.capacity = capacity;
this.rate = rate;
this.intervalMilliSeconds = intervalSeconds * 1000;
}
public synchronized Boolean acquire(int permits) {
long now = System.currentTimeMillis();
int newTokens = (int) ((now - updateTimeStamp) / intervalMilliSeconds * rate);
if (newTokens > 0) {
this.tokens = Math.min(capacity, this.tokens + newTokens);
this.updateTimeStamp = now;
}
if (tokens - permits >= 0) {
tokens -= permits;
return true;
} else {
return false;
}
}
}
令牌桶与漏桶相比,好处在于它支持突发流量。
Guava中的RateLimiter提供了令牌桶算法的实现,我们可以直接使用。有关原理的说明请参考:Guava RateLimiter分析
前面我们提到的算法都只能做单机的限流方案,并不支持集群的限流。当然,我们可以根据总体QPS/机器数来做临时方案。不过该方案取决于前端负载均衡的平衡情况,而且当应用增减机器时,需要动态调整该参数,并不十分方便。
我们可以借助于redis,并参考Guava的实现原理来实现一个分布式限流方案。
http://tech.dianwoda.com/2017/09/11/talk-about-rate-limit/ http://moguhu.com/article/detail?articleId=73 https://www.iphpt.com/detail/106/ https://juejin.im/entry/57cce5d379bc440063066d09