首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Java:执行器与队列的关系

Java:执行器与队列的关系
EN

Stack Overflow用户
提问于 2012-05-02 01:53:16
回答 1查看 2.7K关注 0票数 1

因此,在Java并发中,有一个任务的概念,它实际上是任何实现RunnableCallable的任务(更具体地说,该接口的被覆盖的run()call()方法)。

我很难理解以下之间的关系:

ExecutorService使用的底层并发工作队列或列表结构

  • A任务(Runnable/Callable);以及提交任务的
  • Runnable ExecutorService;以及

我相信这种关系有以下几点:

开发人员

  • You必须选择哪种ExecutorService和工作结构最适合手头的任务--
  • --使用要使用的底层结构(例如,ArrayBlockingQueue)初始化ExecutorService (比方说,作为ScheduledThreadPool) (如果是的话,how?!?!)
  • You将您的任务提交给ExecutorService,后者使用其线程/池策略使用任务
  • 的副本填充给定的结构(ABQ或其他),每个生成/池线程现在从工作结构的任务副本中提取任务的副本,并执行

首先,请纠正/澄清上述假设中的任何一个,如果我在其中任何一个假设的基础上错了!

其次,如果任务只是简单地在底层工作结构中反复复制/复制(例如,列表的每个索引中的相同副本),那么您如何将一个大问题分解为较小的(并发的)问题?换句话说,如果任务只是执行步骤as,并且有一个ABQ,其中有1000个任务,那么每个线程不也只执行as吗?你怎么说“一些线程应该在and上工作,而其他线程应该在H上工作,而其他线程应该在I上工作”等等?

对于第二个例子,我可能需要一个代码示例来可视化它是如何结合在一起的。提前谢谢。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2012-05-02 02:32:08

你的最后一个假设不完全正确。ExecutorService不提取任务的副本。程序必须提供由ExecutorService单独执行的所有任务。当任务完成后,将执行队列中的下一个任务。

ExecutorService是用于处理线程池的接口。您通常要在池上执行多个任务,每个任务都在问题的不同部分执行。作为开发人员,在将问题发送到ExecutorService之前,必须指定每个任务在创建问题时应处理的问题的哪些部分。每个任务的结果(假设它们正在处理一个常见的问题)应该添加到一个BlockingQueue或其他并发集合中,在这个集合中,另一个线程可以使用结果或者等待所有任务完成。

下面是一篇关于如何使用ExecutorServicehttp://www.vogella.com/articles/JavaConcurrency/article.html#threadpools的文章。

更新:ExecutorService的一个常见用途是实现生产者/消费者模式。下面是一个我快速拼凑起来让您开始的示例--它仅用于演示目的,因为为了简单起见,忽略了一些细节和关注事项。线程池包含多个生产者线程和一个使用者线程。正在执行的工作是将0.N中的数字求和,每个生产者线程与较小的数字间隔相加,并将结果发布到BlockingQueue。使用者线程处理添加到BlockingQueue中的每个结果。

代码语言:javascript
运行
复制
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NumberCounter {

    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    private final BlockingQueue<Integer> queue = new ArrayBlockingQueue(100);

    public void startCounter(int max, int workers) {
        // Create multiple tasks to add numbers. Each task submits the result
        // to the queue.
        int increment = max / workers;
        for (int worker = 0; worker < workers; worker++) {
            Runnable task = createProducer(worker * increment, (worker + 1) * increment);
            pool.execute(task);
        }

        // Create one more task that will consume the numbers, adding them up
        // and printing the results.
        pool.execute(new Runnable() {

            @Override
            public void run() {
            int sum = 0;

            while (true) {
                try {
                Integer result = queue.take();
                sum += result;
                System.out.println("New sum is " + sum);
                } catch (InterruptedException e) {
                e.printStackTrace();
                }
            }

            }
        });

    }

    private Runnable createProducer(final int start, final int stop) {
        return new Runnable() {

            @Override
            public void run() {
            System.out.println("Worker started counting from " + start + " to " + stop);
            int count = 0;
            for (int i = start; i < stop; i++) {
                count += i;
                }
                queue.add(count);
            }

        };
    }

    public static void main(String[] args) throws InterruptedException {
        NumberCounter counter = new NumberCounter();
        counter.startCounter(10000, 5);
    }

}
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/10406730

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档