我正在使用最新的具有并行流的Java 8 lambda处理数据。我的代码如下:
ForkJoinPool forkJoinPool = new ForkJoinPool(10);
List<String> files = Arrays.asList(new String[]{"1.txt"});
List<String> result = forkJoinPool.submit(() ->
files.stream().parallel()
.flatMap(x -> stage1(x)) //at this stage we add more elements to the stream
.map(x -> stage2(x))
.map(x -> stage3(x))
.collect(Collectors.toList())
).get();流从一个元素开始,但是在第二个阶段添加了更多的元素。我的假设是,这个流应该并行运行,但在这种情况下,只使用一个工作线程。
如果我从两个元素开始(即我将第二个元素添加到初始列表中),那么就会产生两个线程来处理流,等等……如果我没有显式地将流提交给ForkJoinPool,也会发生这种情况。
问题是:这是记录在案的行为,还是在实施过程中可能发生变化?是否有任何方法来控制这种行为并允许更多的线程,而不管最初的列表如何?
发布于 2015-03-11 16:16:22
您可以尝试从LazyFutureStream或EagerFutureStream Stream实现简单反应。这两个流将为每个处理单元创建一个CompletableFuture,每个处理单元都可以在一个单独的线程上执行。这可能会导致更有效的处理(取决于您的实际用例和资源)。
例如。
LazyFutureStream.parallelBuilder(10)
.of("1.txt")
.flatMap(x -> stage1(x))
.map(x -> stage2(x))
.map(x -> stage3(x))
.collect(Collectors.toList());或
EagerFutureStream.parallelBuilder(10)
.of("1.txt")
.flatMap(x -> stage1(x))
.map(x -> stage2(x))
.map(x -> stage3(x))
.collect(Collectors.toList());https://stackoverflow.com/questions/25706234
复制相似问题