我正在尝试编写一个解决方案,其中单个线程产生可以并行执行的I/O密集型任务。每个任务都有大量的内存数据。因此,我希望能够限制当前挂起的任务的数量。
如果我像这样创建ThreadPoolExecutor:
ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(maxQueue));
然后executor.submit(callable)
抛出RejectedExecutionException
当队列填满并且所有线程都已经很忙时。
我能做些什么executor.submit(callable)
是否在队列已满且所有线程都繁忙时阻塞?
编辑:我试过了this
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
它在某种程度上达到了我想要的效果,但以一种不优雅的方式(基本上被拒绝的线程在调用线程中运行,因此这阻止了调用线程提交更多线程)。
编辑:(提问5年后)
对于任何阅读此问题及其答案的人,请不要将接受的答案视为一个正确的解决方案。请通读所有答案和评论。
发布于 2010-12-24 04:56:31
我也做过同样的事情。诀窍是创建一个BlockingQueue,其中的offer()方法实际上是一个put()。(您可以使用所需的任何基本BlockingQueue实现)。
public class LimitedQueue extends LinkedBlockingQueue
{
public LimitedQueue(int maxSize)
{
super(maxSize);
}
@Override
public boolean offer(E e)
{
// turn offer() and add() into a blocking calls (unless interrupted)
try {
put(e);
return true;
} catch(InterruptedException ie) {
Thread.currentThread().interrupt();
}
return false;
}
}
请注意,这只适用于线程池,其中corePoolSize==maxPoolSize
所以在这里要小心(见注释)。
发布于 2015-08-21 00:27:04
当前接受的答案有一个潜在的重大问题-它改变了ThreadPoolExecutor.execute的行为,这样如果您有一个
corePoolSize < maxPoolSize
,ThreadPoolExecutor逻辑永远不会在核心之外添加额外的工作进程。
来自ThreadPoolExecutor.execute(可运行):
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
具体地说,最后一个'else‘块永远不会被命中。
一个更好的替代方法是做一些类似于OP已经在做的事情--使用RejectedExecutionHandler做同样的事情put
逻辑:
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
if (!executor.isShutdown()) {
executor.getQueue().put(r);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RejectedExecutionException("Executor was interrupted while the task was waiting to put on work queue", e);
}
}
正如评论中指出的那样,使用这种方法需要注意一些事情(请参阅此answer):
corePoolSize==0
,则会出现争用情况,此时池中的所有线程都可能在任务可见之前终止。ThreadPoolExecutor
)将导致问题,除非处理程序也以相同的方式包装它。记住这些问题,这个解决方案将适用于大多数典型的ThreadPoolExecutors,并且可以正确地处理以下情况corePoolSize < maxPoolSize
..。
发布于 2014-06-26 08:45:26
下面是我是如何解决这个问题的:
(注意:此解决方案确实会阻塞提交可调用对象的线程,因此它可以防止抛出RejectedExecutionException )
public class BoundedExecutor extends ThreadPoolExecutor{
private final Semaphore semaphore;
public BoundedExecutor(int bound) {
super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());
semaphore = new Semaphore(bound);
}
/**Submits task to execution pool, but blocks while number of running threads
* has reached the bound limit
*/
public Future submitButBlockIfFull(final Callable task) throws InterruptedException{
semaphore.acquire();
return submit(task);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
semaphore.release();
}
}
https://stackoverflow.com/questions/4521983
复制相似问题