前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RxJava2学习笔记(3)

RxJava2学习笔记(3)

作者头像
菩提树下的杨过
发布2018-04-17 14:37:07
1.4K0
发布2018-04-17 14:37:07
举报

上回继续,今天来学习下zip(打包)操作

一、zip操作

代码语言:javascript
复制
    @Test
    public void zipTest() {
        Observable.zip(Observable.create(emitter -> {
            for (int i = 0; i < 10; i++) {
                emitter.onNext(100 + i);
            }
        }), Observable.create(emitter -> {
            for (int i = 0; i < 5; i++) {
                emitter.onNext(new Character((char) (65 + i)));
            }
        }), (integer, character) -> integer + "" + character).subscribe(s -> System.out.println(s));
    }

zip字面意义,就是打包操作,把多个Obserable合并在一起,形成一个新的Obserable,类似文件1、文件2 ... 文件n,合成一个新文件。上面这段代码的输出:

代码语言:javascript
复制
100A
101B
102C
103D
104E

第1个生产者,发射了10个数字(100~109),第1个生产者发射了5个字符(A~E),合并处理时,会把 “数字+字符",变成一个新字符串,然后继续发射。注意:这里有一个类似"木桶原理",即决定一个木桶能盛多少水的,永远是最短的那块木头。10发A型子弹 + 5发B型子弹,按1:1来合成,最终只有得到5发新型子弹。

二、限流

生产者-消费者模型中,有可能会遇到这样一种情况:生产者精力旺盛,狂生产数据,然后消费者力不从心,根本来不及处理,这样上游就堵住了,严重的话,可能导致内存耗尽。最简单的办法,就是把来不及处理的内容给扔掉(即:丢弃策略)。刚刚提到的zip操作中的木桶原理,就可以派上用场了。

代码语言:javascript
复制
    @Test
    public void zipTest1() throws InterruptedException {
        Observable.zip(Observable.create(emitter -> {
            for (int i = 0; ; i++) { //一直不停的发
                emitter.onNext(i);
            }
        }).subscribeOn(Schedulers.newThread()), Observable.create(emitter -> {
            for (int i = 0; i < 5; i++) {
                emitter.onNext(0); //这里技巧性的处理:发1个0过去
            }
        }).subscribeOn(Schedulers.newThread()),
                (BiFunction<Object, Object, Object>) (i1, i2) -> (Integer) i1 + (Integer) i2) //1个数字+0,不影响原值
                .subscribe(integer -> System.out.println(integer));

        Thread.sleep(200);
    }

  输出:

代码语言:javascript
复制
0
1
2
3
4

  如果是字符串,可以参考下面这样处理:

代码语言:javascript
复制
        Observable.zip(Observable.create(emitter -> {
                    for (int i = 0; ; i++) {
                        emitter.onNext("A" + i);
                    }
                }).subscribeOn(Schedulers.newThread()), Observable.create(emitter -> {
                    for (int i = 0; i < 5; i++) {
                        emitter.onNext("");
                    }
                }).subscribeOn(Schedulers.newThread()),
                (BiFunction<Object, Object, Object>) (i1, i2) -> (String) i1 + (String) i2)
                .subscribe(s -> System.out.println(s));
        Thread.sleep(200);

  输出:

代码语言:javascript
复制
A0
A1
A2
A3
A4

三、Flowable

刚才用zip这种"奇淫技巧"实现了限流,但其实rxjava还有更科学的做法(Flowable)。再思考一下“限流”这种场景,生产者太猛,一下喷出来的量太多,而消费者太弱,完全吸收不下。比较温和的方式,最好是生产者喷发前先问下消费者,你1次能接承受多大的量?我根据你的能力来调整(多么体贴)没错!rxjava就是这么体贴,你想到的,它也想到了。

