前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RxJava2源码解读之 Map、FlatMap

RxJava2源码解读之 Map、FlatMap

作者头像
三好码农
发布2018-09-11 10:51:13
1.3K0
发布2018-09-11 10:51:13
举报
文章被收录于专栏:三好码农的三亩自留地

RxJava给我们提供了很多变换的操作符,map、flatMap就是比较常用的操作符,一般我们使用的时候,都是看官方文档来了解每个操作符的含义,但是我自己感觉下来,看官方文档使用没问题,但是总有一点隔靴搔痒的意思,所以我还要去RxJava的源码一探究竟,做到心中有数。

我们先从相对简单的 Map 开始

Map

官方定义:transform the items emitted by an Observable by applying a function to each item 拙劣的翻译:应用一个函数 转换所有的被发射的item

官方的图解:

map 图例.png

到这里我们总结一下:

  • map 转换是一对一的,原来发射了几个数据,转换之后还是几个
  • map 转换可以改变发射的数据类型

这里抛出一个问题,map 调用我们提供的function进行转换,那么这个function在什么时候被调用?在哪个线程被调用?(这个对我们实际工程中使用map有意义,知道代码被执行的线程是必须的)

废话不多说,进入源码

Map源码

Observable类是RxJava的门面,基本上所有的转换符都在这里定义,直接看Map 的方法定义

map 方法.png

可以看到,Function类,泛型有2个参数,第一个是原数据类型,第二个是转换后的数据类型,最终返回的是ObservableMap 类(RxJava的类命名很规范,如果是Observable类型的就是Observable开头 + 具体的操作符名称,如果是Observer类型的 就是 具体的操作符名称 + Observer结尾)我们进入ObservableMap类,Observable类之前的文章有提到过,subscribeActual 是个重要的钩子方法,所以我们直接看ObservableMap如何重写该方法的

ObservaleMap.png

方法代码就一行,调用装饰的Observable的subscribe方法,传递一个MapObserver对象,Observer类我们就比较熟悉了,我们这里主要看onNext方法

map onnext.png

代码也很简单,红框标识的就是 mapper 转换函数被调用的地方,得到转换后的对象v,传递给被装饰的Observer 的onNext方法,到这里,一次数据的map转换就结束了。源码的实现还是很简单的,在我们了解了源码的实现后,思路会更清晰,写代码时也会更有把握。

现在我们来解答前面我们抛出的问题,Function在什么时候被调用?在哪个线程被调用? Function调用的地方已经清楚了,在ObserverMap 的 onNext方法中,那么调用的线程呢,因为是在Observer方法中被调用,所以如果在map 之前 调用了 ObserverOn 方法设置监听线程,那么就在该监听线程,如果没有设置 ObserverOn 但是设置了 SubscribeOn方法设置发射线程,那么就在该 发射线程,如果SubscribeOn也没有设置,那就在Observable的创建线程。

到此Map 就介绍完了,接下来是Map 的好兄弟 FlatMap,调用逻辑稍微复杂一点点,看官们耐心 -。-

FlatMap

官方定义:transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable 拙劣的翻译:应用一个函数 转换所有的被发射的item从一个Observable转成为多个Observable,并将所有要发射的数据平铺为一个Observable

官方的图解:

flatmap 图例.png

到这里我们总结一下:

  • flatmap 转换是一对多的(一对一当然也支持),原来发射了几个数据,转换之后可以是更多个
  • flatMap 转换同样可以改变发射的数据类型
  • flatMap 转换后的数据,还是会逐个发射给我们的Observer来接收(就像这些数据是由一个Observable发射的一样,其实是多个Observable发射然后合并的)

这里抛出一个问题,flatMap会将原来的Observable,转换为多个Observable来发射数据,那么这些发射的数据是否会严格按顺序发射然后被Observer接收

问题先留在这里,进入源码

FlatMap 源码

FlatMap操作符涉及的代码会相对多一些,但是也是有规律可循。 同样到Observable 类中看 flatMap的定义,源码作者为了方便开发者调用,提供了多个方法重载,我们最常用的方法定义如下

flatmap 方法1.png

最终调用的方法是

flatmap 方法.png

跟map 的套路 差不多,我们直接进入 ObservableFlatMap类, 我们还是看它的 subscribeActual 方法实现

ObservableFlatMap.png

可以看到,它给原Observer 装饰后的 Observer 是 MergeObserver,我们再继续看 MergeObserver 的 onNext 方法

MergeObserver onnext.png

由于我们默认调用的flatmap 的 maxConcurrency 大小是 Integer.MAX_VALUE, 所以最终会调用 subscribeInner(p),注意这里我们的mapper方法以及被调用了,p就是跟我们传入的Function生成的Observable,我们再继续往下看

MergeObserver subscribeInner.png

一般我们传入的Function 生成的Observable 都不是 Callable类型的,所以最终传给Observable p 的 是InnerObserver, 找到了最终元凶,直接去看它的onNext方法实现吧。

MergeObserver onnext.png

funsionMode 默认是 None,走第一个if 逻辑,最终调用的是 上面的MergeObserable 的 tryEmit 方法,继续进去看

MergeObservable tryEmit.png

这里要插一句,MergeObserver 继承了 AtomicInteger,所以这里的tryEmit方法就利用了 AtomicInteger 的同步机制,所以同时只会有一个 value 被 actual Observer 发射,而且这里 刚好 可以解答我们上面留下的 问题,由于 AtomicInteger CAS锁只能保证操作的原子性,并不保证锁的获取顺序,是抢占式的,所以最终数据的发射顺序并不是固定的(同一个Observable发出的数据是有序的)

如果没有获取到锁,就会将要发射的数据放入 队列中,drainLoop 方法会循环去获取队列中的 数据,然后发射,由于篇幅有限,更详细的调用过程大家可以看源码。

dramloop.png

Map 和 FlatMap 二个操作符的 源码就解析到这里,水平有限,有不对的,还望大佬不吝赐教。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018.08.05 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Map
    • Map源码
    • FlatMap
      • FlatMap 源码
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档