首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >流量API中的subscription.request(n)是如何在任意n值下执行背压的?

流量API中的subscription.request(n)是如何在任意n值下执行背压的?
EN

Stack Overflow用户
提问于 2020-01-01 19:27:48
回答 2查看 307关注 0票数 4

我在玩Flow API,到目前为止,我知道request()方法是用于背压的。大多数文章都指出,这类似于控制消费速度。

但是,我看到的几乎每一个示例代码都将值1传递给request()方法,比如subscription.request(1)。但我不太明白request()方法是如何控制消费速度的。

我试图通过向发行者发送一组项目并打印线程名来运行测试,而且似乎每个onNext()都运行在同一个工作线程上,不管我使用的是request(1)还是request(50)

代码语言:javascript
运行
复制
@Override
public void onNext(T item) {
   System.out.println(Thread.getCurrent().getName());
   Thread.sleep(5000);
   subscription.request(50);
}

如果onNext()在不同的线程中运行,我可以理解传递给request(n)n值将如何影响并行处理这些项的速度(在n线程中运行)。但在我的测试中,情况似乎并非如此,因为它们都以相同的线程名运行。

在这种情况下,当request(1)request(50)仍然在同一个线程上一个接一个地运行时,它们之间有什么区别呢?那么,消费率就不会保持不变吗?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-01-02 10:54:40

n中的request指示订阅者可以接受多少个元素,并限制上游Publisher可以发出多少项。因此,这个生成器的减速不是每个单独的项目,而是每个批生成的平均时间被使用者的处理时间交织在一起。

onNext是以序列化的方式执行的,并且取决于上游,也取决于同一线程。因此,在那里调用request通常表示它可以调用相同的onNext,在当前调用结束之后调用,如果可用,则使用下一个值。也就是说,调用Thread.sleep将推迟下一次对onNext的调用。

通常,没有理由将requestonNext中称为终端用户,因为它是与其直接上游的Publisher同步运行的,并且单个request(Long.MAX_VALUE)与重复request(1)之间没有实际区别。

调用request的少数几个原因之一是,如果onNext分叉了异步工作本身,并且只有在该工作结束时才会请求更多的项:

代码语言:javascript
运行
复制
Executor executor = ...

Subscription upstream;

@Override public void onSubscribe(Subscription s) {
    this.upstream = s;
    executor.execute(() -> {
       Thread.sleep(5000);
       s.request(1);
       return null; // Callable
    });
}

@Override public void onNext(T item) {
    System.out.println("Start onNext");
    executor.execute(() -> {
       System.out.println("Run work");
       Thread.sleep(5000);
       System.out.println("Request more work");
       upstream.request(1);
       return null; // Callable
    });
    System.out.println("End onNext");
}

使用此设置,上游将调用onNext一次,并且只有在执行器执行的任务发出下一个请求时才调用它。但是请注意,除非Publisher从专用线程发出,否则上面的示例将最终将onNext调用拖到executor的线程上。

票数 1
EN

Stack Overflow用户

发布于 2020-01-02 18:43:17

onNext()的调用不应该并行运行。它们可以从不同的线程(取决于实现)运行,但总是按顺序运行。但是即使是顺序地,它们也可以以比用户所能处理的更高的速率被调用。因此,只有当用户呼叫有容纳n个传入项目的空间时,才请求(N)。通常它只有一个值的空间,所以当这个变量空闲时,它调用请求(1)。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59555464

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档