前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RxJava 组合操作符

RxJava 组合操作符

作者头像
三流编程
发布2018-09-11 15:50:07
1.5K0
发布2018-09-11 15:50:07
举报

ReactiveX 系列文章目录


combineLatest

把每个 Observable 最近发射的数据组合在一起。

共有 13 个重载方法。

代码语言:javascript
复制
// 前两个参数是两个 Observable,内部是将这几个 Observable 封装成数组调用下面的重载方法
public static <T1, T2, R> Observable<R> combineLatest(
            ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2,
            BiFunction<? super T1, ? super T2, ? extends R> combiner)
// 还有 7 个方法,区别在于前面的参数分别是 3 个,4 个 ... 9 个 Observable

// 下面两个方法的区别在于一个没有 buffersize 参数,内部会获取一个默认的大小。
public static <T, R> Observable<R> combineLatest(Iterable<? extends ObservableSource<? extends T>> sources,
                Function<? super Object[], ? extends R> combiner)
public static <T, R> Observable<R> combineLatest(Iterable<? extends ObservableSource<? extends T>> sources,
            Function<? super Object[], ? extends R> combiner, int bufferSize)

// 下面两个方法和上面方法的区别只在于第一个参数,一个是集合,一个是数组,本质是一样的    
public static <T, R> Observable<R> combineLatest(ObservableSource<? extends T>[] sources,
            Function<? super Object[], ? extends R> combiner)
public static <T, R> Observable<R> combineLatest(ObservableSource<? extends T>[] sources,
            Function<? super Object[], ? extends R> combiner, int bufferSize)

// 这个方法内部就是简单的调用了上面的方法
public static <T, R> Observable<R> combineLatest(Function<? super Object[], ? extends R> combiner, int bufferSize, ObservableSource<? extends T>... sources)

现在看上面的最后一个方法,最后一个参数是多个 Observable,第二个参数 bufferSize 是内部缓冲队列的大小。

被组合的被观察者的数据类型是 T,组合后变成一个数组 Object[],在第一个参数 Function 里被转换成 R,最后的被观察者就是发射 R 类型的数据。Function 的第一个泛型是 Object 数组,所以可以组合不同类型的被观察者。

代码语言:javascript
复制
val observableInt = Observable.just(1, 2, 3, 4, 5)
val observableStr = Observable.just("a","b","c")

Observable.combineLatest(Function<Array<Any>, String> {
            it.map { it.toString() }.reduce { acc, i ->  "$acc-$i"}
        }, 3, observableInt, observableStr)
                .subscribe { Log.e("RX", "$it") }

由于上面的例子都在主线程,也没控制进度,按顺序发送两个 Observable,所以第一个 Observable 发送结束了才发第二个,observableInt 最近的就是 5,结果是

代码语言:javascript
复制
5-a
5-b
5-c

再看一个例子

代码语言:javascript
复制
// computation 线程上
val ob1 = Observable.intervalRange(0, 5, 500, 500, TimeUnit.MILLISECONDS)
// io 线程上,两个错开
val ob2 = Observable.create(ObservableOnSubscribe<String> {
    Thread.sleep(200)
    it.onNext("a")
    Thread.sleep(500)
    it.onNext("b")
    Thread.sleep(500)
    it.onError(Throwable())
}).subscribeOn(Schedulers.io())

Observable.combineLatest(Function<Array<Any>, String> {
    it.map { it.toString() }.reduce { acc, i ->  "$acc-$i"}
}, 1, ob1, ob2)
        .subscribe(object : Observer<String> { // 没转到 UI 线程,别去修改 UI
            override fun onComplete() { Log.e("RX", "onComplete") }
            override fun onSubscribe(d: Disposable) {}
            override fun onNext(t: String) { Log.e("RX", "onNext $t") }
            override fun onError(e: Throwable) {Log.e("RX", "onError")}
        })

此时日志是

代码语言:javascript
复制
onNext 0-a
onNext 0-b
onNext 1-b
onError

combineLatestDelayError

有 DelayError 后缀的方法,就是如果有 Observable 发射 onError 事件,先忽略继续发射剩下的其它事件,发射结束了再发射 onError,而没有 DelayError 的方法,一遇到 onError 整个发射就立即停止。所以它可以延迟发射 onError。

代码语言:javascript
复制
val ob1 = Observable.intervalRange(0, 5, 500, 500, TimeUnit.MILLISECONDS)
// io 线程上,两个错开
val ob2 = Observable.create(ObservableOnSubscribe<String> {
    Thread.sleep(200)
    it.onNext("a")
    Thread.sleep(500)
    it.onNext("b")
    Thread.sleep(500)
    it.onError(Throwable())
}).subscribeOn(Schedulers.io())

