首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Rxjava2最全面的解析

Rxjava2最全面的解析

作者头像
我就是马云飞
发布2018-02-05 11:43:27
2.3K0
发布2018-02-05 11:43:27
举报
文章被收录于专栏:我就是马云飞我就是马云飞

前言

由于公司重新规划的部门,我调到了另外一个部门,所以负责的项目也换了,仔细看了下整体的项目,rxjava+retrofit。整体的一套。众所周知,rxjava+retrofit是目前网上最流行的网络解析框架。而目前网络上的文章大多还是关于rxjava1的。关于RxJava2的少之又少,于是,便有了此文。

此文的目的有三个: 1. 给对 RxJava2感兴趣的人一些入门的指引 2. 给正在使用 RxJava2但仍然心存疑惑的人一些更深入的解析 3.给想从RxJava1替换成RxJava2的人给出直接的对比。

RxJava概念介绍

RxJava=reactive+extension。 那么接下来我会分别对这两点进行整体的介绍。reactive又称reactive programming。也就是响应式编程。在往简单的说,rxjava可以很方便的处理线程切换的问题。说到这个,我们就会想到异步操作。handler?AsyncTask?但你要知道,随着请求的数量越来越多,代码逻辑将会变得越来越复杂。而rxjava却仍可以保持清晰的逻辑。它的原理就是创建一个Observable对象来搞事情。然后使用各种操作符通过建造者模式建立成一系列的链式操作。就如流水线一样,把事情搞完。然后发射给Observer进行处理。

观察者模式

rxjava的实现主要是通过观察者模式实现的。那么什么是观察者模式,我这边做一个简单的介绍。

栗子:观察者对被观察者进行一个简单,当被观察者被改变时,要立即做出反应。比如,你认为隔壁老王和你媳妇有一腿,但却没证据,此时,只要当隔壁老王进了你媳妇房门的时候,你就要去捕获他。在这个例子中,你是观察者,老王是被观察者。(记得当初我经常搞反了)。那么,观察者模式是否是一对一呢?很明显不是的,就上面的例子,你可以叫三千城管监听着老王。只要他有不轨之心。就打断他的第三条腿。也就是说多个观察者对应一个被观察者。字看累了来看图:

其实在android中也有很多自带的观察者模式。最明显的莫过于点击事件。说个最简单的例子,点击按钮后弹一个吐司。那么,我们在点击按钮的时候,告知系统,此时,我需要弹一个吐司。那么就这么弹出来了。那么,这个时候问题来了。我是否需要实时去监听这个按钮呢?答案是不需要的。这就和前面的举例有的差距了。换句话说。我只要在此按钮进行点击时进行监听就可以了。这种操作被称为订阅。也就是说Button通过setOnClickListener对OnclickListener进行了订阅了操作,来监听onclick方法。

extension
  • 不仅支持事件序列,还支持数据流。事件-->动态的,无法预知,例如:事件点击,服务器的推送等等 数据流-->静态的,可预知的,例如:读取本地文件,播放音视频等等。
  • 通过操作符对中间事件的处理。
  • 线程操作的便捷。关于这些具体的实现。我会在后面一一举例。

RxJava1与RxJava2的区别

说到区别,可能有的小伙伴会问,我没看过rxjava1。可以直接看rxjava2么。个人觉得不必要,因为 rxjava2.x 是按照 Reactive-Streams specification 规范完全的重写的,完全独立于 rxjava1.x 而存在,它改变了以往 rxjava1的用法。换句话说,我学java需不需要先学C语言一样。

那么两者的区别体现在哪呢?主要是如下几个方面:

  • 空指针问题这应该是一个很大的变化,用过rxjava1的人都知道,我们可以在发射事件的时候传入NULL。但这在rxjava2中是不存在的。不信你试试?分分钟给你来一个NullPointerExpection。
  • Function相关的在rxjava1中,我们有各种Func1,Func2......,但在rxjava2中只有Function了。依旧记得看凯哥的文章的时候把我整蒙了。愣是没发现,后来才注意到被替换了。并且,他们都增加了throw exception。
  • 背压—backpressure 关于backpressure,这个就厉害了。厉害到我都不懂了。好了,开个玩笑,我们继续说。我们知道在Rxjava1中Observable对backpressure是支持的。但在Rxjava2中Observable取消了对backpressure的支持。并且引进了一个叫做Flowable的来支持backpressure。

