FunDA(2)- Streaming Data Operation:流式数据操作

   在上一集的讨论里我们介绍并实现了强类型返回结果行。使用强类型主要的目的是当我们把后端数据库SQL批次操作搬到内存里转变成数据流式按行操作时能更方便、准确、高效地选定数据字段。在上集讨论示范里我们用集合的foreach方式模拟了一个最简单的数据流,并把从数据库里批次读取的数据集转换成一串连续的数据行来逐行使用。一般来说完整的流式数据处理流程包括了从数据库中读取数据、根据读取的每行数据状态再对后台数据库进行更新,包括:插入新数据、更新、删除等。那么在上篇中实现的流式操作基础上再添加一种指令行类型就可以完善整个数据处理流程了,就像下面这个图示:

Database => Query -> Collection => Streaming -> DataRow => QueryAction(DataRow) -> ActionRow => execAction(ActionRow) -> Database 

如果我们还是以Slick为目标FRM,那么这个ActionRow的类型就是Slick的DBIO[T]了:

1 package com.bayakala.funda.rowtypes
2 import slick.dbio._
3 object ActionType {
4   type FDAAction[T] = DBIO[T]
5 }

记得有一次在一个Scala讨论区里遇到这样一个问题:如何把a表里的status字段更新成b表的status字段值,转化成SQL语句如下:

 update a,b set a.status=b.status where a.id=b.id

那位哥们的问题是如何用Slick来实现对a表的更新,不能用sql"???" interpolation 直接调用SQL语句,可能因为要求compile time语法check保障吧。这个问题用Slick Query还真的不太容易解决(能不能解决就不想费功夫去想了),这是因为FRM的SQL批次处理弱点。如果用FunDA的流式操作思路就会很容易解决了,只要用join Query把b.status读出来再用b.id=a.id逐个更新a.status。刚好,下面我们就示范通过ActionRow来解决这个问题。先用下面这段代码来设置测试数据:

 1 import slick.dbio.DBIO
 2 import slick.driver.H2Driver.api._
 3 
 4 import scala.concurrent.duration._
 5 import scala.concurrent.{Await, Future}
 6 import scala.util.{Failure, Success}
 7 import scala.concurrent.ExecutionContext.Implicits.global
 8 import slick.jdbc.meta.MTable
 9 object ActionRowTest extends App {
10 
11   class ATable(tag: Tag) extends Table[(Int,String,Int)](tag,"TA")  {
12     def id = column[Int]("id",O.PrimaryKey)
13     def flds = column[String]("aflds")
14     def status = column[Int]("status")
15     def * = (id,flds,status)
16   }
17   val tableA = TableQuery[ATable]
18 
19   class BTable(tag: Tag) extends Table[(Int,String,Int)](tag,"TB")  {
20     def id = column[Int]("id",O.PrimaryKey)
21     def flds = column[String]("bflds")
22     def status = column[Int]("status")
23     def * = (id,flds,status)
24   }
25   val tableB = TableQuery[BTable]
26 
27   val insertAAction =
28     tableA ++= Seq (
29         (1,"aaa",0),
30         (2,"bbb",3),
31         (3,"ccc",1),
32         (4,"ddd",0),
33         (16,"kkk",16)
34     )
35    val insertBAction =
36      tableB ++= Seq (
37        (1,"aaa",1),
38        (2,"bbb",2),
39        (3,"ccc",3),
40        (4,"ddd",4),
41        (5,"kkk",5)
42      )
43 
44    val db = Database.forConfig("h2db")
45 
46 
47    def tableExists(tables: Vector[MTable], tblname: String) =
48     tables.exists {t =>t.name.toString.contains(tblname)}
49 
50    def createSchemaIfNotExists(): Future[Unit] = {
51     db.run(MTable.getTables).flatMap {
52       case tables if !tableExists(tables,".TA") && !tableExists(tables,".TB") =>
53         println("Creating schemas for TA and TB...")
54         db.run((tableA.schema ++ tableB.schema).create)
55       case tables if !tableExists(tables,".TA") =>
56         println("Creating schema for TA ...")
57         db.run(tableA.schema.create)
58       case tables if !tableExists(tables,".TB") =>
59         println("Creating schema for TB ...")
60         db.run(tableB.schema.create)
61       case _ =>
62         println("Schema for TA, TB already created.")
63         Future.successful()
64     }
65    }
66 
67    def insertInitialData(): Future[Unit] = {
68     val cleanInsert = DBIO.seq(
69       tableA.delete, tableB.delete,
70       insertAAction,
71       insertBAction)
72     db.run(cleanInsert).andThen {
73       case Success(_) => println("Data insert completed.")
74       case Failure(e) => println(s"Data insert failed [${e.getMessage}]")
75     }
76    }
77 
78    Await.ready(db.run(sql"DROP TABLE TA; DROP TABLE TB".as[String]),Duration.Inf)
79 
80    val initResult = createSchemaIfNotExists().flatMap {_ => insertInitialData()}
81    Await.ready(initResult,Duration.Inf)
82 
83 
84 
85 
86 }

