首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >具有自定义计数标准的RxJava缓冲区/窗口

具有自定义计数标准的RxJava缓冲区/窗口
EN

Stack Overflow用户
提问于 2018-10-13 17:53:10
回答 1查看 818关注 0票数 2

我有一个Observable,它发出许多对象,我希望使用windowbuffer操作对这些对象进行分组。但是,我希望能够使用自定义条件,而不是指定count参数来确定窗口中应该有多少个对象。

例如,假设observable正在发出如下所示的Message类的实例。

代码语言:javascript
复制
class Message(
   val int size: Int
)

我希望根据消息实例的size变量来缓冲或窗口消息实例,而不仅仅是它们的计数。例如,获取总大小不超过5000的消息窗口。

代码语言:javascript
复制
// Something like this
readMessages()
    .buffer({ message -> message.size }, 5000)

有什么简单的方法可以做到这一点吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-10-14 02:06:07

首先,我必须承认,我不是RxJava专家。我只是觉得你的问题很有挑战性,并试图找到一个解决方案。

有一个带有参数boundaryIndicatorwindow()函数。如果达到窗口大小,则必须创建一个发出项的Publisher/ Flowable

在本例中,我创建了一个用作boundaryIndicator的对象windowManager。在onNext回调中,我调用了windowManager,并让它有机会打开一个新窗口。

代码语言:javascript
复制
val windowManager = object {
    lateinit var emitter: FlowableEmitter<Unit>
    var windowSize: Long = 0

    fun createEmitter(emitter: FlowableEmitter<Unit>) {
        this.emitter = emitter
    }

    fun openWindowIfRequired(size: Long) {
        windowSize += size
        if (windowSize > 5) {
            windowSize = 0
            emitter.onNext(Unit)
        }
    }
}

val windowBoundary = Flowable.create<Unit>(windowManager::createEmitter, BackpressureStrategy.ERROR)

Flowable.interval(1, TimeUnit.SECONDS).window(windowBoundary).subscribe {
    it.doOnNext {
        windowManager.openWindowIfRequired(it)
    }.doOnSubscribe {
        println("Open window")
    }.doOnComplete {
        println("Close window")
    }.subscribe {
        println(it)
    }
}
票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52791691

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档