那么什么是背压: 听不懂的含义:上游的生产速度大于下游的处理速度,导致下游处理不急,这种操作被称为backpressure。

这种情况看似很常见,但实际上,这种情况并不常见,或者可以说成非常罕见。那么遇到了怎么办?如果它出现了,直接丢弃。what the fuck?你tm在逗我?但事实就是这样,如果我们在开发过程中,遇到了backpressure,我们就应该丢弃它。

听得懂的含义:对于可丢弃的事件,上游生产速度过快导致事件堆积,当堆积到超出buffer上限的时候,就叫做backpressure。

处理方案是什么: 1、丢弃新事件;2、不丢弃,继续堆积。(忽略了backpressure,相当于Observable)。

适合backpressure的情况: 在线直播流:比如说,正在直播的时候,突然网络出现了卡顿,页面卡住了。那么当网络好了之后肯定不会是在接着之前的页面继续的,就相当于,你网络卡了多久,他就丢弃了多长时间的数据。

backpressure的关键点是什么:不可控,可丢弃。

基本使用

讲了一大堆理念知识,接下来就是开工干活了。那么关于Rxjava2的基本实现主要是三点:创建Observable,创建Observer,进行绑定。那么我们一个个的看。

创建Observable

Observable是什么?观察者还是被观察者?我又忘了。哈哈。开个玩笑,当然是后者了。为什么是先创建Observable而不是Observer?当然了,先后顺序的无所谓的。但是考虑到后面的链式调用。所以我这边就先写了先创建Observable了。

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("Hello");
                emitter.onNext("Rxjava2");
                emitter.onNext("My name is Silence");
                emitter.onNext("What's your name");
                //一旦调用onComplete,下面将不在接受事件
                emitter.onComplete();
            }
        });

现在我来解释一下上面的ObservableEmitter到底是什么。字面意思是可观察的发射器。没错,这个就是被观察者用来发送事件的。它可以发出三种类型的事件,通过调用emitter的onNext(T value)、onError(Throwable error)和onComplete()就可以分别发出next事件、error事件和complete事件。至于这三个事件到底什么意思。不急,我们后面说。

创建Observer

现在我们来创建一个观察者,它决定了在观察中到底应该有着什么样的行为操作。

Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG, "onSubscribe: " + d);
                result += "onSubscribe: " + d + "\n";
            }

            @Override
            public void onNext(String string) {
                Log.i(TAG, "onNext: " + string);
                result += "onNext: " + string + "\n";
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError: " + e);
                result += "onError: " + e + "\n";
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete: ");
                result += "onComplete: " + "\n";
            }
        };

其中onSubscribe、onNext、onError和onComplete是必要的实现方法,其含义如下:

  • onSubscribe:它会在事件还未发送之前被调用,可以用来做一些准备操作。而里面的Disposable则是用来切断上下游的关系的。
  • onNext:普通的事件。将要处理的事件添加到队列中。
  • onError:事件队列异常,在事件处理过程中出现异常情况时,此方法会被调用。同时队列将会终止,也就是不允许在有事件发出。
  • onComplete:事件队列完成。rxjava不仅把每个事件单独处理。而且会把他们当成一个队列。当不再有onNext事件发出时,需要触发onComplete方法作为完成标识。
进行Subscribe

订阅其实只需要一行代码就够了:

observerable.subscribe(Observer);

运行一个看看效果先:

和之前介绍的一样,先调用onSubscribe,然后走了onNext,最后以onComplete收尾。

神奇的操作符

