以上一二篇主要是RxJava2.0中的改动,下面我们重点介绍下RxJava2.0中的观察者模式。
RxJava始终以观察者模式为骨架,在2.0中依然如此。
在RxJava2.0中,有五种观察者模式:
Observable/Observer
Flowable/Subscriber
Single/SingleObserver
Completable/CompletableObserver
Maybe/MaybeObserver
后面三种观察者模式差不多,Maybe/MaybeObserver
可以说是Single/SingleObserver
和Completable/CompletableObserver
的复合体。
下面列出这五个观察者模式相关的接口。
//代表一个延迟计算没有任何价值,但只显示完成或异常。
类似事件模式Reactive-Streams:onSubscribe(onError | onComplete)?
public abstract class Completable implements CompletableSource{
...
}
//没有子类继承Completable
public interface CompletableSource {
void subscribe(CompletableObserver cs);
}
public interface CompletableObserver {
void onSubscribe(Disposable d);
void onComplete();
void onError(Throwable e);
}
public abstract class Flowable<T> implements Publisher<T>{
...
}
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
//Maybe类似Completable,
它的主要消费类型是MaybeObseer顺序的方式,遵循这个协议:
onSubscribe(onSuccess | onError | onComplete)
public abstract class Maybe<T> implements MaybeSource<T>{
...
}
public interface MaybeSource<T> {
void subscribe(MaybeObserver<? super T> observer);
}
public interface MaybeObserver<T> {
void onSubscribe(Disposable d);
void onSuccess(T t);
void onError(Throwable e);
void onComplete();
}
//Single功能类似于Obserable,除了它只能发出一个成功的值,或者一个错误(没有“onComplete”事件),这个特性是由SingleSource接口决定的。
public abstract class Single<T> implements SingleSource<T>{
...
}
public interface SingleSource<T> {
void subscribe(SingleObserver<? super T> observer);
}
public interface SingleObserver<T> {
void onSubscribe(Disposable d);
void onSuccess(T t);
void onError(Throwable e);
}
其实从API中我们可以看到,每一种观察者都继承自各自的接口(都有一个共同的方法subscrib()),但是参数不一样),正是各自接口的不同,决定了他们功能不同,各自独立(特别是Observable和Flowable),同时保证了他们各自的拓展或者配套的操作符不会相互影响。
下面我们重点说说在实际开发中经常会用到的两个模式:Observable/Observer和Flowable/Subscriber。
Observable正常用法:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) { } @Override
public void onNext(Integer integer) { } @Override
public void onError(Throwable e) { } @Override
public void onComplete() { }
});
需要注意的是,这类观察模式不支持背压,下面我们具体分析下。
当被观察者快速发送大量数据时,下游不会做其他处理,即使数据大量堆积,调用链也不会报MissingBackpressureException,消耗内存过大只会OOM。
在测试的时候,快速发送了100000个整形数据,下游延迟接收,结果被观察者的数据全部发送出去了,内存确实明显增加了,遗憾的是没有OOM。
所以,当我们使用Observable/Observer的时候,我们需要考虑的是,数据量是不是很大(官方给出以1000个事件为分界线,供各位参考)。
Flowable.range(0, 10)
.subscribe(new Subscriber<Integer>() {
Subscription subscription;
//当订阅后,会首先调用这个方法,其实就相当于onStart(),
//传入的Subscription s参数可以用于请求数据或者取消订阅
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onsubscribe start");
subscription = s;
subscription.request(1);
Log.d(TAG, "onsubscribe end");
} @Override
public void onNext(Integer o) {
Log.d(TAG, "onNext--->" + o);
subscription.request(3);
} @Override
public void onError(Throwable t) {
t.printStackTrace();
} @Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});1
输出结果如下:
Flowable是支持背压的,也就是说,一般而言,上游的被观察者会响应下游观察者的数据请求,下游调用request(n)来告诉上游发送多少个数据。这样避免了大量数据堆积在调用链上,使内存一直处于较低水平。
当然,Flowable也可以通过create()来创建:
Flowable虽然可以通过create()
来创建,但是你必须指定背压的策略,以保证你创建的Flowable是支持背压的(这个在1.0的时候就很难保证,可以说RxJava2.0收紧了create()的权限)。
根据上面的代码的结果输出中可以看到,当我们调用subscription.request(n)
方法的时候,不等onSubscribe()
中后面的代码执行,就会立刻执行onNext方法,因此,如果你在onNext方法中使用到需要初始化的类时,应当尽量在subscription.request(n)
这个方法调用之前做好初始化的工作;
当然,这也不是绝对的,我在测试的时候发现,通过create()
自定义Flowable的时候,即使调用了subscription.request(n)
方法,也会等onSubscribe()
方法中后面的代码都执行完之后,才开始调用onNext。
RxJava1.x 如何平滑升级到RxJava2.0呢?
由于RxJava2.0变化较大无法直接升级,幸运的是,官方提供了RxJava2Interop这个库,可以方便地将RxJava1.x升级到RxJava2.0,或者将RxJava2.0转回RxJava1.x。
地址:https://github.com/akarnokd/RxJava2Interop
可以明显的看到,RxJava2.0最大的改动就是对于backpressure的处理,为此将原来的Observable拆分成了新的Observable和Flowable,同时其他相关部分也同时进行了拆分。
除此之外,就是我们最熟悉和喜爱的RxJava。
---我是分割线---
Tamic开发社区
非专业的移动社区
不只是干货,还有人生
长按二维码关注我们