RxJava1.X升级到RxJava2.X笔记

描述

RxJava 1.X

RxJava 2.X

package包名

rx.xxx

io.reactivex.xxx

Reactive Streams规范

1.X早于Reactive Streams规范出现,仅部分支持规范

完全支持

Backpressure 背压

对背压的支持不完善

Observable设计为不支持背压新增Flowable支持背压

null空值

支持

不再支持null值,传入null值会抛出 NullPointerException

Schedulers线程调度器

Schedulers.immediate()Schedulers.trampoline()Schedulers.computation()Schedulers.newThread()Schedulers.io()Schedulers.from(executor)AndroidSchedulers.mainThread()

移除Schedulers.immediate()新增Schedulers.single()其它未变

Single

行为类似Observable,但只会发射一个onSuccess或onError

按照Reactive Streams规范重新设计,遵循协议onSubscribe(onSuccess/onError)

Completable

行为类似Observable,要么全部成功,要么就失败

按照Reactive Streams规范重新设计,遵循协议onSubscribe (onComplete/onError)

Maybe

2.X新增,行为类似Observable,可能会有一个数据或一个错误,也可能什么都没有。可以将其视为一种返回可空值的方法。这种方法如果不抛出异常的话,将总是会返回一些东西,但是返回值可能为空,也可能不为空。按照Reactive Streams规范设计,遵循协议onSubscribe (onSuccess/onError/onComplete)

Flowable

2.X新增,行为类似Observable,按照Reactive Streams规范设计,支持背压Backpressure

Subject

AsyncSubjectBehaviorSubjectPublishSubjectReplaySubjectUnicastSubject

2.X依然维护这些Subject现有的功能,并新增:AsyncProcessorBehaviorProcessorPublishProcessorReplayProcessorUnicastProcessor支持背压Backpressure

Subscriber

Subscriber

由于与Reactive Streams的命名冲突,Subscriber已重命名为Disposable

RxJava 2.X + Retrofit + OkHttp 简单示例点这里

library依赖变化

//1.X
compile 'io.reactivex:rxjava:1.2.1'
compile 'io.reactivex:rxandroid:1.2.1'

//2.X
compile 'io.reactivex.rxjava2:rxjava:2.0.0'
compile 'io.reactivex.rxjava2:rxandroid:2.0.0'

package变化

变动主要为rx.xxx --> io.reactivex.xxx

//1.X
import rx.Observable;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers;
import rx.functions.Action1;

//2.X
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.ObservableTransformer;
import io.reactivex.disposables.Disposable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.functions.Consumer;

null

RxJava 2.X不再支持null值,如果传入一个null会抛出NullPointerException

Observable.just(null);

Single.just(null);

Observable.fromCallable(() -> null)
    .subscribe(System.out::println, Throwable::printStackTrace);

Observable.just(1).map(v -> null)
    .subscribe(System.out::println, Throwable::printStackTrace);

案例1

//1.X
public static final Observable.Transformer IO_TRANSFORMER = new Observable.Transformer() {
    @Override public Object call(Object observable) {
        return ((Observable) observable).subscribeOn(Schedulers.io())
                                        .unsubscribeOn(Schedulers.io())
                                        .observeOn(Schedulers.io());
    }
};
public static final <T> Observable.Transformer<T, T> applySchedulers(Observable.Transformer transformer){
    return (Observable.Transformer<T, T>)transformer;
}
Action1<Integer> onNext = null;
String[] items = { "item1", "item2", "item3" };
Subscription subscription = Observable.from(items)
                                      .compose(RxUtil.<String>applySchedulers(IO_TRANSFORMER))
                                      .map(new Func1<String, Integer>() {
                                                  @Override public Integer call(String s) {
                                                      return Integer.valueOf(s);
                                                  }
                                              })
                                      .subscribe(onNext);
//TODO subscription.unsubscribe();   

//2.X
public static final ObservableTransformer IO_TRANSFORMER = new ObservableTransformer() {
    @Override public ObservableSource apply(Observable upstream) {
        return upstream.subscribeOn(Schedulers.io())
                       .unsubscribeOn(Schedulers.io())
                       .observeOn(Schedulers.io());
    }
};
public static final <T> ObservableTransformer<T, T> applySchedulers(ObservableTransformer transformer){
    return (ObservableTransformer<T, T>)transformer;
}
Consumer<Integer> onNext = null;
String[] items = { "item1", "item2", "item3" };
Disposable disposable = Observable.fromArray(items)
                                  .compose(RxUtil.<String>applySchedulers(IO_TRANSFORMER))
                                  .map(new Function<String, Integer>() {
                                              @Override public Integer apply(String s) throws Exception {
                                                  return Integer.valueOf(s);
                                              }
                                          })
                                  .subscribe(onNext);
