专栏首页BAT的乌托邦【小家java】java8新特性之---Stream API 详解 (Map-reduce、Collectors收集器、并行流、groupby多字段分组)

【小家java】java8新特性之---Stream API 详解 (Map-reduce、Collectors收集器、并行流、groupby多字段分组)


我们为什么需要 Stream API

Stream 作为 Java 8 的一大亮点,它与 java.io 包里的 InputStream 和 OutputStream 是完全不同的概念。

集合讲的是数据,流讲的是计算

Java 8 中的 Stream 是对集合(Collection)对象功能的增强,它专注于对集合对象进行各种非常便利、高效的聚合操作(aggregate operation),或者大批量数据操作 (bulk data operation)。Stream API 借助于同样新出现的 Lambda 表达式,极大的提高编程效率和程序可读性

同时它提供串行和并行两种模式进行汇聚操作,并发模式能够充分利用多核处理器的优势,使用 fork/join 并行方式来拆分任务和加速处理过程。通常编写并行代码很难而且容易出错, 但使用 Stream API 无需编写一行多线程的代码,就可以很方便地写出高性能的并发程序。所以说,Java 8 中首次出现的 java.util.stream 是一个函数式语言+多核时代综合影响的产物。

浅谈聚合操作(Stream API能协助解决)

在传统的 J2EE 应用中,Java 代码经常不得不依赖于关系型数据库的聚合操作来完成诸如:

  • 客户每月平均消费金额
  • 最昂贵的在售商品
  • 取十个数据样本作为首页推荐

但在当今这个数据大爆炸的时代,在数据来源多样化、数据海量化的今天,很多时候不得不脱离 RDBMS,或者以底层返回的数据为基础进行更上层的数据统计。

这个时候,如果没有Java8提供的Stream API,那简直就是噩梦。在 Java 8 使用 Stream,代码更加简洁易读;而且使用并发模式,程序执行速度更快。

对Stream进一步理解

简单说,对 Stream 的使用就是实现一个 filter-map-reduce 过程,产生一个最终结果,或者导致一个副作用(side effect)。

Stream 不是集合元素,它不是数据结构并不保存数据,它是有关算法和计算的,它更像一个高级版本的 Iterator。

对于 Stream,用户只要给出需要对其包含的元素执行什么操作,比如 “过滤掉长度大于 10 的字符串”、“获取每个字符串的首字母”等,Stream 会隐式地在内部进行遍历,做出相应的数据转换。 Stream 就如同一个迭代器(Iterator),单向,不可往复,数据只能遍历一次,遍历过一次后即用尽了,就好比流水从面前流过,一去不复返。

Java 的并行 API 演变历程基本如下:

  1. 1.0-1.4 中的 java.lang.Thread
  2. 5.0 中的 java.util.concurrent
  3. 6.0 中的 Phasers 等
  4. 7.0 中的 Fork/Join 框架
  5. 8.0 中的 Stream Stream 的另外一大特点是,数据源本身可以是无限的(即无限流)。

对流的操作概述

流的操作类型分为两种:

  • Intermediate(中间操作):一个流可以后面跟随零个或多个 intermediate 操作。其目的主要是打开流,做出某种程度的数据映射/过滤,然后返回一个新的流,交给下一个操作使用。这类操作都是惰性化的(lazy),就是说,仅仅调用到这类方法,并没有真正开始流的遍历。

map (mapToInt, flatMap 等)、 filter、 distinct、 sorted、 peek、 limit、 skip、 parallel、 sequential、 unordered

  • Terminal(终止操作):一个流只能有一个 terminal 操作,当这个操作执行后,流就被使用“光”了,无法再被操作。所以这必定是流的最后一个操作。Terminal 操作的执行,才会真正开始流的遍历,并且会生成一个结果,或者一个 side effect。

forEach、 forEachOrdered、 toArray、 reduce、 collect、 min、 max、 count、 anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 iterator

还有一种操作被称为 short-circuiting(短路操作)。用以指:

  • 对于一个 intermediate 操作,如果它接受的是一个无限流,它可以返回一个有限的新 Stream。
  • 对于一个 terminal 操作,如果它接受的是一个无限流,但能在有限的时间计算出结果。

anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 limit

下面汇总了Stream所有的操作:

IntStream、LongStream、DoubleStream

IntStream、LongStream、DoubleStream。当然我们也可以用 Stream、Stream >、Stream,但是 boxing 和 unboxing 会很耗时,所以特别为这三种基本数值型提供了对应的 Stream。

Java 8 中还没有提供其它数值型 Stream,因为这将导致扩增的内容较多。而常规的数值型聚合运算可以通过上面三种 Stream 进行。

