首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >反应堆并行服务呼叫,先使用优先顺序

反应堆并行服务呼叫,先使用优先顺序
EN

Stack Overflow用户
提问于 2022-04-06 22:02:18
回答 1查看 187关注 0票数 -1

我试图使用Reactor并行地进行多个服务调用,并根据结果的优先级进行第一个可用的结果。

也就是说,我想返回第一个可用的结果,而不是等待其他结果。另外,我希望对服务调用具有优先顺序。

那么,让我们说:

  1. I有三个服务要调用: service1、service2和service3。如果在1000 ms之后没有服务调用返回,我希望返回一个可选的空。
  2. 应该返回在总超时(1000 ms)内完成的第一个服务调用,这将产生一个非空结果。

因此,具体来说,如果service1在1000 ms超时下产生结果,并返回到service2、service3、.如果其他人不产生结果或超时,则为serviceN。

我的工作主要是与它返回第一个可用的警告。我不知道如何执行,我希望service1第一,service2第二,等等。

下面是我的代码,我们有3个处理程序:

代码语言:javascript
运行
复制
public interface Handler {
    Optional<QueryResult> select(QueryContext context);

    boolean isEligible(QueryContext context);
}

public class Service1 implements Handler {
    public Optional<QueryResult> select(QueryContext context) {
        // artificially sleep for 150ms to simulate a delay
        Thread.sleep(150ms);
        return Optional.of(new QueryResult("1"));
    }
}

public class Service2 implements Handler {
    public Optional<QueryResult> select(QueryContext context) {
        return Optional.of(new QueryResult("2"));
    }
}

public class Service3 implements Handler {
    public Optional<QueryResult> select(QueryContext context) {
        return Optional.of(new QueryResult("3"));
    }
}

public class QueryEngine implements Query {

    private final List<QueryHandler> handlers;
    private final Scheduler scheduler;

    @Override
    public Optional<QueryResult> select(final QueryContext context) {

        try {
            return Flux.fromIterable(handlers)                
                .filter(handler -> handler.isEligible(context))
                .flatMap(eligibleHandler -> Mono.just(eligibleHandler)
                        .publishOn(scheduler)
                        .timeout(Duration.ofMillis(1000))
                        .map(handler -> handler.select(context))
                )
                .defaultIfEmpty(Optional.empty())
                .filter(Optional::isPresent)
                // return empty if no results are produced
                .defaultIfEmpty(Optional.empty())
                .blockFirst(Duration.ofMillis(1000));

        } catch (final IllegalStateException e) {
            log.error("Flux encountered a timeout and was unable to produce a result.");
        }

        return Optional.empty();
    }
}

下面是我的测试运行程序,以验证它是否正常工作:

代码语言:javascript
运行
复制
@RunWith(MockitoJUnitRunner.class)
public class QueryEngineTest {

    @Mock
    private QueryContext context;

    @Test
    public void testFirstEligibleHandler_returns() {

        final List<QueryHandler> handlers = ImmutableList.of(
            new Service1(), 
            new Service2(), 
            new Service3());

        final QueryEngine queryEngine = new QueryEngine(handlers, Schedulers.boundedElastic());

        
        assertThat(queryEngine.select(context))
            .isEqualTo(Optional.of(new QueryResult("1")));        
    }
}

但是,上面的代码失败了,因为来自Service2Service3的结果在Service1的结果之前返回。没有调用超时,但是由于我使用的是blockFirst,所以无法决定返回哪个。

有人能提供一个建议,如何重构这一点,以支持基于优先级的结果使用反应堆代码?

EN

回答 1

Stack Overflow用户

发布于 2022-04-07 01:57:00

您可以使用订阅所有提供的发布服务器并发出第一个可用值的firstWithValue。如果没有发出值,则需要处理NoSuchElementException。注意,所有其他发布者都将被取消,结果将被忽略。

代码语言:javascript
运行
复制
@Test
void firstWithValue() {
    var service1 = Mono.just(1)
            .delaySubscription(Duration.ofMillis(2000))
            .timeout(Duration.ofMillis(1000), Mono.empty());
    var service2 = Mono.just(2)
            .delaySubscription(Duration.ofMillis(100))
            .timeout(Duration.ofMillis(1000), Mono.empty());
    var service3 = Mono.just(3)
            .delaySubscription(Duration.ofMillis(200))
            .timeout(Duration.ofMillis(1000), Mono.empty());

    var res = Flux.firstWithValue(service1, service2, service3)
            .onErrorResume(NoSuchElementException.class, e -> Mono.empty());

    StepVerifier.create(res)
            .expectNext(2)
            .verifyComplete();
}

如果您希望等待所有发布者进行定义的超时,然后根据可用的结果作出决定,则可以将mergetimeout结合使用。

代码语言:javascript
运行
复制
@Test
void mergeWithTimeout() {
    var service1 = Mono.just(1)
            .delaySubscription(Duration.ofMillis(2000))
            .timeout(Duration.ofMillis(1000), Mono.empty());
    var service2 = Mono.just(2)
            .delaySubscription(Duration.ofMillis(100))
            .timeout(Duration.ofMillis(1000), Mono.empty());
    var service3 = Mono.just(3)
            .delaySubscription(Duration.ofMillis(200))
            .timeout(Duration.ofMillis(1000), Mono.empty());

    var res = Flux.merge(service1, service2, service3)
            .collectList()
            .map(list -> {
                // decide which one to take here
                return list.get(0);
            });

    StepVerifier.create(res)
            .expectNext(2)
            .verifyComplete();
}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71774099

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档