前面介绍过RxJava的基本概念与使用,可以通过RxJava发射事件,而通过Observer
来接收事件。
然而我们大多数时候,会有耗时的操作,比如在子线程完成复杂的矩阵运算,文件的IO操作,网络请求,数据库读写等等,我们希望可以在子线程完成这些事情,而在主线程接收回调事件。
这种情况,我们就需要用到Scheduler
对象了。
所使用的Scheduler主要在Schedulers
这个类中,RxJava仅仅提供了以下这些调度器:
Schedulers.SINGLE
Schedulers.COMPUTATION
Schedulers.IO
Schedulers.TRAMPOLINE
Schedulers.NEW_THREAD
AndroidSchedulers.MAIN_THREAD
通过subscribeOn
以及observerOn
分别指定任务事件与监听事件所在的线程。
还是之前的例子,只是增加了subscribeOn(Schedulers.io())
以及observeOn(AndroidSchedulers.mainThread())
,让事件在IO线程中发射,而在Android主线程接收。
Observable.create<Int> { emitter ->
Log.e(TAG, "Emitter onNext1...${Thread.currentThread().name}")
emitter.onNext(1)
Log.e(TAG, "Emitter onNext2...${Thread.currentThread().name}")
emitter.onNext(2)
Log.e(TAG, "Emitter onComplete...${Thread.currentThread().name}")
emitter.onComplete()
}.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { data ->
Log.e(TAG, "onNext...$data...${Thread.currentThread().name}")
}
看输出的日志:会发现有些不一样
onNext
发送的线程在子线程RxCachedThreadScheduler-1
onNext
事件在子线程发送后,并不会在主线程立即响应,而是会积攒后,等事件都发送完毕后,统一按顺序回调到主线程 E/SelectImageActivity: Emitter onNext1...RxCachedThreadScheduler-1
E/SelectImageActivity: Emitter onNext2...RxCachedThreadScheduler-1
E/SelectImageActivity: Emitter onComplete...RxCachedThreadScheduler-1
E/SelectImageActivity: onNext...1...main
E/SelectImageActivity: onNext...2...main
而如果将subscribeOn
与ObserverOn
都指定成同一个Scheduler都话,如Schedulers.computation()
,则需要看这个Scheduler的调度策略了。
Schedulers.single()
E/SelectImageActivity: Emitter onNext1...RxSingleScheduler-1
E/SelectImageActivity: Emitter onNext2...RxSingleScheduler-1
E/SelectImageActivity: Emitter onComplete...RxSingleScheduler-1
E/SelectImageActivity: onNext...1...RxSingleScheduler-1
E/SelectImageActivity: onNext...2...RxSingleScheduler-1
AndroidSchedulers.mainThread()
E/SelectImageActivity: Emitter onNext1...main
E/SelectImageActivity: Emitter onNext2...main
E/SelectImageActivity: Emitter onComplete...main
E/SelectImageActivity: onNext...1...main
E/SelectImageActivity: onNext...2...main
Schedulers.computation()
E/SelectImageActivity: Emitter onNext1...RxComputationThreadPool-1
E/SelectImageActivity: Emitter onNext2...RxComputationThreadPool-1
E/SelectImageActivity: Emitter onComplete...RxComputationThreadPool-1
E/SelectImageActivity: onNext...1...RxComputationThreadPool-2
E/SelectImageActivity: onNext...2...RxComputationThreadPool-2
sleep
模拟线程阻塞的操作,代码如下:Observable.create<Int> { emitter ->
Log.e(TAG, "Emitter onNext1...${Thread.currentThread().name}")
emitter.onNext(1)
sleep()
Log.e(TAG, "Emitter onNext2...${Thread.currentThread().name}")
emitter.onNext(2)
sleep()
Log.e(TAG, "Emitter onComplete...${Thread.currentThread().name}")
emitter.onComplete()
}.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation())
.subscribe { data ->
Log.e(TAG, "onNext...$data...${Thread.currentThread().name}")
}
而打印结果如下:
E/SelectImageActivity: Emitter onNext1...RxComputationThreadPool-1
E/SelectImageActivity: onNext...1...RxComputationThreadPool-2
E/SelectImageActivity: Emitter onNext2...RxComputationThreadPool-1
E/SelectImageActivity: onNext...2...RxComputationThreadPool-2
E/SelectImageActivity: Emitter onComplete...RxComputationThreadPool-1
可以看到,只要使用了Scheduler后,在加入sleep
的阻塞操作后,执行了线程的调度,就会打印出来事件的发射与接收的顺序。