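I am new to reactive programming, and I am developing an API that aggregates the responses of 3 other APIs. The interfaces provided are as follows: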
public interface UserServiceApi {
    Mono<UserDetailsResponse> getUserDetails(String userId);
}
. . .
public interface OrderServiceApi {
    Mono<OrdersResponse> getOrdersByUserId(String userId);
}
. . .
public interface VoucherServiceApi {
    Mono<VouchersResponse> getVouchers(String userId);
}
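The API I am working on should fetch the user details; if signup is complete, fetch the orders by user id; and if the user's status is verified, fetch the user's vouchers. It should aggregate all of this data, transform it where needed, and return a Mono.
I want to call the user details API and, once it responds, make the calls to the order service and the voucher service asynchronously. Here is what the code looks like right now: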
public Mono<SuperApiResponse> getDetails(final String userId) {
    return userServiceApi.getUserDetails(userId).flatMap(userDetailsResponse -> {
        final SuperApiResponse response = new SuperApiResponse();
        response.setUserDetails(userDetailsResponse);
        if (userDetailsResponse.isSignupComplete) {
            Mono<OrdersResponse> ordersResponse = ordersServiceApi.getOrdersByUserId(userId);
            // have to transform this OrdersResponse object to another type
        }
        if (userDetailsResponse.isVerified) {
            Mono<VouchersResponse> vouchersResponse = voucherServiceApi.getVouchers(userId);
            // have to transform this VouchersResponse object to another type
        }
        // how to populate response of these 2 apis in the final response mono object?
        return Mono.just(response);
    });
}
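How can I aggregate the responses of these 3 APIs while making the most of reactive programming? How do I make these calls asynchronously?
What is the best practice in this scenario? Is there any good, easy-to-understand reading material for someone completely new to reactive programming?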
Posted on 2021-04-02 02:52:12
As Serg said in a comment, I think you should use a combination of flatMap/zipWith/defaultIfEmpty.
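To make that combination concrete, here is a minimal sketch of what a flatMap/zipWith/defaultIfEmpty version could look like. It is an illustration, not code from the question: the class `ZipAggregationSketch`, the record types, and the mock service methods are all hypothetical stand-ins, and it assumes Java 16+ (for records) with reactor-core on the classpath.

```java
import java.util.Optional;
import reactor.core.publisher.Mono;

public class ZipAggregationSketch {

    // Hypothetical stand-ins for the question's response types (records need Java 16+).
    record UserDetails(boolean signupComplete, boolean verified) {}
    record Orders(String userId) {}
    record Vouchers(String userId) {}
    record Profile(UserDetails details, Optional<Orders> orders, Optional<Vouchers> vouchers) {}

    // Mock services: first id character '1' means signed up, second '1' means verified.
    static Mono<UserDetails> getUserDetails(String userId) {
        return Mono.just(new UserDetails(userId.charAt(0) == '1', userId.charAt(1) == '1'));
    }

    static Mono<Orders> getOrders(String userId) {
        return Mono.just(new Orders(userId));
    }

    static Mono<Vouchers> getVouchers(String userId) {
        return Mono.just(new Vouchers(userId));
    }

    static Mono<Profile> getDetails(String userId) {
        return getUserDetails(userId).flatMap(details -> {
            // Wrap each optional branch in Optional so that a skipped (or empty)
            // call does not make the zip below complete empty.
            Mono<Optional<Orders>> orders = details.signupComplete()
                    ? getOrders(userId).map(Optional::of).defaultIfEmpty(Optional.empty())
                    : Mono.just(Optional.empty());
            Mono<Optional<Vouchers>> vouchers = details.verified()
                    ? getVouchers(userId).map(Optional::of).defaultIfEmpty(Optional.empty())
                    : Mono.just(Optional.empty());
            // zipWith subscribes to both branches and combines their results.
            return orders.zipWith(vouchers, (o, v) -> new Profile(details, o, v));
        });
    }

    public static void main(String[] args) {
        // block() is only for the demo; a real handler would return the Mono.
        System.out.println(getDetails("11").block());
    }
}
```

Because zipWith subscribes to both inner Monos, truly non-blocking service calls can overlap in time; with the synchronous `Mono.just` mocks used here they simply run one after the other.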
Reactor provides many tools, so here is an example that triggers the voucher and order requests concurrently by using cached Monos instead of zipWith:
package fr.amanin.stackoverflow;

import java.util.logging.Logger;
import reactor.core.publisher.Mono;

public class ReactorZipServiceResults {

    static final Logger LOGGER = Logger.getLogger("");
    static final UserServiceApi userApi = new UserServiceApi();
    static final OrderServiceApi orderApi = new OrderServiceApi();
    static final VoucherServiceApi voucherApi = new VoucherServiceApi();

    public static void test(String userId) throws InterruptedException {
        final PseudoTimer timer = new PseudoTimer();
        // First, fetch user details. cache() makes every subscriber receive the result of the first call on this Mono
        final Mono<UserDetailsResponse> userDetails = userApi.getUserDetails(userId)
                .doOnNext(next -> LOGGER.info("Queried user details at " + timer.time()))
                .cache();
        // Separately define the pipeline branch that queries orders.
        final Mono<OrdersResponse> userOrders = userDetails
                .filter(it -> it.isSignupComplete())
                .flatMap(it -> orderApi.getOrdersByUserId(userId))
                .doOnNext(next -> LOGGER.info("Queried orders at " + timer.time()))
                .cache();
        // Pipeline branch that queries vouchers
        final Mono<VouchersResponse> userVouchers = userDetails
                .filter(it -> it.isVerified())
                .flatMap(it -> voucherApi.getVouchers(userId))
                .doOnNext(next -> LOGGER.info("Queried vouchers at " + timer.time()))
                .cache();
        // Prefetch orders and vouchers concurrently, so any following call will just get back cached values.
        // Note that it will also trigger the user detail query (as they're derived from it)
        userOrders.subscribe();
        userVouchers.subscribe();
        // Exaggerate the time lapse between request triggering and result assembly.
        Thread.sleep(20);
        LOGGER.info("Pipeline assembly at " + timer.time());
        // Assemble the entire pipeline by concatenating all intermediate results sequentially
        userDetails
                .map(details -> new UserProfile(userId).details(details))
                .flatMap(profile -> userOrders.map(profile::orders).defaultIfEmpty(profile))
                .flatMap(profile -> userVouchers.map(profile::vouchers).defaultIfEmpty(profile))
                .subscribe(result -> {
                    LOGGER.info("result: " + result);
                });
        // Wait for all operations to finish
        Thread.sleep(100);
    }

    public static void main(String... args) throws InterruptedException {
        LOGGER.warning("No call to order or voucher API");
        test("00");
        LOGGER.warning("One call to order and none to voucher API");
        test("10");
        LOGGER.warning("No call to order but one to voucher API");
        test("01");
        LOGGER.warning("Call both order and voucher API");
        test("11");
    }

    private static class PseudoTimer {
        final long start = System.currentTimeMillis();
        public long time() {
            return System.currentTimeMillis() - start;
        }
    }

    /*
     * Mock APIs
     */

    /**
     * Simulate user details service: expects user ids with at least 2 characters.
     * If and only if the first character is 1, the user will be considered signed up.
     * If and only if the second character is 1, the user will be considered verified.
     */
    private static class UserServiceApi {
        Mono<UserDetailsResponse> getUserDetails(String userId) {
            return Mono.just(
                    new UserDetailsResponse(userId.charAt(0) == '1', userId.charAt(1) == '1')
            );
        }
    }

    private static class OrderServiceApi {
        Mono<OrdersResponse> getOrdersByUserId(String userId) { return Mono.just(new OrdersResponse()); }
    }

    private static class VoucherServiceApi {
        Mono<VouchersResponse> getVouchers(String userId) { return Mono.just(new VouchersResponse()); }
    }

    private static class UserDetailsResponse {
        private final boolean isSignupComplete;
        private final boolean isVerified;
        private UserDetailsResponse(boolean isSignupComplete, boolean isVerified) {
            this.isSignupComplete = isSignupComplete;
            this.isVerified = isVerified;
        }
        boolean isSignupComplete() { return isSignupComplete; }
        boolean isVerified() { return isVerified; }
        public String toString() {
            return String.format("signed: %b ; verified: %b", isSignupComplete, isVerified);
        }
    }

    private static class OrdersResponse {}
    private static class VouchersResponse {}

    private static class UserProfile {
        final String userId;
        final UserDetailsResponse details;
        final OrdersResponse orders;
        final VouchersResponse vouchers;
        public UserProfile(String userId) {
            this(userId, null, null, null);
        }
        public UserProfile(final String userId, UserDetailsResponse details, OrdersResponse orders, VouchersResponse vouchers) {
            this.userId = userId;
            this.details = details;
            this.orders = orders;
            this.vouchers = vouchers;
        }
        // Each setter-like method returns a new immutable copy with one field replaced.
        public UserProfile details(final UserDetailsResponse details) {
            return new UserProfile(userId, details, orders, vouchers);
        }
        public UserProfile orders(final OrdersResponse orders) {
            return new UserProfile(userId, details, orders, vouchers);
        }
        public UserProfile vouchers(final VouchersResponse vouchers) {
            return new UserProfile(userId, details, orders, vouchers);
        }
        public String toString() {
            return String.format(
                    "User %s ; %s ; Orders fetched: %b ; Vouchers fetched: %b",
                    userId, details, orders != null, vouchers != null
            );
        }
    }
}
The output is:
WARNING: No call to order or voucher API
INFO: Queried user details at 175
INFO: Pipeline assembly at 195
INFO: result: User 00 ; signed: false ; verified: false ; Orders fetched: false ; Vouchers fetched: false
WARNING: One call to order and none to voucher API
INFO: Queried user details at 0
INFO: Queried orders at 1
INFO: Pipeline assembly at 21
INFO: result: User 10 ; signed: true ; verified: false ; Orders fetched: true ; Vouchers fetched: false
WARNING: No call to order but one to voucher API
INFO: Queried user details at 0
INFO: Queried vouchers at 2
INFO: Pipeline assembly at 24
INFO: result: User 01 ; signed: false ; verified: true ; Orders fetched: false ; Vouchers fetched: true
WARNING: Call both order and voucher API
INFO: Queried user details at 0
INFO: Queried orders at 2
INFO: Queried vouchers at 3
INFO: Pipeline assembly at 25
INFO: result: User 11 ; signed: true ; verified: true ; Orders fetched: true ; Vouchers fetched: true
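In every run above, "Queried user details" is logged only once even though several pipelines derive from `userDetails`. That is the effect of `cache()`. The following small sketch isolates that behavior; the class `CacheSketch` and the helper `countCalls` are hypothetical names made up for this illustration, and it assumes reactor-core on the classpath.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class CacheSketch {

    /** Subscribes the given number of times and returns how often the source actually ran. */
    static int countCalls(boolean cached, int subscriptions) {
        AtomicInteger calls = new AtomicInteger();
        // A cold Mono: the callable runs again for every new subscriber.
        Mono<String> source = Mono.fromCallable(() -> "details#" + calls.incrementAndGet());
        // cache() replays the first result to later subscribers instead of re-running the source.
        Mono<String> shared = cached ? source.cache() : source;
        for (int i = 0; i < subscriptions; i++) {
            shared.block(); // block() is only for the demo
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("without cache: " + countCalls(false, 3)); // prints "without cache: 3"
        System.out.println("with cache: " + countCalls(true, 3));     // prints "with cache: 1"
    }
}
```

Without `cache()`, each of the derived branches in the example would have triggered its own user details request.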
https://stackoverflow.com/questions/66884863