刚刚开始理解使用反应堆进行反应性编程,我在这里遇到了一个教程中的代码片段-- building-a-chat-application-with-angular-and-spring-reactive-websocket
class ChatSocketHandler(val mapper: ObjectMapper) : WebSocketHandler {
val sink = Sinks.replay<Message>(100);
val outputMessages: Flux<Message> = sink.asFlux();
override fun handle(session: WebSocketSession): Mono<Void> {
println("handling WebSocketSession...")
session.receive()
.map { it.payloadAsText }
.map { Message(id= UUID.randomUUID().toString(), body = it, sentAt = Instant.now()) }
.doOnNext { println(it) }
.subscribe(
{ message: Message -> sink.next(message) },
{ error: Throwable -> sink.error(error) }
);
return session.send(
Mono.delay(Duration.ofMillis(100))
.thenMany(outputMessages.map { session.textMessage(toJson(it)) })
)
}
fun toJson(message: Message): String = mapper.writeValueAsString(message)
}
我理解它的作用,但不明白为什么作者在订阅方法中使用使用者,而不是链接另一个doOnNext(使用者)。即。台词:
.doOnNext { println(it) }
.subscribe(
{ message: Message -> sink.next(message) },
{ error: Throwable -> sink.error(error) }
从反应堆文献中我读到了Flux.subscribe(消费者<?)(超级T>消费者):
订阅此Flux的使用者,它将消耗序列中的所有元素。它将请求一个无界需求(Long.MAX_VALUE)。有关观察和转发传入数据的被动版本,请参阅doOnNext(java.util.function.Consumer)。
然而,我不明白为什么其中一个会选择一个而另一个,在我看来,它们在功能上是相同的。
发布于 2021-05-20 13:21:06
这种区别更多的是常规的,而不是功能性的--区别是副作用与最终消费者的区别。
当反应链执行时,doOnXXX
系列方法是为用户设计的副作用而设计的--日志记录是其中最常见的,但您也可能有度量、分析等,它们需要在每个元素通过时看到一个视图。所有这些的关键是,将其中任何一个作为最终消费者(如上面示例中的println()
)没有多大意义。
相反,subscribe()
使用者应该是“最终消费者”,通常由框架(如Webflux)而不是用户代码调用--因此这种情况是该规则的一个例外。在这种情况下,他们会主动地将这个反应链中的消息传递到另一个接收器进行进一步处理--因此,将其作为“副作用”样式的方法没有多大意义,因为您不希望Flux在此之后继续下去。
(增编:如前所述,反应堆/网络流量的正常方式是让网络流量处理订阅,这不是这里发生的事情。)我还没有详细研究过在没有用户订阅的情况下是否有更明智的方法来实现这一点,但在我的经验中通常是这样的,因此手动调用订阅通常有点代码味。当然,您应该尽可能在自己的代码中避免这种情况。)
https://stackoverflow.com/questions/67619515
复制相似问题