假设我有一个事件发射数据源,我想将其转换为反应性流。数据源被资源绑定(例如,定期发送更新状态的套接字),因此我希望共享该资源的单个订阅。使用replay (对于新订阅者立即获得当前值)和refCount操作符似乎非常适合于此。例如,这是他的MyDataProvider单例的样子:
private final Observable<MyData> myDataObservable = Observable.<MyData>create(emitter -> {
// Open my resource here and emit data into observable
})
.doOnDispose(() -> {
// Close my resource here
})
.replay(1)
.refCount();
public Observable<MyData> getMyDataObservable() {
return myDataObservable;
}但是,现在假设我有另一个数据源,它需要第一个数据源的结果来计算它自己的值:
private final Observable<AnotherData> anotherDataObservable = getMyDataProvider().getMyDataObservable()
.flatMap(myData -> {
// Call another data source and return the result here
})
public Observable<AnotherData> getAnotherDataObservable() {
return anotherDataObservable;
}在这里我的装置开始崩溃。第一个可观测到的多播只能在refCount运算符之前工作。在那之后,一切又变成了单播。这意味着,如果对anotherDataProvider进行了两个单独的订阅,flatMap运算符将被调用两次。我看到了两种解决办法,但我不喜欢这两种方法:
1.在组播发生前先进行可观测的变换。
最简单的解决方法似乎是在进行多播操作之前,将myDataObservable的单播变体保存到某个位置,然后在anotherDataObservable中执行多播操作。然而,如果这两个可观察到的数据位于不同的模块中,则此解决方案将使代码变得非常不优雅,要求MyDataProvider公开两个似乎返回相同数据的不同可观测值。
2.只需使用重复的多播运营商。
第二个解决办法似乎是在replay和refCount操作符中再次应用anotherDataObservable。但是这造成了低效率,因为myDataObservable中的第一个多播运营商已经被应用了,但是现在除了浪费内存和CPU周期之外,什么也不做。
这两个解决方案还涉及到AnotherDataProvider与MyDataProvider的耦合。如果将来不再需要MyDataProvider更改和多播,我还必须更新AnotherDataProvider以删除多播操作符。
怎样才能更好地解决这个问题呢?我能不能更好地解决这个问题呢?
发布于 2018-06-01 08:33:47
关于第一种方法,在当前的设置中,您的anotherDataObservable使用myDataObservable,据我所知,它们在逻辑上是耦合的,因为它们使用相同的源。因此,您需要为它们提供一些基本的共享逻辑。我将它提取到一个公共模块中,该模块将公开可观察到的单播版本,然后让myDataObservable和anotherDataObservable在不同的模块中使用它--每个模块都添加多播逻辑。
另一个选项是有一个类来监视您的资源,方法是像在myDataObservable中那样订阅资源,在onNext中进行处理,并使用主题发布映射的结果,也就是说,如果您希望始终能够访问最后发布的值,那么使用另一个主题来发布原始结果。客户端将订阅该主题,并将获得仅在监视类中计算一次的映射值或原始值。
请记住,在订阅之前,要给你的主题添加背压策略。
如果这些选项不适合您,那么考虑一下避免多次调用flatMap是否真的很重要?您的代码非常简单,而且是一个重要的度量标准。如果flatMap不重,您可以让它多次运行。
发布于 2018-05-18 23:54:36
您可以使用“发布().refCount()”串列来允许共享单个订阅服务器。由于它们经常被使用,所以它们有一个别名share()。
您还可以使用ConnectableObservable。但是在使用ConnectableObservables重放时要小心。
如果您将重放运算符应用于可观察的可观测对象,然后将其转换为可连接的可观测对象,则所得到的可连接可观测到的结果将始终向未来的任何观察者发出相同的完整序列,即使是那些在可连接可观测到的可观测到的可观察到的已开始向其他订阅的观察者发出项之后订阅的观察者。如文件所述:
发布于 2018-06-05 14:12:59
您可以拆分单播和多播流,但它是多余的。我认为第二种方法更好,顺便说一句,replay和refcount运算符实际上是在做事情,而不是浪费。
当您调用启用多播的Observable时,您正在将myDataObservable的replay(1)转换为ConenctableObservable。
然后,在使用refcount()时在内部订阅它,这也为后续订阅提供了一个点;在此之后,一切都是单播。
您真正想要在anotherDataObservable中实现的是相同的,所以,就像在myDataObservable中一样。
https://stackoverflow.com/questions/50419068
复制相似问题