parallelStream是什么,它是一个集合的并发处理流.其作用是把一个集合中的数据分片,进行一个多线程的处理,增快运行速度.
比如说这样一段代码
private Set<SysRole> sysRoles;
private Set<String> permission;
@Override
public Collection<? extends GrantedAuthority> getAuthorities() {
Collection<GrantedAuthority> collection = Collections.synchronizedSet(new HashSet<>());
if (!CollectionUtils.isEmpty(sysRoles)) {
sysRoles.parallelStream().forEach(role -> {
if (role.getCode().startsWith("ROLE_")) {
collection.add(new SimpleGrantedAuthority(role.getCode()));
}else {
collection.add(new SimpleGrantedAuthority("ROLE_" + role.getCode()));
}
});
}
return collection;
}
它就是以不同的线程来给collection添加SimpleGrantedAuthority的,请注意collection的线程安全性.
当然我们可以用下面这个例子来证明parallelStream的确是多线程处理
public class App {
public static void main(String[] args) throws Exception {
System.out.println("Hello World!");
// 构造一个10000个元素的集合
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
list.add(i);
}
// 统计并行执行list的线程
Set<Thread> threadSet = new CopyOnWriteArraySet<>();
// 并行执行
list.parallelStream().forEach(integer -> {
Thread thread = Thread.currentThread();
// System.out.println(thread);
// 统计并行执行list的线程
threadSet.add(thread);
});
System.out.println("threadSet一共有" + threadSet.size() + "个线程");
System.out.println("系统一个有"+Runtime.getRuntime().availableProcessors()+"个cpu");
List<Integer> list1 = new ArrayList<>();
List<Integer> list2 = new ArrayList<>();
for (int i = 0; i < 100000; i++) {
list1.add(i);
list2.add(i);
}
Set<Thread> threadSetTwo = new CopyOnWriteArraySet<>();
CountDownLatch countDownLatch = new CountDownLatch(2);
Thread threadA = new Thread(() -> {
list1.parallelStream().forEach(integer -> {
Thread thread = Thread.currentThread();
// System.out.println("list1" + thread);
threadSetTwo.add(thread);
});
countDownLatch.countDown();
});
Thread threadB = new Thread(() -> {
list2.parallelStream().forEach(integer -> {
Thread thread = Thread.currentThread();
// System.out.println("list2" + thread);
threadSetTwo.add(thread);
});
countDownLatch.countDown();
});
threadA.start();
threadB.start();
countDownLatch.await();
System.out.print("threadSetTwo一共有" + threadSetTwo.size() + "个线程");
System.out.println("---------------------------");
System.out.println(threadSet);
System.out.println(threadSetTwo);
System.out.println("---------------------------");
threadSetTwo.addAll(threadSet);
System.out.println(threadSetTwo);
System.out.println("threadSetTwo一共有" + threadSetTwo.size() + "个线程");
System.out.println("系统一个有"+Runtime.getRuntime().availableProcessors()+"个cpu");
}
}
运行结果如下
Hello World! threadSet一共有3个线程 系统一个有4个cpu threadSetTwo一共有5个线程--------------------------- [Thread[main,5,main], Thread[ForkJoinPool.commonPool-worker-2,5,main], Thread[ForkJoinPool.commonPool-worker-1,5,main]] [Thread[ForkJoinPool.commonPool-worker-1,5,main], Thread[ForkJoinPool.commonPool-worker-2,5,main], Thread[ForkJoinPool.commonPool-worker-3,5,main], Thread[Thread-0,5,], Thread[Thread-1,5,]] --------------------------- [Thread[ForkJoinPool.commonPool-worker-1,5,main], Thread[ForkJoinPool.commonPool-worker-2,5,main], Thread[ForkJoinPool.commonPool-worker-3,5,main], Thread[Thread-0,5,], Thread[Thread-1,5,], Thread[main,5,main]] threadSetTwo一共有6个线程 系统一个有4个cpu
我们可以看到threadSet一共有3个线程,证明
Set<Thread> threadSet = new CopyOnWriteArraySet<>();
// 并行执行
list.parallelStream().forEach(integer -> {
Thread thread = Thread.currentThread();
// System.out.println(thread);
// 统计并行执行list的线程
threadSet.add(thread);
});
System.out.println("threadSet一共有" + threadSet.size() + "个线程");
是3个线程处理的,另外CopyOnWriteArraySet是线程安全的.后面是由显示线程调用,主线程等待的方式.
调节parallelStream的并发线程数可以用参数-Djava.util.concurrent.ForkJoinPool.common.parallelism=N (N为线程数量)