假设我有一个事件发射数据源,我想将其转换为反应性流。数据源被资源绑定(例如,定期发送更新状态的套接字),因此我希望共享该资源的单个订阅。使用replay (对于新订阅者立即获得当前值)和refCount操作符似乎非常适合于此。例如,这是他的MyDataProvider单例的样子:
private final Observable<MyData> myDataObservable = Observable.<MyData>create(emitter -> {
// Open my resource here and emit data into observable
})
.doOnDispose(() -> {
// Close my resource here
})
.replay(1)
.refCount();
public Observable<MyData> getMyDataObservable() {
return myDataObservable;
}但是,现在假设我有另一个数据源,它需要第一个数据源的结果来计算它自己的值:
private final Observable<AnotherData> anotherDataObservable = getMyDataProvider().getMyDataObservable()
.flatMap(myData -> {
// Call another data source and return the result here
})
public Observable<AnotherData> getAnotherDataObservable() {
return anotherDataObservable;
}在这里我的装置开始崩溃。第一个可观测到的多播只能在refCount运算符之前工作。在那之后,一切又变成了单播。这意味着,如果对anotherDataProvider进行了两个单独的订阅,flatMap运算符将被调用两次。我看到了两种解决办法,但我不喜欢这两种方法:
1.在组播发生前先进行可观测的变换。
最简单的解决方法似乎是在进行多播操作之前,将myDataObservable的单播变体保存到某个位置,然后在anotherDataObservable中执行多播操作。然而,如果这两个可观察到的数据位于不同的模块中,则此解决方案将使代码变得非常不优雅,要求MyDataProvider公开两个似乎返回相同数据的不同可观测值。
2.只需使用重复的多播运营商。
第二个解决办法似乎是在replay和refCount操作符中再次应用anotherDataObservable。但是这造成了低效率,因为myDataObservable中的第一个多播运营商已经被应用了,但是现在除了浪费内存和CPU周期之外,什么也不做。
这两个解决方案还涉及到AnotherDataProvider与MyDataProvider的耦合。如果将来不再需要MyDataProvider更改和多播,我还必须更新AnotherDataProvider以删除多播操作符。
怎样才能更好地解决这个问题呢?我能不能更好地解决这个问题呢?
发布于 2018-06-01 08:33:47
关于第一种方法,在当前的设置中,您的anotherDataObservable使用myDataObservable,据我所知,它们在逻辑上是耦合的,因为它们使用相同的源。因此,您需要为它们提供一些基本的共享逻辑。我将它提取到一个公共模块中,该模块将公开可观察到的单播版本,然后让myDataObservable和anotherDataObservable在不同的模块中使用它--每个模块都添加多播逻辑。
另一个选项是有一个类来监视您的资源,方法是像在myDataObservable中那样订阅资源,在onNext中进行处理,并使用主题发布映射的结果,也就是说,如果您希望始终能够访问最后发布的值,那么使用另一个主题来发布原始结果。客户端将订阅该主题,并将获得仅在监视类中计算一次的映射值或原始值。
请记住,在订阅之前,要给你的主题添加背压策略。
如果这些选项不适合您,那么考虑一下避免多次调用flatMap是否真的很重要?您的代码非常简单,而且是一个重要的度量标准。如果flatMap不重,您可以让它多次运行。
https://stackoverflow.com/questions/50419068
复制相似问题