假设我必须ехеcute几个受CPU限制的任务。例如,如果我有4个CPU,我可能会创建一个由4-5个等待队列的工作线程组成的固定大小的线程池,并将任务放入队列中。在Java语言中,我可以使用java.util.concurrent
(也许是ThreadPoolExecutor
)来实现这种机制。
您将如何使用Scala参与者来实现它?
发布于 2011-06-25 00:25:28
所有参与者基本上都是由一个调度程序在幕后执行的线程。调度器创建一个线程池来执行大致绑定到您的核心数量的参与者。这意味着您可以只为需要执行的每个任务创建一个参与者,其余的留给Scala:
for(i <- 1 to 20) {
actor {
print(i);
Thread.sleep(1000);
}
}
这里的缺点是取决于任务的数量,为每个任务创建线程的成本可能非常昂贵,因为线程在Java中并不是那么便宜。
一种简单的方法是创建一个有界池的工作人员参与者,然后通过消息传递将任务分发给他们,方法如下:
import scala.actors.Actor._
val numWorkers = 4
val pool = (1 to numWorkers).map { i =>
actor {
loop {
react {
case x: String => println(x)
}
}
}
}
for(i <- 1 to 20) {
val r = (new util.Random).nextInt(numWorkers)
pool(r) ! "task "+i
}
我们想要创建多个参与者的原因是因为单个参与者一次只处理一条消息(即任务),因此要获得任务的并行性,您需要创建多个。
附注:当涉及到I/O限制任务时,默认调度程序变得特别重要,因为在这种情况下,您肯定想要更改线程池的大小。有两篇很好的博客文章详细介绍了这一点:Explore the Scheduling of Scala Actors和Scala actors thread pool pitfall。
话虽如此,Akka是一个参与者框架,它为参与者提供了更高级的工作流工具,我在任何实际的应用程序中都会使用它。这是一个负载均衡(而不是随机)的任务执行器:
import akka.actor.Actor
import Actor._
import akka.routing.{LoadBalancer, CyclicIterator}
class TaskHandler extends Actor {
def receive = {
case t: Task =>
// some computationally expensive thing
t.execute
case _ => println("default case is required in Akka...")
}
}
class TaskRouter(numWorkers: Int) extends Actor with LoadBalancer {
val workerPool = Vector.fill(numWorkers)(actorOf[TaskHandler].start())
val seq = new CyclicIterator(workerPool)
}
val router = actorOf(new TaskRouter(4)).start()
for(i <- 1 to 20) {
router ! Task(..)
}
您可以使用不同类型的负载平衡(CyclicIterator是循环分发),因此您可以查看文档here以了解更多信息。
发布于 2011-06-24 22:43:38
嗯,你通常不需要。使用演员的吸引力之一就是他们会为你处理这些细节。
但是,如果您坚持要对此进行管理,则需要覆盖Actor
类上受保护的scheduler
方法,以返回适当的IScheduler
。另请参阅scala.actors.scheduler
package,以及Actor
trait上有关调度程序的注释。
https://stackoverflow.com/questions/6466878
复制相似问题