前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RxJava2.0你不知道的事(三)

RxJava2.0你不知道的事(三)

作者头像
开发者技术前线
发布2020-11-23 11:43:30
6480
发布2020-11-23 11:43:30
举报
文章被收录于专栏:开发者技术前线

以上一二篇主要是RxJava2.0中的改动,下面我们重点介绍下RxJava2.0中的观察者模式。

RxJava2.0中的观察者模式

RxJava始终以观察者模式为骨架,在2.0中依然如此。

在RxJava2.0中,有五种观察者模式:

  1. Observable/Observer
  2. Flowable/Subscriber
  3. Single/SingleObserver
  4. Completable/CompletableObserver
  5. Maybe/MaybeObserver

后面三种观察者模式差不多,Maybe/MaybeObserver可以说是Single/SingleObserverCompletable/CompletableObserver的复合体。

下面列出这五个观察者模式相关的接口。

Observable/Observer
代码语言:javascript
复制
Completable/CompletableObserver
代码语言:javascript
复制
//代表一个延迟计算没有任何价值,但只显示完成或异常。
  类似事件模式Reactive-Streams:onSubscribe(onError | onComplete)?
  public abstract class Completable implements CompletableSource{
     ...
  }
  //没有子类继承Completable
public interface CompletableSource {
   void subscribe(CompletableObserver cs);
}
public interface CompletableObserver {
   void onSubscribe(Disposable d);    
    void onComplete();    
    void onError(Throwable e);
}
Flowable/Subscriber
代码语言:javascript
复制
public abstract class Flowable<T> implements  Publisher<T>{
  ...
}
public interface Publisher<T> {    
   public void subscribe(Subscriber<? super T> s);
}
public interface Subscriber<T> {    
   public void onSubscribe(Subscription s);    
   public void onNext(T t);    
   public void onError(Throwable t);    
   public void onComplete();
}
Maybe/MaybeObserver
代码语言:javascript
复制
//Maybe类似Completable,
它的主要消费类型是MaybeObseer顺序的方式,遵循这个协议:
onSubscribe(onSuccess | onError | onComplete)
public abstract class Maybe<T> implements MaybeSource<T>{
     ...
     }
public interface MaybeSource<T> {
   void subscribe(MaybeObserver<? super T> observer);
}
public interface MaybeObserver<T> {
    void onSubscribe(Disposable d);    
     void onSuccess(T t);    
     void onError(Throwable e);    
     void onComplete();
}
Single/SingleObserver
代码语言:javascript
复制
//Single功能类似于Obserable,除了它只能发出一个成功的值,或者一个错误(没有“onComplete”事件),这个特性是由SingleSource接口决定的。
public abstract class Single<T> implements SingleSource<T>{
     ...
     }
public interface SingleSource<T> {
   void subscribe(SingleObserver<? super T> observer);
}
public interface SingleObserver<T> {
   void onSubscribe(Disposable d);    
    void onSuccess(T t);    
    void onError(Throwable e);
}

其实从API中我们可以看到,每一种观察者都继承自各自的接口(都有一个共同的方法subscrib()),但是参数不一样),正是各自接口的不同,决定了他们功能不同,各自独立(特别是Observable和Flowable),同时保证了他们各自的拓展或者配套的操作符不会相互影响。

下面我们重点说说在实际开发中经常会用到的两个模式:Observable/Observer和Flowable/Subscriber。

Observable/Observer

Observable正常用法:

代码语言:javascript
复制
Observable.create(new ObservableOnSubscribe<Integer>() {    
    @Override
   public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
       emitter.onNext(1);
       emitter.onNext(2);
       emitter.onComplete();
   }
}).subscribe(new Observer<Integer>() {    
    @Override
   public void onSubscribe(Disposable d) {   }    @Override
   public void onNext(Integer integer) {   }    @Override
   public void onError(Throwable e) {   }    @Override
   public void onComplete() {   }
});

需要注意的是,这类观察模式不支持背压,下面我们具体分析下。

当被观察者快速发送大量数据时,下游不会做其他处理,即使数据大量堆积,调用链也不会报MissingBackpressureException,消耗内存过大只会OOM。

在测试的时候,快速发送了100000个整形数据,下游延迟接收,结果被观察者的数据全部发送出去了,内存确实明显增加了,遗憾的是没有OOM。

所以,当我们使用Observable/Observer的时候,我们需要考虑的是,数据量是不是很大(官方给出以1000个事件为分界线,供各位参考)。

Flowable/Subscriber
代码语言:javascript
复制
Flowable.range(0, 10)
       .subscribe(new Subscriber<Integer>() {
           Subscription subscription;            
         //当订阅后,会首先调用这个方法,其实就相当于onStart(),
           //传入的Subscription s参数可以用于请求数据或者取消订阅
           @Override
           public void onSubscribe(Subscription s) {
               Log.d(TAG, "onsubscribe start");
               subscription = s;
               subscription.request(1);
               Log.d(TAG, "onsubscribe end");
           }            @Override
           public void onNext(Integer o) {
               Log.d(TAG, "onNext--->" + o);
               subscription.request(3);
           }            @Override
           public void onError(Throwable t) {
               t.printStackTrace();
           }            @Override
           public void onComplete() {
               Log.d(TAG, "onComplete");
           }
       });1

输出结果如下:

代码语言:javascript
复制

Flowable是支持背压的,也就是说,一般而言,上游的被观察者会响应下游观察者的数据请求,下游调用request(n)来告诉上游发送多少个数据。这样避免了大量数据堆积在调用链上,使内存一直处于较低水平。

当然,Flowable也可以通过create()来创建:

Flowable虽然可以通过create()来创建,但是你必须指定背压的策略,以保证你创建的Flowable是支持背压的(这个在1.0的时候就很难保证,可以说RxJava2.0收紧了create()的权限)。

根据上面的代码的结果输出中可以看到,当我们调用subscription.request(n)方法的时候,不等onSubscribe()中后面的代码执行,就会立刻执行onNext方法,因此,如果你在onNext方法中使用到需要初始化的类时,应当尽量在subscription.request(n)这个方法调用之前做好初始化的工作;

当然,这也不是绝对的,我在测试的时候发现,通过create()自定义Flowable的时候,即使调用了subscription.request(n)方法,也会等onSubscribe()方法中后面的代码都执行完之后,才开始调用onNext。

平滑升级

RxJava1.x 如何平滑升级到RxJava2.0呢?

由于RxJava2.0变化较大无法直接升级,幸运的是,官方提供了RxJava2Interop这个库,可以方便地将RxJava1.x升级到RxJava2.0,或者将RxJava2.0转回RxJava1.x。

地址:https://github.com/akarnokd/RxJava2Interop

总结

可以明显的看到,RxJava2.0最大的改动就是对于backpressure的处理,为此将原来的Observable拆分成了新的Observable和Flowable,同时其他相关部分也同时进行了拆分。

除此之外,就是我们最熟悉和喜爱的RxJava。

---我是分割线---

Tamic开发社区

非专业的移动社区

不只是干货,还有人生

长按二维码关注我们

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2017-02-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发者技术前线 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RxJava2.0中的观察者模式
    • Observable/Observer
      • Completable/CompletableObserver
        • Flowable/Subscriber
          • Maybe/MaybeObserver
            • Single/SingleObserver
              • Observable/Observer
                • Flowable/Subscriber
                • 平滑升级
                • 总结
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档