前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RxJava2.x 的并行编程

RxJava2.x 的并行编程

作者头像
程序员飞飞
发布2020-02-27 17:12:31
9610
发布2020-02-27 17:12:31
举报
文章被收录于专栏:Android&Java技术Android&Java技术

题图:来自飞哥的图片工厂

音乐推荐:你的姑娘 文丨IT大飞说 预计阅读时间:1.2 分钟

哈喽,朋友们,之前我们学习了一些 RxJava2.x 的常用操作符,今天我们来继续学习一下RxJava 的并行编程。

随着手机 CPU 的高速发展,性能越来越强劲,核心数越来越多,我们要充分、高效地利用这些 CPU 资源,来提高程序运行的效率,解决复杂的业务问题,这将变得越来越重要。

1.什么是并行编程?

对于并发我们可能比较清楚,那么并行是什么呢?它们的区别是什么?并发(concurrency)是指一个处理器同时处理多个任务,并行(parallelism)是多个处理器或者是多核处理器同时处理多个不同的任务,并行是同时发生的多个并发事件,具有并发的含义,而并发不一定是并行。

在 Java 8 中有个并行流(parallelStream),有的同学可能用过,我们想使用并行流的方式打印出 1-100 之间的整数,来看下面的代码:

代码语言:javascript
复制
private void parallelismWithJava8() {
        List<Integer> list = new ArrayList<>();

        for (int i = 1; i <= 100; i++) {
            list.add(i);
        }

        list.parallelStream()
                .map(Object::toString)
                .forEach(s -> LogUtil.i(TAG, "s=" + s + ",Current Thread Name=" + Thread.currentThread().getName()));
    }

上面的结果会交错输出 1-100 之间的整数,因为并行的缘故所以每个输出执行的时间可能不一样,所以会交错输出,其实上面的代码是 Java 8 借助了 JDK 的 fork/join 框架来实现并行编程的。

2.使用 RxJava 的 flatMap 实现并行编程

我们前面学习过 flatMap 操作符,我们知道 flatMap 可以将一些数据转换成一些 Observables,然后我们可以指定它的调度器来实现并行编程的目的,还是打印 1-100 的数字,直接看代码吧:

代码语言:javascript
复制
    private void parallelismWithRxJavaFlatMap() {
        Observable.range(1, 100)
                .flatMap((Function<Integer, ObservableSource<String>>) integer ->
                        Observable.just(integer)
                                .subscribeOn(Schedulers.computation())
                                .map(integer1 -> integer1.toString()))
                .subscribe(s -> LogUtil.i(TAG, "s===" + s));
    }

这种方式使用的是默认的调度器,当然我们也可以创建一个线程池,来自定义调度器,修改后的代码如下:

代码语言:javascript
复制
    private void parallelismWithRxJavaFlatMap() {
        int threadNum = Runtime.getRuntime().availableProcessors() + 1;
        ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
        final Scheduler scheduler = Schedulers.from(executorService);

        Observable.range(1, 100)
                .flatMap((Function<Integer, ObservableSource<String>>) integer ->
                        Observable.just(integer)
                                .subscribeOn(scheduler)
                                .map(integer1 -> integer1.toString()))
                .doFinally(() -> executorService.shutdown())
                .subscribe(s -> LogUtil.i(TAG, "s===" + s));
    }

这 2 种实现方式都差不多,性能方面也差别不大,根据喜好请自行选择,注意一点,如果使用自己创建的线程池,那么记得使用 doFinally 操作符将线程池关闭,或者由下游的消费者进行处理!

3.使用 ParallelFlowable 实现并行编程

Flowable 是 RxJava2.x 新增的被观察者,支持背压,因此它对应的并行被观察者为 ParallelFlowable,因为并行编程肯定涉及到异步,而异步又涉及到背压,所以是没有 ParallelObservable 的。

我们还是用 ParallelFlowable 来实现打印 1-100 这个需求吧,请看代码:

代码语言:javascript
复制
    private void parallelFlowable() {
        ParallelFlowable<Integer> parallelFlowable = Flowable.range(1, 100).parallel();

        parallelFlowable
                .runOn(Schedulers.io())
                .map(integer -> integer.toString())
                .sequential()
                .subscribe(s -> LogUtil.i(TAG, "s===" + s));
    }

其实代码是比较简单的,我们发现 runOn 和 sequential 操作符我们之前没见过,这里解释下,runOn 其实就是相当于我们上面使用 flatMap 实现中的 subscribeOn,我们可以用它来定义异步,它还有一个重载的方法,我们可以指定 prefetch 的数量。

sequential 操作符是将并行的操作结果返回到并行流中,这样,才能打印出所有的输出结果。

我们上面学会了好几种并行编程的方式,那么我们在实际的开发中应该选择哪种呢?

并非所有的顺序操作在并行中都是有意义的,目前 ParallelFlowable 只支持如下操作: map、filter、flatMap、concatMap、reduce、collect、sorted、toSortedList、compose、fromArray、doOnCancel、doOnError、doOnComplete、doOnNext、doAfterNext、doOnSubscribe、doAfterTerminated、doOnRequest,优先推荐使用 ParallelFlowable 实现并行编程,对于无法使用 ParallelFlowable 的操作符,则使用 flatMap 来实现。

好了,今天的学习内容就算完成了,感觉是不是很简单?实践证明,学完后动手敲一遍的效果是最好的,赶紧去动手敲一遍吧!

最后,我这边有个技术交流群,平常我会分享一些学习资源到群里,还可以和大家一起交流学习,需要的朋友可以扫描下面的二维码加我微信并备注「加群」,拉你进入技术交流群!

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-06-04 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.什么是并行编程?
  • 2.使用 RxJava 的 flatMap 实现并行编程
  • 3.使用 ParallelFlowable 实现并行编程
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档