我有无限流,消息表示为Play Enumerator,我将Iteratee应用于其中。然后由Akka参与者处理每条消息(参与者的数量限制在10人以内)。
现在,我希望Iteratee中的代码异步地等待空闲的参与者,如果所有10个参与者都很忙,并且不向他们发送另一条导致异常Ask timed out on ...的消息。
如何实现这样的功能?是否有更好的方法来处理无await**?**的10个参与者的无限流?
我刚才提到的代码示例可能如下所示:
val workers = context.actorOf(Props[MyWorker].withRouter(RoundRobinRouter(10)))
val it = Iteratee.foreach[Msg] { msg =>
workers ? msg
}
msgEnumerator.apply(it)发布于 2016-01-21 18:37:10
将Iteratee.foldM与actor ask模式结合使用似乎是正确的方法。假设您不希望您的参与者构建大型邮箱(如果您不关心大型邮箱,只需使用tell和Iteratee.foreach而不是ask),这将需要一些专门的路由逻辑。由于用于定制akka路由器的api不支持异步,因此您将需要一个自定义参与者来处理一次只向参与者池中的每个参与者分发一段工作的逻辑。
我想是这样的:
class WorkDistributor extends Actor {
final val NUM_WORKERS = 10
val workers = context.actorOf(Props[MyWorker].withRouter(RoundRobinRouter(NUM_WORKERS)))
var numActiveWorkers = 0
var queuedWork: Option[Work] = None
def receive = {
case IterateeWork(work) if numActiveWorkers < NUM_WORKERS => workers ! work; numActiveWorkers += 1; sender ! SendMeMoreWork
case IterateeWork(work) => queuedWork = Some(work)
case ActorFinishedWork if queuedWork.isDefined => queuedWork.foreach(workers ! _); queuedWork = None
case ActorFinishedWork => numActiveWorkers -= 1; sender ! SendMeMoreWork
}
}其中,IterateeWork消息由迭代器发送,ActorFinishedWork消息由参与者池中的参与者发送。
看看我写的这件事,应该重写它,以便在参与者池满时使用become来更改行为(而不是在每一种情况下使用if过滤器,但我把这作为一个练习留给读者。
那么你的Iteratee看起来就像
Iteratee.foldM[Work, SendMeMoreWork.type](SendMeMoreWork) {
case (_, work) => workDistributor ? IterateeWork(work)
}https://stackoverflow.com/questions/34913324
复制相似问题