前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RxJava 1.x 笔记:组合型操作符

RxJava 1.x 笔记:组合型操作符

作者头像
张拭心 shixinzhang
发布2018-01-05 16:54:43
1.9K0
发布2018-01-05 16:54:43
举报

最近去检查眼睛,发现度数又涨了,唉,各位猿多注意保护自己的眼睛吧!

前面学了 RxJava 的三种关键操作符:

  1. 创建型操作符
  2. 过滤型操作符
  3. 变换型操作符

组合型操作符

组合型操作符即处理多个 Observable 数据源,在处理后将它们合并成一个 Observable。

Zip

Zip 操作符的作用是:通过一个特定的函数组合多个 Observable 发射的数据,然后发射每个组合的结果。

这里写图片描述
这里写图片描述

Zip 操作符对你选中的多个 Observable 发射的数据按顺序应用一个函数,然后返回一个 Observable,这个 Observable 发射函数的返回结果。

Zip 操作符对发射的数据的顺序很严格,如上图所示,它发射的第一个数据一定是 Observable A 发射的第一个数据和 Observable B 发射的第一个数据经过组合的结果;发射的第二个数据也一定是 A 发射的第二个数据和 B 发个的第二个数据的组合结果;一次类推,直到元素最少的 Observable 发射完元素。

RxJava 中对应的实现是 zipzipWith

zip

这里写图片描述
这里写图片描述

RxJava 中,zip() 的重载方法有 11 种:

这里写图片描述
这里写图片描述

前 2 个支持以 Iterable 或者数组的形式传入多个 Observable,后面 9 个分别支持从 1 到 9 个 Observable 作为参数,所有方法的最后一个参数是一个函数,它接收各个 Observable 按顺序发出的数据,然后对它们进行一个操作,将操作的结果发射出去。

代码语言:javascript
复制
public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction) {
    return just(new Observable<?>[] { o1, o2 }).lift(new OperatorZip<R>(zipFunction));
}

使用例子:

代码语言:javascript
复制
private void zip() {
    Observable<String> observableA = Observable.just("A", "B", "C", "d", "E");
    Observable<Integer> observableB = Observable.just(1, 2, 3, 4);

    Observable
            .zip(observableA, observableB, new Func2<String, Integer, String>() {
                @Override
                public String call(final String s, final Integer integer) {
                    return s + "_" + integer;
                }
            })
            .subscribe(this.<String>getPrintSubscriber());
}

运行结果:

代码语言:javascript
复制
07-19 14:59:11.219 12254-12254/top.shixinzhang.rxjavademo I/System.out: onNext: A_1
07-19 14:59:11.219 12254-12254/top.shixinzhang.rxjavademo I/System.out: onNext: B_2
07-19 14:59:11.219 12254-12254/top.shixinzhang.rxjavademo I/System.out: onNext: C_3
07-19 14:59:11.219 12254-12254/top.shixinzhang.rxjavademo I/System.out: onNext: d_4
07-19 14:59:11.219 12254-12254/top.shixinzhang.rxjavademo I/System.out: onCompleted

可以看到,zip() 的确按顺序将 observableA 和 observableB 发射的数据组合了起来,然后发射了出去。当元素较少的一个 Observable 发射完后,zip 也就停止发射了。

zipWith

这里写图片描述
这里写图片描述

zipWith 也可以组合多个 Observable,不过和 zip 不同的是,zipWith 是非静态方法,它需要一个 Observable 来调用。

zipWith 两种重载:

代码语言:javascript
复制
public final <T2, R> Observable<R> zipWith(Observable<? extends T2> other, Func2<? super T, ? super T2, ? extends R> zipFunction) {
    return (Observable<R>)zip(this, other, zipFunction);
}
public final <T2, R> Observable<R> zipWith(Iterable<? extends T2> other, Func2<? super T, ? super T2, ? extends R> zipFunction) {
    return lift(new OperatorZipIterable<T, T2, R>(other, zipFunction));
}

第一个方法的参数是一个 Observable,它的作用是将当前 Observable 和参数 Observable 组合;第二个方法的参数是一个 Iterable,它的作用是将当前 Observable 和许多 Observable 组合。

使用例子:

代码语言:javascript
复制
private void zipWith() {
    Observable<String> observableA = Observable.just("A", "B", "C", "d", "E");
    Observable
            .just(1, 2, 3, 4)
            .zipWith(observableA, new Func2<Integer, String, String>() {
                @Override
                public String call(final Integer integer, final String s) {
                    return integer + ", " + s;
                }
            })
            .subscribe(this.<String>getPrintSubscriber());
}

运行结果:

代码语言:javascript
复制
07-19 16:28:50.969 27717-27717/top.shixinzhang.rxjavademo I/System.out: onNext: 1, A
07-19 16:28:50.969 27717-27717/top.shixinzhang.rxjavademo I/System.out: onNext: 2, B
07-19 16:28:50.969 27717-27717/top.shixinzhang.rxjavademo I/System.out: onNext: 3, C
07-19 16:28:50.969 27717-27717/top.shixinzhang.rxjavademo I/System.out: onNext: 4, d
07-19 16:28:50.969 27717-27717/top.shixinzhang.rxjavademo I/System.out: onCompleted

zip 很相似是吧。

CombineLatest

CombineLatest 操作符的作用是:当两个 Observable 中任意一个发射数据时,会结合另外一个 Observable 最近发射的数据进行一些函数操作,然后将操作的结果发射出去。

这里写图片描述
这里写图片描述

CombineLatestZip 有点相似,都是将两个 Observable 发射的数据结合起来,不同的是,每个 Observable 都发射了新元素后, Zip 才进行操作然后发射操作结果;而 CombineLatest 在每个 Observable 都发射一个数据后,只要有一个 Observable 发射数据,CombineLatest 就会进行操作然后发射操作结果。

当任何一个 Observable 发射了新数据,CombineLatest 会将这个新数据与另外的 Observable 之前发射的最新数据进行一个函数操作。

RxJava 中有两种实现:combineLatest, withLatestFrom

combineLatest

这里写图片描述
这里写图片描述

RxJava 中的 combineLatest() 有 10 种重载:

这里写图片描述
这里写图片描述

不同的地方基本就是接收 Observable 的个数不同。

代码语言:javascript
复制
public static <T1, T2, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Func2<? super T1, ? super T2, ? extends R> combineFunction) {
    return (Observable<R>)combineLatest(Arrays.asList(o1, o2), Functions.fromFunc(combineFunction));
}
public static <T, R> Observable<R> combineLatest(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction) {
    return unsafeCreate(new OnSubscribeCombineLatest<T, R>(sources, combineFunction));
}

使用例子:

代码语言:javascript
复制
/***
 * 将 A 发射的数据与 B 之前发射最新的数据结合,进行函数操作
 */
private void combineLatest() {
    Observable<Long> observableA = Observable.interval(3, TimeUnit.SECONDS);
    Observable<Long> observableB = Observable.interval(2, TimeUnit.SECONDS);

    Observable
            .combineLatest(observableA, observableB, new Func2<Long, Long, String>() {
                @Override
                public String call(final Long itemA, final Long itemB) {
                    return "combine result: " + itemA + "/" + itemB;
                }
            }).subscribe(this.<String>getPrintSubscriber());
}

在上面的代码中我们创建了两个 Observable,发射速率分别为 3 秒和 2 秒。运行结果:

代码语言:javascript
复制
07-24 15:38:53.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 0/0
07-24 15:38:54.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 0/1
07-24 15:38:56.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 1/1
07-24 15:38:56.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 1/2
07-24 15:38:58.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 1/3
07-24 15:38:59.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 2/3
07-24 15:39:00.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 2/4
07-24 15:39:02.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 3/4
07-24 15:39:02.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 3/5
07-24 15:39:04.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 3/6
07-24 15:39:05.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 4/6
07-24 15:39:06.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 4/7
07-24 15:39:08.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 5/7
07-24 15:39:08.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 5/8
07-24 15:39:10.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 5/9
07-24 15:39:11.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 6/9
07-24 15:39:12.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 6/10
07-24 15:39:14.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 7/10
07-24 15:39:14.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 7/11
07-24 15:39:16.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 7/12
07-24 15:39:17.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 8/12
07-24 15:39:18.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 8/13
07-24 15:39:20.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 9/13
07-24 15:39:20.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 9/14
07-24 15:39:22.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 9/15
07-24 15:39:23.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 10/15

