首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Java Stream API如何选择执行计划?

Java Stream API如何选择执行计划?
EN

Stack Overflow用户
提问于 2018-04-18 18:14:13
回答 2查看 282关注 0票数 10

我刚开始学习Java8中的Stream API和函数式编程,但对Java并不陌生。我感兴趣的是了解和理解Stream API如何选择执行计划。

它如何知道哪些部分应该并行,哪些部分不应该并行?到底存在多少种类型的执行计划?

基本上,我想知道为什么Java 8中的Streams有助于提高速度,以及它是如何实现这种“魔力”的。

我找不到太多关于它是如何工作的文献。

EN

回答 2

Stack Overflow用户

发布于 2018-04-18 19:09:10

这个问题有点宽泛,不能详细解释,但我会尽力回答得令人满意。我还使用了一个ArrayList流的例子。

当我们创建一个流时,返回的对象称为ReferencePipeline。这个对象可以说是“默认流”对象,因为它还没有任何功能。现在我们必须在懒惰方法和急切方法之间做出选择。因此,让我们来看一下每个示例。

示例一:filter(Predicate<?>) 方法:

filter()方法声明如下:

代码语言:javascript
运行
复制
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}

正如您所看到的,它返回一个StatelessOp对象,该对象基本上是一个新的ReferencePipeline,其中过滤器评估现在是“启用”的。换句话说:每次我们向流添加一个新的“功能”时,它都会在旧的流水线的基础上创建一个新的流水线,并使用适当的操作标志/方法覆盖。

正如您可能已经知道的那样,直到调用急切操作时,才会对流进行评估。因此,我们需要一种急切的方法来评估流。

示例二:forEach(Consumer<?>) 方法:

代码语言:javascript
运行
复制
@Override
public void forEach(Consumer<? super P_OUT> action) {
    evaluate(ForEachOps.makeRef(action, false));
}

首先,这段代码相当短,evaluate()方法除了调用invoke()方法之外,并没有做更多的事情。在这里,理解ForEachOps.makeRef()的作用是很重要的。它设置所需的最后一个标志,并创建一个工作方式与ForkJoinTask对象完全相同的ForEachTask<>。幸运的是,Andrew找到了一个关于它们如何工作的很好的paper

注意:确切的源码可以在here中找到。

票数 5
EN

Stack Overflow用户

发布于 2018-04-18 20:48:35

您可能已经知道,Stream API使用SpliteratorForkJoinPool来执行并行计算。Spliterator用于遍历和划分元素序列,而ForkJoinPool框架递归地将任务分解为更小的、独立的子任务,直到它们足够简单,可以异步执行。

作为一个并行计算框架如何在并行计算中使用SpliteratorForkJoinPool的示例,这里有一种实现相关并行forEach的方法,它说明了主要的习惯用法:

代码语言:javascript
运行
复制
public static void main(String[] args) {
    List<Integer> list = new SplittableRandom()
        .ints(24, 0, 100)
        .boxed().collect(Collectors.toList());

    parallelEach(list, System.out::println);
}

static <T> void parallelEach(Collection<T> c, Consumer<T> action) {
    Spliterator<T> s = c.spliterator();
    long batchSize = s.estimateSize() / (ForkJoinPool.getCommonPoolParallelism() * 8);
    new ParallelEach(null, s, action, batchSize).invoke(); // invoke the task
}

Fork Join任务:

代码语言:javascript
运行
复制
static class ParallelEach<T> extends CountedCompleter<Void> {
    final Spliterator<T> spliterator;
    final Consumer<T> action;
    final long batchSize;

    ParallelEach(ParallelEach<T> parent, Spliterator<T> spliterator,
                 Consumer<T> action, long batchSize) {
        super(parent);
        this.spliterator = spliterator;
        this.action = action;
        this.batchSize = batchSize;
    }

    // The main computation performed by this task
    @Override
    public void compute() {
        Spliterator<T> sub;
        while (spliterator.estimateSize() > batchSize &&
              (sub = spliterator.trySplit()) != null) {
            addToPendingCount(1);
            new ParallelEach<>(this, sub, action, batchSize).fork();
        }
        spliterator.forEachRemaining(action);
        propagateCompletion();
    }
}

Original source.

此外,请记住,并行计算可能并不总是比顺序计算快,您总是有一个选择- When to use parallel stream

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49897078

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档