前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RxJava 创建操作符

RxJava 创建操作符

作者头像
三流之路
发布2018-09-11 15:55:47
9540
发布2018-09-11 15:55:47
举报
文章被收录于专栏:三流程序员的挣扎

just

内部触发对 Observer 的 onNext 方法的调用,just 中传递的参数将直接在 onNext 方法中接收到,参数的类型要和 Observer 的泛型保持一致。

共有 10 个重载方法,其中 2 个以上参数方法的内部直接调用了 fromArray。

代码语言:javascript
复制
private val observerStr = object : Observer<String> {
    override fun onNext(t: String) {
        textView.text = "${textView.text}\nonNext $t"
    }

    override fun onError(e: Throwable) {
        textView.text = "${textView.text}\nonError "
    }

    override fun onComplete() {
        textView.text = "${textView.text}\nonComplete "
        disposableStr?.dispose() // 解除订阅
    }

    override fun onSubscribe(d: Disposable) {
        disposableStr = d
    }
}

// 调用 observer 的 onNext("just")
Observable.just("just").subscribe(observerStr)

fromArray/fromIterable

1.x 的 from 方法没有了。遍历集合,每个元素调用一次观察者的 onNext,最后调用 onComplete。

代码语言:javascript
复制
var list = listOf("a", "b", "c")
// 依次调用
// onNext("a")
// onNext("b")
// onNext("c")
// onComplete
Observable.fromIterable(list).subscribe(observerStr)
Observable.fromArray("a","b","c").subscribe(observerStr)

fromCallable

代码语言:javascript
复制
public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)

当有观察者订阅时,从 Callable 的回调方法里获取要发射的数据。

代码语言:javascript
复制
val callable = Callable {
   Thread.sleep(1000)
   "callable"
}
Executors.newSingleThreadExecutor().submit(callable)
Log.e("RX", "start subscribe")
Observable.fromCallable(callable).subscribe({
   Log.e("RX", "accept: $it")
})

日志输出:

代码语言:javascript
复制
05-07 14:16:15.531 1131-1131/pot.ner347.androiddemo E/RX: start subscribe
05-07 14:16:16.544 1131-1131/pot.ner347.androiddemo E/RX: accept: callable

订阅时是 14:16:15.531,Callable 返回结果前先休眠了 1000ms,所以发射出再收到的时间在 1000ms 后。所以 14:16:16.544 时观察者收到了数据。

fromFuture

代码语言:javascript
复制
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit)

// 相当于 fromFuture(future, 0L, null)
public static <T> Observable<T> fromFuture(Future<? extends T> future)

// 相当于 fromFuture(future, timeout, unit).subscribeOn(scheduler)
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler)

// 相当于 fromFuture(future, 0L, null, scheduler)
public static <T> Observable<T> fromFuture(Future<? extends T> future, Scheduler scheduler)

共 4 个重载方法,本质差别不大,所以只看第一个即可。timeout 最终是传到了 Future 的 get 方法里。

代码语言:javascript
复制
val callable = Callable {
    Thread.sleep(1000)
    "callable"
}
val future = Executors.newSingleThreadExecutor().submit(callable)
Log.e("RX", "start subscribe")
Observable.fromFuture(future, 500L, TimeUnit.MILLISECONDS).subscribe(observerStr)

超时时间只有 500ms,所以进了 onError,当把超时改成 5000 时进了 onNext,收到了 "callable",然后 onComplete。

fromPublisher

从响应式数据流获取要发射的数据。如果可以,尽可能使用 ObservableOnSubscribe。不支持背压。

代码语言:javascript
复制
public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

虽然 Publisher 看起来像个接口,但不建议通过无状态的 Lambda 实现它。注释不太看得明白。应该和其它配合,从其它地方的数据流里返回,单独用没什么意义。

看 Flowable 实现了 Publisher,写了个例子,现在不太清楚。

代码语言:javascript
复制
val flowable = Flowable.create(FlowableOnSubscribe<String> { emitter ->
    emitter.onNext("abc")
    emitter.onNext("def")
}, BackpressureStrategy.ERROR)
Observable.fromPublisher(flowable).subscribe(object : Observer<String> {
    override fun onSubscribe(d: Disposable) {
        Log.e("RX", "onSubscribe")
    }

    override fun onNext(str: String) {
        Log.e("RX", "onNext $str")
    }

    override fun onError(e: Throwable) {
        Log.e("RX", "onError")
    }

    override fun onComplete() {
        Log.e("RX", "onComplete")
    }
})

