FunDA(9)- Stream Source:reactive data streams

    上篇我们讨论了静态数据源(Static Source, snapshot)。这种方式只能在预知数据规模有限的情况下使用,对于超大型的数据库表也可以说是不安全的资源使用方式。Slick3.x已经增加了支持Reactive-Streams功能,可以通过Reactive-Streams API来实现有限内存空间内的无限规模数据读取,这正符合了FunDA的设计理念:高效、便捷、安全的后台数据处理工具库。我们在前面几篇讨论里介绍了Iteratee模式,play-iteratees支持Reactive-Streams并且提供与Slick3.x的接口API,我们就在这篇讨论里介绍如何把Slick-Reactive-Streams转换成fs2-Streams。根据Slick官方文档:Slick可以通过db.stream函数用Reactive-Stream方式来读取后台数据,具体的配置如下:

  val disableAutocommit = SimpleDBIO(_.connection.setAutoCommit(false))
  val action = queryAction.withStatementParameters(fetchSize = 512)
  val publisher = db.stream(disableAutocommit andThen action)

首先,我们需要取消自动提交(disableAutocommit)。fetchSize是缓存数据页长度(每批次读取数据字数),然后用db.stream来构成一个Reactive-Streams标准的数据源publisher。Slick官方网页只提供了下面这个使用publisher的例子:

  val fut = publisher.foreach(s => println(s))
  Await.ready(fut,Duration.Inf)

除了数据枚举外就没什么用处,也无法提供更细节点的示范。FunDA的具体解决方案是把publisher转换成play-iteratee的Enumerator。play-iteratee支持Reactive-Streams,所以这个Enumerator应该具备协调后台数据和内存缓冲之间关系(back-pressure)的功能。play-iteratee是如下构建Enumerator的;

import play.api.libs.iteratee._
val enumerator = streams.IterateeStreams.publisherToEnumerator(publisher)

enumerator从后台数据库表中产生的数据源通过Iteratee把数据元素enqueue推送给一个fs2的queue:

    private def pushData[R](q: async.mutable.Queue[Task,Option[R]]): Iteratee[R,Unit] = Cont {
      case Input.EOF => {
        q.enqueue1(None).unsafeRun
        Done((), Input.Empty)
      }
      case Input.Empty => pushData(q)
      case Input.El(e) => {
        q.enqueue1(Some(e)).unsafeRun
        pushData(q)
      }
    }

然后fs2进行dequeue后生成fs2的Stream:

      Stream.eval(async.boundedQueue[Task,Option[SOURCE]](queSize)).flatMap { q =>
        Task { Iteratee.flatten(enumerator |>> pushData(q)).run }.unsafeRunAsyncFuture()
        pipe.unNoneTerminate(q.dequeue)
      }

整个构建Stream的过程在FunDA的fdasources包是这样定义的:

package com.bayakala.funda.fdasources
import fs2._
import play.api.libs.iteratee._
import com.bayakala.funda.fdapipes._
import slick.driver.JdbcProfile

object FDADataStream {

  class FDAStreamLoader[SOURCE, TARGET](slickProfile: JdbcProfile, convert: SOURCE => TARGET) {

    import slickProfile.api._

    def fda_typedStream(action: DBIOAction[Iterable[SOURCE],Streaming[SOURCE],Effect.Read])(slickDB: Database)(fetchSize: Int, queSize: Int): FDAPipeLine[TARGET] = {
      val disableAutocommit = SimpleDBIO(_.connection.setAutoCommit(false))
      val action_ = action.withStatementParameters(fetchSize = fetchSize)
      val publisher = slickDB.stream(disableAutocommit andThen action)
      val enumerator = streams.IterateeStreams.publisherToEnumerator(publisher)

      Stream.eval(async.boundedQueue[Task,Option[SOURCE]](queSize)).flatMap { q =>
        Task { Iteratee.flatten(enumerator |>> pushData(q)).run }.unsafeRunAsyncFuture()
        pipe.unNoneTerminate(q.dequeue).map {row => convert(row)}
      }

    }
    def fda_plainStream(action: DBIOAction[Iterable[SOURCE],Streaming[SOURCE],Effect.Read])(slickDB: Database)(fetchSize: Int, queSize: Int): FDAPipeLine[SOURCE] = {
      val disableAutocommit = SimpleDBIO(_.connection.setAutoCommit(false))
      val action_ = action.withStatementParameters(fetchSize = fetchSize)
      val publisher = slickDB.stream(disableAutocommit andThen action)
      val enumerator = streams.IterateeStreams.publisherToEnumerator(publisher)

      Stream.eval(async.boundedQueue[Task,Option[SOURCE]](queSize)).flatMap { q =>
        Task { Iteratee.flatten(enumerator |>> pushData(q)).run }.unsafeRunAsyncFuture()
        pipe.unNoneTerminate(q.dequeue)
      }

    }
    private def pushData[R](q: async.mutable.Queue[Task,Option[R]]): Iteratee[R,Unit] = Cont {
      case Input.EOF => {
        q.enqueue1(None).unsafeRun
        Done((), Input.Empty)
      }
      case Input.Empty => pushData(q)
      case Input.El(e) => {
        q.enqueue1(Some(e)).unsafeRun
        pushData(q)
      }
    }

  }
  object FDAStreamLoader {
    def apply[SOURCE, TARGET](slickProfile: JdbcProfile, converter: SOURCE => TARGET): FDAStreamLoader[SOURCE, TARGET] =
      new FDAStreamLoader[SOURCE, TARGET](slickProfile, converter)
  }
}

