FunDA (13) - Demo: User-Defined Tasks

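   FunDA is a functional programming tool. The programs it produces are built by combining many small, single-purpose functions, and these functions are the user-defined tasks. As mentioned in earlier articles, FunDA works by simulating a data-flow pipeline: stream elements are used or updated as they flow through the pipeline. Every element that flows through the pipeline must extend the FDAROW type. Elements fall into a few broad categories: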

1. Data row: FunDA data rows must be strongly typed, so case classes that extend FDAROW (extends FDAROW) are the best fit.

2. Action row: a case class that wraps a Slick DBIOAction, for example FDAActionRow(slickQueryAction).

3. Exception row: a case class that wraps an Exception, declared as follows:

case class FDAErrorRow(e: Exception) extends FDAROW

4. End-of-stream: the stream-termination signal, which tells downstream nodes that no more elements will arrive.
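The sketch below makes the four categories concrete by modelling them as a small case-class hierarchy in plain Scala. The class names echo the article, but they are hypothetical stand-ins rather than FunDA's own definitions. In the real library, FDAActionRow wraps a Slick DBIOAction; here a description string takes its place. FDANullRow is assumed to be a singleton object, since the article matches it with case FDANullRow. Reading is an invented data-row type.

```scala
// Hypothetical stand-ins for FunDA's row categories; not the library's own code.
object RowKindsSketch {
  trait FDAROW

  // 1. data row: a strongly typed case class that extends FDAROW
  final case class Reading(state: String, year: Int, value: Int) extends FDAROW

  // 2. action row: FunDA wraps a Slick DBIOAction; a String stands in for it here
  final case class FDAActionRow(action: String) extends FDAROW

  // 3. exception row: wraps an Exception
  final case class FDAErrorRow(e: Exception) extends FDAROW

  // 4. end-of-stream signal, assumed to be a singleton object
  case object FDANullRow extends FDAROW

  // A downstream node tells the categories apart by pattern matching
  def kindOf(row: FDAROW): String = row match {
    case _: Reading      => "data"
    case _: FDAActionRow => "action"
    case _: FDAErrorRow  => "error"
    case FDANullRow      => "end-of-stream"
    case _               => "unknown"
  }

  def main(args: Array[String]): Unit = {
    val rows = List(Reading("Ohio", 2000, 5), FDAActionRow("insert row"),
      FDAErrorRow(new Exception("bad row")), FDANullRow)
    rows.map(kindOf).foreach(println)
  }
}
```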

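The main purpose of a FunDA user-defined task is to use and process stream elements at a particular node of the stream. At first glance, the map and flatMap functions of functional programming seem able to do the same job, for example: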

fdaStream.map(row => transformData(row)).map(action => runQueryAction(action))

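Experiments showed, however, that the standard stream operators map and flatMap lack a powerful and flexible way to control the flow of elements, and this is a crucial capability in stream data processing. That is why we need a dedicated set of user-defined tasks.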

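FunDA standardizes the workflow of a user-defined task into the following steps:

1. Determine the type of the current stream element.

2. Within that type, use and update the element's field values.

3. Flow control: decide whether and how the element flows downstream.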
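The three steps can be tried out without the library. This sketch assumes that a user task has the shape FDAROW => Option[List[FDAROW]], which the FDAAggrTask signature quoted later in this article suggests. fda_next, fda_skip, runTask and the row classes are hypothetical stand-ins. The Option[List] result is what gives a task its flow control: None stops the element, a one-element list passes it on, and a longer list fans it out. A one-to-one map can express none of these.

```scala
object UserTaskSketch {
  trait FDAROW
  final case class RawRow(year: String, value: String) extends FDAROW
  final case class TypedRow(year: Int, value: Int) extends FDAROW

  // Assumed shape of a user task: consume one row, emit zero or more rows
  type UserTask = FDAROW => Option[List[FDAROW]]
  def fda_next(r: FDAROW): Option[List[FDAROW]] = Some(List(r)) // pass one row on
  def fda_skip: Option[List[FDAROW]] = None                     // pass nothing on

  // Template: 1) identify the row type, 2) use its fields, 3) control the flow
  val toTyped: UserTask = {
    case r: RawRow =>                                        // step 1
      val yr = r.year.toInt                                  // step 2
      if (yr > 1960) fda_next(TypedRow(yr, r.value.toInt))   // step 3: pass on
      else fda_skip                                          // step 3: stop here
    case _ => fda_skip                                       // not our type
  }

  // One row in, two rows out: something a plain map cannot do
  val fanOut: UserTask = r => Some(List(r, r))

  // Hypothetical node runner: apply the task to each element, flatten the output
  def runTask(rows: List[FDAROW], task: UserTask): List[FDAROW] =
    rows.flatMap(r => task(r).getOrElse(Nil))

  def main(args: Array[String]): Unit =
    println(runTask(List(RawRow("1950", "3"), RawRow("2001", "7")), toTyped))
}
```

For simplicity, runTask processes a whole list at once. A real stream node would handle elements one at a time.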

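In this article we demonstrate user-defined tasks of various forms and functions. We continue with the AIRQM table produced by the demo in the previous article. That table was created by importing a CSV file directly, so all of its fields are of type String. The demo converts these fields to appropriately typed ones, creates a new table AQMRPT, and loads the converted AIRQM field values into it. Here is the structure of the new table: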

  case class AQMRPTModel(rid: Long
                         , mid: Int
                         , state: String
                         , county: String
                         , year: Int
                         , value: Int
                         , total: Int
                         , valid: Boolean) extends FDAROW

  class AQMRPTTable(tag: Tag) extends Table[AQMRPTModel](tag, "AQMRPT") {
    def rid = column[Long]("ROWID",O.AutoInc,O.PrimaryKey)
    def mid = column[Int]("MEASUREID")
    def state = column[String]("STATENAME",O.Length(32))
    def county = column[String]("COUNTYNAME",O.Length(32))
    def year = column[Int]("REPORTYEAR")
    def value = column[Int]("VALUE")
    def total = column[Int]("TOTAL")
    def valid = column[Boolean]("VALID")

    def * = (rid,mid,state,county,year,value,total,valid) <> (AQMRPTModel.tupled, AQMRPTModel.unapply)
  }
  val AQMRPTQuery = TableQuery[AQMRPTTable]

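Note that extends FDAROW turns AQMRPTModel into a strongly typed data-row type, and this is required. AQMRPTQuery now represents the new table. Below is the setup code that creates and prepares it: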

//drop original table schema
  val futVectorTables = db.run(MTable.getTables)
  
  val futDropTable = futVectorTables.flatMap{ tables => {
      val tableNames = tables.map(t => t.name.name)
      if (tableNames.contains(AQMRPTQuery.baseTableRow.tableName))
        db.run(AQMRPTQuery.schema.drop)
      else Future.successful(())
    }
  }.andThen {
    case Success(_) => println(s"Table ${AQMRPTQuery.baseTableRow.tableName} dropped successfully! ")
    case Failure(e) => println(s"Failed to drop Table ${AQMRPTQuery.baseTableRow.tableName}, it may not exist! Error: ${e.getMessage}")
  }
  Await.ready(futDropTable,Duration.Inf)
 
//create new table to refine AQMRawTable
  val actionCreateTable = Models.AQMRPTQuery.schema.create
  val futCreateTable = db.run(actionCreateTable).andThen {
    case Success(_) => println("Table created successfully!")
    case Failure(e) => println(s"Table may exist already! Error: ${e.getMessage}")
  }
//carries on even if table creation fails
  Await.ready(futCreateTable,Duration.Inf)


//truncate data, only available in slick 3.2.1
  val futTruncateTable = futVectorTables.flatMap{ tables => {
     val tableNames = tables.map(t => t.name.name)
     if (tableNames.contains(AQMRPTQuery.baseTableRow.tableName))
       db.run(AQMRPTQuery.schema.truncate)
     else Future.successful(())
    }
  }.andThen {
    case Success(_) => println(s"Table ${AQMRPTQuery.baseTableRow.tableName} truncated successfully!")
    case Failure(e) => println(s"Failed to truncate Table ${AQMRPTQuery.baseTableRow.tableName}! Error: ${e.getMessage}")
  }
  Await.ready(futTruncateTable,Duration.Inf)

This setup (boilerplate) code guarantees that we start with an empty AQMRPT table. Next, we load the raw AIRQM data as AQMRaw rows and use it as a data source for the FunDA program:
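The setup code follows a drop-if-exists, create, then truncate sequence, and it waits for each Future before moving on. The sketch below replays that sequence against a hypothetical in-memory table registry instead of H2 and Slick, so it runs with the standard library alone. The empty branch returns Future.successful(()), and each Await waits on the Future created just before it.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object TableSetupSketch {
  // Hypothetical in-memory stand-in for the database: table name -> rows
  val tables = TrieMap.empty[String, Vector[String]]

  def listTables: Future[Set[String]] = Future(tables.keySet.toSet)
  def drop(t: String): Future[Unit] = Future { tables.remove(t); () }
  def create(t: String): Future[Unit] = Future {
    if (tables.contains(t)) throw new IllegalStateException(s"$t exists")
    tables.put(t, Vector.empty); ()
  }
  def truncate(t: String): Future[Unit] = Future { tables.update(t, Vector.empty) }

  def prepare(t: String): Unit = {
    // drop the table if it exists; otherwise complete immediately
    val futDrop = listTables.flatMap { names =>
      if (names.contains(t)) drop(t) else Future.successful(())
    }
    Await.ready(futDrop, Duration.Inf)

    // create it again; Await.ready does not rethrow, so a failure is tolerated
    Await.ready(create(t), Duration.Inf)

    // truncate if present, and wait on this future itself
    val futTruncate = listTables.flatMap { names =>
      if (names.contains(t)) truncate(t) else Future.successful(())
    }
    Await.ready(futTruncate, Duration.Inf)
  }

  def main(args: Array[String]): Unit = {
    tables.put("AQMRPT", Vector("old row"))
    prepare("AQMRPT")
    println(tables("AQMRPT")) // Vector()
  }
}
```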

//load original table content
//original table strong-typed-row
  case class AQMRaw(mid: String, state: String,
                    county: String, year: String, value: String) extends FDAROW
  implicit def toAQMRaw(row: (String,String,String,String,String)): AQMRaw =
    AQMRaw(row._1,row._2,row._3,row._4,row._5)
  val streamLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRaw _)
//  val queryAQMRaw = for { r <- AQMRawQuery } yield (r.mid,r.state,r.county,r.year,r.value)
  val queryAQMRaw = sql"""
    SELECT MEASUREID,STATENAME,COUNTYNAME,REPORTYEAR,VALUE FROM AIRQM
  """.as[(String,String,String,String,String)]

  val streamAQMRaw: FDAPipeLine[FDAROW] = streamLoader.fda_typedStream(queryAQMRaw)(db)(512,512)()

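Note that we used Slick's plain SQL to build this data source. AQMRaw is the strongly typed row type of this source, so it too must extend FDAROW. The type-conversion function toAQMRaw is declared as an implicit def. This guarantees that the compiler can find the conversion at compile time (try removing implicit and read the compiler's error message carefully). We split the import process into the following steps: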
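The sketch below isolates the Scala mechanism involved, outside FunDA and Slick. An implicit def lets the compiler convert a tuple wherever an AQMRaw is expected. The same method can also be passed explicitly as a function value with toAQMRaw _. The load function is a hypothetical stand-in for FDAStreamLoader and does not reproduce FunDA's own requirements on the converter. The sample values are made up.

```scala
import scala.language.implicitConversions

object ImplicitConversionSketch {
  trait FDAROW
  final case class AQMRaw(mid: String, state: String,
                          county: String, year: String, value: String) extends FDAROW

  // Same shape as the article's converter: a 5-tuple of Strings becomes an AQMRaw
  implicit def toAQMRaw(row: (String, String, String, String, String)): AQMRaw =
    AQMRaw(row._1, row._2, row._3, row._4, row._5)

  // Hypothetical loader that, like FDAStreamLoader, receives the converter as a function
  def load[T](raw: Seq[T])(convert: T => FDAROW): Seq[FDAROW] = raw.map(convert)

  def main(args: Array[String]): Unit = {
    // With the implicit in scope, the compiler inserts toAQMRaw(...) here
    val row: AQMRaw = ("1", "StateA", "CountyA", "2000", "5")
    println(row.state)
    // toAQMRaw _ eta-expands the method into a function value
    println(load(Seq(("1", "StateA", "CountyA", "2000", "5")))(toAQMRaw _))
  }
}
```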

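1. Load the data source >>> convert each row from AQMRaw to AQMRPTModel >>> pass the new rows downstream

2. Turn each row received from upstream into an action row, FDAActionRow(queryAction), and pass that action row downstream

3. Execute the action rows received from upstream

These three steps correspond to three small, single-purpose user-defined tasks. Let's start with the first one, a user-defined task in typical form: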

//mark rows with inconvertible strings as invalid, cap values at 10, filter out rows with out-of-range years
  def filterRows: FDAUserTask[FDAROW] = row => {
    row match {
      case r: AQMRaw => {
        try {
          val yr = r.year.toInt
          val v = r.value.toInt
          val vlu = if ( v > 10  ) 10 else v
          val data = AQMRPTModel(0,r.mid.toInt,r.state,r.county,yr,vlu,0,true)
          if ((yr > 1960 && yr < 2018))
            fda_next(data)   //this row ok. pass downstream
          else
            fda_skip    //filter out this row
        } catch {
          case e: Exception =>
            fda_next(AQMRPTModel(0,r.mid.toInt,r.state,r.county,2000,0,0,false))
            //pass a invalid row
        }
      }
      case _ => fda_skip   //wrong type, skip
    }
  }
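The sketch below re-implements the decision logic of filterRows so it can be checked in isolation. It uses hypothetical stand-ins for FDAROW, fda_next and fda_skip and needs no database. It narrows the catch to NumberFormatException, which is what String.toInt throws. Otherwise it keeps the original behavior, including the fact that a non-numeric mid would throw again inside the catch block.

```scala
object FilterRowsSketch {
  trait FDAROW
  final case class AQMRaw(mid: String, state: String,
                          county: String, year: String, value: String) extends FDAROW
  final case class AQMRPTModel(rid: Long, mid: Int, state: String, county: String,
                               year: Int, value: Int, total: Int, valid: Boolean) extends FDAROW

  // Assumed stand-ins for FunDA's flow-control helpers
  def fda_next(r: FDAROW): Option[List[FDAROW]] = Some(List(r))
  def fda_skip: Option[List[FDAROW]] = None

  // Same decision logic as filterRows, runnable without a database
  def filterRows(row: FDAROW): Option[List[FDAROW]] = row match {
    case r: AQMRaw =>
      try {
        val yr = r.year.toInt
        val vlu = math.min(r.value.toInt, 10)          // cap values above 10
        val data = AQMRPTModel(0, r.mid.toInt, r.state, r.county, yr, vlu, 0, true)
        if (yr > 1960 && yr < 2018) fda_next(data)     // row ok: pass downstream
        else fda_skip                                  // year out of range: drop
      } catch {
        case _: NumberFormatException =>               // unparsable field: mark invalid
          // note: a non-numeric mid would throw again here, as in the original
          fda_next(AQMRPTModel(0, r.mid.toInt, r.state, r.county, 2000, 0, 0, false))
      }
    case _ => fda_skip                                 // wrong type: skip
  }

  def main(args: Array[String]): Unit = {
    println(filterRows(AQMRaw("1", "S", "C", "2005", "42")))
    println(filterRows(AQMRaw("1", "S", "C", "1950", "3")))
    println(filterRows(AQMRaw("1", "S", "C", "2005", "n/a")))
  }
}
```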

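Let's match the user-defined task filterRows against the standard workflow of a user-defined task:

1. Determine the row type: row match { case r: AQMRaw => ??? establishes that the row is of type AQMRaw.

2. Use the contents of the row:

val data = AQMRPTModel(0,r.mid.toInt,r.state,r.county,yr,vlu,0,true)

Here the fields of the AQMRaw row, r.??, are used to build an AQMRPTModel row.

3. fda_next(???) passes the newly built AQMRPTModel row downstream, while fda_skip drops rows whose year is out of range.

These steps show that filterRows follows the standard workflow of a user-defined task.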

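The second step converts each row of the new type into an action row and passes it downstream. It is implemented by the following user-defined task: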

//transform data to action for later execution
  def toAction: FDAUserTask[FDAROW] = row => {
    row match {
      case r: AQMRPTModel =>
        val queryAction = AQMRPTQuery += r
        fda_next(FDAActionRow(queryAction))
      case _ => fda_skip
    }
  }

toAction also follows the standard workflow of a user-defined task. Next we need a runner to execute the action rows:

//get a query runner and an action task
  val actionRunner = FDAActionRunner(slick.jdbc.H2Profile)
  def runActionRow: FDAUserTask[FDAROW] = action => {
    action match {
      case FDAActionRow(q) => actionRunner.fda_execAction(q)(db)
        fda_skip
      case _ => fda_skip
    }
  }
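Splitting the work between toAction and runActionRow separates describing a database write from performing it. The sketch below mimics that split in memory. It assumes an action row that wraps a deferred thunk instead of a Slick DBIOAction. ActionRow, table and the other names are hypothetical.

```scala
import scala.collection.mutable.ListBuffer

object ActionRowSketch {
  trait FDAROW
  final case class Reading(state: String, year: Int, value: Int) extends FDAROW
  // Stand-in for FDAActionRow: FunDA wraps a Slick DBIOAction, here a deferred thunk
  final case class ActionRow(run: () => Unit) extends FDAROW

  def fda_next(r: FDAROW): Option[List[FDAROW]] = Some(List(r))
  def fda_skip: Option[List[FDAROW]] = None

  // Hypothetical in-memory "table" standing in for AQMRPT
  val table = ListBuffer.empty[Reading]

  // Like toAction: describe the insert, but do not execute it yet
  def toAction(row: FDAROW): Option[List[FDAROW]] = row match {
    case r: Reading => fda_next(ActionRow(() => { table += r; () }))
    case _          => fda_skip
  }

  // Like runActionRow: execute the action; as the terminal node, emit nothing
  def runActionRow(row: FDAROW): Option[List[FDAROW]] = row match {
    case ActionRow(run) => run(); fda_skip
    case _              => fda_skip
  }

  def main(args: Array[String]): Unit = {
    val actionRows = toAction(Reading("S", 2000, 5)).getOrElse(Nil)
    println(table.size) // 0: building the action did not run it
    actionRows.foreach(runActionRow)
    println(table.size) // 1: the runner executed it
  }
}
```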

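runActionRow sits at the last node of the program. It is a terminal task and passes no rows downstream. We combine the three tasks into a single FunDA program and call startRun: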

//start the program
  val streamAllTasks =  streamAQMRaw.appendTask(filterRows)
    .appendTask(toAction)
    .appendTask(runActionRow)

  val streamToRun = streamAllTasks.onError { case e: Exception => println("Error:"+e.getMessage); fda_appendRow(FDAErrorRow(new Exception(e))) }

  streamToRun.startRun

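Note that the FunDA stream can be composed freely until startRun is called. After startRun finishes, check the database to confirm that the AQMRPT table is present and filled with the converted rows.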
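The composition step can be pictured with a toy pipeline that records tasks through appendTask and does nothing until startRun. Pipeline, appendTask and startRun below are hypothetical stand-ins that mirror the article's names; they are not FunDA's implementation. Unlike a real stream, this version runs each stage over the whole list before the next stage starts.

```scala
import scala.collection.mutable.ListBuffer

object PipelineSketch {
  trait FDAROW
  final case class Raw(year: String) extends FDAROW
  final case class Typed(year: Int) extends FDAROW
  final case class Action(run: () => Unit) extends FDAROW

  type Task = FDAROW => Option[List[FDAROW]]

  // Hypothetical pipeline: a source plus an ordered list of tasks; nothing runs until startRun
  final case class Pipeline(source: List[FDAROW], tasks: Vector[Task] = Vector.empty) {
    def appendTask(t: Task): Pipeline = copy(tasks = tasks :+ t)
    def startRun(): List[FDAROW] =
      tasks.foldLeft(source)((rows, t) => rows.flatMap(r => t(r).getOrElse(Nil)))
  }

  // In-memory target standing in for the AQMRPT table
  val inserted = ListBuffer.empty[Int]

  val filterRows: Task = {
    case Raw(y) if y.toInt > 1960 => Some(List(Typed(y.toInt)))
    case _                        => None
  }
  val toAction: Task = {
    case Typed(y) => Some(List(Action(() => { inserted += y; () })))
    case _        => None
  }
  val runActionRow: Task = {
    case Action(run) => run(); None
    case _           => None
  }

  def main(args: Array[String]): Unit = {
    val program = Pipeline(List(Raw("1950"), Raw("2001"), Raw("2010")))
      .appendTask(filterRows).appendTask(toAction).appendTask(runActionRow)
    program.startRun()
    println(inserted) // ListBuffer(2001, 2010)
  }
}
```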

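Besides processing each row independently, we often need to aggregate a run of consecutive rows, for example to count rows or to total a field. FunDA provides user-defined aggregation tasks for this purpose. Here is an example of a user-defined aggregation task: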

//user defined aggregation task
  def aggregateValue: FDAAggrTask[Accu,FDAROW] = (accu,row) => {
    row match {
      case aqmr: AQMRPTModel =>
        if (accu.state == "" || (aqmr.state == accu.state && aqmr.year == accu.year))
          //same condition: inc count and add sum, pass no row downstream
          (Accu(aqmr.state,aqmr.county,aqmr.year,accu.count+1, accu.sumOfValue+aqmr.value),fda_skip)
        else
          //reset accumulator, create a new aggregated row and pass downstream
          (Accu(aqmr.state,aqmr.county,aqmr.year,1, aqmr.value)
            ,fda_next(AQMRPTModel(0,9999,accu.state,accu.county,accu.year
            ,accu.count,accu.sumOfValue/accu.count,true)))
      case FDANullRow if accu.count > 0 =>
        //last row encountered: create and pass the final aggregated row
        //(the guard avoids dividing by zero when the stream was empty)
        (Accu(accu.state,accu.county,accu.year,1, 0)
          ,fda_next(AQMRPTModel(0,9999,accu.state,accu.county,accu.year
          ,accu.count,accu.sumOfValue/accu.count,true)))
      //incorrect row type (or end of an empty stream): do nothing
      case _ => (accu,fda_skip)
    }
  }

A user-defined aggregation task has the signature FDAAggrTask, defined as follows:

  type FDAAggrTask[AGGR,ROW] = (AGGR,ROW) => (AGGR,Option[List[ROW]])

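AGGR is a user-defined type that records the current state of the aggregation, and ROW is the row type. The user-defined aggregation task aggregateValue does the following:

1. Aggregates the rows of the AQMRPT table by state name and year.

2. Produces a new aggregated row for each group (MEASUREID 9999, with the row count in VALUE and the average value in TOTAL) and inserts it into the AQMRPT table.

An aggregation task is a stateful function. In its typical form it takes the previous state as input and returns the new state. A user-defined aggregation task must be composed with aggregateTask: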

  aqmrStream.aggregateTask(Accu("","",0,0,0),aggregateValue)
    .appendTask(toAction)
    .appendTask(runActionRow)
    .startRun

Accu is a user-defined case class. Before calling startRun we pass the initial state Accu("","",0,0,0) to aggregateTask. aqmrStream is the data source, and its setup code is as follows:

//aggregate-task demo: get count and sum of value for each state and year
  val orderedAQMRPT = AQMRPTQuery.sortBy(r => (r.state,r.year))
//TableElementType conversion. must declare implicit
  implicit def toAQMRPT(row: AQMRPTTable#TableElementType): AQMRPTModel =
    AQMRPTModel(row.rid,row.mid,row.state,row.county,row.year,row.value,row.total,row.valid)
  val aqmrStreamLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRPT _)
  val aqmrStream: FDAPipeLine[FDAROW] = aqmrStreamLoader.fda_typedStream(orderedAQMRPT.result)(db)(512,512)()

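Note that this time we used the raw row type of the Slick TableQuery, AQMRPTTable#TableElementType, for the strongly typed conversion. The query is also sorted by state and year, because aggregateValue only recognizes a group boundary when the state or year changes between consecutive rows.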
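The sketch below replays aggregateValue over an in-memory list to show how the state is threaded from row to row. AggrTask follows the FDAAggrTask shape quoted above. Row, Summary and the aggregate driver are hypothetical stand-ins. The sketch also assumes that FunDA sends FDANullRow once the source is exhausted, which is what the article's FDANullRow case relies on. Summary takes the place of the AQMRPTModel row with MEASUREID 9999. The end-of-stream case is guarded by count > 0 so that an empty source does not divide by zero.

```scala
object AggregationSketch {
  trait FDAROW
  final case class Row(state: String, county: String, year: Int, value: Int) extends FDAROW
  final case class Summary(state: String, year: Int, count: Int, avg: Int) extends FDAROW
  case object FDANullRow extends FDAROW // assumed end-of-stream marker

  final case class Accu(state: String, county: String, year: Int, count: Int, sumOfValue: Int)

  // Shape quoted in the article: (AGGR, ROW) => (AGGR, Option[List[ROW]])
  type AggrTask[A, R] = (A, R) => (A, Option[List[R]])

  val aggregateValue: AggrTask[Accu, FDAROW] = (accu, row) => row match {
    case r: Row if accu.state == "" || (r.state == accu.state && r.year == accu.year) =>
      // same group: update count and sum, emit nothing
      (Accu(r.state, r.county, r.year, accu.count + 1, accu.sumOfValue + r.value), None)
    case r: Row =>
      // group changed: emit the finished group, start a new one with this row
      (Accu(r.state, r.county, r.year, 1, r.value),
        Some(List(Summary(accu.state, accu.year, accu.count, accu.sumOfValue / accu.count))))
    case FDANullRow if accu.count > 0 =>
      // end of stream: flush the last group
      (accu.copy(count = 0, sumOfValue = 0),
        Some(List(Summary(accu.state, accu.year, accu.count, accu.sumOfValue / accu.count))))
    case _ => (accu, None)
  }

  // Hypothetical driver: thread the state through every row, then send the end marker
  def aggregate(rows: List[FDAROW], init: Accu): List[FDAROW] = {
    val (_, out) = (rows :+ FDANullRow).foldLeft((init, List.empty[FDAROW])) {
      case ((acc, emitted), row) =>
        val (next, result) = aggregateValue(acc, row)
        (next, emitted ++ result.getOrElse(Nil))
    }
    out
  }

  def main(args: Array[String]): Unit = {
    val sorted = List(Row("A", "c1", 2000, 4), Row("A", "c2", 2000, 6), Row("B", "c1", 2000, 8))
    println(aggregate(sorted, Accu("", "", 0, 0, 0)))
  }
}
```

Given rows for state A, then B, then A again, the same driver emits three summaries rather than two.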

The complete source code of this demo is as follows:

import slick.jdbc.meta._
import com.bayakala.funda._
import api._
import scala.language.implicitConversions
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
import slick.jdbc.H2Profile.api._
import Models._

object UserDefinedTasks extends App {

  val db = Database.forConfig("h2db")

//drop original table schema
  val futVectorTables = db.run(MTable.getTables)

  val futDropTable = futVectorTables.flatMap{ tables => {
      val tableNames = tables.map(t => t.name.name)
      if (tableNames.contains(AQMRPTQuery.baseTableRow.tableName))
        db.run(AQMRPTQuery.schema.drop)
      else Future.successful(())
    }
  }.andThen {
    case Success(_) => println(s"Table ${AQMRPTQuery.baseTableRow.tableName} dropped successfully! ")
    case Failure(e) => println(s"Failed to drop Table ${AQMRPTQuery.baseTableRow.tableName}, it may not exist! Error: ${e.getMessage}")
  }
  Await.ready(futDropTable,Duration.Inf)

//create new table to refine AQMRawTable
  val actionCreateTable = Models.AQMRPTQuery.schema.create
  val futCreateTable = db.run(actionCreateTable).andThen {
    case Success(_) => println("Table created successfully!")
    case Failure(e) => println(s"Table may exist already! Error: ${e.getMessage}")
  }
//carries on even if table creation fails
  Await.ready(futCreateTable,Duration.Inf)


//truncate data, only available in slick 3.2.1
  val futTruncateTable = futVectorTables.flatMap{ tables => {
     val tableNames = tables.map(t => t.name.name)
     if (tableNames.contains(AQMRPTQuery.baseTableRow.tableName))
       db.run(AQMRPTQuery.schema.truncate)
     else Future.successful(())
    }
  }.andThen {
    case Success(_) => println(s"Table ${AQMRPTQuery.baseTableRow.tableName} truncated successfully!")
    case Failure(e) => println(s"Failed to truncate Table ${AQMRPTQuery.baseTableRow.tableName}! Error: ${e.getMessage}")
  }
  Await.ready(futTruncateTable,Duration.Inf)


//load original table content
//original table strong-typed-row
  case class AQMRaw(mid: String, state: String,
                    county: String, year: String, value: String) extends FDAROW
  implicit def toAQMRaw(row: (String,String,String,String,String)): AQMRaw =
    AQMRaw(row._1,row._2,row._3,row._4,row._5)
  val streamLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRaw _)
//  val queryAQMRaw = for { r <- AQMRawQuery } yield (r.mid,r.state,r.county,r.year,r.value)
  val queryAQMRaw = sql"""
    SELECT MEASUREID,STATENAME,COUNTYNAME,REPORTYEAR,VALUE FROM AIRQM
  """.as[(String,String,String,String,String)]

  val streamAQMRaw: FDAPipeLine[FDAROW] = streamLoader.fda_typedStream(queryAQMRaw)(db)(512,512)()


//mark rows with inconvertible strings as invalid, cap values at 10, filter out rows with out-of-range years
  def filterRows: FDAUserTask[FDAROW] = row => {
    row match {
      case r: AQMRaw => {
        try {
          val yr = r.year.toInt
          val v = r.value.toInt
          val vlu = if ( v > 10  ) 10 else v
          val data = AQMRPTModel(0,r.mid.toInt,r.state,r.county,yr,vlu,0,true)
          if ((yr > 1960 && yr < 2018))
            fda_next(data)   //this row ok. pass downstream
          else
            fda_skip    //filter out this row
        } catch {
          case e: Exception =>
            fda_next(AQMRPTModel(0,r.mid.toInt,r.state,r.county,2000,0,0,false))
            //pass a invalid row
        }
      }
      case _ => fda_skip   //wrong type, skip
    }
  }

//transform data to action for later execution
  def toAction: FDAUserTask[FDAROW] = row => {
    row match {
      case r: AQMRPTModel =>
        val queryAction = AQMRPTQuery += r
        fda_next(FDAActionRow(queryAction))
      case _ => fda_skip
    }
  }

//get a query runner and an action task
  val actionRunner = FDAActionRunner(slick.jdbc.H2Profile)
  def runActionRow: FDAUserTask[FDAROW] = action => {
    action match {
      case FDAActionRow(q) => actionRunner.fda_execAction(q)(db)
        fda_skip
      case _ => fda_skip
    }
  }


//start the program
  val streamAllTasks =  streamAQMRaw.appendTask(filterRows)
    .appendTask(toAction)
    .appendTask(runActionRow)

  val streamToRun = streamAllTasks.onError { case e: Exception => println("Error:"+e.getMessage); fda_appendRow(FDAErrorRow(new Exception(e))) }

  streamToRun.startRun

//aggregate-task demo: get count and sum of value for each state and year
  val orderedAQMRPT = AQMRPTQuery.sortBy(r => (r.state,r.year))
//TableElementType conversion. must declare implicit
  implicit def toAQMRPT(row: AQMRPTTable#TableElementType): AQMRPTModel =
    AQMRPTModel(row.rid,row.mid,row.state,row.county,row.year,row.value,row.total,row.valid)
  val aqmrStreamLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRPT _)
  val aqmrStream: FDAPipeLine[FDAROW] = aqmrStreamLoader.fda_typedStream(orderedAQMRPT.result)(db)(512,512)()
//user defined aggregator type.
  case class Accu(state: String, county: String, year: Int, count: Int, sumOfValue: Int)
//user defined aggregation task
  def aggregateValue: FDAAggrTask[Accu,FDAROW] = (accu,row) => {
    row match {
      case aqmr: AQMRPTModel =>
        if (accu.state == "" || (aqmr.state == accu.state && aqmr.year == accu.year))
          //same condition: inc count and add sum, pass no row downstream
          (Accu(aqmr.state,aqmr.county,aqmr.year,accu.count+1, accu.sumOfValue+aqmr.value),fda_skip)
        else
          //reset accumulator, create a new aggregated row and pass downstream
          (Accu(aqmr.state,aqmr.county,aqmr.year,1, aqmr.value)
            ,fda_next(AQMRPTModel(0,9999,accu.state,accu.county,accu.year
            ,accu.count,accu.sumOfValue/accu.count,true)))
      case FDANullRow if accu.count > 0 =>
        //last row encountered: create and pass the final aggregated row
        //(the guard avoids dividing by zero when the stream was empty)
        (Accu(accu.state,accu.county,accu.year,1, 0)
          ,fda_next(AQMRPTModel(0,9999,accu.state,accu.county,accu.year
          ,accu.count,accu.sumOfValue/accu.count,true)))
      //incorrect row type (or end of an empty stream): do nothing
      case _ => (accu,fda_skip)
    }
  }


  aqmrStream.aggregateTask(Accu("","",0,0,0),aggregateValue)
    .appendTask(toAction)
    .appendTask(runActionRow)
    .startRun


}

This article is part of the Tencent Cloud self-media sharing program.
