我想要做的是在Webflux中的单声道上有条件地重复,webclient.The的情况如下:
我们有一些业务rest服务返回一个生成的文档。此文档的生成是从在此服务之前调用的另一个服务触发的。但是,回到业务上:文档生成服务需要10-30秒。我们要做的是: 10秒后检查是否生成了文档(Mono)。如果是这样的话,一切都很好。如果没有,请在5秒后重复(或重试)并检查是否生成了文档。依此类推,直到(最坏的情况) 30秒后超时。这个是可能的吗?一些(伪)代码:
return this.webClient.post().uri(SERVICE_URL)).
body(BodyInserters.fromObject(docRequest)).retrieve().
bodyToMono(Document.class).
delaySubscription(Duration.ofSeconds(10)).
repeat5TimesWithDynamicTimeDelayUntil(!document.isEmpty()).
subscribe();
Greetz Bernardo
发布于 2019-05-01 12:56:35
是的,这是可能的。
Mono
有两个用于重新订阅(从而重新触发请求)的概念。
re-subscribe
如果上游成功完成,则重新订阅=re-subscribe
对于不同的用例,每个概念在Mono
上都有多个重载方法。查找retry*
和repeat*
方法。例如,要在没有延迟的情况下重试最大次数,请使用retry(int numRetries)
。
通过retryWhen
和repeatWhen
方法可以支持更复杂的用例,如以下示例所示。
retryWhen
如果单声道已完成但出现异常,则重试次数最多为5次,每次尝试间隔5秒:
// From reactor-core >= v3.3.4.RELEASE
import reactor.util.retry.Retry;
this.webClient
.post()
.uri(SERVICE_URL)
.body(BodyInserters.fromValue(docRequest))
.retrieve()
.bodyToMono(Document.class)
.retryWhen(Retry.fixedDelay(5, Duration.ofSeconds(5)))
.delaySubscription(Duration.ofSeconds(10))
重试构建器支持其他退避策略(例如指数)和其他选项,以完全自定义重试。
注意,上面使用的retryWhen(Retry)
方法是在reactor-Corev3.3.4.RELEASE中添加的,retryWhen(Function)
方法已被弃用。在reactor-CoreV3.3.4.RELEASE之前,您可以使用reactor-extras项目中的重试函数构建器来创建一个要传递给retryWhen(Function)
的Function
。
repeatWhen
如果您需要在成功时重复,则使用.repeatWhen
或.repeatWhenEmpty
而不是上面的.retryWhen
。
使用reactor-extras项目中的repeat函数构建器创建repeat Function
,如下所示:
// From reactor-extras
import reactor.retry.Repeat;
this.webClient
.post()
.uri(SERVICE_URL)
.body(BodyInserters.fromValue(docRequest))
.retrieve()
.bodyToMono(Document.class)
.filter(document -> !document.isEmpty())
.repeatWhenEmpty(Repeat.onlyIf(repeatContext -> true)
.exponentialBackoff(Duration.ofSeconds(5), Duration.ofSeconds(10))
.timeout(Duration.ofSeconds(30)))
.delaySubscription(Duration.ofSeconds(10))
如果您想在成功或失败时都重新订阅,也可以使用.repeat*
链接.retry*
。
https://stackoverflow.com/questions/55923326
复制相似问题