FDADataStream对象内主要实现了fda_typedStream和fda_plainStream。fda_typedStream提供了SOURCE=>TARGET的转换。从Enumerator转换到Stream整个过程和原理我们在FunDA(7)里已经详细介绍过了。下面我们看看FunDA-Example中fda_typedStream的具体应用例子:

package com.bayakala.funda.fdasources.examples
import slick.driver.H2Driver.api._
import com.bayakala.funda.fdasources.FDADataStream._
import com.bayakala.funda.samples._
import com.bayakala.funda.fdarows._
import com.bayakala.funda.fdapipes._
import FDANodes._
import FDAValves._
object Example2 extends App {
   val albums = SlickModels.albums
   val companies = SlickModels.companies

//数据源query
   val albumsInfo = for {
     (a,c) <- albums join companies on (_.company === _.id)
   } yield (a.title,a.artist,a.year,c.name)

//query结果强类型(用户提供)
  case class Album(title: String, artist: String, year: Int, publisher: String) extends FDAROW
//转换函数(用户提供)
  def toTypedRow(row: (String, String, Option[Int], String)): Album =
    Album(row._1, row._2, row._3.getOrElse(2000), row._4)

  val db = Database.forConfig("h2db")

  val streamLoader = FDAStreamLoader(slick.driver.H2Driver, toTypedRow _)
  val albumStream = streamLoader.fda_typedStream(albumsInfo.result)(db)(512,128)

//定义一个用户作业函数:列印数据内容
  def printAlbums: FDATask[FDAROW] = row => {
    row match {
      case album: Album =>
        println("____________________")
        println(s"品名:${album.title}")
        println(s"演唱:${album.artist}")
        println(s"年份:${album.year}")
        println(s"发行:${album.publisher}")
        fda_next(album)
      case _ => fda_skip
    }
  }

  albumStream.through(fda_execUserTask(printAlbums)).run.unsafeRun

}

运算结果:

品名:Keyboard Cat's Greatest Hits
演唱:Keyboard Cat
年份:1999
发行:Sony Music Inc
____________________
品名:Spice
演唱:Spice Girls
年份:1999
发行:Columbia Records
____________________
品名:Whenever You Need Somebody
演唱:Rick Astley
年份:1999
发行:Sony Music Inc
____________________
品名:The Triumph of Steel
演唱:Manowar
年份:1999
发行:The K-Pops Singers
____________________
品名:Believe
演唱:Justin Bieber
年份:1999
发行:Columbia Records

Process finished with exit code 0

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏码匠的流水账

聊聊spring cloud的EurekaServerInitializerConfiguration

本文主要研究一下spring cloud的EurekaServerInitializerConfiguration

1022
来自专栏温安适的blog

2个小bug,有点小门道

2017年的某日,小辉(我的同事)遇到了一个bug,解决了一下午还是没有找到,气的摔键盘,骂人,我看在眼里,急在心中。

4217
来自专栏一个会写诗的程序员的博客

13.12 Spring Boot集成Security中遇到的问题13.12 Spring Boot集成Security中遇到的问题问题1:Spring Boot集成Security使用数据库用户角色

sql语法手误。1?这地方写错了,应该是?1。这在敲代码的时候,手速一旦稍有不慎,就会导致前后顺序颠倒,而导致输入错误。这个虽然说是“低级错误”,但是错误搞起来...

742
来自专栏吴小龙同學

android之获取手机信息

android获取手机信息(号码,内存,CPU,分辨率,MAC,IP,SD卡,IMEI,经纬度,信号强度等等) 1 2 3 4 5 6 7 8 9 10 11 ...

3557
来自专栏菩提树下的杨过

java学习:Hibernate学习-用oracle sequence序列生成ID的配置示例

接上回继续,TMP_EMP中的ID是根据序列SQ_TMP_EMP来生成的,需要在TmpEmp.hbm.xml中设置:   <id name="id" type=...

1809
来自专栏Netkiller

Spring Cloud Netflix

本文节选自《Netkiller Java 手札》 http://www.netkiller.cn 12.2. Spring Cloud Netflix 12....

2767
来自专栏服务端技术杂谈

dubbo源码学习笔记----Provider和Consumer

provider <!-- provider's application name, used for tracing dependency relat...

2516
来自专栏流媒体人生

ATL源码学习3---接口的查询支持

b. _InternalQueryInterface函数调用InternalQueryInterface函数,定义在BEGIN_COM_MAP宏内部

663
来自专栏吴老师移动开发

【iOS开发】关于iOS统计埋点

对于一个移动App来说,统计用户的使用习惯已经是一个最基本的需求了。本文要讲的不是教你如何去实现一个统计模块,毕竟大部分的公司不会自己去开发一套统计系统。这里要...

2013
来自专栏一个会写诗的程序员的博客

第16章 Spring Boot + Kotlin: 下一代 Java 服务端开发

2017-11-22 11:55:17.205 INFO 14721 --- [ main] org.hibernate.Version ...

731

扫码关注云+社区