首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >反应器自动调速

反应器自动调速
EN

Stack Overflow用户
提问于 2020-07-18 05:33:34
回答 3查看 852关注 0票数 2

TL;DR;

是否有一种方法可以根据下游健康情况自动调整项目反应堆各元件之间的延迟?

详细信息

我有一个应用程序从Kafka主题读取记录,为每个记录发送一个HTTP请求,并将结果写入另一个Kafka主题。从/写入Kafka是快速和容易的,但是第三方HTTP服务很容易不堪重负,所以我使用来自属性文件的值的delayElements(),这意味着这个值在应用程序运行时不会改变。下面是一个代码示例:

代码语言:javascript
运行
复制
kafkaReceiver.receiveAutoAck()
            .concatMap(identity())
            .delayElements(ofMillis(delayElement))
            .flatMap(message -> recordProcessingFunction.process(message.value()), messageRate)
            .onErrorContinue(handleError())
            .map(this::getSenderRecord)
            .flatMap(kafkaSender::send)

然而,第三方服务可能执行不同的加班,我希望能够相应地调整这个延迟。比方说,如果我看到超过5%的请求在10秒内失败,我会增加延迟。如果它在超过10秒的时间内低于5%,那么我将再次减少延迟。

反应堆中是否存在这方面的机制?我可以从我身边想出一些创造性的解决方案,但我想知道他们(或其他人)是否已经实现了这一点。

EN

回答 3

Stack Overflow用户

发布于 2020-07-23 07:58:38

我不认为任何HTTP客户端(包括netty )都提供了反向压力。一种选择是切换到RSocket,但是如果您正在调用第三方服务,我想这可能不是一个选项。您可以调优一天中大部分时间起作用的速率,并使用doOnError或类似的方法将错误的消息发送到另一个主题。另一个接收方可以以更高的延迟处理这些消息,如果再次出现错误,则使用重试计数将消息重新放到相同的主题上,这样您就可以最终停止处理它们。

票数 0
EN

Stack Overflow用户

发布于 2020-07-25 02:11:12

如果要查找延迟元素取决于元素的处理速度,则可以使用delayUntil

代码语言:javascript
运行
复制
  Flux.range(1, 100)
      .doOnNext(i -> System.out.println("Kafka Receive :: " + i))
      .delayUntil(i -> Mono.fromSupplier(() -> i)
                            .map(k -> {
                                // msg processing
                                return k * 2;
                            })
                            .delayElement(Duration.ofSeconds(1)) // msg processing simulation
                            .doOnNext(k -> System.out.println("Kafka send :: " + k)))
      .subscribe();
票数 0
EN

Stack Overflow用户

发布于 2020-08-27 20:56:16

您可以添加带指数退避的重试。就像这样:

代码语言:javascript
运行
复制
influx()
.flatMap(x -> Mono.just(x)
    .map(data -> apiCall(data))
    .retryWhen(
            Retry.backoff(Integet.MAX_VALUE, Duration.ofSeconds(30))
                .filter(err -> err instanceof RuntimeException)
                .doBeforeRetry(
                    s -> log.warn("Retrying for err {}", s.failure().getMessage()))
                .onRetryExhaustedThrow((spec, sig) -> new RuntimeException("ex")))
                .onErrorResume(err -> Mono.empty()),
        concurrency_val,
        prefetch_val)

这将重试失败的请求Integet.MAX_VALUE次数,每次重试之间的最短时间为30次。随后的重试实际上被可配置的抖动因子(默认值= 0.5)抵消,导致连续重试之间的持续时间增加。

关于Retry.backoff的文档说:

为具有抖动的指数后退策略预先配置的RetryBackoffSpec,给定最大重试次数和退避的最小持续时间。

另外,由于整个操作都是在flatMap中映射的,所以可以更改它的默认concurrencyprefetch值,以便在整个管道等待RetryBackOffSpec成功完成时,在任何给定时间内可能失败的请求的最大数量。

最坏的情况是,您的concurrency_val请求数已失败,并等待30+秒才能进行重试。整个操作可能会停止(仍在等待下游的成功),如果下游系统不能及时恢复,这可能是不可取的。最好将backOff限制从Integer.MAX_VALUE替换为可管理的东西,然后再记录错误并继续下一个事件。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62965034

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档