我有一个Observable,它发出许多对象,我希望使用window
或buffer
操作对这些对象进行分组。但是,我希望能够使用自定义条件,而不是指定count
参数来确定窗口中应该有多少个对象。
例如,假设observable正在发出如下所示的Message
类的实例。
class Message(
val int size: Int
)
我希望根据消息实例的size
变量来缓冲或窗口消息实例,而不仅仅是它们的计数。例如,获取总大小不超过5000的消息窗口。
// Something like this
readMessages()
.buffer({ message -> message.size }, 5000)
有什么简单的方法可以做到这一点吗?
发布于 2018-10-14 02:06:07
首先,我必须承认,我不是RxJava专家。我只是觉得你的问题很有挑战性,并试图找到一个解决方案。
有一个带有参数boundaryIndicator
的window()
函数。如果达到窗口大小,则必须创建一个发出项的Publisher
/ Flowable
。
在本例中,我创建了一个用作boundaryIndicator
的对象windowManager
。在onNext
回调中,我调用了windowManager
,并让它有机会打开一个新窗口。
val windowManager = object {
lateinit var emitter: FlowableEmitter<Unit>
var windowSize: Long = 0
fun createEmitter(emitter: FlowableEmitter<Unit>) {
this.emitter = emitter
}
fun openWindowIfRequired(size: Long) {
windowSize += size
if (windowSize > 5) {
windowSize = 0
emitter.onNext(Unit)
}
}
}
val windowBoundary = Flowable.create<Unit>(windowManager::createEmitter, BackpressureStrategy.ERROR)
Flowable.interval(1, TimeUnit.SECONDS).window(windowBoundary).subscribe {
it.doOnNext {
windowManager.openWindowIfRequired(it)
}.doOnSubscribe {
println("Open window")
}.doOnComplete {
println("Close window")
}.subscribe {
println(it)
}
}
https://stackoverflow.com/questions/52791691
复制相似问题