RxJava2学习笔记(3)

上回继续,今天来学习下zip(打包)操作

一、zip操作

    @Test
    public void zipTest() {
        Observable.zip(Observable.create(emitter -> {
            for (int i = 0; i < 10; i++) {
                emitter.onNext(100 + i);
            }
        }), Observable.create(emitter -> {
            for (int i = 0; i < 5; i++) {
                emitter.onNext(new Character((char) (65 + i)));
            }
        }), (integer, character) -> integer + "" + character).subscribe(s -> System.out.println(s));
    }

zip字面意义,就是打包操作,把多个Obserable合并在一起,形成一个新的Obserable,类似文件1、文件2 ... 文件n,合成一个新文件。上面这段代码的输出:

100A
101B
102C
103D
104E

第1个生产者,发射了10个数字(100~109),第1个生产者发射了5个字符(A~E),合并处理时,会把 “数字+字符",变成一个新字符串,然后继续发射。注意:这里有一个类似"木桶原理",即决定一个木桶能盛多少水的,永远是最短的那块木头。10发A型子弹 + 5发B型子弹,按1:1来合成,最终只有得到5发新型子弹。

二、限流

生产者-消费者模型中,有可能会遇到这样一种情况:生产者精力旺盛,狂生产数据,然后消费者力不从心,根本来不及处理,这样上游就堵住了,严重的话,可能导致内存耗尽。最简单的办法,就是把来不及处理的内容给扔掉(即:丢弃策略)。刚刚提到的zip操作中的木桶原理,就可以派上用场了。

    @Test
    public void zipTest1() throws InterruptedException {
        Observable.zip(Observable.create(emitter -> {
            for (int i = 0; ; i++) { //一直不停的发
                emitter.onNext(i);
            }
        }).subscribeOn(Schedulers.newThread()), Observable.create(emitter -> {
            for (int i = 0; i < 5; i++) {
                emitter.onNext(0); //这里技巧性的处理:发1个0过去
            }
        }).subscribeOn(Schedulers.newThread()),
                (BiFunction<Object, Object, Object>) (i1, i2) -> (Integer) i1 + (Integer) i2) //1个数字+0,不影响原值
                .subscribe(integer -> System.out.println(integer));

        Thread.sleep(200);
    }

  输出:

0
1
2
3
4

  如果是字符串,可以参考下面这样处理:

        Observable.zip(Observable.create(emitter -> {
                    for (int i = 0; ; i++) {
                        emitter.onNext("A" + i);
                    }
                }).subscribeOn(Schedulers.newThread()), Observable.create(emitter -> {
                    for (int i = 0; i < 5; i++) {
                        emitter.onNext("");
                    }
                }).subscribeOn(Schedulers.newThread()),
                (BiFunction<Object, Object, Object>) (i1, i2) -> (String) i1 + (String) i2)
                .subscribe(s -> System.out.println(s));
        Thread.sleep(200);

  输出:

A0
A1
A2
A3
A4

三、Flowable

刚才用zip这种"奇淫技巧"实现了限流,但其实rxjava还有更科学的做法(Flowable)。再思考一下“限流”这种场景,生产者太猛,一下喷出来的量太多,而消费者太弱,完全吸收不下。比较温和的方式,最好是生产者喷发前先问下消费者,你1次能接承受多大的量?我根据你的能力来调整(多么体贴)没错!rxjava就是这么体贴,你想到的,它也想到了。

        Flowable.create((FlowableOnSubscribe<Integer>) emitter -> {
            for (int i = 0; ; i++) {
                emitter.onNext(i);
            }
        }, BackpressureStrategy.DROP) //这里的BackpressureStrategy.DROP是一种丢弃策略
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(2); //只接收2条信息
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("onNext->" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        System.out.println("onError!");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });

        Thread.sleep(1000);  

 注意: onSubscribe 里有一行s.request(2),相当于消费者在订阅时,告诉生产者,只能处理2条记录。然后跑起来,就真的只有2条输出了:

onNext->0
onNext->1

值得一提的是:剩下的消息,虽然消费者不再处理了,但是生产者实际上还会继续发的,大家可以在emitter.onNext(i)这后面,输入一行文字,自行测试。之所以这么设计,大家可以思考一下,因为一个生产者射出来的东西,可能有多个消费者在消费,如果因为某1个消费者说:哎呀,太多了,我消化不了,你赶紧停下! 然后生产者如果真的停下来,其它消费者可能就有意见了。 

