前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RxJava 中observeOn()与subscribeOn()

RxJava 中observeOn()与subscribeOn()

作者头像
开发者技术前线
发布2020-11-23 11:34:39
4800
发布2020-11-23 11:34:39
举报
文章被收录于专栏:开发者技术前线

放松吧

还是那个味,冬至日来首音乐, 听说吃饺子,听音乐和技术更配哦!

RxJava

observeOn和subscribeOn都是对observable的一种操作,区别就是subscribeOn改变了observable本身产生事件的schedule以及发出事件后相关处理事件的程序所在的schedule,而obseveron仅仅是改变了对发出事件后相关处理事件的程序所在的schedule。

或许你会问,这有多大的区别吗?的确是有的,比如说产生observable事件是一件费时可能会卡主线程的操作(比如说获取网络数据),那么subscribeOn就是你的选择,这样可以避免卡住主线程。

两者最主要的差别是影响的范围不同,observeOn is more limited,但是却是可以多次调用,多次改变不同的接受者所在的schedule,在调用这个函数之后的observable造成影响。而subscribeOn则是一次性的,无论在什么地方调用,总是从改变最原始的observable开始影响整个observable的处理。

subscribeOn()和observeOn()的区别

  • subscribeOn()主要改变的是订阅的线程,即call()执行的线程;
  • observeOn()主要改变的是发送的线程,即onNext()执行的线程。

subscribeOn

我们先看一个例子。

代码语言:javascript
复制

运行如下:

代码语言:javascript
复制

我们看一下subscribeOn()中,都干了什么

代码语言:javascript
复制

很明显,会走if之外的方法。

在这里我们可以看到,我们又创建了一个Observable对象,但创建时传入的参数为OperatorSubscribeOn(this,scheduler),我们看一下此对象以及其对应的构造方法后可以看到,OperatorSubscribeOn实现Onsubscribe,并且由其构造方法可知,他保存了上一个Observable对象,并保存了Scheduler对象。

这里做个总结。

把Observable.create()创建的称之为Observable_1,OnSubscribe_1。把subscribeOn()创建的称之为Observable_2,OnSubscribe_2(= OperatorSubscribeOn)。 那么,前两步就是创建了两个的observable,和OnSubscribe,并且OnSubscribe_2中保存了Observable_1的应用,即source。

调用Observable_2.subscribe()方法会调用OnSubscibe_2的call方法,即OperatorSubscribeOn的call()。

下面分析下call()方法。

  • inner.schedule()改变了线程,此时Action的call()在我们指定的线程中运行。
  • Subscriber被包装了一层。
  • source.unsafeSubscribe(s);,注意source是Observable_1对象。

unsafeSubscribe方法代码:

代码语言:javascript
复制

代码很多,关键代码:

hook.onSubscribeStart(this, onSubscribe).call(subscriber);

该方法即调用了OnSubscribe_1.call()方法。注意,此时的call()方法在我们指定的线程中运行。那么就起到了改变线程的作用。

对于以上线程,我们可以总结,其有如下流程:

  • Observable.create() : 创建了Observable_1和OnSubscribe_1;
  • subscribeOn(): 创建Observable_2和OperatorSubscribeOn(OnSubscribe_2),同时OperatorSubscribeOn保存了Observable_1的引用。
  • observable_2.subscribe(Observer):
    • 调用OperatorSubscribeOn的call()。call()改变了线程的运行,并且调用了Observable_1.unsafeSubscribe(s);
    • Observable_1.unsafeSubscribe(s);,该方法的实现中调用了OnSubscribe_1的call()。 从这个可以了解,无论我们的subscribeOn()放在哪里,他改变的是subscribe()的过程,而不是onNext()的过程。

那么如果有多个subscribeOn(),那么线程会怎样执行呢。如果按照我们的逻辑,有以下程序

代码语言:javascript
复制

那么,我们根据之前的源码分析其执行逻辑。

  • Observable.just(“ss”),创建Observable_1,OnSubscribe_1
  • Observable_1.subscribeOn(Schedulers.io()):创建observable_2,OperatorSubscribeOn_2并保存Observable_1的引用。
  • observable_2.subscribeOn(Schedulers.newThread()):创建Observable_3,OperatorSubscribeOn_3并保存Observable_2的引用。
  • Observable_3.subscribe():
    • 调用OperatorSubscribeOn_3.call(),改变线程为Schedulers.newThread()。
    • 调用OperatorSubscribeOn_2.call(),改变线程为Schedulers.io()。
    • 调用OnSubscribe_1.call(),此时call()运行在Schedulers.io()。 根据以上逻辑分析,会按照1的线程进行执行。

subscribeOn如何工作,关键代码其实就是一行代码:

代码语言:javascript
复制

注意它所在的位置,是在worker的call里面,说白了,就是把source.subscribe这一行调用放在指定的线程里,那么总结起来的结论就是:

subscribeOn的调用,改变了调用前序列所运行的线程。

observeOn

看一下observeOn()源码:

代码语言:javascript
复制

这里引出了新的操作符lift

代码语言:javascript
复制

这里不再介绍了,详见:http://blog.csdn.net/jdsjlzx/article/details/51686152

