我一直在寻找新的rx java2,我不太确定我是否已经理解了backpressure
的想法……
我知道我们有不支持backpressure
的Observable
和有backpressure
支持的Flowable
。
因此,基于示例,假设我有带有interval
的flowable
Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
这将在大约128个值之后崩溃,这很明显我消耗的速度比获取项的速度要慢。
但是我们在Observable
上也是如此
Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
这根本不会崩溃,即使我延迟了一些时间,它仍然可以工作。为了让Flowable
工作,假设我放入了onBackpressureDrop
操作符,crash消失了,但也不是所有的值都会被发出。
因此,我目前在脑海中找不到答案的基本问题是,为什么我要关心backpressure
,当我可以使用普通的Observable
时,仍然可以在不管理buffer
的情况下接收所有值?或者从另一个角度来看,在管理和处理消费方面,backpressure
给我带来了什么好处?
https://stackoverflow.com/questions/40323307
复制相似问题