被观察者 发送事件速度太快,而观察者 来不及接收所有事件,从而导致观察者无法及时响应或者处理所有发送过来事件的问题,最终导致缓存区溢出、事件丢失 & OOM
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) {
Log.e("emitter", "发送1");
emitter.onNext("111");
Log.e("emitter", "发送2");
emitter.onNext("222");
Log.e("emitter", "发送3");
emitter.onNext("333");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR).subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
s.request(3);
}
@Override
public void onNext(String s) {
Log.e("emitter", "接受" + s);
}
@Override
public void onError(Throwable t) {
Log.e("onError", t.getLocalizedMessage());
}
@Override
public void onComplete() {
}
});
其实对于同步而言,讨论背压毫无意义。emitter.onNext然后直接就是Subscriber.onNext,然后再下一个emitter.onNext。因为这是同步的,不存在缓存队列。就如例子而言,s.request(n),如果n小于3,会根据Error策略,直接走OnError方法(具体请看代码)。如果n大于3,是5,直接onComplete,不管有没有发送满5个 总的来说,同步并没有采用什么背压,如果非要说的话,那也是亡羊补牢式的
先来看几段代码 FlowableCreate---NoOverflowBaseAsyncEmitter的onNext方法
public final void onNext(T t) {
。。。。。。
if (get() != 0) {//get最初是128,也就是buffer-size,这是子线程
downstream.onNext(t);
BackpressureHelper.produced(this, 1);
} else {
onOverflow();
}
}
也就是发送的时候,超过128个数据,就走onError,没有就往下一个onNext走 (可以先看一下ObserveOnSubscriber的onSubscribe函数,里面有queue的构造,以及sourceMode其实并没有赋值) 再来看BaseObserveOnSubscriber的onNext方法
@Override
public final void onNext(T t) {
if (done) {
return;
}
if (sourceMode == ASYNC) {
trySchedule();
return;
}
if (!queue.offer(t)) {//这个queue就是FlowableObserveOn的构造函数中的prefetch大小的一个队列。这里默认是128
//也就是最上面get为什么是128的原因
//此时还没到Handler,所以还是子线程
upstream.cancel();
error = new MissingBackpressureException("Queue is full?!");
done = true;
}
trySchedule();
}
接下来就是trySchedule,接下来就是调用自身run方法,走runAsync(ObserveOnSubscriber),然后无限循环poll直到没有数据,然后onNext runAsync主要注意produced和requested.get()
总结:子线程生成一个128长度的缓存队列。被观察者发送数据,如果队列没满,就走onNext,满了就报错。主线程s.request来控制要取多少数据,不设置就永远没有onNext打印出来(有点类似于线程池)
由于观察者和被观察者处于不同线程,所以被观察者无法通过requested()知道观察者自身接收事件能力 可以定义一些边界条件emitter.requested()!=0,或者drop,直接不管
比如发送100,s.request(50),那么也就是说还会有50个在缓存队列里面。存在问题就是可能会超出缓存队列,可以用BackpressureStrategy.ERROR来处理等等