首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何合并多个vertx web客户端响应

如何合并多个vertx web客户端响应
EN

Stack Overflow用户
提问于 2022-05-20 17:37:57
回答 1查看 307关注 0票数 0

我对vertx和异步编程很陌生。

我有两个眩晕通过事件总线通信,如下所示:

//API垂直线

代码语言:javascript
运行
复制
 public class SearchAPIVerticle extends AbstractVerticle {
    
    
        public static final String GET_USEARCH_DOCS = "get.usearch.docs";
    
        @Autowired
        private Integer defaultPort;
    
        private void sendSearchRequest(RoutingContext routingContext) {
    
            final JsonObject requestMessage = routingContext.getBodyAsJson();
    
            final EventBus eventBus = vertx.eventBus();
            eventBus.request(GET_USEARCH_DOCS, requestMessage, reply -> {
                if (reply.succeeded()) {
                    Logger.info("Search Result = " + reply.result().body());
                    routingContext.response()
                            .putHeader("content-type", "application/json")
                            .setStatusCode(200)
                            .end((String) reply.result().body());
                } else {
                    Logger.info("Document Search Request cannot be processed");
                    routingContext.response()
                            .setStatusCode(500)
                            .end();
                }
            });
        }
    
         
        @Override
        public void start() throws Exception {
            Logger.info("Starting the Gateway service (Event Sender) verticle");
            // Create a Router
            Router router = Router.router(vertx);
    
            //Added bodyhandler so we can process json messages via the event bus
            router.route().handler(BodyHandler.create());
    
            //    Mount the handler for incoming requests
            // Find documents
            router.post("/api/search/docs/*").handler(this::sendSearchRequest);
            
            // Create an HTTP Server using default options
            HttpServer server = vertx.createHttpServer();
            // Handle every request using the router
            server.requestHandler(router)
                    //start listening on port 8083
                    .listen(config().getInteger("http.port", 8083)).onSuccess(msg -> {
                        Logger.info("*************** Search Gateway Server started on "
                                + server.actualPort() + " *************");
                    });
        }
    
        @Override
        public void stop(){
           //house keeping
        }
    
    }

//下面是目标顶点,应该进行多个web客户端调用并合并响应。

代码语言:javascript
运行
复制
  @Component
        public class SolrCloudVerticle extends AbstractVerticle {
        
            public static final String GET_USEARCH_DOCS = "get.usearch.docs";
        
            @Autowired
            private SearchRepository searchRepositoryService;
        
            @Override
            public void start() throws Exception {
                Logger.info("Starting the Solr Cloud Search Service (Event Consumer) verticle");
        
                super.start();
        
                ConfigStoreOptions fileStore = new ConfigStoreOptions().setType("file")
                        .setConfig(new JsonObject().put("path", "conf/config.json"));
                ConfigRetrieverOptions configRetrieverOptions = new ConfigRetrieverOptions()
                        .addStore(fileStore);
                ConfigRetriever configRetriever = ConfigRetriever.create(vertx, configRetrieverOptions);
                configRetriever.getConfig(ar -> {
                    if (ar.succeeded()) {
                        JsonObject configJson = ar.result();
                        EventBus eventBus = vertx.eventBus();
        
                        eventBus.<JsonObject>consumer(GET_USEARCH_DOCS).handler(getDocumentService(searchRepositoryService, configJson));
               
                        Logger.info("Completed search service event processing");
        
                    } else {
                        Logger.error("Failed to retrieve the config");
                    }
                });
        
            }
        
      





      private Handler<Message<JsonObject>> getDocumentService(SearchRepository searchRepositoryService, JsonObject configJson) {
                           
                            return requestMessage -> vertx.<String>executeBlocking(future -> {
                               
                    
                                try {
            
            //I need to incorporate the logic here that adds futures to list and composes the compositefuture

/*
//Below is my logic to populate the future list
WebClient client = WebClient.create(vertx);
                List<Future> futureList = new ArrayList<>();
                for (Object collection : searchRepositoryService.findAllCollections(configJson).getJsonArray(SOLR_CLOUD_COLLECTION).getList()) {
                    Future<String> future1 = client.post(8983, "127.0.0.1", "/solr/" + collection + "/query")
                            .expect(ResponsePredicate.SC_OK)
                            .sendJsonObject(requestMessage.body())
                            .map(HttpResponse::bodyAsString).recover(error -> {
                                System.out.println(error.getMessage());
                                return Future.succeededFuture();
                            });
                    futureList.add(future1);
                }
//Below is the CompositeFuture logic, but the logic and construct does not make sense to me. What goes as first and second argument of executeBlocking method

 /*CompositeFuture.join(futureList)
                        .onSuccess(result -> {
                             result.list().forEach( x -> {
                                 if(x != null){
                                     requestMessage.reply(result.result());
                                 }
                             }
                             );
                        })
                        .onFailure(error -> {
                            System.out.println("We should not fail");
                        })

*/
        
                     future.complete("DAO returns a Json String");
                                 
             
                                } catch (Exception e) {
                                    future.fail(e);
                                }
                            }, result -> {
                                if (result.succeeded()) {
                                    requestMessage.reply(result.result());
                                } else {
                                    requestMessage.reply(result.cause()
                                            .toString());
                                }
                            });
                        }
                    
                   }

