Rxjava2-小白入门(二)

前言

上篇文章我们主要讲解了观察者模式。那么这节课我们主要讲解Rxjava2的基本使用和操作符。其实网上的关于Rxjava2的优秀文章有很多对我来说都是受益匪浅。可能我写的文章和他们比相差很远,但是我也不能灰心能帮助一个人是一个人就算不能帮助别人我也会把文章当作笔记,闲暇时好好阅读,毕竟人家写的是人家的自己总结的才是自己的。


上篇文章提到,说要简单介绍下Android中的观察者模式,其实在java中也有两个类Observer和Observable两个类其实他们的类的主要内容和我们上节课写的大致都是相同的,有兴趣的同学可以自己去了解下,在这里我就不加介绍了。这篇我们主要是讲解Rxjava2的简单用法已经场景


首先我们先在我们的项目中添加依赖

compile 'io.reactivex.rxjava2:rxjava:2.0.0-RC5'
compile 'io.reactivex.rxjava2:rxandroid:2.0.0-RC1'

这是我使用的Rxjava2版本最新的请查阅官网

RxJava:   https://github.com/ReactiveX/RxJava 
RxAndroid : https://github.com/ReactiveX/RxAndroid 

我把2者的官网发出来了大家有兴趣的可以大家了解 有的人会问RxAndroid是什么?不是讲Rxjava2吗?其实RxAndroid,这是一个扩展库,更好的兼容了Android特性,比如主线程,UI事件等。我在把Rxjava文档给大家发出来方便大家对Rxjava更多的了解

Rxjava所有操作符文档


Rxjava2的操作符

  • create
  • just
  • fromArray
  • map
  • flatMap
  • zip
  • filter
  • time
  • merge
  • retry
  • retryWhen
  • range
  • Interval
  • ...

Rxjava2的使用场景

  • 登陆后获取用户信息
  • 关键词搜索
  • 防止按钮重复点击
  • 购物车合并本地和网络数据
  • 发送验证码倒计时

了解本文的大致内容我们先一步一步来。首先我们先了解如何创建。

创建订阅关系

  1. Observable:被观察者(主题Subject)
  2. Observer/Subscriber :观察者
  3. Subscribe:订阅

Observable 和 Observer 通过 subscribe() 方法实现订阅关系

在了解关系后我们来学习几种创建方式,首先我们先学习一种最简单的创建方式:

package com.example.ggxiaozhi.rxjava;

import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import android.view.View;
import android.widget.Button;
import android.widget.TextView;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

public class MainActivity extends AppCompatActivity {
    private static final String TAG = "MainActivity";

    private Button mButton;

    private TextView mTextView;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        mButton = (Button) findViewById(R.id.btn);
        mButton.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                Observable<String> observable = getObservable();
                Observer<String> observer = getObserver();
                observable.subscribe(observer); 
            }
        });
    }

    public Observable<String> getObservable() {
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("吃饭");
                e.onNext("睡觉");
                e.onNext("打豆豆");
                e.onComplete();
//                e.onError(new Throwable("错误"));
            }
        });
    }

    public Observer<String> getObserver() {
        return new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(String value) {
                Log.d(TAG, "onNext: " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        };
    }
}

输出结果:

10-08 16:23:05.368 4767-4767/com.example.ggxiaozhi.rxjava D/MainActivity: onSubscribe: 
10-08 16:23:05.368 4767-4767/com.example.ggxiaozhi.rxjava D/MainActivity: onNext: 吃饭
10-08 16:23:05.368 4767-4767/com.example.ggxiaozhi.rxjava D/MainActivity: onNext: 睡觉
10-08 16:23:05.368 4767-4767/com.example.ggxiaozhi.rxjava D/MainActivity: onNext: 打豆豆
10-08 16:23:05.368 4767-4767/com.example.ggxiaozhi.rxjava D/MainActivity: onComplete: 

使用 e.onError(new Throwable("错误"));

10-08 16:25:17.948 6894-6894/com.example.ggxiaozhi.rxjava D/MainActivity: onSubscribe: 
10-08 16:25:17.998 6894-6894/com.example.ggxiaozhi.rxjava D/MainActivity: onNext: 吃饭
10-08 16:25:17.998 6894-6894/com.example.ggxiaozhi.rxjava D/MainActivity: onNext: 睡觉
10-08 16:25:17.998 6894-6894/com.example.ggxiaozhi.rxjava D/MainActivity: onNext: 打豆豆
10-08 16:25:17.998 6894-6894/com.example.ggxiaozhi.rxjava D/MainActivity: onError: 错误

我们来分析下这段代码 在Observable中我们通过Observable.create()创建,那么ObservableEmitter是什么呢?它是指发射器的意思它里面有onNext(),onComplete(),onError()(注意onComplete和onError两者是互斥的,不能同时发送),三个方法分别代表发送数据,发送结束,发送错误。其中的<String>指的是我们发送数据的类型。最后我们通过subscribe将2者关系进行订阅(注意只有订阅的时候才会发送数据)。从打印的中我们可以发现对应的Observer也有相对应的3个方法。所以你发送的是 e.onNext()就会进入public void onNext(String value)其他的也是一样的。