//TODO disposable.dispose();
  • .subscribe(...)返回值的变化:1.X为Subscription, 2.X为Disposable
  • Transformer的变化:1.X为rx.Observable内部的Transformer接口, 继承自Func1<Observable<T>, Observable<R>>, 2.X为io.reactivexObservableTransformer<Upstream, Downstream>,是一个独立的接口
  • AndroidSchedulers的变化: 1.X为rx.android.schedulers.AndroidSchedulers, 2.X为io.reactivex.android.schedulers.AndroidSchedulers
  • Func1的变化: 1.X为rx.functions.Func1, 2.X为io.reactivex.functions.Function
  • 其它重载方法见下方截图

1.X

2.X

案例2

//1.X
public class AppBaseActivity extends AppCompatActivity {
    ...
    private CompositeSubscription mCompositeSubscription;
    
    protected void addSubscription(Subscription subscription) {
        if (null == mCompositeSubscription) {
            mCompositeSubscription = new CompositeSubscription();
        }
        mCompositeSubscription.add(subscription);
    }

    @Override protected void onDestroy() {
        if (null != mCompositeSubscription) {
            mCompositeSubscription.unsubscribe();
        }
        super.onDestroy();
    }
    ...
}

//2.X
public class AppBaseActivity extends AppCompatActivity {
    ...
    private   CompositeDisposable mCompositeDisposable;

    protected void addDisposable(Disposable disposable) {
        if (null == mCompositeDisposable) {
            mCompositeDisposable = new CompositeDisposable();
        }
        mCompositeDisposable.add(disposable);
    }

    @Override protected void onDestroy() {
        if (null != mCompositeDisposable) {
            mCompositeDisposable.clear();
        }
        super.onDestroy();
    }
    ...
}

我现在维护的项目基本就这些改动,如有错误之处,还望批评指正,谢谢!

作者:续写经典 链接:https://www.jianshu.com/p/2badfbb3a33b 來源:简书 简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏编程微刊

三分钟教你学会如何将密文解码成明文

1724
来自专栏Java与Android技术栈

Cold Observable 和 Hot Observable

Hot Observable 无论有没有 Subscriber 订阅,事件始终都会发生。当 Hot Observable 有多个订阅者时,Hot Observa...

1602
来自专栏技术博文

php des 加密解密实例

des加密是对称加密中在互联网应用的比较多的一种加密方式,php 通过mcrypt扩展库来支持des加密,要在Php中使用des加密,需要先安装mcrypt扩展...

47110
来自专栏web开发

JavaScript前端和Java后端的AES加密和解密

在实际开发项目中,有些数据在前后端的传输过程中需要进行加密,那就需要保证前端和后端的加解密需要统一。这里给大家简单演示AES在JavaScript前端和Java...

6476
来自专栏三流程序员的挣扎

RxJava 创建操作符

内部触发对 Observer 的 onNext 方法的调用,just 中传递的参数将直接在 onNext 方法中接收到,参数的类型要和 Observer 的泛型...

2391
来自专栏小工匠技术圈

【小工匠聊密码学】--消息摘要--SHA算法

1905
来自专栏数据结构与算法

08:Vigenère密码

08:Vigenère密码 总时间限制: 1000ms 内存限制: 65536kB描述 16世纪法国外交家Blaise de Vigenère设计了一种多表...

4579
来自专栏张善友的专栏

.NET中的DES对称加密

DES是一种对称加密(Data Encryption Standard)算法,于1977年得到美国政府的正式许可,是一种用56位密钥来加密64位数据的方法。一般...

21510
来自专栏Hongten

PBE_Password-based encryption(基于密码加密)_项目中你也可以有

中说道了PBE——Password-based encryption(基于密码加密)。我也做测试了一下,现在把我做的效果给大家演示一下:

1001
来自专栏三流程序员的挣扎

RxJava 条件和布尔操作符

多个 Observable 中,无论发射的是 onNext 还是 onComplete 或者 onError,只接受第一个发射数据的那个 Observable,...

961

扫码关注云+社区

领取腾讯云代金券