Observable.combineLatestDelayError(Function<Array<Any>, String> {
    it.map { it.toString() }.reduce { acc, i ->  "$acc-$i"}
}, 1, ob1, ob2)
        .subscribe(object : Observer<String> { // 没转到 UI 线程,别去修改 UI
            override fun onComplete() { Log.e("RX", "onComplete") }
            override fun onSubscribe(d: Disposable) {}
            override fun onNext(t: String) { Log.e("RX", "onNext $t") }
            override fun onError(e: Throwable) {Log.e("RX", "onError")}
        })

仅是将 combineLatest 换成 combineLatestDelayError,日志是

代码语言:javascript
复制
onNext 0-a
onNext 0-b
onNext 1-b
onNext 2-b
onNext 3-b
onNext 4-b
onError

withLatestFrom

它要等所有 Observable 都开始发射数据后才会发射组合后的数据

代码语言:javascript
复制
val ob1 = Observable.interval(0, 100, TimeUnit.MILLISECONDS).take(10)
val ob2 = Observable.intervalRange(100, 10, 50, 100, TimeUnit.MILLISECONDS).take(10)
val ob3 = Observable.intervalRange(200, 10, 100, 200, TimeUnit.MILLISECONDS).take(10)

ob1.withLatestFrom(ob2, ob3, Function3<Long, Long, Long, String> {
    t1, t2, t3 -> "$t1 : $t2 : $t3"
}).subscribe({Log.e("RX","$it")})

日志:

代码语言:javascript
复制
1 : 100 : 200
2 : 101 : 200
3 : 102 : 201
4 : 103 : 201
5 : 104 : 202
6 : 105 : 202
7 : 106 : 203
8 : 107 : 203
9 : 108 : 204

merge/mergeDelayError/mergeArray/mergeArrayDelayError/mergeWith

不保证按每个 Observable 的顺序来合并。

mergeDelayError 和 mergeArrayDelayError 是在 merge 过程中先忽略 onError。mergeArray 是合并数组,merge 是合并集合,多个 ObservableSource 做参数的内部调用 fromArray,和 mergeArray 一样。

看一个构造方法

代码语言:javascript
复制
public static <T> Observable<T> merge(Iterable<? extends ObservableSource<? extends T>> sources, int maxConcurrency, int bufferSize)

maxConcurrency,bufferSize 可用于控制底层合并时的性能,一次合几个 Observable,取出几个数据,对外使用来说是透明的。

代码语言:javascript
复制
Observable.merge(listOf(observableStr, observableInt, observableBoolean), 2, 5)
                  .subscribe({
                      textView.text = "${textView.text}\nonNext $it"
                  })

依次发送 a,b,c,1,2,3,4,5,true,false,true,false。

mergeWith 内部调的 merge(this, other)。

switchOnNext/switchOnNextDelayError

后面的 Observable 发射数据时忽略前面的 Observable 发射的数据。

代码语言:javascript
复制
val ob1 = Observable.intervalRange(1, 5, 0, 400, TimeUnit.MILLISECONDS, Schedulers.newThread())
val ob2 = Observable.intervalRange(10, 5, 600, 400, TimeUnit.MILLISECONDS)

val observable = Observable.create<ObservableSource<*>> { emitter ->
   emitter.onNext(ob1)
   emitter.onNext(ob2)
}

Observable.switchOnNext(observable).subscribe({ Log.e("RX", "$it") })

结果和想象的不同,只发射了 ob2 的内容。参考,先后应该是指 Observable 被订阅的时候,而上面的例子是在同时订阅的,ob2 从一开始就是后面那个,所以只发射了它里面的内容。

代码语言:javascript
复制
val ob = Observable.intervalRange(1, 4, 0, 40, TimeUnit.MILLISECONDS)
                .map { Observable.intervalRange(it * 10, 10 - 2 * it, 0, 20, TimeUnit.MILLISECONDS) }
Observable.switchOnNext(ob).subscribe({Log.e("RX", "$it")})

上面的 ob,每隔 40ms 创建一个 Observable,第一个 Observable 被订阅后的 40ms 后,第二个 Observable 才被创建被订阅,这样多个 Observable 就错开了。而这四个 Observable 发送的数据如下:

代码语言:javascript
复制
10--11--12--13--14--15--16--17
--------20--21--22--23--24--25
----------------30--31--32--33
------------------------40--41

