我对反应性编程非常陌生。虽然我非常熟悉函数式编程和kotlin协同器,但我仍然无法理解如何使用反应性编程范例来重构普通嵌套CRUD代码,特别是那些具有嵌套异步操作的代码。
例如,下面是一个基于Java 8 CompletableFuture的简单异步CRUD代码片段
getFooAsync(id)
.thenAccept(foo -> {
if (foo == null) {
insertFooAsync(id, new Foo());
} else {
getBarAsync(foo.bar)
.thenAccept(bar -> {
updateBarAsync(foo, bar);
});
}
});用kotlin协同器重构它非常容易,这使它在不失去异步性的情况下更加可读性。
val foo = suspendGetFoo(id)
if(foo==null) {
suspendInsertFoo(id, Foo())
} else {
val bar = suspendGetBar(foo.bar)
suspendUpdateBar(foo, bar);-
}然而,这样的代码是否适合于反应式编程?
如果是这样的话,给一个Flux<String> idFlux,如何用反应堆3重构它?
将每个CompletableFuture替换为Mono是个好主意吗?
发布于 2019-04-26 08:07:19
这样的代码适合于反应式编程吗?
IMHO、Kotlin协同器更适合这个用例,并产生了更清晰的代码。
但是,您可以在反应性流中这样做。
用Mono替换每个CompletableFuture是个好主意吗?
我发现反应性流非常好地处理了很多异步用例(例如工程反应堆实例)。但是,肯定有一些用例不太适合。因此,我不能推荐用反应性流替换每个CompletableFuture的策略。
然而,当你需要背压的时候,你必须放弃CompletableFuture。
关于使用什么异步模式的决定在很大程度上取决于您正在使用的语言/框架/工具/库,以及您和您的队友对它们的满意程度。如果您使用的库具有良好的Kotlin支持,并且您的团队熟悉Kotlin,那么使用coroutines。反应流也是如此。
给一个
Flux<String> idFlux,如何用反应堆3重构它?
在考虑此用例的反应性流时,请记住以下几点:
null。相反,通常使用空的Mono。(从技术上讲,你也可以使用Mono<Optional<...>>,但在那个时候,你只是在伤害你的大脑,乞求虫子)Mono为空时,不会调用传递给处理onNext信号(例如.map、.flatMap、.handle等)的任何操作符的lambda。记住,您正在处理的是数据流(而不是命令式控制流).switchIfEmpty或.defaultIfEmpty运算符可以在空的Mono上操作。但是,它们不提供else条件。下游运算符不知道流以前是空的(除非从发送给.switchIfEmpty的发布服务器发出的元素很容易识别).flatMap、.flatMapSequential和.concatMap。您将需要使用这些链接对以前异步操作的输出进行操作的异步操作。Mono<Void>。尽管如此,这里还是尝试将您的示例转换为反应堆3(但有一些注意事项):
Mono<Void> updateFoos(Flux<String> idFlux) {
return idFlux // Flux<String>
.flatMap(id -> getFoo(id) // Mono<Foo>
/*
* If a Foo with the given id is not found,
* create a new one, and continue the stream with it.
*/
.switchIfEmpty(insertFoo(id, new Foo())) // Mono<Foo>
/*
* Note that this is not an "else" condition
* to the above .switchIfEmpty
*
* The lambda passed to .flatMap will be
* executed with either:
* A) The foo found from getFoo
* OR
* B) the newly inserted Foo from insertFoo
*/
.flatMap(foo -> getBar(foo.bar) // Mono<Bar>
.flatMap(bar -> updateBar(foo, bar)) // Mono<Bar>
.then() // Mono<Void>
) // Mono<Void>
) // Flux<Void>
.then(); // Mono<Void>
}
/*
* @return the Foo with the given id, or empty if not found
*/
abstract Mono<Foo> getFoo(String id);
/*
* @return the Bar with the given id, or empty if not found
*/
abstract Mono<Bar> getBar(String id);
/*
* @return the Foo inserted, never empty
*/
abstract Mono<Foo> insertFoo(String id, Foo foo);
/*
* @return the Bar updated, never empty
*/
abstract Mono<Bar> updateBar(Foo foo, Bar bar);下面是一个更复杂的示例,它使用Tuple2<Foo,Boolean>来指示是否找到了原始Foo (这在语义上应该等同于您的示例):
Mono<Void> updateFoos(Flux<String> idFlux) {
return idFlux // Flux<String>
.flatMap(id -> getFoo(id) // Mono<Foo>
/*
* Map to a Tuple2 whose t2 indicates whether the foo was found.
* In this case, it was found.
*/
.map(foo -> Tuples.of(foo, true)) // Mono<Tuple2<Foo,Boolean>>
/*
* If a Foo with the given id is not found,
* create a new one, and continue the stream with
* a Tuple2 indicating it wasn't originally found
*/
.switchIfEmpty(insertFoo(id, new Foo()) // Mono<Foo>
/*
* Foo was not originally found, so t2=false
*/
.map(foo -> Tuples.of(foo, false))) // Mono<Tuple2<Foo,Boolean>>
/*
* The lambda passed to .flatMap will be
* executed with either:
* A) t1=foo found from getFoo, t2=true
* OR
* B) t1=newly inserted Foo from insertFoo, t2=false
*/
.flatMap(tuple2 -> tuple2.getT2()
// foo originally found
? getBar(tuple2.getT1().bar) // Mono<Bar>
.flatMap(bar -> updateBar(tuple2.getT1(), bar)) // Mono<Bar>
.then() // Mono<Void>
// foo originally not found (new inserted)
: Mono.empty() // Mono<Void>
)
) // Flux<Void>
.then(); // Mono<Void>
}https://stackoverflow.com/questions/55861289
复制相似问题