细心的小伙伴可能会发现,每次接受都会先走到onSubscribe方法中,那么这个方法是干什么的呢? 其实他有Disposable d这个参数,他里面只有两个重要的方法d.dispose();d.isDisposed();一个是用来阻断接受,另一个是用于判断的我们简单来试用下:

    Disposable dd;
    public Observer<String> getObserver() {
        
        return new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                dd=d;
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(String value) {
                if ("睡觉".equals(value)){
                    dd.dispose();
                }
                Log.d(TAG, "onNext: " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        };
    }
10-08 16:35:16.158 14943-14943/com.example.ggxiaozhi.rxjava D/MainActivity: onSubscribe: 
10-08 16:35:16.158 14943-14943/com.example.ggxiaozhi.rxjava D/MainActivity: onNext: 吃饭
10-08 16:35:16.158 14943-14943/com.example.ggxiaozhi.rxjava D/MainActivity: onNext: 睡觉

当符合条件后调用dd.dispose();后面的数据就不在接受了。

那么有没有其他的创建方法呢?答案是肯定的。下面我们来使用其他操作符进行创建:

 public Observable<String> getObservable() {
        String[] strings={"吃饭","睡觉","打豆豆"};
        return Observable.fromArray(strings);
    }
return Observable.just("吃饭","睡觉","打豆豆");

这几种方法都可以简单的创建Observable对象。那么我们在来学习下Observer的创建方法:

observable.subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {

                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {

                    }
                }, new Action() {
                    @Override
                    public void run() throws Exception {

                    }
                }, new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        
                    }
                });

图片.png

这次我们创建Observer采用的是链式创建,通过参数我们可以看到其实这种创建方法和我们之前创建的方式意思是一样的,只不过它是分来了的,因为通常的时候我们一般都是在onNext中去接受数据那么我就开一单独创建一个Consumer()这样使用起来更加方便。


Scheduler线程控制

我们简单的学习了创建订阅关系(和链式创建),那么我们再来学习另一个Rxjava2的重要内容,就是线程控制。

Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。

Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。

Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。

Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

AndroidSchedulers.mainThread(): 它指定的操作将在 Android 主线程运行。

我认为Rxjava2的强大之处就在与它有很多的操作符可以根据业务逻辑的需求通过Rxjava2链式不断的变化来满足我们的需求,另一个就是它可以制定任意Observer和Observa的业务逻辑在那个线程中执行(当然它的强大之处太多了比如生命周期的管理,背压缓冲区等等)那么我们通过实际的例子来了解下吧!

     Observable.create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> e) throws Exception {
                        //在这里我们进行网络请求 请求结果返回一个字符串
                        e.onNext("网络请求结果");
                    }
                }).subscribeOn(Schedulers.io()).
                        observeOn(AndroidSchedulers.mainThread()).
                        subscribe(new Consumer<String>() {
                            @Override
                            public void accept(String s) throws Exception {
                                Log.d(TAG, "accept: " + s);
                            }
                        });
            }
        });

这就是我们常用的在Observable发送数据前我们先请求网络(一般我们请求网络都会返回一个Json字符串或是实体类)然后将等到的消息发给Consumer()(Observer),我们都知道Android中是不允许在主线程请求网络操作的,并且通常我们请求的到结果是用来给UI控件赋值的,那么Rxjava中的线程控制就很好的帮住了我们解决这个问题,我们通过subscribeOn()这是制定Observable在那个线程执行,通过observeOn指定Consumer()运行在主线程从而更新UI(一定要记得切回主线程,因为你开启了子线程请求网络,如果不切回主线程的话默认还是在请求网络的子线程的那么是无法更新UI的)。相信大家也能明白,如果不制定线程所有操作都是在主线程中运行的。

observeOn() 指定 Subscriber 线程 subscribeOn 制定 Observable 线程

Observable.doOnSubscribe() 。它和 Subscriber.onStart() 同样是在 subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程


案例

在了解了Rxjava2的原理,创建和使用下面我就该学习他的操作符了,在文章的开头我已经把Rxjava操作符文档发出来了,大家会发现他的操作符实在是太!多!了!。如果真的想去精通了解每个操作符想必也是有些太费时间,本文是小白入门,那么我们通过结合实例来学习操作符会更容易理解和记忆

  • 登陆后获取用户信息(flatMap) 首先我们先了解下map操作符的定义:

Map操作符对原始Observable发射的每一项数据应用一个你选择的函数,然后返回一个发射这些结果的Observable

 Observable<Integer> just = Observable.just(1);

                Observable<String> map = just.map(new Function<Integer, String>() {
                    /**
                     * map返回的也是一个Observable<String>
                     * @param integer 传入的类型
                     * @return 返回结果为字符串
                     */
                    @Override
                    public String apply(Integer integer) throws Exception {
                        
                        return integer + "value";
                    }
                });
                map.subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d(TAG, "accept: " + s);
                    }
                });
            }
        });

