前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RxJava Flowable Processor

RxJava Flowable Processor

作者头像
三流编程
发布2018-09-11 15:54:13
2.1K0
发布2018-09-11 15:54:13
举报

Flowable/Subscriber

Backpressure 背压现象指生产者的速度大于消费者的速度。

同一个线程生产一个就消费了,不会产生问题,在异步线程中,如果生产者的速度大于消费者的速度,就会产生 Backpressure 问题。比如子线程的被观察者 1 秒生产发送一次,而观察者 2 秒才消费处理一个,造成事件的堆积,最后造成 OOM。

在 1.x 中,Backpressure 问题由 Observable 处理,2.x 中由 Flowable 专门来处理。

代码语言:javascript
复制
val flowable = Flowable.create(FlowableOnSubscribe<Int> { emitter ->
    emitter.onNext(1)
    emitter.onNext(2)
    emitter.onNext(3)
    emitter.onComplete()
}, BackpressureStrategy.ERROR) // 增加了一个参数

val subscriber = object : Subscriber<Int> {
    override fun onSubscribe(s: Subscription) {
        s.request(java.lang.Long.MAX_VALUE)
    }

    override fun onNext(integer: Int?) {}
    override fun onError(t: Throwable) {}
    override fun onComplete() {}
}
flowable
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.computation())
        .subscribe(subscriber)

onSubscribe 的参数类型不再是 Disposable,而是 Subscription,可以调用它的 cancel() 切断观察者与被观察者之间的联系。Subscription 还有一个 request(long n) 方法,用来向生产者申请可以消费的事件数量。这样便可以根据本身的消费能力进行消费事件。

当调用了 request() 方法后,生产者便发送对应数量的事件供消费者消费。即生产者要求多少,消费者就发多少。

如果不显式调用 request 就表示消费能力为 0。request 这个方法若不调用,下游的 onNext 与 OnComplete 都不会调用。

处理策略

处理 Backpressure 的策略是处理 Subscriber 接收事件的方式,并不影响 Flowable 发送事件的方法。即使采用了处理 Backpressure 的策略,Flowable 原来以什么样的速度产生事件,现在还是什么样的速度不会变化,主要处理的是 Subscriber 接收事件的方式。

在异步调用时,RxJava 中有个缓存池,用来缓存消费者处理不了暂时缓存下来的数据,缓存池的默认大小为 128,即只能缓存 128 个事件。无论 request() 中传入的数字比 128 大或小,缓存池中在刚开始都会存入 128 个事件。如果本身并没有这么多事件需要发送,则不会存 128 个事件。

策略就是创建 Flowable 的第二个参数。

  1. ERROR

在 ERROR 策略下,如果缓存池溢出,就会立刻抛出 MissingBackpressureException 异常。

代码语言:javascript
复制
Flowable.create(FlowableOnSubscribe<Int> { emitter ->
          for (i in 0..129) {
              debug("发$i")
              emitter.onNext(i)
          }
          emitter.onComplete()
      }, BackpressureStrategy.ERROR) //增加了一个参数
              .subscribeOn(Schedulers.io())
              .observeOn(AndroidSchedulers.mainThread())
              .subscribe(object : Subscriber<Int> {
                  override fun onSubscribe(s: Subscription) {
//                      s.request(2)
                  }

                  override fun onNext(integer: Int?) {
                      debug("收$integer")
                  }

                  override fun onError(t: Throwable) {
                      error { t.toString() }
                  }

                  override fun onComplete() {
                  }
              })

Flowable 发送 129 个事件,而 Subscriber 一个也不处理,在 onError 中就收到了错误回调。

backpress.PNG

  1. BUFFER

就是把 RxJava 中默认的只能存 128 个事件的缓存池换成一个大的缓存池,支持存很多很多的数据。

这样,消费者即使通过 request() 传入一个很大的数字,生产者也会生产事件,并将处理不了的事件缓存。

但是这种方式仍然比较消耗内存,除非是我们比较了解消费者的消费能力,能够把握具体情况,不会产生 OOM。

总之 BUFFER 要慎用。

  1. DROP

消费者处理不了的事件就丢弃。消费者通过 request() 传入其需求 n,然后生产者把 n 个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。

  1. LATEST

与 DROP 功能基本一致。唯一的区别就是 LATEST 总能使消费者能够接收到生产者产生的最后一个事件。

  1. MISSING

直接消失了,下游不知道任何情况,不知道有没有溢出。


如果 Flowable 对象不是通过 create() 获取的或不是自己创建的,可以采用 onBackpressureBuffer()、onBackpressureDrop()、onBackpressureLatest() 的方式指定背压策略。

代码语言:javascript
复制
Flowable.just(1).onBackpressureBuffer()
                .observeOn(AndroidSchedulers.mainThread())

Processor

Processor 和 Subject 的作用相同的,既是观察者,也是被观察者。Subject 不支持背压,是 RxJava 1.x 继承过来的,Processor 继承 FlowableProcessor,支持背压。

不要使用 Flowable 或 Observable 里的方法,这样会将 Processor 转成一个 Flowable 或 Observable,用 Processor 内部重写的 create。

自己控制在合适的时机发射什么值,是 complete,还是 error。

  • AsyncProcessor 不论何时订阅,都只发射最后一个数据,如果因为异常而终止,不会释放任何数据,但是会向 Observer 传递一个异常通知。
  • BehaviorProcessor 发射订阅之前的一个数据和订阅之后的全部数据。如果订阅之前没有值,可以使用默认值。
  • PublishProcessor 从哪里订阅就从哪里发射数据。
  • ReplayProcessor 无论何时订阅,都发射所有的数据。
  • SerializedProcessor 其它 Processor 不要在多线程上发射数据,如果确实要在多线程上使用,用这个 Processor 封装,可以保证在一个时刻只在一个线程上执行。
  • UnicastProcessor 只能有一个观察者。
代码语言:javascript
复制
// 发射 3
// val processor = AsyncProcessor.create<Int>()
// 发射 2,3
// val processor = BehaviorProcessor.create<Int>()
// 发射 3
// val processor = PublishProcessor.create<Int>()
// 发射 1,2,3
val processor = ReplayProcessor.create<Int>()

processor.onNext(1)
processor.onNext(2)
processor.subscribe({Log.e("RX", "$it")})
processor.onNext(3)
processor.onComplete()
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.06.09 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Flowable/Subscriber
    • 处理策略
    • Processor
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档