首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何在使用剧本Iteratee处理数据流时等待免费的Akka演员

如何在使用剧本Iteratee处理数据流时等待免费的Akka演员
EN

Stack Overflow用户
提问于 2016-01-21 00:32:51
回答 1查看 36关注 0票数 0

我有无限流,消息表示为Play Enumerator,我将Iteratee应用于其中。然后由Akka参与者处理每条消息(参与者的数量限制在10人以内)。

现在,我希望Iteratee中的代码异步地等待空闲的参与者,如果所有10个参与者都很忙,并且不向他们发送另一条导致异常Ask timed out on ...的消息。

如何实现这样的功能?是否有更好的方法来处理无await**?**的10个参与者的无限流?

我刚才提到的代码示例可能如下所示:

代码语言:javascript
运行
复制
val workers = context.actorOf(Props[MyWorker].withRouter(RoundRobinRouter(10)))
val it = Iteratee.foreach[Msg] { msg => 
  workers ? msg
}

msgEnumerator.apply(it)
EN

回答 1

Stack Overflow用户

发布于 2016-01-21 18:37:10

Iteratee.foldM与actor ask模式结合使用似乎是正确的方法。假设您不希望您的参与者构建大型邮箱(如果您不关心大型邮箱,只需使用tellIteratee.foreach而不是ask),这将需要一些专门的路由逻辑。由于用于定制akka路由器的api不支持异步,因此您将需要一个自定义参与者来处理一次只向参与者池中的每个参与者分发一段工作的逻辑。

我想是这样的:

代码语言:javascript
运行
复制
class WorkDistributor extends Actor {
  final val NUM_WORKERS = 10
  val workers = context.actorOf(Props[MyWorker].withRouter(RoundRobinRouter(NUM_WORKERS))) 

  var numActiveWorkers = 0
  var queuedWork: Option[Work] = None

  def receive = {
    case IterateeWork(work) if numActiveWorkers < NUM_WORKERS => workers ! work; numActiveWorkers += 1; sender ! SendMeMoreWork
    case IterateeWork(work) => queuedWork = Some(work)
    case ActorFinishedWork if queuedWork.isDefined => queuedWork.foreach(workers ! _); queuedWork = None
    case ActorFinishedWork => numActiveWorkers -= 1; sender ! SendMeMoreWork
  }
}

其中,IterateeWork消息由迭代器发送,ActorFinishedWork消息由参与者池中的参与者发送。

看看我写的这件事,应该重写它,以便在参与者池满时使用become来更改行为(而不是在每一种情况下使用if过滤器,但我把这作为一个练习留给读者。

那么你的Iteratee看起来就像

代码语言:javascript
运行
复制
Iteratee.foldM[Work, SendMeMoreWork.type](SendMeMoreWork) {
  case (_, work) => workDistributor ? IterateeWork(work)
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/34913324

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档