首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何用反应式编程实现嵌套异步代码?

如何用反应式编程实现嵌套异步代码?
EN

Stack Overflow用户
提问于 2019-04-26 05:28:03
回答 1查看 1.1K关注 0票数 1

我对反应性编程非常陌生。虽然我非常熟悉函数式编程和kotlin协同器,但我仍然无法理解如何使用反应性编程范例来重构普通嵌套CRUD代码,特别是那些具有嵌套异步操作的代码。

例如,下面是一个基于Java 8 CompletableFuture的简单异步CRUD代码片段

代码语言:javascript
运行
复制
        getFooAsync(id)
                .thenAccept(foo -> {
                    if (foo == null) {
                        insertFooAsync(id, new Foo());
                    } else {
                        getBarAsync(foo.bar)
                                .thenAccept(bar -> {
                                   updateBarAsync(foo, bar);
                                });
                    }
                });

用kotlin协同器重构它非常容易,这使它在不失去异步性的情况下更加可读性。

代码语言:javascript
运行
复制
 val foo = suspendGetFoo(id)
 if(foo==null) {
   suspendInsertFoo(id, Foo())
 } else {
   val bar = suspendGetBar(foo.bar)
   suspendUpdateBar(foo, bar);-
}

然而,这样的代码是否适合于反应式编程?

如果是这样的话,给一个Flux<String> idFlux,如何用反应堆3重构它?

将每个CompletableFuture替换为Mono是个好主意吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-04-26 08:07:19

这样的代码适合于反应式编程吗?

IMHO、Kotlin协同器更适合这个用例,并产生了更清晰的代码。

但是,您可以在反应性流中这样做。

用Mono替换每个CompletableFuture是个好主意吗?

我发现反应性流非常好地处理了很多异步用例(例如工程反应堆实例)。但是,肯定有一些用例不太适合。因此,我不能推荐用反应性流替换每个CompletableFuture的策略。

然而,当你需要背压的时候,你必须放弃CompletableFuture

关于使用什么异步模式的决定在很大程度上取决于您正在使用的语言/框架/工具/库,以及您和您的队友对它们的满意程度。如果您使用的库具有良好的Kotlin支持,并且您的团队熟悉Kotlin,那么使用coroutines。反应流也是如此。

给一个Flux<String> idFlux,如何用反应堆3重构它?

在考虑此用例的反应性流时,请记住以下几点:

  1. 反应性流不能发出null。相反,通常使用空的Mono。(从技术上讲,你也可以使用Mono<Optional<...>>,但在那个时候,你只是在伤害你的大脑,乞求虫子)
  2. Mono为空时,不会调用传递给处理onNext信号(例如.map.flatMap.handle等)的任何操作符的lambda。记住,您正在处理的是数据流(而不是命令式控制流)
  3. .switchIfEmpty.defaultIfEmpty运算符可以在空的Mono上操作。但是,它们不提供else条件。下游运算符不知道流以前是空的(除非从发送给.switchIfEmpty的发布服务器发出的元素很容易识别)
  4. 如果您有一个由多个操作符组成的流,并且多个操作符可能导致流变为空,那么下游操作符很难/不可能确定流为什么变为空的。
  5. 允许处理从上游运算符发出的值的主要异步运算符是.flatMap.flatMapSequential.concatMap。您将需要使用这些链接对以前异步操作的输出进行操作的异步操作。
  6. 由于用例不返回值,因此反应性流实现将返回一个Mono<Void>

尽管如此,这里还是尝试将您的示例转换为反应堆3(但有一些注意事项):

代码语言:javascript
运行
复制
    Mono<Void> updateFoos(Flux<String> idFlux) {
        return idFlux                                         // Flux<String>
            .flatMap(id -> getFoo(id)                         // Mono<Foo>
                /*
                 * If a Foo with the given id is not found,
                 * create a new one, and continue the stream with it.
                 */
                .switchIfEmpty(insertFoo(id, new Foo()))      // Mono<Foo>
                /*
                 * Note that this is not an "else" condition
                 * to the above .switchIfEmpty
                 *
                 * The lambda passed to .flatMap will be
                 * executed with either:
                 * A) The foo found from getFoo
                 *    OR
                 * B) the newly inserted Foo from insertFoo
                 */
                .flatMap(foo -> getBar(foo.bar)               // Mono<Bar>
                    .flatMap(bar -> updateBar(foo, bar))      // Mono<Bar>
                    .then()                                   // Mono<Void>
                )                                             // Mono<Void>
            )                                                 // Flux<Void>
            .then();                                          // Mono<Void>
    }

    /*
     * @return the Foo with the given id, or empty if not found
     */
    abstract Mono<Foo> getFoo(String id);

    /*
     * @return the Bar with the given id, or empty if not found
     */
    abstract Mono<Bar> getBar(String id);

    /*
     * @return the Foo inserted, never empty
     */
    abstract Mono<Foo> insertFoo(String id, Foo foo);

    /*
     * @return the Bar updated, never empty
     */
    abstract Mono<Bar> updateBar(Foo foo, Bar bar);

下面是一个更复杂的示例,它使用Tuple2<Foo,Boolean>来指示是否找到了原始Foo (这在语义上应该等同于您的示例):

代码语言:javascript
运行
复制
    Mono<Void> updateFoos(Flux<String> idFlux) {
        return idFlux                                         // Flux<String>
            .flatMap(id -> getFoo(id)                         // Mono<Foo>
                /*
                 * Map to a Tuple2 whose t2 indicates whether the foo was found.
                 * In this case, it was found.
                 */
                .map(foo -> Tuples.of(foo, true))             // Mono<Tuple2<Foo,Boolean>>
                /*
                 * If a Foo with the given id is not found,
                 * create a new one, and continue the stream with 
                 * a Tuple2 indicating it wasn't originally found
                 */
                .switchIfEmpty(insertFoo(id, new Foo())       // Mono<Foo>
                    /*
                     * Foo was not originally found, so t2=false
                     */
                    .map(foo -> Tuples.of(foo, false)))       // Mono<Tuple2<Foo,Boolean>>
                /*
                 * The lambda passed to .flatMap will be
                 * executed with either:
                 * A) t1=foo found from getFoo, t2=true
                 *    OR
                 * B) t1=newly inserted Foo from insertFoo, t2=false
                 */
                .flatMap(tuple2 -> tuple2.getT2()
                    // foo originally found 
                    ? getBar(tuple2.getT1().bar)              // Mono<Bar>
                        .flatMap(bar -> updateBar(tuple2.getT1(), bar)) // Mono<Bar>
                        .then()                               // Mono<Void>
                    // foo originally not found (new inserted)
                    : Mono.empty()                            // Mono<Void>
                )
            )                                                 // Flux<Void>
            .then();                                          // Mono<Void>
    }
票数 10
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55861289

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档