用join query先把这两个表相关的字段值搬到内存转成强类型行FDADataRow: 

 1 val selectAB = for {
 2      a <- tableA
 3      b <- tableB
 4      if (a.id === b.id)
 5    } yield (a.id,b.id,a.status,b.status)
 6 
 7    case class ABRow (id: Int, asts: Int, bsts: Int)
 8    def toABRow(raw: (Int,Int,Int,Int)) = ABRow(raw._1,raw._3,raw._4)
 9 
10    import com.bayakala.funda.rowtypes.DataRowType
11   
12    val loader = FDADataRow(slick.driver.H2Driver, toABRow _)
13    loader.getTypedRows(selectAB.result)(db).foreach {dataRow =>
14      println(s"ID:${dataRow.id} Status A = ${dataRow.asts}, B = ${dataRow.bsts}")
15    }

初始结果如下:

ID:1 Status A = 0, B = 1
ID:2 Status A = 3, B = 2
ID:3 Status A = 1, B = 3
ID:4 Status A = 0, B = 4

现在我们把每条数据行DataRow转成动作行ActionRow。然后把每条DataRow的asts字段值替换成bsts的字段值:

 1 import com.bayakala.funda.rowtypes.ActionType.FDAAction
 2    def updateAStatus(row: ABRow): FDAAction[Int] = {
 3      tableA.filter{r => r.id === row.id}
 4           .map(_.status)
 5           .update(row.asts)
 6    }
 7 
 8 
 9    loader.getTypedRows(selectAB.result)(db).map(updateAStatus(_)).foreach {
10      actionRow =>
11        println(s"${actionRow.toString}")
12    }

显示结果如下:

slick.driver.JdbcActionComponent$UpdateActionExtensionMethodsImpl$$anon$7@492691d7
slick.driver.JdbcActionComponent$UpdateActionExtensionMethodsImpl$$anon$7@27216cd
slick.driver.JdbcActionComponent$UpdateActionExtensionMethodsImpl$$anon$7@558bdf1f
slick.driver.JdbcActionComponent$UpdateActionExtensionMethodsImpl$$anon$7@8576fa0

现在每条DataRow已经被转化成jdbc action类型了。 下一步我们只需要运行这些ActionRow就可以完成任务了:

1   def execAction(act: FDAAction[Int]) = db.run(act)
2   
3    loader.getTypedRows(selectAB.result)(db)
4        .map(updateAStatus(_))
5        .map(execAction(_))

现在再看看数据库中的TA表状态:

  loader.getTypedRows(selectAB.result)(db).foreach {dataRow =>
    println(s"ID:${dataRow.id} Status A = ${dataRow.asts}, B = ${dataRow.bsts}")
  }

结果:
ID:1 Status A = 1, B = 1
ID:2 Status A = 2, B = 2
ID:3 Status A = 3, B = 3
ID:4 Status A = 4, B = 4

我们看到已经正确更新了TA的status字段值。

在这个示范中明显有很多不足之处:如果a.status=b.status应该省略更新步骤。这是因为foreach只能模拟最基本的数据流动。如果我们使用了具备强大功能的Stream工具库如scalaz-stream-fs2,就可以更好控制数据元素的流动。更重要的是scalaz-stream-fs2支持并行运算,那么上面所描述的流程:

Database => Query -> Collection => Streaming -> DataRow => QueryAction(DataRow) -> ActionRow => execAction(ActionRow) -> Database

几个 => 环节:Query、Streaming、QueryAction、execAction将可以并行运算,从而实现充分利用多核CPU硬件资源,提高运算效率的目的。

