前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Android 中 RxJava 的使用

Android 中 RxJava 的使用

作者头像
码客说
发布2019-10-22 16:55:33
2.1K0
发布2019-10-22 16:55:33
举报
文章被收录于专栏:码客码客

前言

Android原生的多线程和异步处理简直糟透了,反复的嵌套让代码看起来十分不明了,多线程上也没有iOSdispatch好用,但是用了Rxjava后就会有所改善,虽然代码量看起来会多一点,但是逻辑清晰多了

本文代码对应的是Rxjava2

真前言

总的来说Rxjava可以分为5块内容 分别为

  • 发布者(Observable/Flowable/Single/Completable)
  • 订阅者(Subscriber)
  • 中转站(Subject)
  • 线程(Scheduler)
  • 操作符

形象的来说

  • 发布者 就相当于 报社
  • 订阅者 就相当于 用户
  • 中转站 就相当于 报亭 它既是订阅者 又是发布者
  • 线程 是指定在哪个线程上处理
  • 操作符 则是把发布者的数据进行处理,再给订阅者

在发布者和订阅者之间传递的事件总共有三种

  • onNext(): 发送事件的数据
  • onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
  • onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
  • 在一个正确运行的事件序列中, onCompleted()onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted()onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

下面就说一下各块内容

发布者

对比

  • Observable/Flowable: Observable不支持背压(backpressure) Flowable是Rxjava2新增加的支持背压(backpressure) 背压(backpressure):只有上下游运行在各自的线程中,且上游发射数据速度大于下游接收处理数据的速度时,才会产生背压问题。 如果上游发送数据速度远大于下游接收数据的速度 用Observable就会内存溢出 Flowable则会抛弃掉处理不了的数据来防止溢出 但是不能就都用Flowable 因为Observable的性能较高
  • Single: 和Observable,Flowable一样会发送数据,不同的是订阅后只能接受到一次 普通Observable可以使用toSingle转换:Observable.just(1).toSingle()
  • Completable 与Single类似,只能接受到一次完成(onComplete)或错误(onError) 同样也可以由普通的Observable转换而来:Observable.just(1).toCompletable()

发布者发布事件 可以手动创建也可以调用内置方法

Observable

代码语言:javascript
复制
Observable
    .create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {

        }
    })
    .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {

        }

        @Override
        public void onNext(String s) {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    });

Flowable

代码语言:javascript
复制
Flowable
    .create(new FlowableOnSubscribe<String>() {
        @Override
        public void subscribe(FlowableEmitter<String> emitter) throws Exception {

        }
    }, BackpressureStrategy.DROP)
    .subscribe(new FlowableSubscriber<String>() {
        @Override
        public void onSubscribe(Subscription s) {

        }

        @Override
        public void onNext(String s) {

        }

        @Override
        public void onError(Throwable t) {

        }

        @Override
        public void onComplete() {

        }
    });

Single

代码语言:javascript
复制
Single
    .create(new SingleOnSubscribe<String>() {
        @Override
        public void subscribe(SingleEmitter<String> emitter) throws Exception {

        }
    })
    .subscribe(new SingleObserver<String>() {
        @Override
        public void onSubscribe(Disposable d) {

        }

        @Override
        public void onSuccess(String s) {

        }

        @Override
        public void onError(Throwable e) {

        }
    });

Completable

代码语言:javascript
复制
Completable
    .create(new CompletableOnSubscribe() {
        @Override
        public void subscribe(CompletableEmitter emitter) throws Exception {

        }
    })
    .subscribe(new CompletableObserver() {
        @Override
        public void onSubscribe(Disposable d) {

        }

        @Override
        public void onComplete() {

        }

        @Override
        public void onError(Throwable e) {

        }
    });

订阅者(Subscriber)

Observer/FlowableOnSubscribe/SingleOnSubscribe/CompletableOnSubscribe/Consumer/Subscriber

发布者

订阅者

Observable

Observer/Consumer

Flowable

FlowableOnSubscribe/Subscriber/Consumer

Single

SingleObserver/Consumer/BiConsumer

Completable

CompletableObserver/Action

创建

代码语言:javascript
复制
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(String s) {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
};

订阅

代码语言:javascript
复制
observable.subscribe(observer);

注意上面方法的顺序 看上去是发布者订阅了订阅者,之所以这样是因为链式代码的优雅

线程(Scheduler)

常用的方式是分线程中处理数据,主线程中使用数据生成页面

代码语言:javascript
复制
Observable
    .create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("发送的数据");
            emitter.onComplete();
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onNext(String s) {
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    });

操作符

名称

解析

amb()ambArrayambWith

给定多个Observable,只让第一个发射数据的Observable发射全部数据

defaultIfEmpty()

发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据

switchIfEmpty()

如果原始Observable没有发射数据,它发射一个备用Observable的发射物

skipUntil()

跳过原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据

skipWhile()

判断成功的都跳过 一旦为假 发送剩余的所有数据

takeUntil()

发送为真包括以前的数据 不再处理后续数据

