Java8 Lambda(二)-Stream原理

推荐一篇博文,很好的介绍了Stream的原理.本文对其进行一些补充更加详细的讲解.

作者: 李豪 地址: https://github.com/CarpenterLee/JavaLambdaInternals/blob/master/6-Stream%20Pipelines.md

需求:

"张三","李四","王二","张四五"中选出以开头的名字,然后从再从中选出名字最长的一个,输出其长度.

1.一种直白的实现

缺点:

  1. 迭代次数过多
  2. 频繁产生中间结果,性能无法接受

实际想要的效果: 平常的写法:

int longest = 0;
for(String str : strings){
    if(str.startsWith("张")){// 1. filter(), 保留以张开头的字符串
        int len = str.length();// 2. mapToInt(), 转换成长度
        longest = Math.max(len, longest);// 3. max(), 保留最长的长度
    }
}
System.out.println(longest);

Stream的做法:

Stream.of("张三","李四","王二","张四五")
      .filter(x -> x.startsWith("张"))
      .mapToInt(String::length)
      .max()
      .ifPresent(System.out::println);

2.Stream是怎么做到的?

Stream的操作分类:

中间操作:返回一个新的Stream

- 有状态 sorted(),必须等上一步操作完拿到全部元素后才可操作
- 无状态 filter(),该操作的元素不受上一步操作的影响
list.stream().filter(x -> x.startWith("张").map(x -> x.length())
list.stream().filter(x -> x.startWith("张").sorted().map(x -> x.length())

终端操作:返回结果

- 短路操作findFirst(),找到一个则返回,也就是break当前的循环
- 非短路操作forEach(),遍历全部元素

以上操作决定了Stream一定是先构建完毕再执行的特点,也就是延迟执行,当需要结果(终端操作时)开始执行流水线. Stream做到的是对于多次调用合并到一次迭代中处理完所有的调用方式.换句话说就是解决了上述的两个缺点.大概思路是记录下每一步的操作,然后终端操作时对其迭代依次执行每一步的操作,最后再一次循环中处理.

问题:

  1. 操作是如何记录下来的?
  2. 操作是如何叠加的?
  3. 叠加完如何执行的?
  4. 执行完如何收集结果的?

Stream结构示意图:

示例代码:

List<String> data = new ArrayList<>();
data.add("张三");
data.add("李四");
data.add("王三");
data.add("马六");

data.stream()
    .filter(x -> x.length() == 2)
    .map(x -> x.replace("三","五"))
    .sorted()
    .filter(x -> x.contains("五"))
    .forEach(System.out::println);

1. 操作是如何记录下来的?

  1. Head记录Stream起始操作
  2. StatelessOp记录中间操作
  3. StatefulOp记录有状态的中间操作 这三个操作实例化会指向其父类AbstractPipeline,也就是在AbstractPipeline中建立了双向链表

对于Head

AbstractPipeline(Spliterator<?> source,
                 int sourceFlags, boolean parallel) {
    this.previousStage = null; //首操作上一步为null    
    this.sourceSpliterator = source; //数据
    this.sourceStage = this; //Head操作
    this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
    this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
    this.depth = 0;
    this.parallel = parallel;
}

对于其他Stage:

AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    if (previousStage.linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    previousStage.linkedOrConsumed = true;
    //双向链表的建立
    previousStage.nextStage = this;
    this.previousStage = previousStage;
    this.sourceStage = previousStage.sourceStage;        
    this.depth = previousStage.depth + 1;        
    
    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
    if (opIsStateful())
        sourceStage.sourceAnyStateful = true;
}

调用过程如此用双向链表串联起来,每一步都得知其上一步与下一步的操作. data.stream() .filter(x -> x.length() == 2) .map(x -> x.replace(“三”,”五”)) .sorted() .filter(x -> x.contains(“五”)) .forEach(System.out::println);


2.操作是如何叠加的?

Sink<T>接口:

  1. void begin(long size),循环开始前调用,通知每个Stage做好准备
  2. void end(),循环结束时调用,依次调用每个Stage的end方法,处理结果
  3. boolean cancellationRequested(),判断是否可以提前结束循环
  4. void accept(T value),每一步的处理

其子类之一ChainedReference:

static abstract class ChainedReference<T, E_OUT> implements Sink<T> {
    protected final Sink<? super E_OUT> downstream;

    public ChainedReference(Sink<? super E_OUT> downstream) {
        this.downstream = Objects.requireNonNull(downstream);
    }
    @Override
    public void begin(long size) {
        downstream.begin(size);
    }
    @Override
    public void end() {
        downstream.end();
    }
    @Override
    public boolean cancellationRequested() {
        return downstream.cancellationRequested();
    }
}

例Filter:

@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    //条件成立则传递给下一个操作,也因为如此所以有状态的操作必须放到
                    //end方法里面
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}

再例如sorted():

@Override
public void begin(long size) {
    if (size >= Nodes.MAX_ARRAY_SIZE)
        throw new IllegalArgumentException(Nodes.BAD_SIZE);
    list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
}
@Override
public void end() {
    list.sort(comparator);
    downstream.begin(list.size());
    if (!cancellationWasRequested) {
        list.forEach(downstream::accept);
    }
    else {
        for (T t : list) {
            if (downstream.cancellationRequested()) break;
            downstream.accept(t);
        }
    }
    downstream.end();
    list = null;
}
@Override
public void accept(T t) {
    list.add(t);
}

叠加后如何执行?

执行操作是由终端操作来触发的,例如foreach操作

@Override
public void forEach(Consumer<? super P_OUT> action) {
    //evaluate就是开关,一旦调用就立即执行整个Stream    
    evaluate(ForEachOps.makeRef(action, false));
}

执行前会对操作从末尾到起始反向包裹起来,得到调用链

Sink opWrapSink(int flags, Sink<P_OUT> sink) ;
//这个Sink是终端操作所对应的Sink
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    Objects.requireNonNull(sink);

    for ( AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;
}
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);

    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        //依次执行调用链
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        spliterator.forEachRemaining(wrappedSink);
        wrappedSink.end();
    }
    else {
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}

