FunDA (16) - Demo: integrating parallel computation - total parallelism solution

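   In the previous two posts we covered two forms of parallel computation: building a data source in parallel and running user-defined functions in parallel, and we demonstrated each one separately. In this post I will demonstrate a mode that integrates the two. The way the parallel data source is built here also differs from before: previously we knew in advance that the source would be built in parallel from three independent streams. But what if we have a stream of unknown length in which each element stands for a separate stream? We know the AQMRPT table holds air-quality measurements from 1999 to 2xxx, so we can try building a single data source in parallel from streams generated per year. We reuse the groundwork code from the previous demo directly, including the initialization of the NORMAQM table and the functions that look up ids by name in STATES and COUNTIES: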

  val db = Database.forConfig("h2db")

  //drop original table schema
  val futVectorTables = db.run(MTable.getTables)

  val futDropTable = futVectorTables.flatMap{ tables => {
    val tableNames = tables.map(t => t.name.name)
    if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName))
      db.run(NORMAQMQuery.schema.drop)
    else Future.successful(())
  }
  }.andThen {
    case Success(_) => println(s"Table ${NORMAQMQuery.baseTableRow.tableName} dropped successfully! ")
    case Failure(e) => println(s"Failed to drop Table ${NORMAQMQuery.baseTableRow.tableName}, it may not exist! Error: ${e.getMessage}")
  }
  Await.ready(futDropTable,Duration.Inf)

  //create new table to refine AQMRawTable
  val actionCreateTable = Models.NORMAQMQuery.schema.create
  val futCreateTable = db.run(actionCreateTable).andThen {
    case Success(_) => println("Table created successfully!")
    case Failure(e) => println(s"Table may exist already! Error: ${e.getMessage}")
  }
  //would carry on even fail to create table
  Await.ready(futCreateTable,Duration.Inf)


  //truncate data, only available in slick 3.2.1
  val futTruncateTable = futVectorTables.flatMap{ tables => {
    val tableNames = tables.map(t => t.name.name)
    if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName))
      db.run(NORMAQMQuery.schema.truncate)
    else Future.successful(())
  }
  }.andThen {
    case Success(_) => println(s"Table ${NORMAQMQuery.baseTableRow.tableName} truncated successfully!")
    case Failure(e) => println(s"Failed to truncate Table ${NORMAQMQuery.baseTableRow.tableName}! Error: ${e.getMessage}")
  }
  Await.ready(futTruncateTable,Duration.Inf)

  //a conceived task for the purpose of resource consumption
  //getting id with corresponding name from STATES table
  def getStateID(state: String): Int = {
    //create a stream for state id with state name
    implicit def toState(row:  StateTable#TableElementType) = StateModel(row.id,row.name)
    val stateLoader = FDAViewLoader(slick.jdbc.H2Profile)(toState _)
    val stateSeq = stateLoader.fda_typedRows(StateQuery.result)(db).toSeq
    //constructed a Stream[Task,String]
    val stateStream =  fda_staticSource(stateSeq)()
    var id  = -1
    def getid: FDAUserTask[FDAROW] = row => {
      row match {
        case StateModel(stid,stname) =>   //target row type
          if (stname.contains(state)) {
            id = stid
            fda_break      //exit
          }
          else fda_skip   //take next row
        case _ => fda_skip
      }
    }
    stateStream.appendTask(getid).startRun
    id
  }
  //another conceived task for the purpose of resource consumption
  //getting id with corresponding names from COUNTIES table
  def getCountyID(state: String, county: String): Int = {
    //create a stream for county id with state name and county name
    implicit def toCounty(row:  CountyTable#TableElementType) = CountyModel(row.id,row.name)
    val countyLoader = FDAViewLoader(slick.jdbc.H2Profile)(toCounty _)
    val countySeq = countyLoader.fda_typedRows(CountyQuery.result)(db).toSeq
    //constructed a Stream[Task,String]
    val countyStream =  fda_staticSource(countySeq)()
    var id  = -1
    def getid: FDAUserTask[FDAROW] = row => {
      row match {
        case CountyModel(cid,cname) =>   //target row type
          if (cname.contains(state) && cname.contains(county)) {
            id = cid
            fda_break      //exit
          }
          else fda_skip   //take next row
        case _ => fda_skip
      }
    }
    countyStream.appendTask(getid).startRun
    id
  }

along with two user-defined functions:

  //process input row and produce action row to insert into NORMAQM
  def getIdsThenInsertAction: FDAUserTask[FDAROW] = row => {
    row match {
      case aqm: AQMRPTModel =>
        if (aqm.valid) {
          val stateId = getStateID(aqm.state)
          val countyId = getCountyID(aqm.state,aqm.county)
          val action = NORMAQMQuery += NORMAQMModel(0,aqm.mid, stateId, countyId, aqm.year,aqm.value,aqm.total)
          fda_next(FDAActionRow(action))
        }
        else fda_skip
      case _ => fda_skip
    }
  }
  //runner for the action rows
  val runner = FDAActionRunner(slick.jdbc.H2Profile)
  def runInsertAction: FDAUserTask[FDAROW] = row =>
    row match {
      case FDAActionRow(action) =>
        runner.fda_execAction(action)(db)
        fda_skip
      case _ => fda_skip
    }

Next comes the code that is new in this post. We first build a stream of all the years:

 //create parallel sources
  //get a stream of years
  val qryYears = AQMRPTQuery.map(_.year).distinct
  case class Years(year: Int) extends FDAROW

  implicit def toYears(y: Int) = Years(y)

  val yearViewLoader = FDAViewLoader(slick.jdbc.H2Profile)(toYears _)
  val yearSeq = yearViewLoader.fda_typedRows(qryYears.result)(db).toSeq
  val yearStream = fda_staticSource(yearSeq)()

Below is a function that reads rows from the AQMRPT table by year:

  //strong row type
  implicit def toAQMRPT(row: AQMRPTTable#TableElementType) =
    AQMRPTModel(row.rid, row.mid, row.state, row.county, row.year, row.value, row.total, row.valid)

  //shared stream loader when operate in parallel mode
  val AQMRPTLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRPT _)

  //loading rows with year yr
  def loadRowsInYear(yr: Int) = {
    //a new query
    val query = AQMRPTQuery.filter(row => row.year === yr)
    //reuse same loader
    AQMRPTLoader.fda_typedStream(query.result)(db)(256, 256)()
  }

We can expect multiple instances of loadRowsInYear to share the same FDAStreamLoader, AQMRPTLoader. A user-defined data-loading function has type FDASourceLoader. Below is a sample FDASourceLoader:

  //loading rows by year
  def loadRowsByYear: FDASourceLoader = row => {
    row match {
      case Years(y) => loadRowsInYear(y) //produce stream of the year
      case _ => fda_appendRow(FDANullRow)
    }

  }
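
To make the shape of a source loader concrete, here is a minimal sketch in plain Scala that does not use FunDA at all. The names Row, Reading, NullRow and SourceLoader are hypothetical stand-ins for FDAROW, AQMRPTModel, FDANullRow and FDASourceLoader, and Iterator stands in for the FunDA stream type. It also assumes that fda_appendRow(FDANullRow) yields a one-row stream, which is my reading of the name rather than something the code above confirms.

```scala
object SourceLoaderSketch {
  // Hypothetical row hierarchy standing in for FDAROW and its subtypes
  sealed trait Row
  final case class Years(year: Int) extends Row
  final case class Reading(year: Int, value: Double) extends Row
  case object NullRow extends Row

  // Hypothetical alias: a source loader maps one row to a stream of rows
  type SourceLoader = Row => Iterator[Row]

  // Stand-in for loadRowsInYear; the article issues a Slick query here
  def loadRowsInYear(year: Int): Iterator[Row] =
    Iterator(Reading(year, 1.0), Reading(year, 2.0))

  // A Years row expands into that year's rows; anything else
  // becomes a single placeholder row (assumed fallback behaviour)
  val loadRowsByYear: SourceLoader = {
    case Years(y) => loadRowsInYear(y)
    case _        => Iterator.single(NullRow)
  }
}
```

The point of the sketch is only the type: each element of the outer stream is turned into a stream of its own, and rows of an unexpected type still produce a well-formed (if trivial) stream.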

We use toParSource to build a parallel data source:

  //get parallel source constructor
  val parSource = yearStream.toParSource(loadRowsByYear)

and fda_par_source to turn the parallel data source into a single unified stream:

  //produce a stream from parallel sources
  val source = fda_par_source(parSource)(3)
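
As a rough illustration of the idea behind the second argument (3), the following sketch uses only scala.concurrent: it loads one sub-source per year on a pool of at most maxOpen threads and merges the results. It is not FunDA's implementation. Reading, table and mergeYears are hypothetical names, the data is an in-memory stand-in for AQMRPT, and I am assuming the numeric argument of fda_par_source bounds how many sub-sources are active at once.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParSourceSketch {
  // Hypothetical row type standing in for AQMRPTModel
  final case class Reading(year: Int, value: Int)

  // Hypothetical in-memory "table"; the article reads AQMRPT through Slick
  val table: Vector[Reading] =
    for { y <- Vector(1999, 2000, 2001, 2002); v <- Vector(1, 2, 3) } yield Reading(y, v)

  // Analogue of loadRowsInYear: one sub-source per year
  def loadRowsInYear(year: Int): Vector[Reading] = table.filter(_.year == year)

  // Run at most maxOpen loaders at a time and merge their rows.
  // Future.sequence keeps the input order; a streaming merge would
  // normally interleave rows in a nondeterministic order instead.
  def mergeYears(years: Seq[Int], maxOpen: Int): Vector[Reading] = {
    val pool = Executors.newFixedThreadPool(maxOpen)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val loads = years.map(y => Future(loadRowsInYear(y)))
      Await.result(Future.sequence(loads), 30.seconds).flatten.toVector
    } finally pool.shutdown()
  }
}
```

Unlike a real stream, this sketch materializes every year in memory before returning, so it only conveys the bounded-concurrency aspect.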

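source is an FDAPipeLine, so it can be run directly with source.startRun, or further stages can be attached after it. Below we turn the other two user-defined functions into parallel tasks and attach them after source: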

  //the following is a process of composition of stream combinators
  //get parallel source constructor
  val parSource = yearStream.toParSource(loadRowsByYear)

  //implicit val strategy = Strategy.fromCachedDaemonPool("cachedPool")
  //produce a stream from parallel sources
  val source = fda_par_source(parSource)(3)
  //turn getIdsThenInsertAction into parallel task
  val parTasks = source.toPar(getIdsThenInsertAction)
  //runPar to produce a new stream
  val actionStream =fda_runPar(parTasks)(3)
  //turn runInsertAction into parallel task
  val parRun = actionStream.toPar(runInsertAction)
  //runPar and carry out by startRun
  fda_runPar(parRun)(2).startRun
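
The two-stage structure above (build actions with parallelism 3, then execute them with parallelism 2) can be sketched with plain Futures as follows. This is a simplified analogue under stated assumptions, not FunDA code: runPar, Reading, InsertAction, toAction and execute are hypothetical names, None plays the role of fda_skip, and execute returns a pretend row count where the article's runInsertAction runs the action and then skips.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParPipelineSketch {
  // Hypothetical stand-ins for AQMRPTModel and FDAActionRow
  final case class Reading(year: Int, value: Int, valid: Boolean)
  final case class InsertAction(sql: String)

  // Apply f to every row on a pool of `threads` threads;
  // a None result drops the row, much like fda_skip
  def runPar[A, B](rows: Seq[A], threads: Int)(f: A => Option[B]): Seq[B] = {
    val pool = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try Await.result(Future.traverse(rows)(r => Future(f(r))), 30.seconds).flatten
    finally pool.shutdown()
  }

  // Stage 1: valid rows become insert actions, invalid rows are skipped
  def toAction(r: Reading): Option[InsertAction] =
    if (r.valid) Some(InsertAction(s"insert ${r.year},${r.value}")) else None

  // Stage 2: pretend to execute the action and report one affected row
  def execute(a: InsertAction): Option[Int] = Some(1)

  // Compose both stages; returns the number of "inserted" rows
  def run(rows: Seq[Reading]): Int = {
    val actions = runPar(rows, 3)(toAction)
    runPar(actions, 2)(execute).sum
  }
}
```

One difference worth keeping in mind: this sketch finishes stage 1 completely before stage 2 starts, whereas a stream pipeline can pass rows to the next stage as soon as they are produced.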

Here is the complete source code for this demo:

import slick.jdbc.meta._
import com.bayakala.funda._
import api._
import scala.language.implicitConversions
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
import slick.jdbc.H2Profile.api._
import Models._
import fs2.Strategy

object ParallelExecution extends App {

  val db = Database.forConfig("h2db")

  //drop original table schema
  val futVectorTables = db.run(MTable.getTables)

  val futDropTable = futVectorTables.flatMap{ tables => {
    val tableNames = tables.map(t => t.name.name)
    if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName))
      db.run(NORMAQMQuery.schema.drop)
    else Future.successful(())
  }
  }.andThen {
    case Success(_) => println(s"Table ${NORMAQMQuery.baseTableRow.tableName} dropped successfully! ")
    case Failure(e) => println(s"Failed to drop Table ${NORMAQMQuery.baseTableRow.tableName}, it may not exist! Error: ${e.getMessage}")
  }
  Await.ready(futDropTable,Duration.Inf)

  //create new table to refine AQMRawTable
  val actionCreateTable = Models.NORMAQMQuery.schema.create
  val futCreateTable = db.run(actionCreateTable).andThen {
    case Success(_) => println("Table created successfully!")
    case Failure(e) => println(s"Table may exist already! Error: ${e.getMessage}")
  }
  //would carry on even fail to create table
  Await.ready(futCreateTable,Duration.Inf)


  //truncate data, only available in slick 3.2.1
  val futTruncateTable = futVectorTables.flatMap{ tables => {
    val tableNames = tables.map(t => t.name.name)
    if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName))
      db.run(NORMAQMQuery.schema.truncate)
    else Future.successful(())
  }
  }.andThen {
    case Success(_) => println(s"Table ${NORMAQMQuery.baseTableRow.tableName} truncated successfully!")
    case Failure(e) => println(s"Failed to truncate Table ${NORMAQMQuery.baseTableRow.tableName}! Error: ${e.getMessage}")
  }
  Await.ready(futTruncateTable,Duration.Inf)

  //a conceived task for the purpose of resource consumption
  //getting id with corresponding name from STATES table
  def getStateID(state: String): Int = {
    //create a stream for state id with state name
    implicit def toState(row:  StateTable#TableElementType) = StateModel(row.id,row.name)
    val stateLoader = FDAViewLoader(slick.jdbc.H2Profile)(toState _)
    val stateSeq = stateLoader.fda_typedRows(StateQuery.result)(db).toSeq
    //constructed a Stream[Task,String]
    val stateStream =  fda_staticSource(stateSeq)()
    var id  = -1
    def getid: FDAUserTask[FDAROW] = row => {
      row match {
        case StateModel(stid,stname) =>   //target row type
          if (stname.contains(state)) {
            id = stid
            fda_break      //exit
          }
          else fda_skip   //take next row
        case _ => fda_skip
      }
    }
    stateStream.appendTask(getid).startRun
    id
  }
  //another conceived task for the purpose of resource consumption
  //getting id with corresponding names from COUNTIES table
  def getCountyID(state: String, county: String): Int = {
    //create a stream for county id with state name and county name
    implicit def toCounty(row:  CountyTable#TableElementType) = CountyModel(row.id,row.name)
    val countyLoader = FDAViewLoader(slick.jdbc.H2Profile)(toCounty _)
    val countySeq = countyLoader.fda_typedRows(CountyQuery.result)(db).toSeq
    //constructed a Stream[Task,String]
    val countyStream =  fda_staticSource(countySeq)()
    var id  = -1
    def getid: FDAUserTask[FDAROW] = row => {
      row match {
        case CountyModel(cid,cname) =>   //target row type
          if (cname.contains(state) && cname.contains(county)) {
            id = cid
            fda_break      //exit
          }
          else fda_skip   //take next row
        case _ => fda_skip
      }
    }
    countyStream.appendTask(getid).startRun
    id
  }

  //process input row and produce action row to insert into NORMAQM
  def getIdsThenInsertAction: FDAUserTask[FDAROW] = row => {
    row match {
      case aqm: AQMRPTModel =>
        if (aqm.valid) {
          val stateId = getStateID(aqm.state)
          val countyId = getCountyID(aqm.state,aqm.county)
          val action = NORMAQMQuery += NORMAQMModel(0,aqm.mid, stateId, countyId, aqm.year,aqm.value,aqm.total)
          fda_next(FDAActionRow(action))
        }
        else fda_skip
      case _ => fda_skip
    }
  }
  //runner for the action rows
  val runner = FDAActionRunner(slick.jdbc.H2Profile)
  def runInsertAction: FDAUserTask[FDAROW] = row =>
    row match {
      case FDAActionRow(action) =>
        runner.fda_execAction(action)(db)
        fda_skip
      case _ => fda_skip
    }

  //create parallel sources
  //get a stream of years
  val qryYears = AQMRPTQuery.map(_.year).distinct
  case class Years(year: Int) extends FDAROW

  implicit def toYears(y: Int) = Years(y)

  val yearViewLoader = FDAViewLoader(slick.jdbc.H2Profile)(toYears _)
  val yearSeq = yearViewLoader.fda_typedRows(qryYears.result)(db).toSeq
  val yearStream = fda_staticSource(yearSeq)()

  //strong row type
  implicit def toAQMRPT(row: AQMRPTTable#TableElementType) =
    AQMRPTModel(row.rid, row.mid, row.state, row.county, row.year, row.value, row.total, row.valid)

  //shared stream loader when operate in parallel mode
  val AQMRPTLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRPT _)

  //loading rows with year yr
  def loadRowsInYear(yr: Int) = {
    //a new query
    val query = AQMRPTQuery.filter(row => row.year === yr)
    //reuse same loader
    AQMRPTLoader.fda_typedStream(query.result)(db)(256, 256)()
  }

  //loading rows by year
  def loadRowsByYear: FDASourceLoader = row => {
    row match {
      case Years(y) => loadRowsInYear(y) //produce stream of the year
      case _ => fda_appendRow(FDANullRow)
    }

  }


  //start counter
  val cnt_start = System.currentTimeMillis()

  def showRecord: FDAUserTask[FDAROW] = row => {
    row match {
      case Years(y) => println(y); fda_skip
      case aqm: AQMRPTModel =>
        println(s"${aqm.year}  $aqm")
        fda_skip
      case FDAActionRow(action) =>
        println(s"${action}")
        fda_skip
      case _ => fda_skip
    }
  }

  //the following is a process of composition of stream combinators
  //get parallel source constructor
  val parSource = yearStream.toParSource(loadRowsByYear)

  //implicit val strategy = Strategy.fromCachedDaemonPool("cachedPool")
  //produce a stream from parallel sources
  val source = fda_par_source(parSource)(3)
  //turn getIdsThenInsertAction into parallel task
  val parTasks = source.toPar(getIdsThenInsertAction)
  //runPar to produce a new stream
  val actionStream =fda_runPar(parTasks)(3)
  //turn runInsertAction into parallel task
  val parRun = actionStream.toPar(runInsertAction)
  //runPar and carry out by startRun
  fda_runPar(parRun)(2).startRun

  println(s"processing 219400 rows parallelly  in ${(System.currentTimeMillis - cnt_start)/1000} seconds")



}
