我对vertx和异步编程很陌生。
我有两个眩晕通过事件总线通信,如下所示:
//API垂直线
public class SearchAPIVerticle extends AbstractVerticle {
public static final String GET_USEARCH_DOCS = "get.usearch.docs";
@Autowired
private Integer defaultPort;
private void sendSearchRequest(RoutingContext routingContext) {
final JsonObject requestMessage = routingContext.getBodyAsJson();
final EventBus eventBus = vertx.eventBus();
eventBus.request(GET_USEARCH_DOCS, requestMessage, reply -> {
if (reply.succeeded()) {
Logger.info("Search Result = " + reply.result().body());
routingContext.response()
.putHeader("content-type", "application/json")
.setStatusCode(200)
.end((String) reply.result().body());
} else {
Logger.info("Document Search Request cannot be processed");
routingContext.response()
.setStatusCode(500)
.end();
}
});
}
@Override
public void start() throws Exception {
Logger.info("Starting the Gateway service (Event Sender) verticle");
// Create a Router
Router router = Router.router(vertx);
//Added bodyhandler so we can process json messages via the event bus
router.route().handler(BodyHandler.create());
// Mount the handler for incoming requests
// Find documents
router.post("/api/search/docs/*").handler(this::sendSearchRequest);
// Create an HTTP Server using default options
HttpServer server = vertx.createHttpServer();
// Handle every request using the router
server.requestHandler(router)
//start listening on port 8083
.listen(config().getInteger("http.port", 8083)).onSuccess(msg -> {
Logger.info("*************** Search Gateway Server started on "
+ server.actualPort() + " *************");
});
}
@Override
public void stop(){
//house keeping
}
}
//下面是目标顶点,应该进行多个web客户端调用并合并响应。
@Component
public class SolrCloudVerticle extends AbstractVerticle {
public static final String GET_USEARCH_DOCS = "get.usearch.docs";
@Autowired
private SearchRepository searchRepositoryService;
@Override
public void start() throws Exception {
Logger.info("Starting the Solr Cloud Search Service (Event Consumer) verticle");
super.start();
ConfigStoreOptions fileStore = new ConfigStoreOptions().setType("file")
.setConfig(new JsonObject().put("path", "conf/config.json"));
ConfigRetrieverOptions configRetrieverOptions = new ConfigRetrieverOptions()
.addStore(fileStore);
ConfigRetriever configRetriever = ConfigRetriever.create(vertx, configRetrieverOptions);
configRetriever.getConfig(ar -> {
if (ar.succeeded()) {
JsonObject configJson = ar.result();
EventBus eventBus = vertx.eventBus();
eventBus.<JsonObject>consumer(GET_USEARCH_DOCS).handler(getDocumentService(searchRepositoryService, configJson));
Logger.info("Completed search service event processing");
} else {
Logger.error("Failed to retrieve the config");
}
});
}
private Handler<Message<JsonObject>> getDocumentService(SearchRepository searchRepositoryService, JsonObject configJson) {
return requestMessage -> vertx.<String>executeBlocking(future -> {
try {
//I need to incorporate the logic here that adds futures to list and composes the compositefuture
/*
//Below is my logic to populate the future list
WebClient client = WebClient.create(vertx);
List<Future> futureList = new ArrayList<>();
for (Object collection : searchRepositoryService.findAllCollections(configJson).getJsonArray(SOLR_CLOUD_COLLECTION).getList()) {
Future<String> future1 = client.post(8983, "127.0.0.1", "/solr/" + collection + "/query")
.expect(ResponsePredicate.SC_OK)
.sendJsonObject(requestMessage.body())
.map(HttpResponse::bodyAsString).recover(error -> {
System.out.println(error.getMessage());
return Future.succeededFuture();
});
futureList.add(future1);
}
//Below is the CompositeFuture logic, but the logic and construct does not make sense to me. What goes as first and second argument of executeBlocking method
/*CompositeFuture.join(futureList)
.onSuccess(result -> {
result.list().forEach( x -> {
if(x != null){
requestMessage.reply(result.result());
}
}
);
})
.onFailure(error -> {
System.out.println("We should not fail");
})
*/
future.complete("DAO returns a Json String");
} catch (Exception e) {
future.fail(e);
}
}, result -> {
if (result.succeeded()) {
requestMessage.reply(result.result());
} else {
requestMessage.reply(result.cause()
.toString());
}
});
}
}
我能够使用org.springframework.web.reactive.function.client.WebClient调用从多个web客户端调用组合我的搜索结果,而不是使用Future和CompositeFuture。我试图避免将Springboot和Vertx混为一谈,但遗憾的是,Vertx CompositeFuture在这里没有工作:
//This method supplies the parameter for the future.complete(..) line in getDocumentService(SearchRepository,JsonObject)
private List<JsonObject> findByQueryParamsAndDataSources(SearchRepository searchRepositoryService,
JsonObject configJson,
JsonObject requestMessage)
throws SolrServerException, IOException {
List<JsonObject> searchResultList = new ArrayList<>();
for (Object collection : searchRepositoryService.findAllCollections(configJson).getJsonArray(SOLR_CLOUD_COLLECTION).getList()) {
searchResultList.add(new JsonObject(doSearchPerCollection(collection.toString(), requestMessage.toString())));
}
return aggregateMultiCollectionSearchResults(searchResultList);
}
public String doSearchPerCollection(String collection, String message) {
org.springframework.web.reactive.function.client.WebClient client =
org.springframework.web.reactive.function.client.WebClient.create();
return client.post()
.uri("http://127.0.0.1:8983/solr/" + collection + "/query")
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue(message.toString()))
.retrieve()
.bodyToMono(String.class)
.block();
}
private List<JsonObject> aggregateMultiCollectionSearchResults(List<JsonObject> searchList){
//TODO: Search result aggregation
return searchList;
}
我的用例是第二个顶点,应该进行多个vertx客户端调用,并将响应组合起来。如果API调用失败,我希望记录错误,并继续处理和合并来自其他调用的响应。请提供任何关于我上面的代码如何能够适应处理用例的帮助吗?
我看的是vertx CompositeFuture,但是还没有看到任何进展或有用的例子!
发布于 2022-05-20 18:35:50
您所要寻找的可以通过一些附加的处理来完成未来协调:
CompositeFuture.join(future1, future2, future3).onComplete(ar -> {
if (ar.succeeded()) {
// All succeeded
} else {
// All completed and at least one failed
}
});
连接组合等待到所有的未来完成,要么成功,要么失败。CompositeFuture.join使用多个期货参数(最多6个)并返回一个在所有期货都成功时成功的未来,以及当所有期货完成且其中至少一个失败时失败的未来。
使用join,您将等待所有期货完成,问题是,如果其中一个失败了,您将无法从其他人获得响应,因为CompositeFuture将失败。为了避免这种情况,您应该在每个期货中添加Future<T> recover(Function<Throwable, Future<T>> mapper)
,其中您应该记录错误并传递一个空的响应,以便未来不会失败。
下面是一个简短的例子:
Future<String> response1 = client.post(8887, "localhost", "work").expect(ResponsePredicate.SC_OK).send()
.map(HttpResponse::bodyAsString).recover(error -> {
System.out.println(error.getMessage());
return Future.succeededFuture();
});
Future<String> response2 = client.post(8887, "localhost", "error").expect(ResponsePredicate.SC_OK).send()
map(HttpResponse::bodyAsString).recover(error -> {
System.out.println(error.getMessage());
return Future.succeededFuture();
});
CompositeFuture.join(response2, response1)
.onSuccess(result -> {
result.list().forEach(x -> {
if(x != null) {
System.out.println(x);
}
});
})
.onFailure(error -> {
System.out.println("We should not fail");
});
编辑1:
CompositeFuture.join(Future...)
的限制是6期货,如果你需要更多,你可以使用:CompositeFuture.join(Arrays.asList(future1, future2, future3));
,你可以通过无限数量的期货。
https://stackoverflow.com/questions/72322827
复制相似问题