FunDA(14)- Demo: parallel computation, parallel database reading - parallel data loading

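   FunDA's parallel database reading reads several independent data sources at the same time, each on its own thread. These independent sources can be database tables on different servers, or separate parts of a single table that has been split up so that each part serves as its own data source. The goal of reading in parallel is, of course, to make the program run faster. Concretely, FunDA reads several independent data streams in parallel and merges them into one combined stream. As sample data we again use the AQMRPT table produced in the previous demo. In this demo we extract the STATENAME and COUNTYNAME fields from AQMRPT into two separate tables, STATE and COUNTY, with the following structures: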

  case class StateModel(id: Int, name: String) extends FDAROW
  class StateTable(tag: Tag) extends Table[StateModel](tag,"STATE") {
    def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
    def name = column[String]("NAME",O.Length(32))
    def * = (id,name)<>(StateModel.tupled,StateModel.unapply)
  }
  val StateQuery = TableQuery[StateTable]

  case class CountyModel(id: Int, name: String) extends FDAROW
  class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
    def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
    def name = column[String]("NAME",O.Length(64))
    def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
  }
  val CountyQuery = TableQuery[CountyTable]
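Both ID columns are declared with `O.AutoInc`. According to the Slick documentation, an AutoInc column included in an insert is silently ignored so that the database can generate the value. That is why the tasks further down can insert `StateModel(0, ...)` and `CountyModel(0, ...)` with a placeholder id of 0. The following is a hypothetical, purely in-memory sketch of that behavior (no Slick, no database); `AutoIncStateTable` and `insert` are illustrative names, not part of Slick or FunDA.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for the STATE table; no Slick or database involved.
final case class StateModel(id: Int, name: String)

final class AutoIncStateTable {
  private val rows = mutable.ArrayBuffer.empty[StateModel]
  private var nextId = 1

  // Mimics inserting into a table whose ID is AutoInc: the caller's id is ignored
  // and the next key is assigned instead. Returns the assigned key for convenience
  // (with Slick you would need `returning` to get a generated key back).
  def insert(row: StateModel): Int = {
    val id = nextId
    rows += row.copy(id = id)
    nextId += 1
    id
  }

  def all: List[StateModel] = rows.toList
}
```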

First, some setup code creates the two tables:

  //assume two distinct db objects
  val db_a = Database.forConfig("h2db")
  //another db object
  val db_b = Database.forConfig("h2db")


  //create STATE table
  val actionCreateState = Models.StateQuery.schema.create
  val futCreateState = db_a.run(actionCreateState).andThen {
    case Success(_) => println("State Table created successfully!")
    case Failure(e) => println(s"State Table may exist already! Error: ${e.getMessage}")
  }
  //carry on even if table creation fails
  Await.ready(futCreateState,Duration.Inf)

  //create COUNTY table
  val actionCreateCounty = Models.CountyQuery.schema.create
  val futCreateCounty = db_a.run(actionCreateCounty).andThen {
    case Success(_) => println("County Table created successfully!")
    case Failure(e) => println(s"County Table may exist already! Error: ${e.getMessage}")
  }
  //carry on even if table creation fails
  Await.ready(futCreateCounty,Duration.Inf)
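The setup code waits with `Await.ready` rather than `Await.result`, so the program keeps going when `schema.create` fails (for example because the table already exists). A small standard-library sketch of that difference follows; a failed `Future` stands in for the failed `db_a.run(...)` call, and `AwaitSketch` and `createTable` are hypothetical names.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}

object AwaitSketch {
  // Hypothetical stand-in for db_a.run(schema.create) failing on an existing table.
  def createTable(): Future[Unit] =
    Future.failed(new IllegalStateException("Table STATE already exists"))

  // Await.ready waits for completion but does not rethrow the future's failure.
  def readyKeepsFailure(): Boolean = {
    val fut = createTable().andThen {
      case Success(_) => println("State Table created successfully!")
      case Failure(e) => println(s"State Table may exist already! Error: ${e.getMessage}")
    }
    Await.ready(fut, 5.seconds)   // the andThen callback has run by the time this returns
    fut.value.exists(_.isFailure) // the failure is still recorded in the future
  }

  // Await.result, by contrast, rethrows the failure, which would abort the setup.
  def resultRethrows(): Boolean =
    Try(Await.result(createTable(), 5.seconds)).isFailure
}
```

Because the code awaits the future returned by `andThen`, the message is printed before execution moves on to the next table.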

Next we extract STATENAME from the AQMRPT table to form one data source:

  //define query for extracting State names from AQMRPT
  val qryStates = AQMRPTQuery.map(_.state).distinct.sorted  //     .distinctOn(r => r)
  case class States(name: String) extends FDAROW
  implicit def toStates(row: String) = States(row)
  val stateLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toStates _)
  val statesStream = stateLoader.fda_typedStream(qryStates.result)(db_a)(64,64)()
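In memory, the state-name source amounts to this: take the distinct values of the state column, sort them, and turn each raw `String` into a typed `States` row with a converter like `toStates`. The following is a hypothetical standard-library sketch, not FunDA code. Note that Scala's default `String` ordering is not necessarily the collation the database applies for `.sorted`.

```scala
object StateRowsSketch {
  final case class States(name: String)

  // Converter in the spirit of toStates: one raw column value -> one typed row.
  def toStates(row: String): States = States(row)

  // In-memory analogue of AQMRPTQuery.map(_.state).distinct.sorted, then conversion.
  def stateRows(stateColumn: Seq[String]): Seq[States] =
    stateColumn.distinct.sorted.map(toStates)
}
```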

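Because there are many more county names, we split AQMRPT by STATENAME into three parts, A-K, K-P and P-Z (upper-cased state names above "A" and below "K", between "K" and "P", and above "P"), and build each part into its own independent data source: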

  //define query for extracting County names from AQMRPT in separate chunks
  //query with state name >A and <K
  val qryCountiesA_K = AQMRPTQuery.filter(r => (r.state.toUpperCase > "A" &&
    r.state.toUpperCase < "K")).map(r => (r.state,r.county))
    .distinctOn(r => (r._1,r._2))
    .sortBy(r => (r._1,r._2))

  //query with state name >K and <P
  val qryCountiesK_P = AQMRPTQuery.filter(r => (r.state.toUpperCase > "K" &&
    r.state.toUpperCase < "P")).map(r => (r.state,r.county))
    .distinctOn(r => (r._1,r._2))
    .sortBy(r => (r._1,r._2))

  //query with state name >P
  val qryCountiesP_Z = AQMRPTQuery.filter(r => r.state.toUpperCase > "P")
    .map(r => (r.state,r.county))
    .distinctOn(r => (r._1,r._2))
    .sortBy(r => (r._1,r._2))

  case class Counties(state: String, name: String) extends FDAROW
  implicit def toCounties(row: (String,String)) = Counties(row._1,row._2)
  val countyLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toCounties _)
  //3 separate streams to extract county names from the same database table AQMRPT
  val countiesA_KStream = countyLoader.fda_typedStream(qryCountiesA_K.result)(db_b)(64,64)()
  val countiesK_PStream = countyLoader.fda_typedStream(qryCountiesK_P.result)(db_b)(64,64)()
  val countiesP_ZStream = countyLoader.fda_typedStream(qryCountiesP_Z.result)(db_b)(64,64)()
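The three filters split the state names by comparing the upper-cased name against boundary strings with strict `>` and `<`. The sketch below restates those predicates over plain Scala strings. It is a hypothetical helper, not Slick code, and it checks that every real state name lands in exactly one chunk. Only a name exactly equal to "A", "K" or "P" would fall through the gaps, and no real state name does. Scala compares strings by UTF-16 code units, which may differ from the database collation for non-ASCII names.

```scala
object StatePartitionSketch {
  // Same boundaries as the three Slick filters, applied to upper-cased names.
  def inA_K(state: String): Boolean = { val u = state.toUpperCase; u > "A" && u < "K" }
  def inK_P(state: String): Boolean = { val u = state.toUpperCase; u > "K" && u < "P" }
  def inP_Z(state: String): Boolean = state.toUpperCase > "P"

  // Names of every chunk whose filter accepts the given state name.
  def chunksOf(state: String): List[String] =
    List("A_K" -> inA_K(state), "K_P" -> inK_P(state), "P_Z" -> inP_Z(state))
      .collect { case (chunk, true) => chunk }
}
```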

Then we read these four data sources in parallel:

  //obtain a combined stream by parallel loading, with at most 4 computations open at a time
  val combinedStream = fda_par_load(statesStream,countiesA_KStream,countiesK_PStream,countiesP_ZStream)(4)
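The article does not show how `fda_par_load` works internally. As a rough mental model only, the following sketch uses plain `scala.concurrent` Futures. Each source is drained on a fixed pool of `maxOpen` threads, which stands in for the `(4)` limit on open computations, and every row lands in one shared queue, so rows from different sources interleave in no guaranteed order. `ParLoadSketch` and `parLoad` are hypothetical names. FunDA's own stream-based implementation may work quite differently; in particular, it passes rows downstream as they arrive instead of collecting them all first.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

object ParLoadSketch {
  // Drains every source concurrently, at most `maxOpen` at once, into one merged collection.
  def parLoad[A](sources: Seq[() => Iterator[A]], maxOpen: Int): Vector[A] = {
    val pool = Executors.newFixedThreadPool(maxOpen)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val merged = new ConcurrentLinkedQueue[A]()
    try {
      // One Future per source; the fixed-size pool caps how many run simultaneously.
      val loads = sources.map(src => Future(src().foreach(row => merged.add(row))))
      Await.result(Future.sequence(loads), 30.seconds) // rethrows if any source failed
      merged.asScala.toVector
    } finally pool.shutdown()
  }
}
```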

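The combined stream now contains at least two different kinds of elements: case class States and case class Counties. We can attach two user-defined tasks to combinedStream that catch the States rows and the Counties rows respectively and turn each into an insert-action row (ActionRow):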

  //define separate rows for different actions
  case class StateActionRow(action: FDAAction) extends FDAROW
  case class CountyActionRow(action: FDAAction) extends FDAROW
  val actionRunner = FDAActionRunner(slick.jdbc.H2Profile)

  //user-task to catch rows of States type and transform them into db insert actions
  def processStates: FDAUserTask[FDAROW] = row => {
    row match {
        //catch states row and transform it into insert action
      case States(stateName) =>  //target row type
        println(s"State name: ${stateName}")
        val action = StateQuery += StateModel(0,stateName)
        fda_next(StateActionRow(action))
      case others@ _ => //pass other types to next user-defined-tasks
        fda_next(others)
    }
  }
  //user-task to catch rows of Counties type and transform them into db insert actions
  def processCounties: FDAUserTask[FDAROW] = row => {
    row match {
      //catch counties row and transform it into insert action
      case Counties(stateName,countyName) =>  //target row type
        println(s"County ${countyName} of ${stateName}")
        val action = CountyQuery += CountyModel(0,countyName+ " of "+stateName)
        fda_next(CountyActionRow(action))
      case others@ _ => //pass other types to next user-defined-tasks
        fda_next(others)
    }
  }
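Each user task follows the same pattern. It catches the row type it is responsible for, transforms it, and passes every other row along unchanged. The following is a hypothetical standard-library model of that pattern, where returning `Some(row)` plays the role of `fda_next(row)`. `Row`, `Task`, `StateInsert` and `CountyInsert` are illustrative names, not FunDA types.

```scala
object UserTaskSketch {
  sealed trait Row
  final case class States(name: String) extends Row
  final case class Counties(state: String, name: String) extends Row
  // Hypothetical stand-ins for StateActionRow / CountyActionRow.
  final case class StateInsert(name: String) extends Row
  final case class CountyInsert(name: String) extends Row

  // Hypothetical task type: Some(row) passes a row downstream (the role of fda_next).
  type Task = Row => Option[Row]

  val processStates: Task = {
    case States(name) => Some(StateInsert(name)) // target type: transform it
    case other        => Some(other)             // any other type: pass it along unchanged
  }

  val processCounties: Task = {
    case Counties(state, county) => Some(CountyInsert(s"$county of $state"))
    case other                   => Some(other)
  }
}
```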

After processStates and processCounties have run, combinedStream contains two more kinds of elements: StateActionRow and CountyActionRow. In the same way, we can use two user-defined tasks to execute these action rows:

  //user-task to catch States insert action rows and run them
  def runStateAction: FDAUserTask[FDAROW] = row  => {
    row match {
      case StateActionRow(action) => //this is a state action row type
        println(s"runstate: ${action}")
        actionRunner.fda_execAction(action)(db_a)  //run this query with db_a context
        fda_skip
      case others@ _ => //otherwise pass along to next user-defined-tasks
        fda_next(others)
    }
  }

  //user-task to catch Counties insert action rows and run them
  def runCountyAction: FDAUserTask[FDAROW] = row  => {
    row match {
      case CountyActionRow(action) => //this is a county action row type
        actionRunner.fda_execAction(action)(db_b)  //run this query with db_b context
        fda_skip
      case others@ _ => //otherwise pass along to next user-defined-tasks
        fda_next(others)
    }
  }

Now we can chain these four user-defined tasks on combinedStream into one complete program:

  combinedStream.appendTask(processStates)
    .appendTask(processCounties)
    .appendTask(runStateAction)
    .appendTask(runCountyAction)
    .startRun

The final startRun call is what actually runs the program.
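The order of the chain matters: runStateAction can only execute StateActionRow elements that processStates has already produced upstream. The following hypothetical sketch models the chain as a left fold over the tasks for each row. `None` stands in for `fda_skip` (the row is dropped), and a mutable buffer stands in for the STATE table. It illustrates the ordering only and is not how FunDA runs tasks internally.

```scala
import scala.collection.mutable

object PipelineSketch {
  sealed trait Row
  final case class States(name: String) extends Row
  final case class StateInsert(name: String) extends Row // stands in for StateActionRow

  // Hypothetical task type: Some(row) = pass downstream (fda_next), None = drop (fda_skip).
  type Task = Row => Option[Row]

  // Hypothetical stand-in for the STATE table.
  val stateTable = mutable.ArrayBuffer.empty[String]

  val processStates: Task = {
    case States(name) => Some(StateInsert(name))
    case other        => Some(other)
  }

  val runStateAction: Task = {
    case StateInsert(name) =>
      stateTable += name // "execute" the insert
      None               // consumed: nothing left to pass on
    case other => Some(other)
  }

  // Push each row through the tasks in order; once a task returns None, the row stops.
  def runAll(rows: Seq[Row], tasks: Seq[Task]): Seq[Row] =
    rows.flatMap(row => tasks.foldLeft(Option(row))((acc, task) => acc.flatMap(task)))
}
```

Swapping the two tasks leaves the insert rows unexecuted at the end of the chain, because they are created only after the runner has already been passed.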

Below is the complete source code for this demo:

import com.bayakala.funda._
import api._
import scala.language.implicitConversions
import slick.jdbc.H2Profile.api._
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
import Models._
import scala.concurrent.ExecutionContext.Implicits.global

object ParallelLoading extends App {

  //assume two distinct db objects
  val db_a = Database.forConfig("h2db")
  //another db object
  val db_b = Database.forConfig("h2db")


  //create STATE table
  val actionCreateState = Models.StateQuery.schema.create
  val futCreateState = db_a.run(actionCreateState).andThen {
    case Success(_) => println("State Table created successfully!")
    case Failure(e) => println(s"State Table may exist already! Error: ${e.getMessage}")
  }
  //carry on even if table creation fails
  Await.ready(futCreateState,Duration.Inf)

  //create COUNTY table
  val actionCreateCounty = Models.CountyQuery.schema.create
  val futCreateCounty = db_a.run(actionCreateCounty).andThen {
    case Success(_) => println("County Table created successfully!")
    case Failure(e) => println(s"County Table may exist already! Error: ${e.getMessage}")
  }
  //carry on even if table creation fails
  Await.ready(futCreateCounty,Duration.Inf)

  //define query for extracting State names from AQMRPT
  val qryStates = AQMRPTQuery.map(_.state).distinct.sorted  //     .distinctOn(r => r)
  case class States(name: String) extends FDAROW
  implicit def toStates(row: String) = States(row)
  val stateLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toStates _)
  val statesStream = stateLoader.fda_typedStream(qryStates.result)(db_a)(64,64)()


  //define query for extracting County names from AQMRPT in separate chunks
  //query with state name >A and <K
  val qryCountiesA_K = AQMRPTQuery.filter(r => (r.state.toUpperCase > "A" &&
    r.state.toUpperCase < "K")).map(r => (r.state,r.county))
    .distinctOn(r => (r._1,r._2))
    .sortBy(r => (r._1,r._2))

  //query with state name >K and <P
  val qryCountiesK_P = AQMRPTQuery.filter(r => (r.state.toUpperCase > "K" &&
    r.state.toUpperCase < "P")).map(r => (r.state,r.county))
    .distinctOn(r => (r._1,r._2))
    .sortBy(r => (r._1,r._2))

  //query with state name >P
  val qryCountiesP_Z = AQMRPTQuery.filter(r => r.state.toUpperCase > "P")
    .map(r => (r.state,r.county))
    .distinctOn(r => (r._1,r._2))
    .sortBy(r => (r._1,r._2))

  case class Counties(state: String, name: String) extends FDAROW
  implicit def toCounties(row: (String,String)) = Counties(row._1,row._2)
  val countyLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toCounties _)
  //3 separate streams to extract county names from the same database table AQMRPT
  val countiesA_KStream = countyLoader.fda_typedStream(qryCountiesA_K.result)(db_b)(64,64)()
  val countiesK_PStream = countyLoader.fda_typedStream(qryCountiesK_P.result)(db_b)(64,64)()
  val countiesP_ZStream = countyLoader.fda_typedStream(qryCountiesP_Z.result)(db_b)(64,64)()

  //obtain a combined stream by parallel loading, with at most 4 computations open at a time
  val combinedStream = fda_par_load(statesStream,countiesA_KStream,countiesK_PStream,countiesP_ZStream)(4)


  //define separate rows for different actions
  case class StateActionRow(action: FDAAction) extends FDAROW
  case class CountyActionRow(action: FDAAction) extends FDAROW
  val actionRunner = FDAActionRunner(slick.jdbc.H2Profile)

  //user-task to catch rows of States type and transform them into db insert actions
  def processStates: FDAUserTask[FDAROW] = row => {
    row match {
        //catch states row and transform it into insert action
      case States(stateName) =>  //target row type
        println(s"State name: ${stateName}")
        val action = StateQuery += StateModel(0,stateName)
        fda_next(StateActionRow(action))
      case others@ _ => //pass other types to next user-defined-tasks
        fda_next(others)
    }
  }
  //user-task to catch rows of Counties type and transform them into db insert actions
  def processCounties: FDAUserTask[FDAROW] = row => {
    row match {
      //catch counties row and transform it into insert action
      case Counties(stateName,countyName) =>  //target row type
        println(s"County ${countyName} of ${stateName}")
        val action = CountyQuery += CountyModel(0,countyName+ " of "+stateName)
        fda_next(CountyActionRow(action))
      case others@ _ => //pass other types to next user-defined-tasks
        fda_next(others)
    }
  }

  //user-task to catch States insert action rows and run them
  def runStateAction: FDAUserTask[FDAROW] = row  => {
    row match {
      case StateActionRow(action) => //this is a state action row type
        println(s"runstate: ${action}")
        actionRunner.fda_execAction(action)(db_a)  //run this query with db_a context
        fda_skip
      case others@ _ => //otherwise pass along to next user-defined-tasks
        fda_next(others)
    }
  }

  //user-task to catch Counties insert action rows and run them
  def runCountyAction: FDAUserTask[FDAROW] = row  => {
    row match {
      case CountyActionRow(action) => //this is a county action row type
        actionRunner.fda_execAction(action)(db_b)  //run this query with db_b context
        fda_skip
      case others@ _ => //otherwise pass along to next user-defined-tasks
        fda_next(others)
    }
  }



  def showRows: FDAUserTask[FDAROW] = row => {
    row match {
      case States(nm) =>
        println("")
        println(s"State: $nm")
        println("************")
        fda_skip
      case Counties(s,c) =>
        println("")
        println(s"County: $c")
        println(s"state of $s")
        println("------------")
        fda_skip
      case _ => fda_skip
    }
  }

  combinedStream.appendTask(processStates)
    .appendTask(processCounties)
    .appendTask(runStateAction)
    .appendTask(runCountyAction)
    .startRun

}
