首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >为集合元素的并行处理创建Akka流

为集合元素的并行处理创建Akka流
EN

Stack Overflow用户
提问于 2016-10-18 20:27:58
回答 1查看 1.5K关注 0票数 3

我试图为包含并行处理流的Akka流定义一个图形(我正在使用Akka.NET,但这不重要)。假设订单的数据源,每个订单由一个订单ID和一个产品列表(订单项)组成。工作流程如下:

  1. 接收和订购
  2. 将订单广播到两个流程,流程A处理订单项目,B频道处理订单ID (一些簿记工作)
  3. 流程A:将订单项的集合拆分为单独的元素,每个元素分别处理
  4. 流程A:对于上一步中的拆分所产生的每一项订单,调用一些外部服务来查找额外的信息(价格、可用性等)。
  5. 流程B:对给定的订单ID进行额外的簿记
  6. 合并流程A和B
  7. 将上一步的合并数据发送到接收器,从而丰富订单信息。

步骤1 (Source.From)、2(广播)、4-5 (地图)、6(合并)、7 (Sink)看上去没问题.但是集合拆分是如何在Akka或反应性流术语中实现的呢?这不是广播或扁平化,N元素的集合需要分割成N个独立的子流,这些子流稍后将被合并回。这是如何实现的?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-10-18 23:57:14

我建议在一次流程中完成。我知道两个流程看起来更酷,但相信我,从设计的简单性来看,这是不值得的(我试过了)。你可以写这样的东西

代码语言:javascript
运行
复制
import akka.stream.scaladsl.{Flow, Sink, Source, SubFlow}

import scala.collection.immutable
import scala.concurrent.Future

case class Item()

case class Order(items: List[Item])

val flow = Flow[Order]
  .mapAsync(4) { order =>
    Future {
      // Enrich your order here
      order
    }
  }
  .mapConcat { order =>
    order.items.map(order -> _)
  }
  .mapAsync(4) { case (order, item) =>
    Future {
      // Enrich your item here
      order -> item
    }
  }
  .groupBy(2, tuple => tuple._1)
  .fold[Map[Order, List[Item]]](immutable.Map.empty) { case (map, (order, item)) => map.updated(order, map.getOrElse(order, Nil) :+ item) }
  .mapConcat { _.map { case (order, newItems) => order.copy(items = newItems)} }

但即使是这种方式也是不好的。在上面的代码或您的设计中,有很多事情都可能出错。如果某项订单的浓缩失败,你会怎么做?如果订单对象的充实失败了怎么办?你的溪流应该发生什么?

如果我是你,我会有Flow[Order],并在mapAsync中处理它的子程序,所以至少它保证了我没有部分处理订单。

票数 5
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/40117489

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档