所以最后输出的是 10,11,20,21,30,31,40,41

zip/zipArray/zipIterable/zipWith

将多个被观察者发射的数据按顺序组合在一起。比如先定义了三个被观察者

代码语言:javascript
复制
// 发射 3 个 String
val observableStr = Observable.just("a","b","c")

// 发射 5 个 Int
val observableInt = Observable.just(1, 2, 3, 4, 5)

// 发射 4 个 Boolean
val observableBoolean = Observable.just(true, false, true, false)
代码语言:javascript
复制
Observable.zip(observableStr, observableInt, BiFunction<String, Int, String> {
// 分别会调用
// onNext("a : 1")
// onNext("b : 2")
// onNext("c : 3")
// onComplete()
t1, t2 -> "$t1 : $t2"
}).subscribe(observerStr)

第三个参数 BiFunction,泛型 <String, Int, String>,第一个 String 表示 zip 的第一个参数 observableStr 发射的数据类型,第二个 Int 表示 zip 的第二个参数 observableInt 发射的数据类型,最后一个 String 表示经过 BiFunction 转换后返回的数据类型,即最终发射给观察者的类型。

代码语言:javascript
复制
Observable.zip(observableStr, observableInt, observableBoolean, Function3<String, Int, Boolean, String> {
// 分别会调用
// onNext("a : 1 : true")
// onNext("b : 2 : false")
// onNext("c : 3 : true")
// onComplete()
t1, t2, t3 -> "$t1 : $t2 : $t3"
}).subscribe(observerStr)

组合更多个被观察者,用 Function3、Function4...,前几个泛型表示被观察者的类型,最后一个表示最终发给观察者的数据类型。

按观察者的顺序组合数据,并且发射次数是最少的那个被观察者发射的次数。

zipArray 和 zipIterable 有个布尔型的参数控制是否 delayError,zip 方法都是默认为 false。

zipWith 不是静态方法,相当于将自己这个 Observable 和其它的进行 zip。

join/groupJoin

代码语言:javascript
复制
/**
 * @param other 源 Observable 所要组合的目标 Observable
 * @param leftEnd 接收从源 Observable 发射来的数据,并返回一个 Observable,这个 Observable 的生命周期决定了源 Observable 发射数据的有效期
 * @param rightEnd 接收从目标 Observable 发射来的数据,并返回一个 Observable,这个 Observable 的生命周期决定了目标 Observable 发射数据的有效期
 @param resultSelector 接收从源 Observable 和目标 Observable 发射来的数据,并返回最终组合完的数据。
 */
public final <TRight, TLeftEnd, TRightEnd, R> Observable<R> join(
           ObservableSource<? extends TRight> other,
           Function<? super T, ? extends ObservableSource<TLeftEnd>> leftEnd,
           Function<? super TRight, ? extends ObservableSource<TRightEnd>> rightEnd,
           BiFunction<? super T, ? super TRight, ? extends R> resultSelector
                   )

public final <TRight, TLeftEnd, TRightEnd, R> Observable<R> groupJoin(
           ObservableSource<? extends TRight> other,
           Function<? super T, ? extends ObservableSource<TLeftEnd>> leftEnd,
           Function<? super TRight, ? extends ObservableSource<TRightEnd>> rightEnd,
           BiFunction<? super T, ? super Observable<TRight>, ? extends R> resultSelector
                   )
代码语言:javascript
复制
val other = Observable.just("a", "b", "c", "d")
Observable.just(1,2)
        .join(other,
        Function<Int, Observable<Long>>{
            Observable.timer(300, TimeUnit.MILLISECONDS)
        },
        Function<String, Observable<Long>>{
            Observable.timer(200, TimeUnit.MILLISECONDS)
        },
        BiFunction<Int, String, String>{ t1, t2 -> "$t1-$t2"})
        .subscribe(object: Observer<String> {
            override fun onComplete() {}
            override fun onSubscribe(d: Disposable) {}
            override fun onNext(t: String) { Log.e("RX", "$t") }
            override fun onError(e: Throwable) {}
        })

本身的发射未做延时,每个发射的数据都处于生命周期内,所以结果是

