前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RxJava 算术和聚合操作符

RxJava 算术和聚合操作符

作者头像
三流编程
发布2018-09-11 15:51:47
1.1K0
发布2018-09-11 15:51:47
举报

concat/concatDelayError/concatArray/concatArrayDelayError

将多个被观察者按先后顺序串联起来。

代码语言:javascript
复制
// 前面有 2-4 个 ObservableSource 参数,内部调用的都是 concatArray
public static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2)

// 数组和集合,意思一样
public static <T> Observable<T> concat(Iterable<? extends ObservableSource<? extends T>> sources)            
public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources)
代码语言:javascript
复制
Observable.concat(observableStr, observableInt, observableBoolean)
           .subscribe(Consumer<Any> {
               textView.text = "${textView.text}\nonNext $it"
           })

将依次连续发送 observableStr,observableInt,observableBoolean 里的 12 个数据。


代码语言:javascript
复制
// 区别在于有个参数 prefetch,和 combineLatest 的 buffersize 作用一样
public static <T> Observable<T> concat(ObservableSource<? extends ObservableSource<? extends T>> sources)
public static <T> Observable<T> concat(ObservableSource<? extends ObservableSource<? extends T>> sources, int prefetch)

这两个重载方法,不知道怎么用,试验了若干次,终于不崩溃了。

Java 语言版本:

代码语言:javascript
复制
Observable observable = Observable.create(new ObservableOnSubscribe<ObservableSource>() {
        @Override
        public void subscribe(ObservableEmitter emitter) {
            emitter.onNext(observableInt);
            emitter.onNext(observableStr);
        }
    });

Observable.concat(observable).subscribe(new Consumer<Object>() {
    @Override
    public void accept(Object o) {
        Log.e("RX", o.toString());
    }
});

Kotlin 语言版本

代码语言:javascript
复制
val observable = Observable.create<ObservableSource<*>> { emitter ->
    emitter.onNext(observableInt)
    emitter.onNext(observableStr)
}

Observable.concat<Any>(observable)
        .subscribe({ o -> Log.e("RX", o!!.toString()) })

最终打出的日志按顺序是 1,2,3,4,5,a,b,c

concatDelayError 和 concatArrayDelayError 是推迟发射 onError。

concatEager/concatArrayEager

代码语言:javascript
复制
public static <T> Observable<T> concatArrayEager(ObservableSource<? extends T>... sources)
public static <T> Observable<T> concatArrayEager(int maxConcurrency, int prefetch, ObservableSource<? extends T>... sources)

// 集合和上面的可变参数可以看成一样的
public static <T> Observable<T> concatEager(Iterable<? extends ObservableSource<? extends T>> sources)
public static <T> Observable<T> concatEager(Iterable<? extends ObservableSource<? extends T>> sources, int maxConcurrency, int prefetch)

public static <T> Observable<T> concatEager(ObservableSource<? extends ObservableSource<? extends T>> sources)
public static <T> Observable<T> concatEager(ObservableSource<? extends ObservableSource<? extends T>> sources, int maxConcurrency, int prefetch)

注释:

Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the source ObservableSources. The operator buffers the values emitted by these ObservableSources and then drains them in order, each one after the previous one completes.

意思大概是说,一旦有观察者订阅了之后,会先将被观察者发射的数据缓存起来,然后将缓存的数据一个接一个的发射出去。

对于外部调用来说,结果和 concat 作用没什么区别。它是并发处理多个 Observable,而不像 concat 那样串行处理,但是又能够保证最终的顺序。

代码语言:javascript
复制
Observable.concatArrayEager(observableStr, observableInt, observableBoolean).subscribe({
    Log.e("RX", it.toString())
})

依次发射三个 Observable 的数据。


有重载方法的参数是 ObservableSource<? extends ObservableSource<? extends T>> sources 应该和 concat 里用这个参数的重载方法差不多,就不写代码测试了。

concatWith

代码语言:javascript
复制
public final Observable<T> concatWith(ObservableSource<? extends T> other) {
    ObjectHelper.requireNonNull(other, "other is null");
    return concat(this, other);
}

非静态,自己和别人 concat。

collect/collectInto

收集发射的数据到一个数据结构里,然后将这个结构作为一个整体发射出去。

代码语言:javascript
复制
Observable.just(18, "China", "Ma")
    .collect(Callable<MutableList<Any>> {
        arrayListOf()
    }, BiConsumer<MutableList<Any>, Any> {
        t1, t2 ->  t1.add(t2)
    }).subscribe(Consumer<MutableList<Any>> {
        Log.e("RX", "$it")
    })

将三个零散的数据收集到一个列表里,最后收到 [18, China, Ma]

collectInto 是将数据结构直接作为第一个参数传进去,而不需要通过回调提供一个数据结构。

代码语言:javascript
复制
Observable.just(18, "China", "Ma")
    .collectInto(arrayListOf()
    , BiConsumer<MutableList<Any>, Any> {
        t1, t2 ->  t1.add(t2)
    }).subscribe(Consumer<MutableList<Any>> {
        Log.e("RX", "$it")
    })

count

返回发射的数目,并且将这个数目作为一个 64 位的 long 型值以 Single 发射出来。

代码语言:javascript
复制
Observable.just(20,30,40).count()
        .subscribe(object : SingleObserver<Long> {
            override fun onSuccess(t: Long) {
                textView.text = "${textView.text}\n $t"
            }

            override fun onSubscribe(d: Disposable) {
            }

            override fun onError(e: Throwable) {
            }
})

reduce/reduceWith

代码语言:javascript
复制
// 返回 Maybe
public final Maybe<T> reduce(BiFunction<T, T, T> reducer)
// 有初始值,返回 Single
public final <R> Single<R> reduce(R seed, BiFunction<R, ? super T, R> reducer)
// 通过一个回调获取初始值
public final <R> Single<R> reduceWith(Callable<R> seedSupplier, BiFunction<R, ? super T, R> reducer)

一种累计运算。

代码语言:javascript
复制
// 算乘积,结果是 6
Observable.just(1, 2, 3)
  .reduce({ t1, t2 -> t1 * t2 }).subscribe {
      textView.text = "${textView.text}\n $it"
  }

// 有初始值 10,结果是 60
Observable.just(1, 2, 3)
    .reduce(10, { t1, t2 -> t1 * t2 })
    .subscribe(object: SingleObserver<Int> {
        override fun onSuccess(t: Int) {textView.text = "${textView.text}\n $t"}
        override fun onSubscribe(d: Disposable) {}
        override fun onError(e: Throwable) {}
    })

// 有初始值 8,结果是 48
Observable.just(1, 2, 3)
    .reduceWith({ 8 }, { t1, t2 -> t1 * t2 })
    .subscribe(object: SingleObserver<Int> {
        override fun onSuccess(t: Int) {textView.text = "${textView.text}\n $t"}
        override fun onSubscribe(d: Disposable) {}
        override fun onError(e: Throwable) {}
    })
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.06.09 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • concat/concatDelayError/concatArray/concatArrayDelayError
  • concatEager/concatArrayEager
  • concatWith
  • collect/collectInto
  • count
  • reduce/reduceWith
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档