我能够使用org.springframework.web.reactive.function.client.WebClient调用从多个web客户端调用组合我的搜索结果,而不是使用Future和CompositeFuture。我试图避免将Springboot和Vertx混为一谈,但遗憾的是,Vertx CompositeFuture在这里没有工作:

代码语言:javascript
运行
复制
//This method supplies the parameter for the future.complete(..) line in getDocumentService(SearchRepository,JsonObject) 
 private List<JsonObject> findByQueryParamsAndDataSources(SearchRepository searchRepositoryService,
                                                   JsonObject configJson,
                                                   JsonObject requestMessage)
            throws SolrServerException, IOException {
        List<JsonObject> searchResultList = new ArrayList<>();
        for (Object collection : searchRepositoryService.findAllCollections(configJson).getJsonArray(SOLR_CLOUD_COLLECTION).getList()) {
           
            searchResultList.add(new JsonObject(doSearchPerCollection(collection.toString(), requestMessage.toString())));
        }
        return aggregateMultiCollectionSearchResults(searchResultList);
    }

public String doSearchPerCollection(String collection, String message) {

        org.springframework.web.reactive.function.client.WebClient client =
                org.springframework.web.reactive.function.client.WebClient.create();

        return client.post()
                .uri("http://127.0.0.1:8983/solr/" + collection + "/query")
                .contentType(MediaType.APPLICATION_JSON)
                .accept(MediaType.APPLICATION_JSON)
                .body(BodyInserters.fromValue(message.toString()))
                .retrieve()
                .bodyToMono(String.class)
                .block();
    }

    private List<JsonObject> aggregateMultiCollectionSearchResults(List<JsonObject> searchList){
        //TODO: Search result aggregation
        return searchList;
    }

我的用例是第二个顶点,应该进行多个vertx客户端调用,并将响应组合起来。如果API调用失败,我希望记录错误,并继续处理和合并来自其他调用的响应。请提供任何关于我上面的代码如何能够适应处理用例的帮助吗?

我看的是vertx CompositeFuture,但是还没有看到任何进展或有用的例子!

EN

回答 1

Stack Overflow用户

发布于 2022-05-20 18:35:50

您所要寻找的可以通过一些附加的处理来完成未来协调

代码语言:javascript
运行
复制
CompositeFuture.join(future1, future2, future3).onComplete(ar -> {
    if (ar.succeeded()) {
        // All succeeded
    } else {
        // All completed and at least one failed
    }
});

连接组合等待到所有的未来完成,要么成功,要么失败。CompositeFuture.join使用多个期货参数(最多6个)并返回一个在所有期货都成功时成功的未来,以及当所有期货完成且其中至少一个失败时失败的未来。

使用join,您将等待所有期货完成,问题是,如果其中一个失败了,您将无法从其他人获得响应,因为CompositeFuture将失败。为了避免这种情况,您应该在每个期货中添加Future<T> recover(Function<Throwable, Future<T>> mapper),其中您应该记录错误并传递一个空的响应,以便未来不会失败。

下面是一个简短的例子:

代码语言:javascript
运行
复制
Future<String> response1 = client.post(8887, "localhost", "work").expect(ResponsePredicate.SC_OK).send()
    .map(HttpResponse::bodyAsString).recover(error -> {
        System.out.println(error.getMessage());
        return Future.succeededFuture();
    });
Future<String> response2 = client.post(8887, "localhost", "error").expect(ResponsePredicate.SC_OK).send()
    map(HttpResponse::bodyAsString).recover(error -> {
        System.out.println(error.getMessage());
        return Future.succeededFuture();
    });

CompositeFuture.join(response2, response1)
    .onSuccess(result -> {
        result.list().forEach(x -> {
            if(x != null) {
                System.out.println(x);
            }
        });
    })
    .onFailure(error -> {
        System.out.println("We should not fail");
    });

编辑1:

CompositeFuture.join(Future...)的限制是6期货,如果你需要更多,你可以使用:CompositeFuture.join(Arrays.asList(future1, future2, future3));,你可以通过无限数量的期货。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72322827

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档