数值流的构造:

IntStream.of(new int[]{1, 2, 3}).forEach(System.out::println);
IntStream.range(1, 3).forEach(System.out::println);
IntStream.rangeClosed(1, 3).forEach(System.out::println);

range,需要传入开始节点和结束节点两个参数,返回的是一个有序的LongStream。包含开始节点和结束节点两个参数之间所有的参数,间隔为1. rangeClosed的功能和range类似。差别就是rangeClosed包含最后的结束节点,range不包含。

进阶:自己生成流(无限流)

  public static<T> Stream<T> generate(Supplier<T> s) {
        Objects.requireNonNull(s);
        return StreamSupport.stream(
                new StreamSpliterators.InfiniteSupplyingSpliterator.OfRef<>(Long.MAX_VALUE, s), false);
    }

可以自己来控制流的生成。这种情形通常用于随机数、常量的 Stream,或者需要前后元素间维持着某种状态信息的 Stream。把 Supplier 实例传递给 Stream.generate() 生成的 Stream,默认是串行(相对 parallel 而言)但无序的(相对 ordered 而言)。由于它是无限的,在管道中,必须利用 limit 之类的操作限制 Stream 大小。

生成 10 个随机整数:

public static void main(String[] args) {
        Stream.generate(new Random()::nextInt).limit(10).forEach(System.out::println);
        //采用IntStream流的方式(推荐使用 逼格很高)
        IntStream.generate(() -> (int) (System.nanoTime() % 100)).
                limit(10).forEach(System.out::println);
    }

另外一种方式自己生成流(这个非常好用):

public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f) {}

iterate 跟 reduce 操作很像,接受一个种子值,和一个 UnaryOperator(例如 f)。然后种子值成为 Stream 的第一个元素,f(seed) 为第二个,f(f(seed)) 第三个,以此类推。

如生成一个等差数列:

Stream.iterate(0, n -> n + 3).limit(10).forEach(x -> System.out.print(x + " ")); //0 3 6 9 12 15 18 21 24 27 

与 Stream.generate 相仿,在 iterate 时候管道必须有 limit 这样的操作来限制 Stream 大小。

Stream实操案例
创建流Stream

Java8 中的 Collection 接口被扩展,提供两个获取流的方法 :

  • default Stream stream() : 返回一个顺序流
  • default Stream parallelStream() : 返回一个并行流

由数组创建流 Java8 中的 Arrays 的静态方法 stream() 可以获取数组流 :static Stream stream(T[] array) : 返回一个流 重载形式,能够处理对应基本类型的数组IntStream/LongStream/DoubleStream :

由值创建流 可以使用静态方法 Stream.of(), 通过显示值创建一个流,它可以接收任意数量的参数:public static Stream of(T… values) : 返回一个流

由方法创建流 : 创建无限流 可以使用静态方法 Stream.iterate() 和 Stream.generate(), 创建无限流 迭代 public static Stream iterate(final T seed, final UnaryOperator f) 生成 public static Stream generate(Supplier s)

Stream的中间操作实操

多个中间操作可以连接起来形成一个流水线,除非流水线上触发终止操作,否则中间操作不会执行任何的处理。而在终止操作时一次性全部处理,称为**“惰性求值”**

  • 筛选与切片系列

方法

描述

filter(Predicate p)

接收 Lambda , 从流中排除某些元素(true表示通过,false表示被过滤掉了)

distinct()

筛选,通过流所生成元素的 hashCode() 和 equals() 去除重复元素

limit(long maxSize)

截断流,使其元素不超过给定数量

peek(Consumer action)

生成一个包含原Stream的所有元素的新Stream,同时会提供一个消费函数(Consumer实例),新Stream每个元素被消费的时候都会执行给定的消费函数;

S unordered()

属于BaseStream的一个方法。使用较少。unordered操作不会进行任何显式的打乱流的操作(后面会有例子)。它的工作是:消除流中必须保持的有序约束,因此允许之后的操作使用 不必考虑有序的优化。

skip(long n)

跳过元素,返回一个扔掉了前 n 个元素的流。若流中元素不足 n 个,则返回一个空流。与 limit(n) 互补

emps.parallelStream().filter((e) -> e.getSalary() >= 5000).skip(2).forEach(System.out::println);

这块相对比较简单,就一笔带过了。 peek方法用得比较少,这里特殊介绍一下:

Stream.of("one", "two", "three", "four").peek(e -> System.out.println(e));
输出:这样不会有任何的输出;

Stream.of("one", "two", "three", "four").peek(e -> System.out.println(e)).collect(Collectors.toList());
输出:
one
two
three
four