defer

没有立刻创建被观察者,只有当观察者订阅时才创建,并且针对每个观察者创建都是一个新的 Observable。在回调里决定如何创建这个 Observable。不订阅就不创建。

代码语言:javascript
复制
Observable.defer {
    // 订阅后才创建这个 Observable,使用了 just,就又调了 Observer 的 onNext
    Observable.just("hello")
}.subscribe(observerStr)

empty

不需要发射数据,但又需要告知观察者事件结束,即需要调 onComplete。

代码语言:javascript
复制
private val observerAny = object: Observer<Any> {
  override fun onSubscribe(d: Disposable) { }

  override fun onNext(t: Any) {
      Log.e("RX", "onNext")
  }

  override fun onError(e: Throwable) {
      Log.e("RX", "onError:" + e.message)
  }

  override fun onComplete() {
      Log.e("RX", "onComplete")
  }
}
      
// 输出 onComplete
Observable.empty<Any>().subscribe(observerAny)

never

相比 empty,不仅不发 onNext,也不会发 onComplete 或 onError,什么都不发射而且也不终止。

代码语言:javascript
复制
Observable.never<String>().subscribe(object : Observer<String> {
   override fun onComplete() { Log.e("RX", "onComplete") }
   override fun onSubscribe(d: Disposable) { Log.e("RX", "onSubscribe") }
   override fun onNext(t: String) { Log.e("RX", "onNext") }
   override fun onError(e: Throwable) {  Log.e("RX", "onError") }
})

看日志只回调了 onSubscribe,主要用于测试。

error

直接发射 onError。两个重载方法

代码语言:javascript
复制
// 通过 errorSupplier 提供一个 Throwable 的子类
public static <T> Observable<T> error(Callable<? extends Throwable> errorSupplier)

// 参数直接就是一个 Throwable
public static <T> Observable<T> error(final Throwable exception)
代码语言:javascript
复制
// 输出 onError:abc
Observable.error<Any> { Throwable("abc") }.subscribe(observerAny)

// 输出 onError:edf
Observable.error<Any>(Throwable("edf")).subscribe(observerAny)

generate

代码语言:javascript
复制
public static <T, S> Observable<T> generate(
          final Callable<S> initialState,
          final BiConsumer<S, Emitter<T>> generator,
          Consumer<? super S> disposeState)
// 相当于上一个方法的第三个参数的 accept 方法是空实现
public static <T, S> Observable<T> generate(Callable<S> initialState, final BiConsumer<S, Emitter<T>> generator)

// 第二个参数是 BiFunction,最后一个泛型是 apply 方法返回值的类型,
// 表示新的 state,下面这个两个方法的 state 是可变的,而上面两个方法的 state 在第一个参数 initialState 指定后就不变了
public static <T, S> Observable<T> generate(Callable<S> initialState, BiFunction<S, Emitter<T>, S> generator)
public static <T, S> Observable<T> generate(Callable<S> initialState, BiFunction<S, Emitter<T>, S> generator,
          Consumer<? super S> disposeState)

// 无状态的,相当于 initialState 的回调提供了一个 null
public static <T> Observable<T> generate(final Consumer<Emitter<T>> generator)

create 每次可以发射多个事件,而 generate 是每次只能发送一个事件,连续调用多次 onNext 会抛出 IllegalStateException 异常。


无状态:

是一个无限循环,只要条件满足就一直发射事件。下面的代码将不停的收到数据。

代码语言:javascript
复制
Observable.generate(Consumer<Emitter<String>>{
   it.onNext("abc")
}).subscribe({ Log.e("RX", "收到: $it") })

一次连续发射 onNext 和 onComplete/onError 是可以的。

代码语言:javascript
复制
Observable.generate(Consumer<Emitter<String>>{
   it.onNext("abc")
   it.onComplete()
}).subscribe({ Log.e("RX", "收到: $it") })

如果连续调用多次 onNext 就崩溃了。

代码语言:javascript
复制
Observable.generate(Consumer<Emitter<String>>{
   // 崩溃
   it.onNext("abc")
   it.onNext("ab")
}).subscribe({ Log.e("RX", "收到: $it") })

不可变状态:

代码语言:javascript
复制
Observable.generate(Callable<Int> { 0 }, BiConsumer<Int, Emitter<String>> { t1, t2 ->
   Log.e("RX", "当前状态 $t1")
   t2.onNext("$t1")
}) .subscribe({
   Log.e("RX", "收到: $it")
})

