Backpressure 背压现象指生产者的速度大于消费者的速度。
同一个线程生产一个就消费了,不会产生问题,在异步线程中,如果生产者的速度大于消费者的速度,就会产生 Backpressure 问题。比如子线程的被观察者 1 秒生产发送一次,而观察者 2 秒才消费处理一个,造成事件的堆积,最后造成 OOM。
在 1.x 中,Backpressure 问题由 Observable 处理,2.x 中由 Flowable 专门来处理。
val flowable = Flowable.create(FlowableOnSubscribe<Int> { emitter ->
emitter.onNext(1)
emitter.onNext(2)
emitter.onNext(3)
emitter.onComplete()
}, BackpressureStrategy.ERROR) // 增加了一个参数
val subscriber = object : Subscriber<Int> {
override fun onSubscribe(s: Subscription) {
s.request(java.lang.Long.MAX_VALUE)
}
override fun onNext(integer: Int?) {}
override fun onError(t: Throwable) {}
override fun onComplete() {}
}
flowable
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe(subscriber)
onSubscribe 的参数类型不再是 Disposable,而是 Subscription,可以调用它的 cancel()
切断观察者与被观察者之间的联系。Subscription 还有一个 request(long n)
方法,用来向生产者申请可以消费的事件数量。这样便可以根据本身的消费能力进行消费事件。
当调用了 request() 方法后,生产者便发送对应数量的事件供消费者消费。即生产者要求多少,消费者就发多少。
如果不显式调用 request 就表示消费能力为 0。request 这个方法若不调用,下游的 onNext 与 OnComplete 都不会调用。
处理 Backpressure 的策略是处理 Subscriber 接收事件的方式,并不影响 Flowable 发送事件的方法。即使采用了处理 Backpressure 的策略,Flowable 原来以什么样的速度产生事件,现在还是什么样的速度不会变化,主要处理的是 Subscriber 接收事件的方式。
在异步调用时,RxJava 中有个缓存池,用来缓存消费者处理不了暂时缓存下来的数据,缓存池的默认大小为 128,即只能缓存 128 个事件。无论 request() 中传入的数字比 128 大或小,缓存池中在刚开始都会存入 128 个事件。如果本身并没有这么多事件需要发送,则不会存 128 个事件。
策略就是创建 Flowable 的第二个参数。
在 ERROR 策略下,如果缓存池溢出,就会立刻抛出 MissingBackpressureException 异常。
Flowable.create(FlowableOnSubscribe<Int> { emitter ->
for (i in 0..129) {
debug("发$i")
emitter.onNext(i)
}
emitter.onComplete()
}, BackpressureStrategy.ERROR) //增加了一个参数
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object : Subscriber<Int> {
override fun onSubscribe(s: Subscription) {
// s.request(2)
}
override fun onNext(integer: Int?) {
debug("收$integer")
}
override fun onError(t: Throwable) {
error { t.toString() }
}
override fun onComplete() {
}
})
Flowable 发送 129 个事件,而 Subscriber 一个也不处理,在 onError 中就收到了错误回调。
backpress.PNG
就是把 RxJava 中默认的只能存 128 个事件的缓存池换成一个大的缓存池,支持存很多很多的数据。
这样,消费者即使通过 request() 传入一个很大的数字,生产者也会生产事件,并将处理不了的事件缓存。
但是这种方式仍然比较消耗内存,除非是我们比较了解消费者的消费能力,能够把握具体情况,不会产生 OOM。
总之 BUFFER 要慎用。
消费者处理不了的事件就丢弃。消费者通过 request() 传入其需求 n,然后生产者把 n 个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。
与 DROP 功能基本一致。唯一的区别就是 LATEST 总能使消费者能够接收到生产者产生的最后一个事件。
直接消失了,下游不知道任何情况,不知道有没有溢出。
如果 Flowable 对象不是通过 create() 获取的或不是自己创建的,可以采用 onBackpressureBuffer()、onBackpressureDrop()、onBackpressureLatest() 的方式指定背压策略。
Flowable.just(1).onBackpressureBuffer()
.observeOn(AndroidSchedulers.mainThread())
Processor 和 Subject 的作用相同的,既是观察者,也是被观察者。Subject 不支持背压,是 RxJava 1.x 继承过来的,Processor 继承 FlowableProcessor,支持背压。
不要使用 Flowable 或 Observable 里的方法,这样会将 Processor 转成一个 Flowable 或 Observable,用 Processor 内部重写的 create。
自己控制在合适的时机发射什么值,是 complete,还是 error。
// 发射 3
// val processor = AsyncProcessor.create<Int>()
// 发射 2,3
// val processor = BehaviorProcessor.create<Int>()
// 发射 3
// val processor = PublishProcessor.create<Int>()
// 发射 1,2,3
val processor = ReplayProcessor.create<Int>()
processor.onNext(1)
processor.onNext(2)
processor.subscribe({Log.e("RX", "$it")})
processor.onNext(3)
processor.onComplete()