可以看到,有元素重复多次,说明一个 Observable 还没发射数据,另一个 Observable 发射数据就会出发 combineLatest

默认不在任何调度器上。

withLatestFrom

这里写图片描述
这里写图片描述

withLatestFromcombineLatest 很相似,不同之处在于,它不是静态方法,必须通过一个 Observable 对象进行调用。而他的作用就是:只有在这个 Observable 对象发射数据时,才结合其他 Observable 发射的最新数据进行相关的函数操作。

也就是说把组合的主动权都交给了调用对象。

使用例子:

代码语言:javascript
复制
private void withLatestFrom() {
    Observable<Long> observableA = Observable.interval(3, TimeUnit.SECONDS);
    Observable<Long> observableB = Observable.interval(2, TimeUnit.SECONDS);

    observableB.withLatestFrom(observableA, new Func2<Long, Long, String>() {
        @Override
        public String call(final Long itemA, final Long itemB) {
            return "withLatestFrom: " + itemA + "/" + itemB;
        }
    }).subscribe(this.<String>getPrintSubscriber());
}

运行结果:

代码语言:javascript
复制
07-24 15:56:13.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 1/0
07-24 15:56:15.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 2/1
07-24 15:56:17.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 3/1
07-24 15:56:19.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 4/2
07-24 15:56:21.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 5/3
07-24 15:56:23.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 6/3
07-24 15:56:25.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 7/4
07-24 15:56:27.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 8/5
07-24 15:56:29.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 9/5
07-24 15:56:31.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 10/6
07-24 15:56:33.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 11/7
07-24 15:56:35.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 12/7

的确如我们所想,只有 observableB 发射数据时,才进行组合操作。

Join

Join 操作符的作用是:在一个 Observable 发射的一条数据的时间窗口内,另外一个 Observable 发射了一条数据,就组合这两条数据。

这里写图片描述
这里写图片描述

如上图所示,我们可以给两个 Observable 各自定义它们发射的数据的时间窗口(可以理解为生命周期),在 Observable A 发射一个元素 a 后,在 a 的生命周期内,Observable B 只要发射了数据,就会和 a 组合。如果 A 定义的时间窗口比发射速率久,就会出现 B 发射的数据跟 A 的多个数据组合;反过来也一样,在 B 发射的元素时间窗口内,A 发射数据也会和 B 的元素组合。

Join 的概念不是很容易理解,这个操作符需要多费点心。

RxJava 中的实现有两种:join()groupJoin()

join

这里写图片描述
这里写图片描述

API 对 join()的介绍是:

Correlates the items emitted by two Observables based on overlapping durations.

即:根据重叠持续时间将两个 Observable 发出的项关联起来。

join() 方法如下:

代码语言:javascript
复制
public final <TRight, TLeftDuration, TRightDuration, R> Observable<R> join(Observable<TRight> right, Func1<T, Observable<TLeftDuration>> leftDurationSelector,
        Func1<TRight, Observable<TRightDuration>> rightDurationSelector,
        Func2<T, TRight, R> resultSelector) {
    return unsafeCreate(new OnSubscribeJoin<T, TRight, TLeftDuration, TRightDuration, R>(this, right, leftDurationSelector, rightDurationSelector, resultSelector));
}

它接收 4 个参数:

  1. right:将要和当前 Observable 元素组合的另一个 Observable
  2. leftDurationSelector:定义当前 Observable 发射元素的时间窗口函数,返回一个 Observable
  3. rightDurationSelector:定义 right Observable 发射元素的时间窗口函数
  4. resultSelector:在这个函数中做两个 Observable 元素的组合操作

使用例子:

