首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Scala并行集合

Scala并行集合
EN

Stack Overflow用户
提问于 2018-04-13 00:30:13
回答 2查看 299关注 0票数 1

我非常天真地尝试使用Scala .par,结果证明比非并行版本要慢得多。对此有何解释?

注意:问题不在于提高速度,而在于理解为什么这种对.par的天真使用不能立即提高速度。

注2:计时方法:我用N= 10000运行了这两种方法。第一个大约在20多秒后返回。第二个是我在3分钟后杀死的。一点也不接近。如果让它运行更长时间,我会遇到Java堆空间异常。

代码语言:javascript
复制
def pi_random(N: Long): Double = {
  val count = (0L until N * N)
    .map { _ =>
      val (x, y) = (rng.nextDouble(), rng.nextDouble())
      if (x*x + y*y <= 1) 1 else 0
    }
    .sum
  4 * count.toDouble / (N * N)
}

def pi_random_parallel(N: Long): Double = {
  val count = (0L until N * N)
    .par
    .map { _ =>
      val (x, y) = (rng.nextDouble(), rng.nextDouble())
      if (x*x + y*y <= 1) 1 else 0
    }
    .sum
  4 * count.toDouble / (N * N)
}
EN

回答 2

Stack Overflow用户

发布于 2018-04-13 00:56:58

如果不做一些实际的分析,很难确定,但我有两个理论:

首先,您可能会失去Range类的一些好处,特别是几乎为零的内存使用率。当您执行(0L until N * N)时,您将创建一个Range对象,该对象是惰性的。它实际上不会创建包含该范围内每个数字的任何对象。我想map也不知道。而且sum一次计算并相加一个数字,所以也几乎不分配任何内存。

我不确定ParRange是否也是如此。似乎它必须为每个拆分分配一些数量,并且在调用map之后,它可能必须在内存中存储一些中间结果,因为“相邻的”拆分等待另一个拆分完成。尤其是堆空间异常让我认为情况就是这样。所以你会在GC之类的事情上浪费很多时间。

其次,到目前为止,对rng.nextDouble的调用可能是该内部函数中开销最大的部分。但我相信java和scala的Random类本质上都是单线程的。它们在内部同步和阻塞。所以不管怎样,你不会从并行中获得那么多,实际上会损失一些开销。

票数 1
EN

Stack Overflow用户

发布于 2018-04-13 06:25:50

每个任务没有足够的工作量,任务粒度太细。

创建每个任务都需要一些开销:

  • 必须创建代表任务的某个对象
  • 必须确保一次只有一个线程执行一个任务
  • 在某些线程空闲的情况下,必须调用一些作业窃取过程。

对于N= 10000,您将实例化100,000,000个小任务。这些任务中的每一个几乎什么也不做:它生成两个随机数,并执行一些基本算术和一个if分支。创建任务的开销无法与每个任务所做的工作相比较。

任务必须大得多,以便每个线程都有足够的工作要做。此外,如果您将每个RNG线程设为本地线程,这样线程就可以并行执行它们的工作,而不会永久锁定默认的随机数生成器,可能会更快。

下面是一个示例:

代码语言:javascript
复制
import scala.util.Random

def pi_random(N: Long): Double = {
  val rng = new Random
  val count = (0L until N * N)
    .map { _ =>
      val (x, y) = (rng.nextDouble(), rng.nextDouble())
      if (x*x + y*y <= 1) 1 else 0
    }
    .sum
  4 * count.toDouble / (N * N)
}

def pi_random_parallel(N: Long): Double = {
  val rng = new Random
  val count = (0L until N * N)
    .par
    .map { _ =>
      val (x, y) = (rng.nextDouble(), rng.nextDouble())
      if (x*x + y*y <= 1) 1 else 0
    }
    .sum
  4 * count.toDouble / (N * N)
}


def pi_random_properly(n: Long): Double = {
  val count = (0L until n).par.map { _ =>
    val rng = ThreadLocalRandom.current
    var sum = 0
    var idx = 0
    while (idx < n) {
      val (x, y) = (rng.nextDouble(), rng.nextDouble())
      if (x*x + y*y <= 1.0) sum += 1
      idx += 1
    }
    sum
  }.sum
  4 * count.toDouble / (n * n)
}

下面是一个小演示和时间安排:

代码语言:javascript
复制
def measureTime[U](repeats: Long)(block: => U): Double = {
  val start = System.currentTimeMillis

  var iteration = 0
  while (iteration < repeats) {
    iteration += 1
    block
  }

  val end = System.currentTimeMillis
  (end - start).toDouble / repeats
}

// basic sanity check that all algos return roughly same result
println(pi_random(2000))
println(pi_random_parallel(2000))
println(pi_random_properly(2000))

// time comparison (N = 2k, 10 repetitions for each algorithm)
val N = 2000
val Reps = 10
println("Sequential:  " + measureTime(Reps)(pi_random(N)))
println("Naive:       " + measureTime(Reps)(pi_random_parallel(N)))
println("My proposal: " + measureTime(Reps)(pi_random_properly(N)))

输出:

代码语言:javascript
复制
3.141333
3.143418
3.14142
Sequential: 621.7
Naive:      3032.6
My version: 44.7

现在,并行版本大约比顺序版本快一个数量级(结果显然取决于内核的数量等)。

我不能用N= 10000来测试它,因为这个天真的并行化版本使用"GC开销超过“的-error使所有东西崩溃,这也说明创建小任务的开销太大了。

在我的实现中,我额外地展开了内部的while:您只需要在一个寄存器中使用一个计数器,而不需要通过对范围执行mapping来创建一个巨大的集合。

编辑:ThreadLocalRandom取代了一切,现在你的编译器版本是否支持已经无关紧要了,所以它也应该适用于2.11的早期版本。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49801698

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档