FunDA(12)- 示范:强类型数据源 - strong typed data sources

    FunDA设计的主要目的是解决FRM(Functional Relation Mapping)如Slick这样的批次型操作工具库数据源行间游动操作的缺失问题。FRM产生的结果集就是一种静态集合,缺乏动态更新操作方式。FunDA提出的解决方案是把FRM产生的静态集合转变成动态流(stream),流内元素代表数据行(data row),一个完整的数据流代表一连串的数据行。用户可以利用数据流和FunDA提供的函数组件在数据流中游动进行数据更新操作。FunDA的数据流只支持单向游动(fda_next),但FunDA的数据流支持多种类型的数据元素,包括:数据行(data row)和指令行(action row)。指令行ActionRow是由Slick-DBIOAction构成,可以发送回后台数据库更新数据。FunDA可以通过函数组件从数据行中产生新数据行或者指令行并且在数据流的任何位置运算用户提供的功能函数,使其能使用该位置的数据行进行数据更新或者数据(指令)行产生操作。我们将在下面几个章节进行FunDA功能的使用示范。

    Slick运算Query返回的结果集合内的数据行类型一般是Tuple类型。因为无法使用字段名,是弱类型。除了从方便使用角度考虑,还因为FunDA开发是基于Scala函数式编程模式的,静态类型系统(static type system)对类型要求比较严格,所以FunDA的数据流内元素必须是强类型的,大部分是case class类型。这样用户可以使用名称来调用数据字段来进行数据处理编程。下面我们就示范一下如何把Slick的数据结果集合转变成强类型数据流:

从世界银行公开数据网站下载了一份美国州县空气质量报告原始数据,cvs格式的,30万条左右。导入h2数据库后作为示范数据。下面是示范数据表结构:

import slick.driver.H2Driver.api._

object Models {

  //表字段对应模版
  case class AQMRawModel(mid: String
                         , mtype: String
                         , state: String
                         , fips: String
                         , county: String
                         , year: String
                         , value: String)

  //表结构: 定义字段类型, * 代表结果集字段
  class AQMRawTable(tag: Tag) extends Table[AQMRawModel](tag, "AIRQM") {
    def mid = column[String]("MEASUREID")
    def mtype = column[String]("MEASURETYPE")
    def state = column[String]("STATENAME")
    def fips = column[String]("COUNTYFIPS")
    def county = column[String]("COUNTYNAME")
    def year = column[String]("REPORTYEAR")
    def value = column[String]("VALUE")

    def * = (mid,mtype,state,fips,county,year,value) <> (AQMRawModel.tupled, AQMRawModel.unapply)
  }

  //库表实例
  val AQMRawQuery = TableQuery[AQMRawTable]

}

下面是这个示范软件的sbt设置文件build.sbt:

name := "funda-demo"

version := "1.0"

scalaVersion := "2.11.8"

resolvers += Resolver.mavenLocal

libraryDependencies ++= Seq(
  "com.typesafe.slick" %% "slick" % "3.1.1",
  "com.typesafe.slick" %% "slick-testkit" % "3.1.1" % "test",
  "org.slf4j" % "slf4j-nop" % "1.7.21",
  "com.h2database" % "h2" % "1.4.191",
  "com.typesafe.slick" %% "slick-hikaricp" % "3.1.1",
  "com.bayakala" % "funda_2.11" % "1.0.0-SNAPSHOT" withSources() withJavadoc()
)

数据库设置在前面Slick系列讨论里已经示范过了。在这里就不再多说了。

强类型转换可以在读取数据库时进行,生成强类型元素的数据流。或者在使用数据流时即时转换。我们先看看如何构建强类型元素数据流:

  val aqmraw = Models.AQMRawQuery

  val db = Database.forConfig("h2db")
// aqmQuery.result returns Seq[(String,String,String,String)]
  val aqmQuery = aqmraw.map {r => (r.year,r.state,r.county,r.value)}
// user designed strong typed resultset type. must extend FDAROW
  case class TypedRow(year: String, state: String, county: String, value: String) extends FDAROW