Stream.of("one", "two", "three", "four")
    .peek(e -> System.out.println("Peeked value: " + e))
    .map(String::toUpperCase)
    .peek(e -> System.out.println("Mapped value: " + e))
    .collect(Collectors.toList());
输出:
Peeked value: one
Mapped value: ONE
Peeked value: two
Mapped value: TWO
Peeked value: three
Mapped value: THREE
Peeked value: four
Mapped value: FOUR

这个说白了。当元素被消费的时候,就会触发peek。有多少个就触发多少次。 unordered的使用案例:

public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        list.stream().forEach(System.out::print); //123456789

        System.out.println();

        //使用unordered之后输出
        list.stream().unordered().forEach(System.out::print); //123456789
    }

我们会发现,输出的顺序没有改变。所以它并不是来打乱这个顺序的。所以大家使用的时候不要误解了。正确的使用 姿势:

//使stream无序:对于 distinct() 和 limit() 等方法,如果不关心顺序,则可以使用并行:

LongStream.rangeClosed(5, 10).unordered().parallel().limit(3);
IntStream.of(14, 15, 15, 14, 12, 81).unordered().parallel().distinct();

这样使用,能提高CPU的利用率,进而提高处理的效率
  • 映射系列

方法 | 描述

  • | :-: | -: map(Function f) | 接收一个函数作为参数,该函数会被应用到每个元素上,并将其映射成一个新的元素

mapToDouble(ToDoubleFunction f)| 同上 mapToInt(ToIntFunction f)| 同上 mapToLong(ToLongFunction f)| 同上 flatMap(Function f)| 接收一个函数作为参数,将流中的每个值都换成另一个流,然后把所有流连接成一个流

  public static void main(String[] args) {
        final long count = 10;
        List<Long> list = new ArrayList<>();
        for (long i = 0; i < count; i++) {
            list.add(i);
        }

        //使用mapToLong来处理
        list.stream().mapToLong(x -> x + 10).forEach(System.out::println);
    }

普通的Map映射相对来说比较简单,因此这里也先一笔带过了。现在重点讲解一下flatMap的使用和场景:

//给定一个需求:给定一个列表{"aaa","bbb","ddd","eee","ccc"}。需要在控制台直接输出aaabbbdddeeeccc字样

看如下代码实现,对比下map和flatMap的区别

 public static void main(String[] args) {
        List<String> list = Arrays.asList("aaa", "bbb", "ddd", "eee", "ccc");
        //采用map来做(这里采用了两次forEach循环进行输出,显然不太优雅)
        list.stream().map(x -> {
            List<Character> characterList = new ArrayList<>();
            char[] chars = x.toCharArray();
            for (char c : chars) {
                characterList.add(c);
            }
            return characterList.stream();
        }).forEach(xStream -> xStream.forEach(System.out::print)); //aaabbbdddeeeccc

        //采用flatMap来做  体会一下flatMap的魅力吧
        list.stream().flatMap(x -> {
            List<Character> characterList = new ArrayList<>();
            char[] chars = x.toCharArray();
            for (char c : chars) {
                characterList.add(c);
            }
            return characterList.stream();
        }).forEach(System.out::print); //aaabbbdddeeeccc

    }

再看一个例子

//给定一个需求:给定单词列表["Hello","World"],要返回列表["H","e","l", "o","W","r","d"]

对于这样的需求,我们可能想到的第一个版本可能是这样子的:

  public static void main(String[] args) {
        List<String> list = Arrays.asList("hello", "world");
        List<String[]> collect = list.stream().map(word -> word.split(""))
                .distinct()
                .collect(Collectors.toList());
    }

不用输出结果,一看返回值的结构就肯定不是我们想要的结果. 这个方法的问题在于,传递给map方法的Lambda为每个单词返回了一个String[](String列表)。因此, map 返回的流实际上是Stream<String[]> 类型的。你真正想要的是用Stream来表示一个字符流。因此,这是行不通的。 正确的姿势:

 public static void main(String[] args) {
        List<String> list = Arrays.asList("hello", "world");
        list.stream().flatMap(x -> Arrays.stream(x.split("")))
                .distinct().forEach(System.out::print); //helowrd
    }

其实map和flatMap的差别特别像List的add方法和addAll方法的差异,可参照理解一下,看下面这个例子

  public static void main(String[] args) {
        List list = new ArrayList();
        list.add(1);
        list.add(2);

        List list1 = new ArrayList();
        list1.add(3);
        list1.add(4);

        //注意add和addAll输出的区别
        //list.add(list1);
        //System.out.println(list); //[1, 2, [3, 4]]

        list.addAll(list1);
        System.out.println(list); //[1, 2, 3, 4]

    }
  • 排序 方法 | 描述
  • | :-: | -: sorted() | 产生一个新流,其中按自然顺序排序 sorted(Comparator comp) | 产生一个新流,其中按比较器顺序排序 这个比较简单,这里就不举例子了
