首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >RxJava,一个可观察的多个订阅者: publish().autoConnect()

RxJava,一个可观察的多个订阅者: publish().autoConnect()
EN

Stack Overflow用户
提问于 2017-01-28 22:50:52
回答 2查看 17.8K关注 0票数 21

我在玩rxJava/rxAndroid,有些非常基本的东西不像我期望的那样表现。我有一个可观察的和两个订户:

代码语言:javascript
运行
复制
Observable<Integer> dataStream = Observable.just(1, 2, 3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());

Log.d(TAG, "subscribing sub1...");
dataStream.subscribe(v -> System.out.println("Subscriber #1: "+ integer));

Log.d(TAG, "subscribing sub2...");
dataStream.subscribe(v -> System.out.println("Subscriber #2: "+ integer));

这是输出:

代码语言:javascript
运行
复制
D/RxJava: subscribing sub1...
D/RxJava: subscribing sub2...
D/RxJava: Subscriber #1: 1
D/RxJava: Subscriber #1: 2
D/RxJava: Subscriber #1: 3
D/RxJava: Subscriber #2: 1
D/RxJava: Subscriber #2: 2
D/RxJava: Subscriber #2: 3

现在,我知道通过使用publish().autoConnect()可以避免重复计数,但我首先尝试理解这种默认行为。每当有人订阅可观察到的内容时,它就开始发出数字序列。我明白了。因此,当Subscriber 1连接时,它开始释放项。Subscriber 2马上就连接起来,为什么它没有得到值呢?

从可观察的角度来看,我就是这样理解的:

  1. 有人订阅了我,我应该开始发邮件了 用户: 1
  2. 向订阅者发送“1”项 用户: 1
  3. 其他人订阅了我,当我完成时,我会再次发出1,2,3 订户:1& 2
  4. 向订阅者发送项“2” 订户:1& 2
  5. 向订阅者发送项“3” 订户:1& 2
  6. 向订阅者发送“1”项 订户:1& 2
  7. ..。

但这不是它的工作方式。就好像它们是一个单独的可观测的物体。这让我很困惑,为什么他们不把这些东西给所有的订户呢?

奖金:

publish().autoConnect()是如何解决这个问题的?让我们把它拆开。publish()给了我一个可观察到的连接。一个可连接的可观测就像一个可观察的规则,但你可以告诉它何时连接。然后我继续告诉它通过调用autoConnect()立即连接

这样做..。我不是得到了和我开始一样的东西吗?普通的可观察到的。操作符似乎相互取消.

我可以闭嘴用publish().autoconnect()。但我想了解更多关于可观察性是如何工作的。

谢谢!

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-01-28 23:33:02

这是因为事实上,这是两个独立的观测数据。当您调用subscribe()时,它们将被“生成”。因此,您所提供的步骤是不正确的,因为步骤3和步骤4只是1&2,但是在不同的可观察范围内。

但是您可以看到它们是11,12,2,2,2,因为日志记录是在线程上进行的。如果您要删除observeOn()部分,那么您将以一种相互交织的方式看到排放。若要查看以下运行代码,请执行以下操作:

代码语言:javascript
运行
复制
@Test
public void test() throws InterruptedException {
    final Scheduler single = Schedulers.single();
    final long l = System.nanoTime();
    Observable<Long> dataStream =
            Observable.just(1, 2, 3)
                    .map(i -> System.nanoTime())
                    .subscribeOn(Schedulers.computation());
                    //.observeOn(single);

    dataStream.subscribe(i -> System.out.println("1  " + Thread.currentThread().getName() + " " + (i - l)));
    dataStream.subscribe(i -> System.out.println("2  " + Thread.currentThread().getName() + " " + (i - l)));

    Thread.sleep(1000);
}

输出,至少在我运行时是这样的(注意线程名):

代码语言:javascript
运行
复制
1  RxComputationThreadPool-1 135376988
2  RxComputationThreadPool-2 135376988
1  RxComputationThreadPool-1 135486815
2  RxComputationThreadPool-2 135537383
1  RxComputationThreadPool-1 135560691
2  RxComputationThreadPool-2 135617580

如果应用observeOn(),它将变成:

代码语言:javascript
运行
复制
1  RxSingleScheduler-1 186656395
1  RxSingleScheduler-1 187919407
1  RxSingleScheduler-1 187923753
2  RxSingleScheduler-1 186656790
2  RxSingleScheduler-1 187860148
2  RxSingleScheduler-1 187864889

正如您正确指出的,要获得所需的东西,需要publish().refcount()或简单的share()(它是一个别名)操作符。

这是因为publish()创建了一个ConnectableObservable,它在通过connect()方法被告知之前不会开始释放条目。在这种情况下,如果您这样做:

代码语言:javascript
运行
复制
@Test
public void test() throws InterruptedException {
    final Scheduler single = Schedulers.single();
    final long l = System.nanoTime();
    ConnectableObservable<Long> dataStream =
            Observable.just(1, 2, 3)
                    .map(i -> System.nanoTime())
                    .subscribeOn(Schedulers.computation())
                    .observeOn(single)
                    .publish();

    dataStream.subscribe(i -> System.out.println("1  " + (i - l)));
    dataStream.subscribe(i -> System.out.println("2  " + (i - l)));

    Thread.sleep(1000);
    dataStream.connect();
    Thread.sleep(1000);

}

