我刚开始学习Java8中的Stream API和函数式编程,但对Java并不陌生。我感兴趣的是了解和理解Stream API如何选择执行计划。
它如何知道哪些部分应该并行,哪些部分不应该并行?到底存在多少种类型的执行计划?
基本上,我想知道为什么Java 8中的Streams有助于提高速度,以及它是如何实现这种“魔力”的。
我找不到太多关于它是如何工作的文献。
发布于 2018-04-18 20:48:35
您可能已经知道,Stream API使用Spliterator和ForkJoinPool来执行并行计算。Spliterator用于遍历和划分元素序列,而ForkJoinPool框架递归地将任务分解为更小的、独立的子任务,直到它们足够简单,可以异步执行。
作为一个并行计算框架如何在并行计算中使用Spliterator和ForkJoinPool的示例,这里有一种实现相关并行forEach的方法,它说明了主要的习惯用法:
public static void main(String[] args) {
List<Integer> list = new SplittableRandom()
.ints(24, 0, 100)
.boxed().collect(Collectors.toList());
parallelEach(list, System.out::println);
}
static <T> void parallelEach(Collection<T> c, Consumer<T> action) {
Spliterator<T> s = c.spliterator();
long batchSize = s.estimateSize() / (ForkJoinPool.getCommonPoolParallelism() * 8);
new ParallelEach(null, s, action, batchSize).invoke(); // invoke the task
}Fork Join任务:
static class ParallelEach<T> extends CountedCompleter<Void> {
final Spliterator<T> spliterator;
final Consumer<T> action;
final long batchSize;
ParallelEach(ParallelEach<T> parent, Spliterator<T> spliterator,
Consumer<T> action, long batchSize) {
super(parent);
this.spliterator = spliterator;
this.action = action;
this.batchSize = batchSize;
}
// The main computation performed by this task
@Override
public void compute() {
Spliterator<T> sub;
while (spliterator.estimateSize() > batchSize &&
(sub = spliterator.trySplit()) != null) {
addToPendingCount(1);
new ParallelEach<>(this, sub, action, batchSize).fork();
}
spliterator.forEachRemaining(action);
propagateCompletion();
}
}此外,请记住,并行计算可能并不总是比顺序计算快,您总是有一个选择- When to use parallel stream。
https://stackoverflow.com/questions/49897078
复制相似问题