代码语言:javascript
复制
private void join() {
    //产生 0 2 4 6 8
    Observable<Long> observableA = Observable.interval(1, TimeUnit.SECONDS)
            .map(new Func1<Long, Long>() {
                @Override
                public Long call(final Long aLong) {
                    return aLong * 2;
                }
            })
            .take(5);

    //产生 0 3 6 9 12
    Observable<Long> observableB = Observable.interval(2, TimeUnit.SECONDS)
            .map(new Func1<Long, Long>() {
                @Override
                public Long call(final Long aLong) {
                    return aLong * 3;
                }
            })
            .take(5);

    observableA.join(observableB,
            new Func1<Long, Observable<Long>>() {       //定义源 Observable 发射数据的时间窗口
                @Override
                public Observable<Long> call(final Long aLong) {
                    System.out.println("A:" + aLong);
                    return Observable.just(aLong).delay(2000 , TimeUnit.MILLISECONDS);   //延迟 500 毫秒后发射,即声明周期为 1000毫秒
                }
            }, new Func1<Long, Observable<Long>>() {    //定义第二个 Observable 发射数据的时间窗口
                @Override
                public Observable<Long> call(final Long aLong) {
                    System.out.println("B:" + aLong);
                    return Observable.just(aLong).delay(1000, TimeUnit.MILLISECONDS);
                }
            }, new Func2<Long, Long, String>() {    //组合两个 Observable 发射的数据的函数
                @Override
                public String call(final Long aLong, final Long aLong2) {
                    return "join result:" + aLong + "/" + aLong2;
                }
            })
            .subscribe(this.<String>getPrintSubscriber());
}

在上面的代码中,我们创建了 2 个 Observable,同时调用了 join() 方法,传入的参数中,第一个函数中定义 observableA 发射元素的时间窗口,这里定义为 2 s;第二个函数中定义了 observableB 的时间创建,为 1 s。

我们先根据下面的图理解一下:

这里写图片描述
这里写图片描述

observableA 中每隔一秒发射一个元素,时间窗口为 2s,在图上表现为两个大格子; observableB 每隔两秒发射一个元素,时间窗口为 1s,在图上表现为一个大格子。

我们可以看到,A 先发出 0 ,这时 B 还没有发射元素,所以无法结合。第二秒时 A 发出 2,B 发出 0,这时 A 发出的 0 时间窗口还没关闭,因此 A 的 0 和 2 都和 B 的 0 结合。以此类推,直到一方(这里是 A) 发射完元素后,停止结合。

运行结果:

代码语言:javascript
复制
07-24 18:14:17.735 715-769/top.shixinzhang.rxjavademo I/System.out: A:0
07-24 18:14:18.735 715-769/top.shixinzhang.rxjavademo I/System.out: A:2
07-24 18:14:18.745 715-770/top.shixinzhang.rxjavademo I/System.out: B:0
07-24 18:14:18.745 715-770/top.shixinzhang.rxjavademo D/SubscriberCreator: onNext: join result:0/0
07-24 18:14:18.745 715-770/top.shixinzhang.rxjavademo D/SubscriberCreator: onNext: join result:2/0
07-24 18:14:19.735 715-769/top.shixinzhang.rxjavademo I/System.out: A:4
07-24 18:14:19.735 715-769/top.shixinzhang.rxjavademo D/SubscriberCreator: onNext: join result:4/0
07-24 18:14:20.745 715-769/top.shixinzhang.rxjavademo I/System.out: A:6
07-24 18:14:20.745 715-770/top.shixinzhang.rxjavademo I/System.out: B:3
07-24 18:14:20.745 715-770/top.shixinzhang.rxjavademo D/SubscriberCreator: onNext: join result:4/3
07-24 18:14:20.745 715-770/top.shixinzhang.rxjavademo D/SubscriberCreator: onNext: join result:6/3
07-24 18:14:21.745 715-769/top.shixinzhang.rxjavademo I/System.out: A:8
07-24 18:14:21.745 715-769/top.shixinzhang.rxjavademo D/SubscriberCreator: onNext: join result:8/3
07-24 18:14:22.745 715-770/top.shixinzhang.rxjavademo I/System.out: B:6
07-24 18:14:22.745 715-770/top.shixinzhang.rxjavademo D/SubscriberCreator: onNext: join result:8/6
07-24 18:14:23.745 715-769/top.shixinzhang.rxjavademo D/SubscriberCreator: onCompleted

groupJoin

这里写图片描述
这里写图片描述

API 对 groupJoin() 的介绍是:

Returns an Observable that correlates two Observables when they overlap in time and groups the results.

即:将两个 Observable 发射的、时间上重叠的数据关联起来,然后分组结果。

groupJoin() 方法如下:

代码语言:javascript
复制
public final <T2, D1, D2, R> Observable<R> groupJoin(Observable<T2> right, Func1<? super T, ? extends Observable<D1>> leftDuration,
        Func1<? super T2, ? extends Observable<D2>> rightDuration,
        Func2<? super T, ? super Observable<T2>, ? extends R> resultSelector) {
    return unsafeCreate(new OnSubscribeGroupJoin<T, T2, D1, D2, R>(this, right, leftDuration, rightDuration, resultSelector));
}

前三个参数和 join() 一致,不同之处在与第四个参数:

代码语言:javascript
复制
new Func2<Long, Observable<Long>, Observable<String>>() {
    @Override
    public Observable<String> call(final Long itemA, final Observable<Long> longObservable) {
        return longObservable.map(new Func1<Long, String>() {
            @Override
            public String call(final Long itemB) {
                return "groupJoin result:" + itemA + "/" + itemB;
            }
        });
    }
}

它返回的是一个 Observable,而不是一个 Object。

使用例子:

代码语言:javascript
复制
private void groupJoin() {

    //产生 0 2 4 6 8
    Observable<Long> observableA = Observable.interval(1, TimeUnit.SECONDS)
            .map(new Func1<Long, Long>() {
                @Override
                public Long call(final Long aLong) {
                    return aLong * 2;
                }
            })
            .take(5);

    //产生 0 3 6 9 12
    Observable<Long> observableB = Observable.interval(2, TimeUnit.SECONDS)
            .map(new Func1<Long, Long>() {
                @Override
                public Long call(final Long aLong) {
                    return aLong * 3;
                }
            })
            .take(5);

    observableA.groupJoin(observableB,
            new Func1<Long, Observable<Long>>() {       //定义源 Observable 发射数据的时间窗口
                @Override
                public Observable<Long> call(final Long aLong) {
                    System.out.println("A:" + aLong);
                    return Observable.just(aLong).delay(2000, TimeUnit.MILLISECONDS);   //延迟 500 毫秒后发射,即声明周期为 1000毫秒
                }
            }, new Func1<Long, Observable<Long>>() {    //定义第二个 Observable 发射数据的时间窗口
                @Override
                public Observable<Long> call(final Long aLong) {
                    System.out.println("B:" + aLong);
                    return Observable.just(aLong).delay(1000, TimeUnit.MILLISECONDS);
                }
            }, new Func2<Long, Observable<Long>, Observable<String>>() {
                @Override
                public Observable<String> call(final Long itemA, final Observable<Long> longObservable) {
                    return longObservable.map(new Func1<Long, String>() {
                        @Override
                        public String call(final Long itemB) {
                            return "groupJoin result:" + itemA + "/" + itemB;
                        }
                    });
                }
            })
            .subscribe(new Action1<Observable<String>>() {
                @Override
                public void call(final Observable<String> observable) {
                    observable.subscribe(new Action1<String>() {
                        @Override
                        public void call(final String s) {
                            System.out.println("onNext:" + s);
                        }
                    });
                }
            });
}

运行结果:

代码语言:javascript
复制
07-26 10:46:35.715 25244-25259/top.shixinzhang.rxjavademo I/System.out: A:0
07-26 10:46:36.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: B:0
07-26 10:46:36.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: onNext:groupJoin result:0/0
07-26 10:46:36.715 25244-25259/top.shixinzhang.rxjavademo I/System.out: A:2
07-26 10:46:36.715 25244-25259/top.shixinzhang.rxjavademo I/System.out: onNext:groupJoin result:2/0
07-26 10:46:37.715 25244-25259/top.shixinzhang.rxjavademo I/System.out: A:4
07-26 10:46:37.715 25244-25259/top.shixinzhang.rxjavademo I/System.out: onNext:groupJoin result:4/0
07-26 10:46:38.715 25244-25259/top.shixinzhang.rxjavademo I/System.out: A:6
07-26 10:46:38.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: B:3
07-26 10:46:38.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: onNext:groupJoin result:4/3
07-26 10:46:38.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: onNext:groupJoin result:6/3
07-26 10:46:39.725 25244-25259/top.shixinzhang.rxjavademo I/System.out: A:8
07-26 10:46:39.725 25244-25259/top.shixinzhang.rxjavademo I/System.out: onNext:groupJoin result:8/3
07-26 10:46:40.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: B:6
07-26 10:46:40.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: onNext:groupJoin result:8/6
07-26 10:46:40.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: onNext:groupJoin result:6/6
07-26 10:46:42.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: B:9
07-26 10:46:44.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: B:12

