RxJava 创建操作符

just

内部触发对 Observer 的 onNext 方法的调用,just 中传递的参数将直接在 onNext 方法中接收到,参数的类型要和 Observer 的泛型保持一致。

共有 10 个重载方法,其中 2 个以上参数方法的内部直接调用了 fromArray。

private val observerStr = object : Observer<String> {
    override fun onNext(t: String) {
        textView.text = "${textView.text}\nonNext $t"
    }

    override fun onError(e: Throwable) {
        textView.text = "${textView.text}\nonError "
    }

    override fun onComplete() {
        textView.text = "${textView.text}\nonComplete "
        disposableStr?.dispose() // 解除订阅
    }

    override fun onSubscribe(d: Disposable) {
        disposableStr = d
    }
}

// 调用 observer 的 onNext("just")
Observable.just("just").subscribe(observerStr)

fromArray/fromIterable

1.x 的 from 方法没有了。遍历集合,每个元素调用一次观察者的 onNext,最后调用 onComplete。

var list = listOf("a", "b", "c")
// 依次调用
// onNext("a")
// onNext("b")
// onNext("c")
// onComplete
Observable.fromIterable(list).subscribe(observerStr)
Observable.fromArray("a","b","c").subscribe(observerStr)

fromCallable

public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)

当有观察者订阅时,从 Callable 的回调方法里获取要发射的数据。

val callable = Callable {
   Thread.sleep(1000)
   "callable"
}
Executors.newSingleThreadExecutor().submit(callable)
Log.e("RX", "start subscribe")
Observable.fromCallable(callable).subscribe({
   Log.e("RX", "accept: $it")
})

日志输出:

05-07 14:16:15.531 1131-1131/pot.ner347.androiddemo E/RX: start subscribe
05-07 14:16:16.544 1131-1131/pot.ner347.androiddemo E/RX: accept: callable

订阅时是 14:16:15.531,Callable 返回结果前先休眠了 1000ms,所以发射出再收到的时间在 1000ms 后。所以 14:16:16.544 时观察者收到了数据。

fromFuture

public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit)

// 相当于 fromFuture(future, 0L, null)
public static <T> Observable<T> fromFuture(Future<? extends T> future)

// 相当于 fromFuture(future, timeout, unit).subscribeOn(scheduler)
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler)

// 相当于 fromFuture(future, 0L, null, scheduler)
public static <T> Observable<T> fromFuture(Future<? extends T> future, Scheduler scheduler)

共 4 个重载方法,本质差别不大,所以只看第一个即可。timeout 最终是传到了 Future 的 get 方法里。

val callable = Callable {
    Thread.sleep(1000)
    "callable"
}
val future = Executors.newSingleThreadExecutor().submit(callable)
Log.e("RX", "start subscribe")
Observable.fromFuture(future, 500L, TimeUnit.MILLISECONDS).subscribe(observerStr)

超时时间只有 500ms,所以进了 onError,当把超时改成 5000 时进了 onNext,收到了 "callable",然后 onComplete。

fromPublisher

从响应式数据流获取要发射的数据。如果可以,尽可能使用 ObservableOnSubscribe。不支持背压。

public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

虽然 Publisher 看起来像个接口,但不建议通过无状态的 Lambda 实现它。注释不太看得明白。应该和其它配合,从其它地方的数据流里返回,单独用没什么意义。

看 Flowable 实现了 Publisher,写了个例子,现在不太清楚。

val flowable = Flowable.create(FlowableOnSubscribe<String> { emitter ->
    emitter.onNext("abc")
    emitter.onNext("def")
}, BackpressureStrategy.ERROR)
Observable.fromPublisher(flowable).subscribe(object : Observer<String> {
    override fun onSubscribe(d: Disposable) {
        Log.e("RX", "onSubscribe")
    }

    override fun onNext(str: String) {
        Log.e("RX", "onNext $str")
    }

    override fun onError(e: Throwable) {
        Log.e("RX", "onError")
    }

    override fun onComplete() {
        Log.e("RX", "onComplete")
    }
})

defer

没有立刻创建被观察者,只有当观察者订阅时才创建,并且针对每个观察者创建都是一个新的 Observable。在回调里决定如何创建这个 Observable。不订阅就不创建。

Observable.defer {
    // 订阅后才创建这个 Observable,使用了 just,就又调了 Observer 的 onNext
    Observable.just("hello")
}.subscribe(observerStr)

empty

不需要发射数据,但又需要告知观察者事件结束,即需要调 onComplete。

