内部触发对 Observer 的 onNext 方法的调用,just 中传递的参数将直接在 onNext 方法中接收到,参数的类型要和 Observer 的泛型保持一致。
共有 10 个重载方法,其中 2 个以上参数方法的内部直接调用了 fromArray。
private val observerStr = object : Observer<String> {
override fun onNext(t: String) {
textView.text = "${textView.text}\nonNext $t"
}
override fun onError(e: Throwable) {
textView.text = "${textView.text}\nonError "
}
override fun onComplete() {
textView.text = "${textView.text}\nonComplete "
disposableStr?.dispose() // 解除订阅
}
override fun onSubscribe(d: Disposable) {
disposableStr = d
}
}
// 调用 observer 的 onNext("just")
Observable.just("just").subscribe(observerStr)
1.x 的 from 方法没有了。遍历集合,每个元素调用一次观察者的 onNext,最后调用 onComplete。
var list = listOf("a", "b", "c")
// 依次调用
// onNext("a")
// onNext("b")
// onNext("c")
// onComplete
Observable.fromIterable(list).subscribe(observerStr)
Observable.fromArray("a","b","c").subscribe(observerStr)
public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)
当有观察者订阅时,从 Callable 的回调方法里获取要发射的数据。
val callable = Callable {
Thread.sleep(1000)
"callable"
}
Executors.newSingleThreadExecutor().submit(callable)
Log.e("RX", "start subscribe")
Observable.fromCallable(callable).subscribe({
Log.e("RX", "accept: $it")
})
日志输出:
05-07 14:16:15.531 1131-1131/pot.ner347.androiddemo E/RX: start subscribe
05-07 14:16:16.544 1131-1131/pot.ner347.androiddemo E/RX: accept: callable
订阅时是 14:16:15.531
,Callable 返回结果前先休眠了 1000ms,所以发射出再收到的时间在 1000ms 后。所以 14:16:16.544
时观察者收到了数据。
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit)
// 相当于 fromFuture(future, 0L, null)
public static <T> Observable<T> fromFuture(Future<? extends T> future)
// 相当于 fromFuture(future, timeout, unit).subscribeOn(scheduler)
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler)
// 相当于 fromFuture(future, 0L, null, scheduler)
public static <T> Observable<T> fromFuture(Future<? extends T> future, Scheduler scheduler)
共 4 个重载方法,本质差别不大,所以只看第一个即可。timeout 最终是传到了 Future 的 get 方法里。
val callable = Callable {
Thread.sleep(1000)
"callable"
}
val future = Executors.newSingleThreadExecutor().submit(callable)
Log.e("RX", "start subscribe")
Observable.fromFuture(future, 500L, TimeUnit.MILLISECONDS).subscribe(observerStr)
超时时间只有 500ms,所以进了 onError,当把超时改成 5000 时进了 onNext,收到了 "callable",然后 onComplete。
从响应式数据流获取要发射的数据。如果可以,尽可能使用 ObservableOnSubscribe。不支持背压。
public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
虽然 Publisher 看起来像个接口,但不建议通过无状态的 Lambda 实现它。注释不太看得明白。应该和其它配合,从其它地方的数据流里返回,单独用没什么意义。
看 Flowable 实现了 Publisher,写了个例子,现在不太清楚。
val flowable = Flowable.create(FlowableOnSubscribe<String> { emitter ->
emitter.onNext("abc")
emitter.onNext("def")
}, BackpressureStrategy.ERROR)
Observable.fromPublisher(flowable).subscribe(object : Observer<String> {
override fun onSubscribe(d: Disposable) {
Log.e("RX", "onSubscribe")
}
override fun onNext(str: String) {
Log.e("RX", "onNext $str")
}
override fun onError(e: Throwable) {
Log.e("RX", "onError")
}
override fun onComplete() {
Log.e("RX", "onComplete")
}
})
没有立刻创建被观察者,只有当观察者订阅时才创建,并且针对每个观察者创建都是一个新的 Observable。在回调里决定如何创建这个 Observable。不订阅就不创建。
Observable.defer {
// 订阅后才创建这个 Observable,使用了 just,就又调了 Observer 的 onNext
Observable.just("hello")
}.subscribe(observerStr)
不需要发射数据,但又需要告知观察者事件结束,即需要调 onComplete。
private val observerAny = object: Observer<Any> {
override fun onSubscribe(d: Disposable) { }
override fun onNext(t: Any) {
Log.e("RX", "onNext")
}
override fun onError(e: Throwable) {
Log.e("RX", "onError:" + e.message)
}
override fun onComplete() {
Log.e("RX", "onComplete")
}
}
// 输出 onComplete
Observable.empty<Any>().subscribe(observerAny)
相比 empty,不仅不发 onNext,也不会发 onComplete 或 onError,什么都不发射而且也不终止。
Observable.never<String>().subscribe(object : Observer<String> {
override fun onComplete() { Log.e("RX", "onComplete") }
override fun onSubscribe(d: Disposable) { Log.e("RX", "onSubscribe") }
override fun onNext(t: String) { Log.e("RX", "onNext") }
override fun onError(e: Throwable) { Log.e("RX", "onError") }
})
看日志只回调了 onSubscribe,主要用于测试。
直接发射 onError。两个重载方法
// 通过 errorSupplier 提供一个 Throwable 的子类
public static <T> Observable<T> error(Callable<? extends Throwable> errorSupplier)
// 参数直接就是一个 Throwable
public static <T> Observable<T> error(final Throwable exception)
// 输出 onError:abc
Observable.error<Any> { Throwable("abc") }.subscribe(observerAny)
// 输出 onError:edf
Observable.error<Any>(Throwable("edf")).subscribe(observerAny)
public static <T, S> Observable<T> generate(
final Callable<S> initialState,
final BiConsumer<S, Emitter<T>> generator,
Consumer<? super S> disposeState)
// 相当于上一个方法的第三个参数的 accept 方法是空实现
public static <T, S> Observable<T> generate(Callable<S> initialState, final BiConsumer<S, Emitter<T>> generator)
// 第二个参数是 BiFunction,最后一个泛型是 apply 方法返回值的类型,
// 表示新的 state,下面这个两个方法的 state 是可变的,而上面两个方法的 state 在第一个参数 initialState 指定后就不变了
public static <T, S> Observable<T> generate(Callable<S> initialState, BiFunction<S, Emitter<T>, S> generator)
public static <T, S> Observable<T> generate(Callable<S> initialState, BiFunction<S, Emitter<T>, S> generator,
Consumer<? super S> disposeState)
// 无状态的,相当于 initialState 的回调提供了一个 null
public static <T> Observable<T> generate(final Consumer<Emitter<T>> generator)
create 每次可以发射多个事件,而 generate 是每次只能发送一个事件,连续调用多次 onNext 会抛出 IllegalStateException 异常。
无状态:
是一个无限循环,只要条件满足就一直发射事件。下面的代码将不停的收到数据。
Observable.generate(Consumer<Emitter<String>>{
it.onNext("abc")
}).subscribe({ Log.e("RX", "收到: $it") })
一次连续发射 onNext 和 onComplete/onError 是可以的。
Observable.generate(Consumer<Emitter<String>>{
it.onNext("abc")
it.onComplete()
}).subscribe({ Log.e("RX", "收到: $it") })
如果连续调用多次 onNext 就崩溃了。
Observable.generate(Consumer<Emitter<String>>{
// 崩溃
it.onNext("abc")
it.onNext("ab")
}).subscribe({ Log.e("RX", "收到: $it") })
不可变状态:
Observable.generate(Callable<Int> { 0 }, BiConsumer<Int, Emitter<String>> { t1, t2 ->
Log.e("RX", "当前状态 $t1")
t2.onNext("$t1")
}) .subscribe({
Log.e("RX", "收到: $it")
})
状态不变,一直是 0,不停的发射不停的接收。
可变状态:
Observable.generate(Callable<Int> { 0 }, BiFunction<Int, Emitter<String>, Int> { currentState, t2 ->
if (currentState < 5) {
Log.e("RX", "onNext $currentState")
t2.onNext("emitter $currentState")
} else {
Log.e("RX", "onComplete $currentState")
t2.onComplete()
}
currentState + 1 // 返回一个新的状态
}, Consumer<Int> {
Log.e("RX", "dispose 时的 state $it")
})
日志:
05-10 15:41:06.922 20685-20685/pot.ner347.androiddemo E/RX: onNext 0
05-10 15:41:06.923 20685-20685/pot.ner347.androiddemo E/RX: 收到: 0
onNext 1
收到: 1
onNext 2
收到: 2
onNext 3
收到: 3
onNext 4
05-10 15:41:06.924 20685-20685/pot.ner347.androiddemo E/RX: 收到: 4
onComplete 5
dispose 时的 state 6
关于在 Flowable 里的应用看这篇文章 RxJava: Generating Backpressure-Aware Streams,和背压有关,没有彻底弄明白。
Flowable.generate(Callable<Int> { 0 }, BiFunction<Int, Emitter<String>, Int> { t1, t2 ->
if (t1 < 130) {
Log.e("RX", "发送 $t1")
t2.onNext("$t1")
} else {
t2.onComplete()
}
++state
}).subscribeOn(Schedulers.io()).observeOn(Schedulers.computation())
.doAfterNext({
Thread.sleep(10)
Log.e("RX", "观察者 $it 休眠 10ms")
})
.subscribe(object : Subscriber<String> {
override fun onSubscribe(s: Subscription?) {
s?.request(130)
}
override fun onNext(t: String?) {}
override fun onError(t: Throwable?) {}
override fun onComplete() {}
})
测试发现,假设要求 130 个数据,被观察者总共发 140 个事件,在发完 128 个后,观察者消费事件,当观察者接收了一些数据(有一次是已经收到 95 了)后,被观察者继续发送第 129-140 个事件,然后观察者接收到第 130 个。
每隔固定时长调用 onNext 发送一个 long 值,从 0 开始。默认在计算线程发射。
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
// 相当于 interval(initialDelay, period, unit, Schedulers.computation())
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
// 相当于 interval(period, period, unit, scheduler)
public static Observable<Long> interval(long period, TimeUnit unit, Scheduler scheduler)
// 相当于 interval(period, unit, Schedulers.computation())
public static Observable<Long> interval(long period, TimeUnit unit)
private val observerLong = object : Observer<Long> {
override fun onNext(t: Long) {
Log.e("RX", "onNext thread: ${Thread.currentThread().name}, t:$t")
}
override fun onError(e: Throwable) {
textView.text = "${textView.text}\nonError "
}
override fun onComplete() {
Log.e("RX", "complete thread: ${Thread.currentThread().name}")
disposableLong?.dispose() // 解除订阅
}
override fun onSubscribe(d: Disposable) {
disposableLong = d
}
}
Observable.interval(2, 2, TimeUnit.SECONDS).subscribe(observerLong)
日志结果:
05-03 17:59:52.649 25191-26740/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:0
05-03 17:59:54.647 25191-26740/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:1
05-03 17:59:56.648 25191-26740/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:2
05-03 17:59:58.647 25191-26740/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:3
05-03 18:00:00.645 25191-26740/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:4
05-03 18:00:02.647 25191-26740/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:5
...
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
// 在 Schedulers.computation() 线程
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
看日志,从 1 开始,发了 5 个数据。从 click 开始 500ms 开始发射,然后大约 1000ms 发一次。
05-10 16:21:23.231 26568-26568/pot.ner347.androiddemo E/RX: click
05-10 16:21:23.760 26568-26867/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:1
05-10 16:21:24.759 26568-26867/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:2
05-10 16:21:25.760 26568-26867/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:3
05-10 16:21:26.760 26568-26867/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:4
05-10 16:21:27.760 26568-26867/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:5
05-10 16:21:27.761 26568-26867/pot.ner347.androiddemo E/RX: complete thread: RxComputationThreadPool-1
发射一个整数序列。区别一个是 int,一个是 long。
private val observerInt = object : Observer<Int> {
override fun onNext(t: Int) {
textView.text = "${textView.text}\nonNext $t"
}
override fun onError(e: Throwable) {
textView.text = "${textView.text}\nonError "
}
override fun onComplete() {
textView.text = "${textView.text}\nonComplete "
disposableInt?.dispose() // 解除订阅
}
override fun onSubscribe(d: Disposable) {
disposableInt = d
}
}
// 第一个参数是起始值,第二个参数是发射的个数,将依次调用
// onNext(10)
// onNext(11)
// onNext(12)
// onNext(13)
// onNext(14)
// onComplete
Observable.range(10, 5).subscribe(observerInt)
在一个给定的延迟后发射一个特殊的值。默认在 Schedulers.computation() 线程。
Observable.timer(2, TimeUnit.SECONDS).subscribe(observerLong)
onNext 中收到的是 0:
05-03 17:51:01.143 22986-24694/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:0
05-03 17:51:01.145 22986-24694/pot.ner347.androiddemo E/RX: complete thread: RxComputationThreadPool-1
create 的参数是 ObservableOnSubscribe,发射器是 ObservableEmitter。
unsafeCreate 的参数是 ObservableSource,参数是 Observer。相当于 1.x 时代的 create,没什么用。
public static <T> Observable<T> unsafeCreate(ObservableSource<T> onSubscribe) {
ObjectHelper.requireNonNull(onSubscribe, "source is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
if (onSubscribe instanceof Observable) {
throw new IllegalArgumentException("unsafeCreate(Observable) should be upgraded");
}
return RxJavaPlugins.onAssembly(new ObservableFromUnsafeSource<T>(onSubscribe));
}
public static <T> Observable<T> wrap(ObservableSource<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
if (source instanceof Observable) {
return RxJavaPlugins.onAssembly((Observable<T>)source);
}
return RxJavaPlugins.onAssembly(new ObservableFromUnsafeSource<T>(source));
}
和 unsafe 的区别,主要在 source instanceof Observable
,一个抛出异常,一个返回,返回时做了个类型转换,不知道怎么用。
对一个 Observable 做一系列的链式调用。
val transformer = ObservableTransformer<Int, String> {
upstream -> upstream.map { it.toString() }.take(2)
}
Observable.just(1,2,3,4).compose(transformer).subscribe(observerStr)
创建了一个 ObservableTransformer 对象,它里面封装了一些调用,以后可以统一复用。
重复调用。
// 调用
// onNext("repeat just")
// onNext("repeat just")
// onNext("repeat just")
// onComplete
Observable.just("repeat just").repeat(3).subscribe(observerStr)
// 无限重复
Observable.just("repeat just").repeat().subscribe{Log.e("RX", "$it")}
无限循环,直到参数返回的布尔值为 true,表明满足停止条件。
// i 为 11 时停止
var i = 0
Observable.just("repeat just").repeatUntil {
Log.e("RX", "i:$i")
i++ > 10
}.subscribe{Log.e("RX", "$it")}
repeatWhen 的参数接收原始 Observable 的 complete 和 error 通知,且决定是否要重新订阅和发射原来的 Observable。
Observable.just("repeat").repeatWhen {
Observable.interval(1000, TimeUnit.MILLISECONDS).take(5)
}.subscribe{Log.e("RX", "$it")}
重复了 6 次,1 秒重复一次。