Akka(19): Stream:组合数据流,组合共用-Graph modular composition

   akka-stream的Graph是一种运算方案,它可能代表某种简单的线性数据流图如:Source/Flow/Sink,也可能是由更基础的流图组合而成相对复杂点的某种复合流图,而这个复合流图本身又可以被当作组件来组合更大的Graph。因为Graph只是对数据流运算的描述,所以它是可以被重复利用的。所以我们应该尽量地按照业务流程需要来设计构建Graph。在更高的功能层面上实现Graph的模块化(modular)。按上回讨论,Graph又可以被描述成一种黑盒子,它的入口和出口就是Shape,而内部的作用即处理步骤Stage则是用GraphStage来形容的。下面是akka-stream预设的一些基础数据流图:

上面Source,Sink,Flow代表具备线性步骤linear-stage的流图,属于最基础的组件,可以用来构建数据处理链条。而Fan-In合并型,Fan-Out扩散型则具备多个输入或输出端口,可以用来构建更复杂的数据流图。我们可以用以上这些基础Graph来构建更复杂的复合流图,而这些复合流图又可以被重复利用去构建更复杂的复合流图。下面就是一些常见的复合流图:

注意上面的Composite Flow(from Sink and Source)可以用Flow.fromSinkAndSource函数构建:

def fromSinkAndSource[I, O](sink: Graph[SinkShape[I], _], source: Graph[SourceShape[O], _]): Flow[I, O, NotUsed] =
    fromSinkAndSourceMat(sink, source)(Keep.none)

这个Flow从流向来说先Sink再Source是反的,形成的Flow上下游间无法协调,即Source端终结信号无法到达Sink端,因为这两端是相互独立的。我们必须用CoupledTermination对象中的fromSinkAndSource函数构建的Flow来解决这个问题:

/**
 * Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow them them.
 * Similar to `Flow.fromSinkAndSource` however that API does not connect the completion signals of the wrapped stages.
 */
object CoupledTerminationFlow {
  @deprecated("Use `Flow.fromSinkAndSourceCoupledMat(..., ...)(Keep.both)` instead", "2.5.2")
  def fromSinkAndSource[I, O, M1, M2](in: Sink[I, M1], out: Source[O, M2]): Flow[I, O, (M1, M2)] =
    Flow.fromSinkAndSourceCoupledMat(in, out)(Keep.both)

从上面图列里的Composite BidiFlow可以看出:一个复合Graph的内部可以是很复杂的,但从外面看到的只是简单的几个输入输出端口。不过Graph内部构件之间的端口必须按照功能逻辑进行正确的连接,剩下的就变成直接向外公开的界面端口了。这种机制支持了层级式的模块化组合方式,如下面的图示:

最后变成:

在DSL里我们可以用name("???")来分割模块:

val nestedFlow =
  Flow[Int].filter(_ != 0) // an atomic processing stage
    .map(_ - 2) // another atomic processing stage
    .named("nestedFlow") // wraps up the Flow, and gives it a name

val nestedSink =
  nestedFlow.to(Sink.fold(0)(_ + _)) // wire an atomic sink to the nestedFlow
    .named("nestedSink") // wrap it up

// Create a RunnableGraph
val runnableGraph = nestedSource.to(nestedSink)

在下面这个示范里我们自定义一个某种功能的流图模块:它有2个输入和3个输出。然后我们再使用这个自定义流图模块组建一个完整的闭合流图:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._

import scala.collection.immutable

object GraphModules {
  def someProcess[I, O]: I => O = i => i.asInstanceOf[O]

  case class TwoThreeShape[I, I2, O, O2, O3](
                                              in1: Inlet[I],
                                              in2: Inlet[I2],
                                              out1: Outlet[O],
                                              out2: Outlet[O2],
                                              out3: Outlet[O3]) extends Shape {

    override def inlets: immutable.Seq[Inlet[_]] = in1 :: in2 :: Nil

    override def outlets: immutable.Seq[Outlet[_]] = out1 :: out2 :: out3 :: Nil

    override def deepCopy(): Shape = TwoThreeShape(
      in1.carbonCopy(),
      in2.carbonCopy(),
      out1.carbonCopy(),
      out2.carbonCopy(),
      out3.carbonCopy()
    )
  }
//a functional module with 2 input 3 output
  def TwoThreeGraph[I, I2, O, O2, O3] = GraphDSL.create() { implicit builder =>
    val balancer = builder.add(Balance[I](2))
    val flow = builder.add(Flow[I2].map(someProcess[I2, O2]))

    TwoThreeShape(balancer.in, flow.in, balancer.out(0), balancer.out(1), flow.out)
  }

