首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >拥有Akka流的工人池

拥有Akka流的工人池
EN

Stack Overflow用户
提问于 2016-03-25 13:13:18
回答 2查看 1K关注 0票数 5

正如在akka streams文档中所描述的,我试图创建一个工人池(流):

代码语言:javascript
运行
复制
def balancer[In, Out](worker: Flow[In, Out, NotUsed], workerCount: Int): Flow[In, Out, NotUsed] = {
    import GraphDSL.Implicits._

    Flow.fromGraph(GraphDSL.create() { implicit b =>
      val balancer = b.add(Balance[In](workerCount))
      val merge = b.add(Merge[Out](workerCount))

      for (_ <- 1 to workerCount) {
        balancer ~> worker ~> merge
      }
      FlowShape(balancer.in, merge.out)
    })
  }

然后,我使用这个函数并行运行一个流:

代码语言:javascript
运行
复制
def main(args: Array[String]) {
    val system = ActorSystem()
    implicit val mat = ActorMaterializer.create(system)

    val flow = Flow[Int].map(e => {
      println(e)
      Thread.sleep(1000) // 1 second
      e
    })

    Source(Range.apply(1, 10).toList)
      .via(balancer(flow, 3))
      .runForeach(e => {})
  }

我得到了预期的输出1, 2, 3, 4, 5, 6, 7, 8, 9,但是数字以每秒1的速度出现(没有并行性)。我做错什么了?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-03-25 18:01:39

正如Endre所指出的,流本身应该被标记为.async

但即使如此,这种行为也不是决定性的,因为异步阶段的默认缓冲区大小为16,平衡器可能将所有消息发送给同一个工作人员。

因此,balancer ~> worker.async.addAttributes(Attributes.inputBuffer(1, 1)) ~> merge将导致所需的行为。

有关项目成员的答案,请参见:https://github.com/akka/akka/issues/20146#issuecomment-201381356

票数 1
EN

Stack Overflow用户

发布于 2016-03-25 16:50:46

该部分中的文档已经过时,将在下一个版本中修复。基本上,您所需要的就是调用流本身上的.async。通过这样做,您就可以在流周围画一个" box“(您可以想象它是一个带有一个输入和输出端口的盒子),这将防止在该框中融合。通过这样做,所有的工作人员都将成为敬业的演员。图的其余部分(广播和合并阶段)将共享另一个参与者(它们不会在单独的参与者上运行,异步框只保护流程,外部的事物仍将被融合)。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/36220636

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档