Rxjava2学习

1.去重

Flowable.just(1, 1, 1, 2, 2, 3, 4, 5)
                .distinct()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e(TAG, "distinct : " + integer + "n");
                    }
                });

2.每次用一个方法处理一个值.这里是两两相加

Flowable.just(1, 2, 3, 4)
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer, Integer integer2) throws Exception {
                        return integer + integer2;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept: reduce : " + integer + "n");
            }
        });

3.跳过 count 个数目开始接收

Flowable.just(1, 2, 3, 4, 5)
                .skip(2)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e(TAG, "skip : " + integer + "n");
                    }
                });

4.至多接收 count 个数据

Flowable.fromArray(1, 2, 3, 4, 5)
                .take(3)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e(TAG, "accept: take : " + integer + "n");
                    }
                });

5.和少的配对

Flowable.zip(getStringObservable(), getIntegerObservable(), new BiFunction<String, Integer, String>() {
            @Override
            public String apply(String s, Integer integer) throws Exception {
                return s + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e(TAG, "zip : accept : " + s + "n");
            }
        });


private Flowable<String> getStringObservable() {
        return Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> e) throws Exception {
                e.onNext("A");
                Log.e(TAG, "String emit : A n");
                e.onNext("B");
                Log.e(TAG, "String emit : B n");
                e.onNext("C");
                Log.e(TAG, "String emit : C n");
            }
        }, BackpressureStrategy.BUFFER);
    }

    private Flowable<Integer> getIntegerObservable() {
        return Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                Log.e(TAG, "Integer emit : 1 n");
                e.onNext(2);
                Log.e(TAG, "Integer emit : 2 n");
                e.onNext(3);
                Log.e(TAG, "Integer emit : 3 n");
                e.onNext(4);
                Log.e(TAG, "Integer emit : 4 n");
                e.onNext(5);
                Log.e(TAG, "Integer emit : 5 n");
            }
        }, BackpressureStrategy.BUFFER);
    }

6.merge的作用是把多个 Observable 结合起来.

它和 concat 的区别在于,不用等到 发射器 A 发送完所有的事件再进行发射器 B 的发送

Flowable.merge(Flowable.just(1, 2), Flowable.just(3, 4, 5))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e("accept", "merge :" + integer + "n");
                    }
                });

7.一对一

Flowable.just(250)
                //这个第一个泛型为接收参数的数据类型,第二个泛型为转换后要发射的数据类型
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer s) throws Exception {
                        return "你是" + s;
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e("consumer", s);
                    }
                });

8.1对多

ArrayList<String[]> list = new ArrayList<>();
        String[] words1 = {"Hello,", "I am", "China!"};
        String[] words2 = {"Hello,", "I am", "Beijing!"};
        list.add(words1);
        list.add(words2);
        Flowable.fromIterable(list)
                .flatMap(new Function<String[], Publisher<String>>() {
                    @Override
                    public Publisher<String> apply(String[] strings) throws Exception {
                        return Flowable.fromArray(strings[0] + strings[1] + strings[2]);
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e("consumer", s);
                    }
                });

9.构造函数

//创建订阅者
        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                //这一步是必须,我们通常可以在这里做一些初始化操作,调用request()方法表示初始化工作已经完成
                //调用request()方法,会立即触发onNext()方法---不调用的话会卡住,onNext无法调用
                Log.e("onSubscribe", "onSubscribe");
                Log.e("onSubscribe", Thread.currentThread().getName());
                //s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(String value) {
                Log.e("onNext", value);
                Log.e("onNext", Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable t) {
                Log.e("onError", t.getMessage());
            }

            @Override
            public void onComplete() {
                //由于Reactive-Streams的兼容性,方法onCompleted被重命名为onComplete
                Log.e("onComplete", "onComplete");
                Log.e("onComplete", Thread.currentThread().getName());
            }
        };

        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> e) throws Exception {
                Log.e("subscribe", Thread.currentThread().getName());
                e.onNext("Hello,I am China!");
                //e.onError(new Throwable("发生错误啦"));
                //onError,onComplete二选一
                e.onComplete();
            }
        }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(subscriber);

10.amb操作符只发射首先发射数据或通知的那个Observable的所有数据

ArrayList<Flowable<String>> list = new ArrayList();
        list.add(Flowable.just("FIRST").delay(2, TimeUnit.SECONDS));
        list.add(Flowable.just("SECOND"));
        Flowable.amb(list)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        showMsg(s);//SECOND
                    }
                });

11.多个edittext来决定是否激活button—combineLatest

这个只有1.0有效

// 1.0
        Observable<CharSequence> ObservableEmail = RxTextView.textChanges(mEmailView);
        Observable<CharSequence> ObservablePassword = RxTextView.textChanges(mPasswordView);
        ArrayList<Observable<CharSequence>> date = new ArrayList<>();
        date.add(ObservableEmail);
        date.add(ObservablePassword);
        //Function第一个参数必须是Object[]
        Observable.combineLatest(date, new Function<Object[], Boolean>() {
            @Override
            public Boolean apply(Object[] str) {
                return isEmailValid(str[0].toString()) && isPasswordValid(str[1].toString());
            }
        }).subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean aBoolean) throws Exception {
                if (aBoolean) {
                    showMsg("success");
                } else {
                    showMsg("fail");
                }
            }
        });

12.concat

Flowable.concat(Flowable.just("a"), Flowable.just("b"), Flowable.just("c")).subscribe(
                new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e("TAG", s);
                    }
                });

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Android:Content Provider数据共享

    ContentProvider 属于Android应用程序的组件之一,作用是对外共享数据。我们可以通过ContentProvider把应用中的数据共享给其他应用...

    提莫队长
  • 20(数据库函数库)

    Figure 20.3. Create a database and write three records to it

    提莫队长
  • UNPv1第十四章:Unix域协议

    Unix域协议并不是一个实际的协议族,它只是在同一台主机上进行客户-服务器通信时,使用与在不同主机上的客户和服务器间通信时相同的API(套接口或XTI)的一种方...

    提莫队长
  • 游戏中的人物是如何寻路的?

    拾点阳光
  • a-start寻路算法

    在英雄联盟之中,当你和你的队友都苦苦修炼到十八级的时候,仍然与敌方阵营不分胜负,就在你刚买好装备已经神装的时候,你看见信息框中一条队友的消息:“大龙集合”,这个...

    拾点阳光
  • 游戏中的人物是如何寻路的?

    拾点阳光
  • BIBO稳定性的热带几何学方法(CS CG)

    给定Laurent多项式F及其变形虫AF,讨论涉及具有合理传递函数的多线性时定常系统的BIBO稳定性问题。我们针对原点0相对于变形虫AF的位置,针对BIBO强或...

    蔡秋纯
  • 情绪表达的多模态建模

    情绪表达是指到一个人倾向于通过行为向外表达他的情感的程度。由于情绪表达与行为健康的紧密的联系,及其在社交互动中的关键作用,自动预测情绪表达的能力将推动科学、医学...

    用户7724216
  • [译] 用 Typescript + Composition API 重构 Vue 3 组件

    原文:https://vuejs-course.com/blog/vuejs-3-typescript-options-composition-api

    江米小枣
  • 【DL笔记6】从此明白了卷积神经网络(CNN)

    从【DL笔记1】到【DL笔记N】,是我学习深度学习一路上的点点滴滴的记录,是从Coursera网课、各大博客、论文的学习以及自己的实践中总结而来。从基本的概念、...

    beyondGuo

扫码关注云+社区

领取腾讯云代金券