前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RxJava之过滤操作符介绍

RxJava之过滤操作符介绍

作者头像
103style
发布2022-12-19 13:27:59
3160
发布2022-12-19 13:27:59
举报

转载请以链接形式标明出处: 本文出自:103style的博客

过滤相关的操作符 以及 官方介绍

RxJava过滤操作符 官方介绍 :Filtering Observables


debounce

丢弃超过debounce设置的时间的事件

官方示例:

代码语言:javascript
复制
Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("A");

        Thread.sleep(1500);
        emitter.onNext("B");

        Thread.sleep(500);
        emitter.onNext("C");

        Thread.sleep(250);
        emitter.onNext("D");

        Thread.sleep(2000);
        emitter.onNext("E");
        emitter.onComplete();
    }
});

source.subscribeOn(Schedulers.io())
        .debounce(1, TimeUnit.SECONDS)
        .blockingSubscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext: " + s);
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });

输出:

代码语言:javascript
复制
onNext: A
onNext: D
onNext: E
onComplete

distinct

过滤相同的事件

官方示例:

代码语言:javascript
复制
Observable.just(2, 3, 4, 4, 2, 1)
        .distinct()
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });

输出:

代码语言:javascript
复制
2
3
4
1

distinctUntilChanged

过滤连续的相同事件流

官方示例:

代码语言:javascript
复制
Observable.just(1, 1, 2, 1, 2, 3, 3, 4)
        .distinctUntilChanged()
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });

输出:

代码语言:javascript
复制
1
2
1
2
3
4

elementAt

获取事件流中从零开始的第指定下标的元素

官方示例:

代码语言:javascript
复制
Observable.range(0, 10)
        .elementAt(5)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });

输出:

代码语言:javascript
复制
5

elementAtOrError

索引不存在则走onError

官方示例:

代码语言:javascript
复制
Observable<String> source = Observable.just("Kirk", "Spock", "Chekov", "Sulu");
Single<String> element = source.elementAtOrError(4);
element.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        System.out.println("onSuccess will not be printed!");
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        System.out.println("onError: " + throwable);
    }
});

输出:

代码语言:javascript
复制
onError: java.util.NoSuchElementException

filter

自定义过滤规则

官方示例:

代码语言:javascript
复制
Observable.just(1, 2, 3, 4, 5, 6)
        .filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer % 2 == 0;
            }
        })
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });

输出:

代码语言:javascript
复制
2
4
6

first

获取事件流中第一个事件,返回值为 Single

官方示例:

代码语言:javascript
复制
Observable<String> source = Observable.just("A", "B", "C");
Single<String> firstOrDefault = source.first("D");

firstOrDefault.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        System.out.println(s);
    }
});

输出:

代码语言:javascript
复制
A

firstElement

获取事件流中第一个事件,返回值为 Maybe

官方示例:

代码语言:javascript
复制
Observable<String> source = Observable.just("A", "B", "C");
Maybe<String> firstOrDefault = source.firstElement();

firstOrDefault.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        System.out.println(s);
    }
});

输出:

代码语言:javascript
复制
A

firstOrError

输出第一个事件并捕获异常。

官方示例:

代码语言:javascript
复制
Observable<String> emptySource = Observable.empty();
Single<String> firstOrError = emptySource.firstOrError();

firstOrError.subscribe(
        new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("onSuccess will not be printed!");
            }
        },
        new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                System.out.println("onError: " + throwable);
            }
        });

输出:

代码语言:javascript
复制
onError: java.util.NoSuchElementException

ignoreElement

过滤一个事件

官方示例:

代码语言:javascript
复制
Single<Long> source = Single.timer(1, TimeUnit.SECONDS);
Completable completable = source.ignoreElement();

completable.doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Done!");
    }
}).blockingAwait();

输出:

代码语言:javascript
复制
Done!

ignoreElements

过滤所有事件

官方示例:

代码语言:javascript
复制
Observable<Long> source = Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS);
Completable completable = source.ignoreElements();

completable.doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Done!");
    }
}).blockingAwait();

输出:

代码语言:javascript
复制
Done!

last

获取事件流中最后一个事件,返回值为 Single

官方示例:

代码语言:javascript
复制
Observable<String> source = Observable.just("A", "B", "C");
Single<String> lastOrDefault = source.last("D");

lastOrDefault.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        System.out.println(s);
    }
});

输出:

代码语言:javascript
复制
C

lastElement

获取事件流中最后一个事件, 返回值为 Maybe

官方示例:

代码语言:javascript
复制
Observable<String> source = Observable.just("A", "B", "C");
Maybe<String> lastOrDefault = source.lastElement();

lastOrDefault.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        System.out.println(s);
    }
});

输出:

代码语言:javascript
复制
C

lastOrError

firstOrError

官方示例:

代码语言:javascript
复制
Observable<String> emptySource = Observable.empty();
Single<String> lastOrError = emptySource.lastOrError();

lastOrError.subscribe(
        new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("onSuccess will not be printed!");
            }
        },
        new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                System.out.println("onError: " + throwable);
            }
        });

输出:

代码语言:javascript
复制
onError: java.util.NoSuchElementException

ofType

根据类型过滤

官方示例:

代码语言:javascript
复制
Observable<Number> numbers = Observable.just(1, 4.0, 3, 2.71, 2f, 7);
Observable<Integer> integers = numbers.ofType(Integer.class);

integers.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        System.out.println(integer);
    }
});

输出:

代码语言:javascript
复制
1
3
7

sample