下面是这次讨论涉及的源代码:

 1 package com.bayakala.funda.rowtypes
 2 
 3 import scala.concurrent.duration._
 4 import scala.concurrent.Await
 5 import slick.driver.JdbcProfile
 6 
 7 object DataRowType {
 8   class FDADataRow[SOURCE, TARGET](slickProfile: JdbcProfile,convert: SOURCE  => TARGET){
 9     import slickProfile.api._
10 
11     def getTypedRows(slickAction: DBIO[Iterable[SOURCE]])(slickDB: Database): Iterable[TARGET] =
12       Await.result(slickDB.run(slickAction), Duration.Inf).map(raw => convert(raw))
13   }
14 
15   object FDADataRow {
16     def apply[SOURCE, TARGET](slickProfile: JdbcProfile, converter: SOURCE => TARGET): FDADataRow[SOURCE, TARGET] =
17       new FDADataRow[SOURCE, TARGET](slickProfile, converter)
18   }
19 
20 }
1 package com.bayakala.funda.rowtypes
2 import slick.dbio._
3 object ActionType {
4   type FDAAction[T] = DBIO[T]
5 }
  1 import slick.dbio.DBIO
  2 import slick.driver.H2Driver.api._
  3 
  4 import scala.concurrent.duration._
  5 import scala.concurrent.{Await, Future}
  6 import scala.util.{Failure, Success}
  7 import scala.concurrent.ExecutionContext.Implicits.global
  8 import slick.jdbc.meta.MTable
  9 object ActionRowTest extends App {
 10 
 11   class ATable(tag: Tag) extends Table[(Int,String,Int)](tag,"TA")  {
 12     def id = column[Int]("id",O.PrimaryKey)
 13     def flds = column[String]("aflds")
 14     def status = column[Int]("status")
 15     def * = (id,flds,status)
 16   }
 17   val tableA = TableQuery[ATable]
 18 
 19   class BTable(tag: Tag) extends Table[(Int,String,Int)](tag,"TB")  {
 20     def id = column[Int]("id",O.PrimaryKey)
 21     def flds = column[String]("bflds")
 22     def status = column[Int]("status")
 23     def * = (id,flds,status)
 24   }
 25   val tableB = TableQuery[BTable]
 26 
 27   val insertAAction =
 28     tableA ++= Seq (
 29         (1,"aaa",0),
 30         (2,"bbb",3),
 31         (3,"ccc",1),
 32         (4,"ddd",0),
 33         (16,"kkk",16)
 34     )
 35    val insertBAction =
 36      tableB ++= Seq (
 37        (1,"aaa",1),
 38        (2,"bbb",2),
 39        (3,"ccc",3),
 40        (4,"ddd",4),
 41        (5,"kkk",5)
 42      )
 43 
 44    val db = Database.forConfig("h2db")
 45 
 46 
 47    def tableExists(tables: Vector[MTable], tblname: String) =
 48     tables.exists {t =>t.name.toString.contains(tblname)}
 49 
 50    def createSchemaIfNotExists(): Future[Unit] = {
 51     db.run(MTable.getTables).flatMap {
 52       case tables if !tableExists(tables,".TA") && !tableExists(tables,".TB") =>
 53         println("Creating schemas for TA and TB...")
 54         db.run((tableA.schema ++ tableB.schema).create)
 55       case tables if !tableExists(tables,".TA") =>
 56         println("Creating schema for TA ...")
 57         db.run(tableA.schema.create)
 58       case tables if !tableExists(tables,".TB") =>
 59         println("Creating schema for TB ...")
 60         db.run(tableB.schema.create)
 61       case _ =>
 62         println("Schema for TA, TB already created.")
 63         Future.successful()
 64     }
 65    }
 66 
 67    def insertInitialData(): Future[Unit] = {
 68     val cleanInsert = DBIO.seq(
 69       tableA.delete, tableB.delete,
 70       insertAAction,
 71       insertBAction)
 72     db.run(cleanInsert).andThen {
 73       case Success(_) => println("Data insert completed.")
 74       case Failure(e) => println(s"Data insert failed [${e.getMessage}]")
 75     }
 76    }
 77 
 78    Await.ready(db.run(sql"DROP TABLE TA; DROP TABLE TB".as[String]),Duration.Inf)
 79 
 80    val initResult = createSchemaIfNotExists().flatMap {_ => insertInitialData()}
 81    Await.ready(initResult,Duration.Inf)
 82 
 83 
 84    val selectAB = for {
 85      a <- tableA
 86      b <- tableB
 87      if (a.id === b.id)
 88    } yield (a.id,b.id,a.status,b.status)
 89 
 90    case class ABRow (id: Int, asts: Int, bsts: Int)
 91    def toABRow(raw: (Int,Int,Int,Int)) = ABRow(raw._1,raw._3,raw._4)
 92 
 93    import com.bayakala.funda.rowtypes.DataRowType.FDADataRow
 94 
 95    val loader = FDADataRow(slick.driver.H2Driver, toABRow _)
 96    loader.getTypedRows(selectAB.result)(db).foreach {dataRow =>
 97      println(s"ID:${dataRow.id} Status A = ${dataRow.asts}, B = ${dataRow.bsts}")
 98    }
 99 
100    import com.bayakala.funda.rowtypes.ActionType.FDAAction
101    def updateAStatus(row: ABRow): FDAAction[Int] = {
102      tableA.filter{r => r.id === row.id}
103           .map(_.status)
104           .update(row.bsts)
105    }
106 
107 
108    loader.getTypedRows(selectAB.result)(db).map(updateAStatus(_)).foreach {
109      actionRow =>
110        println(s"${actionRow.toString}")
111    }
112 
113    def execAction(act: FDAAction[Int]) = db.run(act)
114 
115    loader.getTypedRows(selectAB.result)(db)
116        .map(updateAStatus(_))
117        .map(execAction(_))
118 
119   loader.getTypedRows(selectAB.result)(db).foreach {dataRow =>
120     println(s"ID:${dataRow.id} Status A = ${dataRow.asts}, B = ${dataRow.bsts}")
121   }
122 
123 }

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏岑玉海

