Akka(27): Stream: Use case - Connecting Slick-dbStream & Scalaz-stream-fs2

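 In earlier posts we introduced Slick, an FRM (Functional Relational Mapping) library. Unlike an ORM, FRM uses a functional syntax that supports flexible query composition and therefore large-scale code reuse. Those same traits, however, have limited its acceptance among programmers and kept FRM from becoming a mainstream style of database programming. So we can only approach the question of improving Slick's situation from a niche perspective: by integrating Slick with a stream library, we hope to restore, on top of FRM, some of the familiar recordset-style database cursor operations. That should lower the level of functional programming skill that FRM database programming demands and draw more programmers to it. As it happens, this post is also meant to present practical use cases of integrating Akka-Stream with external systems, and loading Slick database data into an Akka-Stream to form a streaming dataset seems like a good one. Slick and Akka-Stream are a natural match: both come from the same company and both support the Reactive Streams specification. Components of a reactive system connect to each other through the common Publisher interface, and Slick provides the Database.stream function to build such a Publisher: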

 /** Create a `Publisher` for Reactive Streams which, when subscribed to, will run the specified
      * `DBIOAction` and return the result directly as a stream without buffering everything first.
      * This method is only supported for streaming actions.
      *
      * The Publisher itself is just a stub that holds a reference to the action and this Database.
      * The action does not actually start to run until the call to `onSubscribe` returns, after
      * which the Subscriber is responsible for reading the full response or cancelling the
      * Subscription. The created Publisher can be reused to serve a multiple Subscribers,
      * each time triggering a new execution of the action.
      *
      * For the purpose of combinators such as `cleanup` which can run after a stream has been
      * produced, cancellation of a stream by the Subscriber is not considered an error. For
      * example, there is no way for the Subscriber to cause a rollback when streaming the
      * results of `someQuery.result.transactionally`.
      *
      * When using a JDBC back-end, all `onNext` calls are done synchronously and the ResultSet row
      * is not advanced before `onNext` returns. This allows the Subscriber to access LOB pointers
      * from within `onNext`. If streaming is interrupted due to back-pressure signaling, the next
      * row will be prefetched (in order to buffer the next result page from the server when a page
      * boundary has been reached). */
    final def stream[T](a: DBIOAction[_, Streaming[T], Nothing]): DatabasePublisher[T] = streamInternal(a, false)

This DatabasePublisher[T] is simply a Publisher[T]:

/** A Reactive Streams `Publisher` for database Actions. */
abstract class DatabasePublisher[T] extends Publisher[T] { self =>
...
}

Akka-Stream can then build an Akka Source from it with Source.fromPublisher(publisher):

  /**
   * Helper to create [[Source]] from `Publisher`.
   *
   * Construct a transformation starting with given publisher. The transformation steps
   * are executed by a series of [[org.reactivestreams.Processor]] instances
   * that mediate the flow of elements downstream and the propagation of
   * back-pressure upstream.
   */
  def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed] =
    fromGraph(new PublisherSource(publisher, DefaultAttributes.publisherSource, shape("PublisherSource")))

In theory, Source.fromPublisher(db.stream(query)) is all it takes to build a Reactive Streams source. The example below demonstrates this, starting with the Slick boilerplate code:

  val aqmraw = Models.AQMRawQuery
  val db = Database.forConfig("h2db")
  // aqmQuery.result returns Seq[(String,String,String,String)]
  val aqmQuery = aqmraw.map {r => (r.year,r.state,r.county,r.value)}
  // type alias
  type RowType = (String,String,String,String)
  // user designed strong typed resultset type. must extend FDAROW
  case class TypedRow(year: String, state: String, county: String, value: String) extends FDAROW
  // strong typed resultset conversion function. declared implicit to remind during compilation
  implicit def toTypedRow(row: RowType): TypedRow =
    TypedRow(row._1,row._2,row._3,row._4)

All we really need here is aqmQuery, which we use to build the DatabasePublisher:

  // construct DatabasePublisher from db.stream
  val dbPublisher: DatabasePublisher[RowType] = db.stream[RowType](aqmQuery.result)
  // construct akka source
  val source: Source[RowType,NotUsed] = Source.fromPublisher[RowType](dbPublisher)

With dbPublisher in hand, Source.fromPublisher builds the source. Now let's run this Akka-Stream:

  implicit val actorSys = ActorSystem("actor-system")
  implicit val ec = actorSys.dispatcher
  implicit val mat = ActorMaterializer()

  source.take(6).map{row => toTypedRow(row)}.runWith(
    Sink.foreach(qmr => {
      println(s"州名: ${qmr.state}")
      println(s"县名:${qmr.county}")
      println(s"年份:${qmr.year}")
      println(s"取值:${qmr.value}")
      println("-------------")
    }))

  scala.io.StdIn.readLine()
  actorSys.terminate()

Here is the output (the printed labels are, in order, state, county, year and value):

州名: Alabama
县名:Elmore
年份:1999
取值:5
-------------
州名: Alabama
县名:Jefferson
年份:1999
取值:39
-------------
州名: Alabama
县名:Lawrence
年份:1999
取值:28
-------------
州名: Alabama
县名:Madison
年份:1999
取值:31
-------------
州名: Alabama
县名:Mobile
年份:1999
取值:32
-------------
州名: Alabama
县名:Montgomery
年份:1999
取值:15
-------------

This shows that we have successfully connected Slick to Akka-Stream.
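The Publisher-to-Source hand-off can also be tried without a database. The following is a minimal sketch, not the author's code: it assumes the Akka 2.5-style ActorMaterializer used in the example above, and it uses an in-memory Publisher obtained from Sink.asPublisher as a stand-in for db.stream(...). The names PublisherToSource and demo are hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import org.reactivestreams.Publisher

import scala.concurrent.Await
import scala.concurrent.duration._

object PublisherToSource {

  def demo(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("publisher-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // Stand-in for db.stream(query): any Reactive Streams Publisher will do.
      val publisher: Publisher[Int] =
        Source(1 to 5).runWith(Sink.asPublisher[Int](fanout = false))

      // The same call the article uses to wrap the DatabasePublisher.
      val source: Source[Int, NotUsed] = Source.fromPublisher(publisher)

      // Run the stream and wait for all elements to be collected.
      Await.result(source.map(_ * 10).runWith(Sink.seq), 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

One difference from the Slick case: a Publisher created with fanout = false accepts only a single subscriber, whereas the Slick documentation quoted earlier says a DatabasePublisher can be reused, re-running the action for each new subscriber.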

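Now that we have a Reactive Streams source, which is an akka-stream, how do we connect it to scalaz-stream-fs2 downstream? akka-stream is a Reactive Streams implementation, while scalaz-stream-fs2 is a purely pull-model stream, so the source above must wait passively for the downstream scalaz-stream-fs2 to read data. Under the Reactive Streams specification, downstream must use back-pressure signals to tell upstream whether it may send data, which means scalaz-stream-fs2 has to generate the back-pressure. The async package of scalaz-stream-fs2 has a Queue structure: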

/**
 * Asynchronous queue interface. Operations are all nonblocking in their
 * implementations, but may be 'semantically' blocking. For instance,
 * a queue may have a bound on its size, in which case enqueuing may
 * block until there is an offsetting dequeue.
 */
trait Queue[F[_], A] { self =>
  /**
   * Enqueues one element in this `Queue`.
   * If the queue is `full` this waits until queue is empty.
   *
   * This completes after `a`  has been successfully enqueued to this `Queue`
   */
  def enqueue1(a: A): F[Unit]

  /**
   * Enqueues each element of the input stream to this `Queue` by
   * calling `enqueue1` on each element.
   */
  def enqueue: Sink[F, A] = _.evalMap(enqueue1)
  /** Dequeues one `A` from this queue. Completes once one is ready. */
  def dequeue1: F[A]
  /** Repeatedly calls `dequeue1` forever. */
  def dequeue: Stream[F, A] = Stream.bracket(cancellableDequeue1)(d => Stream.eval(d._1), d => d._2).repeat
...
}

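This structure supports multi-threaded use: enqueue and dequeue can run on different threads. The point to note is that enqueue on a full bounded queue blocks and can continue only after a dequeue has completed, so dequeue becomes an effective way to relieve back-pressure. Concretely, upstream enqueues an element on one thread; when the queue is full, it waits for downstream to complete a dequeue on another thread before enqueuing the next element. enqueue represents akka-stream sending data to scalaz-stream-fs2, and it can be implemented as an akka-stream Sink: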

 class FS2Gate[T](q: fs2.async.mutable.Queue[Task,Option[T]]) extends GraphStage[SinkShape[T]] {
  val in = Inlet[T]("inport")
  val shape = SinkShape.of(in)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler {
      override def preStart(): Unit = {
        pull(in)          //initiate stream elements movement
        super.preStart()
      }

      override def onPush(): Unit = {
        q.enqueue1(Some(grab(in))).unsafeRun()
        pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        q.enqueue1(None).unsafeRun()
        println("the end of stream !")
        completeStage()
      }

      override def onUpstreamFailure(ex: Throwable): Unit = {
        q.enqueue1(None).unsafeRun()   // still terminate the fs2 side
        failStage(ex)                  // propagate the failure instead of swallowing it
      }

      setHandler(in,this)

    }
}

The akka-stream GraphStage above describes the enqueue action for each upstream element. We can use scalaz-stream-fs2's flatMap to sequence the enqueue and dequeue that run on two threads:

   val fs2Stream: Stream[Task,RowType] = Stream.eval(async.boundedQueue[Task,Option[RowType]](16))
     .flatMap { q =>
       Task(source.to(new FS2Gate[RowType](q)).run).unsafeRunAsyncFuture  //enqueue Task(new thread)
       pipe.unNoneTerminate(q.dequeue)      //dequeue in current thread
     }

This function returns an fs2.Stream[Task,RowType], which is only a description of a computation; we must run it to actually execute it:

  fs2Stream.map{row => toTypedRow(row)}
      .map(qmr => {
      println(s"州名: ${qmr.state}")
      println(s"县名:${qmr.county}")
      println(s"年份:${qmr.year}")
      println(s"取值:${qmr.value}")
      println("-------------")
    }).run.unsafeRun

A test run confirms that we have successfully implemented data streaming for scalaz-stream-fs2.
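The back-pressure mechanism used here, a bounded queue whose enqueue blocks until a dequeue frees a slot, with None marking the end of the stream, can be seen in isolation without either stream library. The sketch below is only an illustration under stated assumptions: it substitutes java.util.concurrent.ArrayBlockingQueue for the fs2 Queue, and the names BoundedQueueHandOff, produce, consume and handOff are hypothetical.

```scala
import java.util.concurrent.ArrayBlockingQueue

object BoundedQueueHandOff {

  // Producer side (the role FS2Gate plays): push every element, then None
  // as the end-of-stream marker. put() blocks while the queue is full,
  // which is what slows the producer down to the consumer's pace.
  def produce[T](items: Seq[T], q: ArrayBlockingQueue[Option[T]]): Unit = {
    items.foreach(item => q.put(Some(item)))
    q.put(None)
  }

  // Consumer side (the role of unNoneTerminate(q.dequeue)): take elements
  // until the None marker arrives. take() blocks while the queue is empty.
  def consume[T](q: ArrayBlockingQueue[Option[T]]): Vector[T] =
    Iterator.continually(q.take()).takeWhile(_.isDefined).map(_.get).toVector

  def handOff(): Vector[Int] = {
    // Capacity 2 forces the producer to wait often (the article uses 16).
    val q = new ArrayBlockingQueue[Option[Int]](2)
    val producer = new Thread(new Runnable {
      def run(): Unit = produce(1 to 10, q)
    })
    producer.start()             // enqueue on a separate thread
    val received = consume(q)    // dequeue on the current thread
    producer.join()
    received
  }

  def main(args: Array[String]): Unit = println(handOff())
}
```

Because the producer can never run more than the queue capacity ahead of the consumer, memory use stays bounded however many rows the source produces. The price is that the producing thread is blocked while it waits.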

Here is the complete source code for this demonstration:

import slick.jdbc.H2Profile.api._
import com.bayakala.funda._
import api._

import scala.language.implicitConversions
import scala.concurrent.duration._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import slick.basic.DatabasePublisher
import akka._
import fs2._
import akka.stream.stage.{GraphStage, GraphStageLogic}


 class FS2Gate[T](q: fs2.async.mutable.Queue[Task,Option[T]]) extends GraphStage[SinkShape[T]] {
  val in = Inlet[T]("inport")
  val shape = SinkShape.of(in)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler {
      override def preStart(): Unit = {
        pull(in)          //initiate stream elements movement
        super.preStart()
      }

      override def onPush(): Unit = {
        q.enqueue1(Some(grab(in))).unsafeRun()
        pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        q.enqueue1(None).unsafeRun()
        println("end of stream !!!!!!!")
        completeStage()
      }

      override def onUpstreamFailure(ex: Throwable): Unit = {
        q.enqueue1(None).unsafeRun()   // still terminate the fs2 side
        failStage(ex)                  // propagate the failure instead of swallowing it
      }

      setHandler(in,this)

    }
}

object AkkaStreamSource extends App {

  val aqmraw = Models.AQMRawQuery
  val db = Database.forConfig("h2db")
  // aqmQuery.result returns Seq[(String,String,String,String)]
  val aqmQuery = aqmraw.map {r => (r.year,r.state,r.county,r.value)}
  // type alias
  type RowType = (String,String,String,String)
  // user designed strong typed resultset type. must extend FDAROW
  case class TypedRow(year: String, state: String, county: String, value: String) extends FDAROW
  // strong typed resultset conversion function. declared implicit to remind during compilation
  implicit def toTypedRow(row: RowType): TypedRow =
    TypedRow(row._1,row._2,row._3,row._4)
  // construct DatabasePublisher from db.stream
  val dbPublisher: DatabasePublisher[RowType] = db.stream[RowType](aqmQuery.result)
  // construct akka source
  val source: Source[RowType,NotUsed] = Source.fromPublisher[RowType](dbPublisher)

  implicit val actorSys = ActorSystem("actor-system")
  implicit val ec = actorSys.dispatcher
  implicit val mat = ActorMaterializer()

  /*
  source.take(10).map{row => toTypedRow(row)}.runWith(
    Sink.foreach(qmr => {
      println(s"州名: ${qmr.state}")
      println(s"县名:${qmr.county}")
      println(s"年份:${qmr.year}")
      println(s"取值:${qmr.value}")
      println("-------------")
    })) */

   val fs2Stream: Stream[Task,RowType] = Stream.eval(async.boundedQueue[Task,Option[RowType]](16))
     .flatMap { q =>
       Task(source.to(new FS2Gate[RowType](q)).run).unsafeRunAsyncFuture  //enqueue Task(new thread)
       pipe.unNoneTerminate(q.dequeue)      //dequeue in current thread
     }

  fs2Stream.map{row => toTypedRow(row)}
      .map(qmr => {
      println(s"州名: ${qmr.state}")
      println(s"县名:${qmr.county}")
      println(s"年份:${qmr.year}")
      println(s"取值:${qmr.value}")
      println("-------------")
    }).run.unsafeRun

  scala.io.StdIn.readLine()
  actorSys.terminate()

}
