FunDA(11) - Parallel Computation of Database Operations: Parallel data processing

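   One of FunDA's most important design goals is parallel execution of database operations. Let's first review how fs2 does parallel computation. fs2 offers interleave, merge and either for combining the elements of two Streams. interleave keeps a fixed, alternating order, whereas merge and either produce an order that is not fixed in advance, as the following examples show: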

import fs2._
import scala.concurrent.duration._

implicit val strategy = Strategy.fromFixedDaemonPool(4)

implicit val scheduler = Scheduler.fromFixedDaemonPool(2)

//trace: print each element as it passes through
def log[A](pre: String): Pipe[Task,A,A] = _.evalMap { row =>
  Task.delay {println(s"${pre}>${row}");row}
}
//delay each element by a random number of milliseconds in [0, max)
def randomDelay[A](max: FiniteDuration): Pipe[Task,A,A] = _.evalMap { a =>
  val delay: Task[Int] = Task.delay {scala.util.Random.nextInt(max.toMillis.toInt)}
  delay.flatMap {d => Task.now(a).schedule(d.millis)}
}


val s1: Stream[Task,Int] = Stream(1,2,3,4,5).through(randomDelay(100.millis))
 
val s2 = Stream(11,22,33,44,55,66).through(randomDelay(30.millis))

val s3: Stream[Task,String] = Stream("a","b","c").through(randomDelay(200.millis))


(s1 interleave s2).through(log("")).run.unsafeRun //> >1
                                                  //| >11
                                                  //| >2
                                                  //| >22
                                                  //| >3
                                                  //| >33
                                                  //| >4
                                                  //| >44
                                                  //| >5
                                                  //| >55

(s1 merge s2).through(log("")).run.unsafeRun      //> >11
                                                  //| >1
                                                  //| >22
                                                  //| >2
                                                  //| >33
                                                  //| >44
                                                  //| >3
                                                  //| >55
                                                  //| >4
                                                  //| >5
                                                  //| >66
(s1 either s3).through(log("")).run.unsafeRun     //> >Left(1)
                                                  //| >Left(2)
                                                  //| >Right(a)
                                                  //| >Right(b)
                                                  //| >Left(3)
                                                  //| >Left(4)
                                                  //| >Left(5)
                                                  //| >Right(c)
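The ordering difference can be reproduced without fs2. The sketch below is a standard-library analogy, not fs2's implementation: interleave is modelled as strict alternation over two Lists that stops as soon as the shorter one runs out (which is why 66 never appears in the interleave output), and merge as two producer threads with random delays writing into one shared buffer. The object name InterleaveVsMerge and the maxDelayMs parameter are illustrative assumptions.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Random

// Hypothetical stdlib analogy of fs2's interleave vs merge (not fs2 code)
object InterleaveVsMerge {
  private implicit val ec: ExecutionContext = ExecutionContext.global

  // Deterministic: x, y, x, y ... and stop when either input is exhausted
  def interleave[A](xs: List[A], ys: List[A]): List[A] =
    xs.zip(ys).flatMap { case (x, y) => List(x, y) }

  // Nondeterministic: each input is produced on its own thread with a random
  // delay before every element, and elements are appended in arrival order.
  // Each input keeps its own relative order; how the two mix varies per run.
  def merge[A](xs: List[A], ys: List[A], maxDelayMs: Int): List[A] = {
    require(maxDelayMs > 0, "maxDelayMs must be positive")
    val out = ListBuffer.empty[A]
    def produce(as: List[A]): Future[Unit] = Future {
      as.foreach { a =>
        Thread.sleep(Random.nextInt(maxDelayMs).toLong)
        out.synchronized { out += a }
      }
    }
    Await.result(Future.sequence(List(produce(xs), produce(ys))), 30.seconds)
    out.synchronized { out.toList }
  }

  def main(args: Array[String]): Unit = {
    val s1 = List(1, 2, 3, 4, 5)
    val s2 = List(11, 22, 33, 44, 55, 66)
    println(interleave(s1, s2)) // List(1, 11, 2, 22, 3, 33, 4, 44, 5, 55)
    println(merge(s1, s2, 50))  // all 11 elements; the order differs between runs
  }
}
```

Unlike interleave, the merge analogy never drops elements: it finishes only when both inputs are exhausted, matching the merge output above, where 66 does appear.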

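The examples above show the irregular order produced by merge: because fs2's merge is nondeterministic, it emits each element as soon as either stream produces it, so neither stream is held back waiting for the other. To process more than two streams in parallel, fs2 provides the join function (also known as mergeN):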

def join[F[_],O](maxOpen: Int)(outer: Stream[F,Stream[F,O]])(implicit F: Async[F]): Stream[F,O] = {...}

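The signature shows that the argument outer has type Stream[F,Stream[F,O]], i.e. a two-level stream with an outer and an inner layer. In a real-world setting the outer layer might be several database connections and the inner layer several clients. In FunDA terms, the outer layer can be several data sources and the inner layer several reader functions; alternatively, the outer layer can be the data rows (elements) and the inner layer the functions that process them. Let's first see how to produce several data sources in parallel: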

val ss: Stream[Task,Stream[Task,Int]] = Stream(s1,s2,s1,s2)
                 //> ss  : fs2.Stream[fs2.Task,fs2.Stream[fs2.Task,Int]] = Segment(Emit(Chunk(Seg
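What join does with such a nested stream can be approximated outside fs2 by running the inner sequences on a pool of maxOpen threads and emitting elements in arrival order. The following is a standard-library sketch under that assumption, not fs2's implementation (fs2's join is non-blocking and mixes elements at a finer grain). BoundedJoin and its peak counter are hypothetical names that exist only to make the maxOpen bound observable.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stdlib analogy of a bounded join (not fs2's concurrent.join)
object BoundedJoin {
  // Runs every inner computation (a thunk standing in for an inner stream) with
  // at most maxOpen of them active at once, collecting elements as they arrive.
  // Returns the elements and the peak number of inners observed running together.
  def join[A](maxOpen: Int)(inners: List[() => List[A]]): (List[A], Int) = {
    require(maxOpen > 0, "maxOpen must be positive")
    // a fixed pool of maxOpen threads is what enforces the concurrency bound
    val pool = Executors.newFixedThreadPool(maxOpen)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val out = ListBuffer.empty[A]
    val running = new AtomicInteger(0)
    val peak = new AtomicInteger(0)
    try {
      val tasks = inners.map { inner =>
        Future {
          val now = running.incrementAndGet()
          peak.updateAndGet(p => math.max(p, now))
          try inner().foreach(a => out.synchronized { out += a })
          finally running.decrementAndGet()
        }
      }
      Await.result(Future.sequence(tasks), 30.seconds)
      (out.synchronized { out.toList }, peak.get)
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    val inners: List[() => List[Int]] = List(
      () => { Thread.sleep(30); List(1, 2, 3) },
      () => { Thread.sleep(10); List(11, 22) },
      () => { Thread.sleep(20); List(100) },
      () => { Thread.sleep(10); List(200, 300) }
    )
    val (elems, peak) = join(2)(inners)
    println(elems)           // all 8 elements; which inner comes first varies
    println(s"peak = $peak") // at most 2
  }
}
```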

The type of ss shows that the Stream constructor can build a Stream[Task,Stream[Task,A]] directly. Earlier we learned how to produce a Stream[Task,FDAROW] with Slick, for example:

  val albumStream1 = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()()

albumStream1 is a Reactive Streams data source. On this basis we can add a parallel Source constructor to FunDA:

  def fda_par_load(sources: FDAPipeLine[FDAROW]*)(maxOpen: Int) = {
     concurrent.join(maxOpen)(Stream(sources: _*))
  }
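fda_par_load just wraps its varargs sources into an outer Stream and hands it to join. For picking maxOpen, one possible heuristic (an assumption for illustration, not something FunDA provides) is to stay below the core count and never exceed the number of sources, since extra slots would sit idle. defaultMaxOpen is a hypothetical helper:

```scala
// Hypothetical helper, not part of FunDA or fs2
object MaxOpen {
  // Leave one core free, never open more inner streams than there are
  // sources, and always allow at least one.
  def defaultMaxOpen(nSources: Int, cores: Int = Runtime.getRuntime.availableProcessors): Int =
    math.max(1, math.min(nSources, cores - 1))

  def main(args: Array[String]): Unit = {
    println(defaultMaxOpen(3, cores = 8))  // 3: limited by the number of sources
    println(defaultMaxOpen(10, cores = 8)) // 7: limited by cores - 1
    println(defaultMaxOpen(3))             // depends on this machine
  }
}
```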

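maxOpen is the maximum number of computations that may run at the same time; it is best set to a number smaller than the machine's core count. Here the function is used to build data sources in parallel: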

package com.bayakala.funda.fdapars.examples
import slick.driver.H2Driver.api._
import com.bayakala.funda.samples._
import com.bayakala.funda.fdarows.FDAROW
import com.bayakala.funda.fdasources.FDADataStream._
import scala.concurrent.duration._
import com.bayakala.funda.fdapipes._
import FDAValves._
import com.bayakala.funda.fdapars.FDAPars._
object Example1 extends App {
  val albums = SlickModels.albums
  val companies = SlickModels.companies

  //source query
  val albumsInfo = for {
    (a,c) <- albums join companies on (_.company === _.id)
  } yield (a.title,a.artist,a.year,c.name)

  //strongly typed query result (user-provided)
  case class Album(title: String, artist: String, year: Int, publisher: String) extends FDAROW
  //conversion to the strong type (user-provided)
  def toTypedRow(row: (String, String, Option[Int], String)): Album =
    Album(row._1, row._2, row._3.getOrElse(2000), row._4)

  val db = Database.forConfig("h2db")

  val streamLoader = FDAStreamLoader(slick.driver.H2Driver, toTypedRow _)
  val albumStream1 = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()()
  val albumStream2 = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()()
  val albumStream3 = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()()

  def printAlbums: FDATask[FDAROW] = row => {
    row match {
      case album: Album =>
        println("____________________")
        println(s"品名:${album.title}")
        println(s"演唱:${album.artist}")
        println(s"年份:${album.year}")
        println(s"发行:${album.publisher}")

        fda_skip
      //        fda_next(album)
      case r@_ => fda_next(r)
    }
  }

  fda_par_load(albumStream1,albumStream1,albumStream1)(3).appendTask(printAlbums).startRun
}

After startRun, the output is as follows (the labels 品名, 演唱, 年份 and 发行 mean title, artist, year and publisher):

*** (c.z.hikari.HikariDataSource) HikariCP pool h2db is starting.
*** (s.jdbc.JdbcBackend.statement) Preparing statement: select x2."TITLE", x2."ARTIST", x2."YEAR", x3."NAME" from "ALBUMS" x2, "COMPANY" x3 where x2."COMPANY" = x3."ID"
*** (s.jdbc.JdbcBackend.statement) Preparing statement: select x2."TITLE", x2."ARTIST", x2."YEAR", x3."NAME" from "ALBUMS" x2, "COMPANY" x3 where x2."COMPANY" = x3."ID"
*** (s.jdbc.JdbcBackend.statement) Preparing statement: select x2."TITLE", x2."ARTIST", x2."YEAR", x3."NAME" from "ALBUMS" x2, "COMPANY" x3 where x2."COMPANY" = x3."ID"
____________________
品名:Keyboard Cat's Greatest Hits
演唱:Keyboard Cat
年份:2016
发行:Sony Music Inc
____________________
品名:Keyboard Cat's Greatest Hits
演唱:Keyboard Cat
年份:2016
发行:Sony Music Inc
____________________
品名:Keyboard Cat's Greatest Hits
演唱:Keyboard Cat
年份:2016
发行:Sony Music Inc
____________________
品名:Spice
演唱:Spice Girls
年份:2016
发行:Columbia Records
____________________
品名:Spice
演唱:Spice Girls
年份:2016
发行:Columbia Records
____________________
品名:Spice
演唱:Spice Girls
年份:2016
发行:Columbia Records
____________________
品名:Whenever You Need Somebody
演唱:Rick Astley
年份:2016
发行:Sony Music Inc
____________________
品名:Whenever You Need Somebody
演唱:Rick Astley
年份:2016
发行:Sony Music Inc
____________________
品名:Whenever You Need Somebody
演唱:Rick Astley
年份:2016
发行:Sony Music Inc
____________________
品名:The Triumph of Steel
演唱:Manowar
年份:2016
发行:The K-Pops Singers
____________________
品名:The Triumph of Steel
演唱:Manowar
年份:2016
发行:The K-Pops Singers
____________________
品名:The Triumph of Steel
演唱:Manowar
年份:2016
发行:The K-Pops Singers
____________________
品名:Believe
演唱:Justin Bieber
年份:2016
发行:Columbia Records
____________________
品名:Believe
演唱:Justin Bieber
年份:2016
发行:Columbia Records
____________________
品名:Believe
演唱:Justin Bieber
年份:2016
发行:Columbia Records

Process finished with exit code 0

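Another parallel-computation requirement in FunDA is applying a function to a long series of data elements in parallel. Let's first look at the type of that function: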

    //task type
    type FDATask[ROW] = ROW => Option[List[ROW]]

This is the user-supplied task function type we have used before. The fda_runPar function, however, can only run the following type in parallel:

  def fda_runPar(parTask: FDAParTask)(maxOpen: Int) =
    concurrent.join(maxOpen)(parTask).through(fda_afterPar)

  //parallel task type
  type FDAParTask = Stream[Task,Stream[Task,Option[List[FDAROW]]]]
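It helps to pin down what applying an FDATask means before making it parallel. The sketch below only mirrors the shape of FDATask with plain Scala collections: None drops a row and Some(rows) passes zero or more rows on, so one input row can fan out into several. The helpers skip, pass and passAll are hypothetical stand-ins, assumed to behave like the fda_skip and fda_next used in the examples.

```scala
// Hypothetical sequential analogy of applying an FDATask (not FunDA code)
object SequentialTask {
  // Same shape as FDATask: a row maps to None (drop) or Some(rows) (emit)
  type RowTask[R] = R => Option[List[R]]

  // Hypothetical stand-ins for fda_skip / fda_next
  def skip[R]: Option[List[R]] = None
  def pass[R](r: R): Option[List[R]] = Some(List(r))
  def passAll[R](rs: List[R]): Option[List[R]] = Some(rs)

  // Apply the task to every row in order and flatten the results
  def run[R](rows: List[R])(task: RowTask[R]): List[R] =
    rows.flatMap(r => task(r).getOrElse(Nil))

  def main(args: Array[String]): Unit = {
    // drop odd numbers, emit each even number twice
    val task: RowTask[Int] = n => if (n % 2 == 0) passAll(List(n, n)) else skip
    println(run(List(1, 2, 3, 4))(task)) // List(2, 2, 4, 4)
  }
}
```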

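So, to use fda_runPar, we first have to turn a Stream[Task,FDAROW] into a nested Stream[Task,Stream[Task,Option[List[FDAROW]]]], with one inner stream per row: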

    implicit class toFDAOps(fs2Stream: FDAPipeLine[FDAROW]) {
      def appendTask(t: FDATask[FDAROW]) = fs2Stream.through(fda_execUserTask(t))

      def startRun = fs2Stream.run.unsafeRun

      def startFuture = fs2Stream.run.unsafeRunAsyncFuture

      def toPar(st: FDATask[FDAROW]): Stream[Task, Stream[Task, Option[List[FDAROW]]]] =
        fs2Stream.map { row =>
          Stream.eval(Task {
            st(row)
          })
        }
    }
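The same idea with bounded parallelism: toPar turns every row into its own small deferred computation, and fda_runPar lets join run up to maxOpen of them at once. The sketch below imitates that with Futures on a fixed thread pool. Because fda_afterPar is not shown in this post, it assumes that the after-step flattens each Option[List[R]] (None contributes nothing). Results arrive in completion order, not input order. ParTask, toPar and runPar here are hypothetical names, not FunDA's API.

```scala
import java.util.concurrent.Executors
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stdlib analogy of toPar + fda_runPar (not FunDA code)
object ParTask {
  type RowTask[R] = R => Option[List[R]]

  // Analogue of toPar: one deferred computation per row (nothing runs yet)
  def toPar[R](rows: List[R])(task: RowTask[R]): List[() => Option[List[R]]] =
    rows.map(r => () => task(r))

  // Analogue of fda_runPar: run at most maxOpen thunks at once and flatten
  // their results (assumed: None -> nothing) in the order they complete
  def runPar[R](thunks: List[() => Option[List[R]]])(maxOpen: Int): List[R] = {
    require(maxOpen > 0, "maxOpen must be positive")
    val pool = Executors.newFixedThreadPool(maxOpen)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val out = ListBuffer.empty[R]
    try {
      val fs = thunks.map { th =>
        Future {
          val rs = th().getOrElse(Nil)
          out.synchronized { out ++= rs }
          ()
        }
      }
      Await.result(Future.sequence(fs), 30.seconds)
      out.synchronized { out.toList }
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    // square each row after a random delay; drop zeros
    val task: RowTask[Int] = n => {
      Thread.sleep(scala.util.Random.nextInt(20).toLong)
      if (n == 0) None else Some(List(n * n))
    }
    val result = runPar(toPar(List(0, 1, 2, 3, 4))(task))(3)
    println(result)        // 1, 4, 9 and 16 in completion order
    println(result.sorted) // List(1, 4, 9, 16)
  }
}
```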

toPar performs this type conversion for parallel computation. Here is an example call:

  //task function to run in parallel
  def updateYear: FDATask[FDAROW] = row => {
    row match {
      case album: Album =>
        val action = albums.filter{r => r.title === album.title}.map(_.year).update(Some(2016))
 //pass the original row downstream together with the newly built Action
        fda_next(List(album,FDAActionRow(action)))
      case others@ _ => fda_next(others)
    }
  }

//parallel read
  val s1 = fda_par_load(albumStream1,albumStream1,albumStream1)(3)
//build Actions in parallel
  val s2 = fda_runPar(s1.toPar(updateYear))(3)

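s1 is the data source built in parallel, and s2 applies updateYear in parallel to the elements that source produces. The resulting ActionRows can be executed in parallel in the same way: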

  val runner = FDAActionRunner(slick.driver.H2Driver)
  //function that runs the Actions in parallel
  def runActions: FDATask[FDAROW] = row => {
    row match {
      case FDAActionRow(action) =>
        runner.fda_execAction(action)(db)
        fda_skip
      case others@ _ => fda_next(others)
    }
  }

//run Actions in parallel
  val s3 = fda_runPar(s2.toPar(runActions))(3)
//start running
  s3.appendTask(printAlbums).startRun
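The two stages chained above, updateYear fanning each album out into the album plus an action row and runActions executing and dropping the action rows, can be mimicked with a small row ADT. In this hypothetical sketch the "database" is an in-memory map, an action is a plain closure, and the stages run sequentially to keep the focus on the data flow. It also shows why every title receives three updates when the same source is loaded three times.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Hypothetical analogy of the updateYear / runActions stages (not FunDA code)
object ActionRows {
  sealed trait Row
  final case class Album(title: String, year: Int) extends Row
  final case class ActionRow(run: () => Unit) extends Row

  type RowTask = Row => Option[List[Row]]

  def runTask(rows: List[Row])(task: RowTask): List[Row] =
    rows.flatMap(r => task(r).getOrElse(Nil))

  // Like updateYear: keep the album and append an action that updates the table
  def updateYear(table: mutable.Map[String, Int], newYear: Int): RowTask = {
    case a: Album => Some(List[Row](a, ActionRow(() => table.synchronized { table(a.title) = newYear })))
    case other    => Some(List(other))
  }

  // Like runActions: execute the action, count it, and drop it
  def runActions(counter: AtomicInteger): RowTask = {
    case ActionRow(run) => run(); counter.incrementAndGet(); None
    case other          => Some(List(other))
  }

  def main(args: Array[String]): Unit = {
    val table = mutable.Map("Spice" -> 1999, "Believe" -> 1999)
    val counter = new AtomicInteger(0)
    // the same source "loaded" three times, like fda_par_load(albumStream1, albumStream1, albumStream1)
    val source: List[Row] = List.fill(3)(table.toList.map { case (t, y) => Album(t, y) }).flatten
    val printed = runTask(runTask(source)(updateYear(table, 2016)))(runActions(counter))
    printed.foreach(println) // six Album rows, still showing the year read before the update
    println(counter.get)     // 6: three updates per title
    println(table)           // both titles now map to 2016
  }
}
```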

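The s1/s2/s3 pipeline above shows the flexibility of functional programming: before startRun we can compose functions freely, and the static type system checks that the types of the components fit together. The results are as follows: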

*** (c.z.hikari.HikariDataSource) HikariCP pool h2db is starting.
*** (s.jdbc.JdbcBackend.statement) Preparing statement: select x2."TITLE", x2."ARTIST", x2."YEAR", x3."NAME" from "ALBUMS" x2, "COMPANY" x3 where x2."COMPANY" = x3."ID"
*** (s.jdbc.JdbcBackend.statement) Preparing statement: select x2."TITLE", x2."ARTIST", x2."YEAR", x3."NAME" from "ALBUMS" x2, "COMPANY" x3 where x2."COMPANY" = x3."ID"
*** (s.jdbc.JdbcBackend.statement) Preparing statement: select x2."TITLE", x2."ARTIST", x2."YEAR", x3."NAME" from "ALBUMS" x2, "COMPANY" x3 where x2."COMPANY" = x3."ID"
*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Keyboard Cat''s Greatest Hits'
____________________
品名:Keyboard Cat's Greatest Hits
演唱:Keyboard Cat
年份:1999
发行:Sony Music Inc
*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Keyboard Cat''s Greatest Hits'
____________________
品名:Keyboard Cat's Greatest Hits
演唱:Keyboard Cat
年份:1999
发行:Sony Music Inc
*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Keyboard Cat''s Greatest Hits'
____________________
品名:Keyboard Cat's Greatest Hits
演唱:Keyboard Cat
年份:1999
发行:Sony Music Inc
*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Spice'
____________________
品名:Spice
演唱:Spice Girls
年份:1999
发行:Columbia Records
*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Spice'
____________________
品名:Spice
演唱:Spice Girls
年份:1999
发行:Columbia Records
*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Spice'
____________________
品名:Spice
演唱:Spice Girls
年份:1999
发行:Columbia Records
*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Whenever You Need Somebody'
____________________
品名:Whenever You Need Somebody
演唱:Rick Astley
年份:1999
发行:Sony Music Inc
*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Whenever You Need Somebody'
____________________
品名:Whenever You Need Somebody
演唱:Rick Astley
年份:1999
发行:Sony Music Inc
*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Whenever You Need Somebody'
____________________
品名:Whenever You Need Somebody
演唱:Rick Astley
年份:1999
发行:Sony Music Inc
*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'The Triumph of Steel'
____________________
品名:The Triumph of Steel
演唱:Manowar
年份:1999
发行:The K-Pops Singers
*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'The Triumph of Steel'
____________________
品名:The Triumph of Steel
演唱:Manowar
年份:1999
发行:The K-Pops Singers
*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'The Triumph of Steel'
____________________
品名:The Triumph of Steel
演唱:Manowar
年份:1999
发行:The K-Pops Singers
*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Believe'
____________________
品名:Believe
演唱:Justin Bieber
年份:1999
发行:Columbia Records
*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Believe'
____________________
品名:Believe
演唱:Justin Bieber
年份:1999
发行:Columbia Records
*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Believe'
____________________
品名:Believe
演唱:Justin Bieber
年份:1999
发行:Columbia Records

Process finished with exit code 0

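Note: the example above was built purely to demonstrate the function calls and gives no thought to application logic. For instance, each title is updated three times because the same source is loaded three times, and the printed year (1999) is the value read before the update to 2016 ran, since updateYear passes the original album row on unchanged. The full source code for this post follows: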

package com.bayakala.funda.fdapars.examples
import slick.driver.H2Driver.api._
import com.bayakala.funda.samples._
import com.bayakala.funda.fdarows.FDARowTypes._
import com.bayakala.funda.fdarows.FDAROW
import com.bayakala.funda.fdasources.FDADataStream._

import scala.concurrent.duration._
import com.bayakala.funda.fdapipes._
import FDAValves._
import com.bayakala.funda.fdapars.FDAPars._
object Example1 extends App {
  val albums = SlickModels.albums
  val companies = SlickModels.companies

  //source query
  val albumsInfo = for {
    (a,c) <- albums join companies on (_.company === _.id)
  } yield (a.title,a.artist,a.year,c.name)

  //strongly typed query result (user-provided)
  case class Album(title: String, artist: String, year: Int, publisher: String) extends FDAROW
  //conversion function (user-provided)
  def toTypedRow(row: (String, String, Option[Int], String)): Album =
    Album(row._1, row._2, row._3.getOrElse(2000), row._4)

  val db = Database.forConfig("h2db")

  val streamLoader = FDAStreamLoader(slick.driver.H2Driver, toTypedRow _)
  val albumStream1 = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()()
  val albumStream2 = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()()
  val albumStream3 = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()()

  def printAlbums: FDATask[FDAROW] = row => {
    row match {
      case album: Album =>
        println("____________________")
        println(s"品名:${album.title}")
        println(s"演唱:${album.artist}")
        println(s"年份:${album.year}")
        println(s"发行:${album.publisher}")

        fda_skip
      //        fda_next(album)
      case r@_ => fda_next(r)
    }
  }

 // fda_par_load(albumStream1,albumStream1,albumStream1)(3).appendTask(printAlbums).startRun

  //task function to run in parallel
  def updateYear: FDATask[FDAROW] = row => {
    row match {
      case album: Album =>
        val action = albums.filter{r => r.title === album.title}.map(_.year).update(Some(2016))
 //pass the original row downstream together with the newly built Action
        fda_next(List(album,FDAActionRow(action)))
      case others@ _ => fda_next(others)
    }
  }

  val runner = FDAActionRunner(slick.driver.H2Driver)
  //function that runs the Actions in parallel
  def runActions: FDATask[FDAROW] = row => {
    row match {
      case FDAActionRow(action) =>
        runner.fda_execAction(action)(db)
        fda_skip
      case others@ _ => fda_next(others)
    }
  }
//parallel read
  val s1 = fda_par_load(albumStream1,albumStream1,albumStream1)(3)
//build Actions in parallel
  val s2 = fda_runPar(s1.toPar(updateYear))(3)

//run Actions in parallel
  val s3 = fda_runPar(s2.toPar(runActions))(3)
//start running
  s3.appendTask(printAlbums).startRun
  s3.appendTask(printAlbums).startRun

}

This article is part of the Tencent Cloud self-media sharing program.