Hbase 学习(三)Coprocessors

Coprocessors 之前我们的filter都是在客户端定义,然后传到服务端去执行的,这个Coprocessors是在服务端定义,在客户端调用,然后在服...

35411
来自专栏菩提树下的杨过

java JAXB 学习

JAXB(Java Architecture for XML Binding)是JDK的一部分,用于Object <-> XML的转换(有点类似于.NET中的X...

2429
来自专栏函数式编程语言及工具

Akka(25): Stream:对接外部系统-Integration

   在现实应用中akka-stream往往需要集成其它的外部系统形成完整的应用。这些外部系统可能是akka系列系统或者其它类型的系统。所以,akka-stre...

3237
来自专栏芋道源码1024

哪个更快:Java 堆还是本地内存

使用Java的一个好处就是你可以不用亲自来管理内存的分配和释放。当你用new关键字来实例化一个对象时,它所需的内存会自动的在Java堆中分配。堆会被垃圾回收器进...

844
来自专栏编程心路

SSH框架之旅-spring(3)

Spring 框架是一站式的框架,针对 JavaEE 的三层结构,每一层都有解决的技术,在 DAO(数据操作层)使用 jdbcTempalte。并且 Sprin...

562
来自专栏JavaQ

深入理解Spring系列之二:BeanDefinition解析

《深入理解Spring系列之一:开篇》中提到在Spring容器启动的过程中,会将Bean解析成Spring内部的BeanDefinition结构,本篇将深入分析...

3345
来自专栏开发技术

spring事务源码解析

  在spring jdbcTemplate 事务,各种诡异,包你醍醐灌顶!最后遗留了一个问题:spring是怎么样保证事务一致性的? 当然,spring事务内...

541
来自专栏二进制文集

JDK源码分析 Float

对于JDK源码分析的文章,仅仅记录我认为重要的地方。源码的细节实在太多,不可能面面俱到地写清每个逻辑。所以我的JDK源码分析,着重在JDK的体系架构层面,具体源...

823
来自专栏码匠的流水账

聊聊rocketmq的FileAppender

org/apache/rocketmq/logging/inner/LoggingBuilder.java

431
来自专栏Spark学习技巧

重要 : 优化flink的四种方式

flink这个框架在逐步变为流处理的主流。本文,我们将针对flink性能调优讲四种不同的方法。

1112

扫码关注云+社区