FunDA (15) - Demo: user task parallel execution

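In FunDA, parallel execution means running a user-defined function in parallel. In principle, one input stream is split into several input streams that are fed concurrently into multiple running instances of the same user-defined function. These instances run simultaneously, each in its own thread, until all input is exhausted. The actual number of instances is determined by the fs2-nondeterminism algorithm from the number of CPU cores, the thread-pool configuration, and the user-specified maximum number of open computations.

In this demo we compare the efficiency of parallel and serial execution on the same workload. In an earlier demo we produced a table named AQMRPT. That table is not properly normalized, however: its state and county fields are not yet coded as links to the STATES and COUNTIES tables. Here we create a new table, NORMAQM, and move all of the AQMRPT data into it, converting the STATENAME and COUNTYNAME fields into the id fields of the STATES and COUNTIES tables along the way. The NORMAQM table structure is as follows: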

  case class NORMAQMModel(rid: Long
                         , mid: Int
                         , state: Int
                         , county: Int
                         , year: Int
                         , value: Int
                         , average: Int
                         ) extends FDAROW

  class NORMAQMTable(tag: Tag) extends Table[NORMAQMModel](tag, "NORMAQM") {
    def rid = column[Long]("ROWID",O.AutoInc,O.PrimaryKey)
    def mid = column[Int]("MEASUREID")
    def state = column[Int]("STATID")
    def county = column[Int]("COUNTYID")
    def year = column[Int]("REPORTYEAR")
    def value = column[Int]("VALUE")
    def average = column[Int]("AVG")

    def * = (rid,mid,state,county,year,value,average) <> (NORMAQMModel.tupled, NORMAQMModel.unapply)
  }


  val NORMAQMQuery = TableQuery[NORMAQMTable]

The setup code that initializes this table is as follows:

  val db = Database.forConfig("h2db")

  //drop original table schema
  val futVectorTables = db.run(MTable.getTables)

  val futDropTable = futVectorTables.flatMap{ tables => {
    val tableNames = tables.map(t => t.name.name)
    if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName))
      db.run(NORMAQMQuery.schema.drop)
    else Future.successful(())
  }
  }.andThen {
    case Success(_) => println(s"Table ${NORMAQMQuery.baseTableRow.tableName} dropped successfully! ")
    case Failure(e) => println(s"Failed to drop Table ${NORMAQMQuery.baseTableRow.tableName}, it may not exist! Error: ${e.getMessage}")
  }
  Await.ready(futDropTable,Duration.Inf)

  //create new table to refine AQMRawTable
  val actionCreateTable = Models.NORMAQMQuery.schema.create
  val futCreateTable = db.run(actionCreateTable).andThen {
    case Success(_) => println("Table created successfully!")
    case Failure(e) => println(s"Table may exist already! Error: ${e.getMessage}")
  }
  //carry on even if creating the table fails
  Await.ready(futCreateTable,Duration.Inf)


  //truncate data, only available in slick 3.2.1
  val futTruncateTable = futVectorTables.flatMap{ tables => {
    val tableNames = tables.map(t => t.name.name)
    if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName))
      db.run(NORMAQMQuery.schema.truncate)
    else Future.successful(())
  }
  }.andThen {
    case Success(_) => println(s"Table ${NORMAQMQuery.baseTableRow.tableName} truncated successfully!")
    case Failure(e) => println(s"Failed to truncate Table ${NORMAQMQuery.baseTableRow.tableName}! Error: ${e.getMessage}")
  }
  Await.ready(futTruncateTable,Duration.Inf)
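
The setup code uses Await.ready rather than Await.result, which is why it can carry on after a failed step: Await.ready waits for completion without rethrowing the failure. The following is a minimal stand-alone sketch of that behaviour, using only the Scala standard library. The `step` function is a hypothetical stand-in for `db.run(...)` and is not part of Slick or FunDA.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object AwaitReadySketch {
  // Hypothetical stand-in for db.run(...): fails when `fail` is true
  def step(fail: Boolean): Future[String] =
    Future { if (fail) throw new IllegalStateException("table missing") else "ok" }

  // Await.ready blocks until the future completes but does not rethrow;
  // the outcome is inspected through the future's `value`
  def outcome(fail: Boolean): String =
    Await.ready(step(fail), 5.seconds).value match {
      case Some(Success(v)) => s"success: $v"
      case Some(Failure(e)) => s"failure: ${e.getMessage}"
      case None             => "not completed"
    }
}
```

With Await.result in place of Await.ready, the failing case would throw at the call site instead of letting the program continue.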

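We need a function that looks up an ID in the STATES table using the STATENAME value from AQMRPT. I deliberately wrote this function as a complete FunDA program so that it simulates a standalone process that is heavy on both IO and computation (never mind whether the design is sensible; the goal is to add IO and CPU cost):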

  //a contrived task whose only purpose is to consume resources
  //getting id with corresponding name from STATES table
  def getStateID(state: String): Int = {
    //create a stream for state id with state name
    implicit def toState(row:  StateTable#TableElementType) = StateModel(row.id,row.name)
    val stateLoader = FDAViewLoader(slick.jdbc.H2Profile)(toState _)
    val stateSeq = stateLoader.fda_typedRows(StateQuery.result)(db).toSeq
    //constructed a Stream[Task,String]
    val stateStream =  fda_staticSource(stateSeq)()
    var id  = -1
    def getid: FDAUserTask[FDAROW] = row => {
      row match {
        case StateModel(stid,stname) =>   //target row type
          if (stname.contains(state)) {
            id = stid
            fda_break      //exit
          }
          else fda_skip   //take next row
        case _ => fda_skip
      }
    }
    stateStream.appendTask(getid).startRun
    id
  }

Note that getStateID rebuilds stateStream on every call, which serves the purpose of adding IO operations.
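
For contrast, here is a small stand-alone sketch of what the scan does and of what a non-benchmark version might do instead. It is not FunDA code: the `State` case class and the two sample rows are invented for illustration. It shows that a first-match scan using `contains` returns whichever row happens to come first, and that loading the rows once into a Map gives exact lookups without rescanning.

```scala
object LookupSketch {
  // Hypothetical stand-in for the article's StateModel rows (sample data is invented)
  final case class State(id: Int, name: String)

  val states: Seq[State] = Seq(State(1, "West Virginia"), State(2, "Virginia"))

  // First row whose name contains the search text, or -1 when nothing matches;
  // this mirrors the scan-until-first-match logic of getStateID
  def firstContaining(name: String): Int =
    states.collectFirst { case State(id, n) if n.contains(name) => id }.getOrElse(-1)

  // Alternative: build the lookup table once, then do exact-name lookups
  val byName: Map[String, Int] = states.map(s => s.name -> s.id).toMap

  def exact(name: String): Int = byName.getOrElse(name, -1)
}
```

With this sample data, `firstContaining("Virginia")` picks the "West Virginia" row because it comes first, while `exact("Virginia")` picks the intended row.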

Likewise, we need another function that fetches the id field from the COUNTIES table:

  //another contrived task whose only purpose is to consume resources
  //getting id with corresponding names from COUNTIES table
  def getCountyID(state: String, county: String): Int = {
    //create a stream for county id with state name and county name
    implicit def toCounty(row:  CountyTable#TableElementType) = CountyModel(row.id,row.name)
    val countyLoader = FDAViewLoader(slick.jdbc.H2Profile)(toCounty _)
    val countySeq = countyLoader.fda_typedRows(CountyQuery.result)(db).toSeq
    //constructed a Stream[Task,String]
    val countyStream =  fda_staticSource(countySeq)()
    var id  = -1
    def getid: FDAUserTask[FDAROW] = row => {
      row match {
        case CountyModel(cid,cname) =>   //target row type
          if (cname.contains(state) && cname.contains(county)) {
            id = cid
            fda_break      //exit
          }
          else fda_skip   //take next row
        case _ => fda_skip
      }
    }
    countyStream.appendTask(getid).startRun
    id
  }

The data source for this program can be obtained as follows:

  //original table listing
  implicit def toAQMRPT(row: AQMRPTTable#TableElementType) =
    AQMRPTModel(row.rid,row.mid,row.state,row.county,row.year,row.value,row.total,row.valid)
  val AQMRPTLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRPT _)
  val AQMRPTStream = AQMRPTLoader.fda_typedStream(AQMRPTQuery.result)(db)(256,256)()

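Following the usual FunDA workflow, we design two user-defined functions. The first calls getStateID and getCountyID with the state and county fields of the current row, builds an insert action row for the NORMAQM table from the returned ids, and passes it on to the next user-defined function. The second simply runs each action row it receives: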

  def getIdsThenInsertAction: FDAUserTask[FDAROW] = row => {
    row match {
      case aqm: AQMRPTModel =>
        if (aqm.valid) {
          val stateId = getStateID(aqm.state)
          val countyId = getCountyID(aqm.state,aqm.county)
          val action = NORMAQMQuery += NORMAQMModel(0,aqm.mid, stateId, countyId, aqm.year,aqm.value,aqm.total)
          fda_next(FDAActionRow(action))
        }
        else fda_skip
      case _ => fda_skip
    }
  }
  val runner = FDAActionRunner(slick.jdbc.H2Profile)
  def runInsertAction: FDAUserTask[FDAROW] = row =>
   row match {
    case FDAActionRow(action) =>
      runner.fda_execAction(action)(db)
      fda_skip
    case _ => fda_skip
  }
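
One way to picture what fda_next, fda_skip and fda_break tell the pipeline is the simplified model below. It assumes that a user task returns an optional list of rows: a list with rows to emit, an empty list to skip, and nothing at all to stop. The names `Task`, `emit`, `skip`, `stop` and `run` are illustrative only, and this is a mental model rather than FunDA's actual implementation.

```scala
import scala.annotation.tailrec

object UserTaskModel {
  // A task maps one row to: Some(rows) to pass on, Some(Nil) to skip, None to stop
  type Task[R] = R => Option[List[R]]

  def emit[R](r: R): Option[List[R]] = Some(List(r))
  def skip[R]: Option[List[R]] = Some(Nil)
  def stop[R]: Option[List[R]] = None

  // Feed rows through the task in order, collecting emitted rows until it signals stop
  def run[R](rows: List[R])(task: Task[R]): List[R] = {
    @tailrec
    def loop(rest: List[R], acc: List[R]): List[R] = rest match {
      case Nil => acc
      case r :: tail =>
        task(r) match {
          case None      => acc                   // stop: ignore the remaining rows
          case Some(out) => loop(tail, acc ++ out) // emit or skip, then continue
        }
    }
    loop(rows, Nil)
  }

  // Example task: stop at the first value above 8, emit even values times 10, skip odd values
  val demoTask: Task[Int] = n =>
    if (n > 8) stop
    else if (n % 2 == 0) emit(n * 10)
    else skip

  val result: List[Int] = run(List(1, 2, 3, 4, 9, 6))(demoTask)
}
```

In this model the trailing 6 is never processed because the task stops at 9, which is the same early-exit idea getStateID relies on when it finds a match.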

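As in the previous demos, we combine these two user-defined functions with the data source into a complete FunDA program and call startRun to produce the actual effect: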

    AQMRPTStream.take(10000)
      .appendTask(getIdsThenInsertAction)
      .appendTask(runInsertAction)
      .startRun

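This program took 579 seconds to run, but it runs in a single thread. We would like to see the result of running it in parallel. To do that, we first turn getIdsThenInsertAction into a parallel task, FDAParTask: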

    AQMRPTStream.toPar(getIdsThenInsertAction)

FunDA provides a parallel runner, fda_runPar:

      implicit val strategy = Strategy.fromCachedDaemonPool("cachedPool")
      fda_runPar(AQMRPTStream.take(100000).toPar(getIdsThenInsertAction))(8)  //max 8 open computations
        .appendTask(runInsertAction)
        .startRun
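
The effect of capping the number of open computations can be pictured with a small stand-alone sketch built on plain scala.concurrent Futures and a fixed thread pool. This is only an analogy under stated assumptions, not how FunDA or fs2 implement parallel execution. The names `runPar` and `slowTask` are hypothetical, and the results here come back in input order simply because Future.sequence preserves it.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BoundedParallelSketch {
  // Hypothetical stand-in for an expensive user task
  def slowTask(n: Int): Int = { Thread.sleep(20); n * 2 }

  // Apply `task` to every row using a pool of `maxOpen` threads,
  // so at most `maxOpen` tasks execute at the same time
  def runPar[A, B](rows: Seq[A], maxOpen: Int)(task: A => B): Seq[B] = {
    val pool = Executors.newFixedThreadPool(maxOpen)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val futures = rows.map(r => Future(task(r)))      // one future per row
      Await.result(Future.sequence(futures), 1.minute)  // wait for all results
    } finally pool.shutdown()                           // always release the threads
  }
}
```

For example, `BoundedParallelSketch.runPar(1 to 16, 8)(BoundedParallelSketch.slowTask)` runs sixteen tasks with no more than eight executing at once.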

We can supply our own thread pool. fda_runPar returns a standard FunDA FDAPipeLine, so we can attach runInsertAction after it. The running times for different row counts compare as follows:

    //processing 10000 rows in a single thread in 570 seconds
    //processing 10000 rows in parallel in 316 seconds

    //processing 20000 rows in a single thread in 1090 seconds
    //processing 20000 rows in parallel in 614 seconds

    //processing 100000 rows in a single thread in 2+ hrs
    //processing 100000 rows in parallel in 3885 seconds
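
The speedup implied by these timings can be worked out with a few lines of Scala. The figures are the ones listed above. The single-threaded 100000-row run is only reported as "2+ hrs", so 7200 seconds is used as a lower bound and the resulting ratio is a minimum, not a measurement.

```scala
object Speedup {
  // (rows, single-thread seconds, parallel seconds) from the timings above;
  // 7200.0 is an assumed lower bound for the "2+ hrs" single-thread run
  val timings: Seq[(Int, Double, Double)] = Seq(
    (10000, 570.0, 316.0),
    (20000, 1090.0, 614.0),
    (100000, 7200.0, 3885.0)
  )

  // Speedup is single-thread time divided by parallel time
  def speedup(single: Double, parallel: Double): Double = single / parallel

  def main(args: Array[String]): Unit =
    timings.foreach { case (rows, s, p) =>
      println(f"$rows%6d rows: ${speedup(s, p)}%.2fx")
    }
}
```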

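From these figures, parallel execution is about 1.8 times faster at 10,000 and 20,000 rows. Since the single-threaded run of 100,000 rows took more than two hours, the gain there is at least about 1.85 times, which suggests that larger datasets benefit more from parallel execution. The complete source code of this demo follows: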

import slick.jdbc.meta._
import com.bayakala.funda._
import api._
import scala.language.implicitConversions
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
import slick.jdbc.H2Profile.api._
import Models._
import fs2.Strategy

object ParallelTasks extends App {

  val db = Database.forConfig("h2db")

  //drop original table schema
  val futVectorTables = db.run(MTable.getTables)

  val futDropTable = futVectorTables.flatMap{ tables => {
    val tableNames = tables.map(t => t.name.name)
    if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName))
      db.run(NORMAQMQuery.schema.drop)
    else Future.successful(())
  }
  }.andThen {
    case Success(_) => println(s"Table ${NORMAQMQuery.baseTableRow.tableName} dropped successfully! ")
    case Failure(e) => println(s"Failed to drop Table ${NORMAQMQuery.baseTableRow.tableName}, it may not exist! Error: ${e.getMessage}")
  }
  Await.ready(futDropTable,Duration.Inf)

  //create new table to refine AQMRawTable
  val actionCreateTable = Models.NORMAQMQuery.schema.create
  val futCreateTable = db.run(actionCreateTable).andThen {
    case Success(_) => println("Table created successfully!")
    case Failure(e) => println(s"Table may exist already! Error: ${e.getMessage}")
  }
  //carry on even if creating the table fails
  Await.ready(futCreateTable,Duration.Inf)


  //truncate data, only available in slick 3.2.1
  val futTruncateTable = futVectorTables.flatMap{ tables => {
    val tableNames = tables.map(t => t.name.name)
    if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName))
      db.run(NORMAQMQuery.schema.truncate)
    else Future.successful(())
  }
  }.andThen {
    case Success(_) => println(s"Table ${NORMAQMQuery.baseTableRow.tableName} truncated successfully!")
    case Failure(e) => println(s"Failed to truncate Table ${NORMAQMQuery.baseTableRow.tableName}! Error: ${e.getMessage}")
  }
  Await.ready(futTruncateTable,Duration.Inf)

  //a contrived task whose only purpose is to consume resources
  //getting id with corresponding name from STATES table
  def getStateID(state: String): Int = {
    //create a stream for state id with state name
    implicit def toState(row:  StateTable#TableElementType) = StateModel(row.id,row.name)
    val stateLoader = FDAViewLoader(slick.jdbc.H2Profile)(toState _)
    val stateSeq = stateLoader.fda_typedRows(StateQuery.result)(db).toSeq
    //constructed a Stream[Task,String]
    val stateStream =  fda_staticSource(stateSeq)()
    var id  = -1
    def getid: FDAUserTask[FDAROW] = row => {
      row match {
        case StateModel(stid,stname) =>   //target row type
          if (stname.contains(state)) {
            id = stid
            fda_break      //exit
          }
          else fda_skip   //take next row
        case _ => fda_skip
      }
    }
    stateStream.appendTask(getid).startRun
    id
  }
  //another contrived task whose only purpose is to consume resources
  //getting id with corresponding names from COUNTIES table
  def getCountyID(state: String, county: String): Int = {
    //create a stream for county id with state name and county name
    implicit def toCounty(row:  CountyTable#TableElementType) = CountyModel(row.id,row.name)
    val countyLoader = FDAViewLoader(slick.jdbc.H2Profile)(toCounty _)
    val countySeq = countyLoader.fda_typedRows(CountyQuery.result)(db).toSeq
    //constructed a Stream[Task,String]
    val countyStream =  fda_staticSource(countySeq)()
    var id  = -1
    def getid: FDAUserTask[FDAROW] = row => {
      row match {
        case CountyModel(cid,cname) =>   //target row type
          if (cname.contains(state) && cname.contains(county)) {
            id = cid
            fda_break      //exit
          }
          else fda_skip   //take next row
        case _ => fda_skip
      }
    }
    countyStream.appendTask(getid).startRun
    id
  }

  //original table listing
  implicit def toAQMRPT(row: AQMRPTTable#TableElementType) =
    AQMRPTModel(row.rid,row.mid,row.state,row.county,row.year,row.value,row.total,row.valid)
  val AQMRPTLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRPT _)
  val AQMRPTStream = AQMRPTLoader.fda_typedStream(AQMRPTQuery.result)(db)(256,256)()

  def getIdsThenInsertAction: FDAUserTask[FDAROW] = row => {
    row match {
      case aqm: AQMRPTModel =>
        if (aqm.valid) {
          val stateId = getStateID(aqm.state)
          val countyId = getCountyID(aqm.state,aqm.county)
          val action = NORMAQMQuery += NORMAQMModel(0,aqm.mid, stateId, countyId, aqm.year,aqm.value,aqm.total)
          fda_next(FDAActionRow(action))
        }
        else fda_skip
      case _ => fda_skip
    }
  }
  val runner = FDAActionRunner(slick.jdbc.H2Profile)
  def runInsertAction: FDAUserTask[FDAROW] = row =>
   row match {
    case FDAActionRow(action) =>
      runner.fda_execAction(action)(db)
      fda_skip
    case _ => fda_skip
  }

  val cnt_start = System.currentTimeMillis()
/*
    AQMRPTStream.take(100000)
      .appendTask(getIdsThenInsertAction)
      .appendTask(runInsertAction)
      .startRun
    //println(s"processing 10000 rows in a single thread in ${(System.currentTimeMillis - cnt_start)/1000} seconds")
    //processing 10000 rows in a single thread in 570 seconds
    //println(s"processing 20000 rows in a single thread in ${(System.currentTimeMillis - cnt_start)/1000} seconds")
    //processing 20000 rows in a single thread in 1090 seconds
    //println(s"processing 100000 rows in a single thread in ${(System.currentTimeMillis - cnt_start)/1000} seconds")
    //processing 100000 rows in a single thread in 2+ hrs
*/


      implicit val strategy = Strategy.fromCachedDaemonPool("cachedPool")
      fda_runPar(AQMRPTStream.take(100000).toPar(getIdsThenInsertAction))(8)
        .appendTask(runInsertAction)
        .startRun

      //println(s"processing 10000 rows parallelly  in ${(System.currentTimeMillis - cnt_start)/1000} seconds")
      // processing 10000 rows parallelly  in 316 seconds
      //println(s"processing 20000 rows parallelly  in ${(System.currentTimeMillis - cnt_start)/1000} seconds")
      //processing 20000 rows parallelly  in 614 seconds
      println(s"processing 100000 rows parallelly  in ${(System.currentTimeMillis - cnt_start)/1000} seconds")
      //processing 100000 rows parallelly  in 3885 seconds

}
