首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >通量在“那么”之前不会等待元素的完成。

通量在“那么”之前不会等待元素的完成。
EN

Stack Overflow用户
提问于 2020-06-26 09:11:24
回答 1查看 12.1K关注 0票数 1

我不明白这个问题,我不知道我做错了什么

我想等待通量结束,然后返回serverResponse的mono

我已经附加了代码片段,doOnNext将填充categoryIdToPrintRepository

我已经了解了如何在通量结束后返回mono,并在处理onNextSite之前找到了“那时”,但仍在执行“then”方法,这会导致错误:

代码语言:javascript
运行
复制
java.lang.IllegalArgumentException: 'producer' type is unknown to ReactiveAdapterRegistry

我做错了什么?

代码语言:javascript
运行
复制
 public Mono<ServerResponse> retrieveCatalog(ServerRequest ignored) {
        return Mono.just("start").flatMap(id ->
                Flux.fromIterable(appSettings.getSites())
                        .subscribeOn(ForkJoinPoolScheduler.create("SiteCatalogScheduler"))
                        .doOnNext(this::onNextSite)
                        .then(Mono.from(ServerResponse.ok().body(categoryIdToPrintRepository.getSortedTreeValues(), String.class))));

    }

    private void onNextSite(Integer siteId) {
        IntStream.range(1, appSettings.getCatalogMaxValue()).parallel().forEach(catalogId -> {
            Optional<SiteCatalogCategoryDTO> cacheData =
                    siteCatalogCacheUseCaseService.getSiteCatalogResponseFromCache(siteId, catalogId);
            cacheData.ifPresentOrElse(siteCatalogCategoryDTO -> {/*do nothing already exist in cache*/},
                    () -> {
                    Mono<SiteCatalogCategoryDTO> catalogCategoryDTOMono = WebClient.create(getUri(siteId, catalogId))
                            .get().retrieve().bodyToMono(SiteCatalogCategoryDTO.class);
                    catalogCategoryDTOMono.subscribe(siteCatalogCategoryDTO ->
                            handleSiteServerResponse(siteCatalogCategoryDTO, siteId, catalogId));
            });
        });
    }


    private void handleSiteServerResponse(SiteCatalogCategoryDTO siteCatalogCategoryDTO,
                                          int siteId, int catalogId) {
        if (siteCatalogCategoryDTO.getResponseStatus().equals(ResponseStatus.SUCCESS))
            Flux.fromIterable(siteCatalogCategoryDTO.getMappingList())
                    .subscribe(mapSCC -> {
                        categoryIdToPrintRepository.insertIntoTree(mapSCC.getCategoryId(),
                                "Site " + siteId + " - Catalog " + catalogId + " is mapped to category " + "\"" +
                                        mapSCC.getCategoryName() + "\" (" + mapSCC.getCategoryId() + ")");
                        siteCatalogCacheUseCaseService.insertIntoSiteCatalogCache(siteId, catalogId, siteCatalogCategoryDTO);
                    });
    }
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-06-26 10:08:33

您做了几件不应该在应用程序中使用subscribe的错误操作,并且您有一些无效方法,除非在特定的地方,否则这些方法不应该用于反应性编程。

下面是一些示例代码:

代码语言:javascript
运行
复制
// Nothing will happen, we are not returning anything, we can't subscribe
private void doSomething() {
    Mono.just("Foo");
}

// complier error
doSomething().subscribe( ... );

您的应用程序是publisher --主叫客户端,是订阅服务器,这就是为什么我们向主叫客户端返回Mono或Flux的原因,它们是subscribe

你就这样解决了这个问题:

代码语言:javascript
运行
复制
private void doSomething() {
    Mono.just("Foo").subscribe( ... );
}

doSomething();

现在,您正在订阅自己,以使事物运行,这不是正确的方式,正如前面提到的,主叫客户是订户,而不是你。

正确方式:

代码语言:javascript
运行
复制
private Mono<String> doSomething() {
    return Mono.just("Foo");
}

// This is returned out to the calling client, they subscribe
return doSomething();

当Mono/Flux完成时,它将发出一个信号,这个信号将触发链中的下一个、下一个和下一个。

因此,我对你们需要做的事情的看法如下:

client.

  • Remove
  • Remove all subscribes,如果您想要做的事情有:flatmapmapdoOnSuccess等,保持链一直运行到 all voidE 217函数,确保它们返回Fluxd19,如果您想要E 120而不是E 221通过使用Mono.empty()函数返回一个d22,以使链完成。

一旦您使用Mono/Flux,您就需要处理返回,以便其他人可以继续使用。

更新:

为了使then触发,您必须返回一些内容,当前一个单/通量完成时,它将返回。

示例:

代码语言:javascript
运行
复制
private Flux<String> doSomething() {
    return Flux.just("Foo", "Bar", "FooBar")
               .doOnNext(string -> {
                   return // return something
               });
}

// Ignore what was return from doSomething and return something else when the flux has completed (so only trigger on the completed signal from the flux)
return doSomething().then( ... );
票数 5
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62591575

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档