Android-RxJava(下)

上一篇我们说了Android-RxJava(上)主要包括组合操作符,变换操作符,创建操作符,我们再接再厉,继续下半部分内容,也就是剩余的操作符:

3.4 过滤操作符

含义:过滤/筛选 被观察者发送的事件。

3.4.1 filter

过滤操作符filter(),通过一定逻辑来过滤被观察者发送的事件,如果返回 true 则会发送事件,否则不会发送。 相关代码:

private void rxJavaFilter(){
        Observable.just(1,2,3,4).filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer < 3;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: "+integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });
    }

结果:

image.png

可看到我们要求打印integer小于3的,所以打印了1和2.

3.4.2 ofType

过滤操作符ofType(),可以过滤不符合该类型事件。

private void rxJavaOfType(){
        Observable.just(1,2,3,4,"薛之涛").ofType(String.class).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(String integer) {
                Log.d(TAG, "onNext: "+integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });
    }

结果:

image.png

我们要求过滤Integer数据类型,留下String类型,打印结果正确!

3.4.3 skip 或 skipLast

过滤操作符skip,跳过正序部分事件,参数为跳过前多少个事件。

    Observable.just(1,2,3,4).skip(2).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: "+integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });
    }

结果:

image.png

skipLast和skip操作符相反,它是跳过后多少个事件打印其之前的事件

3.4.4 distinct 或 distinctUntilChanged

过滤操作符distinct,过滤事件序列中的重复事件 代码:

private void rxJavadistinct(){
        Observable.just(1,2,3,3,3,4,2).distinct().subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: "+integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });
    }

结果:

image.png

  • distinctUntilChanged 过滤操作符distinctUntilChanged ()过滤掉连续重复的事件 代码:
Observable.just(1,2,3,3,3,2,1).distinctUntilChanged().subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: "+integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });

结果:

image.png

3.4.5 take 或 takeLast

过滤操作符take(),控制观察者接收的事件的数量。 代码:

Observable.just(1,2,3,3,3,2,1).take(3).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: "+integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });

结果:

image.png

  • takeLast()操作符,控制观察者只能接受事件序列的后面几个请求 代码:
Observable.just(1,2,3,3,3,2,1).takeLast(3).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: "+integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });

结果:

image.png

3.4.6 elementAt 或 elementAtOrError()

过滤操作符 elementAt(),可以指定取出事件序列中事件,下标从0开始,但如果指定的index大于总的事件序列数,则无反应 代码:

 /**
     * 过滤操作符elementAt,指定队列中的事件下标,取出该事件
     */
    private void rxjavaElementAt(){
        Observable.just(1,2,3,4).elementAt(3).subscribe(new Consumer<Integer>(){

            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "accept: "+integer);
            }
        });
    }

结果:

image.png

  • elementAtOrError() 操作符和elementAt不同的是,其在下标超出队列数后会报 NoSuchElementException 异常.
  • irstElement() 取事件序列的第一个元素
  • lastElement() 取事件序列的最后一个元素
3.4.7 throttleFirst 或 throttleLast

过滤操作符throttleFirst (),可以和rxbinding2结合使用和绑定view的点击事件,防止在规定时间内多次点击,也就是防止规定事件防止重复点击。先加入rxbinding2依赖。 implementation 'com.jakewharton.rxbinding2:rxbinding:2.0.0' 代码:

/**
     * throttleFirst操作符绑定view的点击事件,防止在规定时间内多次点击
     */
    int clickNum =1;
    private void rxjavathrottleFirst(){
        TextView tv=findViewById(R.id.tv);

        RxView.clicks(tv).throttleFirst(3,TimeUnit.SECONDS).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                Log.d(TAG, "accept: "+"第"+clickNum+"次点击了TextView");
                clickNum ++ ;
            }
        });
    }

结果:

image.png

我在程序运行期间不断点击TextView,结果只打印了3秒间隔之后的第一次点击。

-throttleWithTimeout throttleWithTimeout() 操作符,如果两件事件发送的时间间隔小于设定的时间间隔则前一件事件就不会发送给观察者。

3.5条件操作符

含义:通过指定条件,判断是否接收被观察者发送的事件。

3.5.1 all

条件操作符all(),主要用来判断所有事件是否满足.如果都满足则返回 true,反之则返回 false 代码:

/**
     * 条件操作符,all如果都满足则返回 true,反之则返回 false
     */
    private void rxJavaAll(){
        Observable.just(1,2,3,4).all(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer < 5;
            }
        }).subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean aBoolean) throws Exception {
                Log.d(TAG, "accept: "+aBoolean);
            }
        });
    }

结果:

image.png

3.5.2 takeWhile

条件操作符takeWhile(),当判断发送的事件不满足条件时,就会终止后续事件的接受 代码:

/**
     * 某个数据满足条件时就会发送该数据,反之则不发送
     */
    private void rxJavaTakeWhile(){
        Observable.just(3,2,1,4).takeWhile(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer >= 2;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "accept: "+integer);
            }
        });
    }

结果:

image.png

3.5.3 skipWhile

条件操作符skipWhile(),当判断发送的事件不满足条件时,才接受后续事件,反之亦然。 代码:

/**
* 满足条件的事件不发送,不满足时发送其及其之后的事件,注意其之后的数据是不判断的
     */
    private void rxJavaSkipWhile(){
        Observable.just(1,2,4,3).skipWhile(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                Log.d(TAG, "test: "+integer);
                return integer >= 3 ;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "accept: "+integer);
            }
        });
    }

结果:

image.png

3.5.4 takeUntil

条件操作符takeUntil(),满足条件时,其之后的事件不会被发送 代码:

 private void rxJavaTakeUntil(){
        Observable.just(1,2,4,3).takeUntil(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                Log.d(TAG, "test: "+integer);
                return integer >= 3 ;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "accept: "+integer);
            }
        });
    }

结果:

image.png

3.5.6 sequenceEqual

条件操作符sequenceEqual(),判断两个非观察者发送的事件是否一样 代码:

private void rxJavaSequenceEqual(){
        Observable.sequenceEqual(Observable.just(1, 2, 3),
                Observable.just(1,2,3))
                .subscribe(new Consumer < Boolean > () {
                    @Override
                    public void accept(Boolean aBoolean) throws Exception {
                        Log.d(TAG, "accept====" + aBoolean);
                    }
                });
    }

结果:

注意如果是:Observable.just(1, 2, 3) 和Observable.just(3,2,1)比较返回结果为false,是有顺序之分的

3.6.7 contains

条件操作符contains(),判断是否包含指定数据 代码:

private void rxJavaContains(){
        Observable.just(1, 2, 3)
                .contains(4)
                .subscribe(new Consumer < Boolean > () {
                    @Override
                    public void accept(Boolean aBoolean) throws Exception {
                        Log.d(TAG, "accept====" + aBoolean);
                    }
                });
    }

结果:

image.png

3.6.8 isEmpty

条件操作符sEmpty(),判断发送的数据是否为空,如果事件序列中元素为空则返回true 代码:

 private void rxJavaIsEmpty() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onComplete();
            }
        })
                .isEmpty()
                .subscribe(new Consumer<Boolean>() {
                    @Override
                    public void accept(Boolean aBoolean) throws Exception {
                        Log.d(TAG, "accept===" + aBoolean);
                    }
                });
    }

结果:

image.png

3.6其他操作符

含义:被观察者发送事件时,进行功能性拓展。

3.6.1 功能操作符
  • doOnEach doOnEach 操作符:Observable 每发送一件事件之前都会先回调这个方法。 代码:
/**
     *doOnEach(),Observable 每发送一件事件之前都会先回调这个方法。
     */
    private void rxjavaDoOnEach(){
        Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "subscribe: ");
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).doOnEach(new Consumer<Notification<Integer>>() {

            @Override
            public void accept(Notification<Integer> integerNotification) throws Exception {
                Log.d(TAG, "accept: "+"执行了doOnEach获取的元素值为:"+integerNotification.getValue());
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: "+integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });
    }

结果:

image.png

  • doOnLifecycle 在回调 onSubscribe 之前回调该方法的第一个参数的回调方法,可以使用该回调方法决定是否取消订阅,第二个参数则是与 doOnDispose() 一样,在调用 Disposable 的 dispose() 之后回调该方法** 我们先看一个代码:
 Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
                Log.d(TAG, "subscribe: ");
            }
        }).doOnLifecycle(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                Log.d(TAG, "doOnLifecycle ===accept: ");
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "doOnLifecycle ===Action: ");
            }
        }).doOnDispose(new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "doOnDispose === run: ");
            }
        }).subscribe(new Observer<Integer>() {
            Disposable mDisposable;
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
                mDisposable =d;
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: "+integer);
                //此处取消订阅
                mDisposable.dispose();

            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });
    }

结果:

image.png

可以看到当在 onNext() 方法进行取消订阅操作后,doOnDispose() 和 doOnLifecycle() 都会被回调。那我们如果使用 doOnLifecycle 进行取消订阅,来看看结果: 代码:

 Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "subscribe: ");
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();

            }
        }).doOnLifecycle(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                Log.d(TAG, "doOnLifecycle ===accept: ");
                disposable.dispose();
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "doOnLifecycle ===Action: ");
            }
        }).doOnDispose(new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "doOnDispose === run: ");
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
               
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: "+integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });
    }

结果:

image.png

可以发现 doOnDispose Action 和 doOnLifecycle Action 都没有被回调。 其余的我就不写代码了,大家也都能明白.

  • doAfterNext Observable 每发送 onNext() 之后都会回调这个方法。
  • doOnNext Observable 每发送 onNext() 之前都会先回调这个方法。
  • doOnComplete Observable 每发送 onComplete() 之前都会回调这个方法。
  • doOnError Observable 每发送 onError() 之前都会回调这个方法。
  • doOnSubscribe Observable 每发送 onSubscribe() 之前都会回调这个方法
  • doOnDispose 当调用 Disposable 的 dispose() 之后回调该方法。
  • doOnTerminate & doAfterTerminate doOnTerminate 是在 onError 或者 onComplete 发送之前回调,而 doAfterTerminate 则是 onError 或者 onComplete 发送之后回调。
  • doFinally() 在所有事件发送完毕之后回调该方法。
3.6.2 出现错误或异常处理操作符
  • onErrorReturn 当接受到一个 onError() 事件之后回调,将不再走onError回调,返回的值会回调 onNext() 方法,,并正常结束该事件序列。 代码:
 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "subscribe: ");
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onError(new NullPointerException());

            }
        }).onErrorReturn(new Function<Throwable,Integer>() {
            @Override
            public Integer apply(Throwable throwable) throws Exception {
                Log.d(TAG, "onErrorReturn ==== apply: "+throwable);
                return 500;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: "+integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });

结果:

image.png

  • onErrorResumeNext 当接收到 onError() 事件时,返回一个新的 Observable,并正常结束事件序列。 代码:
 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "subscribe: ");
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onError(new NullPointerException());

            }
        }).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
            @Override
            public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
                Log.d(TAG, "onErrorResumeNext ==== apply: "+throwable);
                return Observable.just(4, 5, 6);
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: "+integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });

结果:

image.png

  • retry retry(),如果出现错误事件,则会重新发送所有事件序列。参数是代表重新发的次数。 代码:
 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "subscribe: ");
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onError(new NullPointerException());
            }
        }).retry(2).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: "+integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: "+e);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });

结果:

image.png

3.6.3 事件重发操作符
  • repeat 重复发送被观察者的事件,times 为发送次数。 代码:
bservable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
})
.repeat(2)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "===================onSubscribe ");
    }
    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "===================onNext " + integer);
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
        Log.d(TAG, "===================onComplete ");
    }
});

结果:

image.png

  • repeatWhen 这个方法可以会返回一个新的被观察者设定一定逻辑来决定是否重复发送事件,具体就不说了都一样的。
  • repeatUntil 根据条件决定是否执行,其实现方法getAsBoolean()的返回值如果一直是false,则无限执行,当返回值是true时则立即中断执行。
3.6.4 被观察者延迟发送操作符
  • delay 延迟一段事件发送事件,没啥说的。
3.6.5 发送事件超时处理操作符
  • timeout 这个timeout有多个不同类型参数:

timeout(long timeout, TimeUnit timeUnit):每当原始Observable发射了一项数据,computation调度器就启动一个计时器,如果计时器超过了指定指定的时长而原始Observable没有发射另一项数据,timeout就抛出 TimeoutException,以一个错误通知终止Observable。 timeout(long timeout, TimeUnit timeUnit, ObservableSource<? extends T> other):每当原始Observable发射了一项数据,computation调度器就启动一个计时器,如果计时器超过了指定指定的时长而原始Observable没有发射另一项数据,timeout 在超时时会切换到使用一个你指定的备用的 Observable。 timeout(Function<> itemTimeoutIndicator):timeout使用一个Function对原始Observable发射的每一项进行观察,如果当这个Function执行完但原始Observable还没有发射下一个数据时,系统就会认为是超时了,timeout 就抛出 TimeoutException,以一个错误通知终止原始Observable。

就先说这么多吧,告辞!

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券