前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >再忆RxJava---背压策略

再忆RxJava---背压策略

作者头像
提莫队长
发布2020-06-02 15:25:46
6630
发布2020-06-02 15:25:46
举报
文章被收录于专栏:刘晓杰

1 背压存在的背景

被观察者 发送事件速度太快,而观察者 来不及接收所有事件,从而导致观察者无法及时响应或者处理所有发送过来事件的问题,最终导致缓存区溢出、事件丢失 & OOM

2 背压策略的原理

  • 2.1 未雨绸缪(事情在还没有发生之前做一定的处理),一共有两种 (1)控制被观察者发送事件的速度---反馈控制 (2)控制观察者接收事件的速度---响应式拉取
  • 2.2 亡羊补牢(事情已经发生,如何补救)---对多余的数据进行有选择的抛弃,或者保留,或者报错

3 背压具体情况讨论

3.1 同步策略

代码语言:javascript
复制
        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) {
                Log.e("emitter", "发送1");
                emitter.onNext("111");
                Log.e("emitter", "发送2");
                emitter.onNext("222");
                Log.e("emitter", "发送3");
                emitter.onNext("333");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR).subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(3);
            }

            @Override
            public void onNext(String s) {
                Log.e("emitter", "接受" + s);
            }

            @Override
            public void onError(Throwable t) {
                Log.e("onError", t.getLocalizedMessage());
            }

            @Override
            public void onComplete() {

            }
        });

其实对于同步而言,讨论背压毫无意义。emitter.onNext然后直接就是Subscriber.onNext,然后再下一个emitter.onNext。因为这是同步的,不存在缓存队列。就如例子而言,s.request(n),如果n小于3,会根据Error策略,直接走OnError方法(具体请看代码)。如果n大于3,是5,直接onComplete,不管有没有发送满5个 总的来说,同步并没有采用什么背压,如果非要说的话,那也是亡羊补牢式的

3.2 异步

先来看几段代码 FlowableCreate---NoOverflowBaseAsyncEmitter的onNext方法

代码语言:javascript
复制
        public final void onNext(T t) {
            。。。。。。
            if (get() != 0) {//get最初是128,也就是buffer-size,这是子线程
                downstream.onNext(t);
                BackpressureHelper.produced(this, 1);
            } else {
                onOverflow();
            }
        }

也就是发送的时候,超过128个数据,就走onError,没有就往下一个onNext走 (可以先看一下ObserveOnSubscriber的onSubscribe函数,里面有queue的构造,以及sourceMode其实并没有赋值) 再来看BaseObserveOnSubscriber的onNext方法

代码语言:javascript
复制
        @Override
        public final void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode == ASYNC) {
                trySchedule();
                return;
            }
            if (!queue.offer(t)) {//这个queue就是FlowableObserveOn的构造函数中的prefetch大小的一个队列。这里默认是128
                //也就是最上面get为什么是128的原因
                //此时还没到Handler,所以还是子线程
                upstream.cancel();
                error = new MissingBackpressureException("Queue is full?!");
                done = true;
            }
            trySchedule();
        }

接下来就是trySchedule,接下来就是调用自身run方法,走runAsync(ObserveOnSubscriber),然后无限循环poll直到没有数据,然后onNext runAsync主要注意produced和requested.get()

  • requested.get()就是自己定义的s.request,如果不定义就永远没有onNext
  • produced就是已经onNext出去的数据个数

总结:子线程生成一个128长度的缓存队列。被观察者发送数据,如果队列没满,就走onNext,满了就报错。主线程s.request来控制要取多少数据,不设置就永远没有onNext打印出来(有点类似于线程池)

3.2.1 控制被观察者发送事件的速度---反馈控制

由于观察者和被观察者处于不同线程,所以被观察者无法通过requested()知道观察者自身接收事件能力 可以定义一些边界条件emitter.requested()!=0,或者drop,直接不管

3.2.2 控制观察者接收事件的速度---响应式拉取

比如发送100,s.request(50),那么也就是说还会有50个在缓存队列里面。存在问题就是可能会超出缓存队列,可以用BackpressureStrategy.ERROR来处理等等

参考文献 https://www.jianshu.com/p/ceb48ed8719d

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 背压存在的背景
  • 2 背压策略的原理
  • 3 背压具体情况讨论
    • 3.1 同步策略
      • 3.2 异步
        • 3.2.1 控制被观察者发送事件的速度---反馈控制
          • 3.2.2 控制观察者接收事件的速度---响应式拉取
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档