首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Java 8并行流中的自定义线程池

Java 8并行流中的自定义线程池
EN

Stack Overflow用户
提问于 2014-01-16 13:26:37
回答 16查看 253.1K关注 0票数 468

是否可以为Java8 平行流指定一个自定义线程池?我哪儿都找不到。

假设我有一个服务器应用程序,我想使用并行流。但是这个应用程序很大,而且是多线程的,所以我想对它进行划分。我不希望在应用程序块任务的一个模块中从另一个模块中缓慢运行任务。

如果我不能对不同的模块使用不同的线程池,这意味着在大多数实际情况下,我不能安全地使用并行流。

尝试下面的示例。有一些CPU密集型任务在单独的线程中执行。这些任务利用并行流。第一个任务中断了,所以每一步都需要1秒(线程睡眠模拟)。问题是其他线程会被卡住,等待坏掉的任务完成。这是一个精心设计的示例,但是假设一个servlet应用程序和一些人向共享的叉连接池提交了一个长期运行的任务。

代码语言:javascript
运行
复制
public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}
EN

回答 16

Stack Overflow用户

回答已采纳

发布于 2014-03-08 13:12:23

实际上,有一个技巧是如何在特定的叉-连接池中执行并行操作。如果您在叉-连接池中作为任务执行它,则它将停留在那里而不使用普通的任务。

代码语言:javascript
运行
复制
final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

该技巧基于ForkJoinTask.fork,其中指定:“安排在当前任务运行的池中异步执行此任务,如果适用的话,或者使用ForkJoinPool.commonPool() (如果不是inForkJoinPool())”。

票数 491
EN

Stack Overflow用户

发布于 2014-01-16 20:58:02

并行流使用默认的ForkJoinPool.commonPool默认情况下,只有一个线程,因为您有处理器。,由Runtime.getRuntime().availableProcessors()返回(这意味着并行流为调用线程留下一个处理器)。

对于需要单独或自定义池的应用程序,可以使用给定的目标并行级别构造ForkJoinPool;默认情况下,等于可用处理器的数量。

这也意味着,如果嵌套并行流或多个并行流同时启动,它们都将共享相同的池。优点:您将永远不会使用超过默认值(可用处理器数量)。缺点:您可能无法获得分配给您启动的每个并行流的“所有处理器”(如果您有多个并行流)。(显然,您可以使用ManagedBlocker来避免这种情况。)

若要更改并行流的执行方式,您可以

  • 将并行流执行提交给您自己的ForkJoinPool:yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get();
  • 您可以使用系统属性:System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20")来更改公共池的大小,用于20个线程的目标并行性。

后者在我的机器上的例子,它有8个处理器。如果我运行以下程序:

代码语言:javascript
运行
复制
long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
    try { Thread.sleep(100); } catch (Exception ignore) {}
    System.out.print((System.currentTimeMillis() - start) + " ");
});

产出如下:

215 216 216 216 316 316 316 416 416 416

因此,您可以看到并行流一次处理8个项,即它使用8个线程。但是,如果我取消注释注释行,则输出是:

215 215 215 216 216 216

这一次,并行流使用了20个线程,流中的所有20个元素都被并发处理。

票数 239
EN

Stack Overflow用户

发布于 2015-01-03 08:05:57

或者,为了触发您自己的forkJoinPool中的并行计算,您还可以将该池传递给CompletableFuture.supplyAsync方法,如:

代码语言:javascript
运行
复制
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
    //parallel task here, for example
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), 
    forkJoinPool
);
票数 50
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/21163108

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档