private val observerAny = object: Observer<Any> {
  override fun onSubscribe(d: Disposable) { }

  override fun onNext(t: Any) {
      Log.e("RX", "onNext")
  }

  override fun onError(e: Throwable) {
      Log.e("RX", "onError:" + e.message)
  }

  override fun onComplete() {
      Log.e("RX", "onComplete")
  }
}
      
// 输出 onComplete
Observable.empty<Any>().subscribe(observerAny)

never

相比 empty,不仅不发 onNext,也不会发 onComplete 或 onError,什么都不发射而且也不终止。

Observable.never<String>().subscribe(object : Observer<String> {
   override fun onComplete() { Log.e("RX", "onComplete") }
   override fun onSubscribe(d: Disposable) { Log.e("RX", "onSubscribe") }
   override fun onNext(t: String) { Log.e("RX", "onNext") }
   override fun onError(e: Throwable) {  Log.e("RX", "onError") }
})

看日志只回调了 onSubscribe,主要用于测试。

error

直接发射 onError。两个重载方法

// 通过 errorSupplier 提供一个 Throwable 的子类
public static <T> Observable<T> error(Callable<? extends Throwable> errorSupplier)

// 参数直接就是一个 Throwable
public static <T> Observable<T> error(final Throwable exception)
// 输出 onError:abc
Observable.error<Any> { Throwable("abc") }.subscribe(observerAny)

// 输出 onError:edf
Observable.error<Any>(Throwable("edf")).subscribe(observerAny)

generate

public static <T, S> Observable<T> generate(
          final Callable<S> initialState,
          final BiConsumer<S, Emitter<T>> generator,
          Consumer<? super S> disposeState)
// 相当于上一个方法的第三个参数的 accept 方法是空实现
public static <T, S> Observable<T> generate(Callable<S> initialState, final BiConsumer<S, Emitter<T>> generator)

// 第二个参数是 BiFunction,最后一个泛型是 apply 方法返回值的类型,
// 表示新的 state,下面这个两个方法的 state 是可变的,而上面两个方法的 state 在第一个参数 initialState 指定后就不变了
public static <T, S> Observable<T> generate(Callable<S> initialState, BiFunction<S, Emitter<T>, S> generator)
public static <T, S> Observable<T> generate(Callable<S> initialState, BiFunction<S, Emitter<T>, S> generator,
          Consumer<? super S> disposeState)

// 无状态的,相当于 initialState 的回调提供了一个 null
public static <T> Observable<T> generate(final Consumer<Emitter<T>> generator)

create 每次可以发射多个事件,而 generate 是每次只能发送一个事件,连续调用多次 onNext 会抛出 IllegalStateException 异常。


无状态:

是一个无限循环,只要条件满足就一直发射事件。下面的代码将不停的收到数据。

Observable.generate(Consumer<Emitter<String>>{
   it.onNext("abc")
}).subscribe({ Log.e("RX", "收到: $it") })

一次连续发射 onNext 和 onComplete/onError 是可以的。

Observable.generate(Consumer<Emitter<String>>{
   it.onNext("abc")
   it.onComplete()
}).subscribe({ Log.e("RX", "收到: $it") })

如果连续调用多次 onNext 就崩溃了。

Observable.generate(Consumer<Emitter<String>>{
   // 崩溃
   it.onNext("abc")
   it.onNext("ab")
}).subscribe({ Log.e("RX", "收到: $it") })

不可变状态:

Observable.generate(Callable<Int> { 0 }, BiConsumer<Int, Emitter<String>> { t1, t2 ->
   Log.e("RX", "当前状态 $t1")
   t2.onNext("$t1")
}) .subscribe({
   Log.e("RX", "收到: $it")
})

状态不变,一直是 0,不停的发射不停的接收。


可变状态:

Observable.generate(Callable<Int> { 0 }, BiFunction<Int, Emitter<String>, Int> { currentState, t2 ->
    if (currentState < 5) {
        Log.e("RX", "onNext $currentState")
        t2.onNext("emitter $currentState")
    } else {
        Log.e("RX", "onComplete $currentState")
        t2.onComplete()
    }
    currentState + 1 // 返回一个新的状态
}, Consumer<Int> {
    Log.e("RX", "dispose 时的 state $it")
})

日志:

05-10 15:41:06.922 20685-20685/pot.ner347.androiddemo E/RX: onNext 0
05-10 15:41:06.923 20685-20685/pot.ner347.androiddemo E/RX: 收到: 0
    onNext 1
    收到: 1
    onNext 2
    收到: 2
    onNext 3
    收到: 3
    onNext 4
05-10 15:41:06.924 20685-20685/pot.ner347.androiddemo E/RX: 收到: 4
    onComplete 5
    dispose 时的 state 6