在lift()中,有如下关键代码:

代码语言:javascript
复制

OperatorObserveOn.call()核心代码:

代码语言:javascript
复制

我们看到其返回了ObserveOnSubscriber< T>,注意:此时只调用了call()方法,但call()方法中并没有改变线程的操作,此时为subscribe()过程。

我们直奔重点,因为,我们了解到其改变的是onNext()过程,那么我们肯定要看一下ObserveOnSubscriber.onNext()找找在哪改变线程

代码语言:javascript
复制

这里做了两件事,首先把结果缓存到一个队列里,然后调用schedule启动传入的worker

我们这里需要注意下:

在调用observeOn前的序列,把结果传入到onNext就是它的工作,它并不关心后续的流程,所以工作就到这里就结束了,剩下的交给ObserveOnSubscriber继续。

onNext方法最后调用了schedule(),从方法名可以看到,其肯定是改变线程用的,并且该方法经过一番循环之后,调用了该类的call()方法。

代码语言:javascript
复制

recursiveScheduler 就是之前我们传入的Scheduler,我们一般会在observeOn传入AndroidScheluders.mainThread()。

scheduler中调用的call()方法

代码语言:javascript
复制

call()中有localChild.onNext(localOn.getValue(v));调用。

在Scheduler启动后, 我们在Observable.subscribe(a)传入的a就是这里的child, 我们看到,在call中终于调用了它的onNext方法,把真正的结果传了出去,但是在这里,我们是工作在observeOn的线程上的。

总结起来的结论就是:

  1. observeOn 对调用之前的序列默不关心,也不会要求之前的序列运行在指定的线程上
  2. observeOn 对之前的序列产生的结果先缓存起来,然后再在指定的线程上,推送给最终的subscriber

observeOn改变的是onNext()调用

subcribeOn和observeOn 对比分析

代码语言:javascript
复制

有如上逻辑,则我们对其运行进行分析。

首先,我们需要先明白其内部执行的逻辑。

在调用subscribe之后,逻辑开始运行。分别调用每一步OnSubscribe.call(),注意:自下往上。当运行到最上,即Observable.create()后,我们在其中调用了subscriber.onNext(),于是程序开始自上往下执行每一个对象的subscriber.onNext()方法。最终,直到subscribe()中的回调。

其次,从上面对subscribeOn()和observeOn()的分析中可以明白,subscribeOn()是在call()方法中起作用,而observeOn()实在onNext()中作用。

那么对于以上的逻辑,我们可以得出如下结论:

  • 操作1,2,3,4在io线程中,因为在如果没有observeOn()影响,他们的回调操作默认在订阅的线程中。而我们的订阅线程在subscribeOn(io)发生了改变。注意他们执行的先后顺序。
  • 操作5,6在main线程中运行。因为observeOn()改变了onNext().
  • 特别注意那一个逻辑没起到作用

再简单点总结就是

  1. subscribeOn的调用切换之前的线程。
  2. observeOn的调用切换之后的线程。
  3. observeOn之后,不可再调用subscribeOn 切换线程

复杂情况

我们经常多次使用subscribeOn切换线程,那么以后是否可以组合observeOn和subscribeOn达到自由切换的目的呢?

组合是可以的,但是他们的执行顺序是有条件的,如果仔细分析的话,可以知道observeOn调用之后,再调用subscribeOn是无效的,原因是什么?

因为subscribeOn改变的是subscribe这句调用所在的线程,大多数情况,产生内容和消费内容是在同一线程的,所以改变了产生内容所在的线程,就改变了消费内容所在的线程。

经过上面的阐述,我们知道,observeOn的工作原理是把消费结果先缓存,再切换到新线程上让原始消费者消费,它和生产者是没有一点关系的,就算subscribeOn调用了,也只是改变observeOn这个消费者所在的线程,和OperatorObserveOn中存储的原始消费者一点关系都没有,它还是由observeOn控制。

@扔物线 大神给的总结:

  1. 下面提到的“操作”包括产生事件、用操作符操作事件以及最终的通过 subscriber 消费事件;
  2. 只有第一subscribeOn() 起作用(所以多个 subscribeOn() 无意义);
  3. 这个 subscribeOn() 控制从流程开始的第一个操作,直到遇到第一个 observeOn();
  4. observeOn() 可以使用多次,每个 observeOn() 将导致一次线程切换(),这次切换开始于这次 observeOn() 的下一个操作;
  5. 不论是 subscribeOn() 还是 observeOn(),每次线程切换如果不受到下一个 observeOn() 的干预,线程将不再改变,不会自动切换到其他线程。

参考文章: https://segmentfault.com/a/1190000004856071 http://blog.csdn.net/lisdye2/article/details/51113837

这个(学习总结)博客花了3个小时多,看源码真的很头疼,希望以后能有所提高。

---我是分割线---

Tamic开发社区

非专业的移动社区

不只是干货,还有人生

长按二维码关注我们

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2016-12-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发者技术前线 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 放松吧
  • RxJava
    • subscribeOn()和observeOn()的区别
      • subscribeOn
        • observeOn
          • 复杂情况
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档