对于rxjava来说,有一句话,我觉得说的很对,叫做:如果你每天研究一个操作符,最少一个半月,如果你想理解原理。最少半年。换句话说,有关rxjava的知识完全可以写一本书。那么本文肯定不会讲那么细。在这边我会给你们介绍一些常用的操作符。保证日常开发的流程足矣。

创建操作符

一般创建操作符是指,刚开始创建观察者的时候调用的。在基本使用中我已经介绍了create操作符,那么这边我们就要说到just,fromarray和interval了。

just

此操作符是将传入的参数依次发出来。

Observable observable = Observable.just("Hello", "Rxjava2", "My name is Silence","What's your name");
// 将会依次调用:
// onNext("Hello");
// onNext("Rxjava2");
// onNext("My name is Silence");
// onNext("What's your name");
// onCompleted();
fromarray

将传入的数组通过坐标一次发送出去。

String[] words = {"Hello", "Rxjava2", "My name is Silence","What's your name"};
Observable observable = Observable.from(words);
// 将会依次调用:
// onNext("Hello");
// onNext("Rxjava2");
// onNext("My name is Silence");
// onNext("What's your name");
// onCompleted();
interval

这个其实就是定时器,用了它你可以抛弃CountDownTimer了。现在我们看看怎么用:

Observable.interval(2, TimeUnit.SECONDS).subscribe(
                new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.i(TAG, "accept: "+aLong.intValue());
                    }
                }
        );

我们看看结果:

上面就是我们每隔2s打印一次long的值。

变换操作符

变换操作符的作用是对Observable发射的数据按照一定规则做一些变换操作,然后讲变换后的数据发射出去。变换操作符有map,flatMap,concatMap,switchMap,buffer,groupBy等等。这里我们会讲解最常用的map,flatMap、concatMap以及compose。

map

map操作符通过指定一个Function对象,将Observable转换为一个新的Observable对象并发射,观察者将收到新的Observable处理。直接上代码:

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "This is result " + integer + "\n";
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String str) throws Exception {
                Log.i("--->", "accept: "+str);
                string += str;
            }
        });
        tv_first.setText(string);

输入结果如下:

仔细看,map()方法中,我们把一个integer对象转换成了一个String对象。然后当map()调用结束时,事件的参数类型也从integer转换成了String。这就是最常见的变换操作。

flatMap

flatmap的操作符是将Observable发射的数据集合变成一个Observable集合。也就是说它可以讲一个观察对象变换成多个观察对象,但是并不能保证事件的顺序。想保证事件的顺序?那你过会看下面降到的concatMap。

那么什么叫作数据集合变成一个Observable集合呢?还是用上面的例子,我有一组integer集合。我想转换成string集合怎么办?那就继续看代码:

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("I am value " + integer + "\n");
                }
                return Observable.fromIterable(list);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.i("--->", "accept: "+s);
                string += s;
            }
        });
        tv_first.setText(string);

我们来看结果:

打住打住,是不是有问题?WTF?有啥问题?还记不记得我上面说过flatMap不能保证事件执行顺序。那么这边事件为什么都是按顺序执行的?不急,我们在发射事件的时候给他加一个延迟在看看结果:

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("I am value " + integer + "\n");
                }
                return Observable.fromIterable(list).delay(100,TimeUnit.MILLISECONDS);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.i("--->", "accept: "+s);
                string += s;
            }
        });
        tv_first.setText(string);

我们在当他发射事件的时候给他加一个100ms的延迟看看结果:

看到没有,我说啥的?不能保证执行顺序。所以万事容我慢慢道来。先喝杯茶压压惊。我们在接着往下讲。

concatMap

上面我也介绍了concatMap。除了保证了执行顺序,其他都和concatMap一毛一样。你说保证就保证啊。您先喝杯茶,接着往下看:

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }).concatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("I am value " + integer + "\n");
                }
                return Observable.fromIterable(list).delay(1000,TimeUnit.MILLISECONDS);
//                return Observable.fromIterable(list);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.i("--->", "accept: "+s);
                string += s;
            }
        });
        tv_first.setText(string);

