正如在akka streams文档中所描述的,我试图创建一个工人池(流):
def balancer[In, Out](worker: Flow[In, Out, NotUsed], workerCount: Int): Flow[In, Out, NotUsed] = {
    import GraphDSL.Implicits._
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      val balancer = b.add(Balance[In](workerCount))
      val merge = b.add(Merge[Out](workerCount))
      for (_ <- 1 to workerCount) {
        balancer ~> worker ~> merge
      }
      FlowShape(balancer.in, merge.out)
    })
  }然后,我使用这个函数并行运行一个流:
def main(args: Array[String]) {
    val system = ActorSystem()
    implicit val mat = ActorMaterializer.create(system)
    val flow = Flow[Int].map(e => {
      println(e)
      Thread.sleep(1000) // 1 second
      e
    })
    Source(Range.apply(1, 10).toList)
      .via(balancer(flow, 3))
      .runForeach(e => {})
  }我得到了预期的输出1, 2, 3, 4, 5, 6, 7, 8, 9,但是数字以每秒1的速度出现(没有并行性)。我做错什么了?
发布于 2016-03-25 18:01:39
正如Endre所指出的,流本身应该被标记为.async。
但即使如此,这种行为也不是决定性的,因为异步阶段的默认缓冲区大小为16,平衡器可能将所有消息发送给同一个工作人员。
因此,balancer ~> worker.async.addAttributes(Attributes.inputBuffer(1, 1)) ~> merge将导致所需的行为。
有关项目成员的答案,请参见:https://github.com/akka/akka/issues/20146#issuecomment-201381356
发布于 2016-03-25 16:50:46
该部分中的文档已经过时,将在下一个版本中修复。基本上,您所需要的就是调用流本身上的.async。通过这样做,您就可以在流周围画一个" box“(您可以想象它是一个带有一个输入和输出端口的盒子),这将防止在该框中融合。通过这样做,所有的工作人员都将成为敬业的演员。图的其余部分(广播和合并阶段)将共享另一个参与者(它们不会在单独的参与者上运行,异步框只保护流程,外部的事物仍将被融合)。
https://stackoverflow.com/questions/36220636
复制相似问题