前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >响应式并发批处理

响应式并发批处理

作者头像
dhyuan
发布2022-05-30 14:30:51
4480
发布2022-05-30 14:30:51
举报
文章被收录于专栏:响应式编程

假设DataProcessor接口定义了方法batchProcess能够对一批数据进行处理,一批处理500个数据。现在我们需要对一个响应式数据流 Flux dataItems 调用 batchProcess() 进行处理。

代码语言:javascript
复制
public interface DataProcessor {
  Mono<String> batchProcess(List<DataItem> dataItems);
  ... ...
}


DataProcessor dataProcessor = ...;

int batchSize = 500;

Flux<DataItem> dataItems = ...

下面分别串行和并行的方式展示一下Reactor API的使用。

1)攒够 batchSize 个数据后进行处理。

这里关键是buffer方法的使用。

代码语言:javascript
复制
Mono<List<String>> result = dataItems.buffer(batchSize)
    .flatMap(dataProcessor::batchProcess)
    .collectList();

2)以并行的方式,把流分成10股,每股攒够 batchSize 个数据后进行处理。

代码语言:javascript
复制
Mono<List<String>> result = dataItems
    .parallel(10)
    .runOn(Schedulers.fromExecutor(Executors.newFixedThreadPool(10)))
    .groups()
    .flatMap(g -> g.buffer(batchSize).flatMap(dataProcessor::batchProcess))
    .collectList();

这里runOn接收的参数可以是Schedulers不同策略的实现,具有不同适用范围,比如适合计算密集型的ParallelScheduler、单线程的SingleScheduler。这里使用的是Executors FixedThreadPool。

可以想象如果我们自己实现这样一个处理逻辑的复杂度,而通过reactor api,仅仅几行代码就完成了这么复杂高效的处理。

3)使用 reactive mongo driver需要的线程。

Spring默认到monog的链接池最大为100,但是实际上在使用reactive方式访问时使用20~10个左右的线程就足够了。因此对mongog的连接串最好明确使用适合自己情况的连接数以避免连接浪费或不够。 测试了一个70万条、大概250M数据的批量插入,发现无论使用串行还是并行,数据库插入时间都差不多(36s~26s)。而连接池最大连接设为200、100、50、20、10对数据库插入的性能也没有太大影响,200个线程时反而有一点下降。这个情况从mongo响应式驱动的角度去解释是完全可以理解的,如果使用传统驱动,恐怕所需的线程就不是这个量级的了。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-10-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 响应式编程 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1)攒够 batchSize 个数据后进行处理。
  • 2)以并行的方式,把流分成10股,每股攒够 batchSize 个数据后进行处理。
  • 3)使用 reactive mongo driver需要的线程。
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档