RxJava concatMap Family of Transforming Operators: A Source Code Walkthrough

103style
Published 2022-12-19 13:25:15
Column: Android开发经验分享

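When reposting, please credit the source with a link. This article comes from 103style's blog.

Transforming operators and the official documentation

This article covers the RxJava concatMap family of transforming operators. Official documentation: Transforming Observables

The walkthrough below goes straight to the concrete implementation. For the intermediate subscription flow, see the article RxJava之create操作符源码解析 (a source code analysis of the create operator).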

concatMap

Official example:

Observable.range(0, 5)
        .concatMap(i -> {
            long delay = Math.round(Math.random() * 2);

            return Observable.timer(delay, TimeUnit.SECONDS).map(n -> i);
        })
        .blockingSubscribe(System.out::print);

Output:

01234

The subscribeActual method of the returned ObservableConcatMap object is shown below. The single-argument concatMap operator uses ErrorMode.IMMEDIATE as the default value of delayErrors.

public void subscribeActual(Observer<? super U> observer) {
    if (ObservableScalarXMap.tryScalarXMapSubscribe(source, observer, mapper)) {
        return;
    }
    if (delayErrors == ErrorMode.IMMEDIATE) {
        SerializedObserver<U> serial = new SerializedObserver<U>(observer);
        source.subscribe(new SourceObserver<T, U>(serial, mapper, bufferSize));
    } else {
        source.subscribe(new ConcatMapDelayErrorObserver<T, U>(observer, mapper, bufferSize, delayErrors == ErrorMode.END));
    }
}

Next, look at onNext(T t), onComplete() and drain() in SourceObserver:

public void onNext(T t) {
    if (done) {
        return;
    }
    if (fusionMode == QueueDisposable.NONE) {
        queue.offer(t);
    }
    drain();
}

public void onComplete() {
    if (done) {
        return;
    }
    done = true;
    drain();
}

void drain() {
    ...
    for (;;) {
        ...
        if (!active) {
            ...
            if (!empty) {
                ObservableSource<? extends U> o;
                try {
                    //1.0
                    o = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    dispose();
                    queue.clear();
                    downstream.onError(ex);
                    return;
                }
                active = true;
                o.subscribe(inner);//2.0
            }
        }
        ...
    }
}
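  • (1.0) Here the apply method of the Function passed to concatMap builds a new ObservableSource from the queued item.
  • (2.0) The newly built ObservableSource is then subscribed to with the inner observer, o.subscribe(inner). Because active is set to true first, the next queued item is not mapped until this inner source has finished.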
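
The two steps above, together with the queue and the active flag, can be modelled without RxJava. The following is a minimal single-threaded sketch under stated assumptions, not the library's code: Inner, Drainer and demo are hypothetical names introduced here, and the real SourceObserver also deals with disposal, errors, queue fusion and thread safety.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

class ConcatMapDrainSketch {

    /** Hypothetical stand-in for an inner ObservableSource. */
    interface Inner<U> {
        void subscribe(Consumer<U> onNext, Runnable onComplete);
    }

    /** Simplified, single-threaded model of the queue-drain loop (an assumption, not RxJava code). */
    static final class Drainer<T, U> {
        private final Queue<T> queue = new ArrayDeque<>();
        private final AtomicInteger wip = new AtomicInteger(); // work-in-progress counter
        private final Function<T, Inner<U>> mapper;
        private final Consumer<U> downstream;
        private boolean active; // true while an inner source is still running

        Drainer(Function<T, Inner<U>> mapper, Consumer<U> downstream) {
            this.mapper = mapper;
            this.downstream = downstream;
        }

        void onNext(T t) {
            queue.offer(t);
            drain();
        }

        private void innerComplete() {
            active = false;
            drain();
        }

        private void drain() {
            // Only the first caller runs the loop; re-entrant calls just bump the counter.
            if (wip.getAndIncrement() != 0) {
                return;
            }
            for (;;) {
                if (!active) {
                    T t = queue.poll();
                    if (t != null) {
                        Inner<U> inner = mapper.apply(t);                 // step (1.0)
                        active = true;
                        inner.subscribe(downstream, this::innerComplete); // step (2.0)
                    }
                }
                if (wip.decrementAndGet() == 0) {
                    break;
                }
            }
        }
    }

    /** Feeds 1, 2, 3 while inner completions are deferred, then completes them one by one. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        List<Runnable> pending = new ArrayList<>();
        Drainer<Integer, String> drainer = new Drainer<Integer, String>(
                i -> (onNext, onComplete) -> {
                    onNext.accept("item " + i);
                    pending.add(onComplete); // completion is triggered later, on demand
                },
                s -> log.add(s));
        drainer.onNext(1);
        drainer.onNext(2);
        drainer.onNext(3);
        log.add("queued");
        pending.get(0).run(); // inner for 1 completes, so 2 is mapped
        pending.get(1).run(); // inner for 2 completes, so 3 is mapped
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model, items 2 and 3 stay in the queue until the inner source for the previous item signals completion, which is the property that keeps concatMap output in source order.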

concatMapXXX

The implementations of concatMapCompletable, concatMapCompletableDelayError, concatMapDelayError, concatMapEager, concatMapEagerDelayError, concatMapIterable, concatMapMaybe, concatMapMaybeDelayError, concatMapSingle and concatMapSingleDelayError follow logic similar to concatMap, so they are not walked through here.

Official examples:

concatMapCompletable

Observable<Integer> source = Observable.just(2, 1, 3);
Completable completable = source.concatMapCompletable(x -> {
    return Completable.timer(x, TimeUnit.SECONDS)
            .doOnComplete(() -> System.out.println("Info: Processing of item \"" + x + "\" completed"));
});
completable.doOnComplete(() -> System.out.println("Info: Processing of all items completed"))
        .blockingAwait();

Output:

Info: Processing of item "2" completed
Info: Processing of item "1" completed
Info: Processing of item "3" completed
Info: Processing of all items completed

concatMapCompletableDelayError

Observable<Integer> source = Observable.just(2, 1, 3);
Completable completable = source.concatMapCompletableDelayError(x -> {
    if (x.equals(2)) {
        return Completable.error(new IOException("Processing of item \"" + x + "\" failed!"));
    } else {
        return Completable.timer(1, TimeUnit.SECONDS)
                .doOnComplete(() -> System.out.println("Info: Processing of item \"" + x + "\" completed"));
    }
});

completable.doOnError(error -> System.out.println("Error: " + error.getMessage()))
        .onErrorComplete()
        .blockingAwait();

Output:

Info: Processing of item "1" completed
Info: Processing of item "3" completed
Error: Processing of item "2" failed!

concatMapDelayError

Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS)
        .concatMapDelayError(x -> {
            if (x.equals(1L))
                return Observable.error(new IOException("Something went wrong!"));
            else return Observable.just(x, x * x);
        })
        .blockingSubscribe(
                x -> System.out.println("onNext: " + x),
                error -> System.out.println("onError: " + error.getMessage()));

Output:

onNext: 2
onNext: 4
onNext: 3
onNext: 9
onError: Something went wrong!
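
The difference between failing immediately and delaying the error can be illustrated with plain Java. This is a rough sketch under stated assumptions, not RxJava code: DelayErrorSketch, run and squares are hypothetical names, inner sources are modelled as plain lists, and only the first failure is kept for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class DelayErrorSketch {

    /**
     * Maps each item to a list of results, one item at a time and in order.
     * With delayError == false the first failure ends processing at once;
     * with delayError == true the failure is remembered and reported after all items.
     */
    static List<String> run(List<Long> items, Function<Long, List<Long>> mapper, boolean delayError) {
        List<String> log = new ArrayList<>();
        RuntimeException firstError = null;
        for (Long item : items) {
            try {
                for (Long value : mapper.apply(item)) {
                    log.add("onNext: " + value);
                }
            } catch (RuntimeException ex) {
                if (!delayError) {
                    log.add("onError: " + ex.getMessage());
                    return log;
                }
                if (firstError == null) {
                    firstError = ex; // simplification: keep only the first failure
                }
            }
        }
        if (firstError != null) {
            log.add("onError: " + firstError.getMessage());
        }
        return log;
    }

    /** Hypothetical mapper mirroring the example above: item 1 fails, others yield x and x * x. */
    static List<Long> squares(Long x) {
        if (x == 1L) {
            throw new IllegalStateException("Something went wrong!");
        }
        return Arrays.asList(x, x * x);
    }

    public static void main(String[] args) {
        List<Long> items = Arrays.asList(1L, 2L, 3L);
        System.out.println(run(items, DelayErrorSketch::squares, true));
        System.out.println(run(items, DelayErrorSketch::squares, false));
    }
}
```

With delayError set to true the log matches the output above; with false, processing stops at the first item and only the error is reported.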

concatMapEager

Observable.range(0, 5)
        .concatMapEager(i -> {
            long delay = Math.round(Math.random() * 3);

            return Observable.timer(delay, TimeUnit.SECONDS)
                    .map(n -> i)
                    .doOnNext(x -> System.out.println("Info: Finished processing item " + x));
        })
        .blockingSubscribe(i -> System.out.println("onNext: " + i));

Output:

Info: Finished processing item 2
Info: Finished processing item 3
Info: Finished processing item 1
Info: Finished processing item 0
Info: Finished processing item 4
onNext: 0
onNext: 1
onNext: 2
onNext: 3
onNext: 4
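
The eager behaviour seen above, where all inner work starts at once but results are still delivered in source order, can be approximated with CompletableFuture. This is only an analogy under stated assumptions: EagerConcatSketch and run are hypothetical names, and concurrency limits and buffering are not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class EagerConcatSketch {

    /** Starts one task per item immediately, then collects the results in source order. */
    static List<String> run(int count) {
        List<CompletableFuture<Integer>> running = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            final int item = i;
            // Later items get shorter delays, so they tend to finish first.
            final long delayMillis = (count - i) * 20L;
            running.add(CompletableFuture.supplyAsync(() -> {
                sleep(delayMillis);
                return item;
            }));
        }
        List<String> log = new ArrayList<>();
        for (CompletableFuture<Integer> future : running) {
            // join() waits for this particular task, so order follows the source, not completion time.
            log.add("onNext: " + future.join());
        }
        return log;
    }

    private static void sleep(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        run(5).forEach(System.out::println);
    }
}
```

Even though the task for item 4 has the shortest delay, its result is emitted last because the results are drained in the order the tasks were created.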

concatMapEagerDelayError

Observable<Integer> source = Observable.create(emitter -> {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onError(new Error("Fatal error!"));
});

source.doOnError(error -> System.out.println("Info: Error from main source " + error.getMessage()))
        .concatMapEagerDelayError(x -> {
            return Observable.timer(1, TimeUnit.SECONDS).map(n -> x)
                    .doOnSubscribe(it -> System.out.println("Info: Processing of item \"" + x + "\" started"));
        }, true)
        .blockingSubscribe(
                x -> System.out.println("onNext: " + x),
                error -> System.out.println("onError: " + error.getMessage()));

Output:

Info: Processing of item "1" started
Info: Processing of item "2" started
Info: Error from main source Fatal error!
onNext: 1
onNext: 2
onError: Fatal error!

concatMapIterable

Observable.just("A", "B", "C")
        .concatMapIterable(item -> Arrays.asList(item, item, item))
        .subscribe(System.out::print);

Output:

AAABBBCCC
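
For readers more familiar with java.util.stream, the same expansion can be written with Stream.flatMap on a sequential stream. This is an analogy only, not how RxJava implements concatMapIterable, and ConcatMapIterableSketch and expand are hypothetical names.

```java
import java.util.Arrays;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ConcatMapIterableSketch {

    /** Expands every item into three copies and concatenates the results in source order. */
    static String expand(String... items) {
        return Stream.of(items)
                .flatMap(item -> Arrays.asList(item, item, item).stream())
                .collect(Collectors.joining());
    }

    public static void main(String[] args) {
        System.out.println(expand("A", "B", "C"));
    }
}
```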

concatMapMaybe

Observable.just("5", "3,14", "2.71", "FF")
        .concatMapMaybe(v -> {
            return Maybe.fromCallable(() -> Double.parseDouble(v))
                    .doOnError(e -> System.out.println("Info: The value \"" + v + "\" could not be parsed."))
                    // Ignore values that can not be parsed.
                    .onErrorComplete();
        })
        .subscribe(x -> System.out.println("onNext: " + x));

Output:

onNext: 5.0
Info: The value "3,14" could not be parsed.
onNext: 2.71
Info: The value "FF" could not be parsed.

concatMapMaybeDelayError

DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("dd.MM.uuuu");
Observable.just("04.03.2018", "12-08-2018", "06.10.2018", "01.12.2018")
        .concatMapMaybeDelayError(date -> {
            return Maybe.fromCallable(() -> LocalDate.parse(date, dateFormatter));
        })
        .subscribe(
                localDate -> System.out.println("onNext: " + localDate),
                error -> System.out.println("onError: " + error.getMessage()));

Output:

onNext: 2018-03-04
onNext: 2018-10-06
onNext: 2018-12-01
onError: Text '12-08-2018' could not be parsed at index 2

concatMapSingle

Observable.just("5", "3,14", "2.71", "FF")
        .concatMapSingle(v -> {
            return Single.fromCallable(() -> Double.parseDouble(v))
                    .doOnError(e -> System.out.println("Info: The value \"" + v + "\" could not be parsed."))

                    // Return a default value if the given value can not be parsed.
                    .onErrorReturnItem(42.0);
        })
        .subscribe(x -> System.out.println("onNext: " + x));

Output:

onNext: 5.0
Info: The value "3,14" could not be parsed.
onNext: 42.0
onNext: 2.71
Info: The value "FF" could not be parsed.
onNext: 42.0
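
The contrast between the concatMapMaybe and concatMapSingle examples comes down to whether an inner source may finish without a value. The sketch below models that with Optional in plain Java. It is an illustration under stated assumptions, with hypothetical names (MaybeVersusSingleSketch, parseMaybe, parseSingle, viaMaybe, viaSingle), and does not use RxJava.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class MaybeVersusSingleSketch {

    /** Maybe-like: a failed parse yields an empty result, so the item is simply dropped. */
    static Optional<Double> parseMaybe(String value) {
        try {
            return Optional.of(Double.parseDouble(value));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    /** Single-like: a value is always required, so a failed parse maps to the fallback. */
    static double parseSingle(String value, double fallback) {
        return parseMaybe(value).orElse(fallback);
    }

    static List<Double> viaMaybe(List<String> values) {
        List<Double> out = new ArrayList<>();
        for (String value : values) {
            parseMaybe(value).ifPresent(out::add);
        }
        return out;
    }

    static List<Double> viaSingle(List<String> values) {
        List<Double> out = new ArrayList<>();
        for (String value : values) {
            out.add(parseSingle(value, 42.0));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("5", "3,14", "2.71", "FF");
        System.out.println(viaMaybe(input));
        System.out.println(viaSingle(input));
    }
}
```

The Maybe-like path yields two values for the four inputs, while the Single-like path yields four, with 42.0 standing in for each value that could not be parsed.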

concatMapSingleDelayError

DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("dd.MM.uuuu");
Observable.just("24.03.2018", "12-08-2018", "06.10.2018", "01.12.2018")
        .concatMapSingleDelayError(date -> {
            return Single.fromCallable(() -> LocalDate.parse(date, dateFormatter));
        })
        .subscribe(
                localDate -> System.out.println("onNext: " + localDate),
                error -> System.out.println("onError: " + error.getMessage()));

Output:

onNext: 2018-03-24
onNext: 2018-10-06
onNext: 2018-12-01
onError: Text '12-08-2018' could not be parsed at index 2

Originally published on the author's personal site/blog on 2019-06-05.
