描述 | RxJava 1.X | RxJava 2.X |
---|---|---|
package包名 | rx.xxx | io.reactivex.xxx |
Reactive Streams规范 | 1.X早于Reactive Streams规范出现,仅部分支持规范 | 完全支持 |
Backpressure 背压 | 对背压的支持不完善 | Observable设计为不支持背压新增Flowable支持背压 |
null空值 | 支持 | 不再支持null值,传入null值会抛出 NullPointerException |
Schedulers线程调度器 | Schedulers.immediate()Schedulers.trampoline()Schedulers.computation()Schedulers.newThread()Schedulers.io()Schedulers.from(executor)AndroidSchedulers.mainThread() | 移除Schedulers.immediate()新增Schedulers.single()其它未变 |
Single | 行为类似Observable,但只会发射一个onSuccess或onError | 按照Reactive Streams规范重新设计,遵循协议onSubscribe(onSuccess/onError) |
Completable | 行为类似Observable,要么全部成功,要么就失败 | 按照Reactive Streams规范重新设计,遵循协议onSubscribe (onComplete/onError) |
Maybe | 无 | 2.X新增,行为类似Observable,可能会有一个数据或一个错误,也可能什么都没有。可以将其视为一种返回可空值的方法。这种方法如果不抛出异常的话,将总是会返回一些东西,但是返回值可能为空,也可能不为空。按照Reactive Streams规范设计,遵循协议onSubscribe (onSuccess/onError/onComplete) |
Flowable | 无 | 2.X新增,行为类似Observable,按照Reactive Streams规范设计,支持背压Backpressure |
Subject | AsyncSubjectBehaviorSubjectPublishSubjectReplaySubjectUnicastSubject | 2.X依然维护这些Subject现有的功能,并新增:AsyncProcessorBehaviorProcessorPublishProcessorReplayProcessorUnicastProcessor支持背压Backpressure |
Subscriber | Subscriber | 由于与Reactive Streams的命名冲突,Subscriber已重命名为Disposable |
RxJava 2.X + Retrofit + OkHttp 简单示例点这里
library
依赖变化//1.X
compile 'io.reactivex:rxjava:1.2.1'
compile 'io.reactivex:rxandroid:1.2.1'
//2.X
compile 'io.reactivex.rxjava2:rxjava:2.0.0'
compile 'io.reactivex.rxjava2:rxandroid:2.0.0'
package
变化变动主要为rx.xxx --> io.reactivex.xxx
//1.X
import rx.Observable;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers;
import rx.functions.Action1;
//2.X
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.ObservableTransformer;
import io.reactivex.disposables.Disposable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.functions.Consumer;
null
值RxJava 2.X
不再支持null
值,如果传入一个null
会抛出NullPointerException
Observable.just(null);
Single.just(null);
Observable.fromCallable(() -> null)
.subscribe(System.out::println, Throwable::printStackTrace);
Observable.just(1).map(v -> null)
.subscribe(System.out::println, Throwable::printStackTrace);
案例1
//1.X
public static final Observable.Transformer IO_TRANSFORMER = new Observable.Transformer() {
@Override public Object call(Object observable) {
return ((Observable) observable).subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(Schedulers.io());
}
};
public static final <T> Observable.Transformer<T, T> applySchedulers(Observable.Transformer transformer){
return (Observable.Transformer<T, T>)transformer;
}
Action1<Integer> onNext = null;
String[] items = { "item1", "item2", "item3" };
Subscription subscription = Observable.from(items)
.compose(RxUtil.<String>applySchedulers(IO_TRANSFORMER))
.map(new Func1<String, Integer>() {
@Override public Integer call(String s) {
return Integer.valueOf(s);
}
})
.subscribe(onNext);
//TODO subscription.unsubscribe();
//2.X
public static final ObservableTransformer IO_TRANSFORMER = new ObservableTransformer() {
@Override public ObservableSource apply(Observable upstream) {
return upstream.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(Schedulers.io());
}
};
public static final <T> ObservableTransformer<T, T> applySchedulers(ObservableTransformer transformer){
return (ObservableTransformer<T, T>)transformer;
}
Consumer<Integer> onNext = null;
String[] items = { "item1", "item2", "item3" };
Disposable disposable = Observable.fromArray(items)
.compose(RxUtil.<String>applySchedulers(IO_TRANSFORMER))
.map(new Function<String, Integer>() {
@Override public Integer apply(String s) throws Exception {
return Integer.valueOf(s);
}
})
.subscribe(onNext);
//TODO disposable.dispose();
.subscribe(...)
返回值的变化:1.X为Subscription
, 2.X为Disposable
Transformer
的变化:1.X为rx.Observable
内部的Transformer
接口, 继承自Func1<Observable<T>, Observable<R>>
, 2.X为io.reactivexObservableTransformer<Upstream, Downstream>
,是一个独立的接口AndroidSchedulers
的变化: 1.X为rx.android.schedulers.AndroidSchedulers
, 2.X为io.reactivex.android.schedulers.AndroidSchedulers
Func1
的变化: 1.X为rx.functions.Func1
, 2.X为io.reactivex.functions.Function
1.X
2.X
案例2
//1.X
public class AppBaseActivity extends AppCompatActivity {
...
private CompositeSubscription mCompositeSubscription;
protected void addSubscription(Subscription subscription) {
if (null == mCompositeSubscription) {
mCompositeSubscription = new CompositeSubscription();
}
mCompositeSubscription.add(subscription);
}
@Override protected void onDestroy() {
if (null != mCompositeSubscription) {
mCompositeSubscription.unsubscribe();
}
super.onDestroy();
}
...
}
//2.X
public class AppBaseActivity extends AppCompatActivity {
...
private CompositeDisposable mCompositeDisposable;
protected void addDisposable(Disposable disposable) {
if (null == mCompositeDisposable) {
mCompositeDisposable = new CompositeDisposable();
}
mCompositeDisposable.add(disposable);
}
@Override protected void onDestroy() {
if (null != mCompositeDisposable) {
mCompositeDisposable.clear();
}
super.onDestroy();
}
...
}
我现在维护的项目基本就这些改动,如有错误之处,还望批评指正,谢谢!
作者:续写经典
链接:https://cloud.tencent.com/developer/article/1197122
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。