首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >反应堆信号的基本问题

反应堆信号的基本问题
EN

Stack Overflow用户
提问于 2018-02-17 10:06:52
回答 1查看 1.5K关注 0票数 1

对于以下代码的输出,我有一些疑问:

代码语言:javascript
运行
复制
Flux.just("a", "b", "c", "d")
        .log(null, Level.INFO, true) // line: 18
        .flatMap(value ->
                Mono.just(value.toUpperCase()).publishOn(Schedulers.elastic()), 2)
        .log(null, Level.INFO, true) // line: 21
        .take(3)
        .log(null, Level.INFO, true) // line: 23
        .subscribe(x -> 
             System.out.println("Thread: " + Thread.currentThread().getName() +
                               " , " + x));

Thread.sleep(1000 * 1000);

输出:

代码语言:javascript
运行
复制
1. 11:29:11 [main] INFO  - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)    Flux.log(App.java:18)
2. 11:29:11 [main] INFO  - onSubscribe(FluxFlatMap.FlatMapMain)     Flux.log(App.java:21)
3. 11:29:11 [main] INFO  - onSubscribe(FluxTake.TakeSubscriber)     Flux.log(App.java:23)
4. 11:29:11 [main] INFO  - request(unbounded)   Flux.log(App.java:23)
5. 11:29:11 [main] INFO  - request(unbounded)   Flux.log(App.java:21)
6. 11:29:11 [main] INFO  - | request(2)     Flux.log(App.java:18)
7. 11:29:11 [main] INFO  - | onNext(a)  Flux.log(App.java:18)
8. 11:29:11 [main] INFO  - | onNext(b)  Flux.log(App.java:18)
9. 11:29:11 [elastic-2] INFO  - onNext(A)   Flux.log(App.java:21)
10. 11:29:11 [elastic-2] INFO  - onNext(A)  Flux.log(App.java:23)
11. Thread: elastic-2 , A
12. 11:29:11 [elastic-2] INFO  - | request(1)   Flux.log(App.java:18)
13. 11:29:11 [main] INFO  - | onNext(c)     Flux.log(App.java:18)
14. 11:29:11 [elastic-3] INFO  - onNext(B)  Flux.log(App.java:21)
15. 11:29:11 [elastic-3] INFO  - onNext(B)  Flux.log(App.java:23)
16. Thread: elastic-3 , B
17. 11:29:11 [elastic-3] INFO  - | request(1)   Flux.log(App.java:18)
18. 11:29:11 [elastic-3] INFO  - | onNext(d)    Flux.log(App.java:18)
19. 11:29:11 [elastic-3] INFO  - | onComplete()     Flux.log(App.java:18)
20. 11:29:11 [elastic-3] INFO  - onNext(C)  Flux.log(App.java:21)
21. 11:29:11 [elastic-3] INFO  - onNext(C)  Flux.log(App.java:23)
22. Thread: elastic-3 , C
23. 11:29:11 [elastic-3] INFO  - cancel()   Flux.log(App.java:21)
24. 11:29:11 [elastic-3] INFO  - onComplete()   Flux.log(App.java:23)
25. 11:29:11 [elastic-3] INFO  - | cancel()     Flux.log(App.java:18)

问题:每个问题都是关于输出中的特定行的(不是代码中的一行)。我也把我的答案加入其中一些,但我不确定我是正确的。

  1. 订阅时,订阅操作请求元素的unbounded数量。那么为什么会发生这样的事件:request(unbounded)正在下降而不是上升呢?我的回答是:unbounded金额的请求将上升到take,然后take再次发送。
  2. flatMap发送cancel信号。为什么take不发送它呢?

最后一个问题:输出中有一个以上的终端信号。这不是一种反应流规范的累积吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-02-17 10:30:11

在这种情况下,只会产生一个终端信号。

代码语言:javascript
运行
复制
Flux.just("a", "b", "c", "d")
            .log(null, Level.INFO, true) // line: 18
            .flatMap(value ->
                    Mono.just(value.toUpperCase()).publishOn(Schedulers.elastic()), 2)
            .log(null, Level.INFO, true) // line: 21
            .take(3)
            .log(null, Level.INFO, true) // line: 23
            .subscribe(x ->
                    System.out.println("Thread: " + Thread.currentThread().getName() +
 " , " + x), t -> {}, () -> System.out.println("Completed ""Only Once"));

这里的棘手之处在于,每个3号反应堆操作人员都有自己的生命周期,他们都遵循相同的规则--发射onComplete通知下游操作人员不再有数据了。

由于您有.log()运算符和三个不同的点,因此您将从.just.flatMap.take(3)观察到三个独立的onComplete信号。

首先,您将从onComplete中看到.just,因为.flatMap的默认行为是'ok,让我们尝试请求第一个concurrency元素,然后看看它是如何进行的‘,因为.just可能只生成4个元素(在您的例子中是并发级别),请求请求它将发出2个onNext,在两个request(1)之后,您将看到onComplete。反过来,发出的onComplete.flatMap知道,当4个分层流发出它们的.onComplete信号时,它将被允许向下游发射自己的onComplete。下游是.take(3)操作符,在前三个元素之后,它将发出自己的onComplete信号,而无需等待上游的onComplete。由于有.log操作符后的.take,这个信号也将被记录下来。最后,在您的流程中,您有3个独立的日志操作符,它将从3个独立的操作符中记录3个独立的onComplete,但是尽管如此,最终的终端.subscribe将只从第一个操作符接收到流的一个onComplete

关于.take行为的小更新

.take的核心思想是接受元素,直到剩余的计数得到满足为止。由于上游可能产生比所要求的更多的结果,我们需要有一种机制来防止发送更多的数据。反应流规范提供给我们的机制之一是通过Subscription进行协作。订阅有两种主要方法-- request --显示需求和cancel --表明即使请求的需求没有得到满足,也不再需要数据。对于.take算子,初始需求Long.MAX_VALUE,它认为需求是无限的。因此,停止使用潜在的不定式数据流的唯一方法是使用取消订阅,或者换句话说,取消订阅。

希望它能帮助你:)

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48840149

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档