takeWhile()

发送为真的数据 一旦为假就不再处理后续数据

create

参见面发布者部分

just/range/fromArray

  • just
代码语言:javascript
复制
Observable observable = Observable.just("好好学习", "天天向上");
// 将会依次调用:
// onNext("好好学习");
// onNext("天天向上");
// onCompleted();
  • range
代码语言:javascript
复制
Observable.range(1,10);
  • fromArray
代码语言:javascript
复制
String[] quotations = {"好好学习", "天天向上"};
Observable observable = Observable.fromArray(quotations);

interval/timer

代码语言:javascript
复制
//延迟10s每10s发送一次
Observable.interval(10,10, TimeUnit.SECONDS);
//延迟10s发送一次
Observable.timer(10,TimeUnit.SECONDS);

throttleFirst/throttleLast

throttleFirst操作符:仅发送指定时间段内的第一个信号

throttleLast操作符:仅发送指定时间段内的第一个信号

代码语言:javascript
复制
RxView.clicks(mBtn)
      .throttleFirst(1, TimeUnit.SECONDS);

debounce

指定时间段内没有新的信号时 则发出最后一个信号

比如监听文本变化进行搜索

代码语言:javascript
复制
RxTextView.textChanges(etKey)
          .debounce(400, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread());

map

类型变换

代码语言:javascript
复制
String[] strs = {"11","22","33"};
Observable
    .fromArray(strs)
    .map(new Function<String, Integer>() {
        @Override
        public Integer apply(String s) throws Exception {
            return Integer.valueOf(s);
        }
    })
    .subscribe(observer);

concatMap

concatMap(): 这是一个很有用但非常难理解的变换。

首先假设这么一种需求:上面的{"11","22","33"}我们像最终获取到1,1,2,2,3,3

代码语言:javascript
复制
String[] strs = {"11","22","33"};
Observable
    .fromArray(strs)
    .concatMap(new Function<String, ObservableSource<Integer>>() {
        private Subject<Integer> subject = PublishSubject.create();
        @Override
        public ObservableSource<Integer> apply(String s) throws Exception {
            for (char c:s.toCharArray()){
                subject.onNext(Integer.valueOf(""+c));
            }
            subject.onComplete();
            return subject;
        }
    }).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {

    }
});

map() 显然是不行的,因为 map() 是一对一的转化,而我现在的要求是一对多的转化,就需要用 flatMap()

Kotlin

代码语言:javascript
复制
Observable
    .create<Int> {
        for (i in 0 until 4) {
            it.onNext(i)
        }
        it.onComplete()
    }
    .concatMap {
        L.i("concatMap:${it}")
        var value = it
        return@concatMap Observable.create<String> {
            Thread.sleep((Math.random() * 1000).toLong())
            it.onNext("${value + 100}")
            it.onComplete()
        }
    }.subscribe({
        L.i("最后:${it}")
    }, {

    }, {
        L.i("事件完成")
    })

注意concatMap中一定要发送onComplete事件

flatMap

flatMap和concatMap最大的区别是concatMap发射的数据集是有序的,flatMap发射的数据集是无序的

filter

过滤

假如我们要大于5的数

代码语言:javascript
复制
Integer[] nums = {3, 4, 5, 6, 7};
Observable
    .fromArray(nums)
    .filter(new Predicate<Integer>() {
        @Override
        public boolean test(Integer integer) throws Exception {
            if (integer < 5) {
                return false;
            } else {
                return true;
            }
        }
    })
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {

        }
    });

defaultIfEmpty

当未发送onNext直接发送onCompleteonNext收到的默认值

代码语言:javascript
复制
Observable
    .create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onComplete();
        }
    })
    .defaultIfEmpty("默认数据")
    .subscribe(new Consumer<String>() {
        @Override
        public void accept(@NonNull String s) throws Exception {
            Log.e(TAG, "accept: " + s);
        }
    });

switchEmpty

如果发射源没有发射数据就完成了,就发射switchIfEmpty里面新的Observable发射源

代码语言:javascript
复制
Observable
    .create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onComplete();
        }
    })
    .switchIfEmpty(Observable.just("a", "b", "c"))
    .subscribe(new Consumer<String>() {
        @Override
        public void accept(@NonNull String s) throws Exception {
            Log.e(TAG, "accept: " + s);
        }
    });

zip

代码语言:javascript
复制
Observable
    .zip(
        Observable.just("100", "200"),
        Observable.just("1","2","3"),
        new BiFunction<String, String, Integer>() {
            @Override
            public Integer apply(String s, String s2) throws Exception {
                return Integer.valueOf(s) + Integer.valueOf(s2);
            }
        })
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            L.i(""+integer);
        }
    });

上面的代码会收到 101、202

也就是说多个Observable都发送时 才处理数据

amb/ambArray/ambWith

给定多个Observable,只让第一个发射数据的Observable发射全部数据。

take/takeWhile/takeUntil

take

