RxJava2--基本使用

RxJava2介绍

RxJava是通过事件传递,并且在传递过程中对事件内部数据进行修改,最终发送给接收者的响应式框架。

借助某个同学的一张图可以更直观的了解:

RxJava事件流向

上图只是在同个线程中,可以让事件携带数据按顺序从上层流转到下层。而在事件流转的过程中,RxJava提供了很多操作符可以对源头事件进行处理再往下传递。

RxJava2的优势

  1. 书写简便,没有层层回调
  2. 流式调用,可以简单的看出来整个过程
  3. 操作符非常强大,可以在事件流转中间进行各种处理
  4. 可以保证多线程之间事件的顺序(不过数据同步需要自己保证)

RxJava2的配置

在Module中引入即可:

compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

RxJava2中基本类介绍

Observable:一个可被subscribe的对象,也可以理解成被监听的对象,而该对象中保存着一个名为ObservableEmitter的对象,ObservableEmitter对象就是上面提到的发送事件的对象。

Observer:接收Observable发送事件的对象。

Consumer: 只接收onNext事件的对象

本质RxJava就是一套非常强大的Observer框架

Observable与Observer基本使用

Observable对象中调用onNext发射(Emitter)了12,以及ErrorComplete这四个事件。 而在Observer对应的几个响应函数中打印日志(为了方便,把Log.e替换成了System.out.println)。

 Observable.create<Int> { emitter ->
            Log.e(TAG, "Emitter onNext1...${Thread.currentThread().name}")
            emitter.onNext(1)
            Log.e(TAG, "Emitter onNext2...${Thread.currentThread().name}")
            emitter.onNext(2)
            Log.e(TAG, "Emitter onError...${Thread.currentThread().name}")
            emitter.onError(NullPointerException())
            Log.e(TAG, "Emitter onComplete...${Thread.currentThread().name}")
            emitter.onComplete()
        }.subscribe(object : Observer<Int> {
            override fun onComplete() = System.out.println("onComplete...${Thread.currentThread().name}")
            override fun onSubscribe(d: Disposable?) = System.out.println("onSubscribe...${Thread.currentThread().name}...Disposable:$d")
            override fun onNext(value: Int?) = System.out.println("onNext...${Thread.currentThread().name}...Value:$value")
            override fun onError(e: Throwable?) = System.out.println("onError...${Thread.currentThread().name}...Throwable:$e")
        })

而在同一个线程中,输出结果如下:

 E/SelectImageActivity: onSubscribe...main...Disposable:null
 E/SelectImageActivity: Emitter onNext1...main
 E/SelectImageActivity: onNext...main...Value:1
 E/SelectImageActivity: Emitter onNext2...main
 E/SelectImageActivity: onNext...main...Value:2
 E/SelectImageActivity: Emitter onError...main
 E/SelectImageActivity: onError...main...Throwable:java.lang.NullPointerException
 E/SelectImageActivity: Emitter onComplete...main

可以发现:

1. 在同一个线程中,发送一个事件,就会接收一个事件,再发送下一个事件 2. 在发送完onError事件后,即使发送了onComplete事件,也无法接收 3. 在发送完onComplete事件后,再发送了onError事件,则会将该Throwable对象抛出,出现crash 4. 在发送完onCompleteonError事件后,再发送onNext事件,则无法接收

事件的消费者Consumer

在大多数情况下,我们只用关心onNext或者onError单独的事件,而对于其他的事件均不关心,这种情况下,我们就可以使用Consumer对象

对于subscribe函数的重载函数有这些:

public final void subscribe(Observer<? super T> observer) 
public final Disposable subscribe(Consumer<? super T> onNext)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete, Consumer<? super Disposable> onSubscribe)

举例,我们只关心onNext事件,则可以这样来表示:

Observable.create<Int> { emitter ->
            Log.e(TAG, "Emitter onNext1...${Thread.currentThread().name}")
            emitter.onNext(1)
            Log.e(TAG, "Emitter onNext2...${Thread.currentThread().name}")
            emitter.onNext(2)
            Log.e(TAG, "Emitter onComplete...${Thread.currentThread().name}")
            emitter.onComplete()
            Log.e(TAG, "Emitter onNext3...${Thread.currentThread().name}")
            emitter.onNext(3)
        }.subscribe { data ->
            Log.e(TAG, "onNext...$data")
        }

在接收端,仅仅只接收了onNext事件。

 E/SelectImageActivity: Emitter onNext1...main
 E/SelectImageActivity: onNext...1
 E/SelectImageActivity: Emitter onNext2...main
 E/SelectImageActivity: onNext...2
 E/SelectImageActivity: Emitter onComplete...main
 E/SelectImageActivity: Emitter onNext3...main

参考资料

给初学者的RxJava2.0教程

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券