对于以下代码的输出,我有一些疑问:
Flux.just("a", "b", "c", "d")
.log(null, Level.INFO, true) // line: 18
.flatMap(value ->
Mono.just(value.toUpperCase()).publishOn(Schedulers.elastic()), 2)
.log(null, Level.INFO, true) // line: 21
.take(3)
.log(null, Level.INFO, true) // line: 23
.subscribe(x ->
System.out.println("Thread: " + Thread.currentThread().getName() +
" , " + x));
Thread.sleep(1000 * 1000);输出:
1. 11:29:11 [main] INFO - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) Flux.log(App.java:18)
2. 11:29:11 [main] INFO - onSubscribe(FluxFlatMap.FlatMapMain) Flux.log(App.java:21)
3. 11:29:11 [main] INFO - onSubscribe(FluxTake.TakeSubscriber) Flux.log(App.java:23)
4. 11:29:11 [main] INFO - request(unbounded) Flux.log(App.java:23)
5. 11:29:11 [main] INFO - request(unbounded) Flux.log(App.java:21)
6. 11:29:11 [main] INFO - | request(2) Flux.log(App.java:18)
7. 11:29:11 [main] INFO - | onNext(a) Flux.log(App.java:18)
8. 11:29:11 [main] INFO - | onNext(b) Flux.log(App.java:18)
9. 11:29:11 [elastic-2] INFO - onNext(A) Flux.log(App.java:21)
10. 11:29:11 [elastic-2] INFO - onNext(A) Flux.log(App.java:23)
11. Thread: elastic-2 , A
12. 11:29:11 [elastic-2] INFO - | request(1) Flux.log(App.java:18)
13. 11:29:11 [main] INFO - | onNext(c) Flux.log(App.java:18)
14. 11:29:11 [elastic-3] INFO - onNext(B) Flux.log(App.java:21)
15. 11:29:11 [elastic-3] INFO - onNext(B) Flux.log(App.java:23)
16. Thread: elastic-3 , B
17. 11:29:11 [elastic-3] INFO - | request(1) Flux.log(App.java:18)
18. 11:29:11 [elastic-3] INFO - | onNext(d) Flux.log(App.java:18)
19. 11:29:11 [elastic-3] INFO - | onComplete() Flux.log(App.java:18)
20. 11:29:11 [elastic-3] INFO - onNext(C) Flux.log(App.java:21)
21. 11:29:11 [elastic-3] INFO - onNext(C) Flux.log(App.java:23)
22. Thread: elastic-3 , C
23. 11:29:11 [elastic-3] INFO - cancel() Flux.log(App.java:21)
24. 11:29:11 [elastic-3] INFO - onComplete() Flux.log(App.java:23)
25. 11:29:11 [elastic-3] INFO - | cancel() Flux.log(App.java:18)问题:每个问题都是关于输出中的特定行的(不是代码中的一行)。我也把我的答案加入其中一些,但我不确定我是正确的。
unbounded数量。那么为什么会发生这样的事件:request(unbounded)正在下降而不是上升呢?我的回答是:unbounded金额的请求将上升到take,然后take再次发送。flatMap发送cancel信号。为什么take不发送它呢?最后一个问题:输出中有一个以上的终端信号。这不是一种反应流规范的累积吗?
发布于 2018-02-17 10:30:11
在这种情况下,只会产生一个终端信号。
Flux.just("a", "b", "c", "d")
.log(null, Level.INFO, true) // line: 18
.flatMap(value ->
Mono.just(value.toUpperCase()).publishOn(Schedulers.elastic()), 2)
.log(null, Level.INFO, true) // line: 21
.take(3)
.log(null, Level.INFO, true) // line: 23
.subscribe(x ->
System.out.println("Thread: " + Thread.currentThread().getName() +
" , " + x), t -> {}, () -> System.out.println("Completed ""Only Once"));这里的棘手之处在于,每个3号反应堆操作人员都有自己的生命周期,他们都遵循相同的规则--发射onComplete通知下游操作人员不再有数据了。
由于您有.log()运算符和三个不同的点,因此您将从.just、.flatMap和.take(3)观察到三个独立的onComplete信号。
首先,您将从onComplete中看到.just,因为.flatMap的默认行为是'ok,让我们尝试请求第一个concurrency元素,然后看看它是如何进行的‘,因为.just可能只生成4个元素(在您的例子中是并发级别),请求请求它将发出2个onNext,在两个request(1)之后,您将看到onComplete。反过来,发出的onComplete让.flatMap知道,当4个分层流发出它们的.onComplete信号时,它将被允许向下游发射自己的onComplete。下游是.take(3)操作符,在前三个元素之后,它将发出自己的onComplete信号,而无需等待上游的onComplete。由于有.log操作符后的.take,这个信号也将被记录下来。最后,在您的流程中,您有3个独立的日志操作符,它将从3个独立的操作符中记录3个独立的onComplete,但是尽管如此,最终的终端.subscribe将只从第一个操作符接收到流的一个onComplete。
关于.take行为的小更新
.take的核心思想是接受元素,直到剩余的计数得到满足为止。由于上游可能产生比所要求的更多的结果,我们需要有一种机制来防止发送更多的数据。反应流规范提供给我们的机制之一是通过Subscription进行协作。订阅有两种主要方法-- request --显示需求和cancel --表明即使请求的需求没有得到满足,也不再需要数据。对于.take算子,初始需求是Long.MAX_VALUE,它认为需求是无限的。因此,停止使用潜在的不定式数据流的唯一方法是使用取消订阅,或者换句话说,取消订阅。
希望它能帮助你:)
https://stackoverflow.com/questions/48840149
复制相似问题