Java 8引入了Stream API,它允许我们以声明的方式处理数据。此外,Stream还可以在不需要编写多线程代码的情况下使用多核架构。
Stream API,将对数据流的所有操作,仅用三个步骤概括全了-过滤、转化、归约。其中,过滤、转化还比较容易理解,但是归约就是一个非常高级的抽象接口了。
归约,就是对中间操作(过滤,转换等)的结果进行收集归一化的步骤,当然也可以对归约结果进行再归约,这就是归约的嵌套了。中间操作不消耗流,归约会消耗流,而且只能消费一次,就像......把流都吃掉了。对于刚接触Stream API的人来说,这样的描述可能太抽象了,请看下面的例子:
public class TestStream {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(-3, -2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
int[] sum = numbers.stream()
.filter(i -> i > 0)
.collect(() -> new int[]{0}, (a, b) -> a[0] += b, (s1, s2) -> s1[0] += s2[0]);
System.out.println(sum[0]);
}
}
这是一个简单的先过滤非正整数,然后对剩余元素求和的例子(这个例子纯粹是为了说明Collector原理所写,等你熟悉了Stream,你会有更好的实现方法)。你看,那么长一串数字流最后变成了一个数—被归约了。
collect里面需要传进去的是一个Collector接口,这就是我们今天的主角-归约器了,来看它的源码定义
public interface Collector<T, A, R> {
Supplier<A> supplier();
BiConsumer<A, T> accumulator();
BinaryOperator<A> combiner();
Function<A, R> finisher();
Set<Characteristics> characteristics();
}
Collectors是归约器Collector接口(https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collector.html)的一种实现,它实现各种有用的缩减操作(reduction operations),例如将元素积聚到集合中,根据各种标准汇总元素,等等。
由特殊到一般
我们来从头开始梳理计算从1-9累加和的步骤
1.首先你得提供一个用来接收每一步累加结果的变量,我们用A表示
2.你得确定你的累加变量的初始值是什么。如果我们把计算范围看作一个变量,那这一步就非常有必要了,我
现在给你的计算区间是[1,100),那如果我给你一个[0,0)这样一个区间呢,这个数据流是空的,但你同样要有一个
输出
3.确定你的操作。为了理解这个高级抽象接口的参数意义,我们不得不尽可能把一切都看成可变的,这里这个累加
的操作也是可变的,比如说我想求阶乘了
4.纯逻辑上的抽象已经我们已经做到极致了,但你还可以做的更完美,让我们上升到物理层面上去思考。假如
我想把这个工作分给三台计算机去做,另外一台计算机专门负责收集计算结果。先假设每一台计算机的累加结果
都用A表示?那么负责合并的计算机该怎么把所有的结果A合并起来,这也是个可变的操作
5.想一想还能有什么会是变化的。让我们接着上面的思路,汇总计算机把所有的计算结果都汇总好了,汇总的
结果还是一个A类型的。假设是累加的例子,那么它就是一个int,现在我想要的结果不是一个int了,我想知道
这个值是不是大于5000,那么结果就是一个boolean类型,所以我们还可以抽象出一个结果转换器,来对累加
结果进行转换,转换成我们想要的最终结果
为了示例说明怎么样使用Stream对象归约器,让我们先定义一个Employee 类:
class Employee {
private String empId;
private String name;
private Double salary;
private String department;
public Employee(String empId, String name, Double salary, String department) {
this.empId = empId;
this.name = name;
this.salary = salary;
this.department = department;
}
// getters and toString
}
接着,创建一个Employee 类的List对象(Employeeas):
Employee john = new Employee("E123", "John Nhoj", 200.99, "IT");
Employee south = new Employee("E223", "South Htuos", 299.99, "Sales");
Employee reet = new Employee("E133", "Reet Teer", 300.99, "IT");
Employee prateema = new Employee("E143", "Prateema Rai", 300.99, "Benefits");
Employee yogen = new Employee("E323", "Yogen Rai", 200.99, "Sales");
List<Employee> employees = Arrays.asList(john, south, reet, prateema, yogen);
统计计算平均工资
Double averageSalary = employees.stream().collect(averagingDouble(Employee::getSalary));
// 260.79
有两个很类似的方法 averagingInt(ToIntFunction<? super T> mapper)和 averagingLong(ToLongFunction<? super T> mapper) ,这两个方法可以获取平均值(依照数据类型 Integer 和 Long )。
Double totalSalary = employees.stream().collect(summingDouble(Employee::getSalary));
// 1303.95
summingInt(ToIntFunction<? super T> mapper) 和summingLong(ToLongFunction<? super T> mapper) 用来汇总数值。
Double maxSalary = employees.stream().collect(collectingAndThen(maxBy(comparingDouble(Employee::getSalary)), emp -> emp.get().getSalary()));
// 300.99
collectingAndThen 函数声明:
Collector<T,A,RR> collectingAndThen(Collector<T,A,R> downstream, Function<R,RR> finisher)
Function finisher 可以将归约器输出的最终结果格式化为:
String avgSalary = employees.stream()
.collect(collectingAndThen(averagingDouble(Employee::getSalary), new DecimalFormat("'
0.000")::format));
// $260.790
DoubleSummaryStatistics statistics = employees.stream().collect(summarizingDouble(Employee::getSalary));
System.out.println("Average: " + statistics.getAverage() + ", Total: " + statistics.getSum() + ", Max: " + statistics.getMax() + ", Min: "+ statistics.getMin());
// Average: 260.79, Total: 1303.95, Max: 300.99, Min: 200.99
与上面类似, summarizingInt(ToIntFunction<? super T> mapper)和summarizingLong(ToLongFunction<? super T> mapper)分别处理 Integer and Long 数据类型数据。
Mapping Only Employee Names
List<String> employeeNames = employees.stream().collect(mapping(Employee::getName, toList()));
// [John Nhoj, South Htuos, Reet Teer, Prateema Rai, Yogen Rai]
Joining Employee Names
String employeeNamesStr = employees.stream().map(Employee::getName).collect(joining(","));
// John Nhoj,South Htuos,Reet Teer,Prateema Rai,Yogen Rai
The joining() 函数具有重载版本,还有前缀和后缀,如:
Collector<CharSequence,?,String> joining(CharSequence delimiter, CharSequence prefix, CharSequence suffix)
因此,如果您想以特定的格式归集员工姓名,那么您可以:
String employeeNamesStr = employees.stream().map(Employee::getName).collect(joining(", ", "Employees = {", "}"));
// Employees = {John Nhoj, South Htuos, Reet Teer, Prateema Rai, Yogen Rai}
员工按部门分组
groupingBy()实现分组的功能:
Collector<T,?,Map<K,List<T>>> groupingBy(Function<? super T,? extends K> classifier)
员工按部门分组的代码:
Map<String, List<Employee>> deptEmps = employees.stream().collect(groupingBy(Employee::getDepartment));
// {Sales=[{empId='E223', name='South Htuos', salary=299.99, department='Sales'}, {empId='E323', name='Yogen Rai', salary=200.99, department='Sales'}], Benefits=[{empId='E143', name='Prateema Rai', salary=300.99, department='Benefits'}], IT=[{empId='E123', name='John Nhoj', salary=200.99, department='IT'}, {empId='E133', name='Reet Teer', salary=300.99, department='IT'}]}
按部门分组统计员工数量
groupingBy() 有一个重载版本:
Collector<T,?,Map<K,List<T>>> groupingBy(Function<? super T,? extends K> classifier,Collector<? super T,A,D> downstream)
因此,每个部门的员工人数应该是:
Map<String, Long> deptEmpsCount = employees.stream().collect(groupingBy(Employee::getDepartment, counting()));
// {Sales=2, Benefits=1, IT=2}
按部门名称计算每个部门的平均工资
groupingBy()方法另外一个重载版本是:
Collector<T,?,M> groupingBy(Function<? super T,? extends K> classifier, Supplier<M> mapFactory, Collector<? super T,A,D> downstream)
TreeMap可用于按部门名称分组,排序如下:
Map<String, Double> averageSalaryDeptSorted = employees.stream().collect(groupingBy(Employee::getDepartment, TreeMap::new, averagingDouble(Employee::getSalary)));
// {Benefits=300.99, IT=250.99, Sales=250.49}
groupBy()方法还一个ConcurrentHashMap(并发)版本,可利用多核架构。
Map<String, Long> deptEmpCount = employees.stream().collect(groupingByConcurrent(Employee::getDepartment, counting()));
// {Sales=2, IT=2, Benefits=1}
partitionby()使用谓词将结果分割为true以满足谓词条件,false为不满足:
Collector<T,?,Map<Boolean,List<T>>> partitioningBy(Predicate<? super T> predicate)
找到比平均工资高的员工:
Map<Boolean, List<Employee>> portionedEmployees = employees.stream().collect(partitioningBy(e -> e.getSalary() > averageSalary));
// {false=[{empId='E123', name='John Nhoj', salary=200.99, department='IT'}, {empId='E323', name='Yogen Rai', salary=200.99, department='Sales'}],
true=[{empId='E223', name='South Htuos', salary=299.99, department='Sales'}, {empId='E133', name='Reet Teer', salary=300.99, department='IT'}, {empId='E143', name='Prateema Rai', salary=300.99, department='Benefits'}]}
您可以使用该方法的重载版本来过滤结果,如:
Collector<T,?,Map<Boolean,D>> partitioningBy(Predicate<? super T> predicate, Collector<? super T,A,D> downstream)
归约器collector类具有许多实用函数,可以在Stream上操作并有效地提取结果数据。