状态不变,一直是 0,不停的发射不停的接收。


可变状态:

代码语言:javascript
复制
Observable.generate(Callable<Int> { 0 }, BiFunction<Int, Emitter<String>, Int> { currentState, t2 ->
    if (currentState < 5) {
        Log.e("RX", "onNext $currentState")
        t2.onNext("emitter $currentState")
    } else {
        Log.e("RX", "onComplete $currentState")
        t2.onComplete()
    }
    currentState + 1 // 返回一个新的状态
}, Consumer<Int> {
    Log.e("RX", "dispose 时的 state $it")
})

日志:

代码语言:javascript
复制
05-10 15:41:06.922 20685-20685/pot.ner347.androiddemo E/RX: onNext 0
05-10 15:41:06.923 20685-20685/pot.ner347.androiddemo E/RX: 收到: 0
    onNext 1
    收到: 1
    onNext 2
    收到: 2
    onNext 3
    收到: 3
    onNext 4
05-10 15:41:06.924 20685-20685/pot.ner347.androiddemo E/RX: 收到: 4
    onComplete 5
    dispose 时的 state 6

关于在 Flowable 里的应用看这篇文章 RxJava: Generating Backpressure-Aware Streams,和背压有关,没有彻底弄明白。

代码语言:javascript
复制
Flowable.generate(Callable<Int> { 0 }, BiFunction<Int, Emitter<String>, Int> { t1, t2 ->
    if (t1 < 130) {
        Log.e("RX", "发送 $t1")
        t2.onNext("$t1")
    } else {
        t2.onComplete()
    }
    ++state
}).subscribeOn(Schedulers.io()).observeOn(Schedulers.computation())
        .doAfterNext({
            Thread.sleep(10)
            Log.e("RX", "观察者 $it 休眠 10ms")
        })
       .subscribe(object : Subscriber<String> {
           override fun onSubscribe(s: Subscription?) {
               s?.request(130)
           }

           override fun onNext(t: String?) {}

           override fun onError(t: Throwable?) {}

           override fun onComplete() {}
       })

测试发现,假设要求 130 个数据,被观察者总共发 140 个事件,在发完 128 个后,观察者消费事件,当观察者接收了一些数据(有一次是已经收到 95 了)后,被观察者继续发送第 129-140 个事件,然后观察者接收到第 130 个。

interval

每隔固定时长调用 onNext 发送一个 long 值,从 0 开始。默认在计算线程发射。

代码语言:javascript
复制
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
// 相当于 interval(initialDelay, period, unit, Schedulers.computation())
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)

// 相当于 interval(period, period, unit, scheduler)
public static Observable<Long> interval(long period, TimeUnit unit, Scheduler scheduler)
// 相当于 interval(period, unit, Schedulers.computation())
public static Observable<Long> interval(long period, TimeUnit unit)
代码语言:javascript
复制
private val observerLong = object : Observer<Long> {
    override fun onNext(t: Long) {
        Log.e("RX", "onNext thread: ${Thread.currentThread().name}, t:$t")
    }

    override fun onError(e: Throwable) {
        textView.text = "${textView.text}\nonError "
    }

    override fun onComplete() {
        Log.e("RX", "complete thread: ${Thread.currentThread().name}")
        disposableLong?.dispose() // 解除订阅
    }

    override fun onSubscribe(d: Disposable) {
        disposableLong = d
    }
}

Observable.interval(2, 2, TimeUnit.SECONDS).subscribe(observerLong)

日志结果:

代码语言:javascript
复制
05-03 17:59:52.649 25191-26740/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:0
05-03 17:59:54.647 25191-26740/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:1
05-03 17:59:56.648 25191-26740/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:2
05-03 17:59:58.647 25191-26740/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:3
05-03 18:00:00.645 25191-26740/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:4
05-03 18:00:02.647 25191-26740/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:5
...

intervalRange

代码语言:javascript
复制
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)

// 在 Schedulers.computation() 线程
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)

看日志,从 1 开始,发了 5 个数据。从 click 开始 500ms 开始发射,然后大约 1000ms 发一次。

代码语言:javascript
复制
05-10 16:21:23.231 26568-26568/pot.ner347.androiddemo E/RX: click
05-10 16:21:23.760 26568-26867/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:1
05-10 16:21:24.759 26568-26867/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:2
05-10 16:21:25.760 26568-26867/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:3
05-10 16:21:26.760 26568-26867/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:4
05-10 16:21:27.760 26568-26867/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:5
05-10 16:21:27.761 26568-26867/pot.ner347.androiddemo E/RX: complete thread: RxComputationThreadPool-1