代码语言:javascript
复制
//取前两个信号
.take(2);
//取后两个信号
.takeLast(2);
//取前1s的信号
.take(1,TimeUnit.SECONDS);
//取后1s的信号
.takeLast(1,TimeUnit.SECONDS);

takeWhile

代码语言:javascript
复制
//发送为真的数据 一旦为假就不再处理后续数据
//会收到1、2
Observable
    .just(1, 2, 3,2)
    .takeWhile(new Predicate<Integer>() {
        @Override
        public boolean test(Integer integer) throws Exception {
            return integer<3;
        }
    });

takeUntil

代码语言:javascript
复制
//发送为真包括以前的数据 不再处理后续数据
//会收到1、2
Observable
    .just(1, 2, 3,2)
    .takeUntil(new Predicate<Integer>() {
        @Override
        public boolean test(Integer integer) throws Exception {
            return integer >1;
        }
    })
  
//获取原始Observable发射的数据,直到第二个Observable发射了一个数据,不再发送原始Observable的剩余数据
//会收到1,2,3,2
Observable
    .just(1, 2, 3,2)
    .takeUntil(Observable.just("3").delay(1,TimeUnit.SECONDS))
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            L.i("" + integer);
        }
    });

skip/skipWhile/skipUntil

skip

代码语言:javascript
复制
//取前两个信号
.skip(2);
//取后两个信号
.skipLast(2);
//取前1s的信号
.skip(1,TimeUnit.SECONDS);
//取后1s的信号
.skipLast(1,TimeUnit.SECONDS);

skipWhile:判断成功的都跳过 一旦为假 发送剩余的所有数据

代码语言:javascript
复制
Observable
    .just(1, 2, 3,2)
    .skipWhile(new Predicate<Integer>() {
        @Override
        public boolean test(Integer integer) throws Exception {
            return integer<2;
        }
    })

会收到2、3、2 判断成功的都跳过 一旦为假 发送剩余的所有数据


skipUntil:跳过原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据

代码语言:javascript
复制
Observable
    .just(1, 2, 3,2)
    .skipUntil(Observable.just("3").delay(1,TimeUnit.SECONDS))
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            L.i("" + integer);
        }
    });

收不到数据 因为第二个Observable延迟1s结束后 原始Observable已经没有剩余数据了

中转站(Subject)

Rxjava和Rxjava2对比

io.reactivex.subjects.AsyncSubject, io.reactivex.subjects.BehaviorSubject, io.reactivex.subjects.PublishSubject, io.reactivex.subjects.ReplaySubject, io.reactivex.subjects.UnicastSubject

在RxJava2中依然存在,但现在他们不支持backpressure

新出现的

io.reactivex.processors.AsyncProcessor, io.reactivex.processors.BehaviorProcessor, io.reactivex.processors.PublishProcessor, io.reactivex.processors.ReplayProcessor io.reactivex.processors.UnicastProcessor

支持backpressure

Subject 在平时开发时 用的不是很多

它分为四种

  • PublishSubject(之后)
  • BehaviorSubject(前一个事件+之后)
  • ReplaySubject(所有事件)
  • AsyncSubject(最后事件)

用法如下

代码语言:javascript
复制
observable.subscribe(subject);
subject.subscribe(observer);

区别

假如发布者 也就是报社 只发布周一到周五的报纸 一天一份 如果我们在周三早上来报厅订报

  • 如果报厅是PublishSubject 我们可以收到 周三 周四 周五的报纸
  • 如果报厅是BehaviorSubject 我们可以收到 周二 至 周五的报纸
  • 如果报厅是ReplaySubject 我们可以收到 周一 至 周五的报纸
  • 如果报厅是AsyncSubject 我们可以收到 周五的报纸 但是发布的事件中如果有错误 那我们只会接受到错误不是错误的前一个事件

Android中应用

添加依赖

代码语言:javascript
复制
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
implementation 'io.reactivex.rxjava2:rxjava:2.1.10'

implementation 'com.jakewharton.rxbinding2:rxbinding:2.1.1'
implementation 'com.trello.rxlifecycle2:rxlifecycle:2.2.1'
implementation 'com.trello.rxlifecycle2:rxlifecycle-components:2.2.1'
//网络请求库
implementation 'com.lzy.net:okgo:3.0.4'
implementation 'com.lzy.net:okrx2:2.0.2'
//JSON转换
implementation 'com.alibaba:fastjson:1.2.46'
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-08-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 真前言
  • 发布者
    • Observable
      • Flowable
        • Single
          • Completable
          • 订阅者(Subscriber)
          • 线程(Scheduler)
          • 操作符
            • create
              • just/range/fromArray
                • interval/timer
                  • throttleFirst/throttleLast
                    • debounce
                      • map
                        • concatMap
                          • flatMap
                            • filter
                              • defaultIfEmpty
                                • switchEmpty
                                  • zip
                                    • amb/ambArray/ambWith
                                      • take/takeWhile/takeUntil
                                        • skip/skipWhile/skipUntil
                                        • 中转站(Subject)
                                        • Android中应用
                                        领券
                                        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档