首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

GraphDSL Streams:如何从Akka接口获取物化Sink输出?

GraphDSL Streams是Akka Streams中的一个模块,用于构建和操作数据流图。它提供了一种声明式的方式来定义数据流图,并且可以通过物化(materialize)操作将其转换为可执行的数据流。

要从Akka接口获取物化Sink输出,可以按照以下步骤进行操作:

  1. 首先,使用GraphDSL.create()方法创建一个GraphDSL.Builder对象,该对象用于构建数据流图。
  2. 使用builder.add()方法添加所需的操作符(operators)和连接它们的边(edges)来定义数据流图的结构。在这个过程中,可以使用builder.addSink()方法添加一个Sink操作符,并将其与其他操作符连接起来。
  3. 使用builder.materializedValue()方法获取物化值(materialized value)。
  4. 使用builder.run()方法运行数据流图,并获取物化值。

下面是一个示例代码,演示了如何从Akka接口获取物化Sink输出:

代码语言:txt
复制
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.{ NotUsed, Done }
import scala.concurrent._
import scala.concurrent.duration._

object GraphDSLStreamsExample {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("GraphDSLStreamsExample")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val source: Source[Int, NotUsed] = Source(1 to 10)
    val sink: Sink[Int, Future[Done]] = Sink.foreach(println)

    val graph: Graph[ClosedShape, Future[Done]] = GraphDSL.create(sink) { implicit builder =>
      sink =>
        import GraphDSL.Implicits._

        val broadcast = builder.add(Broadcast[Int](2))
        val merge = builder.add(Merge[Int](2))

        source ~> broadcast ~> merge ~> sink
                          broadcast ~> merge

        ClosedShape
    }

    val materializedSink: Future[Done] = RunnableGraph.fromGraph(graph).run()

    materializedSink.onComplete {
      case Success(_) =>
        println("Stream completed successfully.")
        system.terminate()
      case Failure(ex) =>
        println(s"Stream failed with: $ex")
        system.terminate()
    }

    Await.result(system.whenTerminated, Duration.Inf)
  }
}

在这个示例中,我们创建了一个数据流图,其中包含一个源(source)、一个广播(broadcast)、一个合并(merge)和一个汇(sink)。源产生1到10的整数,广播将整数发送到两个分支,合并将两个分支的整数合并为一个流,并将其发送到汇。物化值是一个Future对象,表示数据流图的执行结果。

推荐的腾讯云相关产品:腾讯云云服务器(CVM)、腾讯云云数据库MySQL版、腾讯云云原生容器服务(TKE)等。你可以通过访问腾讯云官方网站(https://cloud.tencent.com/)获取更多关于这些产品的详细信息和介绍。

页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

Akka(19): Stream:组合数据流,组合共用-Graph modular composition

而Fan-In合并型,Fan-Out扩散型则具备多个输入或输出端口,可以用来构建更复杂的数据流图。..., source)(Keep.none) 这个Flow流向来说先Sink再Source是反的,形成的Flow上下游间无法协调,即Source端终结信号无法到达Sink端,因为这两端是相互独立的。...的GraphDSL,对复合型Graph的构建可以实现形象化,大部分工作都在如何对组件之间的端口进行连接。...但用akka GraphDSL可以很形象的组合这个数据流图; import GraphDSL.Implicits._ RunnableGraph.fromGraph(GraphDSL.create...(_ > 1)).to(sink) 和scalaz-stream不同的还有akka-stream的运算是在actor上进行的,除了大家都能对数据流元素进行处理之外,akka-stream还可以通过actor

1K100

Akka(20): Stream:异步运算,压力缓冲-Async, batching backpressure and buffering

因为akka-stream已经在上下游环节全部实现了Reactive-Streams-Specification,所以上下游之间可以进行互动,这样就可以在akka-stream里由下游通知上游自身可接收数据的状态来控制上游数据流速...(0)(_ + _)) // wire an atomic sink to the nestedFlow .withAttributes(name("nestedSink") and inputBuffer...ActorMaterializerSettings(sys) .withInputBuffer(1,1) ) case class Tick() RunnableGraph.fromGraph(GraphDSL.create...() { implicit b => import GraphDSL.Implicits._ // this is the asynchronous stage in this graph...这时我们会发现输出端Seq长度代表ZipWith消耗数据的延迟间隔。注意:前面3个输出好像没有延迟,这是akka-stream 预读prefetch造成的。

85370

响应式编程的实践

粗略看来,这些操作皆为函数式的编程接口FP的角度看,我们甚至可以将Source视为一个monad。而站在Java编程的角度看,我们则很容易将Source视为等同于集合的数据结构。...如果我们创建的流A与流B并不包含uri到user的转换,就可以通过merge等合并操作将A与B合并,然后再共同重用uri到user的转换。...编程实践看,lambda表达式本身就应该保持微小的粒度。这时,就应该将这些逻辑单独分离出来,放到单独的类与方法中。...例如,我们根据device的配置信息去调用远程服务获取设备信息,然后提取信息获得业务需要的指标,对指标进行转换,最后将转换的数据写入到数据库中。...Akka Stream的流拓扑图 Akka Stream对流处理的抽象被建模为图。这一设计思想使得流的处理变得更加直观,流的处理变成了“搭积木”游戏。

1.3K80

akka-streams - 应用角度学习:basic stream parts