Stream的终止操作

终端操作会从流的流水线生成结果,其结果可以是任何不是流的值,例如 : List、 Integer,甚至是 void

  • 查找与匹配 方法 | 描述
  • | :-: | -: allMatch(Predicate p) | 检查是否匹配所有元素 anyMatch(Predicate p) | 检查是否至少匹配一个元素 noneMatch(Predicate p) | 检查是否没有匹配所有元素 findFirst() | 返回第一个元素 findAny() | 返回当前流中的任意元素 count() | 返回流中元素总数 max(Comparator c) | 返回流中最大值 min(Comparator c) | 返回流中最小值 forEach(Consumer c) | 内部迭代(使用 Collection 接口需要用户去做迭代,称为外部迭代。相反, Stream API 使用内部迭代) forEachOrdered(Consumer c) | 基本同forEach,后面会有示例比较 toArray() toArray(IntFunction g) | 这个使用起来和List的toArray差不多

其余方法使用起来都比较简单,下面通过一个案例对比foreach等:

 public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        //因为并行 所以输出完全无序
        list.stream().parallel().forEach(x -> System.out.print(x)); //65387421
        System.out.println();
        //使用了forEachOrdered 所以是顺序输出的 即使你是并行流也是顺序的
        list.stream().parallel().forEachOrdered(x -> System.out.print(x)); //12345678
        System.out.println();
        //使用了toList,然后其实也是顺序输出了  内部原理同forEachOrdered(可当面试题哟)
        List<Integer> collect = list.stream().parallel().collect(Collectors.toList());
        System.out.println(collect); //[1, 2, 3, 4, 5, 6, 7, 8]
    }

除了使用forEachOrdered保证顺序外,Collectors.toList()也可以保证顺序,二都最终都是通过ForEachOrderedTask类来实现的,具体可以参看ForEachOp.ForEachOrderedTask类中的代码。

注 : 流进行了终止操作后,不能再次使用

归约:

方法

描述

reduce(T iden, BinaryOperator b)

可以将流中元素反复结合起来,得到一个值,返回 T

reduce(BinaryOperator b)

可以将流中元素反复结合起来,得到一个值,返回 Optional

reduce(U identity, BiFunction a, BinaryOperator combiner)

可以将流中元素反复结合起来,得到一个值,返回 Optional

注 : map 和 reduce 的连接通常称为 map-reduce 模式,因 Google 用它来进行网络搜索而出名

reduce是很重要的一种变成思想。这里重点介绍一下。reduce的作用是把stream中的元素给组合起来。至于怎么组合起来:

  • 它需要我们首先提供一个起始种子,然后依照某种运算规则使其与stream的第一个元素发生关系产生一个新的种子,这个新的种子再紧接着与stream的第二个元素发生关系产生又一个新的种子,就这样依次递归执行,最后产生的结果就是reduce的最终产出,这就是reduce的算法最通俗的描述;

所以运用reduce我们可以做sum,min,max,average,所以这些我们称之为针对具体应用场景的reduce,这些常用的reduce,stream api已经为我们封装了对应的方法。

//求和 sum
List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5);

        // 没有起始值时返回为Optional类型
        Optional<Integer> sumOptional = integers.stream().reduce(Integer::sum);
        System.out.println(sumOptional.get()); //15

        // 可以给一个起始种子值
        Integer sumReduce = integers.stream().reduce(0, Integer::sum);
        System.out.println(sumReduce); //15

        //直接用sum方法
        Integer sum = integers.stream().mapToInt(i -> i).sum();
        System.out.println(sum); //15

重点说说三个参数的Reduce 三个参数时是最难以理解的。 分析下它的三个参数:

  • identity: 一个初始化的值;这个初始化的值其类型是泛型U,与Reduce方法返回的类型一致;注意此时Stream中元素的类型是T,与U可以不一样也可以一样,这样的话操作空间就大了;不管Stream中存储的元素是什么类型,U都可以是任何类型,如U可以是一些基本数据类型的包装类型Integer、Long等;或者是String,又或者是一些集合类型ArrayList等;后面会说到这些用法。
  • accumulator: 其类型是BiFunction,输入是U与T两个类型的数据,而返回的是U类型;也就是说返回的类型与输入的第一个参数类型是一样的,而输入的第二个参数类型与Stream中元素类型是一样的
  • combiner: 其类型是BinaryOperator,支持的是对U类型的对象进行操作