仅在周期性时间间隔内发出最近发出的事件来过滤事件流中的事件。

官方示例:

代码语言:javascript
复制
Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("A");

        Thread.sleep(500);
        emitter.onNext("B");

        Thread.sleep(200);
        emitter.onNext("C");

        Thread.sleep(800);
        emitter.onNext("D");

        Thread.sleep(600);
        emitter.onNext("E");
        emitter.onComplete();
    }
});

source.subscribeOn(Schedulers.io())
        .sample(1, TimeUnit.SECONDS)
        .blockingSubscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext: " + s);
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });

输出:

代码语言:javascript
复制
// 700(500 + 200)   
//1500(500 + 200 + 800)   
//2100(500 + 200 + 800 + 600)
onNext: C
onNext: D
onComplete

skip

跳过事件流中开头的指定个数事件

官方示例:

代码语言:javascript
复制
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

source.skip(4)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });

输出:

代码语言:javascript
复制
5
6
7
8
9
10

skipLast

跳过事件流中结尾的指定个数事件

官方示例:

代码语言:javascript
复制
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

source.skipLast(4)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });

输出:

代码语言:javascript
复制
1
2
3
4
5
6

take

取事件流中开头的指定个数事件

官方示例:

代码语言:javascript
复制
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

source.take(4)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });

输出:

代码语言:javascript
复制
1
2
3
4

takeLast

取事件流中结尾的指定个数事件

官方示例:

代码语言:javascript
复制
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

source.takeLast(4)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });

输出:

代码语言:javascript
复制
7
8
9
10

throttleFirst

sample相反 去指定连续时间内的第一个事件

官方示例:

代码语言:javascript
复制
Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("A");

        Thread.sleep(500);
        emitter.onNext("B");

        Thread.sleep(200);
        emitter.onNext("C");

        Thread.sleep(800);
        emitter.onNext("D");

        Thread.sleep(600);
        emitter.onNext("E");
        emitter.onComplete();
    }
});

source.subscribeOn(Schedulers.io())
        .throttleFirst(1, TimeUnit.SECONDS)
        .blockingSubscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext: " + s);
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });

输出:

代码语言:javascript
复制
onNext: A
onNext: D
onComplete

throttleLast

sample一样 去指定连续时间内的最后一个事件

官方示例:

代码语言:javascript
复制
Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("A");

        Thread.sleep(500);
        emitter.onNext("B");

        Thread.sleep(200);
        emitter.onNext("C");

        Thread.sleep(800);
        emitter.onNext("D");

        Thread.sleep(600);
        emitter.onNext("E");
        emitter.onComplete();
    }
});

source.subscribeOn(Schedulers.io())
        .throttleLast(1, TimeUnit.SECONDS)
        .blockingSubscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext: " + s);
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });

输出:

代码语言:javascript
复制
onNext: C
onNext: D
onComplete

throttleLatest

发出事件流中的事件,然后在它们之间经过指定的超时时定期发出最新项目(如果有)。

官方示例:

代码语言:javascript
复制
Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("A");

        Thread.sleep(500);
        emitter.onNext("B");

        Thread.sleep(200);
        emitter.onNext("C");

        Thread.sleep(800);
        emitter.onNext("D");

        Thread.sleep(600);
        emitter.onNext("E");
        emitter.onComplete();
    }
});

source.subscribeOn(Schedulers.io())
        .throttleLatest(1, TimeUnit.SECONDS)
        .blockingSubscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext: " + s);
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });

输出:

代码语言:javascript
复制
onNext: A
onNext: C
onNext: D
onComplete

throttleWithTimeout

官方示例:

代码语言:javascript
复制
Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("A");

        Thread.sleep(1500);
        emitter.onNext("B");

        Thread.sleep(500);
        emitter.onNext("C");

        Thread.sleep(250);
        emitter.onNext("D");

        Thread.sleep(2000);
        emitter.onNext("E");

        emitter.onComplete();
    }
});

source.subscribeOn(Schedulers.io())
        .throttleWithTimeout(1, TimeUnit.SECONDS)
        .blockingSubscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext: " + s);
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });

输出:

代码语言:javascript
复制
onNext: A
onNext: D
onNext: E
onComplete

timeout

在超时时间内发出每一个事件,如果超过超时事件则报错

官方示例:

代码语言:javascript
复制
Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("A");

        Thread.sleep(800);
        emitter.onNext("B");

        Thread.sleep(400);
        emitter.onNext("C");

        Thread.sleep(1200);
        emitter.onNext("D");

        emitter.onComplete();
    }
});

source.subscribeOn(Schedulers.io())
        .timeout(1, TimeUnit.SECONDS)
        .blockingSubscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext: " + s);
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });

输出:

代码语言:javascript
复制
onNext: A
onNext: B
onNext: C
java.util.concurrent.TimeoutException: The source did not signal an event for 1 seconds and has been terminated.

以上

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-06-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 过滤相关的操作符 以及 官方介绍
    • debounce
      • distinct
        • distinctUntilChanged
          • elementAt
            • elementAtOrError
              • filter
                • first
                  • firstElement
                    • firstOrError
                      • ignoreElement
                        • ignoreElements
                          • last
                            • lastElement
                              • lastOrError
                                • ofType
                                  • sample
                                    • skip
                                      • skipLast
                                        • take
                                          • takeLast
                                            • throttleFirst
                                              • throttleLast
                                                • throttleLatest
                                                  • throttleWithTimeout
                                                    • timeout
                                                    领券
                                                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档