可以看到,groupJoin() 用起来比较费劲啊,目前我还没发现具体的用途。

Merge

Merge 操作符的作用正如它的名字一样,将多个 Observable 发射的数据组合到一个 Observable 中。

这里写图片描述
这里写图片描述

Merge 不保证元素发射的顺序,可能会导致顺序错乱(与之对应的是 Concat 操作符,它可以先按顺序发射一个 Observable 发射的数据,然后再按顺序发射下一个的 )。

在上面的图中,一旦有一个 Observable 发出 onError 事件,整个 merge 的过程也就结束了。

这里写图片描述
这里写图片描述

为了处理这种问题,在许多 ReactiveX 实现中提供了 MergeDelayError 操作符,它收到 onError 事件后会保留,直到所有 Observable 都发射完数据才传递给观察者。

RxJava 中这两个操作符的实现分别为 :mergemergeDelayError

merge

这里写图片描述
这里写图片描述

这个图可以比较清晰地表现出 merge 处理 onError 事件的方式。

merge 有 14 种重载,其中主要是接受参数个数的不同,结合前面的操作符可以看到,组合类的操作符很多都这样。

这里写图片描述
这里写图片描述

merge 源码:

代码语言:javascript
复制
public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2) {
    return merge(new Observable[] { t1, t2 });
}
public static <T> Observable<T> merge(Observable<? extends T>[] sequences) {
    return merge(from(sequences));
}
public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
    if (source.getClass() == ScalarSynchronousObservable.class) {
        return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
    }
    return source.lift(OperatorMerge.<T>instance(false));
}

使用例子:

代码语言:javascript
复制
/**
 * 组合两个 Observable 发出的数据,不保证顺序
 */
private void merge() {
    Observable<Integer> observableA = Observable.range(0 , 5)   //在另外一个线程
            .subscribeOn(Schedulers.io());
    Observable<Integer> observableB = Observable.range(10, 5);

    Observable.merge(observableA, observableB)
            .subscribe(this.<Integer>getPrintSubscriber());
}

运行结果:

代码语言:javascript
复制

merge 是静态的类方法,RxJava 还提供了实例方法 mergeWith(),作用和 merge 一样:

代码语言:javascript
复制
public final Observable<T> mergeWith(Observable<? extends T> t1) {
    return merge(this, t1);
}

mergeDelayError

这里写图片描述
这里写图片描述

mergeDelayErrormerge 非常相似,不同之处就在于前面介绍的,对 onError 事件的处理。

这里写图片描述
这里写图片描述

Concat

官方文档中,Concat 属于算数聚合运算符,不输入组合型,但是为了方便介绍相关的操作符,我们在这篇一起了解了吧。

Merge 相似但又不同的是,Concat 会按顺序发射多个 Observable 发射的数据,重点就是 按顺序

这里写图片描述
这里写图片描述

Concat 操作符会将多个 Observable 发射的数据组合到一个 Observable 然后发射出去。第一个 Observable 发射的所有数据在第二个 Observable 发射数据之前发射,以此类推。

直到前面一个 Observable 终止,Concat 才会订阅后一个 Observable 。

注意: 如果你想连接一个”热的” Observable(即在创建后立即开始发射数据的 Observable,即使没有订阅者),Concat 将不会看到订阅前“热“ Observable 发射的任何数据。

在一些 ReactiveX 的实现中,还有一个 ConcatMap 操作符 (之前写的 concatMap 的链接),它会将源 Observable 发射的数据进行变换处理,拆分成多个 Observable,然后按顺序链接起来。

StartWith 操作符可以说是位置相反的 Concat

RxJava 中对应的实现是 concat

这里写图片描述
这里写图片描述

RxJava 中 concat() 是一个静态方法,有多种重载,区别就是拼接的 Observable 个数,concat() 会将参数中的 Observable 按在参数中的位置发射出去。

这里写图片描述
这里写图片描述

使用起来也很简单,没什么好说的了。

代码语言:javascript
复制
/**
 * 按顺序拼接
 */