第三个参数combiner主要是使用在并行计算的场景下;如果Stream是非并行时,第三个参数实际上是不生效的。 因此针对这个方法的分析需要分并行与非并行两个场景。

就是因为U和T不一样,所以给了我们更多的发挥。比如设U的类型是ArrayList,那么可以将Stream中所有元素添加到ArrayList中再返回了,如下示例:

public static void main(String[] args) {
        ArrayList<String> result = Stream.of("aa", "ab", "c", "ad").reduce(new ArrayList<>(),
                (u, s) -> {
                    u.add(s);
                    return u;
                }, (strings, strings2) -> strings);
        System.out.println(result); //[aa, ab, c, ad]
    }

注意由于是非并行的,第三个参数实际上没有什么意义,可以指定r1或者r2为其返回值,甚至可以指定null为返回值。下面看看并行的情况:

当Stream是并行时,第三个参数就有意义了,它会将不同线程计算的结果调用combiner做汇总后返回。注意由于采用了并行计算,前两个参数与非并行时也有了差异! 看个例子:

  public static void main(String[] args) {
        Integer reduce = Stream.of(1, 2, 3).parallel().reduce(
                 4,
                (integer, integer2) -> integer + integer2,
                (integer, integer2) -> integer + integer2);
        System.out.println(reduce); //18
    }
输出:18

omg,结果竟然是18。显然串行的话结果是10;这个不太好理解,但是我下面写一个等价的方式,可以帮助很好的理解这个结果:

 public static void main(String[] args) {
        Optional<Integer> reduce = Stream.of(1, 2, 3).map(n -> n + 4).reduce((s1, s2) -> s1 + s2);
        System.out.println(reduce.get()); //18
    }

这种方式有助于理解并行三个参数时的场景,实际上就是第一步使用accumulator进行转换(它的两个输入参数一个是identity, 一个是序列中的每一个元素),由N个元素得到N个结果;第二步是使用combiner对第一步的N个结果做汇总。

好了,三个参数的reduce先介绍到这。下面继续看看reduce能为我们做什么?

public static void main(String[] args) {
        //构造字符串流
        List<String> strs = Arrays.asList("H", "E", "L", "L", "O");
        // reduce
        String concatReduce = strs.stream().reduce("", String::concat);
        System.out.println(concatReduce); //HELLO

        Stream<Integer> integerStream = Stream.of(1, 2, 3, 4, 5);
        Integer minReduce = integerStream.reduce(Integer.MAX_VALUE, Integer::min);
        System.out.println(minReduce); //1

    }
  • 收集(collect),很多时候和reduce很像,但collect更加强大 方法 | 描述
  • | :-: | -: collect(Collector c) | 将流转换为其他形式。接收一个 Collector接口的实现,用于给Stream中元素做汇总的方法

Collectors里常用搜集器介绍:

现在抽取一些不太常用,稍微不太好理解的一些拿来讲一下: toMap: 若要线程安全的Map,用**toConcurrentMapgroupingByConcurrent** 如果生成一个Map,我们需要调用toMap方法。由于Map中有Key和Value这两个值,故该方法与toSet、toList等的处理方式是不一样的。toMap最少应接受两个参数,一个用来生成key,另外一个用来生成value。toMap方法有三种变形:

注:使用Collectors.toMap方法时的两个问题: 1、当key重复时,会抛出异常:java.lang.IllegalStateException: Duplicate key ** 2、当value为null时,会抛出异常:java.lang.NullPointerException

  1. toMap(Function keyMapper,Function valueMapper) keyMapper: 该Funtion用来生成Key valueMapper:该Funtion用来生成value
  public static void main(String[] args) {
        //使用toMap两个参数的(最常用的) 但遇上相同key和null的value都会抛出异常
        //List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
        //Map<Integer, Integer> map = list.stream().collect(Collectors.toMap(k -> k, v -> v));
        //System.out.println(map); //{1=1, 2=2, 3=3, 4=4, 5=5}

        //里面放重复的key
        //List<Integer> list = Arrays.asList(1, 2, 3, 4, 1);
        //list.stream().collect(Collectors.toMap(k -> k, v -> v)); //java.lang.IllegalStateException: Duplicate key 1
        //对于里面有重复key的情况,采用三个参数的toMap进行改良
        // java8处理思路:即 两个key相同时 只能有一个key存在,那对应的value如何处理?  value交由我们自己处理
        //Map<Integer, Integer> map = list.stream().collect(Collectors.toMap(k -> k, v -> v, (oldV, newV) -> oldV + newV + 10));
        //System.out.println(map); //{1=12, 2=2, 3=3, 4=4}


        //里面放null值
        //List<Integer> list = Arrays.asList(1, 2, 3, 4, null);
        //list.stream().collect(Collectors.toMap(x -> x, y -> y)); //java.lang.NullPointerException


        //最后 四个参数的toMap 提供了mergeFunction和mapSupplier 调用者可以自定义希望返回什么类型的Map
        List<Integer> list = Arrays.asList(1, 2, 1, 4);
        HashMap<Integer, Integer> map = list.stream().collect(Collectors.toMap(
                k -> k,
                v -> v,
                (oldV, newV) -> oldV + newV + 10,
                HashMap::new)
        );
        System.out.println(map); //{1=12, 2=2, 4=4}

    }