有状态的中间操作何时执行?

例如sorted()操作,其依赖上一次操作的结果集,按照调用链来说结果集必须在accept()调用完才会产生.那也就说明sorted操作需要在end中,然后再重新开启调用链.

sorted的end方法:

@Override
 public void end() {
     list.sort(comparator);
     downstream.begin(list.size());
     if (!cancellationWasRequested) {
         list.forEach(downstream::accept);
     }
     else {
         for (T t : list) {
             if (downstream.cancellationRequested()) break;
             downstream.accept(t);
         }
     }
     downstream.end();
     list = null;
 }

那么就相当于sorted给原有操作断路了一次,然后又重新接上,再次遍历.

如何收集到结果?

foreach是不需要收集到结果的,但是对于collect这样的操作是需要拿到最终end产生的结果.end产生的结果在最后一个Sink中,这样的操作最终都会提供一个取出数据的get方法.

@Override
 public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                    Spliterator<P_IN> spliterator) {
     return helper.wrapAndCopyInto(makeSink(), spliterator).get();
 }

如此拿到数据返回

个人博客 mrdear.cn ,欢迎交流

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java Web

Java 面试知识点解析(四)——版本特性篇

15950
来自专栏技术/开源

从C#到TypeScript - 类型

从C#到TypeScript - 类型 TypeScript和C#一样是微软搞出来的,而且都是大牛Anders Hejlsberg领导开发的,它们之间有很多共同...

25250
来自专栏向治洪

ES7、ES8新特性

概述 JavaScript,作为一门处于高速发展期的开发语言,正在变的越来越完善、稳定。我们必须拥抱这些变化,并且我们需要把ES8加入到我们的技术栈中。 E...

1.9K50
来自专栏恰童鞋骚年

【译】.NET中六个重要的概念:栈、堆、值类型、引用类型、装箱和拆箱

  一来是为了感受国外优秀技术社区知名博主的高质量文章,二来是为了复习对.NET技术的基础拾遗达到温故知新的效果,最后也是为了锻炼一下自己的英文读写能力。因为是...

8120
来自专栏java 成神之路

URI 源码分析

396150
来自专栏圣杰的专栏

线程安全知多少

1. 如何定义线程安全 线程安全,拆开来看: 线程:指多线程的应用场景下。 安全:指数据安全。 多线程就不用过多介绍了,相关类型集中在System.Thread...

35250
来自专栏逍遥剑客的游戏开发

在C#中派生C++的抽象类

22940
来自专栏影子

jQuery中的常用内容总结(三)

转载请注明地址:http://www.cnblogs.com/funnyzpc/p/7571998.html

11110
来自专栏向治洪

ES7和ES8新特性介绍

概述 JavaScript,作为一门处于高速发展期的开发语言,正在变的越来越完善、稳定。我们必须拥抱这些变化,并且我们需要把ES8加入到我们的技术栈中。 ECM...

69360
来自专栏二进制文集

JDK Timer 实现原理分析

注意其中最大的区别,在于 schedule 调用 sched 函数时,将传入的 period 取反了。如果某次执行任务的开始时间延后了,那么此后的每次任务都会延...

10930

扫码关注云+社区

领取腾讯云代金券