首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >GraphDSL Streams:如何从Akka接口获取物化Sink输出?

GraphDSL Streams:如何从Akka接口获取物化Sink输出?
EN

Stack Overflow用户
提问于 2017-02-24 05:43:21
回答 2查看 1.3K关注 0票数 5

这是一个使用GraphDSL应用编程接口的非常简单的新手问题。我读了几个相关的SO帖子,但我看不到答案:

代码语言:javascript
运行
复制
val actorSystem = ActorSystem("QuickStart")
val executor = actorSystem.dispatcher
val materializer = ActorMaterializer()(actorSystem)

val source: Source[Int, NotUsed] = Source(1 to 5)
val throttledSource = source.throttle(1, 1.second, 1, ThrottleMode.shaping)
val intDoublerFlow = Flow.fromFunction[Int, Int](i => i * 2)
val sink = Sink.foreach(println)

val graphModel = GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._

  throttledSource ~> intDoublerFlow ~> sink

  // I presume I want to change this shape to something else
  // but I can't figure out what it is.
  ClosedShape
}
// TODO: This is RunnableGraph[NotUsed], I want RunnableGraph[Future[Done]] that gives the
// materialized Future[Done] from the sink. I presume I need to use a GraphDSL SourceShape
// but I can't get that working.
val graph = RunnableGraph.fromGraph(graphModel)

// This works and gives me the materialized sink output using the simpler API.
// But I want to use the GraphDSL so that I can add branches or junctures.
val graphThatIWantFromDslAPI = throttledSource.toMat(sink)(Keep.right)
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-02-24 06:00:56

诀窍是将您需要的阶段(在本例中为sink)传递给GraphDSL.create。作为第二个参数传递的函数也会发生变化,需要一个可以在图形中使用的Shape输入参数(下面示例中的s)。

代码语言:javascript
运行
复制
  val graphModel: Graph[ClosedShape, Future[Done]] = GraphDSL.create(sink) { implicit b => s =>
    import GraphDSL.Implicits._

    throttledSource ~> intDoublerFlow ~> s

    // ClosedShape is just fine - it is always the shape of a RunnableGraph
    ClosedShape
  }
  val graph: RunnableGraph[Future[Done]] = RunnableGraph.fromGraph(graphModel)

更多信息可以在docs中找到。

票数 14
EN

Stack Overflow用户

发布于 2017-02-24 05:59:19

代码语言:javascript
运行
复制
val graphModel = GraphDSL.create(sink) { implicit b: Builder[Future[Done]] => sink =>
  import akka.stream.scaladsl.GraphDSL.Implicits._

  throttledSource ~> intDoublerFlow ~> sink

  ClosedShape
}
val graph: RunnableGraph[Future[Done]] = RunnableGraph.fromGraph(graphModel)    
val graphThatIWantFromDslAPI: RunnableGraph[Future[Done]] = throttledSource.toMat(sink)(Keep.right)

GraphDSL应用程序接口的问题是,隐式生成器严重过载。您需要将接收器包装在create中,这会将Builder[NotUsed]转换为Builder[Future[Done]],并且现在表示来自builder => sink => shape的函数,而不是builder => shape

票数 7
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42426741

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档