RxJava: The flatMap Family of Transforming Operators

Author: 103style
Published: 2022-12-19 13:27:47
Collected in the column: Android开发经验分享

If you repost this article, please credit the source with a link. This article comes from 103style's blog.

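Transforming operators and the official documentation

Official documentation for the RxJava flatMap family of transforming operators: Transforming Observables

The sections below go straight to the concrete implementation of each operator. For the intermediate subscription flow, see the article "RxJava之create操作符源码解析" (a source-code walkthrough of the create operator).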

flatMap

Official example:

Observable.just("A", "B", "C")
    .flatMap(new Function<String, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(final String a) throws Exception {
            return Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS)
                    .map(new Function<Long, String>() {
                        @Override
                        public String apply(Long b) throws Exception {
                            return '(' + a + ", " + b + ')';
                        }
                    });
        }
    })
    .blockingSubscribe(new Consumer<Object>() {
        @Override
        public void accept(Object o) throws Exception {
            System.out.println(o);
        }
    });

Output (the three inner sources tick concurrently, so the order of items within the same second can differ between runs):

(A, 1)
(B, 1)
(C, 1)
(C, 2)
(B, 2)
(A, 2)
(A, 3)
(B, 3)
(C, 3)

The subscribeActual method of the returned ObservableFlatMap object:

public void subscribeActual(Observer<? super U> t) {
    if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
        return;
    }
    source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}

The onNext method of MergeObserver:

public void onNext(T t) {
    // safeguard against misbehaving sources
    ...
    subscribeInner(p);
}

void subscribeInner(ObservableSource<? extends U> p) {
    for (;;) {
        if (p instanceof Callable) {
           ...
                    drain();
           ...
        } else {
            InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
            if (addInner(inner)) {
                p.subscribe(inner);
            }
            break;
        }
    }
}

The onNext method of InnerObserver:

public void onNext(U t) {
    if (fusionMode == QueueDisposable.NONE) {
        parent.tryEmit(t, this);
    } else {
        parent.drain();
    }
}

The drain() method of MergeObserver:

void drain() {
    if (getAndIncrement() == 0) {
        drainLoop();
    }
}

void drainLoop() {
    final Observer<? super U> child = this.downstream;//1.0
    int missed = 1;
    for (;;) {
        ...
        SimplePlainQueue<U> svq = queue;
        if (svq != null) {
            for (;;) {
                ...
                U o = svq.poll();
                if (o == null) {
                    break;
                }
                child.onNext(o);//1.1
            }
        }
        ...
        if (d && (svq == null || svq.isEmpty()) && n == 0 && nSources == 0) {
            Throwable ex = errors.terminate();
            if (ex != ExceptionHelper.TERMINATED) {
                if (ex == null) {
                    child.onComplete();//1.2
                } else {
                    child.onError(ex);//1.3
                }
            }
            return;
        }
        ...
    }
}

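In the end, the drain loop still calls the corresponding methods of the downstream observer that was passed in: onNext at 1.1, onComplete at 1.2, and onError at 1.3.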
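
The getAndIncrement() == 0 check in drain() is what keeps those downstream calls serialized: whichever thread moves the counter from 0 to 1 runs the loop, and every other caller only bumps the counter and returns. The following is a minimal JDK-only sketch of that queue-drain idea, not RxJava's actual code. The class DrainSketch, its methods emit and demo, and the way the counter is decremented at the end of each pass (a part that is elided in the excerpt above) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Simplified model of the queue-drain pattern; hypothetical, not RxJava source. */
final class DrainSketch<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger wip = new AtomicInteger(); // "work in progress" counter
    private final Consumer<? super T> downstream;

    DrainSketch(Consumer<? super T> downstream) {
        this.downstream = downstream;
    }

    /** May be called concurrently by several inner sources. */
    void emit(T item) {
        queue.offer(item);
        drain();
    }

    private void drain() {
        // Only the caller that moves the counter from 0 to 1 enters the loop.
        if (wip.getAndIncrement() != 0) {
            return;
        }
        int missed = 1;
        for (;;) {
            T item;
            while ((item = queue.poll()) != null) {
                downstream.accept(item); // plays the role of child.onNext(o)
            }
            // Assumed bookkeeping: subtract the work already accounted for and
            // run another pass if other threads called drain() in the meantime.
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

    /** Three producer threads emit 1000 distinct values each into one merger. */
    static List<Integer> demo() {
        List<Integer> received = new ArrayList<>(); // plain list: delivery is serialized
        DrainSketch<Integer> merger = new DrainSketch<>(received::add);
        Thread[] producers = new Thread[3];
        for (int p = 0; p < producers.length; p++) {
            final int base = p * 1000;
            producers[p] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    merger.emit(base + i);
                }
            });
            producers[p].start();
        }
        try {
            for (Thread t : producers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo().size()); // prints 3000
    }
}
```

Every item from the three producer threads reaches the downstream exactly once, and because only one thread is ever inside the loop, a plain ArrayList is enough on the receiving side.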


flatMapCompletable

Official example:

Observable<Integer> source = Observable.just(2, 1, 3);
Completable completable = source.flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(final Integer x) throws Exception {
        return Completable.timer(x, TimeUnit.SECONDS)
                .doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        System.out.println("Info: Processing of item \"" + x + "\" completed");
                    }
                });
    }
});
completable.doOnComplete(
        new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("Info: Processing of all items completed");
            }
        })
        .blockingAwait();

Output (each item completes after its own delay, so completion follows the delay rather than the source order 2, 1, 3):

Info: Processing of item "1" completed
Info: Processing of item "2" completed
Info: Processing of item "3" completed
Info: Processing of all items completed

flatMapIterable

Official example:

Observable.just(1, 2, 3, 4)
        .flatMapIterable(new Function<Integer, Iterable<?>>() {
            @Override
            public Iterable<?> apply(Integer integer) throws Exception {
                switch (integer % 4) {
                    case 1:
                        return Arrays.asList("A");
                    case 2:
                        return Arrays.asList("B", "B");
                    case 3:
                        return Arrays.asList("C", "C", "C");
                    default:
                        return Arrays.asList();
                }
            }
        })
        .subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                System.out.println(o);
            }
        });

Output:

A
B
B
C
C
C
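
Nothing is printed for the value 4 because 4 % 4 == 0 falls into the default branch, which returns an empty list, so that item contributes no elements downstream. As a rough, library-free analogy (plain java.util.stream rather than RxJava; the class FlatMapIterableSketch and the helpers expand and flatten are made-up names for illustration), the same one-to-many flattening can be sketched as:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/** JDK-only analogy for flatMapIterable; hypothetical names, not RxJava code. */
final class FlatMapIterableSketch {
    // Mirrors the switch in the example above: one input, zero or more outputs.
    static List<String> expand(int n) {
        switch (n % 4) {
            case 1:
                return Arrays.asList("A");
            case 2:
                return Arrays.asList("B", "B");
            case 3:
                return Arrays.asList("C", "C", "C");
            default:
                return Collections.emptyList(); // contributes nothing
        }
    }

    // Expands every source item and concatenates the results in source order.
    static List<String> flatten(List<Integer> source) {
        return source.stream()
                .flatMap(n -> expand(n).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Prints A, B, B, C, C, C on separate lines.
        flatten(Arrays.asList(1, 2, 3, 4)).forEach(System.out::println);
    }
}
```

Unlike flatMap with asynchronous inner sources, the iterables here are consumed synchronously, which is why the elements come out in source order.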

flatMapMaybe

Official example:

Observable.just(9.0, 16.0, -4.0)
        .flatMapMaybe(new Function<Double, MaybeSource<?>>() {
            @Override
            public MaybeSource<?> apply(Double x) throws Exception {
                if (x.compareTo(0.0) < 0) {
                    return Maybe.empty();
                } else {
                    return Maybe.just(Math.sqrt(x));
                }
            }
        })
        .subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Object o) {
                System.out.println(o);
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });

Output (-4.0 is mapped to Maybe.empty(), so it emits nothing):

3.0
4.0
onComplete

flatMapObservable

Official example:

Single<String> source = Single.just("Kirk, Spock, Chekov, Sulu");
Observable<String> names = source.flatMapObservable(new Function<String, ObservableSource<? extends String>>() {
    @Override
    public ObservableSource<? extends String> apply(String text) throws Exception {
        return Observable.fromArray(text.split(","))
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        return s;
                    }
                });
    }
});
names.subscribe(new Consumer<String>() {
    @Override
    public void accept(String name) throws Exception {
        System.out.println("onNext: " + name);
    }
});

Output (the extra space before every name after the first comes from splitting on "," without trimming):

onNext: Kirk
onNext:  Spock
onNext:  Chekov
onNext:  Sulu

flatMapPublisher

Official example:

Single<String> source = Single.just("Kirk, Spock, Chekov, Sulu");
Flowable<String> names = source.flatMapPublisher(new Function<String, Publisher<? extends String>>() {
    @Override
    public Publisher<? extends String> apply(String text) throws Exception {
        return Flowable.fromArray(text.split(","))
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        return s;
                    }
                });
    }
});
names.subscribe(new Consumer<String>() {
    @Override
    public void accept(String name) throws Exception {
        System.out.println("onNext: " + name);
    }
});

Output (the extra space before every name after the first comes from splitting on "," without trimming):

onNext: Kirk
onNext:  Spock
onNext:  Chekov
onNext:  Sulu

flatMapSingle

Official example:

Observable.just(4, 2, 1, 3)
        .flatMapSingle(new Function<Integer, SingleSource<?>>() {
            @Override
            public SingleSource<?> apply(final Integer integer) throws Exception {
                return Single.timer(integer, TimeUnit.SECONDS)
                        .map(new Function<Long, Object>() {
                            @Override
                            public Object apply(Long aLong) throws Exception {
                                return integer;
                            }
                        });
            }
        })
        .blockingSubscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                System.out.println(o);
            }
        });

Output:

1
2
3
4
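
The output is sorted even though the source emits 4, 2, 1, 3, because each inner Single fires after its own number of seconds and flatMapSingle forwards results as they complete, not in source order. A minimal JDK-only sketch of that timing behaviour follows. It models the idea with a ScheduledExecutorService and is not how RxJava implements the operator; the class CompletionOrderSketch, the method completionOrder, and the 100 ms time unit are assumptions chosen for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** JDK-only model of "results arrive in completion order"; hypothetical names. */
final class CompletionOrderSketch {
    static List<Integer> completionOrder(List<Integer> source, long unitMillis) {
        List<Integer> results = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch allDone = new CountDownLatch(source.size());
        ScheduledExecutorService timer = Executors.newScheduledThreadPool(2);
        try {
            for (Integer n : source) {
                // Every item starts its own timer immediately, much like
                // subscribing to all inner sources as soon as they are mapped.
                timer.schedule(() -> {
                    results.add(n);
                    allDone.countDown();
                }, n * unitMillis, TimeUnit.MILLISECONDS);
            }
            allDone.await(); // block until every timer has fired
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            timer.shutdownNow();
        }
        return new ArrayList<>(results);
    }

    public static void main(String[] args) {
        // With delays of 400, 200, 100 and 300 ms the results are collected
        // in delay order rather than in source order.
        System.out.println(completionOrder(Arrays.asList(4, 2, 1, 3), 100));
    }
}
```

When ordering by source position matters more than latency, the official documentation's concatMap family is the usual alternative to reach for.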

flatMapSingleElement

Official example:

Maybe<Integer> source = Maybe.just(-42);
Maybe<Integer> result = source.flatMapSingleElement(new Function<Integer, SingleSource<? extends Integer>>() {
    @Override
    public SingleSource<? extends Integer> apply(Integer x) throws Exception {
        return Single.just(Math.abs(x));
    }
});

result.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        System.out.println(integer);
    }
});

Output:

42

That's all.

Originally published on the author's personal site/blog on 2019-06-24.