关于在 Flowable 里的应用看这篇文章 RxJava: Generating Backpressure-Aware Streams,和背压有关,没有彻底弄明白。

Flowable.generate(Callable<Int> { 0 }, BiFunction<Int, Emitter<String>, Int> { t1, t2 ->
    if (t1 < 130) {
        Log.e("RX", "发送 $t1")
        t2.onNext("$t1")
    } else {
        t2.onComplete()
    }
    ++state
}).subscribeOn(Schedulers.io()).observeOn(Schedulers.computation())
        .doAfterNext({
            Thread.sleep(10)
            Log.e("RX", "观察者 $it 休眠 10ms")
        })
       .subscribe(object : Subscriber<String> {
           override fun onSubscribe(s: Subscription?) {
               s?.request(130)
           }

           override fun onNext(t: String?) {}

           override fun onError(t: Throwable?) {}

           override fun onComplete() {}
       })

测试发现,假设要求 130 个数据,被观察者总共发 140 个事件,在发完 128 个后,观察者消费事件,当观察者接收了一些数据(有一次是已经收到 95 了)后,被观察者继续发送第 129-140 个事件,然后观察者接收到第 130 个。

interval

每隔固定时长调用 onNext 发送一个 long 值,从 0 开始。默认在计算线程发射。

public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
// 相当于 interval(initialDelay, period, unit, Schedulers.computation())
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)

// 相当于 interval(period, period, unit, scheduler)
public static Observable<Long> interval(long period, TimeUnit unit, Scheduler scheduler)
// 相当于 interval(period, unit, Schedulers.computation())
public static Observable<Long> interval(long period, TimeUnit unit)
private val observerLong = object : Observer<Long> {
    override fun onNext(t: Long) {
        Log.e("RX", "onNext thread: ${Thread.currentThread().name}, t:$t")
    }

    override fun onError(e: Throwable) {
        textView.text = "${textView.text}\nonError "
    }

    override fun onComplete() {
        Log.e("RX", "complete thread: ${Thread.currentThread().name}")
        disposableLong?.dispose() // 解除订阅
    }

    override fun onSubscribe(d: Disposable) {
        disposableLong = d
    }
}

Observable.interval(2, 2, TimeUnit.SECONDS).subscribe(observerLong)

日志结果:

05-03 17:59:52.649 25191-26740/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:0
05-03 17:59:54.647 25191-26740/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:1
05-03 17:59:56.648 25191-26740/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:2
05-03 17:59:58.647 25191-26740/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:3
05-03 18:00:00.645 25191-26740/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:4
05-03 18:00:02.647 25191-26740/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:5
...

intervalRange

public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)

// 在 Schedulers.computation() 线程
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)

看日志,从 1 开始,发了 5 个数据。从 click 开始 500ms 开始发射,然后大约 1000ms 发一次。

05-10 16:21:23.231 26568-26568/pot.ner347.androiddemo E/RX: click
05-10 16:21:23.760 26568-26867/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:1
05-10 16:21:24.759 26568-26867/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:2
05-10 16:21:25.760 26568-26867/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:3
05-10 16:21:26.760 26568-26867/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:4
05-10 16:21:27.760 26568-26867/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:5
05-10 16:21:27.761 26568-26867/pot.ner347.androiddemo E/RX: complete thread: RxComputationThreadPool-1

range/rangeLong

发射一个整数序列。区别一个是 int,一个是 long。

private val observerInt = object : Observer<Int> {
    override fun onNext(t: Int) {
        textView.text = "${textView.text}\nonNext $t"
    }

    override fun onError(e: Throwable) {
        textView.text = "${textView.text}\nonError "
    }

    override fun onComplete() {
        textView.text = "${textView.text}\nonComplete "
        disposableInt?.dispose() // 解除订阅
    }

    override fun onSubscribe(d: Disposable) {
        disposableInt = d
    }
}

// 第一个参数是起始值,第二个参数是发射的个数,将依次调用
// onNext(10)
// onNext(11)
// onNext(12)
// onNext(13)
// onNext(14)
// onComplete
Observable.range(10, 5).subscribe(observerInt)

timer

在一个给定的延迟后发射一个特殊的值。默认在 Schedulers.computation() 线程。

Observable.timer(2, TimeUnit.SECONDS).subscribe(observerLong)

onNext 中收到的是 0:

05-03 17:51:01.143 22986-24694/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:0
05-03 17:51:01.145 22986-24694/pot.ner347.androiddemo E/RX: complete thread: RxComputationThreadPool-1

unsafeCreate

create 的参数是 ObservableOnSubscribe,发射器是 ObservableEmitter。