您将注意到,在第一秒钟(第一次Thread.sleep()调用),什么都不会发生,就在dataStream.connect()被调用之后,排放就会发生。

refCount()接收一个ConnectableObservable,并通过计算当前订阅的订户数量来隐藏调用connect()的需要。它所做的是在第一次订阅时,它调用connect(),在最后一次取消订阅之后,从原始的可观察到的订阅中取消订阅。

至于publish().autoConnect()的相互取消,之后您确实会得到一个可观察的,但是它有一个特殊的属性,比如最初的可观察性在因特网上执行一个API调用(持续10秒),当您使用它而不使用share()时,您将得到与在这10秒内有订阅一样多的对服务器的并行查询。另一方面,使用share(),您将只有一个电话。

如果一个可观察的共享完成其工作非常快(如just(1,2,3)),您将不会看到它的任何好处。

autoConnect()/refCount()为您提供了一个可观察的中间值,您可以订阅它,而不是原始的可观察到的。

如果你对这本书感兴趣的话:基于RxJava的反应性编程

票数 15
EN

Stack Overflow用户

发布于 2017-01-29 19:27:29

定期(寒冷)可观察的

Observable的核心是subscribe函数。每次新观察者订阅时,都会将其作为参数传递给此函数。这个函数所做的是将数据输入到单个观察者中。它通过调用observer.onNext方法来做到这一点。它可能会立即这样做(就像just那样),或者通过一些调度程序(例如。interval),或者从后台线程或回调(例如。通过启动某个异步任务)。

我在上面突出显示了单字,因为这是该函数在调用时唯一知道的观察者。如果您多次订阅这样一个可观察到的功能,则会为每个订阅者调用它的subscribe函数。

像这样的数据源称为冷可观测

调度器

应用subscribeOn运算符在subscribe调用和原始可观测的subscribe函数之间添加中间步骤。您不再直接调用它,而是通过指定的调度程序来调度您的调用。

observeOn向观察者的所有onNext调用添加了类似的中间步骤。

在您的示例中,subscribe函数被调用两次,即生成两次数据序列。调用是通过多线程io调度程序调度的,因此这些调用不是在主线程上,而是在其他两个线程上,几乎同时发生。两个线程都开始调用两个订阅服务器的onNext方法。请记住,每个线程只知道自己的订阅服务器。onNext调用由mainThread调度程序调度,它是单线程的,也就是说,它们不能同时发生,但需要以某种方式排队。严格地说,不能保证这些调用的顺序,。它取决于各种因素,并且是具体实施的。尝试将just替换为interval (这将在消息之间引入延迟),您将看到消息将以不同的顺序到达。

热可观测

publish运算符使您的可观察到的热,也就是可连接。它将中间步骤添加到subscribe函数(这只被调用一次)和onNext方法中--这些步骤被传播到所有订阅的可观测值。换句话说,它允许多个订阅者共享单个订阅。

准确地说,subscribe函数是在调用connect方法时调用的。有两个操作符为您自动调用connect

  • 当第一个订阅服务器进来时,autoConnect调用connect方法。不过,它从不断开连接。
  • refCount在第一个订阅者进来时调用connect,当最后一个订户取消订阅时会自动断开连接。当新的订阅者进来时,它将重新连接(再次调用subscribe函数)。

publish().refCount()是流行的组合,所以它有捷径:share()

对于您的教育,请尝试使用下面的代码和不含share的代码

代码语言:javascript
运行
复制
Observable<Long> dataStream = Observable.interval(100, TimeUnit.MILLISECONDS)
        .take(3)
        .share();
System.out.println("subscribing A");
dataStream.subscribe(v -> System.out.println("A got " + v));
TimeUnit.MILLISECONDS.sleep(150);
System.out.println("subscribing B");
dataStream.subscribe(v -> System.out.println("B got " + v));
TimeUnit.SECONDS.sleep(1);

对原始问题的回答

1)冷观测总是处理单用户问题。所以你的时间图应该是这样的:

代码语言:javascript
运行
复制
subscribed first subscriber
[SUBSCRIBER: 1][ITEMS TO EMIT: 1,2,3]
subscribed second subscriber
[SUBSCRIBER: 1][ITEMS TO EMIT: 1,2,3]
[SUBSCRIBER: 2][ITEMS TO EMIT: 1,2,3]
emit "1" to subscriber 1
[SUBSCRIBER: 1][ITEMS TO EMIT: 2,3]
[SUBSCRIBER: 2][ITEMS TO EMIT: 1,2,3]
emit "1" to subscriber 2
[SUBSCRIBER: 1][ITEMS TO EMIT: 2,3]
[SUBSCRIBER: 2][ITEMS TO EMIT: 2,3]
...

虽然订单没有保证,因为多线程比赛。

( 2) publishautoConnect不相互取消。他们只会加。

代码语言:javascript
运行
复制
dataSource = ...;
dataSourceShared = dataSource.publish().autoConnect();

现在,当您订阅多个dataSourceShared订阅者时,这只会导致对原始dataSource的一个订阅。也就是说,您不必为每个新订户发出新的消息系列。

票数 12
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/41915738

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档