// strong typed resultset conversion function. declared implicit to remind during compilation
  implicit def toTypedRow(row: (String,String,String,String)): TypedRow =
    TypedRow(row._1,row._2,row._3,row._4)

在读取数据库前用户提供强类型结构case class TypedRow, 及Seq[(...)]到TypeRow类型转换函数toTypedRow,如上。在构建数据读取工具类FDAViewLoader时提供这个转换函数:

// loader to read from database and convert result collection to strong typed collection
  val viewLoader = FDAViewLoader(slick.driver.H2Driver)(toTypedRow _)
  val dataSeq = viewLoader.fda_typedRows(aqmQuery.result)(db).toSeq

现在这个dataSeq是Seq[TypedRow]类型了。用dataSeq构建静态数据流:

// turn Seq collection into fs2 stream
  val aqmStream =  fda_staticSource(dataSeq)()()

fd_staticSource是基于bracket函数的资源使用模式:

  /**
    * produce a static view source from a Seq[ROW] collection using famous 'bracket'
    * provide facade to error handling and cleanup
    * @param acquirer       the Seq[ROW] collection
    * @param errhandler     error handle callback
    * @param finalizer      cleanup callback
    * @tparam ROW           type of row
    * @return               a new stream
    */
  def fda_staticSource[ROW](acquirer: => Seq[ROW])(
                            errhandler: Throwable => FDAPipeLine[ROW] = null)(
                            finalizer: => Unit = ()): FDAPipeLine[ROW] = {...}

上面的调用省略了异常和事后处理。下面的例子示范了完整的调用:

  val safeSource = fda_staticSource(dataSeq) {
    case e: Exception => fda_appendRow(FDAErrorRow(new Exception(e)))
  }(println("the end finally!"))

在这个调用例子里如果出现异常,新数据流状态是一个代表异常的元素类型。无论在正常完成或中断情况下都会显示“the end finally!“信息。

aqmStream是个一个以TypedRow为元素的强类型数据流。我们可以在组件函数里使用字段名:

  // use stream combinators with field names
  aqmStream.filter{r => r.year > "1999"}.take(3).appendTask(showRecord).startRun

当然我们也可以在用户定义的任务FDAUserTask函数中调用字段名:

// now access fields in the strong typed resultset
  def showRecord: FDAUserTask[FDAROW] = row => {
    row match {
      case qmr: TypedRow =>
        println(s"州名: ${qmr.state}")
        println(s"县名:${qmr.county}")
        println(s"年份:${qmr.year}")
        println(s"取值:${qmr.value}")
        println("-------------")
        fda_skip
      case _ => fda_skip
    }
  }

运算aqmStream得出以下结果:

州名: Ohio
县名:Stark
年份:2013
取值:0
-------------
州名: New Mexico
县名:Lea
年份:2002
取值:0
-------------
州名: Texas
县名:Bowie
年份:2003
取值:0
-------------

Process finished with exit code 0

我们也可以先构建一个弱类型数据流后再用map来把它转换成强类型的,如下:

  val allState = aqmraw.map(_.state)
  val stateLoader = FDAViewLoader[String,String](slick.driver.H2Driver)()
  val stateSeq = stateLoader.fda_plainRows(allState.distinct.result)(db).toSeq
  val stateStream =  fda_staticSource(stateSeq)()()
  case class StateRow(state: String) extends FDAROW
  def showState: FDAUserTask[FDAROW] = row => {
     row match {
       case StateRow(sname) =>
         println(s"州名称:$sname")
         fda_skip
       case _ => fda_skip
     }
  }
  stateStream.map{s => StateRow(s)}
    .filter{r => r.state > "Alabama"}.take(3)
    .appendTask(showState).startRun

allState返回结果类型Seq[String]。注意如果没有提供类型转换函数来辅助类型推导就必须在构建FDAViewLoader时提供SOURCE和TARGET类型参数。stateStream是一个弱类型的数据流,我们用map{s => StateRow(s))把流元素转换成StateRow类型。运算stateStream结果为:

州名称:North Dakota
州名称:Maryland
州名称:Louisiana

Process finished with exit code 0

