Cold Observable 和 Hot Observable

Observable的分类

Observable 有 Cold 和 Hot 之分。

hot&cold observable.jpg

Hot Observable 无论有没有 Subscriber 订阅,事件始终都会发生。当 Hot Observable 有多个订阅者时,Hot Observable 与订阅者们的关系是一对多的关系,可以与多个订阅者共享信息。

然而,Cold Observable 只有 Subscriber 订阅时,才开始执行发射数据流的代码。并且 Cold Observable 和 Subscriber 只能是一对一的关系,当有多个不同的订阅者时,消息是重新完整发送的。也就是说对 Cold Observable 而言,有多个Subscriber的时候,他们各自的事件是独立的。

如果上面的解释有点枯燥的话,那么下面会更加形象地说明 Cold 和 Hot 的区别:

Think of a hot Observable as a radio station. All of the listeners that are listening to it at this moment listen to the same song. A cold Observable is a music CD. Many people can buy it and listen to it independently. by Nickolay Tsvetinov

Cold Observable

Observable 的 just、creat、range、fromXXX 等操作符都能生成Cold Observable。

        Consumer<Long> subscriber1 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("subscriber1: "+aLong);
            }
        };

        Consumer<Long> subscriber2 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("   subscriber2: "+aLong);
            }
        };

        Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
                Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
                        .take(Integer.MAX_VALUE)
                        .subscribe(e::onNext);
            }
        }).observeOn(Schedulers.newThread());

        observable.subscribe(subscriber1);
        observable.subscribe(subscriber2);

        try {
            Thread.sleep(100L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

执行结果:

subscriber1: 0
   subscriber2: 0
subscriber1: 1
   subscriber2: 1
subscriber1: 2
   subscriber2: 2
   subscriber2: 3
subscriber1: 3
subscriber1: 4
   subscriber2: 4
   subscriber2: 5
subscriber1: 5
subscriber1: 6
   subscriber2: 6
subscriber1: 7
   subscriber2: 7
subscriber1: 8
   subscriber2: 8
subscriber1: 9
   subscriber2: 9

可以看出,subscriber1 和 subscriber2 的结果并不一定是相同的,二者是完全独立的。

尽管 Cold Observable 很好,但是对于某些事件不确定何时发生以及不确定 Observable 发射的元素数量,那还得使用 Hot Observable。比如:UI交互的事件、网络环境的变化、地理位置的变化、服务器推送消息的到达等等。

Cold Observable 如何转换成 Hot Observable?

1. 使用publish,生成 ConnectableObservable

使用 publish 操作符,可以让 Cold Observable 转换成 Hot Observable。它将原先的 Observable 转换成 ConnectableObservable。

来看看刚才的例子:

        Consumer<Long> subscriber1 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("subscriber1: "+aLong);
            }
        };

        Consumer<Long> subscriber2 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("   subscriber2: "+aLong);
            }
        };

        Consumer<Long> subscriber3 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("      subscriber3: "+aLong);
            }
        };

        ConnectableObservable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
                Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
                        .take(Integer.MAX_VALUE)
                        .subscribe(e::onNext);
            }
        }).observeOn(Schedulers.newThread()).publish();
        observable.connect();

        observable.subscribe(subscriber1);
        observable.subscribe(subscriber2);

        try {
            Thread.sleep(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        observable.subscribe(subscriber3);

        try {
            Thread.sleep(100L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

注意,生成的 ConnectableObservable 需要调用connect()才能真正执行。

执行结果:

subscriber1: 0
   subscriber2: 0
subscriber1: 1
   subscriber2: 1
subscriber1: 2
   subscriber2: 2
      subscriber3: 2
subscriber1: 3
   subscriber2: 3
      subscriber3: 3
subscriber1: 4
   subscriber2: 4
      subscriber3: 4
subscriber1: 5
   subscriber2: 5
      subscriber3: 5
subscriber1: 6
   subscriber2: 6
      subscriber3: 6
subscriber1: 7
   subscriber2: 7
      subscriber3: 7
subscriber1: 8
   subscriber2: 8
      subscriber3: 8
subscriber1: 9
   subscriber2: 9
      subscriber3: 9
subscriber1: 10
   subscriber2: 10
      subscriber3: 10
subscriber1: 11
   subscriber2: 11
      subscriber3: 11

可以看到,多个订阅的 Subscriber 共享同一事件。 在这里,ConnectableObservable 是线程安全的。

2. 使用Subject/Processor

Subject 和 Processor 的作用是相同的。Processor 是 RxJava2.x 新增的类,继承自 Flowable 支持背压控制。而 Subject 则不支持背压控制。

        Consumer<Long> subscriber1 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("subscriber1: "+aLong);
            }
        };

        Consumer<Long> subscriber2 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("   subscriber2: "+aLong);
            }
        };

        Consumer<Long> subscriber3 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("      subscriber3: "+aLong);
            }
        };

        Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
                Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
                        .take(Integer.MAX_VALUE)
                        .subscribe(e::onNext);
            }
        }).observeOn(Schedulers.newThread());

        PublishSubject<Long> subject = PublishSubject.create();
        observable.subscribe(subject);

        subject.subscribe(subscriber1);
        subject.subscribe(subscriber2);

        try {
            Thread.sleep(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        subject.subscribe(subscriber3);

        try {
            Thread.sleep(100L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

执行结果跟上面使用 publish 操作符是一样的。

Subject 既是 Observable 又是 Observer(Subscriber)。这一点可以从 Subject 的源码上看到。

import io.reactivex.*;
import io.reactivex.annotations.*;

/**
 * Represents an Observer and an Observable at the same time, allowing
 * multicasting events from a single source to multiple child Subscribers.
 * <p>All methods except the onSubscribe, onNext, onError and onComplete are thread-safe.
 * Use {@link #toSerialized()} to make these methods thread-safe as well.
 *
 * @param <T> the item value type
 */
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
    /**
     * Returns true if the subject has any Observers.
     * <p>The method is thread-safe.
     * @return true if the subject has any Observers
     */
    public abstract boolean hasObservers();

    /**
     * Returns true if the subject has reached a terminal state through an error event.
     * <p>The method is thread-safe.
     * @return true if the subject has reached a terminal state through an error event
     * @see #getThrowable()
     * &see {@link #hasComplete()}
     */
    public abstract boolean hasThrowable();

    /**
     * Returns true if the subject has reached a terminal state through a complete event.
     * <p>The method is thread-safe.
     * @return true if the subject has reached a terminal state through a complete event
     * @see #hasThrowable()
     */
    public abstract boolean hasComplete();

    /**
     * Returns the error that caused the Subject to terminate or null if the Subject
     * hasn't terminated yet.
     * <p>The method is thread-safe.
     * @return the error that caused the Subject to terminate or null if the Subject
     * hasn't terminated yet
     */
    @Nullable
    public abstract Throwable getThrowable();

    /**
     * Wraps this Subject and serializes the calls to the onSubscribe, onNext, onError and
     * onComplete methods, making them thread-safe.
     * <p>The method is thread-safe.
     * @return the wrapped and serialized subject
     */
    @NonNull
    public final Subject<T> toSerialized() {
        if (this instanceof SerializedSubject) {
            return this;
        }
        return new SerializedSubject<T>(this);
    }
}

当 Subject 作为 Subscriber 时,它可以订阅目标 Cold Observable 使对方开始发送事件。同时它又作为Observable 转发或者发送新的事件,让 Cold Observable 借助 Subject 转换为 Hot Observable。

注意,Subject 并不是线程安全的,如果想要其线程安全需要调用toSerialized()方法。(在RxJava1.x的时代还可以用 SerializedSubject 代替 Subject,但是在RxJava2.x以后SerializedSubject不再是一个public class) 然而,很多基于 EventBus 改造的 RxBus 并没有这么做,包括我以前也写过这样的 RxBus :( 。这样的做法是非常危险的,因为会遇到并发的情况。

Hot Observable 如何转换成 Cold Observable?

1. ConnectableObservable的refCount操作符

reactivex官网的解释是

make a Connectable Observable behave like an ordinary Observable

RefCount.png

RefCount操作符把从一个可连接的 Observable 连接和断开的过程自动化了。它操作一个可连接的Observable,返回一个普通的Observable。当第一个订阅者订阅这个Observable时,RefCount连接到下层的可连接Observable。RefCount跟踪有多少个观察者订阅它,直到最后一个观察者完成才断开与下层可连接Observable的连接。

如果所有的订阅者都取消订阅了,则数据流停止。如果重新订阅则重新开始数据流。

        Consumer<Long> subscriber1 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("subscriber1: "+aLong);
            }
        };

        Consumer<Long> subscriber2 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("   subscriber2: "+aLong);
            }
        };

        ConnectableObservable<Long> connectableObservable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
                Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
                        .take(Integer.MAX_VALUE)
                        .subscribe(e::onNext);
            }
        }).observeOn(Schedulers.newThread()).publish();
        connectableObservable.connect();

        Observable<Long> observable = connectableObservable.refCount();

        Disposable disposable1 = observable.subscribe(subscriber1);
        Disposable disposable2 = observable.subscribe(subscriber2);

        try {
            Thread.sleep(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        disposable1.dispose();
        disposable2.dispose();

        System.out.println("重新开始数据流");

        disposable1 = observable.subscribe(subscriber1);
        disposable2 = observable.subscribe(subscriber2);

        try {
            Thread.sleep(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

执行结果:

subscriber1: 0
   subscriber2: 0
subscriber1: 1
   subscriber2: 1
重新开始数据流
subscriber1: 0
   subscriber2: 0
subscriber1: 1
   subscriber2: 1

如果不是所有的订阅者都取消了订阅,只取消了部分。部分的订阅者重新开始订阅,则不会从头开始数据流。

        Consumer<Long> subscriber1 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("subscriber1: "+aLong);
            }
        };

        Consumer<Long> subscriber2 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("   subscriber2: "+aLong);
            }
        };

        Consumer<Long> subscriber3 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("      subscriber3: "+aLong);
            }
        };

        ConnectableObservable<Long> connectableObservable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
                Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
                        .take(Integer.MAX_VALUE)
                        .subscribe(e::onNext);
            }
        }).observeOn(Schedulers.newThread()).publish();
        connectableObservable.connect();

        Observable<Long> observable = connectableObservable.refCount();

        Disposable disposable1 = observable.subscribe(subscriber1);
        Disposable disposable2 = observable.subscribe(subscriber2);
        observable.subscribe(subscriber3);

        try {
            Thread.sleep(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        disposable1.dispose();
        disposable2.dispose();

        System.out.println("subscriber1、subscriber2 重新订阅");

        disposable1 = observable.subscribe(subscriber1);
        disposable2 = observable.subscribe(subscriber2);

        try {
            Thread.sleep(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

执行结果:

subscriber1: 0
   subscriber2: 0
      subscriber3: 0
subscriber1: 1
   subscriber2: 1
      subscriber3: 1
subscriber1、subscriber2 重新订阅
      subscriber3: 2
subscriber1: 2
   subscriber2: 2
      subscriber3: 3
subscriber1: 3
   subscriber2: 3
      subscriber3: 4
subscriber1: 4
   subscriber2: 4

在这里,subscriber1和subscriber2先取消了订阅,subscriber3并没有取消订阅。之后,subscriber1和subscriber2又重新订阅。最终subscriber1、subscriber2、subscriber3的值保持一致。

2. Observable的share操作符

share操作符封装了publish().refCount()调用,可以看其源码。

    /**
     * Returns a new {@link ObservableSource} that multicasts (shares) the original {@link ObservableSource}. As long as
     * there is at least one {@link Observer} this {@link ObservableSource} will be subscribed and emitting data.
     * When all subscribers have disposed it will dispose the source {@link ObservableSource}.
     * <p>
     * This is an alias for {@link #publish()}.{@link ConnectableObservable#refCount()}.
     * <p>
     * ![](http://upload-images.jianshu.io/upload_images/2613397-81dcef165b69aca2.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code share} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     *
     * @return an {@code ObservableSource} that upon connection causes the source {@code ObservableSource} to emit items
     *         to its {@link Observer}s
     * @see <a href="http://reactivex.io/documentation/operators/refcount.html">ReactiveX operators documentation: RefCount</a>
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Observable<T> share() {
        return publish().refCount();
    }

总结

理解了 Hot Observable 和 Cold Observable 的区别才能够写出更好Rx代码。同理,也能理解Hot & Cold Flowable。再者,在其他语言的Rx版本中包括 RxSwift、RxJS 等也存在 Hot Observable 和 Cold Observable 这样的概念。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏技术博文

PHP的几个常用加密函数

在php的开发过程中,常常需要对部分数据(如用户密码)进行加密 一、加密类型: 1.单向散列加密   就是把任意长度的信息进行散列计算,得到固定长度的输出,这个...

4118
来自专栏杨逸轩 ' sBlog

Java实现微信跳一跳抓包修改分数

3696
来自专栏Java与Android技术栈

RxJava1 升级到 RxJava2 所踩过的坑

RxJava2 发布已经有一段时间了,是对 RxJava 的一次重大的升级,由于我的一个库cv4j使用了 RxJava2 来尝鲜,但是 RxJava2 跟 Rx...

1373
来自专栏小工匠技术圈

【小工匠聊密码学】--消息摘要--HMAC算法

  HMAC是密钥相关的消息认证码,HMAC运算利用哈希算法,以一个密钥和一个消息为输入,生成一个消息摘要作为输出。

871
来自专栏三流程序员的挣扎

RxJava 条件和布尔操作符

多个 Observable 中,无论发射的是 onNext 还是 onComplete 或者 onError,只接受第一个发射数据的那个 Observable,...

861
来自专栏小工匠技术圈

【小工匠聊密码学】--对称加密--3DES

  3DES(或称为Triple DES)是三重数据加密算法(TDEA,Triple Data Encryption Algorithm)块密码的通称。它相当于...

1032
来自专栏编程微刊

三分钟教你学会如何将密文解码成明文

1704
来自专栏小工匠技术圈

【小工匠聊密码学】--消息摘要--SHA算法

1795
来自专栏三流程序员的挣扎

RxJava 连接操作符

看注释意思是将所有数据按原来的顺序缓存起来,就是不知道观察者什么时候订阅,什么时候解除订阅,所以缓存起来,以后直接用。

2002
来自专栏Hongten

PBE_Password-based encryption(基于密码加密)_项目中你也可以有

中说道了PBE——Password-based encryption(基于密码加密)。我也做测试了一下,现在把我做的效果给大家演示一下:

941

扫码关注云+社区