我们常常遇到要把List转成Map的现象。并且要求保证List的顺序,那么此时我们必须使用LinedHashMap,这点特别重要,处理方式如下:

Map<Integer, NormalPeriodResponse> tmpNormalPeriodMap = tmpNormalPeriods.stream()
                .collect(toMap(t -> t.getId(), Function.identity(), (k1, k2) -> k1, LinkedHashMap::new));
public static void main(String[] args) {
        //这个summarizing 算是一个比较整合的搜集
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
        IntSummaryStatistics summary = list.stream().collect(Collectors.summarizingInt(x -> x));
        System.out.println(summary.getCount()); //5
        System.out.println(summary.getAverage()); //3.0
        System.out.println(summary.getSum()); //15

    }
public static void main(String[] args) {
        //连接流中的字符串  可以指定连接符、首位符等
        List<String> list = Arrays.asList("aa", "bb", "cc", "dd");
        String str = list.stream().collect(Collectors.joining(",", "==>", "<=="));
        System.out.println(str); //==>aa,bb,cc,dd<==
    }

当使用maxBy、minBy统计最值时,结果会封装在Optional中。有时候明明我们知道不可能为null,那这个时候我们优雅的处理的方式可以采用collectingAndThen函数包裹maxBy、minBy,从而将maxBy、minBy返回的Optional对象进行转换

 public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
        Optional<Integer> max = list.stream().collect(Collectors.maxBy(Integer::compare));
        Integer maxAndThen = list.stream().collect(Collectors.collectingAndThen(Collectors.maxBy(Integer::compare), Optional::get));
        System.out.println(max); //Optional[5]
        System.out.println(maxAndThen); //5
    }

备注:groupBy搜集也是用得非常对的,并且可以无限的分组下去。这里需要注意一点groupingByConcurrent的使用方式。他和groupBy的区别就是,它返回的是ConcurrentMap,而普通的就是返回的Map,需要注意区别,这里不做演示了。

多字段分组案例

此处为我后续新增内容,因为很多同学问我多字段怎么groupby,其实非常简单哈。看一下API就能知道怎么处理

    public static <T, K, A, D>
    Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier,
                                          Collector<? super T, A, D> downstream) {
        return groupingBy(classifier, HashMap::new, downstream);
    }

给个栗子:多字段分组

Map<String, Map<String, List<Article>>> result = articles.stream()
                .collect(Collectors.groupingBy(Article::getCountryCode
                ,Collectors.groupingBy(Article::getProvince)));

给个栗子:分组统计

 //统计每个应用实际支付总额
Map<Long, Long> tradeAmountMap = list.stream().filter(o->o.getStatus()==2)
                .collect(Collectors.groupingBy(OrdersDO::getAppId
                //downstream其实可以做任何搜集的作用
                ,Collectors.summingLong(OrdersDO::getTradeAmount)));

collectingAndThen可用于很多实例,进行持续操作。比如先根据某属性去重,然后再收集等等

分区:partitioningBy

partitioningBy(Predicate predicate)
partitioningBy(Predicate predicate,Collector downstream)

分区是分组的一种特殊情况,它只能分成true、false两组。 下面这个实例:其实就是数据在手上,可以各种玩

public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 5, 5, 6, 9);
        Map<Boolean, List<Integer>> partition = list.stream().collect(Collectors.partitioningBy(x -> x >= 4));
        System.out.println(partition); //{false=[1, 2, 3], true=[4, 5, 5, 5, 6, 9]}

        Map<Boolean, Map<Boolean, List<Integer>>> partition2 = list.stream().collect(Collectors.partitioningBy(x -> x >= 4, Collectors.partitioningBy(x -> x > 6)));
        System.out.println(partition2); //{false={false=[1, 2, 3], true=[]}, true={false=[4, 5, 5, 5, 6], true=[9]}}

        //也可以结合groupBy搞
        Map<Boolean, Map<Integer, List<Integer>>> collect = list.stream().collect(Collectors.partitioningBy(x -> x >= 4, Collectors.groupingBy(x -> x)));
        System.out.println(collect); //{false={1=[1], 2=[2], 3=[3]}, true={4=[4], 5=[5, 5, 5], 6=[6], 9=[9]}}
    }
