前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊reactive streams的parallel flux

聊聊reactive streams的parallel flux

作者头像
code4it
发布2018-09-17 15:40:19
1.5K0
发布2018-09-17 15:40:19
举报
文章被收录于专栏:码匠的流水账

本文主要研究下reactive streams的flux的parallel运行方式.

目的

在一些涉及IO操作,比如读取文件,访问数据库等,通常建议使用异步线程以parallel模式运行,以提升性能。

实例

代码语言:javascript
复制
    @Test
    public void testParallelRunOn(){
        Flux.range(1, 1000)
                .log()
                .parallel(8)
                .runOn(Schedulers.parallel()) //parallel flux
                .sequential() //必须使用sequential来将这些异步线程的执行结果汇集成一个stream
                .map(e -> {
                    LOGGER.info("map thread:{},e:{}",Thread.currentThread().getName(),e);
                    return e*10;
                })
                .subscribe(e -> {
                    LOGGER.info("subscribe thread:{},e:{}",Thread.currentThread().getName(),e);
                });
    }

部分输出

代码语言:javascript
复制
2:38:53.949 [main] INFO reactor.Flux.Range.1 - | onNext(13)
22:38:53.949 [parallel-2] INFO com.example.demo.ParallelTest - subscribe thread:parallel-2,e:120
22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(14)
22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - map thread:parallel-5,e:13
22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - subscribe thread:parallel-5,e:130
22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(15)
22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - map thread:parallel-5,e:14
22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - subscribe thread:parallel-5,e:140
22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(16)
22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - map thread:parallel-5,e:15
22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - subscribe thread:parallel-5,e:150
22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(17)
22:38:53.950 [parallel-8] INFO com.example.demo.ParallelTest - map thread:parallel-8,e:16
22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(18)
22:38:53.950 [parallel-8] INFO com.example.demo.ParallelTest - subscribe thread:parallel-8,e:160
22:38:53.950 [parallel-8] INFO com.example.demo.ParallelTest - map thread:parallel-8,e:17
22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(19)
22:38:53.950 [parallel-8] INFO com.example.demo.ParallelTest - subscribe thread:parallel-8,e:170
22:38:53.950 [parallel-8] INFO com.example.demo.ParallelTest - map thread:parallel-8,e:18

小结

  • parallel来指定线程池线程个数
  • runOn启动parallel flux
  • sequential将异步线程池执行结果汇集成一个stream

doc

  • advanced-parallelizing-parralelflux
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-01-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 目的
  • 实例
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档