转载请以链接形式标明出处: 本文出自:103style的博客
RxJava
之 过滤操作符 官方介绍 :Filtering Observables
debounce
distinct
distinctUntilChanged
elementAt
elementAtOrError
filter
first
firstElement
firstOrError
ignoreElement
ignoreElements
last
lastElement
lastOrError
ofType
sample
skip
skipLast
take
takeLast
throttleFirst
throttleLast
throttleLatest
throttleWithTimeout
timeout
丢弃超过debounce
设置的时间的事件
官方示例:
Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
Thread.sleep(1500);
emitter.onNext("B");
Thread.sleep(500);
emitter.onNext("C");
Thread.sleep(250);
emitter.onNext("D");
Thread.sleep(2000);
emitter.onNext("E");
emitter.onComplete();
}
});
source.subscribeOn(Schedulers.io())
.debounce(1, TimeUnit.SECONDS)
.blockingSubscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
输出:
onNext: A
onNext: D
onNext: E
onComplete
过滤相同的事件
官方示例:
Observable.just(2, 3, 4, 4, 2, 1)
.distinct()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
输出:
2
3
4
1
过滤连续的相同事件流
官方示例:
Observable.just(1, 1, 2, 1, 2, 3, 3, 4)
.distinctUntilChanged()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
输出:
1
2
1
2
3
4
获取事件流中从零开始的第指定下标的元素
官方示例:
Observable.range(0, 10)
.elementAt(5)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
输出:
5
索引不存在则走onError
官方示例:
Observable<String> source = Observable.just("Kirk", "Spock", "Chekov", "Sulu");
Single<String> element = source.elementAtOrError(4);
element.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("onSuccess will not be printed!");
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("onError: " + throwable);
}
});
输出:
onError: java.util.NoSuchElementException
自定义过滤规则
官方示例:
Observable.just(1, 2, 3, 4, 5, 6)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer % 2 == 0;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
输出:
2
4
6
获取事件流中第一个事件,返回值为 Single
官方示例:
Observable<String> source = Observable.just("A", "B", "C");
Single<String> firstOrDefault = source.first("D");
firstOrDefault.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
输出:
A
获取事件流中第一个事件,返回值为 Maybe
官方示例:
Observable<String> source = Observable.just("A", "B", "C");
Maybe<String> firstOrDefault = source.firstElement();
firstOrDefault.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
输出:
A
输出第一个事件并捕获异常。
官方示例:
Observable<String> emptySource = Observable.empty();
Single<String> firstOrError = emptySource.firstOrError();
firstOrError.subscribe(
new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("onSuccess will not be printed!");
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("onError: " + throwable);
}
});
输出:
onError: java.util.NoSuchElementException
过滤一个事件
官方示例:
Single<Long> source = Single.timer(1, TimeUnit.SECONDS);
Completable completable = source.ignoreElement();
completable.doOnComplete(new Action() {
@Override
public void run() throws Exception {
System.out.println("Done!");
}
}).blockingAwait();
输出:
Done!
过滤所有事件
官方示例:
Observable<Long> source = Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS);
Completable completable = source.ignoreElements();
completable.doOnComplete(new Action() {
@Override
public void run() throws Exception {
System.out.println("Done!");
}
}).blockingAwait();
输出:
Done!
获取事件流中最后一个事件,返回值为 Single
官方示例:
Observable<String> source = Observable.just("A", "B", "C");
Single<String> lastOrDefault = source.last("D");
lastOrDefault.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
输出:
C
获取事件流中最后一个事件, 返回值为 Maybe
官方示例:
Observable<String> source = Observable.just("A", "B", "C");
Maybe<String> lastOrDefault = source.lastElement();
lastOrDefault.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
输出:
C
同 firstOrError
官方示例:
Observable<String> emptySource = Observable.empty();
Single<String> lastOrError = emptySource.lastOrError();
lastOrError.subscribe(
new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("onSuccess will not be printed!");
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("onError: " + throwable);
}
});
输出:
onError: java.util.NoSuchElementException
根据类型过滤
官方示例:
Observable<Number> numbers = Observable.just(1, 4.0, 3, 2.71, 2f, 7);
Observable<Integer> integers = numbers.ofType(Integer.class);
integers.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
输出:
1
3
7
仅在周期性时间间隔内发出最近发出的事件来过滤事件流中的事件。
官方示例:
Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
Thread.sleep(500);
emitter.onNext("B");
Thread.sleep(200);
emitter.onNext("C");
Thread.sleep(800);
emitter.onNext("D");
Thread.sleep(600);
emitter.onNext("E");
emitter.onComplete();
}
});
source.subscribeOn(Schedulers.io())
.sample(1, TimeUnit.SECONDS)
.blockingSubscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
输出:
// 700(500 + 200)
//1500(500 + 200 + 800)
//2100(500 + 200 + 800 + 600)
onNext: C
onNext: D
onComplete
跳过事件流中开头的指定个数事件
官方示例:
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
source.skip(4)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
输出:
5
6
7
8
9
10
跳过事件流中结尾的指定个数事件
官方示例:
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
source.skipLast(4)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
输出:
1
2
3
4
5
6
取事件流中开头的指定个数事件
官方示例:
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
source.take(4)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
输出:
1
2
3
4
取事件流中结尾的指定个数事件
官方示例:
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
source.takeLast(4)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
输出:
7
8
9
10
和 sample
相反 去指定连续时间内的第一个事件
官方示例:
Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
Thread.sleep(500);
emitter.onNext("B");
Thread.sleep(200);
emitter.onNext("C");
Thread.sleep(800);
emitter.onNext("D");
Thread.sleep(600);
emitter.onNext("E");
emitter.onComplete();
}
});
source.subscribeOn(Schedulers.io())
.throttleFirst(1, TimeUnit.SECONDS)
.blockingSubscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
输出:
onNext: A
onNext: D
onComplete
和 sample
一样 去指定连续时间内的最后一个事件
官方示例:
Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
Thread.sleep(500);
emitter.onNext("B");
Thread.sleep(200);
emitter.onNext("C");
Thread.sleep(800);
emitter.onNext("D");
Thread.sleep(600);
emitter.onNext("E");
emitter.onComplete();
}
});
source.subscribeOn(Schedulers.io())
.throttleLast(1, TimeUnit.SECONDS)
.blockingSubscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
输出:
onNext: C
onNext: D
onComplete
发出事件流中的事件,然后在它们之间经过指定的超时时定期发出最新项目(如果有)。
官方示例:
Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
Thread.sleep(500);
emitter.onNext("B");
Thread.sleep(200);
emitter.onNext("C");
Thread.sleep(800);
emitter.onNext("D");
Thread.sleep(600);
emitter.onNext("E");
emitter.onComplete();
}
});
source.subscribeOn(Schedulers.io())
.throttleLatest(1, TimeUnit.SECONDS)
.blockingSubscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
输出:
onNext: A
onNext: C
onNext: D
onComplete
官方示例:
Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
Thread.sleep(1500);
emitter.onNext("B");
Thread.sleep(500);
emitter.onNext("C");
Thread.sleep(250);
emitter.onNext("D");
Thread.sleep(2000);
emitter.onNext("E");
emitter.onComplete();
}
});
source.subscribeOn(Schedulers.io())
.throttleWithTimeout(1, TimeUnit.SECONDS)
.blockingSubscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
输出:
onNext: A
onNext: D
onNext: E
onComplete
在超时时间内发出每一个事件,如果超过超时事件则报错
官方示例:
Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
Thread.sleep(800);
emitter.onNext("B");
Thread.sleep(400);
emitter.onNext("C");
Thread.sleep(1200);
emitter.onNext("D");
emitter.onComplete();
}
});
source.subscribeOn(Schedulers.io())
.timeout(1, TimeUnit.SECONDS)
.blockingSubscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
输出:
onNext: A
onNext: B
onNext: C
java.util.concurrent.TimeoutException: The source did not signal an event for 1 seconds and has been terminated.
以上