Java 8 Stream 教程 (三)

作者:Benjamin

译者:java达人

来源:http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/(点击阅读原文前往)

前面的教程:

Java 8 Stream 教程 (一)

Java 8 Stream 教程 (二)

并行stream

为增强大数据量下的运行性能,stream可以并行执行。并行stream通过静态方法ForkJoinPool.commonPool()使用ForkJoinPool。底层线程池的大小最多5个线程—这取决于可用物理CPU核的数量:

ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism());    // 3

在我的机器上, common pool 初始化为3 默认值。通过设置以下JVM参数,可以调整该值:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5

集合支持方法parallelStream()创建一个并行的元素stream。或者,您可以在给定的stream上调用中间方法parallel(),将顺序stream转换为并行的stream。

为了理解并行stream的并行执行行为,下一个示例将打印当前线程的信息:

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));

通过观察调试输出,我们应该能更好地理解哪些线程被用于执行stream操作:

filter:  b1 [main]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  c2 [ForkJoinPool.commonPool-worker-3]
map:     c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map:     b1 [main]
forEach: B1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-3]
map:     a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]

正如您所见,并行stream利用了ForkJoinPoolfor执行stream操作的所有可用线程。由于特定线程实际的行为是不确定性的,所以输出在多次运行时可能会有所不同。

让我们通过额外的stream操作来扩展示例,sorted:

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .sorted((s1, s2) -> {
        System.out.format("sort: %s <> %s [%s]\n",
            s1, s2, Thread.currentThread().getName());
        return s1.compareTo(s2);
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));

乍一看,结果可能会很奇怪:

filter:  c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  b1 [main]
map:     b1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-2]
map:     a1 [ForkJoinPool.commonPool-worker-2]
map:     c2 [ForkJoinPool.commonPool-worker-3]
sort:    A2 <> A1 [main]
sort:    B1 <> A2 [main]
sort:    C2 <> B1 [main]
sort:    C1 <> C2 [main]
sort:    C1 <> B1 [main]
sort:    C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]

sort似乎只在主线程上顺序执行。实际上,在并行stream上,sort底层使用新的Java 8方法Arrays.parallelSort()。如Javadoc中所述,sort顺序还是并行执行,取决于数组的长度:

If the length of the specified array is less than the minimum granularity, then it is sorted using the appropriate Arrays.sort method.

回到上一节中reduce的例子。我们已经发现, combiner函数只在并行stream中调用,而不是在顺序stream中调用。我们看看哪些线程用到了:

List<Person> persons = Arrays.asList(
    new Person("Max", 18),
    new Person("Peter", 23),
    new Person("Pamela", 23),
    new Person("David", 12));persons    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s [%s]\n",
                sum, p, Thread.currentThread().getName());
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
                sum1, sum2, Thread.currentThread().getName());
            return sum1 + sum2;
        });

控制台输出显示 accumulatorcombiner函数在所有可用线程上并行执行:

accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max;    [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David;  [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter;  [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=18; sum2=23;     [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=23; sum2=12;     [ForkJoinPool.commonPool-worker-2]
combiner:    sum1=41; sum2=35;     [ForkJoinPool.commonPool-worker-2]

综上所述,可以得出并行stream在大数据量下会带来不错的性能提升。但是请记住,一些并行stream操作(如reduce和collect)需要额外的计算(合并操作),而这些操作在顺序执行时是不需要的。

此外,我们了解到所有的并行stream操作都共享同一个jvm范围的ForkJoinPool。因此,您要避免慢阻塞stream操作,因为这可能会减慢应用程序中重度依赖于并行stream的部分。

结尾

我的Java 8 stream编程指南在这里完结了。如果您有兴趣了解更多关于Java 8 stream的知识,我向您推荐Stream Javadoc文档。如果您想了解更多关于底层机制的知识,可以阅读Martin fowler关于Collection Pipelines的文章。

如果您对JavaScript感兴趣,可以看看Stream.js —Java 8 Streams API的一个JavaScript实现。您还可以阅读我的Java 8 Tutorial 和 Java 8 Nashorn Tutorial.

希望本教程对您有所帮助,您喜欢阅读。本教程示例的完整源代码托管在GitHub上。你可以免费fork,或者通过Twitter向我发送你的反馈。

编程愉快!

相关链接:

Java 8 Tutorial 和 Java 8 Nashorn Tutorial

http://winterbe.com/posts/2014/03/16/java-8-tutorial/

http://winterbe.com/posts/2014/04/05/java8-nashorn-tutorial/

github源码:

https://github.com/login?return_to=https%3A%2F%2Fgithub.com%2Fwinterbe%2Fjava8-tutorial%2Ffork

原文发布于微信公众号 - java达人(drjava)

原文发表时间:2018-01-21

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏angularejs学习篇

js中对arry数组的各种操作小结

  最近工作比较轻松,于是就花时间从头到尾的对js进行了详细的学习和复习,在看书的过程中,发现自己平时在做项目的过程中有很多地方想得不过全面,写的不够合理,所以...

24720
来自专栏noteless

原型模式 prototype 创建型 设计模式(七)

用原型实例指定需要创建的对象的类型,然后使用复制这个原型对象的方法创建出更多同类型的对象

11420
来自专栏zaking's

用js来实现那些数据结构09(集合01-集合的实现)

  说到集合,第一个想到的就是中学学到的那个数学概念:集合。在我们开始集合相关的js实现前,我们有必要来了解一下什么是集合以及集合的数学概念。   好吧,我们一...

378100
来自专栏北京马哥教育

Python编程中的反模式

云豆贴心提醒,本文阅读时间7分钟 这篇文章收集了我在Python新手开发者写的代码中所见到的不规范但偶尔又很微妙的问题。 本文的目的是为了帮助那些新手开发者渡...

35970
来自专栏Crossin的编程教室

【Python 第17课】 类型转换

昨天又被微信后台给坑了,导致有些同学收了2遍消息。希望今天能顺利发成功。。。 #==== 类型转换 ====# python的几种最基本的数据类型,我们已经...

29360
来自专栏鸿的学习笔记

Python中__str__和__repr__方法的区别

在stackoverflow有一个很精辟的回答解释这两个内置方法的区别,简单来说,__str__方法是表现给使用者使用的,而__repr__方法是给程序员用的...

11320
来自专栏大数据和云计算技术

#算法基础#选择和插入排序

算法是基础,小蓝同学准备些总结一系列算法分享给大家,这是第二篇《选择和插入排序》,非常赞!希望对大家有帮助,大家会喜欢! 系列文章: 由快速排序到分治思想 ...

35060
来自专栏微信公众号:Java团长

JAVA之旅(一)——基本常识,JAVA概念,开发工具,关键字/标识符,变量/常量,进制/进制转换,运算符,三元运算

比如6:6/2 = 3 余 0 3 / 2 = 1 余 1 那就是从个位数开始011,读起来就是110了

19310
来自专栏Crossin的编程教室

【Python 第56课】 正则表达式(2)

有同学问起昨天那段测试代码里的问题,我来简单说一下。 1. r"hi" 这里字符串前面加了r,是raw的意思,它表示对字符串不进行转义。为什么要加这个?你可以试...

28560
来自专栏编程

Python精华之函数

各位小伙伴,大家周一快乐! 不知道刚刚过去的周末大家过的怎么样? 反正常老师是被糊里糊涂的过了个圣诞节 满大街的商场和超市都张灯结彩,话说,不是有规定不让过这种...

22350

扫码关注云+社区

领取腾讯云代金券