结果:

10-08 18:45:21.588 31174-31174/com.example.ggxiaozhi.rxjava D/MainActivity: accept: 1value

可以看到我们发射的原始数据是Integer通过map操作符后我们将Integer转成了字符串 我们在接受到的结果也是(String s)字符串类型的。这就map操作符的作用。当然实际应用中它可不是简单的这么使用的。下面我们看一个实际的登录例子:

我们传入我们的用户ID,通过这个ID等落成功后返回一个结果,我们再通过这个结果查询用户的其他信息

在这个例子中我们用到flatmap:

        int userId = 111111;
        Observable.just(userId).flatMap(new Function<Integer, ObservableSource<Result>>() {
            @Override
            public ObservableSource<Result> apply(Integer integer) throws Exception {
                /**
                 *模拟网络请求
                 *通过userId登录 
                 * 登录成功返回结果result
                 */
                Result result = null;//这里是通过传入的userId作为请求参数请求网络返回用户信息
                return Observable.just(result);
            }
        }).flatMap(new Function<Result, ObservableSource<User>>() {
            @Override
            public ObservableSource<User> apply(Result result) throws Exception {
                /**
                 *模拟网络请求
                 * 根据返回的登录结果result
                 * 请求包含用户的帐号 年龄等个人信息User
                 */
                final User user = null;//这里是通过传入的result作为请求参数请求网络返回用户信息其他信息
                return Observable.create(new ObservableOnSubscribe<User>() {
                    @Override
                    public void subscribe(ObservableEmitter<User> e) throws Exception {
                        e.onNext(user);
                    }
                });
            }
        }).subscribeOn(Schedulers.io()).
                observeOn(AndroidSchedulers.mainThread()).
                subscribe(new Consumer<User>() {
                    @Override
                    public void accept(User user) throws Exception {
                        /**
                         * 通过返回的User等到name更新UI
                         */
                        Log.d(TAG, "accept: " + user.getName());
                    }
                });
    }

通过flatmap我们可以很方便的实现我们登录的业务逻辑需求。通过链式书写将所有操作一起完成,如果还有复杂的请求我们可以继续往下写。

为什么在等落的时候我们用的是flatmap而不是map呢? 比较会发现map返回的是基本数据类型或者是Object,而flatmap返回是的ObservableSource<?>,那么我就可以调用操作符再做处理,而map是数据类型不能再做其他处理了。多比较使用就会更好的理解。

总结:这篇文章已经不短了。我在阅读文章的时候就不喜欢长的文章。所以剩下的例子和操作符我会在写一篇。这篇就到这里了。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏三流程序员的挣扎

RxAndroid 和 RxKotlin

RxAndroid 是在 RxJava 上的扩展,用法和 RxJava 一样。结构比较简单,主要用在 Android 上的线程切换。

5802
来自专栏zcqshine's blog

SpringMVC下获取验证码

4918
来自专栏Fundebug

JWT究竟是什么呢?

为了保证可读性,本文采用意译而非直译。另外,本文版权归原作者所有,翻译仅用于学习。

1177
来自专栏听雨堂

C#实现微信AES-128-CBC加密数据的解密

小程序登录时,获得用户的信息,只是昵称,无法用作ID。而有用的数据,都加密着,腾讯给出了解密的方法: 加密数据解密算法 接口如果涉及敏感数据(如wx.getUs...

4149
来自专栏xingoo, 一个梦想做发明家的程序员

【手把手教你全文检索】Apache Lucene初探

PS: 苦学一周全文检索,由原来的搜索小白,到初次涉猎,感觉每门技术都博大精深,其中精髓亦是不可一日而语。那小博猪就简单介绍一下这一周的学习历程,仅供各位程...

25810
来自专栏jeremy的技术点滴

《Network Programming with Go》阅读重点备忘(一)

3697
来自专栏Linyb极客之路

spring系列之自定义扩展PropertyPlaceHolderConfigurer

一、PropertyPlaceHolderConfigurer介绍 主要用于将一些配置信息移出xml文件,移到至properties文件 二、拓展使用 1、将...

7975
来自专栏我就是马云飞

Rxjava2最全面的解析

前言 由于公司重新规划的部门,我调到了另外一个部门,所以负责的项目也换了,仔细看了下整体的项目,rxjava+retrofit。整体的一套。众所周知,rxja...

34410
来自专栏Java与Android技术栈

RxJava 线程模型分析

RxJava的被观察者在使用操作符时可以利用线程调度器--Scheduler来切换线程,例如

2562
来自专栏androidBlog

Rxjava 2.x 源码系列 - 线程切换 (下)

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/gdutxiaoxu/article/de...

1571

扫码关注云+社区

领取腾讯云代金券