首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >使用Scala参与者执行CPU绑定的任务?

使用Scala参与者执行CPU绑定的任务?
EN

Stack Overflow用户
提问于 2011-06-24 19:05:57
回答 2查看 1.2K关注 0票数 8

假设我必须ехеcute几个受CPU限制的任务。例如,如果我有4个CPU,我可能会创建一个由4-5个等待队列的工作线程组成的固定大小的线程池,并将任务放入队列中。在Java语言中,我可以使用java.util.concurrent (也许是ThreadPoolExecutor)来实现这种机制。

您将如何使用Scala参与者来实现它?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2011-06-25 00:25:28

所有参与者基本上都是由一个调度程序在幕后执行的线程。调度器创建一个线程池来执行大致绑定到您的核心数量的参与者。这意味着您可以只为需要执行的每个任务创建一个参与者,其余的留给Scala:

代码语言:javascript
运行
复制
for(i <- 1 to 20) {
  actor {
    print(i); 
    Thread.sleep(1000);
  }
}

这里的缺点是取决于任务的数量,为每个任务创建线程的成本可能非常昂贵,因为线程在Java中并不是那么便宜。

一种简单的方法是创建一个有界池的工作人员参与者,然后通过消息传递将任务分发给他们,方法如下:

代码语言:javascript
运行
复制
import scala.actors.Actor._

val numWorkers = 4
val pool = (1 to numWorkers).map { i =>
  actor { 
    loop { 
      react { 
        case x: String => println(x) 
      } 
    }
  }
}

for(i <- 1 to 20) {
  val r = (new util.Random).nextInt(numWorkers)
  pool(r) ! "task "+i
}

我们想要创建多个参与者的原因是因为单个参与者一次只处理一条消息(即任务),因此要获得任务的并行性,您需要创建多个。

附注:当涉及到I/O限制任务时,默认调度程序变得特别重要,因为在这种情况下,您肯定想要更改线程池的大小。有两篇很好的博客文章详细介绍了这一点:Explore the Scheduling of Scala ActorsScala actors thread pool pitfall

话虽如此,Akka是一个参与者框架,它为参与者提供了更高级的工作流工具,我在任何实际的应用程序中都会使用它。这是一个负载均衡(而不是随机)的任务执行器:

代码语言:javascript
运行
复制
import akka.actor.Actor
import Actor._
import akka.routing.{LoadBalancer, CyclicIterator}

class TaskHandler extends Actor {
  def receive = {
    case t: Task =>
      // some computationally expensive thing
      t.execute
    case _ => println("default case is required in Akka...")
  }
}

class TaskRouter(numWorkers: Int) extends Actor with LoadBalancer {
   val workerPool = Vector.fill(numWorkers)(actorOf[TaskHandler].start())
   val seq = new CyclicIterator(workerPool)
}

val router = actorOf(new TaskRouter(4)).start()

for(i <- 1 to 20) {
  router ! Task(..)
}

您可以使用不同类型的负载平衡(CyclicIterator是循环分发),因此您可以查看文档here以了解更多信息。

票数 9
EN

Stack Overflow用户

发布于 2011-06-24 22:43:38

嗯,你通常不需要。使用演员的吸引力之一就是他们会为你处理这些细节。

但是,如果您坚持要对此进行管理,则需要覆盖Actor类上受保护的scheduler方法,以返回适当的IScheduler。另请参阅scala.actors.scheduler package,以及Actor trait上有关调度程序的注释。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/6466878

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档