如何使用Java Stream Collectors(归约器)?

Java 8引入了Stream API,它允许我们以声明的方式处理数据。此外,Stream还可以在不需要编写多线程代码的情况下使用多核架构。

Stream API,将对数据流的所有操作,仅用三个步骤概括全了-过滤、转化、归约。其中,过滤、转化还比较容易理解,但是归约就是一个非常高级的抽象接口了。

规约器定义

归约,就是对中间操作(过滤,转换等)的结果进行收集归一化的步骤,当然也可以对归约结果进行再归约,这就是归约的嵌套了。中间操作不消耗流,归约会消耗流,而且只能消费一次,就像......把流都吃掉了。对于刚接触Stream API的人来说,这样的描述可能太抽象了,请看下面的例子:

public class TestStream {

public static void main(String[] args) {

List<Integer> numbers = Arrays.asList(-3, -2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9);

int[] sum = numbers.stream()

.filter(i -> i > 0)

.collect(() -> new int[]{0}, (a, b) -> a[0] += b, (s1, s2) -> s1[0] += s2[0]);

System.out.println(sum[0]);

}

}

这是一个简单的先过滤非正整数,然后对剩余元素求和的例子(这个例子纯粹是为了说明Collector原理所写,等你熟悉了Stream,你会有更好的实现方法)。你看,那么长一串数字流最后变成了一个数—被归约了。

Collector接口

collect里面需要传进去的是一个Collector接口,这就是我们今天的主角-归约器了,来看它的源码定义

public interface Collector<T, A, R> {

Supplier<A> supplier();

BiConsumer<A, T> accumulator();

BinaryOperator<A> combiner();

Function<A, R> finisher();

Set<Characteristics> characteristics();

}

Collectors是归约器Collector接口(https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collector.html)的一种实现,它实现各种有用的缩减操作(reduction operations),例如将元素积聚到集合中,根据各种标准汇总元素,等等。

由特殊到一般

我们来从头开始梳理计算从1-9累加和的步骤

1.首先你得提供一个用来接收每一步累加结果的变量,我们用A表示

2.你得确定你的累加变量的初始值是什么。如果我们把计算范围看作一个变量,那这一步就非常有必要了,我

现在给你的计算区间是[1,100),那如果我给你一个[0,0)这样一个区间呢,这个数据流是空的,但你同样要有一个

输出

3.确定你的操作。为了理解这个高级抽象接口的参数意义,我们不得不尽可能把一切都看成可变的,这里这个累加

的操作也是可变的,比如说我想求阶乘了

4.纯逻辑上的抽象已经我们已经做到极致了,但你还可以做的更完美,让我们上升到物理层面上去思考。假如

我想把这个工作分给三台计算机去做,另外一台计算机专门负责收集计算结果。先假设每一台计算机的累加结果

都用A表示?那么负责合并的计算机该怎么把所有的结果A合并起来,这也是个可变的操作

5.想一想还能有什么会是变化的。让我们接着上面的思路,汇总计算机把所有的计算结果都汇总好了,汇总的

结果还是一个A类型的。假设是累加的例子,那么它就是一个int,现在我想要的结果不是一个int了,我想知道

这个值是不是大于5000,那么结果就是一个boolean类型,所以我们还可以抽象出一个结果转换器,来对累加

结果进行转换,转换成我们想要的最终结果

如何使用归约器

为了示例说明怎么样使用Stream对象归约器,让我们先定义一个Employee 类:

class Employee {

private String empId;

private String name;

private Double salary;

private String department;

public Employee(String empId, String name, Double salary, String department) {

this.empId = empId;

this.name = name;

this.salary = salary;

this.department = department;

}

// getters and toString

}

接着,创建一个Employee 类的List对象(Employeeas):

Employee john = new Employee("E123", "John Nhoj", 200.99, "IT");

Employee south = new Employee("E223", "South Htuos", 299.99, "Sales");

Employee reet = new Employee("E133", "Reet Teer", 300.99, "IT");

Employee prateema = new Employee("E143", "Prateema Rai", 300.99, "Benefits");