private void concat() {

    Observable<Integer> observableA = Observable.range(0 , 5);
    Observable<Integer> observableB = Observable.range(10, 5);

    Observable.concat(observableB, observableA)
            .subscribe(this.<Integer>getPrintSubscriber());
}

还有一个 concatWith() 方法, Observable.concat(a,b) 等价于 a.concatWith(b).

StartWith

StartWith 操作符的作用和名字一样,在源 Observable 发射数据之前插入指定的数据。

这里写图片描述
这里写图片描述

如果你想要一个 Observable 在发射数据时先发射一些特定的数据,可以使用 StartWith; 如果你想要一个 Observable 在发射数据后再发射一些特定的数据,可以使用 Concat

RxJava 对应的实现是 startWith():

这里写图片描述
这里写图片描述

有很多重载,表示可以在前面插入的数据类型可以是 Observable, Iterable 或者直接是几个数据。

startWith() 的实现也是调用的 concat():

代码语言:javascript
复制
public final Observable<T> startWith(Observable<T> values) {
    return concat(values, this);
}

使用例子:

代码语言:javascript
复制
/**
 * 先发射前面的
 */
private void startWith() {
    Observable<Integer> observableA = Observable.range(0 , 5);
    Observable<Integer> observableB = Observable.range(10, 5);

    observableB.startWith(observableA)
            .subscribe(this.<Integer>getPrintSubscriber());
}

运行结果:

代码语言:javascript
复制

Switch

Switch 操作符的作用是:将一个发射多个 Observable 的 Observable 变成一个单独的 Observable,它发射那些中间 Observables 最近发射的数据项。

翻译的不太好理解,先上个图:

这里写图片描述
这里写图片描述

Switch 订阅一个发射多个 Observable 的 Observable。

每次源 Observable 发射新的 Observable,Switch 会解除对前一个 Observable 的订阅,转向订阅新的 Observable(注意,这个切换的过程发生在源 Observable 发射新的 Observable 时,而不是新 Observable 发射元素时)。

这意味着,在新 Observable 产生到它开始发射数据之前的这段时间里,前一个 Observable 发射的数据将被丢弃(就像上图里的那个黄色圆圈一样)。

RxJava 中对应的实现是 switchOnNext

这里写图片描述
这里写图片描述

switchOnNext() 是一个静态方法,参数是一个发射 Observable 的 Observable:

代码语言:javascript
复制
public static <T> Observable<T> switchOnNext(Observable<? extends Observable<? extends T>> sequenceOfSequences) {
    return sequenceOfSequences.lift(OperatorSwitch.<T>instance(false));
}

使用例子:

代码语言:javascript
复制
/**
 * 喜新厌旧,一发射新的 Observable,就取消订阅之前的
 */
private void switchOnNext() {
    Observable<Observable<Integer>> observableObservable = Observable.unsafeCreate(new Observable.OnSubscribe<Observable<Integer>>() {
        @Override
        public void call(final Subscriber<? super Observable<Integer>> subscriber) {
            for (int i = 0; i < 5; i++) {
                subscriber.onNext(Observable.range(1, 10).delay(i, TimeUnit.SECONDS));
            }
            subscriber.onCompleted();
        }
    });

    Observable.switchOnNext(observableObservable)
            .subscribe(this.<Integer>getPrintSubscriber());
}

运行结果:

代码语言:javascript
复制

还有一个类似的实现是 switchOnNextDelayError,和 switchOnNext() 的不同之处也是对 onError 事件的处理。

代码语言:javascript
复制
public static <T> Observable<T> switchOnNextDelayError(Observable<? extends Observable<? extends T>> sequenceOfSequences) {
    return sequenceOfSequences.lift(OperatorSwitch.<T>instance(true));
}

Thanks

http://reactivex.io/documentation/operators.html https://github.com/mcxiaoke/RxDocs/blob/master/Operators.md http://blog.csdn.net/job_hesc/article/details/46612015 http://avenwu.net/2016/05/10/understand-the-join-operation-in-rx/

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017-07-26 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 组合型操作符
    • Zip
      • zip
      • zipWith
    • CombineLatest
      • combineLatest
      • withLatestFrom
    • Join
      • join
      • groupJoin
    • Merge
      • merge
      • mergeDelayError
    • Concat
      • StartWith
        • Switch
        • Thanks
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档