public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources)
public static <T> Observable<T> ambArray(ObservableSource<? extends T>... sources)
两个方法参数稍有不同,但用法含义一样。
多个 Observable 中,无论发射的是 onNext 还是 onComplete 或者 onError,只接受第一个发射数据的那个 Observable,其它 Observable 发射的数据都被忽略。
Observable.ambArray(Observable.just(1).delay(100, TimeUnit.MILLISECONDS), Observable.just(2))
.subscribe( { textView.text = "${textView.text}\n $it"})
ambWith 的内部也是 ambArray
public final Observable<T> ambWith(ObservableSource<? extends T> other) {
ObjectHelper.requireNonNull(other, "other is null");
return ambArray(this, other);
}
发射的数据要相同,发射的顺序要相同,终止状态 也要相同。
Observable.sequenceEqual(Observable.just(1, 2, 3), Observable.just(1))
.subscribe(Consumer<Boolean> {
textView.text = "${textView.text}\n 两个观察者 ${if(it) "相同" else "不同"}"
})
有一个重载方法,可以自己控制认为相等的规则
public static <T> Single<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2,
BiPredicate<? super T, ? super T> isEqual)
Observable.sequenceEqual(Observable.just(1, 2, 3), Observable.just(6, 7, 8)
, BiPredicate<Int, Int>{t1, t2 -> t1%5 == t2%5 })
.subscribe(Consumer<Boolean> {
textView.text = "${textView.text}\n 两个观察者 ${if(it) "相同" else "不同"}"
})
结果认为这两个 Observable 是一样的。
对发射的数据做判断,如果全都满足某种条件才返回 true,否则返回 false。
Observable.just(1,2,3,4).all { it%2==0 }
.subscribe { t ->
textView.text = "${textView.text}\n ${if(t) "都是偶数" else "不全是偶数"}"
}
有一个符合条件就是 true。
Observable.just(1,2,3,4).any { it%2==0 }
.subscribe { t ->
textView.text = "${textView.text}\n ${if(t) "有偶数" else "没有偶数"}"
}
包含某个数返回 true,否则返回 false。
Observable.just(1,2,3,4).contains(2)
.subscribe { t ->
textView.text = "${textView.text}\n ${if(t) "包含 2" else "不包含 2"}"
}
判断发射数据是否为空
// 非空
Observable.just(1,2,3,4).isEmpty
.subscribe { t ->
textView.text = "${textView.text}\n ${if(t) "空" else "非空"}"
}
// 空
Observable.create(ObservableOnSubscribe<Any> {
it.onComplete()
}).isEmpty.subscribe { t -> textView.text = "${textView.text}\n ${if(t) "空" else "非空"}"}
如果 Observable 正常终止后仍然没有发射任何数据,就发射一个默认值。
// 发射
// onNext(1)
Observable.create(ObservableOnSubscribe<Int> { emitter ->
emitter.onNext(1)
}).defaultIfEmpty(111).subscribe(observerInt)
// 发射
// onError()
Observable.create(ObservableOnSubscribe<Int> { emitter ->
emitter.onError(Throwable())
}).defaultIfEmpty(111).subscribe(observerInt)
// 发射
// onNext(111)
// onComplete()
Observable.create(ObservableOnSubscribe<Int> { emitter ->
emitter.onComplete()
}).defaultIfEmpty(111).subscribe(observerInt)
如果 Observable 正常终止后仍然没有发射任何数据,就使用另一个 Observable。defaultIfEmpty 只能发一个值,而 switchIfEmpty 可以通过 Observable 返回多个值。
// onNext(1)
// onNext(2)
// onComplete
val other = Observable.just(1,2)
Observable.empty<Int>().switchIfEmpty(other).subscribe(observerInt)
Observable.create(ObservableOnSubscribe<Int> { it.onComplete() })
.switchIfEmpty(other).subscribe(observerInt)