前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >浅谈parallelStream 顶

浅谈parallelStream 顶

作者头像
算法之名
发布2019-08-20 11:11:34
6440
发布2019-08-20 11:11:34
举报
文章被收录于专栏:算法之名算法之名

parallelStream是什么,它是一个集合的并发处理流.其作用是把一个集合中的数据分片,进行一个多线程的处理,增快运行速度.

比如说这样一段代码

private Set<SysRole> sysRoles;
private Set<String> permission;

@Override
public Collection<? extends GrantedAuthority> getAuthorities() {
    Collection<GrantedAuthority> collection = Collections.synchronizedSet(new HashSet<>());
    if (!CollectionUtils.isEmpty(sysRoles)) {
        sysRoles.parallelStream().forEach(role -> {
            if (role.getCode().startsWith("ROLE_")) {
                collection.add(new SimpleGrantedAuthority(role.getCode()));
            }else {
                collection.add(new SimpleGrantedAuthority("ROLE_" + role.getCode()));
            }
        });
    }
    return collection;
}

它就是以不同的线程来给collection添加SimpleGrantedAuthority的,请注意collection的线程安全性.

当然我们可以用下面这个例子来证明parallelStream的确是多线程处理

public class App {
    public static void main(String[] args) throws Exception {
        System.out.println("Hello World!");
        // 构造一个10000个元素的集合
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            list.add(i);
        }
        // 统计并行执行list的线程
        Set<Thread> threadSet = new CopyOnWriteArraySet<>();
        // 并行执行
        list.parallelStream().forEach(integer -> {
            Thread thread = Thread.currentThread();
            // System.out.println(thread);
            // 统计并行执行list的线程
            threadSet.add(thread);
        });
        System.out.println("threadSet一共有" + threadSet.size() + "个线程");
        System.out.println("系统一个有"+Runtime.getRuntime().availableProcessors()+"个cpu");
        List<Integer> list1 = new ArrayList<>();
        List<Integer> list2 = new ArrayList<>();
        for (int i = 0; i < 100000; i++) {
            list1.add(i);
            list2.add(i);
        }
        Set<Thread> threadSetTwo = new CopyOnWriteArraySet<>();
        CountDownLatch countDownLatch = new CountDownLatch(2);
        Thread threadA = new Thread(() -> {
            list1.parallelStream().forEach(integer -> {
                Thread thread = Thread.currentThread();
                // System.out.println("list1" + thread);
                threadSetTwo.add(thread);
            });
            countDownLatch.countDown();
        });
        Thread threadB = new Thread(() -> {
            list2.parallelStream().forEach(integer -> {
                Thread thread = Thread.currentThread();
                // System.out.println("list2" + thread);
                threadSetTwo.add(thread);
            });
            countDownLatch.countDown();
        });

        threadA.start();
        threadB.start();
        countDownLatch.await();
        System.out.print("threadSetTwo一共有" + threadSetTwo.size() + "个线程");

        System.out.println("---------------------------");
        System.out.println(threadSet);
        System.out.println(threadSetTwo);
        System.out.println("---------------------------");
        threadSetTwo.addAll(threadSet);
        System.out.println(threadSetTwo);
        System.out.println("threadSetTwo一共有" + threadSetTwo.size() + "个线程");
        System.out.println("系统一个有"+Runtime.getRuntime().availableProcessors()+"个cpu");
    }
}

运行结果如下

Hello World! threadSet一共有3个线程 系统一个有4个cpu threadSetTwo一共有5个线程--------------------------- [Thread[main,5,main], Thread[ForkJoinPool.commonPool-worker-2,5,main], Thread[ForkJoinPool.commonPool-worker-1,5,main]] [Thread[ForkJoinPool.commonPool-worker-1,5,main], Thread[ForkJoinPool.commonPool-worker-2,5,main], Thread[ForkJoinPool.commonPool-worker-3,5,main], Thread[Thread-0,5,], Thread[Thread-1,5,]] --------------------------- [Thread[ForkJoinPool.commonPool-worker-1,5,main], Thread[ForkJoinPool.commonPool-worker-2,5,main], Thread[ForkJoinPool.commonPool-worker-3,5,main], Thread[Thread-0,5,], Thread[Thread-1,5,], Thread[main,5,main]] threadSetTwo一共有6个线程 系统一个有4个cpu

我们可以看到threadSet一共有3个线程,证明

Set<Thread> threadSet = new CopyOnWriteArraySet<>();
// 并行执行
list.parallelStream().forEach(integer -> {
    Thread thread = Thread.currentThread();
    // System.out.println(thread);
    // 统计并行执行list的线程
    threadSet.add(thread);
});
System.out.println("threadSet一共有" + threadSet.size() + "个线程");

是3个线程处理的,另外CopyOnWriteArraySet是线程安全的.后面是由显示线程调用,主线程等待的方式.

调节parallelStream的并发线程数可以用参数-Djava.util.concurrent.ForkJoinPool.common.parallelism=N (N为线程数量)

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档