但是,如果只有一个消费者的情况下,我们就是想让生产者严格按照消费者的处理能力来发送数据,该怎么做呢?先把上面这段代码加几行输出看看:

   @Test
    public void flowableTest() throws InterruptedException {
        Flowable.create((FlowableOnSubscribe<Integer>) emitter -> {
            for (int i = 0; i <= 20; i++) {
                System.out.println("requested=>" + emitter.requested());
                emitter.onNext(i);
            }
        }, BackpressureStrategy.ERROR)
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(5);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("onNext->" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        System.out.println("onError=>" + t.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });
        Thread.sleep(1000);
    }

输出:

requested=>5
onNext->0
requested=>4
onNext->1
requested=>3
onNext->2
requested=>2
onNext->3
requested=>1
onNext->4
requested=>0
onError=>create: could not emit value due to lack of requests
requested=>0
requested=>0
requested=>0
requested=>0
requested=>0
requested=>0
requested=>0
requested=>0
requested=>0
requested=>0
requested=>0
requested=>0
requested=>0
requested=>0
requested=>0

注意:当消费者设置了request(x)后,生产者里的requested值,就会设置成相应的x值(仅同步模式),然后每emitter.onNext()发一次数据,这个值就减少,第11,12行,当生产者emitter的requested值为0时,下游就开始报错了,也就是说这时已经达到了消费者的处理极限。利用这一点,就可以实现我们刚才说的小目标:

    @Test
    public void flowableTest() throws InterruptedException {
        Flowable.create((FlowableOnSubscribe<Integer>) emitter -> {
            for (int i = 0; i <= 20; i++) {
                System.out.println("requested=>" + emitter.requested());
                // 想想:为什么不用if这种判断方式?
//                if (emitter.requested()>0){
//                    emitter.onNext(i);
//                }
                while (emitter.requested() <= 0) {
                    Thread.sleep(10);//防止cpu占用过高
                    continue;
                }
                emitter.onNext(i);
            }
        }, BackpressureStrategy.ERROR)
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(5);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("onNext->" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        System.out.println("onError=>" + t.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });
        Thread.sleep(1000);
    }

 输出:

requested=>5
onNext->0
requested=>4
onNext->1
requested=>3
onNext->2
requested=>2
onNext->3
requested=>1
onNext->4
requested=>0

上面这些都是同步情况下(即:生产者与消费者都在一个线程里)Flowable的处理方法,如果是在异步多线程情况下,我们来看看是否能继续适用:

@Test
    public void flowableTest() throws InterruptedException {
        Flowable.create((FlowableOnSubscribe<Integer>) emitter -> {
            for (int i = 0; i <= 200; i++) {
                System.out.println("requested=>" + emitter.requested());
                while (emitter.requested() <= 0) {
                    Thread.sleep(10);//防止cpu占用过高
                    continue;
                }
                emitter.onNext(i);
            }
        }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.newThread()) //生产者使用独立线程
                .observeOn(Schedulers.newThread()) //消费者使用独立线程
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(5);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("onNext->" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        System.out.println("onError=>" + t.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });

        while (true) {
            Thread.sleep(1000);
        }
    }

输出:

requested=>128
requested=>127
requested=>126
requested=>125
requested=>124
requested=>123
requested=>122
requested=>121
requested=>120
requested=>119
requested=>118
requested=>117
requested=>116
requested=>115
requested=>114
requested=>113
requested=>112
requested=>111
requested=>110
requested=>109
requested=>108
requested=>107
requested=>106
requested=>105
requested=>104
onNext->0
requested=>103
onNext->1
requested=>102
onNext->2
requested=>101
onNext->3
requested=>100
onNext->4
requested=>99
requested=>98
requested=>97
requested=>96
requested=>95
requested=>94
requested=>93
requested=>92
requested=>91
requested=>90
requested=>89
requested=>88
requested=>87
requested=>86
requested=>85
requested=>84
requested=>83
requested=>82
requested=>81
requested=>80
requested=>79
requested=>78
requested=>77
requested=>76
requested=>75
requested=>74
requested=>73
requested=>72
requested=>71
requested=>70
requested=>69
requested=>68
requested=>67
requested=>66
requested=>65
requested=>64
requested=>63
requested=>62
requested=>61
requested=>60
requested=>59
requested=>58
requested=>57
requested=>56
requested=>55
requested=>54
requested=>53
requested=>52
requested=>51
requested=>50
requested=>49
requested=>48
requested=>47
requested=>46
requested=>45
requested=>44
requested=>43
requested=>42
requested=>41
requested=>40
requested=>39
requested=>38
requested=>37
requested=>36
requested=>35
requested=>34
requested=>33
requested=>32
requested=>31
requested=>30
requested=>29
requested=>28
requested=>27
requested=>26
requested=>25
requested=>24
requested=>23
requested=>22
requested=>21
requested=>20
requested=>19
requested=>18
requested=>17
requested=>16
requested=>15
requested=>14
requested=>13
requested=>12
requested=>11
requested=>10
requested=>9
requested=>8
requested=>7
requested=>6
requested=>5
requested=>4
requested=>3
requested=>2
requested=>1
requested=>0  

