首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >防止超时后flux.bufferTimeout溢出

防止超时后flux.bufferTimeout溢出
EN

Stack Overflow用户
提问于 2019-01-11 17:32:32
回答 1查看 2.2K关注 0票数 8

我对反应性编程和反应堆还比较陌生。在这种情况下,我希望在流中bufferTimeout值,同时控制它(没有无限制的请求),因此我可以手动请求批值。

下面的示例说明了这一点:

代码语言:javascript
运行
复制
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

Flux<Object> flux = Flux.generate(sink -> {
    try {
        sink.next(queue.poll(10, TimeUnit.DAYS));
    }
    catch (InterruptedException e) {}
});

BaseSubscriber<List<Object>> subscriber = new BaseSubscriber<List<Object>>() {
    protected void hookOnSubscribe(Subscription subscription) {
        // Don't request unbounded
    }

    protected void hookOnNext(List<Object> value) {
        System.out.println(value);
    }
};

flux.subscribeOn(parallel())
        .log()
        .bufferTimeout(10, ofMillis(200))
        .subscribe(subscriber);

subscriber.request(1);

// Offer a partial batch of values
queue.offer(1);
queue.offer(2);
queue.offer(3);
queue.offer(4);
queue.offer(5);

// Wait for timeout, expect [1, 2, 3, 4, 5] to be printed
Thread.sleep(500); 

// Offer more values
queue.offer(6);
queue.offer(7);
queue.offer(8);
queue.offer(9);
queue.offer(10);
Thread.sleep(1000);

这是输出:

代码语言:javascript
运行
复制
[DEBUG] (main) Using Console logging
[ INFO] (main) onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
[ INFO] (main) request(10)
[ INFO] (parallel-1) onNext(1)
[ INFO] (parallel-1) onNext(2)
[ INFO] (parallel-1) onNext(3)
[ INFO] (parallel-1) onNext(4)
[ INFO] (parallel-1) onNext(5)
[1, 2, 3, 4, 5]
[ INFO] (parallel-1) onNext(6)
[ INFO] (parallel-1) onNext(7)
[ INFO] (parallel-1) onNext(8)
[ INFO] (parallel-1) onNext(9)
[ INFO] (parallel-1) onNext(10)
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests

我实际上是预料到了这一点,因为我理解缓冲区订阅者将向上游请求10个值,它不知道超时,并且将产生所有这些值。当超时完成后,就会完成一个和唯一的请求,因此以后提供的值仍然会产生并溢出。

我想知道是否有可能防止在超时结束后生成剩余的值,或者在不失去控制的情况下缓冲它们。我试过:

  • limitRate(1)bufferTimeout之前,试图使缓冲区请求值“按需”。它确实一个接一个地请求,但请求10次,因为缓冲区请求10个值。
  • onBackpressureBuffer(10),因为问题基本上是背压的定义,如果我正确的话。尝试缓冲超时请求中的溢出值,但这会请求无界值,这是我想避免的。

看起来我需要实现另一个bufferTimeout实现,但是我被告知编写发布者是很困难的。我是不是遗漏了什么?还是我做了反应错误?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-01-18 13:16:44

通过实现我自己的订阅服务器解决了这个问题:

https://gist.github.com/hossomi/5edf60acb534a16c025e12e4e803d014

它只请求所需的值,并在没有活动请求的情况下缓冲接收到的值。缓冲区是无界的,因此可能需要谨慎使用或更改它。

很可能不像标准的反应堆订户那么可靠,而是为我工作。欢迎提出建议!

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54151419

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档