mapping :跟map操作类似
 String str = Stream.of("a", "b", "c").collect(Collectors.mapping(x -> x.toUpperCase(), Collectors.joining(",")));
        System.out.println(str); //A,B,C

它的源码声明如下:

mapping(Function<? super T, ? extends U> mapper,
                               Collector<? super U, A, R> downstream)

下面介绍另外一个需求:比如我要分组,可以在分组的时候把这个对象转换成另外一个对象(比如此例中我们要求把String对象转换成Integer对象,hashCode分组 或者toMap 做法一样的) 常规做法:是先map成另外一个对象,再分组 这也是ok的

List<String> list = Arrays.asList("1", "2", "3");

        //另外一个比较实用的场景   比如我要分组,可以在分组的时候把这个对象转换成另外一个对象
        Map<Integer, List<Integer>> hashCodeMap = list.stream().map(x -> Integer.valueOf(x)).collect(groupingBy(x -> x.hashCode()));
        System.out.println(hashCodeMap); //{1=[1], 2=[2], 3=[3]}

现在我们有了mapping,可以更加优雅的处理如下

List<String> list = Arrays.asList("1", "2", "3");
        Map<Integer, List<Integer>> collect = list.stream().collect(groupingBy(x -> x.hashCode(), mapping(x -> Integer.parseInt(x), toList())));
        System.out.println(collect); //{1=[1], 2=[2], 3=[3]}

生成统计信息(IntSummaryStatistics、DoubleSummaryStatistics等)

另一组非常有用的收集器是用来产生统计信息的收集器。这能够在像int、double和long这样的原始数据类型上起到作用;并且能被用来生成像下面这样的统计信息。

IntSummaryStatistics summaryStatistics = tasks.stream().map(Task::getTitle).collect(summarizingInt(String::length));
System.out.println(summaryStatistics.getAverage()); //32.4
System.out.println(summaryStatistics.getCount()); //5
System.out.println(summaryStatistics.getMax()); //44
System.out.println(summaryStatistics.getMin()); //24
System.out.println(summaryStatistics.getSum()); //162

也有其它的变种形式,像针对其它原生类型的LongSummaryStatistics和DoubleSummaryStatistics。

    public static void main(String[] args) {
        IntSummaryStatistics statistics1 = new IntSummaryStatistics();
        IntSummaryStatistics statistics2 = new IntSummaryStatistics();
        statistics2.combine(statistics1);
    }

你也可以通过使用combine操作来将一个IntSummaryStatistics与另一个组合起来(必须是同一类型哦)。

    public static void main(String[] args) {
        IntSummaryStatistics statistics1 = new IntSummaryStatistics();
        statistics1.accept(10);
        System.out.println(statistics1.getSum()); //10
        System.out.println(statistics1.getCount()); //1

        IntSummaryStatistics statistics2 = new IntSummaryStatistics();
        statistics2.accept(20);

        statistics1.combine(statistics2);
        System.out.println(statistics1.getSum()); //30
        System.out.println(statistics1.getCount()); //2

    }

介绍几个Stream的静态方法

of 这个方法不说了

需要注意的是,不能全是null,否则报错。这个在JDK9做了改善

empty 构造一个空流
public static void main(String[] args) {
        List<Integer> list = Stream.<Integer>empty().collect(toList());
        System.out.println(list); //[]
    }

由此课件,流生成的集合,都是不会为null的

iterate、generate 这两个上面已经介绍了,很好用

concat

顾名思义,就是拼接流。这个在很多场合比较实用。比如要合并提取两个或者更多的List集合的时候,就没必要先合并集合,再处理流了,可以一步到位,并且效率很高。

    public static void main(String[] args) {
        List<Integer> list1 = Arrays.asList(1,2,3);
        List<Integer> list2 = Arrays.asList(4,3,2);
        Stream.concat(list1.stream(),list2.stream()).forEach(System.out::print);

    }

并行流(ParallelStream)

首先简单的介绍下Fork/Join 框架(JDK1.7后提出) Fork/Join 框架与传统线程池的区别: 采用 “工作窃取”模式 (work-stealing) : 当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中

