前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >在使用Java 8并行流之前要考虑两次

在使用Java 8并行流之前要考虑两次

作者头像
白石
发布2019-08-23 09:59:35
9040
发布2019-08-23 09:59:35
举报
文章被收录于专栏:白石白石

在使用Java 8并行流之前要考虑两次

如果您倾听来自Oracle的人们谈论Java 8背后的设计选择,您会经常听到并行性是主要动机。 并行化是lambdas,流API和其他方面的驱动力。 我们来看一下流API的示例。

代码语言:javascript
复制
private long countPrimes(int max) {
    return range(1, max).parallel().filter(this::isPrime).count();
}
private boolean isPrime(long n) {
    return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
}

在这里,我们有countPrimes方法,它计算1到最大值之间的素数的数量。 数字流由范围方法创建。 然后将流切换到并行模式; 过滤掉非素数的数字,并计算剩余的数字。

您可以看到流API允许我们以简洁紧凑的方式描述问题。 而且,并行化只是调用parallel()方法。 当我们这样做时,流被分成多个块,每个块独立处理,结果总结在最后。 由于我们实现isPrime方法非常无效且占用大量CPU,我们可以利用并行化并利用所有可用的CPU内核。

我们来看另一个例子:

代码语言:javascript
复制
private List<StockInfo> getStockInfo(Stream<String> symbols) {
     return symbols.parallel()
            .map(this::getStockInfo) //slow network operation
            .collect(toList());
}

我们在输入上有一个股票代码列表,我们必须调用慢速网络操作来获取有关股票的一些细节。 在这里,我们不处理CPU密集型操作,但我们也可以利用并行化。 并行执行多个网络请求是个好主意。 同样,并行流的一个很好的任务,你同意吗?

如果您这样做,请再次查看上一个示例。 有一个很大的错误。 你看到了吗? 问题是所有并行流都使用common fork-join thread pool,如果 你提交一个长期运行的任务,你有效地阻止了池中的所有线程。因此,您将阻止使用并行流的所有其他任务。 想象一下servlet环境,当一个请求调用getStockInfo()和另一个countPrimes()时。 即使每个都需要不同的资源,也会阻止另一个。 更糟糕的是,你不能为并行流指定线程池; 整个类加载器必须使用相同的。

ForkJoinPool 的适用场景:

  1. ForkJoinPool 不是为了替代 ExecutorService,而是它的补充,在某些应用场景下性能比 ExecutorService 更好。
  2. ForkJoinPool 主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数,例如 quick sort 等。
  3. ForkJoinPool 最适合的是计算密集型的任务,如果存在 I/O线程间同步sleep() 等会造成线程长时间阻塞的情况时,最好配合使用 ManagedBlocker。

让我们在下面的例子中说明它:

代码语言:javascript
复制
private void run() throws InterruptedException {  
  ExecutorService es = Executors.newCachedThreadPool();  
  // Simulating multiple threads in the system  
  // if one of them is executing a long-running task.  
  // Some of the other threads/tasks are waiting  
  // for it to finish  es.execute(() -> countPrimes(MAX, 1000));    
  //incorrect task  es.execute(() -> countPrimes(MAX, 0));  es.execute(() -> countPrimes(MAX, 0));  es.execute(() -> countPrimes(MAX, 0));  es.execute(() -> countPrimes(MAX, 0));  es.execute(() -> countPrimes(MAX, 0));  es.shutdown();  es.awaitTermination(60, TimeUnit.SECONDS);}
  
  private void countPrimes(int max, int delay) {  
    System.out.println(range(1, max).parallel().filter(this::isPrime).peek(i -> sleep(delay)).count()  );
  }

在这里,我们模拟系统中的六个线程。 所有这些都在执行CPU密集型任务,第一个被“打破”并且在它找到素数后就睡了一秒钟。 这只是一个人为的例子; 你可以想象一个被卡住或执行阻塞操作的线程。

问题是:当我们执行这段代码时会发生什么?我们有六个任务;其中一项需要一整天才能完成,其余的应该会更快完成。毫不奇怪,每次执行代码时,都会得到不同的结果。有时候,所有健康的任务都会结束;另一些时候,他们中的一些人会被慢的那一个卡住。 您希望在生产系统中有这样的行为吗?一个坏掉的任务会导致应用程序的其余部分崩溃?我猜不会。

如何确保这样的事情永远不会发生,只有两种选择。第一个是确保提交给公共fork-join池的所有任务不会被卡住并在合理的时间内完成。 但这说起来容易做起来难,尤其是在复杂的应用程序中。另一个选项是不使用并行流,直到Oracle允许我们指定用于并行流的线程池

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 在使用Java 8并行流之前要考虑两次
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档