为了我们能看的更明显一点,我这边直接设置了一秒钟的延迟。下面我们来看效果图:

可以从执行顺序和打印时间看出,的的确确是延迟了一秒钟。

compose

这个操作符就很厉害了。他的变换是怎么做的呢?我们知道rxjava是通过建造者的模式通过链式来调用起来的。那么多个链式就需要多个Observable。而这个操作符就是把多个Observable转化成一个Observable。听起来是不是很厉害~。具体如何操作,我们接着看:

public  <T> ObservableTransformer<T, T> applyObservableAsync() {
        return new ObservableTransformer<T, T>() {
            @Override
            public ObservableSource<T> apply(Observable<T> upstream) {
                return upstream.subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread());
            }
        };
    }

上面代码可以看出,我把子线程和主线程进行了一个封装,然后返回了一个ObservableTransformer对象。那么我们只要这边做就可以了:

Observable.just(1, 2, 3, 4, 5, 6)
                .compose(this.<Integer>applyObservableAsync())
                .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer strings) throws Exception {
                Log.i("-->", "accept: " + strings);
                string += strings;
            }
        });
        tv_first.setText(string);
过滤操作符

过滤操作符用于过滤和选择Observable发射的数据序列。让Observable只返回满足我们条件的数据。过滤操作符有buffer,filter,skip,take,skipLast,takeLast等等,这边我会介绍到filter,buffer,skip,take,distinct。

filter

filter操作符是对源Observable产生的结果进行有规则的过滤。只有满足规则的结果才会提交到观察者手中。例如:

Observable.just(1,2,3).filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer < 3;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer s) throws Exception {
                Log.i("--->", "accept: " + s);
                string += s;
            }
        });
        tv_first.setText(string);
    }

代码很简单,我们发送1,2,3;但是我们加上一个filter操作符,让它只返回小于3的的内容。那么我们来看一下结果:

distinct

这个操作符其实就更简单了。比如说,我要在一组数据中去掉重复的内容,就要用到它。也就是去重。它只允许还没有发射的数据项通过。发射过的数据项直接pass。

Observable.just(1,2,3,4,2,3,5,6,1,3)
                .distinct().subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer s) throws Exception {
                Log.i("--->", "accept: " + s);
                string += s;
            }
        });
        tv_first.setText(string);

那么输出结果就很简单了:

buffer

这个其实也不难,主要是缓存,把源Observable转换成一个新的Observable。这个新的Observable每次发射的是一组List,而不是单独的一个个的发送数据源。

Observable.just(1,2,3,4,5,6)
                .buffer(2).subscribe(new Consumer<List<Integer>>() {
            @Override
            public void accept(List<Integer> strings) throws Exception {
                for (Integer integer : strings) {
                    Log.i("-->", "accept: "+integer);
                    string+=strings;
                }
                Log.i("-->", "accept: ----------------------->");
            }
        });
        tv_first.setText(string);

我们让他每次缓存2个,下面我们来看结果:

skip 、take

skip操作符将源Observable发射过的数据过滤掉前n项,而take操作则只取前n项;另外还有skipLast和takeLast则是从后往前进行过滤。先来看看skip操作符。

Observable.just(1, 2, 3, 4, 5, 6)
                .skip(2).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer strings) throws Exception {
                Log.i("-->", "accept: " + strings);
                string += strings;
            }
        });
        tv_first.setText(string);

结果如下:

接下来我们把skip换成take看看。

Observable.just(1, 2, 3, 4, 5, 6)
                .take(3).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer strings) throws Exception {
                Log.i("-->", "accept: " + strings);
                string += strings;
            }
        });
        tv_first.setText(string);

结果如下:

组合操作符
merge

merge是将多个操作符合并到一个Observable中进行发射,merge可能让合并到Observable的数据发生错乱。(并行无序)

Observable<Integer> observable1=Observable.just(1,2,3);
        Observable<Integer> observable2=Observable.just(1,2,3);
        Observable.merge(observable1,observable2).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.i(TAG, "accept: "+integer);
            }
        });