unsafeCreate 的参数是 ObservableSource,参数是 Observer。相当于 1.x 时代的 create,没什么用。

public static <T> Observable<T> unsafeCreate(ObservableSource<T> onSubscribe) {
    ObjectHelper.requireNonNull(onSubscribe, "source is null");
    ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
    if (onSubscribe instanceof Observable) {
        throw new IllegalArgumentException("unsafeCreate(Observable) should be upgraded");
    }
    return RxJavaPlugins.onAssembly(new ObservableFromUnsafeSource<T>(onSubscribe));
}

wrap

public static <T> Observable<T> wrap(ObservableSource<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    if (source instanceof Observable) {
        return RxJavaPlugins.onAssembly((Observable<T>)source);
    }
    return RxJavaPlugins.onAssembly(new ObservableFromUnsafeSource<T>(source));
}

和 unsafe 的区别,主要在 source instanceof Observable,一个抛出异常,一个返回,返回时做了个类型转换,不知道怎么用。

compose

对一个 Observable 做一系列的链式调用。

val transformer = ObservableTransformer<Int, String> {
    upstream -> upstream.map { it.toString() }.take(2)
}
     Observable.just(1,2,3,4).compose(transformer).subscribe(observerStr)

创建了一个 ObservableTransformer 对象,它里面封装了一些调用,以后可以统一复用。

repeat

重复调用。

// 调用
// onNext("repeat just")
// onNext("repeat just")
// onNext("repeat just")
// onComplete
Observable.just("repeat just").repeat(3).subscribe(observerStr)
// 无限重复
Observable.just("repeat just").repeat().subscribe{Log.e("RX", "$it")}

repeatUntil

无限循环,直到参数返回的布尔值为 true,表明满足停止条件。

// i 为 11 时停止
var i = 0
Observable.just("repeat just").repeatUntil {
   Log.e("RX", "i:$i")
   i++ > 10
}.subscribe{Log.e("RX", "$it")}

repeatWhen

repeatWhen 的参数接收原始 Observable 的 complete 和 error 通知,且决定是否要重新订阅和发射原来的 Observable。

Observable.just("repeat").repeatWhen {
    Observable.interval(1000, TimeUnit.MILLISECONDS).take(5)
}.subscribe{Log.e("RX", "$it")}

重复了 6 次,1 秒重复一次。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏小工匠技术圈

【小工匠聊密码学】--消息摘要--SHA算法

1955
来自专栏Hongten

PBE_Password-based encryption(基于密码加密)_项目中你也可以有

中说道了PBE——Password-based encryption(基于密码加密)。我也做测试了一下,现在把我做的效果给大家演示一下:

1011
来自专栏freesan44

三重Des对称加密在Android、Ios 和Java 平台的实现

如今手机app五彩缤纷,确保手机用户的数据安全是开发人员必须掌握的技巧,下面通过实例介绍DES在android、ios、java平台的使用方法;

1271
来自专栏Java与Android技术栈

Cold Observable 和 Hot Observable

Hot Observable 无论有没有 Subscriber 订阅,事件始终都会发生。当 Hot Observable 有多个订阅者时,Hot Observa...

1642
来自专栏BaronTalk

RxJava系列六(从微观角度解读RxJava源码)

前言 通过前面五个篇幅的介绍,相信大家对RxJava的基本使用以及操作符应该有了一定的认识。但是知其然还要知其所以然;所以从这一章开始我们聊聊源码,分析RxJ...

4217
来自专栏小工匠技术圈

【小工匠聊密码学】--对称加密--3DES

  3DES(或称为Triple DES)是三重数据加密算法(TDEA,Triple Data Encryption Algorithm)块密码的通称。它相当于...

1062
来自专栏数据结构与算法

08:Vigenère密码

08:Vigenère密码 总时间限制: 1000ms 内存限制: 65536kB描述 16世纪法国外交家Blaise de Vigenère设计了一种多表...

4599
来自专栏技术博文

PHP的几个常用加密函数

在php的开发过程中,常常需要对部分数据(如用户密码)进行加密 一、加密类型: 1.单向散列加密   就是把任意长度的信息进行散列计算,得到固定长度的输出,这个...

4408
来自专栏三流程序员的挣扎

RxJava 错误处理操作符

在源 Observable 遇到错误时,立即停止源 Observable 的数据发送,并用新的 Observable 对象进行新的数据发送。

3222
来自专栏ccylovehs

武电实业卡密接口对接经验分享

以往卡密都是电信直接提供txt文件我们这边再导入数据库中,现在电信要求厂家获取卡密必须通过接口调用,此次对接主要用到三个方面知识:

1151

扫码关注云+社区

领取腾讯云代金券