range/rangeLong

发射一个整数序列。区别一个是 int,一个是 long。

代码语言:javascript
复制
private val observerInt = object : Observer<Int> {
    override fun onNext(t: Int) {
        textView.text = "${textView.text}\nonNext $t"
    }

    override fun onError(e: Throwable) {
        textView.text = "${textView.text}\nonError "
    }

    override fun onComplete() {
        textView.text = "${textView.text}\nonComplete "
        disposableInt?.dispose() // 解除订阅
    }

    override fun onSubscribe(d: Disposable) {
        disposableInt = d
    }
}

// 第一个参数是起始值,第二个参数是发射的个数,将依次调用
// onNext(10)
// onNext(11)
// onNext(12)
// onNext(13)
// onNext(14)
// onComplete
Observable.range(10, 5).subscribe(observerInt)

timer

在一个给定的延迟后发射一个特殊的值。默认在 Schedulers.computation() 线程。

代码语言:javascript
复制
Observable.timer(2, TimeUnit.SECONDS).subscribe(observerLong)

onNext 中收到的是 0:

代码语言:javascript
复制
05-03 17:51:01.143 22986-24694/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:0
05-03 17:51:01.145 22986-24694/pot.ner347.androiddemo E/RX: complete thread: RxComputationThreadPool-1

unsafeCreate

create 的参数是 ObservableOnSubscribe,发射器是 ObservableEmitter。

unsafeCreate 的参数是 ObservableSource,参数是 Observer。相当于 1.x 时代的 create,没什么用。

代码语言:javascript
复制
public static <T> Observable<T> unsafeCreate(ObservableSource<T> onSubscribe) {
    ObjectHelper.requireNonNull(onSubscribe, "source is null");
    ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
    if (onSubscribe instanceof Observable) {
        throw new IllegalArgumentException("unsafeCreate(Observable) should be upgraded");
    }
    return RxJavaPlugins.onAssembly(new ObservableFromUnsafeSource<T>(onSubscribe));
}

wrap

代码语言:javascript
复制
public static <T> Observable<T> wrap(ObservableSource<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    if (source instanceof Observable) {
        return RxJavaPlugins.onAssembly((Observable<T>)source);
    }
    return RxJavaPlugins.onAssembly(new ObservableFromUnsafeSource<T>(source));
}

和 unsafe 的区别,主要在 source instanceof Observable,一个抛出异常,一个返回,返回时做了个类型转换,不知道怎么用。

compose

对一个 Observable 做一系列的链式调用。

代码语言:javascript
复制
val transformer = ObservableTransformer<Int, String> {
    upstream -> upstream.map { it.toString() }.take(2)
}
     Observable.just(1,2,3,4).compose(transformer).subscribe(observerStr)

创建了一个 ObservableTransformer 对象,它里面封装了一些调用,以后可以统一复用。

repeat

重复调用。

代码语言:javascript
复制
// 调用
// onNext("repeat just")
// onNext("repeat just")
// onNext("repeat just")
// onComplete
Observable.just("repeat just").repeat(3).subscribe(observerStr)
代码语言:javascript
复制
// 无限重复
Observable.just("repeat just").repeat().subscribe{Log.e("RX", "$it")}

repeatUntil

无限循环,直到参数返回的布尔值为 true,表明满足停止条件。

代码语言:javascript
复制
// i 为 11 时停止
var i = 0
Observable.just("repeat just").repeatUntil {
   Log.e("RX", "i:$i")
   i++ > 10
}.subscribe{Log.e("RX", "$it")}

repeatWhen

repeatWhen 的参数接收原始 Observable 的 complete 和 error 通知,且决定是否要重新订阅和发射原来的 Observable。

代码语言:javascript
复制
Observable.just("repeat").repeatWhen {
    Observable.interval(1000, TimeUnit.MILLISECONDS).take(5)
}.subscribe{Log.e("RX", "$it")}

重复了 6 次,1 秒重复一次。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018.06.09 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • just
  • fromArray/fromIterable
  • fromCallable
  • fromFuture
  • fromPublisher
  • defer
  • empty
  • never
  • error
  • generate
  • interval
  • intervalRange
  • range/rangeLong
  • timer
  • unsafeCreate
  • wrap
  • compose
  • repeat
  • repeatUntil
  • repeatWhen
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档