RxJava2 提供了多种条件过滤器操作符,用于根据特定条件筛选或转换 Observable 发出的数据项。以下是常用的条件过滤器操作符及其用法:
filter()
- 基本过滤filter()
操作符只允许满足条件的数据项通过。
Observable.just(1, 2, 3, 4, 5)
.filter(item -> item % 2 == 0) // 只保留偶数
.subscribe(System.out::println);
// 输出: 2, 4
take()
- 取前N个元素take()
操作符只发出前N个数据项,然后完成。
Observable.range(1, 10)
.take(3) // 只取前3个
.subscribe(System.out::println);
// 输出: 1, 2, 3
takeLast()
- 取最后N个元素takeLast()
操作符只发出最后N个数据项。
Observable.range(1, 5)
.takeLast(2) // 取最后2个
.subscribe(System.out::println);
// 输出: 4, 5
takeWhile()
- 满足条件时持续取值takeWhile()
在条件为true时继续发出数据,直到条件第一次变为false。
Observable.range(1, 10)
.takeWhile(i -> i < 5) // 当i<5时继续发出
.subscribe(System.out::println);
// 输出: 1, 2, 3, 4
takeUntil()
- 直到另一个Observable发出数据takeUntil()
在另一个Observable发出数据前继续发出数据。
Observable.intervalRange(1, 10, 0, 100, TimeUnit.MILLISECONDS)
.takeUntil(observable -> {
// 3秒后发出数据,停止原Observable
Observable.timer(3, TimeUnit.SECONDS).subscribe();
return Observable.timer(3, TimeUnit.SECONDS);
})
.blockingSubscribe(System.out::println);
// 输出前30个数字(约3秒)
skip()
- 跳过前N个元素skip()
跳过前N个数据项。
Observable.range(1, 5)
.skip(2) // 跳过前2个
.subscribe(System.out::println);
// 输出: 3, 4, 5
skipLast()
- 跳过最后N个元素skipLast()
跳过最后N个数据项。
Observable.range(1, 5)
.skipLast(2) // 跳过最后2个
.subscribe(System.out::println);
// 输出: 1, 2, 3
skipWhile()
- 跳过满足条件的元素skipWhile()
跳过条件为true的元素,直到条件第一次变为false。
Observable.range(1, 5)
.skipWhile(i -> i < 3) // 跳过i<3的元素
.subscribe(System.out::println);
// 输出: 3, 4, 5
skipUntil()
- 跳过直到另一个Observable发出数据skipUntil()
在另一个Observable发出数据前跳过所有数据。
Observable.intervalRange(1, 5, 0, 100, TimeUnit.MILLISECONDS)
.skipUntil(Observable.timer(300, TimeUnit.MILLISECONDS))
.blockingSubscribe(System.out::println);
// 跳过前3个,输出后2个
distinct()
- 去重distinct()
只发出唯一的数据项。
Observable.just(1, 2, 2, 3, 4, 4, 5)
.distinct()
.subscribe(System.out::println);
// 输出: 1, 2, 3, 4, 5
distinctUntilChanged()
- 相邻去重distinctUntilChanged()
只有当当前项与前一项不同时才发出。
Observable.just(1, 2, 2, 3, 3, 3, 4, 5, 5)
.distinctUntilChanged()
.subscribe(System.out::println);
// 输出: 1, 2, 3, 4, 5
elementAt()
- 获取指定位置的元素elementAt()
获取指定索引处的元素(索引从0开始)。
Observable.just(10, 20, 30, 40)
.elementAt(2) // 获取第3个元素
.subscribe(System.out::println);
// 输出: 30
first()
/ last()
- 获取第一个/最后一个元素Observable.just(5, 3, 8, 1)
.first() // 获取第一个元素
.subscribe(System.out::println);
// 输出: 5
Observable.just(5, 3, 8, 1)
.last() // 获取最后一个元素
.subscribe(System.out::println);
// 输出: 1
single()
- 确保只有一个元素single()
确保Observable只发出一个元素,否则抛出异常。
Observable.just(42)
.single()
.subscribe(System.out::println);
Observable.just(1, 2)
.single() // 抛出异常,因为有多个元素
.subscribe(System.out::println, Throwable::printStackTrace);
takeWhile()
和 takeUntil()
的组合使用Observable.intervalRange(1, 10, 0, 100, TimeUnit.MILLISECONDS)
.takeWhile(i -> i < 5) // 先取i<5
.takeUntil(i -> i == 3) // 再取到i==3时停止
.blockingSubscribe(System.out::println);
// 输出: 0, 1, 2, 3