我正在学习反应性编程,但我有一个疑问,我正在尝试执行以下代码,但我不明白为什么在订阅之前添加delayElements,没有打印元素
如果没有delayElements,则会显示值
import reactor.core.publisher.Flux;
import java.time.Duration;
public class PlayWithFlux {
public static void main(String[] args){
Flux<Integer> flux = Flux.just(1, 2, 3);
flux.log().subscribe(System.out::println);
}
}
使用delayElements时,不会显示元素
import reactor.core.publisher.Flux;
import java.time.Duration;
public class PlayWithFlux {
public static void main(String[] args){
Flux<Integer> flux = Flux.just(1, 2, 3).delayElements(Duration.ofSeconds(1));
flux.log().subscribe(System.out::println);
}
}
对这种行为有什么解释吗?
发布于 2022-08-26 07:26:29
这里的问题是delayElements
按它说的做,它在发射元素之前会延迟。
调用subscribe
不是阻塞事件,而是异步或“触发并忘记”操作符。
因此,实际上正在发生的是调用subscribe
,然后主流继续并结束程序,然后才有时间发出任何值。
要快速解决这个问题,只需延迟程序的结束,例如调用Thread.sleep()
public static void main(String[] args){
Flux.just(1, 2, 3)
.delayElements(Duration.ofSeconds(1))
.subscribe(System.out::println);
// Delay program shutdown so we can watch the elements get printed
Thread.sleep(5000);
}
发布于 2022-08-27 01:44:14
如果您只使用Flux.just
和subscribe
,那么一切都在主线程上执行,所以程序只有在Flux
完成后才会终止。
一旦delayElements
进入图片,Flux
就不再在主线程上执行。在内部,delayElements
切换到一个所谓的并行调度程序,它基本上是一个线程池.由于现在Flux
运行在不同的线程上,主线程将继续执行,而无需等待Flux
完成,程序将在Flux
完成之前终止。
与其使用Thread.sleep
,不如在本例中使用block...
方法之一。这样,程序就可以在Flux
完成后立即完成:
public static void main(String[] args){
Flux.just(1, 2, 3)
.delayElements(Duration.ofSeconds(1))
.doOnNext(System.out::println)
.blockLast();
}
https://stackoverflow.com/questions/73494306
复制