  val closedGraph = GraphDSL.create() {implicit builder =>
    import GraphDSL.Implicits._
    val inp1 = builder.add(Source(List(1,2,3))).out
    val inp2 = builder.add(Source(List(10,20,30))).out
    val merge = builder.add(Merge[Int](2))
    val mod23 = builder.add(TwoThreeGraph[Int,Int,Int,Int,Int])

     inp1 ~> mod23.in1
     inp2 ~> mod23.in2
     mod23.out1 ~> merge.in(0)
     mod23.out2 ~> merge.in(1)
     mod23.out3 ~> Sink.foreach(println)
     merge ~> Sink.foreach(println)
     ClosedShape

  }
}

object TailorGraph extends App {
  import GraphModules._

  implicit val sys = ActorSystem("streamSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer()

  RunnableGraph.fromGraph(closedGraph).run()

  scala.io.StdIn.readLine()
  sys.terminate()


}

这个自定义的TwoThreeGraph是一个复合的流图模块,是可以重复使用的。注意这个~>符合的使用:akka-stream只提供了对预设定Shape作为连接对象的支持如:

      def ~>[Out](junction: UniformFanInShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = {...}
      def ~>[Out](junction: UniformFanOutShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = {...}
      def ~>[Out](flow: FlowShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = {...}
      def ~>(to: Graph[SinkShape[T], _])(implicit b: Builder[_]): Unit =
        b.addEdge(importAndGetPort(b), b.add(to).in)

      def ~>(to: SinkShape[T])(implicit b: Builder[_]): Unit =
        b.addEdge(importAndGetPort(b), to.in)
...

所以对于我们自定义的TwoThreeShape就只能使用直接的端口连接了:

   def ~>[U >: T](to: Inlet[U])(implicit b: Builder[_]): Unit =
        b.addEdge(importAndGetPort(b), to)

以上的过程显示:通过akka的GraphDSL,对复合型Graph的构建可以实现形象化,大部分工作都在如何对组件之间的端口进行连接。我们再来看个较复杂复合流图的构建过程,下面是这个流图的图示:

可以说这是一个相对复杂的数据处理方案,里面甚至包括了数据流回路(feedback)。无法想象如果用纯函数数据流如scalaz-stream应该怎样去实现这么复杂的流程,也可能根本是没有解决方案的。但用akka GraphDSL可以很形象的组合这个数据流图;

  import GraphDSL.Implicits._
  RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    val A: Outlet[Int]                  = builder.add(Source.single(0)).out
    val B: UniformFanOutShape[Int, Int] = builder.add(Broadcast[Int](2))
    val C: UniformFanInShape[Int, Int]  = builder.add(Merge[Int](2))
    val D: FlowShape[Int, Int]          = builder.add(Flow[Int].map(_ + 1))
    val E: UniformFanOutShape[Int, Int] = builder.add(Balance[Int](2))
    val F: UniformFanInShape[Int, Int]  = builder.add(Merge[Int](2))
    val G: Inlet[Any]                   = builder.add(Sink.foreach(println)).in

    C     <~      F
    A  ~>  B  ~>  C     ~>      F
    B  ~>  D  ~>  E  ~>  F
    E  ~>  G

    ClosedShape
  })

另一个端口连接方式的版本如下:

RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  val B = builder.add(Broadcast[Int](2))
  val C = builder.add(Merge[Int](2))
  val E = builder.add(Balance[Int](2))
  val F = builder.add(Merge[Int](2))

  Source.single(0) ~> B.in; B.out(0) ~> C.in(1); C.out ~> F.in(0)
  C.in(0) <~ F.out

  B.out(1).map(_ + 1) ~> E.in; E.out(0) ~> F.in(1)
  E.out(1) ~> Sink.foreach(println)
  ClosedShape
})

如果把上面这个复杂的Graph切分成模块的话,其中一部分是这样的:

这个开放数据流复合图可以用GraphDSL这样构建:

val partial = GraphDSL.create() { implicit builder =>
    val B = builder.add(Broadcast[Int](2))
    val C = builder.add(Merge[Int](2))
    val E = builder.add(Balance[Int](2))
    val F = builder.add(Merge[Int](2))

    C  <~  F
    B  ~>                            C  ~>  F
    B  ~>  Flow[Int].map(_ + 1)  ~>  E  ~>  F
    FlowShape(B.in, E.out(1))
  }.named("partial")

模块化的完整Graph图示如下:

这部分可以用下面的代码来实现:

// Convert the partial graph of FlowShape to a Flow to get
// access to the fluid DSL (for example to be able to call .filter())
val flow = Flow.fromGraph(partial)

// Simple way to create a graph backed Source
val source = Source.fromGraph( GraphDSL.create() { implicit builder =>
  val merge = builder.add(Merge[Int](2))
  Source.single(0)      ~> merge
  Source(List(2, 3, 4)) ~> merge

  // Exposing exactly one output port
  SourceShape(merge.out)
})

// Building a Sink with a nested Flow, using the fluid DSL
val sink = {
  val nestedFlow = Flow[Int].map(_ * 2).drop(10).named("nestedFlow")
  nestedFlow.to(Sink.head)
}

// Putting all together
val closed = source.via(flow.filter(_ > 1)).to(sink)

和scalaz-stream不同的还有akka-stream的运算是在actor上进行的,除了大家都能对数据流元素进行处理之外,akka-stream还可以通过actor的内部状态来维护和返回运算结果。这个运算结果在复合流图中传播的过程是可控的,如下图示:

返回运算结果是通过viaMat, toMat来实现的。简写的via,to默认选择流图左边运算产生的结果。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏章鱼的慢慢技术路

用OpenGL实现跳跃的立体小球

1572
来自专栏自然语言处理

结巴中文分词原理分析1

更改分词器(默认为 jieba.dt)的 tmp_dir 和 cache_file 属性,可分别指定缓存文件所在的文件夹及其文件名,用于受限的文件系统。

1583
来自专栏杨建荣的学习笔记

shell中echo的显示格式 (r5笔记第58天)

有时候使用shell就是为了达到简化工作的目的,其实在shell本身强大的功能下,其实还可以更好一些,功能再好,如果界面有时候不够美观,清晰,效果也会受到直接影...

3005
来自专栏韩伟的专栏

Unity 2D 手册部分翻译

Unity 2D 原文地址 ? 本章包括Unity 2D的文档 你可以从以下 文档 得到如何切换2D/3D模式的更多细节。 参看 2D和3D项目 了解使用2D或...

3365
来自专栏菩提树下的杨过

Silverlight的自定义tooltip提示工具条

这种应用场景其实很多,比如游戏中装备/魔法的选择菜单,这里借用了"深蓝色右手"的一张图  再比如聊天室中的文本颜色设置  ? 虽然sl的ToolTipServ...

2126
来自专栏王亚昌的专栏

FFMPEG-如何对视频按时长切片与压缩

本文介绍如何用ffmpeg开源组件按时长进行切片,举一个例子,一个视频网站,拿到一个时长1.5小时的电影,用户点击播放时,常用的技术方案就是把一个完整的大文件,...

751
来自专栏数据小魔方

数据地图系列6|Stata数据地图(下)

今天要跟大家分享的是数据地图系列6——Stata数据地图(下)! 接着前一篇的节凑,这一篇会给大家介绍比较全面的Stata热力地图代码实现。 版本仍然是基于S...

4104
来自专栏黑泽君的专栏

day38_Spring学习笔记_06_CRM_02

注意:当前员工的职务所属的部门,此部门下的所有职务。代码表示:post.department.postSet editStaff.jsp

722
来自专栏函数式编程语言及工具

Akka(18): Stream:组合数据流,组件-Graph components

   akka-stream的数据流可以由一些组件组合而成。这些组件统称数据流图Graph,它描述了数据流向和处理环节。Source,Flow,Sink是最基础...

2256
来自专栏大数据钻研

手把手教你打造一个纯CSS图标库

来,干了这碗安利 写这篇文章的目的其实就是为了安利一下我的图标库:iconoo,所以,开门见山,star吧少年少妇们!(这样的我是不是应该要加个github互粉...

2934

扫码关注云+社区