代码语言:javascript
复制
1-a
2-a
1-b
2-b
1-c
2-c
1-d
2-d
代码语言:javascript
复制
Observable.just(1,2)
        .map {
            Thread.sleep(250)
            Log.e("RX", "发射 $it")
            it
        }
        .join(other,
        Function<Int, Observable<Long>>{
            Log.e("RX", "leftEnd $it")
            Observable.timer(300, TimeUnit.MILLISECONDS)
        },
        Function<String, Observable<Long>>{
            Log.e("RX", "rightEnd $it")
            Observable.timer(200, TimeUnit.MILLISECONDS)
        },
        BiFunction<Int, String, String>{ t1, t2 -> "$t1-$t2"})
        .subscribe(object: Observer<String> {
            override fun onComplete() {}
            override fun onSubscribe(d: Disposable) {}
            override fun onNext(t: String) { Log.e("RX", "$t") }
            override fun onError(e: Throwable) {}
        })
代码语言:javascript
复制
05-16 18:10:40.393 7961-7961/pot.ner347.androiddemo E/RX: 发射 1
    leftEnd 1
05-16 18:10:40.679 7961-7961/pot.ner347.androiddemo E/RX: 发射 2
05-16 18:10:40.680 7961-7961/pot.ner347.androiddemo E/RX: leftEnd 2
05-16 18:10:40.682 7961-7961/pot.ner347.androiddemo E/RX: other 发射 a
05-16 18:10:40.683 7961-7961/pot.ner347.androiddemo E/RX: rightEnd a
05-16 18:10:40.684 7961-7961/pot.ner347.androiddemo E/RX: 1-a
05-16 18:10:40.685 7961-7961/pot.ner347.androiddemo E/RX: 2-a
    other 发射 b
05-16 18:10:40.986 7961-7961/pot.ner347.androiddemo E/RX: rightEnd b
05-16 18:10:40.988 7961-7961/pot.ner347.androiddemo E/RX: other 发射 c
05-16 18:10:41.289 7961-7961/pot.ner347.androiddemo E/RX: rightEnd c
05-16 18:10:41.291 7961-7961/pot.ner347.androiddemo E/RX: other 发射 d
05-16 18:10:41.592 7961-7961/pot.ner347.androiddemo E/RX: rightEnd d

现在看这日志,感觉首先源 Observable 先发送出来,发射一个,join 的第一个参数就收到一个,从最初到收到没有超过里面的 Observable 的生命周期就活着,对于 other,join 的第二个参数收到一个后,紧接着发下一个,如果收到时的间隔超过了回调里那个 Observable 的生命周期,就挂了。

groupJoin 和 join 只有第四个参数的第二,第三个泛型不同,groupJoin 是 Observable,导致最后 subscribe 的泛型也变了。

代码语言:javascript
复制
Observable.just(1,2)
     .map {
         Thread.sleep(250)
         Log.e("RX", "发射 $it")
         it
     }
     .groupJoin(other,
             Function<Int, Observable<Long>>{
                 Log.e("RX", "leftEnd $it")
                 Observable.timer(300, TimeUnit.MILLISECONDS)
             },
             Function<String, Observable<Long>>{
                 Log.e("RX", "rightEnd $it")
                 Observable.timer(200, TimeUnit.MILLISECONDS)
             },
             BiFunction<Int, Observable<String>, Observable<String>>{ t1, t2 ->
                 t2.map { "$t1-$it" }
             })
     .subscribe(object: Observer<Observable<String>> {
         override fun onComplete() {}
         override fun onSubscribe(d: Disposable) {}

        // 收到的是一个 Observable,需要再订阅
         override fun onNext(t: Observable<String>) {
             t.subscribe {Log.e("RX", "$it") }
         }
         override fun onError(e: Throwable) {}
     })

startWith/startWithArray

在前面插入数据。

代码语言:javascript
复制
public final Observable<T> startWith(T item)
public final Observable<T> startWith(Iterable<? extends T> items)
public final Observable<T> startWith(ObservableSource<? extends T> other)

public final Observable<T> startWithArray(T... items)
代码语言:javascript
复制
// 收到 10,1,2
Observable.just(1,2).startWith(10).subscribe(observerInt)

// 下面三个都是收到 10,11,12,1,2
Observable.just(1,2).startWith(listOf(10,11,12)).subscribe(observerInt)
Observable.just(1,2).startWith(Observable.just(10,11,12)).subscribe(observerInt)
Observable.just(1,2).startWithArray(10,11,12).subscribe(observerInt)
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.06.09 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • combineLatest
  • combineLatestDelayError
  • withLatestFrom
  • merge/mergeDelayError/mergeArray/mergeArrayDelayError/mergeWith
  • switchOnNext/switchOnNextDelayError
  • zip/zipArray/zipIterable/zipWith
  • join/groupJoin
  • startWith/startWithArray
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档