我正在使用的一个服务在1秒内发出5个请求后开始阻塞请求。
在Spring中使用Java,我正在寻找一种方法来对线程进行排队,这样最多5个线程可以在一秒钟内访问临界区,一旦有带宽让它们继续,任何其他线程都会排队并释放。
目前,我已经尝试使用锁,但它会导致线程总是等待1/5秒,即使我们不会在没有睡眠的情况下达到每秒的最大调用数。
Lock l = new ReentrantLock();
try {
l.lock();
//critical section
} finally {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
l.unlock();
}
在这个实现中,我从来没有超过每秒5次,但是在一切都准备好返回给用户之后,我也会导致响应延迟200毫秒。
我需要一个解决方案,只延迟线程时,需要延迟。在这种情况下,应该延迟秒内的6th+调用,但不需要延迟前5个调用。同样,呼叫6-11也可以同时通过。
发布于 2019-08-23 01:18:25
这种速率限制是微服务体系结构中非常常见的问题,因为它是解决级联故障的更广泛问题的一部分。有很多库可以解决这个问题,其中一个使用最广泛的现代库是Resilience4j,它提供了一个RateLimiter
实现。你可能想要类似这样的东西:
创建限制器:
RateLimiterConfig config = RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofSeconds(1))
.limitForPeriod(5)
.timeoutDuration(Duration.ofSeconds(4)) //or however long you want to wait before failing
.build();
// Create registry
RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);
// Use registry
RateLimiter rateLimiter = rateLimiterRegistry
.rateLimiter("someServiceLimiter", config);
使用它:
// Decorate your call to BackendService.doSomething()
CheckedRunnable restrictedCall = RateLimiter
.decorateCheckedRunnable(rateLimiter, backendService::doSomething);
//Or, you can use an annotation:
@RateLimiter(name = "someServiceLimiter")
public void doSomething() {
//backend call
}
发布于 2019-08-23 01:57:46
我认为使用semaphore应用程序接口来解决它将是最好的方法。
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BulkheadSemaphore {
private Queue<Long> enterQueue = new LinkedList<>();
private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private Semaphore semaphore;
public BulkheadSemaphore(final Long timeLimit, final int concurrentThreadsLimit) {
this.semaphore = new Semaphore(concurrentThreadsLimit);
executor.scheduleAtFixedRate(() -> {
final Long now = now();
while (!enterQueue.isEmpty() && now - enterQueue.peek() >= timeLimit) {
enterQueue.poll();
semaphore.release();
}
}, timeLimit, 200, TimeUnit.MILLISECONDS);
}
private Long now() {
return System.currentTimeMillis();
}
public void acquire() {
try {
semaphore.acquire();
} catch (InterruptedException e) {
// todo: handle exception
}
}
public void release() {
semaphore.release();
}
}
api非常简单:
对于进入临界区的每个线程,调用bulkheadSemaphore.acqure()
bulkheadSemaphore.release()
,
为什么它能解决这个问题?
由于请求需要时间,我将timeLimit
设置为1.5秒,以满足您的1秒限制。
附言:别忘了关闭executor服务
https://stackoverflow.com/questions/57613913
复制相似问题