我正在使用最新的具有并行流的Java 8 lambda处理数据。我的代码如下:
ForkJoinPool forkJoinPool = new ForkJoinPool(10);
List<String> files = Arrays.asList(new String[]{"1.txt"});
List<String> result = forkJoinPool.submit(() ->
files.stream().parallel()
.flatMap(x -> stage1(x)) //at this stage we add more elements to the stream
.map(x -> stage2(x))
.map(x -> stage3(x))
.collect(Collectors.toList())
).get();流从一个元素开始,但是在第二个阶段添加了更多的元素。我的假设是,这个流应该并行运行,但在这种情况下,只使用一个工作线程。
如果我从两个元素开始(即我将第二个元素添加到初始列表中),那么就会产生两个线程来处理流,等等……如果我没有显式地将流提交给ForkJoinPool,也会发生这种情况。
问题是:这是记录在案的行为,还是在实施过程中可能发生变化?是否有任何方法来控制这种行为并允许更多的线程,而不管最初的列表如何?
发布于 2014-09-07 03:24:45
您所观察到的是特定于实现的行为,而不是特定的行为。
当前的JDK 8实现查看最外层流的Spliterator,并使用它作为划分并行工作负载的基础。因为这个示例在原始源流中只有一个元素,所以不能分割它,并且流运行为单线程。对于flatMap返回零个、一个或几个元素的情况(但不仅仅是这样),这很好,但是在返回许多元素的情况下,它们都是按顺序处理的。实际上,flatMap函数返回的流被迫进入顺序模式。请参阅ReferencePipeline.java的第270行。
要做的“显而易见”的事情是使这个流并行,或者至少不要强迫它是顺序的。这可能会或不会改善一些事情。很有可能它会改善一些事情,但会使其他事情变得更糟。这里当然需要一个更好的政策,但我不确定它会是什么样子。
还请注意,通过向并行流提交运行管道的任务,迫使并行流在您选择的叉-连接池中运行的技术也是特定于实现的行为。它在JDK 8中是这样工作的,但将来可能会改变。
发布于 2015-03-11 16:16:22
您可以尝试从LazyFutureStream或EagerFutureStream Stream实现简单反应。这两个流将为每个处理单元创建一个CompletableFuture,每个处理单元都可以在一个单独的线程上执行。这可能会导致更有效的处理(取决于您的实际用例和资源)。
例如。
LazyFutureStream.parallelBuilder(10)
.of("1.txt")
.flatMap(x -> stage1(x))
.map(x -> stage2(x))
.map(x -> stage3(x))
.collect(Collectors.toList());或
EagerFutureStream.parallelBuilder(10)
.of("1.txt")
.flatMap(x -> stage1(x))
.map(x -> stage2(x))
.map(x -> stage3(x))
.collect(Collectors.toList());https://stackoverflow.com/questions/25706234
复制相似问题