把每个 Observable 最近发射的数据组合在一起。
共有 13 个重载方法。
// 前两个参数是两个 Observable,内部是将这几个 Observable 封装成数组调用下面的重载方法
public static <T1, T2, R> Observable<R> combineLatest(
ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2,
BiFunction<? super T1, ? super T2, ? extends R> combiner)
// 还有 7 个方法,区别在于前面的参数分别是 3 个,4 个 ... 9 个 Observable
// 下面两个方法的区别在于一个没有 buffersize 参数,内部会获取一个默认的大小。
public static <T, R> Observable<R> combineLatest(Iterable<? extends ObservableSource<? extends T>> sources,
Function<? super Object[], ? extends R> combiner)
public static <T, R> Observable<R> combineLatest(Iterable<? extends ObservableSource<? extends T>> sources,
Function<? super Object[], ? extends R> combiner, int bufferSize)
// 下面两个方法和上面方法的区别只在于第一个参数,一个是集合,一个是数组,本质是一样的
public static <T, R> Observable<R> combineLatest(ObservableSource<? extends T>[] sources,
Function<? super Object[], ? extends R> combiner)
public static <T, R> Observable<R> combineLatest(ObservableSource<? extends T>[] sources,
Function<? super Object[], ? extends R> combiner, int bufferSize)
// 这个方法内部就是简单的调用了上面的方法
public static <T, R> Observable<R> combineLatest(Function<? super Object[], ? extends R> combiner, int bufferSize, ObservableSource<? extends T>... sources)
现在看上面的最后一个方法,最后一个参数是多个 Observable,第二个参数 bufferSize 是内部缓冲队列的大小。
被组合的被观察者的数据类型是 T,组合后变成一个数组 Object[],在第一个参数 Function 里被转换成 R,最后的被观察者就是发射 R 类型的数据。Function 的第一个泛型是 Object 数组,所以可以组合不同类型的被观察者。
val observableInt = Observable.just(1, 2, 3, 4, 5)
val observableStr = Observable.just("a","b","c")
Observable.combineLatest(Function<Array<Any>, String> {
it.map { it.toString() }.reduce { acc, i -> "$acc-$i"}
}, 3, observableInt, observableStr)
.subscribe { Log.e("RX", "$it") }
由于上面的例子都在主线程,也没控制进度,按顺序发送两个 Observable,所以第一个 Observable 发送结束了才发第二个,observableInt 最近的就是 5,结果是
5-a
5-b
5-c
再看一个例子
// computation 线程上
val ob1 = Observable.intervalRange(0, 5, 500, 500, TimeUnit.MILLISECONDS)
// io 线程上,两个错开
val ob2 = Observable.create(ObservableOnSubscribe<String> {
Thread.sleep(200)
it.onNext("a")
Thread.sleep(500)
it.onNext("b")
Thread.sleep(500)
it.onError(Throwable())
}).subscribeOn(Schedulers.io())
Observable.combineLatest(Function<Array<Any>, String> {
it.map { it.toString() }.reduce { acc, i -> "$acc-$i"}
}, 1, ob1, ob2)
.subscribe(object : Observer<String> { // 没转到 UI 线程,别去修改 UI
override fun onComplete() { Log.e("RX", "onComplete") }
override fun onSubscribe(d: Disposable) {}
override fun onNext(t: String) { Log.e("RX", "onNext $t") }
override fun onError(e: Throwable) {Log.e("RX", "onError")}
})
此时日志是
onNext 0-a
onNext 0-b
onNext 1-b
onError
有 DelayError 后缀的方法,就是如果有 Observable 发射 onError 事件,先忽略继续发射剩下的其它事件,发射结束了再发射 onError,而没有 DelayError 的方法,一遇到 onError 整个发射就立即停止。所以它可以延迟发射 onError。
val ob1 = Observable.intervalRange(0, 5, 500, 500, TimeUnit.MILLISECONDS)
// io 线程上,两个错开
val ob2 = Observable.create(ObservableOnSubscribe<String> {
Thread.sleep(200)
it.onNext("a")
Thread.sleep(500)
it.onNext("b")
Thread.sleep(500)
it.onError(Throwable())
}).subscribeOn(Schedulers.io())
Observable.combineLatestDelayError(Function<Array<Any>, String> {
it.map { it.toString() }.reduce { acc, i -> "$acc-$i"}
}, 1, ob1, ob2)
.subscribe(object : Observer<String> { // 没转到 UI 线程,别去修改 UI
override fun onComplete() { Log.e("RX", "onComplete") }
override fun onSubscribe(d: Disposable) {}
override fun onNext(t: String) { Log.e("RX", "onNext $t") }
override fun onError(e: Throwable) {Log.e("RX", "onError")}
})
仅是将 combineLatest 换成 combineLatestDelayError,日志是
onNext 0-a
onNext 0-b
onNext 1-b
onNext 2-b
onNext 3-b
onNext 4-b
onError
它要等所有 Observable 都开始发射数据后才会发射组合后的数据
val ob1 = Observable.interval(0, 100, TimeUnit.MILLISECONDS).take(10)
val ob2 = Observable.intervalRange(100, 10, 50, 100, TimeUnit.MILLISECONDS).take(10)
val ob3 = Observable.intervalRange(200, 10, 100, 200, TimeUnit.MILLISECONDS).take(10)
ob1.withLatestFrom(ob2, ob3, Function3<Long, Long, Long, String> {
t1, t2, t3 -> "$t1 : $t2 : $t3"
}).subscribe({Log.e("RX","$it")})
日志:
1 : 100 : 200
2 : 101 : 200
3 : 102 : 201
4 : 103 : 201
5 : 104 : 202
6 : 105 : 202
7 : 106 : 203
8 : 107 : 203
9 : 108 : 204
不保证按每个 Observable 的顺序来合并。
mergeDelayError 和 mergeArrayDelayError 是在 merge 过程中先忽略 onError。mergeArray 是合并数组,merge 是合并集合,多个 ObservableSource 做参数的内部调用 fromArray,和 mergeArray 一样。
看一个构造方法
public static <T> Observable<T> merge(Iterable<? extends ObservableSource<? extends T>> sources, int maxConcurrency, int bufferSize)
maxConcurrency,bufferSize 可用于控制底层合并时的性能,一次合几个 Observable,取出几个数据,对外使用来说是透明的。
Observable.merge(listOf(observableStr, observableInt, observableBoolean), 2, 5)
.subscribe({
textView.text = "${textView.text}\nonNext $it"
})
依次发送 a,b,c,1,2,3,4,5,true,false,true,false。
mergeWith 内部调的 merge(this, other)。
后面的 Observable 发射数据时忽略前面的 Observable 发射的数据。
val ob1 = Observable.intervalRange(1, 5, 0, 400, TimeUnit.MILLISECONDS, Schedulers.newThread())
val ob2 = Observable.intervalRange(10, 5, 600, 400, TimeUnit.MILLISECONDS)
val observable = Observable.create<ObservableSource<*>> { emitter ->
emitter.onNext(ob1)
emitter.onNext(ob2)
}
Observable.switchOnNext(observable).subscribe({ Log.e("RX", "$it") })
结果和想象的不同,只发射了 ob2 的内容。参考,先后应该是指 Observable 被订阅的时候,而上面的例子是在同时订阅的,ob2 从一开始就是后面那个,所以只发射了它里面的内容。
val ob = Observable.intervalRange(1, 4, 0, 40, TimeUnit.MILLISECONDS)
.map { Observable.intervalRange(it * 10, 10 - 2 * it, 0, 20, TimeUnit.MILLISECONDS) }
Observable.switchOnNext(ob).subscribe({Log.e("RX", "$it")})
上面的 ob,每隔 40ms 创建一个 Observable,第一个 Observable 被订阅后的 40ms 后,第二个 Observable 才被创建被订阅,这样多个 Observable 就错开了。而这四个 Observable 发送的数据如下:
10--11--12--13--14--15--16--17
--------20--21--22--23--24--25
----------------30--31--32--33
------------------------40--41
所以最后输出的是 10,11,20,21,30,31,40,41
。
将多个被观察者发射的数据按顺序组合在一起。比如先定义了三个被观察者
// 发射 3 个 String
val observableStr = Observable.just("a","b","c")
// 发射 5 个 Int
val observableInt = Observable.just(1, 2, 3, 4, 5)
// 发射 4 个 Boolean
val observableBoolean = Observable.just(true, false, true, false)
Observable.zip(observableStr, observableInt, BiFunction<String, Int, String> {
// 分别会调用
// onNext("a : 1")
// onNext("b : 2")
// onNext("c : 3")
// onComplete()
t1, t2 -> "$t1 : $t2"
}).subscribe(observerStr)
第三个参数 BiFunction,泛型 <String, Int, String>
,第一个 String 表示 zip 的第一个参数 observableStr 发射的数据类型,第二个 Int 表示 zip 的第二个参数 observableInt 发射的数据类型,最后一个 String 表示经过 BiFunction 转换后返回的数据类型,即最终发射给观察者的类型。
Observable.zip(observableStr, observableInt, observableBoolean, Function3<String, Int, Boolean, String> {
// 分别会调用
// onNext("a : 1 : true")
// onNext("b : 2 : false")
// onNext("c : 3 : true")
// onComplete()
t1, t2, t3 -> "$t1 : $t2 : $t3"
}).subscribe(observerStr)
组合更多个被观察者,用 Function3、Function4...,前几个泛型表示被观察者的类型,最后一个表示最终发给观察者的数据类型。
按观察者的顺序组合数据,并且发射次数是最少的那个被观察者发射的次数。
zipArray 和 zipIterable 有个布尔型的参数控制是否 delayError,zip 方法都是默认为 false。
zipWith 不是静态方法,相当于将自己这个 Observable 和其它的进行 zip。
/**
* @param other 源 Observable 所要组合的目标 Observable
* @param leftEnd 接收从源 Observable 发射来的数据,并返回一个 Observable,这个 Observable 的生命周期决定了源 Observable 发射数据的有效期
* @param rightEnd 接收从目标 Observable 发射来的数据,并返回一个 Observable,这个 Observable 的生命周期决定了目标 Observable 发射数据的有效期
@param resultSelector 接收从源 Observable 和目标 Observable 发射来的数据,并返回最终组合完的数据。
*/
public final <TRight, TLeftEnd, TRightEnd, R> Observable<R> join(
ObservableSource<? extends TRight> other,
Function<? super T, ? extends ObservableSource<TLeftEnd>> leftEnd,
Function<? super TRight, ? extends ObservableSource<TRightEnd>> rightEnd,
BiFunction<? super T, ? super TRight, ? extends R> resultSelector
)
public final <TRight, TLeftEnd, TRightEnd, R> Observable<R> groupJoin(
ObservableSource<? extends TRight> other,
Function<? super T, ? extends ObservableSource<TLeftEnd>> leftEnd,
Function<? super TRight, ? extends ObservableSource<TRightEnd>> rightEnd,
BiFunction<? super T, ? super Observable<TRight>, ? extends R> resultSelector
)
val other = Observable.just("a", "b", "c", "d")
Observable.just(1,2)
.join(other,
Function<Int, Observable<Long>>{
Observable.timer(300, TimeUnit.MILLISECONDS)
},
Function<String, Observable<Long>>{
Observable.timer(200, TimeUnit.MILLISECONDS)
},
BiFunction<Int, String, String>{ t1, t2 -> "$t1-$t2"})
.subscribe(object: Observer<String> {
override fun onComplete() {}
override fun onSubscribe(d: Disposable) {}
override fun onNext(t: String) { Log.e("RX", "$t") }
override fun onError(e: Throwable) {}
})
本身的发射未做延时,每个发射的数据都处于生命周期内,所以结果是
1-a
2-a
1-b
2-b
1-c
2-c
1-d
2-d
Observable.just(1,2)
.map {
Thread.sleep(250)
Log.e("RX", "发射 $it")
it
}
.join(other,
Function<Int, Observable<Long>>{
Log.e("RX", "leftEnd $it")
Observable.timer(300, TimeUnit.MILLISECONDS)
},
Function<String, Observable<Long>>{
Log.e("RX", "rightEnd $it")
Observable.timer(200, TimeUnit.MILLISECONDS)
},
BiFunction<Int, String, String>{ t1, t2 -> "$t1-$t2"})
.subscribe(object: Observer<String> {
override fun onComplete() {}
override fun onSubscribe(d: Disposable) {}
override fun onNext(t: String) { Log.e("RX", "$t") }
override fun onError(e: Throwable) {}
})
05-16 18:10:40.393 7961-7961/pot.ner347.androiddemo E/RX: 发射 1
leftEnd 1
05-16 18:10:40.679 7961-7961/pot.ner347.androiddemo E/RX: 发射 2
05-16 18:10:40.680 7961-7961/pot.ner347.androiddemo E/RX: leftEnd 2
05-16 18:10:40.682 7961-7961/pot.ner347.androiddemo E/RX: other 发射 a
05-16 18:10:40.683 7961-7961/pot.ner347.androiddemo E/RX: rightEnd a
05-16 18:10:40.684 7961-7961/pot.ner347.androiddemo E/RX: 1-a
05-16 18:10:40.685 7961-7961/pot.ner347.androiddemo E/RX: 2-a
other 发射 b
05-16 18:10:40.986 7961-7961/pot.ner347.androiddemo E/RX: rightEnd b
05-16 18:10:40.988 7961-7961/pot.ner347.androiddemo E/RX: other 发射 c
05-16 18:10:41.289 7961-7961/pot.ner347.androiddemo E/RX: rightEnd c
05-16 18:10:41.291 7961-7961/pot.ner347.androiddemo E/RX: other 发射 d
05-16 18:10:41.592 7961-7961/pot.ner347.androiddemo E/RX: rightEnd d
现在看这日志,感觉首先源 Observable 先发送出来,发射一个,join 的第一个参数就收到一个,从最初到收到没有超过里面的 Observable 的生命周期就活着,对于 other,join 的第二个参数收到一个后,紧接着发下一个,如果收到时的间隔超过了回调里那个 Observable 的生命周期,就挂了。
groupJoin 和 join 只有第四个参数的第二,第三个泛型不同,groupJoin 是 Observable,导致最后 subscribe 的泛型也变了。
Observable.just(1,2)
.map {
Thread.sleep(250)
Log.e("RX", "发射 $it")
it
}
.groupJoin(other,
Function<Int, Observable<Long>>{
Log.e("RX", "leftEnd $it")
Observable.timer(300, TimeUnit.MILLISECONDS)
},
Function<String, Observable<Long>>{
Log.e("RX", "rightEnd $it")
Observable.timer(200, TimeUnit.MILLISECONDS)
},
BiFunction<Int, Observable<String>, Observable<String>>{ t1, t2 ->
t2.map { "$t1-$it" }
})
.subscribe(object: Observer<Observable<String>> {
override fun onComplete() {}
override fun onSubscribe(d: Disposable) {}
// 收到的是一个 Observable,需要再订阅
override fun onNext(t: Observable<String>) {
t.subscribe {Log.e("RX", "$it") }
}
override fun onError(e: Throwable) {}
})
在前面插入数据。
public final Observable<T> startWith(T item)
public final Observable<T> startWith(Iterable<? extends T> items)
public final Observable<T> startWith(ObservableSource<? extends T> other)
public final Observable<T> startWithArray(T... items)
// 收到 10,1,2
Observable.just(1,2).startWith(10).subscribe(observerInt)
// 下面三个都是收到 10,11,12,1,2
Observable.just(1,2).startWith(listOf(10,11,12)).subscribe(observerInt)
Observable.just(1,2).startWith(Observable.just(10,11,12)).subscribe(observerInt)
Observable.just(1,2).startWithArray(10,11,12).subscribe(observerInt)