Employee yogen = new Employee("E323", "Yogen Rai", 200.99, "Sales");

List<Employee> employees = Arrays.asList(john, south, reet, prateema, yogen);

从集合中计算统计值

统计计算平均工资

Double averageSalary = employees.stream().collect(averagingDouble(Employee::getSalary));

// 260.79

有两个很类似的方法 averagingInt(ToIntFunction<? super T> mapper)和 averagingLong(ToLongFunction<? super T> mapper) ,这两个方法可以获取平均值(依照数据类型 Integer 和 Long )。

统计工资总和

Double totalSalary = employees.stream().collect(summingDouble(Employee::getSalary));

// 1303.95

summingInt(ToIntFunction<? super T> mapper) 和summingLong(ToLongFunction<? super T> mapper) 用来汇总数值。

获取最大工资

Double maxSalary = employees.stream().collect(collectingAndThen(maxBy(comparingDouble(Employee::getSalary)), emp -> emp.get().getSalary()));

// 300.99

collectingAndThen 函数声明:

Collector<T,A,RR> collectingAndThen(Collector<T,A,R> downstream, Function<R,RR> finisher)

Function finisher 可以将归约器输出的最终结果格式化为:

String avgSalary = employees.stream()

.collect(collectingAndThen(averagingDouble(Employee::getSalary), new DecimalFormat("'

0.000")::format));

// $260.790

一次性的计算统计数据

DoubleSummaryStatistics statistics = employees.stream().collect(summarizingDouble(Employee::getSalary));

System.out.println("Average: " + statistics.getAverage() + ", Total: " + statistics.getSum() + ", Max: " + statistics.getMax() + ", Min: "+ statistics.getMin());

// Average: 260.79, Total: 1303.95, Max: 300.99, Min: 200.99

与上面类似, summarizingInt(ToIntFunction<? super T> mapper)和summarizingLong(ToLongFunction<? super T> mapper)分别处理 Integer and Long 数据类型数据。

Mapping and Joining Stream

Mapping Only Employee Names

List<String> employeeNames = employees.stream().collect(mapping(Employee::getName, toList()));

// [John Nhoj, South Htuos, Reet Teer, Prateema Rai, Yogen Rai]

Joining Employee Names

String employeeNamesStr = employees.stream().map(Employee::getName).collect(joining(","));

// John Nhoj,South Htuos,Reet Teer,Prateema Rai,Yogen Rai

The joining() 函数具有重载版本,还有前缀和后缀,如:

Collector<CharSequence,?,String> joining(CharSequence delimiter, CharSequence prefix, CharSequence suffix)

因此,如果您想以特定的格式归集员工姓名,那么您可以:

String employeeNamesStr = employees.stream().map(Employee::getName).collect(joining(", ", "Employees = {", "}"));

// Employees = {John Nhoj, South Htuos, Reet Teer, Prateema Rai, Yogen Rai}

元素分组

员工按部门分组

groupingBy()实现分组的功能:

Collector<T,?,Map<K,List<T>>> groupingBy(Function<? super T,? extends K> classifier)

员工按部门分组的代码:

Map<String, List<Employee>> deptEmps = employees.stream().collect(groupingBy(Employee::getDepartment));

// {Sales=[{empId='E223', name='South Htuos', salary=299.99, department='Sales'}, {empId='E323', name='Yogen Rai', salary=200.99, department='Sales'}], Benefits=[{empId='E143', name='Prateema Rai', salary=300.99, department='Benefits'}], IT=[{empId='E123', name='John Nhoj', salary=200.99, department='IT'}, {empId='E133', name='Reet Teer', salary=300.99, department='IT'}]}

按部门分组统计员工数量

groupingBy() 有一个重载版本:

Collector<T,?,Map<K,List<T>>> groupingBy(Function<? super T,? extends K> classifier,Collector<? super T,A,D> downstream)

因此,每个部门的员工人数应该是:

Map<String, Long> deptEmpsCount = employees.stream().collect(groupingBy(Employee::getDepartment, counting()));

// {Sales=2, Benefits=1, IT=2}

按部门名称计算每个部门的平均工资

groupingBy()方法另外一个重载版本是:

Collector<T,?,M> groupingBy(Function<? super T,? extends K> classifier, Supplier<M> mapFactory, Collector<? super T,A,D> downstream)

TreeMap可用于按部门名称分组,排序如下:

Map<String, Double> averageSalaryDeptSorted = employees.stream().collect(groupingBy(Employee::getDepartment, TreeMap::new, averagingDouble(Employee::getSalary)));

// {Benefits=300.99, IT=250.99, Sales=250.49}

groupBy()方法还一个ConcurrentHashMap(并发)版本,可利用多核架构。

Map<String, Long> deptEmpCount = employees.stream().collect(groupingByConcurrent(Employee::getDepartment, counting()));

// {Sales=2, IT=2, Benefits=1}

Partitioning Elements分类元素

partitionby()使用谓词将结果分割为true以满足谓词条件,false为不满足:

Collector<T,?,Map<Boolean,List<T>>> partitioningBy(Predicate<? super T> predicate)

找到比平均工资高的员工:

Map<Boolean, List<Employee>> portionedEmployees = employees.stream().collect(partitioningBy(e -> e.getSalary() > averageSalary));

// {false=[{empId='E123', name='John Nhoj', salary=200.99, department='IT'}, {empId='E323', name='Yogen Rai', salary=200.99, department='Sales'}],

true=[{empId='E223', name='South Htuos', salary=299.99, department='Sales'}, {empId='E133', name='Reet Teer', salary=300.99, department='IT'}, {empId='E143', name='Prateema Rai', salary=300.99, department='Benefits'}]}

您可以使用该方法的重载版本来过滤结果,如:

Collector<T,?,Map<Boolean,D>> partitioningBy(Predicate<? super T> predicate, Collector<? super T,A,D> downstream)

结论

归约器collector类具有许多实用函数,可以在Stream上操作并有效地提取结果数据。

原文发布于微信公众号 - 程序你好(codinghello)

原文发表时间:2018-07-20

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏菩提树下的杨过

恶心的0.5四舍五入问题

四舍五入是财务类应用中常见的需求,按中国人的财务习惯,遇到0.5统一向上进位,但是c#与java中默认的却不是这样。 见c#代码: 1 stat...

22010
来自专栏码匠的流水账

聊聊storm的WindowedBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/IWindowedBolt.java

2434
来自专栏数据结构与算法

BZOJ3277: 串(广义后缀自动机)

字符串是oi界常考的问题。现在给定你n个字符串,询问每个字符串有多少子串(不包括空串)是所有n个字符串中

1422
来自专栏HansBug's Lab

3098: Hash Killer II

3098: Hash Killer II Time Limit: 5 Sec  Memory Limit: 128 MBSec  Special Judge S...

2966
来自专栏noteless

[四] java8 函数式编程 收集器浅析 收集器Collector常用方法 运行原理 内部实现

收集器是由四个函数约定构成,它们一起工作,将条目汇集到一个可变的结果容器中,并可选择性地对结果执行最终转换。

2932
来自专栏ml

位运算的方法,大结

Title:       位操作基础篇之位操作全面总结 Author:     MoreWindows E-mail:      morewindows@126...

6368
来自专栏编舟记

命令式到函数式编程

应用场景:当我们用到 if-elseif-else 的时候,可以考虑使用 Optional 语义。 举例说明:

782
来自专栏HansBug's Lab

3402: [Usaco2009 Open]Hide and Seek 捉迷藏

3402: [Usaco2009 Open]Hide and Seek 捉迷藏 Time Limit: 3 Sec  Memory Limit: 128 MB ...

3457
来自专栏小樱的经验随笔

Educational Codeforces Round 21(A.暴力,B.前缀和,C.贪心)

A. Lucky Year time limit per test:1 second memory limit per test:256 megabytes i...

2857
来自专栏HansBug's Lab

2292: 【POJ Challenge 】永远挑战

2292: 【POJ Challenge 】永远挑战 Time Limit: 10 Sec  Memory Limit: 128 MB Submit: 553 ...

3046

扫码关注云+社区

领取腾讯云代金券