FunDA(16) - Demo: Integrating Parallel Computation - total parallelism solution

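In the previous two posts we introduced two forms of parallel computation: building a data source in parallel, and running user-defined functions in parallel. We demonstrated each of them separately. In this post I will demonstrate a parallel computation mode that combines the two. The way the data source is built in parallel here also differs from what was described before: in the earlier discussion we knew in advance that the source had to be built from three independent streams. But what should we do when we have a stream of unknown length whose elements each represent a different stream? We know that the AQMRPT table holds air-quality measurements from 1999 to 2xxx, so we can try to build a single data source in parallel from streams generated per year. We reuse the setup code from the previous demo directly, including the initialization of the NORMAQM table and the functions that look up the corresponding id by name in STATES and COUNTIES: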

  val db = Database.forConfig("h2db")

  //drop original table schema
  val futVectorTables = db.run(MTable.getTables)

  val futDropTable = futVectorTables.flatMap{ tables => {
    val tableNames = tables.map(t => t.name.name)
    if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName))
      db.run(NORMAQMQuery.schema.drop)
    else Future.successful(())
  }
  }.andThen {
    case Success(_) => println(s"Table ${NORMAQMQuery.baseTableRow.tableName} dropped successfully! ")
    case Failure(e) => println(s"Failed to drop Table ${NORMAQMQuery.baseTableRow.tableName}, it may not exist! Error: ${e.getMessage}")
  }
  Await.ready(futDropTable,Duration.Inf)

  //create new table to refine AQMRawTable
  val actionCreateTable = Models.NORMAQMQuery.schema.create
  val futCreateTable = db.run(actionCreateTable).andThen {
    case Success(_) => println("Table created successfully!")
    case Failure(e) => println(s"Table may exist already! Error: ${e.getMessage}")
  }
  //carry on even if table creation fails
  Await.ready(futCreateTable,Duration.Inf)


  //truncate data, only available in slick 3.2.1
  val futTruncateTable = futVectorTables.flatMap{ tables => {
    val tableNames = tables.map(t => t.name.name)
    if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName))
      db.run(NORMAQMQuery.schema.truncate)
    else Future.successful(())
  }
  }.andThen {
    case Success(_) => println(s"Table ${NORMAQMQuery.baseTableRow.tableName} truncated successfully!")
    case Failure(e) => println(s"Failed to truncate Table ${NORMAQMQuery.baseTableRow.tableName}! Error: ${e.getMessage}")
  }
  Await.ready(futTruncateTable,Duration.Inf)

  //a contrived task whose only purpose is to consume resources
  //getting id with corresponding name from STATES table
  def getStateID(state: String): Int = {
    //create a stream for state id with state name
    implicit def toState(row:  StateTable#TableElementType) = StateModel(row.id,row.name)
    val stateLoader = FDAViewLoader(slick.jdbc.H2Profile)(toState _)
    val stateSeq = stateLoader.fda_typedRows(StateQuery.result)(db).toSeq
    //build a static source from the loaded rows
    val stateStream =  fda_staticSource(stateSeq)()
    var id  = -1
    def getid: FDAUserTask[FDAROW] = row => {
      row match {
        case StateModel(stid,stname) =>   //target row type
          if (stname.contains(state)) {
            id = stid
            fda_break      //exit
          }
          else fda_skip   //take next row
        case _ => fda_skip
      }
    }
    stateStream.appendTask(getid).startRun
    id
  }
  //another contrived task whose only purpose is to consume resources
  //getting id with corresponding names from COUNTIES table
  def getCountyID(state: String, county: String): Int = {
    //create a stream for county id with state name and county name
    implicit def toCounty(row:  CountyTable#TableElementType) = CountyModel(row.id,row.name)
    val countyLoader = FDAViewLoader(slick.jdbc.H2Profile)(toCounty _)
    val countySeq = countyLoader.fda_typedRows(CountyQuery.result)(db).toSeq
    //build a static source from the loaded rows
    val countyStream =  fda_staticSource(countySeq)()
    var id  = -1
    def getid: FDAUserTask[FDAROW] = row => {
      row match {
        case CountyModel(cid,cname) =>   //target row type
          if (cname.contains(state) && cname.contains(county)) {
            id = cid
            fda_break      //exit
          }
          else fda_skip   //take next row
        case _ => fda_skip
      }
    }
    countyStream.appendTask(getid).startRun
    id
  }

along with the two user-defined functions:

  //process input row and produce action row to insert into NORMAQM
  def getIdsThenInsertAction: FDAUserTask[FDAROW] = row => {
    row match {
      case aqm: AQMRPTModel =>
        if (aqm.valid) {
          val stateId = getStateID(aqm.state)
          val countyId = getCountyID(aqm.state,aqm.county)
          val action = NORMAQMQuery += NORMAQMModel(0,aqm.mid, stateId, countyId, aqm.year,aqm.value,aqm.total)
          fda_next(FDAActionRow(action))
        }
        else fda_skip
      case _ => fda_skip
    }
  }
  //runner for the action rows
  val runner = FDAActionRunner(slick.jdbc.H2Profile)
  def runInsertAction: FDAUserTask[FDAROW] = row =>
    row match {
      case FDAActionRow(action) =>
        runner.fda_execAction(action)(db)
        fda_skip
      case _ => fda_skip
    }

Next comes the code that is new in this post. First we build a stream of all the years:

 //create parallel sources
  //get a stream of years
  val qryYears = AQMRPTQuery.map(_.year).distinct
  case class Years(year: Int) extends FDAROW

  implicit def toYears(y: Int) = Years(y)

  val yearViewLoader = FDAViewLoader(slick.jdbc.H2Profile)(toYears _)
  val yearSeq = yearViewLoader.fda_typedRows(qryYears.result)(db).toSeq
  val yearStream = fda_staticSource(yearSeq)()

Below is a function that reads data from the AQMRPT table by year:

  //strong row type
  implicit def toAQMRPT(row: AQMRPTTable#TableElementType) =
    AQMRPTModel(row.rid, row.mid, row.state, row.county, row.year, row.value, row.total, row.valid)

  //stream loader shared by all sources when operating in parallel mode
  val AQMRPTLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRPT _)

  //loading rows with year yr
  def loadRowsInYear(yr: Int) = {
    //a new query
    val query = AQMRPTQuery.filter(row => row.year === yr)
    //reuse same loader
    AQMRPTLoader.fda_typedStream(query.result)(db)(256, 256)()
  }

We can expect multiple instances of loadRowsInYear to share the single FDAStreamLoader, AQMRPTLoader. A user-defined data loading function has the type FDASourceLoader. Below is sample code for an FDASourceLoader:

  //loading rows by year
  def loadRowsByYear: FDASourceLoader = row => {
    row match {
      case Years(y) => loadRowsInYear(y) //produce stream of the year
      case _ => fda_appendRow(FDANullRow)
    }

  }

We use toParSource to build a parallel data source:

  //get parallel source constructor
  val parSource = yearStream.toParSource(loadRowsByYear)

and fda_par_source to turn the parallel data source into a single unified stream:

  //produce a stream from parallel sources
  val source = fda_par_source(parSource)(3)
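
To make the idea of a "source of sources" concrete outside of FunDA, here is a minimal sketch that uses only the Scala standard library. It is not how fda_par_source is implemented: Row, the stand-in loadRowsInYear and parSource are hypothetical names, and the pool size only assumes that the numeric argument above bounds how many per-year loads run at once. Unlike a streaming merge, Future.sequence collects everything in memory and keeps the years in their input order.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParSourceSketch {
  // Hypothetical row type standing in for AQMRPTModel
  final case class Row(year: Int, value: Int)

  // Hypothetical loader standing in for loadRowsInYear:
  // returns a small in-memory batch instead of querying a database
  def loadRowsInYear(year: Int): Vector[Row] =
    Vector.tabulate(3)(i => Row(year, i))

  // Load every year on a fixed pool of `maxOpen` threads and merge the batches
  def parSource(years: Seq[Int], maxOpen: Int): Vector[Row] = {
    val pool = Executors.newFixedThreadPool(maxOpen)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // one Future per year; at most `maxOpen` of them run at the same time
      val loads = years.map(y => Future(loadRowsInYear(y)))
      Await.result(Future.sequence(loads), 30.seconds).flatten.toVector
    } finally pool.shutdown()
  }
}
```

Calling ParSourceSketch.parSource(Seq(1999, 2000, 2001), 2) loads three years with at most two loads in flight and returns all of their rows as one collection.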

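source is an FDAPipeLine. It can be run directly with source.startRun, or further stages can be attached after it. Below we turn the other two user-defined functions into parallel functions and attach them after source: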

  //the following is a process of composition of stream combinators
  //get parallel source constructor
  val parSource = yearStream.toParSource(loadRowsByYear)

  //implicit val strategy = Strategy.fromCachedDaemonPool("cachedPool")
  //produce a stream from parallel sources
  val source = fda_par_source(parSource)(3)
  //turn getIdsThenInsertAction into parallel task
  val parTasks = source.toPar(getIdsThenInsertAction)
  //runPar to produce a new stream
  val actionStream =fda_runPar(parTasks)(3)
  //turn runInsertAction into parallel task
  val parRun = actionStream.toPar(runInsertAction)
  //runPar and carry out by startRun
  fda_runPar(parRun)(2).startRun
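
The chain above can be pictured as two stages, each running a user function on a bounded number of threads and passing the surviving rows on. The sketch below imitates that shape with plain Futures. It assumes that fda_skip means "emit nothing" and fda_next means "emit this row", modelled here as None and Some. runPar, Measurement and InsertAction are hypothetical names, not FunDA API, and each stage here finishes before the next one starts, whereas a streaming pipeline would overlap them.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParStagesSketch {
  // Hypothetical stand-ins for AQMRPTModel and FDAActionRow
  final case class Measurement(year: Int, value: Int, valid: Boolean)
  final case class InsertAction(description: String)

  // Run `task` over all rows on `degree` threads.
  // None drops the row (skip), Some(b) passes b to the next stage (next).
  def runPar[A, B](rows: Seq[A], degree: Int)(task: A => Option[B]): Vector[B] = {
    val pool = Executors.newFixedThreadPool(degree)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val results = rows.map(r => Future(task(r)))
      Await.result(Future.sequence(results), 30.seconds).flatten.toVector
    } finally pool.shutdown()
  }

  // Stage 1: turn valid measurements into insert actions, skip the rest
  def toAction(m: Measurement): Option[InsertAction] =
    if (m.valid) Some(InsertAction(s"insert ${m.year},${m.value}")) else None

  // Stage 2: pretend to execute the action and report one affected row
  // (the original runInsertAction emits nothing; the count is only for illustration)
  def execute(a: InsertAction): Option[Int] = Some(1)

  // Compose both stages with different degrees of parallelism
  def run(rows: Seq[Measurement]): Int =
    runPar(runPar(rows, 3)(toAction), 2)(execute).sum
}
```

Giving each stage its own degree mirrors the (3) and (2) arguments in the FunDA code: the lookup-heavy first stage gets more threads than the stage that executes the inserts.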

Below is the complete source code for this demo:

import slick.jdbc.meta._
import com.bayakala.funda._
import api._
import scala.language.implicitConversions
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
import slick.jdbc.H2Profile.api._
import Models._
import fs2.Strategy

object ParallelExecution extends App {

  val db = Database.forConfig("h2db")

  //drop original table schema
  val futVectorTables = db.run(MTable.getTables)

  val futDropTable = futVectorTables.flatMap{ tables => {
    val tableNames = tables.map(t => t.name.name)
    if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName))
      db.run(NORMAQMQuery.schema.drop)
    else Future.successful(())
  }
  }.andThen {
    case Success(_) => println(s"Table ${NORMAQMQuery.baseTableRow.tableName} dropped successfully! ")
    case Failure(e) => println(s"Failed to drop Table ${NORMAQMQuery.baseTableRow.tableName}, it may not exist! Error: ${e.getMessage}")
  }
  Await.ready(futDropTable,Duration.Inf)

  //create new table to refine AQMRawTable
  val actionCreateTable = Models.NORMAQMQuery.schema.create
  val futCreateTable = db.run(actionCreateTable).andThen {
    case Success(_) => println("Table created successfully!")
    case Failure(e) => println(s"Table may exist already! Error: ${e.getMessage}")
  }
  //carry on even if table creation fails
  Await.ready(futCreateTable,Duration.Inf)


  //truncate data, only available in slick 3.2.1
  val futTruncateTable = futVectorTables.flatMap{ tables => {
    val tableNames = tables.map(t => t.name.name)
    if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName))
      db.run(NORMAQMQuery.schema.truncate)
    else Future.successful(())
  }
  }.andThen {
    case Success(_) => println(s"Table ${NORMAQMQuery.baseTableRow.tableName} truncated successfully!")
    case Failure(e) => println(s"Failed to truncate Table ${NORMAQMQuery.baseTableRow.tableName}! Error: ${e.getMessage}")
  }
  Await.ready(futTruncateTable,Duration.Inf)

  //a contrived task whose only purpose is to consume resources
  //getting id with corresponding name from STATES table
  def getStateID(state: String): Int = {
    //create a stream for state id with state name
    implicit def toState(row:  StateTable#TableElementType) = StateModel(row.id,row.name)
    val stateLoader = FDAViewLoader(slick.jdbc.H2Profile)(toState _)
    val stateSeq = stateLoader.fda_typedRows(StateQuery.result)(db).toSeq
    //build a static source from the loaded rows
    val stateStream =  fda_staticSource(stateSeq)()
    var id  = -1
    def getid: FDAUserTask[FDAROW] = row => {
      row match {
        case StateModel(stid,stname) =>   //target row type
          if (stname.contains(state)) {
            id = stid
            fda_break      //exit
          }
          else fda_skip   //take next row
        case _ => fda_skip
      }
    }
    stateStream.appendTask(getid).startRun
    id
  }
  //another contrived task whose only purpose is to consume resources
  //getting id with corresponding names from COUNTIES table
  def getCountyID(state: String, county: String): Int = {
    //create a stream for county id with state name and county name
    implicit def toCounty(row:  CountyTable#TableElementType) = CountyModel(row.id,row.name)
    val countyLoader = FDAViewLoader(slick.jdbc.H2Profile)(toCounty _)
    val countySeq = countyLoader.fda_typedRows(CountyQuery.result)(db).toSeq
    //build a static source from the loaded rows
    val countyStream =  fda_staticSource(countySeq)()
    var id  = -1
    def getid: FDAUserTask[FDAROW] = row => {
      row match {
        case CountyModel(cid,cname) =>   //target row type
          if (cname.contains(state) && cname.contains(county)) {
            id = cid
            fda_break      //exit
          }
          else fda_skip   //take next row
        case _ => fda_skip
      }
    }
    countyStream.appendTask(getid).startRun
    id
  }

  //process input row and produce action row to insert into NORMAQM
  def getIdsThenInsertAction: FDAUserTask[FDAROW] = row => {
    row match {
      case aqm: AQMRPTModel =>
        if (aqm.valid) {
          val stateId = getStateID(aqm.state)
          val countyId = getCountyID(aqm.state,aqm.county)
          val action = NORMAQMQuery += NORMAQMModel(0,aqm.mid, stateId, countyId, aqm.year,aqm.value,aqm.total)
          fda_next(FDAActionRow(action))
        }
        else fda_skip
      case _ => fda_skip
    }
  }
  //runner for the action rows
  val runner = FDAActionRunner(slick.jdbc.H2Profile)
  def runInsertAction: FDAUserTask[FDAROW] = row =>
    row match {
      case FDAActionRow(action) =>
        runner.fda_execAction(action)(db)
        fda_skip
      case _ => fda_skip
    }

  //create parallel sources
  //get a stream of years
  val qryYears = AQMRPTQuery.map(_.year).distinct
  case class Years(year: Int) extends FDAROW

  implicit def toYears(y: Int) = Years(y)

  val yearViewLoader = FDAViewLoader(slick.jdbc.H2Profile)(toYears _)
  val yearSeq = yearViewLoader.fda_typedRows(qryYears.result)(db).toSeq
  val yearStream = fda_staticSource(yearSeq)()

  //strong row type
  implicit def toAQMRPT(row: AQMRPTTable#TableElementType) =
    AQMRPTModel(row.rid, row.mid, row.state, row.county, row.year, row.value, row.total, row.valid)

  //stream loader shared by all sources when operating in parallel mode
  val AQMRPTLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRPT _)

  //loading rows with year yr
  def loadRowsInYear(yr: Int) = {
    //a new query
    val query = AQMRPTQuery.filter(row => row.year === yr)
    //reuse same loader
    AQMRPTLoader.fda_typedStream(query.result)(db)(256, 256)()
  }

  //loading rows by year
  def loadRowsByYear: FDASourceLoader = row => {
    row match {
      case Years(y) => loadRowsInYear(y) //produce stream of the year
      case _ => fda_appendRow(FDANullRow)
    }

  }


  //start counter
  val cnt_start = System.currentTimeMillis()

  def showRecord: FDAUserTask[FDAROW] = row => {
    row match {
      case Years(y) => println(y); fda_skip
      case aqm: AQMRPTModel =>
        println(s"${aqm.year}  $aqm")
        fda_skip
      case FDAActionRow(action) =>
        println(s"${action}")
        fda_skip
      case _ => fda_skip
    }
  }

  //the following is a process of composition of stream combinators
  //get parallel source constructor
  val parSource = yearStream.toParSource(loadRowsByYear)

  //implicit val strategy = Strategy.fromCachedDaemonPool("cachedPool")
  //produce a stream from parallel sources
  val source = fda_par_source(parSource)(3)
  //turn getIdsThenInsertAction into parallel task
  val parTasks = source.toPar(getIdsThenInsertAction)
  //runPar to produce a new stream
  val actionStream =fda_runPar(parTasks)(3)
  //turn runInsertAction into parallel task
  val parRun = actionStream.toPar(runInsertAction)
  //runPar and carry out by startRun
  fda_runPar(parRun)(2).startRun

  println(s"processing 219400 rows in parallel in ${(System.currentTimeMillis - cnt_start)/1000} seconds")



}
