首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >将任务添加到ThreadPoolExecutor的BlockingQueue中是否可取?

将任务添加到ThreadPoolExecutor的BlockingQueue中是否可取?
EN

Stack Overflow用户
提问于 2011-04-08 02:41:19
回答 5查看 10.2K关注 0票数 21

ThreadPoolExecutor的JavaDoc不清楚是否可以将任务直接添加到支持执行器的BlockingQueue中。调用executor.getQueue()The docs say“主要用于调试和监控”。

我正在用我自己的BlockingQueue构建一个ThreadPoolExecutor。我保留了对队列的引用,因此可以直接向队列中添加任务。同样的队列是由getQueue()返回的,所以我假设getQueue()中的警告适用于通过我的方法获取的后备队列的引用。

示例

代码的一般模式是:

代码语言:javascript
复制
int n = ...; // number of threads
queue = new ArrayBlockingQueue<Runnable>(queueSize);
executor = new ThreadPoolExecutor(n, n, 1, TimeUnit.HOURS, queue);
executor.prestartAllCoreThreads();
// ...
while (...) {
    Runnable job = ...;
    queue.offer(job, 1, TimeUnit.HOURS);
}
while (jobsOutstanding.get() != 0) {
    try {
        Thread.sleep(...);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
executor.shutdownNow();

queue.offer() vs executor.execute()

据我所知,它的典型用法是通过executor.execute()添加任务。上面示例中的方法具有阻塞队列的好处,而如果队列已满并拒绝我的任务,则execute()会立即失败。我还喜欢提交作业与阻塞队列进行交互;这对我来说更像是“纯粹的”生产者-消费者。

直接将任务添加到队列的含义是:我必须调用prestartAllCoreThreads(),否则没有工作线程在运行。假设没有与executor的其他交互,则不会监视队列(对ThreadPoolExecutor源代码的检查证实了这一点)。这也意味着对于直接入队,必须另外为>0个核心线程配置ThreadPoolExecutor,并且不能将其配置为允许核心线程超时。

tl;dr

给定如下配置的ThreadPoolExecutor

  • 核心线程>0
  • 核心线程不允许超时
  • 核心线程是对支持执行器的BlockingQueue的引用

是否可以将任务直接添加到队列中,而不是调用executor.execute()

相关

这个问题( producer/consumer work queues )与此类似,但并不专门涉及直接添加到队列中。

EN

回答 5

Stack Overflow用户

回答已采纳

发布于 2011-04-08 02:53:38

如果是我,我会更喜欢使用Executor#execute()而不是Queue#offer(),原因很简单,因为我已经在使用java.util.concurrent的其他所有东西了。

您的问题很好,它激起了我的兴趣,所以我查看了ThreadPoolExecutor#execute()的源代码

代码语言:javascript
复制
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

我们可以看到execute本身会在工作队列上调用offer(),但在必要时会先做一些不错的池操作。出于这个原因,我认为使用execute()是明智的;不使用它可能(尽管我不确定)导致池以非最佳方式运行。然而,我不认为使用offer()会破坏执行器--它看起来像是使用以下代码(也来自ThreadPoolExecutor)将任务从队列中拉出:

代码语言:javascript
复制
Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll();
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();
            if (r != null)
                return r;
            if (workerCanExit()) {
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}

这个getTask()方法是从循环内部调用的,所以如果executor没有关闭,它就会阻塞,直到有新任务被分配给队列(不管它来自哪里)。

注意:尽管我在这里发布了代码片段,但我们不能依赖它们来得到明确的答案-我们应该只对API进行编码。我们不知道execute()的实现将如何随着时间的推移而改变。

票数 10
EN

Stack Overflow用户

发布于 2011-04-08 03:32:29

一个技巧是实现ArrayBlockingQueue的自定义子类,并覆盖offer()方法来调用您的阻塞版本,然后您仍然可以使用正常的代码路径。

代码语言:javascript
复制
queue = new ArrayBlockingQueue<Runnable>(queueSize) {
  @Override public boolean offer(Runnable runnable) {
    try {
      return offer(runnable, 1, TimeUnit.HOURS);
    } catch(InterruptedException e) {
      // return interrupt status to caller
      Thread.currentThread().interrupt();
    }
    return false;
  }
};

(正如您可能猜到的,我认为直接在队列上调用offer作为您的正常代码路径可能不是一个好主意)。

票数 11
EN

Stack Overflow用户

发布于 2013-08-30 15:00:25

通过在实例化时指定RejectedExecutionHandler,可以在队列已满时实际配置池的行为。ThreadPoolExecutor将四个策略定义为内部类,包括AbortPolicyDiscardOldestPolicyDiscardPolicy以及我个人最喜欢的CallerRunsPolicy,它在控制线程中运行新作业。

例如:

代码语言:javascript
复制
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
        nproc, // core size
        nproc, // max size
        60, // idle timeout
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(4096, true), // Fairness = true guarantees FIFO
        new ThreadPoolExecutor.CallerRunsPolicy() ); // If we have to reject a task, run it in the calling thread.

问题中所需的行为可以使用以下命令获得:

代码语言:javascript
复制
public class BlockingPolicy implements RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        executor.getQueue.put(r); // Self contained, no queue reference needed.
    }

在某些情况下,必须访问队列。最好的地方是在一个自包含的RejectedExecutionHandler中,它可以保存任何代码重复或潜在的bug,这些错误是由于在池对象的作用域中直接操作队列而引起的。请注意,ThreadPoolExecutor中包含的处理程序本身使用getQueue()

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/5585927

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档