首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >自定义Akka流

自定义Akka流
EN

Stack Overflow用户
提问于 2018-06-15 11:22:06
回答 1查看 368关注 0票数 1

我有一组流阶段(源、流和流),我想添加一些MetaData信息。

因此,而不是产生A -> (A,StreamMetaData)的资源。我使用自定义流阶段成功地做到了这一点,在抓取(In)元素上,我推送(out,(elem,StreamMetaData))。实际上,它不是“转换”现有的源,而是将其传递给流以重新创建新的源。

现在,我正在尝试实现以下MetaStream阶段:

因此,在源产生(A,StreamMetaData)元组的情况下,我希望将A传递给一个现有的流,以便进行某些计算,然后将产生的输出'B‘与StreamMetaData合并。然后将这些信息传递给接受(B,StreamMetaData)的Sink。

你建议我怎么做。我被告知,部分图表是最好的选择,并将有助于完成这样的任务。UniformFanOut和UniformFanIn使用Unzip(A StreamMetaData,A,streamMetaData)和Zip(A,B)

代码语言:javascript
运行
复制
 val fanOut = GraphDSL.create() { implicit b =>
    val unzip = b.add(Unzip[T, StreamMetaData]) 
    UniformFanOutShape(unzip.in, unzip.out0, unzip.out1)
  }

  val fanIn = GraphDSL.create() { implicit b =>
    val zip = b.add(Zip[T ,StreamMetaData]()) 
    UniformFanInShape(zip)
  }

如何连接fanIn和fanOut,以实现与图片中相同的行为?

我有这样的想法;

代码语言:javascript
运行
复制
 def metaFlow[T, B, Mat](flow: Flow[T, B, Mat]): Unit = {
   val wrappedFlow =
     Flow.fromGraph(GraphDSL.create(){ implicit b =>
       import GraphDSL.Implicits._

       val unzip: FanOutShape2[(T, StreamMetaData), T, StreamMetaData] = b.add(Unzip[T, StreamMetaData])
       val existingFlow = b.add(flow)
       val zip: FanInShape2[B,StreamMetaData,(B,StreamMetaData)] = b.add(Zip[B, StreamMetaData])

       unzip.out0 ~> existingFlow ~> zip.in0
       unzip.out1 ~> zip.in1

       FlowShape(unzip.in, zip.out)
     })

 }

提前谢谢。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-06-15 13:31:31

创建一个新的SourceShape堆叠流图的程序可以工作,这与您的flowShape实现略有不同。

代码语言:javascript
运行
复制
  def sourceGraph[A, B](f: A => B, source: Source[(A, StreamMetaData), NotUsed]) = Source.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._

    val unzip = builder.add(Unzip[A, StreamMetaData]())
    val zip   = builder.add(Zip[B, StreamMetaData]())

    val flow0  = builder.add(Flow[A].map { f(_) })

    val flow1 = source ~> unzip.in
                          unzip.out0 ~> flow0 ~> zip.in0
                          unzip.out1          ~> zip.in1


    SourceShape(zip.out)
  })



def flowGraph[A, B](f: A => B) = Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val unzip = builder.add(Unzip[A, StreamMetaData]())
    val zip   = builder.add(Zip[B, StreamMetaData]())

    val flow0  = builder.add(Flow[A].map { f(_) })

    unzip.out0 ~> flow0 ~> zip.in0
    unzip.out1          ~> zip.in1

    FlowShape(unzip.in, zip.out)
  })
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50874583

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档