相对于一般的线程池实现,fork/join框架的优势体现在对其中包含的任务的处理方式上。在一般的线程池中,如果一个线程正在执行的任务由于某些原因无法继续运行,那么该线程会处于等待状态。而在fork/join框架实现中,如果某个子问题由于等待另外一个子问题的完成而无法继续运行。那么处理该子问题的线程会主动寻找其他尚未运行的子问题来执行。或者当线程任务完成速度快,就会随机抽取其它未完成任务的进程中的最后一个任务进行计算操作。这种方式减少了线程的等待时间,提高了性能

  • 普通 for(最慢,数据量越大CPU使用率低,速度越慢)
    • 备注:如果数据量较小,它还是蛮快的,毕竟for循环是偏底层的代码
  • ForkJoin框架(比较快) 但任务拆分的代码门槛有点高,使用起来过于复杂
  • Java8 并行流(底层使用ForkJoin框架,速度最快 CPU使用率可以达到 100%)

所以,如果是大任务(小任务并行流没有任何效果反而可能还会慢一些),极力推荐使用并行流处理大数量的计算。比如从1加到1000亿的和这种,或者类似的更加耗时的操作(比如多次访问库等等)

Stream的执行原理

Stream的执行原理过于复杂,本文不做过多讨论,请关注后续博文

结束语

Stream 的特性可以归纳为:

  • 不是数据结构
  • 它没有内部存储,它只是用操作管道从 source(数据结构、数组、generator function、IO channel)抓取数据。
  • 它也绝不修改自己所封装的底层数据结构的数据。例如 Stream 的 filter 操作会产生一个不包含被过滤元素的新 Stream,而不是从 source 删除那些元素。
  • 所有 Stream 的操作必须以 lambda 表达式为参数。
  • 不支持索引访问
  • 你可以请求第一个元素,但无法请求第二个,第三个,或最后一个。
  • 惰性化(惰性求值)操作是向后延迟的,一直到它弄清楚了最后需要多少数据才会开始
  • 并行能力(当一个 Stream 是并行化的,就不需要再写多线程代码,所有对它的操作会自动并行进行的)
  • 可以是无限的。集合有固定大小,Stream 则不必。limit(n) 和 findFirst() 这类的 short-circuiting 操作可以对无限的 Stream 进行运算并很快完成。
  • 知识交流

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 【小家java】聊聊Java中的java.util.Arrays类和java.util.Collections工具类

    有很多开发了很多年的人,只使用过它的asList方法去快速构建一个List,但其实它是非常强大的,可以很大程度上简化我们操作数组的方式。

    YourBatman
  • 【小家java】大杂烩---那些年我们一起躺过的坑

    此篇博文没有具体的主题,主要针对于平时开发过程中遇到的一些小问题的记录,并且大都从源码的角度去解释为什么会报错。并且此篇博文是持续更新中。。。

    YourBatman
  • 【小家java】Java中集合List、Set、Map删除元素的方法大总结(避免ConcurrentModificationException异常)

    java中的集合框架是我们日常使用得最多的数据结构,而List作为Collection里最重要的一员,使用就更加的频繁了。因此我们平时使用中少不了对List的增...

    YourBatman
  • VR+全景播放器+头控讲解-01

    近两年随着AR/VR逐渐火热,企业为了给自己的产品中加入新的元素,有可能会将3D元素添加到应用中去,对于IOS 工程师,你有三种选择 OpenGL ES / M...

    酷走天涯
  • Ajax与Python服务器交互,在网页

    然后就是通过点击按钮事件获取输入的内容;(用到了Ajax与服务器交互;它会自动封装请求行,请求头,空格,我们只需要把内容send就可以了)

    py3study
  • 【程序源代码】.Net Core快速权限工作流系统

    .Net Core快速应用开发框架、最好用的权限工作流系统. 基于经典领域驱动设计的权限管理及快速开发框架,源于Martin Fowler企业级应用开发思想及最...

    程序源代码
  • 《程序人生》系列-害敖丙差点被开除的P0事故

    这是帅丙真实事件,大家都知道很多公司都是有故障等级这么一说的,这就是敖丙在公司背的P0级故障,敖丙差点因此被解雇,事情经过十分惊心动魄,我的心脏病都差点复发。

    敖丙
  • Mysql Dual Master双主复制架构

    常用的复制方式是一主一从的基本架构,但有时可能还会需要在一些特定的场景下进行Master的切换 如在Master端进行一些维护操作时,可能要停止MySQL的服务...

    dys
  • Spring Security 中如何细化权限粒度?

    有小伙伴表示微人事(https://github.com/lenve/vhr)的权限粒度不够细。不过松哥想说的是,技术都是相通的,明白了 vhr 中权限管理的原...

    江南一点雨
  • 第一章:Maven环境下如何配置QueryDSL环境

    恒宇少年

扫码关注云+社区

领取腾讯云代金券