代码语言:javascript
复制
        Flowable.create((FlowableOnSubscribe<Integer>) emitter -> {
            for (int i = 0; ; i++) {
                emitter.onNext(i);
            }
        }, BackpressureStrategy.DROP) //这里的BackpressureStrategy.DROP是一种丢弃策略
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(2); //只接收2条信息
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("onNext->" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        System.out.println("onError!");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });

        Thread.sleep(1000);  

 注意: onSubscribe 里有一行s.request(2),相当于消费者在订阅时,告诉生产者,只能处理2条记录。然后跑起来,就真的只有2条输出了:

代码语言:javascript
复制
onNext->0
onNext->1

值得一提的是:剩下的消息,虽然消费者不再处理了,但是生产者实际上还会继续发的,大家可以在emitter.onNext(i)这后面,输入一行文字,自行测试。之所以这么设计,大家可以思考一下,因为一个生产者射出来的东西,可能有多个消费者在消费,如果因为某1个消费者说:哎呀,太多了,我消化不了,你赶紧停下! 然后生产者如果真的停下来,其它消费者可能就有意见了。 

但是,如果只有一个消费者的情况下,我们就是想让生产者严格按照消费者的处理能力来发送数据,该怎么做呢?先把上面这段代码加几行输出看看:

代码语言:javascript
复制
   @Test
    public void flowableTest() throws InterruptedException {
        Flowable.create((FlowableOnSubscribe<Integer>) emitter -> {
            for (int i = 0; i <= 20; i++) {
                System.out.println("requested=>" + emitter.requested());
                emitter.onNext(i);
            }
        }, BackpressureStrategy.ERROR)
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(5);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("onNext->" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        System.out.println("onError=>" + t.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });
        Thread.sleep(1000);
    }

输出:

代码语言:javascript
复制
requested=>5
onNext->0
requested=>4
onNext->1
requested=>3
onNext->2
requested=>2
onNext->3
requested=>1
onNext->4
requested=>0
onError=>create: could not emit value due to lack of requests
requested=>0
requested=>0
requested=>0
requested=>0
requested=>0
requested=>0
requested=>0
requested=>0
requested=>0
requested=>0
requested=>0
requested=>0
requested=>0
requested=>0
requested=>0

注意:当消费者设置了request(x)后,生产者里的requested值,就会设置成相应的x值(仅同步模式),然后每emitter.onNext()发一次数据,这个值就减少,第11,12行,当生产者emitter的requested值为0时,下游就开始报错了,也就是说这时已经达到了消费者的处理极限。利用这一点,就可以实现我们刚才说的小目标:

代码语言:javascript
复制
    @Test
    public void flowableTest() throws InterruptedException {
        Flowable.create((FlowableOnSubscribe<Integer>) emitter -> {
            for (int i = 0; i <= 20; i++) {
                System.out.println("requested=>" + emitter.requested());
                // 想想:为什么不用if这种判断方式?
//                if (emitter.requested()>0){
//                    emitter.onNext(i);
//                }
                while (emitter.requested() <= 0) {
                    Thread.sleep(10);//防止cpu占用过高
                    continue;
                }
                emitter.onNext(i);
            }
        }, BackpressureStrategy.ERROR)
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(5);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("onNext->" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        System.out.println("onError=>" + t.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });
        Thread.sleep(1000);
    }

 输出:

代码语言:javascript
复制
requested=>5
onNext->0
requested=>4
onNext->1
requested=>3
onNext->2
requested=>2
onNext->3
requested=>1
onNext->4
requested=>0

上面这些都是同步情况下(即:生产者与消费者都在一个线程里)Flowable的处理方法,如果是在异步多线程情况下,我们来看看是否能继续适用:

代码语言:javascript
复制
@Test
    public void flowableTest() throws InterruptedException {
        Flowable.create((FlowableOnSubscribe<Integer>) emitter -> {
            for (int i = 0; i <= 200; i++) {
                System.out.println("requested=>" + emitter.requested());
                while (emitter.requested() <= 0) {
                    Thread.sleep(10);//防止cpu占用过高
                    continue;
                }
                emitter.onNext(i);
            }
        }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.newThread()) //生产者使用独立线程
                .observeOn(Schedulers.newThread()) //消费者使用独立线程
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(5);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("onNext->" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        System.out.println("onError=>" + t.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });

        while (true) {
            Thread.sleep(1000);
        }
    }