上面的示范例子我们可以用Reactive-Streams方式实现,如下:

  val streamLoader = FDAStreamLoader(slick.driver.H2Driver)(toTypedRow _)
  val streamSource = streamLoader.fda_typedStream(aqmQuery.result)(db)(
    10.seconds,512,512)()()
  streamSource.filter{r => r.year > "1999"}.take(3).appendTask(showRecord).startRun

  val stateStreamLoader = FDAStreamLoader[String,String](slick.driver.H2Driver)()
  val stateStreamSource = stateStreamLoader.fda_plainStream(allState.distinct.result)(db)(
    10.seconds,512,512)()()

  //first convert to StateRows to turn Stream[Task,FDAROW] typed stream
  stateStreamSource.map{s => StateRow(s)}
    .filter{r => r.state > "Alabama"}.take(3)
    .appendTask(showState).startRun
}

fda_typeStream产生强类型元素的数据流。它的函数款式是这样的:

   /**
      * returns a reactive-stream from Slick DBIOAction result
      * using play-iteratees and fs2 queque to connect to slick data stream publisher
      * provide facade for error handler and finalizer to support exception and cleanup handling
      * also provide stream element conversion from SOURCE type to TARGET type
      * @param action       a Slick DBIOAction to produce query results
      * @param slickDB      Slick database object
      * @param maxInterval  max time wait on iteratee to consume of next element
      *                     exceeding presumed streaming failure or completion
      *                     use 0.milli to represent infinity
      *                     inform enumerator to release its resources
      * @param fetchSize    number of rows cached during database read
      * @param queSize      size of queque used by iteratee as cache to pass elements to fs2 stream
      * @param errhandler   error handler callback
      * @param finalizer    cleanup callback
      * @param convert      just a measure to guarantee conversion function is defined
      *                     when this function is used there has to be a converter defined
      *                     implicitly in compile time
      * @return             a reactive-stream of TARGET row type elements
      */
    def fda_typedStream(action: DBIOAction[Iterable[SOURCE],Streaming[SOURCE],Effect.Read])(
      slickDB: Database)(
      maxInterval: FiniteDuration, fetchSize: Int, queSize: Int)(
      errhandler: Throwable => FDAPipeLine[TARGET] = null)(
      finalizer: => Unit = ())(
      implicit convert: SOURCE => TARGET): FDAPipeLine[TARGET] = {...}

注意maxInterval,fetchSize,queSize这几个参数的用途。上面这个streaming的示范例子产生相同结果。

下面是示范源代码:

import slick.driver.H2Driver.api._
import com.bayakala.funda._
import API._
import scala.language.implicitConversions
import scala.concurrent.duration._

object StrongTypedSource extends App {

  val aqmraw = Models.AQMRawQuery

  val db = Database.forConfig("h2db")
// aqmQuery.result returns Seq[(String,String,String,String)]
  val aqmQuery = aqmraw.map {r => (r.year,r.state,r.county,r.value)}
// user designed strong typed resultset type. must extend FDAROW
  case class TypedRow(year: String, state: String, county: String, value: String) extends FDAROW
// strong typed resultset conversion function. declared implicit to remind during compilation
  implicit def toTypedRow(row: (String,String,String,String)): TypedRow =
    TypedRow(row._1,row._2,row._3,row._4)
// loader to read from database and convert result collection to strong typed collection
  val viewLoader = FDAViewLoader(slick.driver.H2Driver)(toTypedRow _)
  val dataSeq = viewLoader.fda_typedRows(aqmQuery.result)(db).toSeq
// turn Seq collection into fs2 stream
  val aqmStream =  fda_staticSource(dataSeq)()()
// now access fields in the strong typed resultset
  def showRecord: FDAUserTask[FDAROW] = row => {
    row match {
      case qmr: TypedRow =>
        println(s"州名: ${qmr.state}")
        println(s"县名:${qmr.county}")
        println(s"年份:${qmr.year}")
        println(s"取值:${qmr.value}")
        println("-------------")
        fda_skip
      case _ => fda_skip
    }
  }
  // use stream combinators with field names
  aqmStream.filter{r => r.year > "1999"}.take(3).appendTask(showRecord).startRun

