来源:http://t.cn/ELmra8O
怎样在一行代码里同时计算一个列表的和、最大值、最小值、平均值、元素个数、奇偶分组、指数、排序呢?
答案是思维反转!将行为作为数据传递。 文艺青年的代码如下所示:
public class FunctionUtil {
public static <T,R> List<R> multiGetResult(List<Function<List<T>, R>> functions, List<T> list) {
return functions.stream().map(f -> f.apply(list)).collect(Collectors.toList());
}
public static void main(String[] args) {
System.out.println(multiGetResult(
Arrays.asList(
list -> list.stream().collect(Collectors.summarizingInt(x->x)),
list -> list.stream().filter(x -> x < 50).sorted().collect(Collectors.toList()),
list -> list.stream().collect(Collectors.groupingBy(x->(x%2==0? "even": "odd"))),
list -> list.stream().sorted().collect(Collectors.toList()),
list -> list.stream().sorted().map(Math::sqrt).collect(Collectors.toMap(x->x, y->Math.pow(2,y)))),
Arrays.asList(64,49,25,16,9,4,1,81,36)));
}
}
呃,有点卖弄小聪明。 不过要是能将行为作为数据自由传递和施加于数据集产生结果,那么其代码表达能力将如庄子之言,恣意潇洒而无所极限。
行为就是数据。
函数编程的最直接的表现,莫过于将函数作为数据自由传递,结合泛型推导能力,使代码表达能力获得飞一般的提升。那么,Java8是怎么支持函数编程的呢?主要有三个核心概念:
关于函数接口,需要记住的就是两件事:
最直接的支持就是 java.util.Function 包。定义了四个最基础的函数接口:
其中, compose, andThen, and, or, negate 用来组合函数接口而得到更强大的函数接口。
其它的函数接口都是通过这四个扩展而来。
那么,这些函数接口可以接收哪些值呢?
在博文“使用函数接口和枚举实现配置式编程(Java与Scala实现)”, “精练代码:一次Java函数式编程的重构之旅” 给出了基本的例子。后面还有更多例子。重在练习和尝试。
先说聚合器。每一个流式计算的末尾总有一个类似 collect(Collectors.toList()) 的方法调用。collect 是 Stream 的方法,而参数则是聚合器Collector。已有的聚合器定义在Collectors 的静态方法里。 那么这个聚合器是怎么实现的呢?
大部分聚合器都是基于 Reduce 操作实现的。 Reduce ,名曰推导,含有三个要素: 初始值 init, 二元操作符 BinaryOperator, 以及一个用于聚合结果的数据源S。
Reduce 的算法如下:
STEP1: 初始化结果 R = init ;
STEP2: 每次从 S 中取出一个值 v,通过二元操作符施加到 R 和 v ,产生一个新值赋给 R = BinaryOperator(R, v);重复 STEP2, 直到 S 中没有值可取为止。
比如一个列表求和,Sum([1,2,3]) , 那么定义一个初始值 0 以及一个二元加法操作 BO = a + b ,通过三步完成 Reduce 操作:step1: R = 0; step2: v=1, R = 0+v = 1; step2: v=2, R = 1 + v = 3 ; step3: v = 3, R = 3 + v = 6。
一个聚合器的实现,通常需要提供四要素:
Collectors.CollectorImpl 的实现展示了这一点:
static class CollectorImpl<T, A, R> implements Collector<T, A, R> {
private final Supplier<A> supplier;
private final BiConsumer<A, T> accumulator;
private final BinaryOperator<A> combiner;
private final Function<A, R> finisher;
private final Set<Characteristics> characteristics;
CollectorImpl(Supplier<A> supplier,
BiConsumer<A, T> accumulator,
BinaryOperator<A> combiner,
Function<A,R> finisher,
Set<Characteristics> characteristics) {
this.supplier = supplier;
this.accumulator = accumulator;
this.combiner = combiner;
this.finisher = finisher;
this.characteristics = characteristics;
}
}
列表类聚合器实现,基本是基于Reduce 操作完成的。 看如下代码:
public static <T>
Collector<T, ?, List<T>> toList() {
return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
(left, right) -> { left.addAll(right); return left; },
CH_ID);
首先使用 ArrayList::new 创造一个空列表; 然后 List:add 将Stream累积操作的中间结果加入到这个列表;第三个函数则将两个列表元素进行合并成一个结果列表中。 就是这么简单。集合聚合器 toSet(), 字符串连接器 joining(),以及列表求和(summingXXX)、最大(maxBy)、最小值(minBy)等都是这个套路。
映射类聚合器基于Map合并来完成。看这段代码:
private static <K, V, M extends Map<K,V>>
BinaryOperator<M> mapMerger(BinaryOperator<V> mergeFunction) {
return (m1, m2) -> {
for (Map.Entry<K,V> e : m2.entrySet())
m1.merge(e.getKey(), e.getValue(), mergeFunction);
return m1;
};
}
根据指定的值合并函数 mergeFunction, 返回一个map合并器,用来合并两个map里相同key的值。mergeFunction用来对两个map中相同key的值进行运算得到新的value值,如果value值为null,会移除相应的key,否则使用value值作为对应key的值。这个方法是私有的,主要为支撑 toMap,groupingBy 而生。
toMap的实现很简短,实际上就是将指定stream的每个元素分别使用给定函数keyMapper, valueMapper进行映射得到 newKey, newValue,从而形成新的MapnewKey,newValue, 再使用mapMerger(mergeFunction) 生成的 map 合并器将其合并到 mapSupplier (2)。如果只传 keyMapper, valueMapper,那么就只得到结果(1)。
public static <T, K, U, M extends Map<K, U>>
Collector<T, ?, M> toMap(Function<? super T, ? extends K> keyMapper,
Function<? super T, ? extends U> valueMapper,
BinaryOperator<U> mergeFunction,
Supplier<M> mapSupplier) {
BiConsumer<M, T> accumulator
= (map, element) -> map.merge(keyMapper.apply(element),
valueMapper.apply(element), mergeFunction);
return new CollectorImpl<>(mapSupplier, accumulator, mapMerger(mergeFunction), CH_ID);
}
toMap 的一个示例见如下代码:
List<Integer> list = Arrays.asList(1,2,3,4,5);
Supplier<Map<Integer,Integer>> mapSupplier = () -> list.stream().collect(Collectors.toMap(x->x, y-> y * y));
Map<Integer, Integer> mapValueAdd = list.stream().collect(Collectors.toMap(x->x, y->y, (v1,v2) -> v1+v2, mapSupplier));
System.out.println(mapValueAdd);
将一个 List 转成 map[1=1,2=2,3=3,4=4,5=5],然后与另一个map[1=1,2=4,3=9,4=16,5=25]的相同key的value进行相加。注意到, toMap 的最后一个参数是 Supplier
让我们仿照 Collectors.toList() 做一个自定义的聚合器。实现一个含N个数的斐波那契序列 List。由于 Reduce 每次都从流中取一个数,因此需要生产一个含N个数的stream;可使用 Arrays.asList(1,2,3,4,5,6,7,8,9,10).stream() , 亦可使用 IntStream.range(1,11) ,不过两者的 collector 方法是不一样的。这里我们取前者。
现在,需要构造四要素:
代码如下:
/**
* Created by shuqin on 17/12/5.
*/
public class FiboCollector implements Collector<Integer, List<Integer>, List<Integer>> {
public Supplier<List<Integer>> supplier() {
return () -> {
List<Integer> result = new ArrayList<>();
result.add(0); result.add(1);
return result;
};
}
@Override
public BiConsumer<List<Integer>, Integer> accumulator() {
return (res, num) -> {
Integer next = res.get(res.size()-1) + res.get(res.size()-2);
res.add(next);
};
}
@Override
public BinaryOperator<List<Integer>> combiner() {
return null;
//return (left, right) -> { left.addAll(right); return left; };
}
@Override
public Function<List<Integer>, List<Integer>> finisher() {
return res -> { res.remove(0); res.remove(1); return res; };
}
@Override
public Set<Characteristics> characteristics() {
return Collections.emptySet();
}
}
List<Integer> fibo = Arrays.asList(1,2,3,4,5,6,7,8,9,10).stream().collect(new FiboCollector());
System.out.println(fibo);
流(Stream)是Java8对函数式编程的重要支撑。大部分函数式工具都围绕Stream展开。
Stream 主要有四类接口:
除了 Stream 本身自带的生成Stream 的方法,数组和容器及StreamSupport都有转换为流的方法。比如 Arrays.stream , [List|Set|Collection].[stream|parallelStream] , StreamSupport.[int|long|double|]stream;
流的类型主要有:Reference(对象流), IntStream (int元素流), LongStream (long元素流), Double (double元素流) ,定义在类 StreamShape 中,主要将操作适配于类型系统。
flatMap 的一个例子见如下所示,将一个二维数组转换为一维数组:
List<Integer> nums = Arrays.asList(Arrays.asList(1,2,3), Arrays.asList(1,4,9), Arrays.asList(1,8,27))
.stream().flatMap(x -> x.stream()).collect(Collectors.toList());
System.out.println(nums);
这里我们仅分析串行是怎么实现的。入口在类 java.util.stream.ReferencePipeline 的 collect 方法:
container = evaluate(ReduceOps.makeRef(collector));
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
? (R) container : collector.finisher().apply(container);
这里的关键是 ReduceOps.makeRef(collector)。 点进去:
public static <T, I> TerminalOp<T, I>
makeRef(Collector<? super T, I, ?> collector) {
Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
BiConsumer<I, ? super T> accumulator = collector.accumulator();
BinaryOperator<I> combiner = collector.combiner();
class ReducingSink extends Box<I>
implements AccumulatingSink<T, I, ReducingSink> {
@Override
public void begin(long size) {
state = supplier.get();
}
@Override
public void accept(T t) {
accumulator.accept(state, t);
}
@Override
public void combine(ReducingSink other) {
state = combiner.apply(state, other.state);
}
}
return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
@Override
public ReducingSink makeSink() {
return new ReducingSink();
}
@Override
public int getOpFlags() {
return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
? StreamOpFlag.NOT_ORDERED
: 0;
}
};
}
private static abstract class Box<U> {
U state;
Box() {} // Avoid creation of special accessor
public U get() {
return state;
}
}
Box 是一个结果值的持有者; ReducingSink 用begin, accept, combine 三个方法定义了要进行的计算;ReducingSink是有状态的流数据消费的计算抽象,阅读Sink接口文档可知。ReduceOps.makeRef(collector) 返回了一个封装了Reduce操作的ReduceOps对象。注意到,这里都是声明要执行的计算,而不涉及计算的实际过程。展示了表达与执行分离的思想。真正的计算过程启动在 ReferencePipeline.evaluate 方法里:
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
使用 IDE 的 go to implementations 功能, 跟进去,可以发现,最终在 AbstractPipeLine 中定义了:
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}
Spliterator 用来对流中的元素进行分区和遍历以及施加Sink指定操作,可以用于并发计算。Spliterator的具体实现类定义在 Spliterators 的静态类和静态方法中。其中有:
数组Spliterator:
static final class ArraySpliterator<T> implements Spliterator<T>
static final class IntArraySpliterator implements Spliterator.OfInt
static final class LongArraySpliterator implements Spliterator.OfLong
static final class DoubleArraySpliterator implements Spliterator.OfDouble
迭代Spliterator:
static class IteratorSpliterator<T> implements Spliterator<T>
static final class IntIteratorSpliterator implements Spliterator.OfInt
static final class LongIteratorSpliterator implements Spliterator.OfLong
static final class DoubleIteratorSpliterator implements Spliterator.OfDouble
抽象Spliterator:
public static abstract class AbstractSpliterator<T> implements Spliterator<T>
private static abstract class EmptySpliterator<T, S extends Spliterator<T>, C>
public static abstract class AbstractIntSpliterator implements Spliterator.OfInt
public static abstract class AbstractLongSpliterator implements Spliterator.OfLong
public static abstract class AbstractDoubleSpliterator implements Spliterator.OfDouble
每个具体类都实现了trySplit,forEachRemaining,tryAdvance,estimateSize,characteristics, getComparator。 trySplit 用于拆分流,提供并发能力;forEachRemaining,tryAdvance 用于遍历和消费流中的数据。下面展示了IteratorSpliterator的forEachRemaining,tryAdvance 两个方法的实现。可以看到,木有特别的地方,就是遍历元素并将指定操作施加于元素。
@Override
public void forEachRemaining(Consumer<? super T> action) {
if (action == null) throw new NullPointerException();
Iterator<? extends T> i;
if ((i = it) == null) {
i = it = collection.iterator();
est = (long)collection.size();
}
i.forEachRemaining(action);
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
if (action == null) throw new NullPointerException();
if (it == null) {
it = collection.iterator();
est = (long) collection.size();
}
if (it.hasNext()) {
action.accept(it.next());
return true;
}
return false;
}
整体流程就是这样。回顾一下:
那么,Spliterator 又是从哪里来的呢?是通过类 java.util.stream.AbstractPipeline 的方法 sourceSpliterator 拿到的:
private Spliterator<?> sourceSpliterator(int terminalFlags) {
// Get the source spliterator of the pipeline
Spliterator<?> spliterator = null;
if (sourceStage.sourceSpliterator != null) {
spliterator = sourceStage.sourceSpliterator;
sourceStage.sourceSpliterator = null;
}
else if (sourceStage.sourceSupplier != null) {
spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
sourceStage.sourceSupplier = null;
}
else {
throw new IllegalStateException(MSG_CONSUMED);
}
// code for isParallel
return spliterator;
}
这里的 sourceStage 是一个 AbstractPipeline。 Pipeline 是实现流式计算的流水线抽象,也是Stream的实现类。可以看到,java.util.stream 定义了四种 pipeline: DoublePipeline, IntPipeline, LongPipeline, ReferencePipeline。可以重点看 ReferencePipeline 的实现。比如 filter, map
abstract class ReferencePipeline<P_IN, P_OUT>
extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
implements Stream<P_OUT>
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
@Override
@SuppressWarnings("unchecked")
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};
}
套路基本一样,关键点在于 accept 方法。filter 只在满足条件时将值传给下一个 pipeline, 而 map 将计算的值传给下一个 pipeline. StatelessOp 没有什么逻辑,JDK文档解释是:Base class for a stateless intermediate stage of a Stream。相应还有一个 StatefulOp, Head。 这些都是 ReferencePipeline ,负责将值在 pipeline 之间传递,交给 Sink 去计算。
static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT>
abstract static class StatelessOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT>
abstract static class StatefulOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT>
至此,我们对整个流计算过程有了更清晰的认识。 细节可以再逐步推敲