# FunDA (11): Parallel data processing for database operations

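One of FunDA's most important design goals is to support parallel execution of database operations. Let's first review how fs2 handles parallel computation. We can use interleave, merge, and either to process the elements of two Streams at the same time. interleave preserves a fixed alternating order, whereas merge and either produce a nondeterministic order, as the following example shows: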

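```scala
import fs2._
import scala.concurrent.duration._

implicit val strategy = Strategy.fromFixedDaemonPool(4)
implicit val scheduler = Scheduler.fromFixedDaemonPool(2)

// trace each element as it passes through
// (body reconstructed from the output format shown below)
def log[A](pre: String): Pipe[Task,A,A] = _.evalMap { row =>
  Task.delay { println(s"$pre>$row"); row }
}
// delay each element by a random duration below max
// (body lost in the source; this reconstruction is an assumption)
def randomDelay[A](max: FiniteDuration): Pipe[Task,A,A] = _.evalMap { a => {
    val delay = Task.delay(scala.util.Random.nextInt(max.toMillis.toInt))
    delay.flatMap { d => Task.now(a).schedule(d.millis) }
  }
}

// s1 and s3 are missing from the source: their elements are inferred
// from the output below, their delay bounds are assumed
val s1 = Stream(1,2,3,4,5).through(randomDelay(10.millis))
val s2 = Stream(11,22,33,44,55,66).through(randomDelay(30.millis))
val s3 = Stream("a","b","c").through(randomDelay(20.millis))

(s1 interleave s2).through(log("")).run.unsafeRun //> >1
//| >11
//| >2
//| >22
//| >3
//| >33
//| >4
//| >44
//| >5
//| >55

(s1 merge s2).through(log("")).run.unsafeRun      //> >11
//| >1
//| >22
//| >2
//| >33
//| >44
//| >3
//| >55
//| >4
//| >5
//| >66

(s1 either s3).through(log("")).run.unsafeRun     //> >Left(1)
//| >Left(2)
//| >Right(a)
//| >Right(b)
//| >Left(3)
//| >Left(4)
//| >Left(5)
//| >Right(c)
```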

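fs2 also provides `concurrent.join`, which runs a Stream of Streams in parallel with at most maxOpen inner Streams open at a time. Its signature is:

`def join[F[_],O](maxOpen: Int)(outer: Stream[F,Stream[F,O]])(implicit F: Async[F]): Stream[F,O] = {...}`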

```scala
val ss: Stream[Task,Stream[Task,Int]] = Stream(s1,s2,s1,s2)
```

`  val albumStream1 = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()()`

albumStream1 is a Reactive-Stream data source. This lets us add a parallel Source constructor function to FunDA:

```scala
def fda_par_load(sources: FDAPipeLine[FDAROW]*)(maxOpen: Int) = {
  concurrent.join(maxOpen)(Stream(sources: _*))
}
```

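maxOpen is the maximum number of computations that may run at the same time; it is best set to a number smaller than the machine's CPU core count. We use this function to build data sources in parallel: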

```scala
package com.bayakala.funda.fdapars.examples
import slick.driver.H2Driver.api._
import com.bayakala.funda.samples._
import com.bayakala.funda.fdarows.FDAROW
import scala.concurrent.duration._
import com.bayakala.funda.fdapipes._
import FDAValves._
import com.bayakala.funda.fdapars.FDAPars._
object Example1 extends App {
  val albums = SlickModels.albums
  val companies = SlickModels.companies

  // source query
  val albumsInfo = for {
    (a,c) <- albums join companies on (_.company === _.id)
  } yield (a.title,a.artist,a.year,c.name)

  // strongly typed query result (user-provided)
  case class Album(title: String, artist: String, year: Int, publisher: String) extends FDAROW
  // conversion to the strongly typed row (user-provided)
  def toTypedRow(row: (String, String, Option[Int], String)): Album =
    Album(row._1, row._2, row._3.getOrElse(2000), row._4)

  val db = Database.forConfig("h2db")

  val albumStream1 = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()()
  val albumStream2 = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()()
  val albumStream3 = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()()

  // print each Album and consume it; pass any other row downstream
  def printAlbums: FDATask[FDAROW] = row => {
    row match {
      case album: Album =>
        println("____________________")
        println(s"Title: ${album.title}")
        println(s"Artist: ${album.artist}")
        println(s"Year: ${album.year}")
        println(s"Publisher: ${album.publisher}")

        fda_skip
      //        fda_next(album)
      case r@_ => fda_next(r)
    }
  }
}
```

After calling startRun, the output is:

```
*** (c.z.hikari.HikariDataSource) HikariCP pool h2db is starting.
*** (s.jdbc.JdbcBackend.statement) Preparing statement: select x2."TITLE", x2."ARTIST", x2."YEAR", x3."NAME" from "ALBUMS" x2, "COMPANY" x3 where x2."COMPANY" = x3."ID"
*** (s.jdbc.JdbcBackend.statement) Preparing statement: select x2."TITLE", x2."ARTIST", x2."YEAR", x3."NAME" from "ALBUMS" x2, "COMPANY" x3 where x2."COMPANY" = x3."ID"
*** (s.jdbc.JdbcBackend.statement) Preparing statement: select x2."TITLE", x2."ARTIST", x2."YEAR", x3."NAME" from "ALBUMS" x2, "COMPANY" x3 where x2."COMPANY" = x3."ID"
____________________

____________________

____________________

____________________

____________________

____________________

____________________

____________________

____________________

____________________

____________________

____________________

____________________

____________________

____________________

Process finished with exit code 0
```
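To make the role of maxOpen more concrete, here is a minimal sketch that uses only the Scala standard library rather than fs2 or FunDA. `ParLoadSketch` and `parLoad` are hypothetical names invented for this illustration, and plain `Iterator`s stand in for the database streams. The sketch drains several sources on a pool of maxOpen threads and records how many were being read at the same moment.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParLoadSketch {
  // Hypothetical stand-in for fda_par_load (not FunDA code): every source is
  // drained on a pool of `maxOpen` threads, so at most `maxOpen` sources are
  // being read at any moment. Returns the merged rows and the observed peak.
  def parLoad[A](sources: Seq[Iterator[A]])(maxOpen: Int): (List[A], Int) = {
    val pool = Executors.newFixedThreadPool(maxOpen)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    val merged  = ListBuffer.empty[A]   // guarded by merged.synchronized
    val running = new AtomicInteger(0)  // sources currently being drained
    val peak    = new AtomicInteger(0)  // highest value `running` reached
    val jobs = sources.map { src =>
      Future {
        val now = running.incrementAndGet()
        peak.accumulateAndGet(now, (a, b) => math.max(a, b))
        try src.foreach { a =>
          merged.synchronized { merged += a }
          Thread.sleep(1)               // simulate slow I/O so sources overlap
        } finally running.decrementAndGet()
      }
    }
    try Await.result(Future.sequence(jobs), 10.seconds)
    finally pool.shutdown()
    (merged.synchronized(merged.toList), peak.get)
  }
}
```

Calling `ParLoadSketch.parLoad(Seq(Iterator(1,2,3), Iterator(11,22,33), Iterator(101,102,103)))(2)` returns all nine elements. The order across sources varies from run to run, while the elements of each individual source keep their relative order, and the reported peak never exceeds 2.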

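Another parallelism requirement in FunDA is applying a function in parallel to a long sequence of data elements. First, let's look at the shape of this function: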

```scala
// task type
type FDATask[ROW] = ROW => Option[List[ROW]]
```
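The shape `ROW => Option[List[ROW]]` means one input row can produce zero, one, or several output rows. The following self-contained sketch illustrates that with simplified stand-ins. `TaskSketch`, `RowTask`, `next`, `skip`, and `runTask` are hypothetical names, and the meanings assigned to them (in particular, treating `None` as "emit nothing") are assumptions for illustration, not taken from FunDA's source.

```scala
object TaskSketch {
  trait Row                                   // stands in for FDAROW
  case class Album(title: String, year: Int) extends Row
  case class Note(text: String) extends Row

  type RowTask[R] = R => Option[List[R]]      // same shape as FDATask[ROW]

  // Assumed meanings, labelled as such: pass rows downstream, or emit nothing.
  def next[R](rows: R*): Option[List[R]] = Some(rows.toList)
  def skip[R]: Option[List[R]] = Some(Nil)

  // Consume albums and pass every other row through unchanged.
  val dropAlbums: RowTask[Row] = {
    case _: Album => skip
    case other    => next(other)
  }

  // Apply a task to each row in order and flatten whatever it emits.
  // In this sketch a None result is treated like an empty list.
  def runTask[R](rows: List[R])(task: RowTask[R]): List[R] =
    rows.flatMap(r => task(r).getOrElse(Nil))
}
```

With these definitions, running `dropAlbums` over a list that mixes `Album` and `Note` rows leaves only the `Note` rows, which mirrors how `printAlbums` above consumes albums with `fda_skip` and forwards everything else with `fda_next`.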

```scala
def fda_runPar(parTask: FDAParTask)(maxOpen: Int) =

// parallel task type
```

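```scala
implicit class toFDAOps(fs2Stream: FDAPipeLine[FDAROW]) {

  def startRun = fs2Stream.run.unsafeRun

  def startFuture = fs2Stream.run.unsafeRunAsyncFuture

  // The method header and the Task wrapper are lost in the source; they are
  // reconstructed here from the call site s1.toPar(updateYear) and from the
  // surviving closing brackets, so treat them as an assumption.
  def toPar(st: FDATask[FDAROW]) =
    fs2Stream.map { row =>
      Stream.eval(Task {
        st(row)
      })
    }
}
```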

```scala
// task function to run in parallel
def updateYear: FDATask[FDAROW] = row => {
  row match {
    case album: Album =>
      val action = albums.filter{r => r.title === album.title}.map(_.year).update(Some(2016))
      // pass the original row downstream together with the newly built Action
      fda_next(List(album,FDAActionRow(action)))
    case others@ _ => fda_next(others)
  }
}

// load in parallel
// build the Actions in parallel
val s2 = fda_runPar(s1.toPar(updateYear))(3)
```

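s1 is the data source built in parallel, and s2 applies the function updateYear in parallel to the elements that source produces. We can likewise run the resulting ActionRows in parallel: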

```scala
val runner = FDAActionRunner(slick.driver.H2Driver)
// task function that runs the Actions
def runActions: FDATask[FDAROW] = row => {
  row match {
    case FDAActionRow(action) =>
      runner.fda_execAction(action)(db)
      fda_skip
    case others@ _ => fda_next(others)
  }
}

// run the Actions in parallel
val s3 = fda_runPar(s2.toPar(runActions))(3)
// start the run
```

```
*** (c.z.hikari.HikariDataSource) HikariCP pool h2db is starting.
*** (s.jdbc.JdbcBackend.statement) Preparing statement: select x2."TITLE", x2."ARTIST", x2."YEAR", x3."NAME" from "ALBUMS" x2, "COMPANY" x3 where x2."COMPANY" = x3."ID"
*** (s.jdbc.JdbcBackend.statement) Preparing statement: select x2."TITLE", x2."ARTIST", x2."YEAR", x3."NAME" from "ALBUMS" x2, "COMPANY" x3 where x2."COMPANY" = x3."ID"
*** (s.jdbc.JdbcBackend.statement) Preparing statement: select x2."TITLE", x2."ARTIST", x2."YEAR", x3."NAME" from "ALBUMS" x2, "COMPANY" x3 where x2."COMPANY" = x3."ID"
*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Keyboard Cat''s Greatest Hits'
____________________

*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Keyboard Cat''s Greatest Hits'
____________________

*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Keyboard Cat''s Greatest Hits'
____________________

*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Spice'
____________________

*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Spice'
____________________

*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Spice'
____________________

*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Whenever You Need Somebody'
____________________

*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Whenever You Need Somebody'
____________________

*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Whenever You Need Somebody'
____________________

*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'The Triumph of Steel'
____________________

*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'The Triumph of Steel'
____________________

*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'The Triumph of Steel'
____________________

*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Believe'
____________________

*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Believe'
____________________

*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Believe'
____________________

Process finished with exit code 0
```
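The two-stage pattern (build the actions in parallel, then run them in parallel) can also be sketched without fs2, Slick, or FunDA. The version below is a rough analogue under stated assumptions. `RunParSketch`, `runPar`, `ActionRow`, and the in-memory `table` are hypothetical stand-ins, and `Future.traverse` on a fixed pool of maxOpen threads replaces `concurrent.join`. Unlike a joined fs2 stream, `Future.traverse` returns results in input order.

```scala
import java.util.concurrent.Executors
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object RunParSketch {
  sealed trait Row
  case class Album(title: String, year: Int) extends Row
  case class ActionRow(run: () => Int) extends Row  // stands in for FDAActionRow

  type RowTask = Row => Option[List[Row]]

  // In-memory stand-in for the ALBUMS table: title -> year.
  val table = TrieMap("Spice" -> 1996, "Believe" -> 1998)

  // Stage 1: emit the original row plus an action that updates its year.
  val updateYear: RowTask = {
    case a: Album => Some(List(a, ActionRow(() => { table.update(a.title, 2016); 1 })))
    case other    => Some(List(other))
  }

  // Stage 2: execute action rows and drop them; pass other rows through.
  val runActions: RowTask = {
    case ActionRow(run) => run(); Some(Nil)
    case other          => Some(List(other))
  }

  // Hypothetical analogue of fda_runPar: apply `task` to every row on a pool
  // of `maxOpen` threads, then flatten the emitted rows in input order.
  def runPar(rows: List[Row])(task: RowTask)(maxOpen: Int): List[Row] = {
    val pool = Executors.newFixedThreadPool(maxOpen)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val results = Future.traverse(rows)(r => Future(task(r).getOrElse(Nil)))
      Await.result(results, 10.seconds).flatten
    } finally pool.shutdown()
  }
}
```

A run mirrors the s1, s2, s3 chain from the article: `runPar(s1)(updateYear)(3)` yields each album followed by its action row, and feeding that result to `runPar(_)(runActions)(3)` executes the updates and leaves only the album rows.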

The complete source code for this example:

```scala
package com.bayakala.funda.fdapars.examples
import slick.driver.H2Driver.api._
import com.bayakala.funda.samples._
import com.bayakala.funda.fdarows.FDARowTypes._
import com.bayakala.funda.fdarows.FDAROW

import scala.concurrent.duration._
import com.bayakala.funda.fdapipes._
import FDAValves._
import com.bayakala.funda.fdapars.FDAPars._
import com.bayakala.funda.fdarows.FDARowTypes.FDAActionRow
object Example1 extends App {
  val albums = SlickModels.albums
  val companies = SlickModels.companies

  // source query
  val albumsInfo = for {
    (a,c) <- albums join companies on (_.company === _.id)
  } yield (a.title,a.artist,a.year,c.name)

  // strongly typed query result (user-provided)
  case class Album(title: String, artist: String, year: Int, publisher: String) extends FDAROW
  // conversion function (user-provided)
  def toTypedRow(row: (String, String, Option[Int], String)): Album =
    Album(row._1, row._2, row._3.getOrElse(2000), row._4)

  val db = Database.forConfig("h2db")

  val albumStream1 = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()()
  val albumStream2 = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()()
  val albumStream3 = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()()

  // print each Album and consume it; pass any other row downstream
  def printAlbums: FDATask[FDAROW] = row => {
    row match {
      case album: Album =>
        println("____________________")
        println(s"Title: ${album.title}")
        println(s"Artist: ${album.artist}")
        println(s"Year: ${album.year}")
        println(s"Publisher: ${album.publisher}")

        fda_skip
      //        fda_next(album)
      case r@_ => fda_next(r)
    }
  }

  // task function to run in parallel
  def updateYear: FDATask[FDAROW] = row => {
    row match {
      case album: Album =>
        val action = albums.filter{r => r.title === album.title}.map(_.year).update(Some(2016))
        // pass the original row downstream together with the newly built Action
        fda_next(List(album,FDAActionRow(action)))
      case others@ _ => fda_next(others)
    }
  }

  val runner = FDAActionRunner(slick.driver.H2Driver)
  // task function that runs the Actions
  def runActions: FDATask[FDAROW] = row => {
    row match {
      case FDAActionRow(action) =>
        runner.fda_execAction(action)(db)
        fda_skip
      case others@ _ => fda_next(others)
    }
  }
  // load in parallel
  // build the Actions in parallel
  val s2 = fda_runPar(s1.toPar(updateYear))(3)

  // run the Actions in parallel
  val s3 = fda_runPar(s2.toPar(runActions))(3)
  // start the run

}
```
