前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RxJava 连接操作符

RxJava 连接操作符

作者头像
三流之路
发布2018-09-11 15:50:32
8541
发布2018-09-11 15:50:32
举报
文章被收录于专栏:三流程序员的挣扎

ReactiveX 系列文章目录


cache/cacheWithInitialCapacity

看注释意思是将所有数据按原来的顺序缓存起来,就是不知道观察者什么时候订阅,什么时候解除订阅,所以缓存起来,以后直接用。

代码语言:javascript
复制
val list = arrayListOf(1,2,3)
val ob = Observable.fromIterable(list)
//      .cache()
list.clear()
ob.subscribe(observerInt)

假设向上面这般用法,无论有没有 cache,Observer 收到的都是只有一个 onComplete。

代码语言:javascript
复制
val list = arrayListOf(1,2,3)
val ob = Observable.fromIterable(list)
        .cache()
ob.subscribe(observerInt)
list.clear()
ob.subscribe(observerInt)

现在如果没有 cache,第一个会收到三次 onNext 和一次 onComplete,clear 之后由于数据清空,只会收到 onComplete。而有了 cache,两个订阅得到的结果都是三次 onNext 和一次 onComplete。

这说明是在有了一个观察者订阅之后,会把被观察者发射的数据缓存起来,这适合多个观察者存在时,其它还没有立刻订阅的观察者也能通过缓存拿到最初的数据。


cacheWithInitialCapacity 的参数表示内部用的缓冲区大小,对外界使用没区别,cache 方法用的是 16.

publish

将普通的 Observable 变成可连接的 ConnectableObservable,它不会在被订阅时发射数据,而是直到使用了connect 操作符时才开始。用这种方法,可以控制在任何时候让 Observable 开始发射数据。

代码语言:javascript
复制
public final ConnectableObservable<T> publish()
// 用 Function 转换源 Observable 发射的数据
public final <R> Observable<R> publish(Function<? super Observable<T>, ? extends ObservableSource<R>> selector)
代码语言:javascript
复制
val ob = Observable.just(1,2,3)
        .doOnSubscribe { Log.e("RX", "onSubscribe")}
        .publish()
Log.e("RX", "subscribe")
ob.subscribe(observerInt)
Thread.sleep(2000)
ob.connect()

2 秒后执行 connect 在 onSubscribe。

replay

ConnectableObservable 和普通的 Observable 最大的区别就是,调用 connect 操作符开始发射数据,后面的订阅者会丢失之前发射过的数据。

代码语言:javascript
复制
var ob = Observable.interval(1, 100, TimeUnit.MILLISECONDS).take(6)
ob = ob.publish()

ob.subscribe{ Log.e("RX", "observer 1 onNext $it") }
ob.connect()

Thread.sleep(400)
ob.subscribe{ Log.e("RX", "observer 2 onNext $it") }
ob.connect()

日志:

代码语言:javascript
复制
observer 1 onNext 0
observer 1 onNext 1
observer 1 onNext 2
observer 1 onNext 3
observer 1 onNext 4
observer 1 onNext 5
observer 2 onNext 5

可见 observer2 丢了 0-4,使用 replay 返回的 ConnectableObservable 会缓存订阅者订阅之前已经发射的数据,可以指定缓存的大小或者时间,这样能避免耗费太多内存。

代码语言:javascript
复制
public final ConnectableObservable<T> replay()
public final ConnectableObservable<T> replay(final int bufferSize)
public final ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit unit)
public final ConnectableObservable<T> replay(final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler)
public final ConnectableObservable<T> replay(final int bufferSize, final Scheduler scheduler)
public final ConnectableObservable<T> replay(long time, TimeUnit unit)
public final ConnectableObservable<T> replay(final long time, final TimeUnit unit, final Scheduler scheduler)
public final ConnectableObservable<T> replay(final Scheduler scheduler)

public final <R> Observable<R> replay(Function<? super Observable<T>, ? extends ObservableSource<R>> selector)
public final <R> Observable<R> replay(Function<? super Observable<T>, ? extends ObservableSource<R>> selector, final int bufferSize)
public final <R> Observable<R> replay(Function<? super Observable<T>, ? extends ObservableSource<R>> selector, int bufferSize, long time, TimeUnit unit)
public final <R> Observable<R> replay(Function<? super Observable<T>, ? extends ObservableSource<R>> selector, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler)
public final <R> Observable<R> replay(final Function<? super Observable<T>, ? extends ObservableSource<R>> selector, final int bufferSize, final Scheduler scheduler)
public final <R> Observable<R> replay(Function<? super Observable<T>, ? extends ObservableSource<R>> selector, long time, TimeUnit unit)
public final <R> Observable<R> replay(Function<? super Observable<T>, ? extends ObservableSource<R>> selector, final long time, final TimeUnit unit, final Scheduler scheduler)
public final <R> Observable<R> replay(final Function<? super Observable<T>, ? extends ObservableSource<R>> selector, final Scheduler scheduler)

重载方法很多,但大致可分为两类,一类返回 ConnectableObservable,一类有参数 selector,可以变换源 Observable 发射的数据,然后将这些数据放到一个 Observable 中,方法返回 Observable。

缓存 2 个数

代码语言:javascript
复制
ob = ob.replay(2)

日志

代码语言:javascript
复制
observer 1 onNext 0
observer 1 onNext 1
observer 1 onNext 2
observer 1 onNext 3
observer 1 onNext 4
observer 2 onNext 3
observer 2 onNext 4
observer 1 onNext 5
observer 2 onNext 5

可见 observer2 还取到了被缓存的 3 和 4。

缓存 300ms 内的数据

代码语言:javascript
复制
ob = ob.replay(300, TimeUnit.MILLISECONDS)

日志:

代码语言:javascript
复制
observer 1 onNext 0
observer 1 onNext 1
observer 1 onNext 2
observer 1 onNext 3
observer 1 onNext 4
observer 2 onNext 2
observer 2 onNext 3
observer 2 onNext 4
observer 1 onNext 5
observer 2 onNext 5

收到了前 300ms 缓存的 3,4,5。


其中第二类看源码内部也调用了第一种的 replay,subscribe 时内部会自动执行 connect。

代码语言:javascript
复制
val ob2 = ob.replay({
    it.map { it*10 }
}, 2)

ob2.subscribe{ Log.e("RX", "observer 1 onNext $it") }
Thread.sleep(400)
ob2.subscribe{ Log.e("RX", "----------------observer 2 onNext $it") }

但两个观察者都收到了所有数据,和想象不同。

它不像第一类,它是每次 subscribe 时内部都对普通的 Observable 执行第一类的 replay,再往内部走是 new 了一个 ConnectableObservable。所以两次 subscribe 内部用的是两个 ConnectableObservable 对象。

暂不清楚它的应用场景在哪里。

share

代码语言:javascript
复制
public final Observable<T> share() {
    return publish().refCount();
}

refCount() 把 ConnectableObservable 变为一个普通的 Observable 但又保持了 ConnectableObservable 的特性。如果出现第一个 Observer,它就会自动调用 connect(),如果所有的 Observer 全部 dispose,那么它也会停止接受上游 Observable 的数据。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018.06.09 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • cache/cacheWithInitialCapacity
  • publish
  • replay
  • share
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档