Java:ExecutorService,在特定队列大小之后阻止提交

内容来源于 Stack Overflow,并遵循CC BY-SA 3.0许可协议进行翻译与使用

  • 回答 (2)
  • 关注 (0)
  • 查看 (12)

我正在尝试编写一个解决方案,其中单个线程生成可以并行执行的I / O密集型任务。每项任务都有重要的内存数据。所以我希望能够限制暂时处于待处理状态的任务数量。

如果我这样创建ThreadPoolExecutor:

    ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(maxQueue));

然后当队列填满并且所有线程已经忙时executor.submit(callable)抛出RejectedExecutionException

executor.submit(callable)当队列满并且所有线程都忙时,我可以做些什么来阻塞?

我试过这个

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

它在某种程度上达到了我想要达到的效果,但却以一种不雅的方式实现(基本上拒绝的线程在调用线程中运行,因此阻止调用线程提交更多)。

提问于
用户回答回答于

我也做过同样的事。诀窍是创建一个BlockingQueue,在这里,service()方法实际上是一个put()。可以使用你想要的任何基本BlockingQueueimpl

public class LimitedQueue<E> extends LinkedBlockingQueue<E> 
{
    public LimitedQueue(int maxSize)
    {
        super(maxSize);
    }

    @Override
    public boolean offer(E e)
    {
        // turn offer() and add() into a blocking calls (unless interrupted)
        try {
            put(e);
            return true;
        } catch(InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

}

用户回答回答于

下面是我如何解决这个问题的方法:

(注意:此解决方案确实阻止了提交可调用线程,因此防止抛出RejectedExecutionException)

public class BoundedExecutor extends ThreadPoolExecutor{

    private final Semaphore semaphore;

    public BoundedExecutor(int bound) {
        super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        semaphore = new Semaphore(bound);
    }

    /**Submits task to execution pool, but blocks while number of running threads 
     * has reached the bound limit
     */
    public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException{

        semaphore.acquire();            
        return submit(task);                    
    }


    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);

        semaphore.release();
    }
}

扫码关注云+社区