我在玩rxJava/rxAndroid,有些非常基本的东西不像我期望的那样表现。我有一个可观察的和两个订户:
Observable<Integer> dataStream = Observable.just(1, 2, 3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
Log.d(TAG, "subscribing sub1...");
dataStream.subscribe(v -> System.out.println("Subscriber #1: "+ integer));
Log.d(TAG, "subscribing sub2...");
dataStream.subscribe(v -> System.out.println("Subscriber #2: "+ integer));
这是输出:
D/RxJava: subscribing sub1...
D/RxJava: subscribing sub2...
D/RxJava: Subscriber #1: 1
D/RxJava: Subscriber #1: 2
D/RxJava: Subscriber #1: 3
D/RxJava: Subscriber #2: 1
D/RxJava: Subscriber #2: 2
D/RxJava: Subscriber #2: 3
现在,我知道通过使用publish().autoConnect()
可以避免重复计数,但我首先尝试理解这种默认行为。每当有人订阅可观察到的内容时,它就开始发出数字序列。我明白了。因此,当Subscriber 1
连接时,它开始释放项。Subscriber 2
马上就连接起来,为什么它没有得到值呢?
从可观察的角度来看,我就是这样理解的:
但这不是它的工作方式。就好像它们是一个单独的可观测的物体。这让我很困惑,为什么他们不把这些东西给所有的订户呢?
奖金:
publish().autoConnect()
是如何解决这个问题的?让我们把它拆开。publish()
给了我一个可观察到的连接。一个可连接的可观测就像一个可观察的规则,但你可以告诉它何时连接。然后我继续告诉它通过调用autoConnect()
立即连接
这样做..。我不是得到了和我开始一样的东西吗?普通的可观察到的。操作符似乎相互取消.
我可以闭嘴用publish().autoconnect()
。但我想了解更多关于可观察性是如何工作的。
谢谢!
发布于 2017-01-28 23:33:02
这是因为事实上,这是两个独立的观测数据。当您调用subscribe()
时,它们将被“生成”。因此,您所提供的步骤是不正确的,因为步骤3和步骤4只是1&2,但是在不同的可观察范围内。
但是您可以看到它们是11,12,2,2,2,因为日志记录是在线程上进行的。如果您要删除observeOn()
部分,那么您将以一种相互交织的方式看到排放。若要查看以下运行代码,请执行以下操作:
@Test
public void test() throws InterruptedException {
final Scheduler single = Schedulers.single();
final long l = System.nanoTime();
Observable<Long> dataStream =
Observable.just(1, 2, 3)
.map(i -> System.nanoTime())
.subscribeOn(Schedulers.computation());
//.observeOn(single);
dataStream.subscribe(i -> System.out.println("1 " + Thread.currentThread().getName() + " " + (i - l)));
dataStream.subscribe(i -> System.out.println("2 " + Thread.currentThread().getName() + " " + (i - l)));
Thread.sleep(1000);
}
输出,至少在我运行时是这样的(注意线程名):
1 RxComputationThreadPool-1 135376988
2 RxComputationThreadPool-2 135376988
1 RxComputationThreadPool-1 135486815
2 RxComputationThreadPool-2 135537383
1 RxComputationThreadPool-1 135560691
2 RxComputationThreadPool-2 135617580
如果应用observeOn()
,它将变成:
1 RxSingleScheduler-1 186656395
1 RxSingleScheduler-1 187919407
1 RxSingleScheduler-1 187923753
2 RxSingleScheduler-1 186656790
2 RxSingleScheduler-1 187860148
2 RxSingleScheduler-1 187864889
正如您正确指出的,要获得所需的东西,需要publish().refcount()
或简单的share()
(它是一个别名)操作符。
这是因为publish()
创建了一个ConnectableObservable
,它在通过connect()
方法被告知之前不会开始释放条目。在这种情况下,如果您这样做:
@Test
public void test() throws InterruptedException {
final Scheduler single = Schedulers.single();
final long l = System.nanoTime();
ConnectableObservable<Long> dataStream =
Observable.just(1, 2, 3)
.map(i -> System.nanoTime())
.subscribeOn(Schedulers.computation())
.observeOn(single)
.publish();
dataStream.subscribe(i -> System.out.println("1 " + (i - l)));
dataStream.subscribe(i -> System.out.println("2 " + (i - l)));
Thread.sleep(1000);
dataStream.connect();
Thread.sleep(1000);
}
您将注意到,在第一秒钟(第一次Thread.sleep()
调用),什么都不会发生,就在dataStream.connect()
被调用之后,排放就会发生。
refCount()
接收一个ConnectableObservable,并通过计算当前订阅的订户数量来隐藏调用connect()
的需要。它所做的是在第一次订阅时,它调用connect()
,在最后一次取消订阅之后,从原始的可观察到的订阅中取消订阅。
至于publish().autoConnect()
的相互取消,之后您确实会得到一个可观察的,但是它有一个特殊的属性,比如最初的可观察性在因特网上执行一个API调用(持续10秒),当您使用它而不使用share()
时,您将得到与在这10秒内有订阅一样多的对服务器的并行查询。另一方面,使用share()
,您将只有一个电话。
如果一个可观察的共享完成其工作非常快(如just(1,2,3)
),您将不会看到它的任何好处。
autoConnect()
/refCount()
为您提供了一个可观察的中间值,您可以订阅它,而不是原始的可观察到的。
如果你对这本书感兴趣的话:基于RxJava的反应性编程
发布于 2017-01-29 19:27:29
定期(寒冷)可观察的
Observable
的核心是subscribe
函数。每次新观察者订阅时,都会将其作为参数传递给此函数。这个函数所做的是将数据输入到单个观察者中。它通过调用observer.onNext
方法来做到这一点。它可能会立即这样做(就像just
那样),或者通过一些调度程序(例如。interval
),或者从后台线程或回调(例如。通过启动某个异步任务)。
我在上面突出显示了单字,因为这是该函数在调用时唯一知道的观察者。如果您多次订阅这样一个可观察到的功能,则会为每个订阅者调用它的subscribe
函数。
像这样的数据源称为冷可观测。
调度器
应用subscribeOn
运算符在subscribe
调用和原始可观测的subscribe
函数之间添加中间步骤。您不再直接调用它,而是通过指定的调度程序来调度您的调用。
observeOn
向观察者的所有onNext
调用添加了类似的中间步骤。
在您的示例中,subscribe
函数被调用两次,即生成两次数据序列。调用是通过多线程io
调度程序调度的,因此这些调用不是在主线程上,而是在其他两个线程上,几乎同时发生。两个线程都开始调用两个订阅服务器的onNext
方法。请记住,每个线程只知道自己的订阅服务器。onNext
调用由mainThread
调度程序调度,它是单线程的,也就是说,它们不能同时发生,但需要以某种方式排队。严格地说,不能保证这些调用的顺序,。它取决于各种因素,并且是具体实施的。尝试将just
替换为interval
(这将在消息之间引入延迟),您将看到消息将以不同的顺序到达。
热可观测
publish
运算符使您的可观察到的热,也就是可连接。它将中间步骤添加到subscribe
函数(这只被调用一次)和onNext
方法中--这些步骤被传播到所有订阅的可观测值。换句话说,它允许多个订阅者共享单个订阅。
准确地说,subscribe
函数是在调用connect
方法时调用的。有两个操作符为您自动调用connect
:
autoConnect
调用connect
方法。不过,它从不断开连接。refCount
在第一个订阅者进来时调用connect
,当最后一个订户取消订阅时会自动断开连接。当新的订阅者进来时,它将重新连接(再次调用subscribe
函数)。publish().refCount()
是流行的组合,所以它有捷径:share()
。
对于您的教育,请尝试使用下面的代码和不含share
的代码
Observable<Long> dataStream = Observable.interval(100, TimeUnit.MILLISECONDS)
.take(3)
.share();
System.out.println("subscribing A");
dataStream.subscribe(v -> System.out.println("A got " + v));
TimeUnit.MILLISECONDS.sleep(150);
System.out.println("subscribing B");
dataStream.subscribe(v -> System.out.println("B got " + v));
TimeUnit.SECONDS.sleep(1);
对原始问题的回答
1)冷观测总是处理单用户问题。所以你的时间图应该是这样的:
subscribed first subscriber
[SUBSCRIBER: 1][ITEMS TO EMIT: 1,2,3]
subscribed second subscriber
[SUBSCRIBER: 1][ITEMS TO EMIT: 1,2,3]
[SUBSCRIBER: 2][ITEMS TO EMIT: 1,2,3]
emit "1" to subscriber 1
[SUBSCRIBER: 1][ITEMS TO EMIT: 2,3]
[SUBSCRIBER: 2][ITEMS TO EMIT: 1,2,3]
emit "1" to subscriber 2
[SUBSCRIBER: 1][ITEMS TO EMIT: 2,3]
[SUBSCRIBER: 2][ITEMS TO EMIT: 2,3]
...
虽然订单没有保证,因为多线程比赛。
( 2) publish
和autoConnect
不相互取消。他们只会加。
dataSource = ...;
dataSourceShared = dataSource.publish().autoConnect();
现在,当您订阅多个dataSourceShared
订阅者时,这只会导致对原始dataSource
的一个订阅。也就是说,您不必为每个新订户发出新的消息系列。
https://stackoverflow.com/questions/41915738
复制相似问题