可以发现,之前的套路不管用了,生产者还是在一直持续不停的发送,但是并没有发射满200次,而是正好等于缓冲区大小128(关于128这个数字,可参考本文最后的参考文章)。

先来解决异步场景下,生产者为啥不能发送超过128条消息的问题,把上面的问题略改一下:

@Test
    public void flowableTest() throws InterruptedException {
        Flowable.create((FlowableOnSubscribe<Integer>) emitter -> {
            for (int i = 0; i <= 200; i++) {
                System.out.println("requested=>" + emitter.requested());
                while (emitter.requested() <= 0) {
                    Thread.sleep(10);
                    continue;
                }
                emitter.onNext(i);
            }
        }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(96); //神奇的96
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("onNext->" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        System.out.println("onError=>" + t.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });

        while (true) {
            Thread.sleep(1000);
        }
    }

  注意:19行,消费者一上来,就设置request(96),这是一个神奇的数字96,低于这个值,生产者仍然只能最多发送128个事件,达到这个值,生产者就可以继续发送(详情分析见本文最后的参考文章)。输出如下:

requested=>128
requested=>127
requested=>126
requested=>125
requested=>124
requested=>123
requested=>122
requested=>121
requested=>120
requested=>119
requested=>118
requested=>117
requested=>116
requested=>115
requested=>114
requested=>113
requested=>112
requested=>111
requested=>110
requested=>109
requested=>108
requested=>107
requested=>106
requested=>105
requested=>104
requested=>103
requested=>102
requested=>101
onNext->0
requested=>100
onNext->1
requested=>99
onNext->2
requested=>98
onNext->3
requested=>97
onNext->4
onNext->5
onNext->6
onNext->7
onNext->8
onNext->9
onNext->10
onNext->11
onNext->12
requested=>96
onNext->13
requested=>95
onNext->14
requested=>94
onNext->15
requested=>93
onNext->16
requested=>92
onNext->17
requested=>91
onNext->18
requested=>90
onNext->19
requested=>89
onNext->20
requested=>88
onNext->21
requested=>87
onNext->22
requested=>86
onNext->23
requested=>85
onNext->24
requested=>84
onNext->25
requested=>83
onNext->26
requested=>82
onNext->27
requested=>81
requested=>80
requested=>79
onNext->28
requested=>78
requested=>77
requested=>76
onNext->29
requested=>75
onNext->30
requested=>74
onNext->31
requested=>73
onNext->32
requested=>72
onNext->33
requested=>71
onNext->34
requested=>70
onNext->35
requested=>69
onNext->36
requested=>68
onNext->37
requested=>67
onNext->38
requested=>66
onNext->39
requested=>65
onNext->40
requested=>64
onNext->41
requested=>63
onNext->42
requested=>62
onNext->43
requested=>61
requested=>60
requested=>59
requested=>58
onNext->44
requested=>57
requested=>56
requested=>55
requested=>54
requested=>53
onNext->45
requested=>52
onNext->46
requested=>51
onNext->47
requested=>50
onNext->48
requested=>49
onNext->49
requested=>48
onNext->50
requested=>47
onNext->51
requested=>46
onNext->52
requested=>45
onNext->53
requested=>44
onNext->54
requested=>43
onNext->55
requested=>42
onNext->56
requested=>41
onNext->57
requested=>40
onNext->58
requested=>39
onNext->59
onNext->60
requested=>38
onNext->61
requested=>37
onNext->62
requested=>36
onNext->63
requested=>35
onNext->64
requested=>34
onNext->65
requested=>33
onNext->66
requested=>32
onNext->67
requested=>31
onNext->68
requested=>30
onNext->69
requested=>29
onNext->70
requested=>28
onNext->71
requested=>27
onNext->72
requested=>26
onNext->73
requested=>25
onNext->74
requested=>24
onNext->75
requested=>23
onNext->76
requested=>22
onNext->77
requested=>21
onNext->78
requested=>20
onNext->79
requested=>19
onNext->80
requested=>18
onNext->81
requested=>17
onNext->82
requested=>16
onNext->83
requested=>15
onNext->84
requested=>14
onNext->85
requested=>13
onNext->86
requested=>12
onNext->87
requested=>11
onNext->88
requested=>10
onNext->89
requested=>9
onNext->90
requested=>8
onNext->91
requested=>7
onNext->92
requested=>6
onNext->93
requested=>5
onNext->94
requested=>4
onNext->95
requested=>3
requested=>98
requested=>97
requested=>96
requested=>95
requested=>94
requested=>93
requested=>92
requested=>91
requested=>90
requested=>89
requested=>88
requested=>87
requested=>86
requested=>85
requested=>84
requested=>83
requested=>82
requested=>81
requested=>80
requested=>79
requested=>78
requested=>77
requested=>76
requested=>75
requested=>74
requested=>73
requested=>72
requested=>71
requested=>70
requested=>69
requested=>68
requested=>67
requested=>66
requested=>65
requested=>64
requested=>63
requested=>62
requested=>61
requested=>60
requested=>59
requested=>58
requested=>57
requested=>56
requested=>55
requested=>54
requested=>53
requested=>52
requested=>51
requested=>50
requested=>49
requested=>48
requested=>47
requested=>46
requested=>45
requested=>44
requested=>43
requested=>42
requested=>41
requested=>40
requested=>39
requested=>38
requested=>37
requested=>36
requested=>35
requested=>34
requested=>33
requested=>32
requested=>31
requested=>30
requested=>29
requested=>28
requested=>27
requested=>26
requested=>25
requested=>24

  注意223行,requested值下降后,又开始回升了。

参考文章:

https://www.jianshu.com/p/9b1304435564

https://www.jianshu.com/p/a75ecf461e02

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏BaronTalk

RxJava系列六(从微观角度解读RxJava源码)

前言 通过前面五个篇幅的介绍,相信大家对RxJava的基本使用以及操作符应该有了一定的认识。但是知其然还要知其所以然;所以从这一章开始我们聊聊源码,分析RxJ...

4267
来自专栏Python疯子

python hashlib模块

hashlib主要提供字符加密功能,将md5和sha模块整合到了一起,支持md5,sha1, sha224, sha256, sha384, sha512等算法

1262
来自专栏C#

DotNet加密方式解析--数字签名

    马上就要过年回村里了,村里没有wifi,没有4G,没有流量,更加重要的是过几天电脑就得卖掉换车票了,得赶紧写几篇文章。 ?     数据安全的相关技术在...

3217
来自专栏Android开发指南

6.android加密解析

39610
来自专栏XAI

Java生成SM2证书基于BouncyCastle(cer)

Java生成SM2证书基于BouncyCastle(cer) 可以先加QQ 783021975 咨询相关问题。代码后续会更新一部分 整理中。完全是Java代码调...

2.4K5
来自专栏小工匠技术圈

【小工匠聊密码学】--消息摘要--HMAC算法

  HMAC是密钥相关的消息认证码,HMAC运算利用哈希算法,以一个密钥和一个消息为输入,生成一个消息摘要作为输出。

1121
来自专栏技术博文

PHP的几个常用加密函数

在php的开发过程中,常常需要对部分数据(如用户密码)进行加密 一、加密类型: 1.单向散列加密   就是把任意长度的信息进行散列计算,得到固定长度的输出,这个...

4678
来自专栏三流程序员的挣扎

RxJava 创建操作符

内部触发对 Observer 的 onNext 方法的调用,just 中传递的参数将直接在 onNext 方法中接收到,参数的类型要和 Observer 的泛型...

2461
来自专栏三流程序员的挣扎

RxJava 连接操作符

看注释意思是将所有数据按原来的顺序缓存起来,就是不知道观察者什么时候订阅,什么时候解除订阅,所以缓存起来,以后直接用。

2352
来自专栏V站

php中常用的Rc4/Des/AuthCode可逆加密函数

首先是AuthCode可逆加密函数,在dz论坛等各大程序的数据传输和登陆验证都有用到。

2885

扫码关注云+社区

领取腾讯云代金券