首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >Java:在一定队列大小后阻塞提交的ExecutorService

Java:在一定队列大小后阻塞提交的ExecutorService
EN

Stack Overflow用户
提问于 2010-12-24 03:50:46
回答 8查看 47K关注 0票数 91

我正在尝试编写一个解决方案,其中单个线程产生可以并行执行的I/O密集型任务。每个任务都有大量的内存数据。因此,我希望能够限制当前挂起的任务的数量。

如果我像这样创建ThreadPoolExecutor:

代码语言:javascript
复制
ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue(maxQueue));

然后executor.submit(callable)抛出RejectedExecutionException

当队列填满并且所有线程都已经很忙时。

我能做些什么executor.submit(callable)是否在队列已满且所有线程都繁忙时阻塞?

编辑:我试过了this

代码语言:js
复制
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

它在某种程度上达到了我想要的效果,但以一种不优雅的方式(基本上被拒绝的线程在调用线程中运行,因此这阻止了调用线程提交更多线程)。

编辑:(提问5年后)

对于任何阅读此问题及其答案的人,请不要将接受的答案视为一个正确的解决方案。请通读所有答案和评论。

EN

回答 8

Stack Overflow用户

回答已采纳

发布于 2010-12-24 04:56:31

我也做过同样的事情。诀窍是创建一个BlockingQueue,其中的offer()方法实际上是一个put()。(您可以使用所需的任何基本BlockingQueue实现)。

代码语言:javascript
复制
public class LimitedQueue extends LinkedBlockingQueue 
{
    public LimitedQueue(int maxSize)
    {
        super(maxSize);
    }

    @Override
    public boolean offer(E e)
    {
        // turn offer() and add() into a blocking calls (unless interrupted)
        try {
            put(e);
            return true;
        } catch(InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

}

请注意,这只适用于线程池,其中corePoolSize==maxPoolSize所以在这里要小心(见注释)。

票数 66
EN

Stack Overflow用户

发布于 2015-08-21 00:27:04

当前接受的答案有一个潜在的重大问题-它改变了ThreadPoolExecutor.execute的行为,这样如果您有一个

corePoolSize < maxPoolSize,ThreadPoolExecutor逻辑永远不会在核心之外添加额外的工作进程。

来自ThreadPoolExecutor.execute(可运行):

代码语言:javascript
复制
if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);

具体地说,最后一个'else‘块永远不会被命中。

一个更好的替代方法是做一些类似于OP已经在做的事情--使用RejectedExecutionHandler做同样的事情put逻辑:

代码语言:javascript
复制
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    try {
        if (!executor.isShutdown()) {
            executor.getQueue().put(r);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RejectedExecutionException("Executor was interrupted while the task was waiting to put on work queue", e);
    }
}

正如评论中指出的那样,使用这种方法需要注意一些事情(请参阅此answer):

  1. 如果corePoolSize==0,则会出现争用情况,此时池中的所有线程都可能在任务可见之前终止。
  2. 使用包装队列任务的实现(不适用于ThreadPoolExecutor)将导致问题,除非处理程序也以相同的方式包装它。

记住这些问题,这个解决方案将适用于大多数典型的ThreadPoolExecutors,并且可以正确地处理以下情况corePoolSize < maxPoolSize..。

票数 16
EN

Stack Overflow用户

发布于 2014-06-26 08:45:26

下面是我是如何解决这个问题的:

(注意:此解决方案确实会阻塞提交可调用对象的线程,因此它可以防止抛出RejectedExecutionException )

代码语言:javascript
复制
public class BoundedExecutor extends ThreadPoolExecutor{

    private final Semaphore semaphore;

    public BoundedExecutor(int bound) {
        super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());
        semaphore = new Semaphore(bound);
    }

    /**Submits task to execution pool, but blocks while number of running threads 
     * has reached the bound limit
     */
    public  Future submitButBlockIfFull(final Callable task) throws InterruptedException{

        semaphore.acquire();            
        return submit(task);                    
    }


    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);

        semaphore.release();
    }
}
票数 15
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/4521983

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档