输出:

代码语言:javascript
复制
requested=>128
requested=>127
requested=>126
requested=>125
requested=>124
requested=>123
requested=>122
requested=>121
requested=>120
requested=>119
requested=>118
requested=>117
requested=>116
requested=>115
requested=>114
requested=>113
requested=>112
requested=>111
requested=>110
requested=>109
requested=>108
requested=>107
requested=>106
requested=>105
requested=>104
onNext->0
requested=>103
onNext->1
requested=>102
onNext->2
requested=>101
onNext->3
requested=>100
onNext->4
requested=>99
requested=>98
requested=>97
requested=>96
requested=>95
requested=>94
requested=>93
requested=>92
requested=>91
requested=>90
requested=>89
requested=>88
requested=>87
requested=>86
requested=>85
requested=>84
requested=>83
requested=>82
requested=>81
requested=>80
requested=>79
requested=>78
requested=>77
requested=>76
requested=>75
requested=>74
requested=>73
requested=>72
requested=>71
requested=>70
requested=>69
requested=>68
requested=>67
requested=>66
requested=>65
requested=>64
requested=>63
requested=>62
requested=>61
requested=>60
requested=>59
requested=>58
requested=>57
requested=>56
requested=>55
requested=>54
requested=>53
requested=>52
requested=>51
requested=>50
requested=>49
requested=>48
requested=>47
requested=>46
requested=>45
requested=>44
requested=>43
requested=>42
requested=>41
requested=>40
requested=>39
requested=>38
requested=>37
requested=>36
requested=>35
requested=>34
requested=>33
requested=>32
requested=>31
requested=>30
requested=>29
requested=>28
requested=>27
requested=>26
requested=>25
requested=>24
requested=>23
requested=>22
requested=>21
requested=>20
requested=>19
requested=>18
requested=>17
requested=>16
requested=>15
requested=>14
requested=>13
requested=>12
requested=>11
requested=>10
requested=>9
requested=>8
requested=>7
requested=>6
requested=>5
requested=>4
requested=>3
requested=>2
requested=>1
requested=>0  

可以发现,之前的套路不管用了,生产者还是在一直持续不停的发送,但是并没有发射满200次,而是正好等于缓冲区大小128(关于128这个数字,可参考本文最后的参考文章)。

先来解决异步场景下,生产者为啥不能发送超过128条消息的问题,把上面的问题略改一下:

代码语言:javascript
复制
@Test
    public void flowableTest() throws InterruptedException {
        Flowable.create((FlowableOnSubscribe<Integer>) emitter -> {
            for (int i = 0; i <= 200; i++) {
                System.out.println("requested=>" + emitter.requested());
                while (emitter.requested() <= 0) {
                    Thread.sleep(10);
                    continue;
                }
                emitter.onNext(i);
            }
        }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(96); //神奇的96
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("onNext->" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        System.out.println("onError=>" + t.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });

        while (true) {
            Thread.sleep(1000);
        }
    }

  注意:19行,消费者一上来,就设置request(96),这是一个神奇的数字96,低于这个值,生产者仍然只能最多发送128个事件,达到这个值,生产者就可以继续发送(详情分析见本文最后的参考文章)。输出如下:

代码语言:javascript
复制
requested=>128
requested=>127
requested=>126
requested=>125
requested=>124
requested=>123
requested=>122
requested=>121
requested=>120
requested=>119
requested=>118
requested=>117
requested=>116
requested=>115
requested=>114
requested=>113
requested=>112
requested=>111
requested=>110
requested=>109
requested=>108
requested=>107
requested=>106
requested=>105
requested=>104
requested=>103
requested=>102
requested=>101
onNext->0
requested=>100
onNext->1
requested=>99
onNext->2
requested=>98
onNext->3
requested=>97
onNext->4
onNext->5
onNext->6
onNext->7
onNext->8
onNext->9
onNext->10
onNext->11
onNext->12
requested=>96
onNext->13
requested=>95
onNext->14
requested=>94
onNext->15
requested=>93
onNext->16
requested=>92
onNext->17
requested=>91
onNext->18
requested=>90
onNext->19
requested=>89
onNext->20
requested=>88
onNext->21
requested=>87
onNext->22
requested=>86
onNext->23
requested=>85
onNext->24
requested=>84
onNext->25
requested=>83
onNext->26
requested=>82
onNext->27
requested=>81
requested=>80
requested=>79
onNext->28
requested=>78
requested=>77
requested=>76
onNext->29
requested=>75
onNext->30
requested=>74
onNext->31
requested=>73
onNext->32
requested=>72
onNext->33
requested=>71
onNext->34
requested=>70
onNext->35
requested=>69
onNext->36
requested=>68
onNext->37
requested=>67
onNext->38
requested=>66
onNext->39
requested=>65
onNext->40
requested=>64
onNext->41
requested=>63
onNext->42
requested=>62
onNext->43
requested=>61
requested=>60
requested=>59
requested=>58
onNext->44
requested=>57
requested=>56
requested=>55
requested=>54
requested=>53
onNext->45
requested=>52
onNext->46
requested=>51
onNext->47
requested=>50
onNext->48
requested=>49
onNext->49
requested=>48
onNext->50
requested=>47
onNext->51
requested=>46
onNext->52
requested=>45
onNext->53
requested=>44
onNext->54
requested=>43
onNext->55
requested=>42
onNext->56
requested=>41
onNext->57
requested=>40
onNext->58
requested=>39
onNext->59
onNext->60
requested=>38
onNext->61
requested=>37
onNext->62
requested=>36
onNext->63
requested=>35
onNext->64
requested=>34
onNext->65
requested=>33
onNext->66
requested=>32
onNext->67
requested=>31
onNext->68
requested=>30
onNext->69
requested=>29
onNext->70
requested=>28
onNext->71
requested=>27
onNext->72
requested=>26
onNext->73
requested=>25
onNext->74
requested=>24
onNext->75
requested=>23
onNext->76
requested=>22
onNext->77
requested=>21
onNext->78
requested=>20
onNext->79
requested=>19
onNext->80
requested=>18
onNext->81
requested=>17
onNext->82
requested=>16
onNext->83
requested=>15
onNext->84
requested=>14
onNext->85
requested=>13
onNext->86
requested=>12
onNext->87
requested=>11
onNext->88
requested=>10
onNext->89
requested=>9
onNext->90
requested=>8
onNext->91
requested=>7
onNext->92
requested=>6
onNext->93
requested=>5
onNext->94
requested=>4
onNext->95
requested=>3
requested=>98
requested=>97
requested=>96
requested=>95
requested=>94
requested=>93
requested=>92
requested=>91
requested=>90
requested=>89
requested=>88
requested=>87
requested=>86
requested=>85
requested=>84
requested=>83
requested=>82
requested=>81
requested=>80
requested=>79
requested=>78
requested=>77
requested=>76
requested=>75
requested=>74
requested=>73
requested=>72
requested=>71
requested=>70
requested=>69
requested=>68
requested=>67
requested=>66
requested=>65
requested=>64
requested=>63
requested=>62
requested=>61
requested=>60
requested=>59
requested=>58
requested=>57
requested=>56
requested=>55
requested=>54
requested=>53
requested=>52
requested=>51
requested=>50
requested=>49
requested=>48
requested=>47
requested=>46
requested=>45
requested=>44
requested=>43
requested=>42
requested=>41
requested=>40
requested=>39
requested=>38
requested=>37
requested=>36
requested=>35
requested=>34
requested=>33
requested=>32
requested=>31
requested=>30
requested=>29
requested=>28
requested=>27
requested=>26
requested=>25
requested=>24

  注意223行,requested值下降后,又开始回升了。

参考文章:

https://www.jianshu.com/p/9b1304435564

https://www.jianshu.com/p/a75ecf461e02

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-03-30 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档