结果如下:

concat

将多个Observable发射的数据进行合并并且发射,和merge不同的是,merge是无序的,而concat是有序的。(串行有序)没有发射完前一个它一定不会发送后一个。

Observable<Integer> observable1=Observable.just(1,2,3);
        Observable<Integer> observable2=Observable.just(4,5,6);
        Observable.concat(observable1,observable2).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.i(TAG, "accept: "+integer);
            }
        });

结果如下:

zip

此操作符和合并多个Observable发送的数据项,根据他们的类型就行重新变换,并发射一个新的值。

Observable<Integer> observable1=Observable.just(1,2,3);
        Observable<String> observable2=Observable.just("a","b","c");
        Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {

            @Override
            public String apply(Integer integer, String s) throws Exception {
                return integer+s;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.i(TAG, "apply: "+s);
            }
        });

结果如下:

concatEager

前面说道串行有序,而concatEager则是并行且有序。我们来看看如果修改:

Observable<Integer> observable1=Observable.just(1,2,3);
        Observable<String> observable2=Observable.just("a","b","c");
        Observable.concatEager(Observable.fromArray(observable1,observable2)).subscribe(new Consumer<Serializable>() {
            @Override
            public void accept(Serializable serializable) throws Exception {
                Log.i(TAG, "accept: "+serializable);
            }
        });

结果如下:

线程控制

其实线程控制也是一种操作符。但它不属于创建、变换、过滤。所以我这边把它单独拉出来讲讲。

subscribeOn是指上游发送事件的线程。说白了也就是子线程。多次指定上游的线程只有第一次指定的有效, 也就是说多次调用subscribeOn() 只有第一次的有效, 其余的会被忽略。

observerOn是指下游接受事件的线程。也就是主线程。多次指定下游的线程是可以的, 也就是说每调用一次observeOn() , 下游的线程就会切换一次。

举个栗子:

Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .map(mapOperator) // 新线程,由 observeOn() 指定
    .observeOn(Schedulers.io())
    .map(mapOperator2) // IO 线程,由 observeOn() 指定
    .observeOn(AndroidSchedulers.mainThread) 
    .subscribe(subscriber);  // Android 主线程,由 observeOn() 指定

在RxJava中, 已经内置了很多线程选项供我们选择, 例如有

  • Schedulers.io() :I/O操作(读写文件、数据库,及网络交互等)所使用的Scheduler。行为模式和newThread()差不多。区别在于io()的内部实现是用一个无数量上限的线程池。可以重用空闲的线程。因此多数情况下io()比newThread()更有效率。
  • Schedulers.immediate(): 直接在当前线程运行。
  • Schedulers.computation() :计算所使用的Scheduler,例如图形的计算。这个Scheduler使用固定线程池,大小为CPU核数。不要把I/O操作放在computation中。否则I/O操作的等待会浪费CPU。
  • Schedulers.newThread():代表一个常规的新线程
  • Schedulers.trampoline(): 当我们想在线程执行一个任务时(不是立即执行),可以用此方法将它加入队列。这个调度器将会处理它的队列并且按序执行队列中的每一个任务。
  • AndroidSchedulers.mainThread() :代表Android的主线程

这些内置的Scheduler已经足够满足我们开发的需求, 因此我们应该使用内置的这些选项, 在RxJava内部使用的是线程池来维护这些线程, 所有效率也比较高。

与Retrofit结合

就目前开发角度而言,retrofit可以说是最火的网络框架。其原因我认为有两点,第一:可以和okhttp结合。第二:可以和rxjava结合。也就是说Retrofit 除了提供了传统的 Callback 形式的 API,还有 RxJava 版本的 Observable 形式 API。

如果需要使用retrofit,我们需要在gradle的配置加上这句:

compile 'com.squareup.retrofit2:retrofit:2.0.1'

话不多说,直接上例子:

private static OkHttpClient mOkHttpClient;
    private static Converter.Factory gsonConverterFactory = GsonConverterFactory.create();
    private static CallAdapter.Factory rxJavaCallAdapterFactory = RxJavaCallAdapterFactory.create();
     public static BaseHttpApi getObserve() {

        if (baseHttpApi == null) {
            Retrofit retrofit = new Retrofit.Builder()
                    .addConverterFactory(gsonConverterFactory)
                    .addCallAdapterFactory(rxJavaCallAdapterFactory)
                    .client(mOkHttpClient)
                    .baseUrl(BaseUrl.WEB_BASE)
                    .build();
            baseHttpApi = retrofit.create(BaseHttpApi.class);
       }
        return baseHttpApi;

    }

如上代码,可以很清晰的看出,它通过2个工厂模式创建了gson和rxjava。并且通过了链式调用将他们进行了绑定。那么怎么通过链式调用实现网络请求呢?不急,我们喝杯茶,接着往下看。

比如,一个post请求,我们可以这么写:

public interface BaseHttpApi{  
    @FormUrlEncoded
    @POST("seller/cash_flow_log_detail.json")
    Observable<ServiceReward> serviceReward(@Field("requestmodel") String model);
}

敲黑板了。注意,我这边是interface而不是一个class。接下来就是日常调用了,代码如下:

Network.getObserve()
                .serviceReward(new Gson().toJson(map))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<ServiceReward>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(ServiceReward serviceReward) {
                        parseOrderDetail(serviceReward);
                    }
                });

看第二行,这就是为什么刚开始为什么要用工厂模式创建gson的原因。现在我们只要在parseOrderDetail方法中处理正常的逻辑就可以了。是不是看起来代码有点多?那么我们可以这样:

Network.getObserve()
                .serviceReward(new Gson().toJson(map))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(serviceReward ->{
                        parseOrderDetail(serviceReward);
                 });

一个lamada表达式,是不是感觉瞬间代码少了很多,不过有人要说,我加载的时候是一个弹窗显示的,如果加载失败了我这个弹窗岂不是影藏不了?不存在的,如果真有这种情况怎么做?我们接着看:

Network.getObserve()
                .serviceReward(new Gson().toJson(map))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(serviceReward ->{
                        parseOrderDetail(serviceReward);
                 },throwable ->{do something when net error...});

这么处理岂不是快哉。对于lamada,刚开始可能都是各种不习惯,不过用习惯了就会发现代码各种简洁(我最近也在适应中)。

最后

关于rxjava其实对我们来说很难上手。或者不能这么说,应该是rxjava的东西太深了,我们很难掌握透彻。所以我前面也说了如果你每天研究一个操作符,最少一个半月,如果你想理解原理。最少半年。换句话说,有关rxjava的知识完全可以写一本书。但日常开发中,此文中的内容基本可以解决大部分的日常需求。当然,如果你有心的话,你可以去尝试着了解rxjava底层的实现原理。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-12-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 我就是马云飞 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • RxJava概念介绍
    • 观察者模式
      • extension
      • RxJava1与RxJava2的区别
      • 基本使用
        • 创建Observable
          • 创建Observer
            • 进行Subscribe
            • 神奇的操作符
              • 创建操作符
                • just
                • fromarray
                • interval
              • 变换操作符
                • map
                • flatMap
                • concatMap
                • compose
              • 过滤操作符
                • filter
                • distinct
                • buffer
                • skip 、take
              • 组合操作符
                • merge
                • concat
                • zip
                • concatEager
            • 线程控制
            • 与Retrofit结合
            • 最后
            相关产品与服务
            云直播
            云直播(Cloud Streaming Services,CSS)为您提供极速、稳定、专业的云端直播处理服务,根据业务的不同直播场景需求,云直播提供了标准直播、快直播、云导播台三种服务,分别针对大规模实时观看、超低延时直播、便捷云端导播的场景,配合腾讯云视立方·直播 SDK,为您提供一站式的音视频直播解决方案。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档