ThreadPoolExecutor的JavaDoc不清楚是否可以将任务直接添加到支持执行器的BlockingQueue
中。调用executor.getQueue()
的The docs say“主要用于调试和监控”。
我正在用我自己的BlockingQueue
构建一个ThreadPoolExecutor
。我保留了对队列的引用,因此可以直接向队列中添加任务。同样的队列是由getQueue()
返回的,所以我假设getQueue()
中的警告适用于通过我的方法获取的后备队列的引用。
示例
代码的一般模式是:
int n = ...; // number of threads
queue = new ArrayBlockingQueue<Runnable>(queueSize);
executor = new ThreadPoolExecutor(n, n, 1, TimeUnit.HOURS, queue);
executor.prestartAllCoreThreads();
// ...
while (...) {
Runnable job = ...;
queue.offer(job, 1, TimeUnit.HOURS);
}
while (jobsOutstanding.get() != 0) {
try {
Thread.sleep(...);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
executor.shutdownNow();
queue.offer()
vs executor.execute()
据我所知,它的典型用法是通过executor.execute()
添加任务。上面示例中的方法具有阻塞队列的好处,而如果队列已满并拒绝我的任务,则execute()
会立即失败。我还喜欢提交作业与阻塞队列进行交互;这对我来说更像是“纯粹的”生产者-消费者。
直接将任务添加到队列的含义是:我必须调用prestartAllCoreThreads()
,否则没有工作线程在运行。假设没有与executor的其他交互,则不会监视队列(对ThreadPoolExecutor
源代码的检查证实了这一点)。这也意味着对于直接入队,必须另外为>0个核心线程配置ThreadPoolExecutor
,并且不能将其配置为允许核心线程超时。
tl;dr
给定如下配置的ThreadPoolExecutor
:
BlockingQueue
的引用是否可以将任务直接添加到队列中,而不是调用executor.execute()
相关
这个问题( producer/consumer work queues )与此类似,但并不专门涉及直接添加到队列中。
发布于 2011-04-08 02:53:38
如果是我,我会更喜欢使用Executor#execute()
而不是Queue#offer()
,原因很简单,因为我已经在使用java.util.concurrent
的其他所有东西了。
您的问题很好,它激起了我的兴趣,所以我查看了ThreadPoolExecutor#execute()
的源代码
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
我们可以看到execute本身会在工作队列上调用offer()
,但在必要时会先做一些不错的池操作。出于这个原因,我认为使用execute()
是明智的;不使用它可能(尽管我不确定)导致池以非最佳方式运行。然而,我不认为使用offer()
会破坏执行器--它看起来像是使用以下代码(也来自ThreadPoolExecutor)将任务从队列中拉出:
Runnable getTask() {
for (;;) {
try {
int state = runState;
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take();
if (r != null)
return r;
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}
这个getTask()
方法是从循环内部调用的,所以如果executor没有关闭,它就会阻塞,直到有新任务被分配给队列(不管它来自哪里)。
注意:尽管我在这里发布了代码片段,但我们不能依赖它们来得到明确的答案-我们应该只对API进行编码。我们不知道execute()
的实现将如何随着时间的推移而改变。
发布于 2011-04-08 03:32:29
一个技巧是实现ArrayBlockingQueue的自定义子类,并覆盖offer()方法来调用您的阻塞版本,然后您仍然可以使用正常的代码路径。
queue = new ArrayBlockingQueue<Runnable>(queueSize) {
@Override public boolean offer(Runnable runnable) {
try {
return offer(runnable, 1, TimeUnit.HOURS);
} catch(InterruptedException e) {
// return interrupt status to caller
Thread.currentThread().interrupt();
}
return false;
}
};
(正如您可能猜到的,我认为直接在队列上调用offer作为您的正常代码路径可能不是一个好主意)。
发布于 2013-08-30 15:00:25
通过在实例化时指定RejectedExecutionHandler
,可以在队列已满时实际配置池的行为。ThreadPoolExecutor
将四个策略定义为内部类,包括AbortPolicy
、DiscardOldestPolicy
、DiscardPolicy
以及我个人最喜欢的CallerRunsPolicy
,它在控制线程中运行新作业。
例如:
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
nproc, // core size
nproc, // max size
60, // idle timeout
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(4096, true), // Fairness = true guarantees FIFO
new ThreadPoolExecutor.CallerRunsPolicy() ); // If we have to reject a task, run it in the calling thread.
问题中所需的行为可以使用以下命令获得:
public class BlockingPolicy implements RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
executor.getQueue.put(r); // Self contained, no queue reference needed.
}
在某些情况下,必须访问队列。最好的地方是在一个自包含的RejectedExecutionHandler
中,它可以保存任何代码重复或潜在的bug,这些错误是由于在池对象的作用域中直接操作队列而引起的。请注意,ThreadPoolExecutor
中包含的处理程序本身使用getQueue()
。
https://stackoverflow.com/questions/5585927
复制相似问题