首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Spring webflux:扁平图异步转换

Spring webflux:扁平图异步转换
EN

Stack Overflow用户
提问于 2019-07-29 17:45:38
回答 3查看 1.5K关注 0票数 2

我已经了解到平面映射转换是异步的,在本例中,我在lambda定义中打印线程的名称。它打印的线程与订阅源代码的线程相同。根据我的理解,它应该打印一个不同的线程名称-而不是在订阅的源代码中,因为这个转换必须在不同的线程中执行。

代码语言:javascript
运行
复制
Flux.just(1, -2, 3, 4, -5, 6)
    .flatMap(element -> { 
        try { 
            Thread.sleep(1000);
        } 
        catch (InterruptedException e) {
            e.printStackTrace(); 
        } 

        System.out.println(Thread.currentThread().getName() + " element: " + element); 
        return Flux.just(element);
    })
    .subscribe()
EN

回答 3

Stack Overflow用户

发布于 2019-07-29 21:04:32

它是异步的这一事实并不一定意味着它是并行运行的,这似乎是您在这里所期望的。但是,您可以将Flux转换为ParallelFlux并指定并行调度程序:

代码语言:javascript
运行
复制
Flux.just(1, -2, 3, 4, -5, 6)
        .parallel()
        .runOn(Schedulers.elastic())
        .flatMap(element
                -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) { // TODO Auto-generated catch block
                e.printStackTrace();
            }

            System.out.println(Thread.currentThread().getName()
                    + " element: " + element);

            return Flux.just(element);
        })
        .subscribe();
Thread.currentThread().join(); //Just a hack to keep the program alive.

另一方面,如果您不希望它并行运行,而只是在主线程的单独线程上运行,那么不需要将其转换为并行Flux -只需提供一个.subscribeOn(Schedulers.elastic())调用或类似的调用。

票数 2
EN

Stack Overflow用户

发布于 2019-08-01 01:39:04

不,它不会打印不同的线程名称。默认情况下,Reactive在单个线程上运行-订阅发生在该线程上。在本例中,订阅发生在调用线程上,因此,所有元素都在该线程上发出,并在该线程上进一步处理。

让我们先来了解一下。flatMap是如何工作的:

对于每个上游事件,您可以从it.

  • flatMap创建一个流,然后热切地订阅每个内部流(这就是它们甚至被触发的原因)。(
  1. )请注意,

在这里意味着它一次订阅所有内部流,而不等待其他内部流完成(与concatMap).

  1. Does不同,它是所有内部流的动态合并,因为它们会发出事件。

请注意,每个内部流毕竟是一个流。除非您告诉它使用不同的线程,否则每个线程都将在同一个线程上发出(调用线程,在您的情况下发生了什么)。如果您想并行处理它们,您可以为它们中的每一个执行一个.subscribeOn(Schedulers.elastic) (或schedulers.parallel()):

代码语言:javascript
运行
复制
Flux.just(1, -2, 3, 4, -5, 6)
    .flatMap(
        element -> {
          //this will always print the same thread - the calling thread basically
          System.out.println(Thread.currentThread().getName() + " element: " + element);
          return Mono.just(element)
              .subscribeOn(Schedulers.parallel())
              .doOnNext(
                  a ->
                      System.out.println(
                          a + " emitted on thread: " + Thread.currentThread().getName()));
        })
    .subscribe();

.flatMap并不关心元素进入哪个线程,以及它离开哪个线程--它所要做的就是订阅内部流,并在事件到来时合并它们。记住,每一个内在的流都是一个“承诺”。它最终将完成(使用onComplete)信号,但您不知道何时完成。flatMap仍然不在乎。然而,concatMap在订阅下一个流之前,会等待一个流完成。这就是它如何维护原始流的顺序。

在这里阅读更多(我自己在flatMap上的文章):

https://medium.com/swlh/understanding-reactors-flatmap-operator-a6a7e62d3e95

票数 1
EN

Stack Overflow用户

发布于 2019-07-29 20:59:55

使用flatMap不会影响执行它的线程。可以使用subscribeOn来影响将在其上执行的线程:

代码语言:javascript
运行
复制
        Flux.just(1, -2, 3, 4, -5, 6)
                .flatMap(element ->
                {
                    try { Thread.sleep(1000);
                    } catch (InterruptedException e) { // TODO Auto-generated catch block
                        e.printStackTrace();
                    }

                    System.out.println(Thread.currentThread().getName() +
                            " element: " + element);
                    return Flux.just(element);
                })
                .subscribeOn(Schedulers.elastic())
                .subscribe();

根据您希望行为是什么,您可以使用以下任何一种- Schedulers.elastic()Schedulers.single()Schedulers.parallel()Schedulers.immeadiate()

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57251171

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档