  val allState = aqmraw.map(_.state)
  //no converter to help type inference. must provide type parameters explicitly
  val stateLoader = FDAViewLoader[String,String](slick.driver.H2Driver)()
  val stateSeq = stateLoader.fda_plainRows(allState.distinct.result)(db).toSeq
  //constructed a Stream[Task,String]
  val stateStream =  fda_staticSource(stateSeq)()()
  //strong typed row type. must extend FDAROW
  case class StateRow(state: String) extends FDAROW
  def showState: FDAUserTask[FDAROW] = row => {
     row match {
       case StateRow(sname) =>
         println(s"州名称:$sname")
         fda_skip
       case _ => fda_skip
     }
  }
  //first convert to StateRows to turn Stream[Task,FDAROW] typed stream
  stateStream.map{s => StateRow(s)}
    .filter{r => r.state > "Alabama"}.take(3)
    .appendTask(showState).startRun


  val streamLoader = FDAStreamLoader(slick.driver.H2Driver)(toTypedRow _)
  val streamSource = streamLoader.fda_typedStream(aqmQuery.result)(db)(
    10.seconds,512,512)()()
  streamSource.filter{r => r.year > "1999"}.take(3).appendTask(showRecord).startRun

  val stateStreamLoader = FDAStreamLoader[String,String](slick.driver.H2Driver)()
  val stateStreamSource = stateStreamLoader.fda_plainStream(allState.distinct.result)(db)(
    10.seconds,512,512)()()

  //first convert to StateRows to turn Stream[Task,FDAROW] typed stream
  stateStreamSource.map{s => StateRow(s)}
    .filter{r => r.state > "Alabama"}.take(3)
    .appendTask(showState).startRun
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏大数据钻研

44 个 JavaScript 变态题解析

当初笔者做这套题的时候不仅怀疑智商, 连人生都开始怀疑了…. 不过, 对于基础知识的理解是深入编程的前提. 让我们一起来看看这些变态题到底变态不变态吧! 第1题...

3296
来自专栏静晴轩

JavaScript 字符串实用常操纪要

JavaScript 字符串用于存储和处理文本。因此在编写 JS 代码之时她总如影随形,在你处理用户的输入数据的时候,在读取或设置 DOM 对象的属性时,在操作...

3607
来自专栏web前端教室

聊一下JavaScript定时器

image.png 话说JS的定时器,常用的其实就是setTimeout和setInterval这二个。它们俩一个是运行一次就拉倒,另一个是你不叫我停我就一...

1869
来自专栏芋道源码1024

数据库分库分表中间件 Sharding-JDBC 源码分析 —— 分布式主键

本文主要基于 Sharding-JDBC 1.5.0 正式版 1. 概述 2.KeyGenerator 2.1 DefaultKeyGenerator 2.2...

40714
来自专栏WebHub

typeof最新原理解析

我们都知道 typeof(null) === 'object',关于原因,在小黄书《你不知道的JavaScript》中有这么一段解释:

761
来自专栏Elasticsearch实验室

Elasitcsearch 底层系列 Lucene 内核解析之 Doc Value

       Elasticsearch 支持行存和列存,行存用于以文档为单位顺序存储多个文档的原始内容,在 Elasitcsearch 底层系列 Lucene...

984
来自专栏贺贺的前端工程师之路

Angular2 之 单元测试

Angular的测试工具类包含了TestBed类和一些辅助函数方法,当时这不是唯一的,你可以不依赖Angular 的DI(依赖注入)系统,自己new出来测试类的...

712
来自专栏平凡文摘

Netty 实现原理浅析

1313
来自专栏从流域到海域

《笨办法学Python》 第38课手记

《笨办法学Python》 第38课手记 注意这是第三版的《笨办法学Python》的内容,我后来发现第三版存在很大的问题,就放弃了第三版开始使用第四版,第四版的第...

2378
来自专栏精讲JAVA

Netty 实现原理浅析

(点击上方公众号,可快速关注) 来源:kafka0102的博客 , www.kafka0102.com/2010/06/167.html Netty是JBoss...

2238

扫码关注云+社区