首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何在执行Flux.map()时处理错误

如何在执行Flux.map()时处理错误
EN

Stack Overflow用户
提问于 2016-03-26 15:39:15
回答 5查看 30.4K关注 0票数 33

我正在试图找出在映射一个Flux中的元素时如何处理错误。

例如,我将CSV字符串解析为我的业务POJO之一:

代码语言:javascript
运行
复制
myflux.map(stock -> converter.convertHistoricalCSVToStockQuotation(stock));

其中一些行可能包含错误,所以我在日志中得到的是:

代码语言:javascript
运行
复制
 reactor.core.publisher.FluxLog:  onNext([SOME_BOGUS_QUOTE]@38.09 (Fri Apr 08 00:00:00 CEST 2016) H(38.419998)/L(37.849998)/O(37.970001))
 reactor.core.publisher.FluxLog:  onNext([SOME_BOGUS_QUOTE]@38.130001 (Thu Apr 07 00:00:00 CEST 2016) H(38.189999)/L(37.610001)/O(37.799999))
 reactor.core.publisher.FluxLog:  onError(java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo)
 reactor.core.publisher.FluxLog:  java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo

我在API中读到了一些错误处理方法,但大多数方法都是指返回一个“错误值”或使用回退Flux,如下所示:

代码语言:javascript
运行
复制
Flux.onErrorResumeWith(myflux, x -> Mono.fromCallable(() -> ... do stuff);

然而,与我的myflux一起使用这意味着整个通量将再次被处理。

那么,是否有一种方法在处理特定元素时处理错误(即忽略它们/记录它们)并继续处理其余的流量?

使用@akarnokd解决方案的更新

代码语言:javascript
运行
复制
public Flux<StockQuotation> getQuotes(List<String> tickers)
{
    Flux<StockQuotation> processingFlux = Flux.fromIterable(tickers)
    // Get each set of quotes in a separate thread
    .flatMap(s -> Mono.fromCallable(() -> feeder.getCSVQuotes(s)))
    // Convert each list of raw quotes string in a new Flux<String>
    .flatMap(list -> Flux.fromIterable(list))
    // Convert the string to POJOs
    .flatMap(x -> {
            try {
                return Flux.just(converter.convertHistoricalCSVToStockQuotation(x));    
            }
            catch (IllegalArgumentException ex){
                System.out.println("Error decoding stock quotation: " + x);
                return Flux.empty();
            }
    });

    return processingFlux;
}

但是,这是一种魅力,因为您可以看到代码没有以前那么优雅了。Flux没有任何方法来执行此代码所做的工作吗?

代码语言:javascript
运行
复制
retry(...)
retryWhen(...)
onErrorResumeWith(...)
onErrorReturn(...)
EN

回答 5

Stack Overflow用户

回答已采纳

发布于 2016-03-30 10:23:27

相反,您需要flatMap,如果处理失败,我们将返回一个空序列:

代码语言:javascript
运行
复制
myflux.flatMap(v -> {
    try {
        return Flux.just(converter.convertHistoricalCSVToStockQuotation(stock));
    } catch (IllegalArgumentException ex) {
        return Flux.empty();
    }
});
票数 31
EN

Stack Overflow用户

发布于 2018-05-26 13:13:13

如果您想使用反应堆3的方法来处理异常,您可以使用Mono.fromCallable

代码语言:javascript
运行
复制
flatMap(x -> 
    Mono.fromCallable(() -> converter.convertHistoricalCSVToStockQuotation(x))
        .flux()
        .flatMap(Flux::fromIterable)
        .onErrorResume(Flux::empty)
)

不幸的是,没有Flux.fromCallable,因此假设可调用返回一个列表,则必须手动将其转换为Flux。

票数 21
EN

Stack Overflow用户

发布于 2018-05-22 15:23:47

随着目前版本的反应堆3,有相当多的方法已经被增加。所以我们可以这样做:

代码语言:javascript
运行
复制
Flux.onErrorResume(error -> { 
        System.out.println("Error decoding stock quotation: " + e);
        return Flux.empty();
    });

请参阅有关如何处理错误这里的更多信息

票数 9
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/36237230

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档