前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >IO密集型任务使用Java的parallelStream并行流,提高性能及隔离故障,如何自定义线程池

IO密集型任务使用Java的parallelStream并行流,提高性能及隔离故障,如何自定义线程池

作者头像
崔认知
发布2024-09-13 13:55:45
1260
发布2024-09-13 13:55:45
举报
文章被收录于专栏:nobody

在Java中,parallelStream 是 Java 8 引入的 Stream API 的一部分,它允许并行处理集合中的元素。默认情况下,parallelStream 共享使用默认的 ForkJoinPool 作为其线程池,可能对你的业务影响性能,而且起不到隔离的作用。所以我们需要自定义其使用的线程池。

下面列出几种方法设置线程池:

一、设置系统属性:java.util.concurrent.ForkJoinPool.common.parallelism,修改默认共享的ForkJoinPool 的并行数

代码语言:javascript
复制
public static void main(String[] args) throws InterruptedException {
        List<Integer> list = IntStream.range(1, 50).boxed().collect(Collectors.toList());
        list.parallelStream().forEach(t->{
            System.out.println(t+":"+ Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        TimeUnit.HOURS.sleep(1);
    }

并行流使用的默认线程池是:ForkJoinPool.commonPool()

代码语言:javascript
复制
代码语言:javascript
复制
代码语言:javascript
复制
 ForkJoinPool common的初始化:

其中并行度的值和系统属性:

java.util.concurrent.ForkJoinPool.common.parallelism

有关,如果没配置,则默认和系统cpu核数相关(

Runtime.getRuntime().availableProcessors()获取)。

本机测试,默认并行数为11。

代码语言:javascript
复制

System.out.println("ForkJoinPool并行数" + ForkJoinPool.getCommonPoolParallelism());

修改系统属性 ,设置并行数为20:

代码语言:javascript
复制
 System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","20");
 System.out.println("ForkJoinPool并行数:  " + ForkJoinPool.getCommonPoolParallelism());

执行结果:

注意:虽然可以通过设置系统属性修改默认

代码语言:javascript
复制
ForkJoinPool common的并行数,提高并行度,但是默认共享使用一个

ForkJoinPool起不到隔离作用,择情况而选择使用。

二、在自定义的ForkJoinPool中运行parallel()操作

通过创建新的ForkJoinPool,设置线程池数目:

代码语言:javascript
复制
ForkJoinPool forkJoinPool = new ForkJoinPool(20);

然后并行流的执行提交给新建的ForkJoinPool执行:

代码语言:javascript
复制
forkJoinPool.submit

示例:

代码语言:javascript
复制
package com.renzhikeji.demo;


import java.util.List;
import java.util.concurrent.*;
import java.util.stream.IntStream;

/**
 * @author 认知科技技术团队
 * 微信公众号:认知科技技术团队
 */
public class JdkDemo {
    public static void main(String[] args) throws InterruptedException {

        List<Integer> list = IntStream.range(1, 50).boxed().toList();
        ForkJoinPool forkJoinPool = new ForkJoinPool(20);
        forkJoinPool.submit(() -> list.parallelStream().forEach(t->{
            System.out.println(t+":"+ Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));

        TimeUnit.HOURS.sleep(1);
    }

}

执行结果:

执行原理:

判断当前线程是否ForkJoinWorkerThread,如果时,则使用当前线程绑定的ForkJoinPool即我们自定义创建的去执行任务。

三、小结

java的parallelStream并行流,可能需要开发者自定义线程池,起到提高性能及隔离故障的作用。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-06-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 认知科技技术团队 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档