首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何在一段时间内限制线程数

如何在一段时间内限制线程数
EN

Stack Overflow用户
提问于 2019-08-23 00:51:06
回答 2查看 703关注 0票数 4

我正在使用的一个服务在1秒内发出5个请求后开始阻塞请求。

在Spring中使用Java,我正在寻找一种方法来对线程进行排队,这样最多5个线程可以在一秒钟内访问临界区,一旦有带宽让它们继续,任何其他线程都会排队并释放。

目前,我已经尝试使用锁,但它会导致线程总是等待1/5秒,即使我们不会在没有睡眠的情况下达到每秒的最大调用数。

代码语言:javascript
运行
复制
    Lock l = new ReentrantLock();
    try {
        l.lock();
      //critical section
   } finally {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        l.unlock();
    }

在这个实现中,我从来没有超过每秒5次,但是在一切都准备好返回给用户之后,我也会导致响应延迟200毫秒。

我需要一个解决方案,只延迟线程时,需要延迟。在这种情况下,应该延迟秒内的6th+调用,但不需要延迟前5个调用。同样,呼叫6-11也可以同时通过。

EN

回答 2

Stack Overflow用户

发布于 2019-08-23 01:18:25

这种速率限制是微服务体系结构中非常常见的问题,因为它是解决级联故障的更广泛问题的一部分。有很多库可以解决这个问题,其中一个使用最广泛的现代库是Resilience4j,它提供了一个RateLimiter实现。你可能想要类似这样的东西:

创建限制器:

代码语言:javascript
运行
复制
RateLimiterConfig config = RateLimiterConfig.custom()
  .limitRefreshPeriod(Duration.ofSeconds(1))
  .limitForPeriod(5)
  .timeoutDuration(Duration.ofSeconds(4)) //or however long you want to wait before failing
  .build();

// Create registry
RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);

// Use registry
RateLimiter rateLimiter = rateLimiterRegistry
  .rateLimiter("someServiceLimiter", config);

使用它:

代码语言:javascript
运行
复制
// Decorate your call to BackendService.doSomething()
CheckedRunnable restrictedCall = RateLimiter
    .decorateCheckedRunnable(rateLimiter, backendService::doSomething);

//Or, you can use an annotation:
@RateLimiter(name = "someServiceLimiter")
public void doSomething() {
    //backend call
}
票数 3
EN

Stack Overflow用户

发布于 2019-08-23 01:57:46

我认为使用semaphore应用程序接口来解决它将是最好的方法。

代码语言:javascript
运行
复制
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BulkheadSemaphore {

    private Queue<Long> enterQueue = new LinkedList<>();
    private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private Semaphore semaphore;

    public BulkheadSemaphore(final Long timeLimit, final int concurrentThreadsLimit) {
        this.semaphore = new Semaphore(concurrentThreadsLimit);

        executor.scheduleAtFixedRate(() -> {
            final Long now = now();

            while (!enterQueue.isEmpty() && now - enterQueue.peek() >= timeLimit) {
                enterQueue.poll();
                semaphore.release();
            }
        }, timeLimit, 200, TimeUnit.MILLISECONDS);
    }

    private Long now() {
        return System.currentTimeMillis();
    }

    public void acquire() {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            // todo: handle exception
        }
    }

    public void release() {
        semaphore.release();
    }
}

api非常简单:

对于进入临界区的每个线程,调用bulkheadSemaphore.acqure()

  • After一个外部调用执行完毕,调用bulkheadSemaphore.release()

为什么它能解决这个问题?

  • 这个信号量释放对很久以前进入临界区的线程的许可。
  • 它以一定的速率释放它的许可(我将其设置为200ms,但可以更小)。它还保证,如果一个工作单元已经快速完成,则下一个线程将能够启动新的工作单元。
  • 一些线程仍然会面临多余的等待,但这并不是每次都会发生,它们最多会花费200ms。

由于请求需要时间,我将timeLimit设置为1.5秒,以满足您的1秒限制。

附言:别忘了关闭executor服务

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57613913

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档