实际上很早就写了一系列关于akka-streams的博客。但那个时候纯粹是为了了解akka而去学习的,主要是了解akka-streams的原理为出发点。...因为akka-streamsakka系列工具的基础,如:akka-http, persistence-query等都是基于akka-streams的,其实没有真正把akka-streams用起来。...不过,现在所处的环境还是逼迫着去真正了解akka-streams的应用场景。现状是这样的:跨入大数据时代,已经有大量的现代IT系统传统关系数据库转到分布式数据库(非关系数据库)了。...这是这次写akka-streams的初衷:希望能通过akka-streams来实现分布式数据处理编程。 先从基本流部件basic stream parts开始,即source,flow,sink。...虽然运算值不能像流元素一样流动,但akka-streams提供了机制让用户选择是否返回某个节点的运算值M。

1K10

akka-grpc - 基于akka-http和akka-streams的scala gRPC开发工具

akka-http用户来说,akka-grpc具有很大吸引(相对其它gRPC开放工具),因为它是基于akka-http的,看看下面grpc服务端的接口: // Bind service handler...至于akka-grpc基于akka-streams的特性,我并没有感到太大的兴趣。如上所述,我们的目标是实现一种开放数据平台的终端接入接口。...akka-streams恰恰把总体方案限制在了内部系统集成模式,因为服务端客户端就是akka-streams的两端,是内部系统集成的场景。...binding => println(s"gRPC server bound to: ${binding.localAddress}") } binding //#server } } 获取身份凭证...akka.grpc.scaladsl._ import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source import learn.akka.grpc

1.9K20

Akka(23): Stream:自定义流构件功能-Custom defined stream processing stages

从总体上看:akka-stream是由数据源头Source,流通节点Flow和数据流终点Sink三个框架性的流构件(stream components)组成的。...一个完整的数据流(可运行数据流)必须是一个闭合的数据流,即:外表上看,数据流两头必须连接一个Source和一个Sink。...我们可以直接把一个Sink连接到一个Source来获取一个最简单的可运行数据流,如下: Source(1 to 10).runWith(Sink.foreach(println)) 另一个角度说明...就构件自身来说需要:输入端口pull(in),对输出端口push(out)。 下面我们就示范设计一个循环产生一串指定字符的Source。...4、readN(in)(andThen):临时替换InHandler,端口读取n个数据元素,然后再恢复InHandler 5、abortEmitting():取消输出端口上未完成的数据推送 6、abortReading

1.7K80

Flink1.9整合Kafka

本文基于Flink1.9版本简述如何连接Kafka。 流式连接器 我们知道可以自己来开发Source 和 Sink ,但是一些比较基本的 Source 和 Sink 已经内置在 Flink 里。...预定义的sink支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 socket。 连接器可以和多种多样的第三方系统进行交互。...目前支持以下系统: Apache Kafka Apache Cassandra(sink) Amazon Kinesis Streams(source/sink) Elasticsearch(sink)...Apache Bahir 中定义了其他一些连接器 Apache ActiveMQ(source/sink) Apache Flume(sink) Redis(sink) Akka (sink) Netty...在这种场景下,如果对数据的读操作远少于写操作,可以让外部应用 Flink 拉取所需的数据,需要用到Flink的可查询状态接口

2.1K31

Flink Data Source

一、内置 Data Source Flink Data Source 用于定义 Flink 程序的数据来源,Flink 官方提供了多种数据获取方法,用于帮助开发者简单快速地构建输入流,具体如下: 1.1...第一个参数用于定义迭代器,第二个参数用于定义输出元素的类型。...当前内置连接器的支持情况如下: Apache Kafka (支持 source 和 sink) Apache Cassandra (sink) Amazon Kinesis Streams (source...) Apache Flume (sink) Redis (sink) Akka (sink) Netty (source) 随着 Flink 的不断发展,可以预见到其会支持越来越多类型的连接器,关于连接器的后续发展情况...测试结果 在 Producer 上输入任意测试数据,之后观察程序控制台的输出: 程序控制台的输出如下: 可以看到已经成功接收并打印出相关的数据。

1.1K20

Flink1.9整合Kafka实战

本文基于Flink1.9版本简述如何连接Kafka。 流式连接器 ? 我们知道可以自己来开发Source 和 Sink ,但是一些比较基本的 Source 和 Sink 已经内置在 Flink 里。...预定义的sink支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 socket。 连接器可以和多种多样的第三方系统进行交互。...目前支持以下系统: Apache Kafka Apache Cassandra(sink) Amazon Kinesis Streams(source/sink) Elasticsearch(sink)...Apache Bahir 中定义了其他一些连接器 Apache ActiveMQ(source/sink) Apache Flume(sink) Redis(sink) Akka (sink) Netty...在这种场景下,如果对数据的读操作远少于写操作,可以让外部应用 Flink 拉取所需的数据,需要用到Flink的可查询状态接口

77020

Flutter响应式编程:Streams和BLoC

换句话说,UI组件应该只关心UI事物而不关心业务, 依赖Streams独家使用输入(Sink)和输出(流), 保持平台独立, 保持环境独立。...第三,自由组织布局 由于使用了Streams,你现在可以独立于业务逻辑组织布局。 可以应用程序中的任何位置启动任何操作:只需调用.incrementCounter sink即可。...为了在每个BLoC中强制执行dispose()方法,所有BLoC都必 须实现BlocBase接口。...关于BLoC的个人建议 与BLoC相关的第三条规则是:“依赖于Streams对输入(Sink)和输出(stream)的独占使用”。 我的个人经历稍微关系到这个说法......让我解释一下。...不同BLoCs / Streams的编排 下图显示了如何使用主要3个BLoC: 在BLoC的左侧,哪些组件调用Sink 在右侧,哪些组件监听流 例如,当MovieDetailsWidget调用inAddFavorite

4.1K90
领券