我试图为包含并行处理流的Akka流定义一个图形(我正在使用Akka.NET,但这不重要)。假设订单的数据源,每个订单由一个订单ID和一个产品列表(订单项)组成。工作流程如下:
步骤1 (Source.From)、2(广播)、4-5 (地图)、6(合并)、7 (Sink)看上去没问题.但是集合拆分是如何在Akka或反应性流术语中实现的呢?这不是广播或扁平化,N元素的集合需要分割成N个独立的子流,这些子流稍后将被合并回。这是如何实现的?
发布于 2016-10-18 23:57:14
我建议在一次流程中完成。我知道两个流程看起来更酷,但相信我,从设计的简单性来看,这是不值得的(我试过了)。你可以写这样的东西
import akka.stream.scaladsl.{Flow, Sink, Source, SubFlow}
import scala.collection.immutable
import scala.concurrent.Future
case class Item()
case class Order(items: List[Item])
val flow = Flow[Order]
.mapAsync(4) { order =>
Future {
// Enrich your order here
order
}
}
.mapConcat { order =>
order.items.map(order -> _)
}
.mapAsync(4) { case (order, item) =>
Future {
// Enrich your item here
order -> item
}
}
.groupBy(2, tuple => tuple._1)
.fold[Map[Order, List[Item]]](immutable.Map.empty) { case (map, (order, item)) => map.updated(order, map.getOrElse(order, Nil) :+ item) }
.mapConcat { _.map { case (order, newItems) => order.copy(items = newItems)} }
但即使是这种方式也是不好的。在上面的代码或您的设计中,有很多事情都可能出错。如果某项订单的浓缩失败,你会怎么做?如果订单对象的充实失败了怎么办?你的溪流应该发生什么?
如果我是你,我会有Flow[Order]
,并在mapAsync
中处理它的子程序,所以至少它保证了我没有部分处理订单。
https://stackoverflow.com/questions/40117489
复制相似问题