我已经了解到平面映射转换是异步的,在本例中,我在lambda定义中打印线程的名称。它打印的线程与订阅源代码的线程相同。根据我的理解,它应该打印一个不同的线程名称-而不是在订阅的源代码中,因为这个转换必须在不同的线程中执行。
Flux.just(1, -2, 3, 4, -5, 6)
.flatMap(element -> {
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " element: " + element);
return Flux.just(element);
})
.subscribe()发布于 2019-07-29 21:04:32
它是异步的这一事实并不一定意味着它是并行运行的,这似乎是您在这里所期望的。但是,您可以将Flux转换为ParallelFlux并指定并行调度程序:
Flux.just(1, -2, 3, 4, -5, 6)
.parallel()
.runOn(Schedulers.elastic())
.flatMap(element
-> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) { // TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()
+ " element: " + element);
return Flux.just(element);
})
.subscribe();
Thread.currentThread().join(); //Just a hack to keep the program alive.另一方面,如果您不希望它并行运行,而只是在主线程的单独线程上运行,那么不需要将其转换为并行Flux -只需提供一个.subscribeOn(Schedulers.elastic())调用或类似的调用。
发布于 2019-08-01 01:39:04
不,它不会打印不同的线程名称。默认情况下,Reactive在单个线程上运行-订阅发生在该线程上。在本例中,订阅发生在调用线程上,因此,所有元素都在该线程上发出,并在该线程上进一步处理。
让我们先来了解一下。flatMap是如何工作的:
对于每个上游事件,您可以从it.
flatMap创建一个流,然后热切地订阅每个内部流(这就是它们甚至被触发的原因)。(在这里意味着它一次订阅所有内部流,而不等待其他内部流完成(与concatMap).
请注意,每个内部流毕竟是一个流。除非您告诉它使用不同的线程,否则每个线程都将在同一个线程上发出(调用线程,在您的情况下发生了什么)。如果您想并行处理它们,您可以为它们中的每一个执行一个.subscribeOn(Schedulers.elastic) (或schedulers.parallel()):
Flux.just(1, -2, 3, 4, -5, 6)
.flatMap(
element -> {
//this will always print the same thread - the calling thread basically
System.out.println(Thread.currentThread().getName() + " element: " + element);
return Mono.just(element)
.subscribeOn(Schedulers.parallel())
.doOnNext(
a ->
System.out.println(
a + " emitted on thread: " + Thread.currentThread().getName()));
})
.subscribe();.flatMap并不关心元素进入哪个线程,以及它离开哪个线程--它所要做的就是订阅内部流,并在事件到来时合并它们。记住,每一个内在的流都是一个“承诺”。它最终将完成(使用onComplete)信号,但您不知道何时完成。flatMap仍然不在乎。然而,concatMap在订阅下一个流之前,会等待一个流完成。这就是它如何维护原始流的顺序。
在这里阅读更多(我自己在flatMap上的文章):
https://medium.com/swlh/understanding-reactors-flatmap-operator-a6a7e62d3e95
发布于 2019-07-29 20:59:55
使用flatMap不会影响执行它的线程。可以使用subscribeOn来影响将在其上执行的线程:
Flux.just(1, -2, 3, 4, -5, 6)
.flatMap(element ->
{
try { Thread.sleep(1000);
} catch (InterruptedException e) { // TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() +
" element: " + element);
return Flux.just(element);
})
.subscribeOn(Schedulers.elastic())
.subscribe();根据您希望行为是什么,您可以使用以下任何一种- Schedulers.elastic()、Schedulers.single()、Schedulers.parallel()、Schedulers.immeadiate()
https://stackoverflow.com/questions/57251171
复制相似问题