前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RxJava2 源码解读之 ConcatMap

RxJava2 源码解读之 ConcatMap

作者头像
三好码农
发布2018-09-11 10:51:37
9140
发布2018-09-11 10:51:37
举报

上一篇文章中我们学习了RxJava2中 FlatMap 的原理,同时知道,FlatMap经过转换后发射的数据不是严格有序的,如果需要数据按顺序被发射,RxJava2提供了另外一个操作符, 也是这篇文章的主角 — ConcatMap.

FlatMap vs ConcatMap

之前分析了FlatMap发射数据无序的原因,但是没有实际用代码验证过,这里我们在分析ConcatMap源码之前,我们先运行测试代码,有个直观的感受。

FlatMap

示例:原始发射的数据是1,2,3,4,5,经过转换后10,100,1000,20,200...50,500,5000

List<String> resultList = new CopyOnWriteArrayList<>();
Observable.create((ObservableOnSubscribe<String>) emitter -> {
     for (int i = 1; i < 6; i++) {
         emitter.onNext(String.valueOf(i));
     }
     emitter.onComplete();
   }).flatMap(s -> {
       String tail = "";
       ArrayList<String> newMap = new ArrayList<>();
       for (int i = 0; i < 3; i++) {
           newMap.add(s + tail);
           tail = tail + "0";
       }
        return Observable.fromIterable(newMap);
    }).doOnComplete(() -> {
        L.d(resultList.toString());
   }).subscribe(resultList::add);

直接看运行结果

flatmap_运行结果.png

看到结果,可能会有疑问,这里不是有序的吗,那不跟我们上面的结论矛盾吗?其实不矛盾,这里之所以有序是因为我们这里的数据转换只是简单的字符串拼接,执行速度非常快,才会产生FlatMap按顺序发射数据的假象,我们将FlatMap的转换加上50 ms 的delay,再运行一下。

List<String> resultList = new CopyOnWriteArrayList<>();
Observable.create((ObservableOnSubscribe<String>) emitter -> {
     for (int i = 1; i < 6; i++) {
         emitter.onNext(String.valueOf(i));
     }
     emitter.onComplete();
   }).flatMap(s -> {
       String tail = "";
       ArrayList<String> newMap = new ArrayList<>();
       for (int i = 0; i < 3; i++) {
           newMap.add(s + tail);
           tail = tail + "0";
       }
        return Observable.fromIterable(newMap).delay(50, TimeUnit.MILLISECONDS);
    }).doOnComplete(() -> {
        L.d(resultList.toString());
   }).subscribe(resultList::add);

flatmap_delay 运行结果.png

可以看到,这一次数据的发射是无序的,验证了我们之前的结论(提一下,虽然1,2,3,4,5的顺序是乱的,但是1,100,1000 它们的顺序不会改变的,因为它们是同一个Observable发射的)

ConcatMap

我们将上面的操作符flatmap 改成 concatMap 再看运行打印结果

List<String> resultList = new CopyOnWriteArrayList<>();
Observable.create((ObservableOnSubscribe<String>) emitter -> {
     for (int i = 1; i < 6; i++) {
         emitter.onNext(String.valueOf(i));
     }
     emitter.onComplete();
   }).concatMap(s -> {
       String tail = "";
       ArrayList<String> newMap = new ArrayList<>();
       for (int i = 0; i < 3; i++) {
           newMap.add(s + tail);
           tail = tail + "0";
       }
        return Observable.fromIterable(newMap).delay(50, TimeUnit.MILLISECONDS);
    }).doOnComplete(() -> {
        L.d(resultList.toString());
   }).subscribe(resultList::add);

concatmap_运行结果.png

可以看到,虽然我们加上了50ms 的发射延时,数据仍然是完全按顺序发射的。废话不多说,直接看ConcatMap源码。

ConcatMap 源码

嗯,先看方法定义,熟悉的味道

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
     return concatMap(mapper, 2);
}

接着往下看

最终生成的是ObservableConcatMap(看名字就知道是个Observable)

ObservableConcatMap.png

我们还是看ObservableConcatMap 的 subscribeActual 方法

subscribeActual.png

我们传入的delayErrors 默认是 IMMEDIATE,所以生成的 是 SourceObserver对象,找到了最终的Observer,我们先看它的onNext方法实现

SourceObserver onNext.png

可以看到,新建了一个队列对象 queue,先将要发射的数据放入队列中,接下来重点看drain方法 (drain,英文渣渣特意查了字典,排水管,很形象是不是~~~)

drain.png

SourceObserver 同样继承了AtomicInteger,getAndIncrement方法保证自增的原子性,所以这里只有初始值为0时,执行下面的循环,进入循环,做了2个判断,一个是判断,是否已经disposed,如果是 清空队列并退出循环,还有个active字段,表示当前是否还有Observable在发射,(比如我们上面的例子,原始发射了5个数据,1,2,3,4,5,所以经过ConcatMap转换就有了5个 Observable,每个Observable各自携带3个数据,所以在这里如果用ConcatMap 操作符,这5个Observable都是严格排序发射,只有上一个发射完全完成之后,才会开始下一个,而且因为所有要发射的数据在之前已经加入到queue队列中,所以不曾在竞争,这样也就保证了数据发射的顺序)

调用我们提供的mapper,生成Observable,调用subscribe方法,传递的是InnerObserver

subscribe inner.png

重点看InnerObserver 的 onComplete 方法

inner oncomplete.png

里面最终调用了SourceObservable 的 innerComplete方法

parent inner compelte.png

我们看到在这里 将 active 设为false,同时调用了 drain 方法,循环获取队列中的数据,然后发射。这样,整个流程就清楚了,不是很难,但是读了源码,用的时候,会更有自信。以上,水平有限,看官们开心-v-

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.08.08 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • FlatMap vs ConcatMap
    • FlatMap
      • ConcatMap
        • ConcatMap 源码
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档