FunDA (13) - Demo: User-Defined Tasks

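   FunDA is a functional programming tool. A FunDA program is composed of many small, single-purpose functions, and these functions are the user-defined tasks. As mentioned in earlier posts, FunDA models a data-stream pipeline: stream elements are used or updated as they flow through the pipe. Every element flowing through the pipe must extend the FDAROW type, and elements fall into a few broad categories: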

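1. Data row (data-row): FunDA data rows must be strongly typed, so case classes that extend FDAROW are the natural fit.

2. Action row (action-row): a case class wrapping a slick.DBIOAction, e.g. FDAActionRow(slickQueryAction).

3. Exception row (exception-row): a case class wrapping an Exception, declared as follows:

case class FDAErrorRow(e: Exception) extends FDAROW

4. End-of-stream row (end-of-stream): the signal that the stream has ended, telling downstream nodes that no more elements will arrive.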
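The four row categories can be pictured with a small self-contained sketch. It does not use the FunDA library: FDAROW is modeled here as a plain marker trait, and Reading, ActionRow and EndOfStream are hypothetical stand-ins chosen for illustration (a thunk takes the place of slick's DBIOAction). Only FDAErrorRow is declared as in the text.

```scala
object RowKindsSketch {
  // Stand-in for FunDA's FDAROW (assumption: modeled as a simple marker trait).
  trait FDAROW

  // 1. Data row: a strongly typed case class (hypothetical example type).
  case class Reading(state: String, year: Int, value: Int) extends FDAROW
  // 2. Action row: wraps work to be executed later; a thunk stands in for a DBIOAction.
  case class ActionRow(action: () => Int) extends FDAROW
  // 3. Exception row, declared as in the text.
  case class FDAErrorRow(e: Exception) extends FDAROW
  // 4. End-of-stream signal (hypothetical name).
  case object EndOfStream extends FDAROW

  // A node decides what to do by matching on the row type.
  def describe(row: FDAROW): String = row match {
    case Reading(s, y, v) => s"data row: $s/$y = $v"
    case ActionRow(_)     => "action row"
    case FDAErrorRow(e)   => s"error row: ${e.getMessage}"
    case EndOfStream      => "end of stream"
    case _                => "unknown row"
  }
}
```

Because every element shares one base type, a single pipe can carry data, actions, errors and the end-of-stream signal, and each node picks out the rows it understands.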

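The main purpose of a FunDA user-defined task is to use and process stream elements at a given node of the stream. At first glance the standard functional combinators map and flatMap would seem to do the same job, e.g.: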

fdaStream.map(row => transformData(row)).map(action => runQueryAction(action))

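In practice, however, the standard stream combinators map and flatMap turned out to lack powerful yet flexible flow control, which is a crucial capability in stream processing. That is why a new set of user-defined functions is needed.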

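FunDA prescribes a standard flow for a user-defined task, consisting of the following steps:

1. Determine the type of the current stream element.

2. Use and modify the element's field values within that type.

3. Flow control: control how elements move downstream.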
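The three steps can be sketched without the library. The sketch assumes that a task has the shape ROW => Option[List[ROW]], by analogy with the FDAAggrTask signature quoted later in this post; the helpers next and skip and the driver run are hypothetical stand-ins, not FunDA's fda_next and fda_skip.

```scala
import scala.util.Try

object UserTaskSketch {
  trait FDAROW                                   // stand-in marker trait
  case class RawRow(year: String) extends FDAROW
  case class TypedRow(year: Int) extends FDAROW

  // Assumed task shape: the returned list holds the rows sent downstream.
  type UserTask[ROW] = ROW => Option[List[ROW]]
  def next[ROW](r: ROW): Option[List[ROW]] = Some(List(r)) // pass one row on
  def skip[ROW]: Option[List[ROW]] = Some(Nil)             // pass nothing on

  val parseYear: UserTask[FDAROW] = {
    case RawRow(y) =>                            // step 1: determine the row type
      Try(y.trim.toInt).toOption match {         // step 2: use the field values
        case Some(n) => next(TypedRow(n))        // step 3: flow control, pass downstream
        case None    => skip                     // step 3: flow control, drop the row
      }
    case _ => skip                               // not a row this task handles
  }

  // Toy driver: applies the task to each row and collects what flows downstream.
  def run(rows: List[FDAROW], task: UserTask[FDAROW]): List[FDAROW] =
    rows.flatMap(r => task(r).getOrElse(Nil))
}
```

Unlike a plain map, the task itself decides per element whether zero, one or several rows continue downstream.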

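This post demonstrates user-defined tasks of various forms and purposes. It continues from the table AIRQM produced in the previous demo. That table was imported directly from a CSV file, so every field is a String. The demo creates a new table, AQMRPT, whose columns have properly matched types, and loads the converted field values from AIRQM into it. Here is the structure of the new table: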

  case class AQMRPTModel(rid: Long
                         , mid: Int
                         , state: String
                         , county: String
                         , year: Int
                         , value: Int
                         , total: Int
                         , valid: Boolean) extends FDAROW

  class AQMRPTTable(tag: Tag) extends Table[AQMRPTModel](tag, "AQMRPT") {
    def rid = column[Long]("ROWID",O.AutoInc,O.PrimaryKey)
    def mid = column[Int]("MEASUREID")
    def state = column[String]("STATENAME",O.Length(32))
    def county = column[String]("COUNTYNAME",O.Length(32))
    def year = column[Int]("REPORTYEAR")
    def value = column[Int]("VALUE")
    def total = column[Int]("TOTAL")
    def valid = column[Boolean]("VALID")

    def * = (rid,mid,state,county,year,value,total,valid) <> (AQMRPTModel.tupled, AQMRPTModel.unapply)
  }
  val AQMRPTQuery = TableQuery[AQMRPTTable]

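Note that extends FDAROW turns AQMRPTModel into a strongly typed data-row type; this is required. AQMRPTQuery now represents the new table. The following boilerplate creates the table and prepares it for use: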

//drop original table schema
  val futVectorTables = db.run(MTable.getTables)
  
  val futDropTable = futVectorTables.flatMap{ tables => {
      val tableNames = tables.map(t => t.name.name)
      if (tableNames.contains(AQMRPTQuery.baseTableRow.tableName))
        db.run(AQMRPTQuery.schema.drop)
      else Future.successful(())
    }
  }.andThen {
    case Success(_) => println(s"Table ${AQMRPTQuery.baseTableRow.tableName} dropped successfully! ")
    case Failure(e) => println(s"Failed to drop Table ${AQMRPTQuery.baseTableRow.tableName}, it may not exist! Error: ${e.getMessage}")
  }
  Await.ready(futDropTable,Duration.Inf)
 
//create new table to refine AQMRawTable
  val actionCreateTable = Models.AQMRPTQuery.schema.create
  val futCreateTable = db.run(actionCreateTable).andThen {
    case Success(_) => println("Table created successfully!")
    case Failure(e) => println(s"Table may exist already! Error: ${e.getMessage}")
  }
//would carry on even fail to create table
  Await.ready(futCreateTable,Duration.Inf)


//truncate data, only available in slick 3.2.1
  val futTruncateTable = futVectorTables.flatMap{ tables => {
     val tableNames = tables.map(t => t.name.name)
     if (tableNames.contains(AQMRPTQuery.baseTableRow.tableName))
       db.run(AQMRPTQuery.schema.truncate)
     else Future.successful(())
    }
  }.andThen {
    case Success(_) => println(s"Table ${AQMRPTQuery.baseTableRow.tableName} truncated successfully!")
    case Failure(e) => println(s"Failed to truncate Table ${AQMRPTQuery.baseTableRow.tableName}! Error: ${e.getMessage}")
  }
  Await.ready(futTruncateTable,Duration.Inf)

This boilerplate code guarantees that we start with an empty AQMRPT table. Next, we load the AIRQM data into memory as AQMRaw rows to serve as the data source of the FunDA program:

//load original table content
//original table strong-typed-row
  case class AQMRaw(mid: String, state: String,
                    county: String, year: String, value: String) extends FDAROW
  implicit def toAQMRaw(row: (String,String,String,String,String)) =
    AQMRaw(row._1,row._2,row._3,row._4,row._5)
  val streamLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRaw _)
//  val queryAQMRaw = for { r <- AQMRawQuery } yield (r.mid,r.state,r.county,r.year,r.value)
  val queryAQMRaw = sql"""
    SELECT MEASUREID,STATENAME,COUNTYNAME,REPORTYEAR,VALUE FROM AIRQM
  """.as[(String,String,String,String,String)]

  val streamAQMRaw: FDAPipeLine[FDAROW] = streamLoader.fda_typedStream(queryAQMRaw)(db)(512,512)()

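Note that we used slick's plain SQL to build this source. AQMRaw is the strongly typed row type of the source, so it too must extend FDAROW. The conversion function toAQMRaw is declared as an implicit def; this is a way to make sure the conversion exists, for the compiler to use at compile time (try removing implicit and read the compiler's error message carefully). We split the import flow into the following steps: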

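1. Load the data source >>> convert the row type from AQMRaw to AQMRPTModel >>> pass the rows of the new type downstream

2. Convert each data row received from upstream into an action row FDAActionRow(queryAction), then pass the action row downstream

3. Execute the action rows received from upstream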

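These three steps correspond to three fine-grained, single-purpose user-defined tasks. Let's look at the first one, a user-defined task in its typical form: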

//filter out rows with inconvertible value strings and out of ranged year
  def filterRows: FDAUserTask[FDAROW] = row => {
    row match {
      case r: AQMRaw => {
        try {
          val yr = r.year.toInt
          val v = r.value.toInt
          val vlu = if ( v > 10  ) 10 else v
          val data = AQMRPTModel(0,r.mid.toInt,r.state,r.county,yr,vlu,0,true)
          if ((yr > 1960 && yr < 2018))
            fda_next(data)   //this row ok. pass downstream
          else
            fda_skip    //filter out this row
        } catch {
          case e: Exception =>
            fda_next(AQMRPTModel(0,r.mid.toInt,r.state,r.county,2000,0,0,false))
            //pass an invalid row
        }
      }
      case _ => fda_skip   //wrong type, skip
    }
  }

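Let's map the user-defined task filterRows onto the standard task flow:

1. Determine the row type: row match { case r: AQMRaw => ??? establishes that the row is of type AQMRaw.

2. Use the row's content:

data = AQMRPTModel(0,r.mid.toInt,r.state,r.county,yr,vlu,0,true)

The fields r.?? of the AQMRaw row are used to build an AQMRPTModel row.

3. fda_next(???) passes the newly built AQMRPTModel row downstream, while fda_skip filters the row out.

These steps show that filterRows follows the standard flow for user-defined tasks.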
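The conversion rules inside filterRows can also be written as a pure function, which makes them easy to check in isolation. This is a sketch under stated assumptions, not the author's code: the two case classes are redeclared without FDAROW so the block is self-contained, convert is a hypothetical name, None plays the role of fda_skip and Some(row) the role of fda_next(row). One deliberate difference: in the original catch branch, r.mid.toInt can itself throw when mid is not numeric, so the sketch falls back to 0 in that case (an assumption).

```scala
import scala.util.Try

object FilterRowsSketch {
  // Redeclared locally (without FDAROW) to keep the sketch self-contained.
  case class AQMRaw(mid: String, state: String, county: String, year: String, value: String)
  case class AQMRPTModel(rid: Long, mid: Int, state: String, county: String,
                         year: Int, value: Int, total: Int, valid: Boolean)

  // None = filter the row out; Some(row) = pass the row downstream.
  def convert(r: AQMRaw): Option[AQMRPTModel] = {
    val parsed = for {
      mid <- Try(r.mid.toInt)
      yr  <- Try(r.year.toInt)
      v   <- Try(r.value.toInt)
    } yield (mid, yr, v)

    parsed.toOption match {
      case Some((mid, yr, v)) =>
        if (yr > 1960 && yr < 2018)                // year must be in range
          Some(AQMRPTModel(0, mid, r.state, r.county, yr, math.min(v, 10), 0, true)) // cap value at 10
        else None
      case None =>
        // Unparsable field: emit a row flagged invalid, as the catch branch does.
        // Assumption: an unparsable mid falls back to 0 instead of throwing again.
        Some(AQMRPTModel(0, Try(r.mid.toInt).getOrElse(0), r.state, r.county, 2000, 0, 0, false))
    }
  }
}
```

Keeping the parsing inside Try means every outcome is an ordinary value, so the task never has to rely on an exception escaping a catch block.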

The second step converts the new data rows into action rows and passes them downstream. The following user-defined task does this:

//transform data to action for later execution
  def toAction: FDAUserTask[FDAROW] = row => {
    row match {
      case r: AQMRPTModel =>
        val queryAction = AQMRPTQuery += r
        fda_next(FDAActionRow(queryAction))
      case _ => fda_skip
    }
  }

toAction follows the same standard flow. We also need a runner to execute the action rows:

//get a query runner and an action task
  val actionRunner = FDAActionRunner(slick.jdbc.H2Profile)
  def runActionRow: FDAUserTask[FDAROW] = action => {
    action match {
      case FDAActionRow(q) => actionRunner.fda_execAction(q)(db)
        fda_skip
      case _ => fda_skip
    }
  }

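runActionRow sits at the last node of the program. It is a terminal task and passes no rows downstream. We combine the three tasks into a FunDA program and then call startRun: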

//start the program
  val streamAllTasks =  streamAQMRaw.appendTask(filterRows)
    .appendTask(toAction)
    .appendTask(runActionRow)

  val streamToRun = streamAllTasks.onError { case e: Exception => println("Error:"+e.getMessage); fda_appendRow(FDAErrorRow(new Exception(e))) }

  streamToRun.startRun

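Note that a FunDA stream can be composed in any way before startRun is called. After startRun completes, check the list of database tables to verify that the AQMRPT table has been added.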
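How three single-purpose tasks chain into one program can be illustrated with a toy pipeline. Everything below is a hypothetical stand-in: Pipeline, its appendTask and startRun only mimic the names used above, a ListBuffer takes the place of the database table, and the pipeline runs stage by stage over an in-memory list rather than streaming.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.Try

object PipelineSketch {
  sealed trait Row
  final case class Raw(text: String) extends Row
  final case class Parsed(n: Int) extends Row
  final case class Action(run: () => Unit) extends Row

  type Task = Row => Option[List[Row]]            // assumed task shape
  def next(r: Row): Option[List[Row]] = Some(List(r))
  val skip: Option[List[Row]] = Some(Nil)

  // Toy pipeline: a source plus an ordered list of tasks; nothing runs until startRun.
  final case class Pipeline(source: List[Row], tasks: Vector[Task] = Vector.empty) {
    def appendTask(t: Task): Pipeline = copy(tasks = tasks :+ t)
    def startRun(): List[Row] =
      tasks.foldLeft(source)((rows, t) => rows.flatMap(r => t(r).getOrElse(Nil)))
  }

  val stored = ListBuffer.empty[Int]              // stands in for the target table

  val parse: Task = {                             // data row -> typed data row
    case Raw(t) => Try(t.toInt).toOption.fold(skip)(n => next(Parsed(n)))
    case _      => skip
  }
  val toAction: Task = {                          // typed data row -> action row
    case Parsed(n) => next(Action(() => { stored += n; () }))
    case _         => skip
  }
  val runAction: Task = {                         // terminal task: run, emit nothing
    case Action(run) => run(); skip
    case _           => skip
  }
}
```

A run would look like Pipeline(List(Raw("1"), Raw("x"), Raw("2"))).appendTask(parse).appendTask(toAction).appendTask(runAction).startRun(). The terminal task emits nothing, so the result list is empty and the effect is visible only in stored.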

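Besides processing each row on its own, we often need to aggregate over a run of consecutive rows, for example counting rows or summing a field. FunDA provides user-defined aggregation tasks (user-defined-aggregation) for this purpose. Here is an example: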

//user defined aggregation task
  def aggregateValue: FDAAggrTask[Accu,FDAROW] = (accu,row) => {
    row match {
      case aqmr: AQMRPTModel =>
        if (accu.state == "" || (aqmr.state == accu.state && aqmr.year == accu.year))
          //same condition: inc count and add sum, pass no row downstream
          (Accu(aqmr.state,aqmr.county,aqmr.year,accu.count+1, accu.sumOfValue+aqmr.value),fda_skip)
        else
          //reset accumulator, create a new aggregated row and pass downstream
          (Accu(aqmr.state,aqmr.county,aqmr.year,1, aqmr.value)
            ,fda_next(AQMRPTModel(0,9999,accu.state,accu.county,accu.year
            ,accu.count,accu.sumOfValue/accu.count,true)))
      case FDANullRow =>
          //last row encountered. create and pass new aggregated row
        (Accu(accu.state,accu.county,accu.year,1, 0)
          ,fda_next(AQMRPTModel(0,9999,accu.state,accu.county,accu.year
          ,accu.count,accu.sumOfValue/accu.count,true)))
         //incorrect row type, do nothing
      case _ => (accu,fda_skip)
    }
  }

The type of a user-defined aggregation task is FDAAggrTask, defined as:

  type FDAAggrTask[AGGR,ROW] = (AGGR,ROW) => (AGGR,Option[List[ROW]])

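AGGR is a user-defined type that holds the current aggregation state. ROW is the row type. The aggregation task aggregateValue does the following:

1. Aggregates the data in the AQMRPT table by statename and year.

2. Produces a new aggregated row to be inserted into the AQMRPT table.

An aggregation task is a state function: in its typical form it takes the current state as input and returns the new state. A user-defined aggregation task must be combined into the stream with aggregateTask: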

  aqmrStream.aggregateTask(Accu("","",0,0,0),aggregateValue)
    .appendTask(toAction)
    .appendTask(runActionRow)
    .startRun

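Accu is a user-defined case class (declared in the full listing as case class Accu(state: String, county: String, year: Int, count: Int, sumOfValue: Int)). The initial state Accu("","",0,0,0) is passed to aggregateTask before startRun is called. aqmrStream is the data source; its boilerplate is as follows: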

//aggregate-task demo: get count and sum of value for each state and year
  val orderedAQMRPT = AQMRPTQuery.sortBy(r => (r.state,r.year))
//TableElementType conversion. must declare implicit
  implicit def toAQMRPT(row: AQMRPTTable#TableElementType) =
    AQMRPTModel(row.rid,row.mid,row.state,row.county,row.year,row.value,row.total,row.valid)
  val aqmrStreamLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRPT _)
  val aqmrStream: FDAPipeLine[FDAROW] = aqmrStreamLoader.fda_typedStream(orderedAQMRPT.result)(db)(512,512)()

Note that this time we used the slick TableQuery's native row type AQMRPTTable#TableElementType for the strongly typed conversion.
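The state-function idea behind aggregation can be sketched as a fold over an in-memory list. The FDAAggrTask alias is copied from the definition quoted above; everything else (Reading, Summary, EndOfStream as a stand-in for FDANullRow, the simplified Accu, and the driver aggregate) is hypothetical. Like aggregateValue, the sketch only produces correct groups when the input is sorted by the grouping key, which is why the source query is ordered by state and year.

```scala
object AggrSketch {
  trait FDAROW                                    // stand-in marker trait
  case class Reading(state: String, year: Int, value: Int) extends FDAROW
  case class Summary(state: String, year: Int, count: Int, sum: Int) extends FDAROW
  case object EndOfStream extends FDAROW          // stand-in for FDANullRow

  // Same shape as the FDAAggrTask definition quoted in the text.
  type FDAAggrTask[AGGR, ROW] = (AGGR, ROW) => (AGGR, Option[List[ROW]])

  case class Accu(state: String, year: Int, count: Int, sum: Int)

  val sumByStateYear: FDAAggrTask[Accu, FDAROW] = (accu, row) => row match {
    case Reading(s, y, v) =>
      if (accu.count == 0 || (s == accu.state && y == accu.year))
        (Accu(s, y, accu.count + 1, accu.sum + v), Some(Nil)) // same group: update state only
      else                                                    // new group: emit the finished one
        (Accu(s, y, 1, v), Some(List(Summary(accu.state, accu.year, accu.count, accu.sum))))
    case EndOfStream if accu.count > 0 =>                     // flush the last group
      (Accu("", 0, 0, 0), Some(List(Summary(accu.state, accu.year, accu.count, accu.sum))))
    case _ => (accu, Some(Nil))                               // other rows: do nothing
  }

  // Toy driver: threads the state through the rows and collects the emitted rows.
  def aggregate(rows: List[FDAROW], init: Accu, task: FDAAggrTask[Accu, FDAROW]): List[FDAROW] = {
    val (_, out) = (rows :+ EndOfStream).foldLeft((init, List.empty[FDAROW])) {
      case ((accu, emitted), row) =>
        val (nextAccu, result) = task(accu, row)
        (nextAccu, emitted ++ result.getOrElse(Nil))
    }
    out
  }
}
```

The guard accu.count > 0 on the end-of-stream case avoids emitting a summary for an empty stream; the same check would also protect a division by the count.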

The complete source code for this demo:

import slick.jdbc.meta._
import com.bayakala.funda._
import api._
import scala.language.implicitConversions
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
import slick.jdbc.H2Profile.api._
import Models._

object UserDefinedTasks extends App {

  val db = Database.forConfig("h2db")

//drop original table schema
  val futVectorTables = db.run(MTable.getTables)

  val futDropTable = futVectorTables.flatMap{ tables => {
      val tableNames = tables.map(t => t.name.name)
      if (tableNames.contains(AQMRPTQuery.baseTableRow.tableName))
        db.run(AQMRPTQuery.schema.drop)
      else Future.successful(())
    }
  }.andThen {
    case Success(_) => println(s"Table ${AQMRPTQuery.baseTableRow.tableName} dropped successfully! ")
    case Failure(e) => println(s"Failed to drop Table ${AQMRPTQuery.baseTableRow.tableName}, it may not exist! Error: ${e.getMessage}")
  }
  Await.ready(futDropTable,Duration.Inf)

//create new table to refine AQMRawTable
  val actionCreateTable = Models.AQMRPTQuery.schema.create
  val futCreateTable = db.run(actionCreateTable).andThen {
    case Success(_) => println("Table created successfully!")
    case Failure(e) => println(s"Table may exist already! Error: ${e.getMessage}")
  }
//would carry on even fail to create table
  Await.ready(futCreateTable,Duration.Inf)


//truncate data, only available in slick 3.2.1
  val futTruncateTable = futVectorTables.flatMap{ tables => {
     val tableNames = tables.map(t => t.name.name)
     if (tableNames.contains(AQMRPTQuery.baseTableRow.tableName))
       db.run(AQMRPTQuery.schema.truncate)
     else Future.successful(())
    }
  }.andThen {
    case Success(_) => println(s"Table ${AQMRPTQuery.baseTableRow.tableName} truncated successfully!")
    case Failure(e) => println(s"Failed to truncate Table ${AQMRPTQuery.baseTableRow.tableName}! Error: ${e.getMessage}")
  }
  Await.ready(futTruncateTable,Duration.Inf)


//load original table content
//original table strong-typed-row
  case class AQMRaw(mid: String, state: String,
                    county: String, year: String, value: String) extends FDAROW
  implicit def toAQMRaw(row: (String,String,String,String,String)) =
    AQMRaw(row._1,row._2,row._3,row._4,row._5)
  val streamLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRaw _)
//  val queryAQMRaw = for { r <- AQMRawQuery } yield (r.mid,r.state,r.county,r.year,r.value)
  val queryAQMRaw = sql"""
    SELECT MEASUREID,STATENAME,COUNTYNAME,REPORTYEAR,VALUE FROM AIRQM
  """.as[(String,String,String,String,String)]

  val streamAQMRaw: FDAPipeLine[FDAROW] = streamLoader.fda_typedStream(queryAQMRaw)(db)(512,512)()


//filter out rows with inconvertible value strings and out of ranged value and year
  def filterRows: FDAUserTask[FDAROW] = row => {
    row match {
      case r: AQMRaw => {
        try {
          val yr = r.year.toInt
          val v = r.value.toInt
          val vlu = if ( v > 10  ) 10 else v
          val data = AQMRPTModel(0,r.mid.toInt,r.state,r.county,yr,vlu,0,true)
          if ((yr > 1960 && yr < 2018))
            fda_next(data)   //this row ok. pass downstream
          else
            fda_skip    //filter out this row
        } catch {
          case e: Exception =>
            fda_next(AQMRPTModel(0,r.mid.toInt,r.state,r.county,2000,0,0,false))
            //pass an invalid row
        }
      }
      case _ => fda_skip   //wrong type, skip
    }
  }

//transform data to action for later execution
  def toAction: FDAUserTask[FDAROW] = row => {
    row match {
      case r: AQMRPTModel =>
        val queryAction = AQMRPTQuery += r
        fda_next(FDAActionRow(queryAction))
      case _ => fda_skip
    }
  }

//get a query runner and an action task
  val actionRunner = FDAActionRunner(slick.jdbc.H2Profile)
  def runActionRow: FDAUserTask[FDAROW] = action => {
    action match {
      case FDAActionRow(q) => actionRunner.fda_execAction(q)(db)
        fda_skip
      case _ => fda_skip
    }
  }


//start the program
  val streamAllTasks =  streamAQMRaw.appendTask(filterRows)
    .appendTask(toAction)
    .appendTask(runActionRow)

  val streamToRun = streamAllTasks.onError { case e: Exception => println("Error:"+e.getMessage); fda_appendRow(FDAErrorRow(new Exception(e))) }

  streamToRun.startRun

//aggregate-task demo: get count and sum of value for each state and year
  val orderedAQMRPT = AQMRPTQuery.sortBy(r => (r.state,r.year))
//TableElementType conversion. must declare implicit
  implicit def toAQMRPT(row: AQMRPTTable#TableElementType) =
    AQMRPTModel(row.rid,row.mid,row.state,row.county,row.year,row.value,row.total,row.valid)
  val aqmrStreamLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRPT _)
  val aqmrStream: FDAPipeLine[FDAROW] = aqmrStreamLoader.fda_typedStream(orderedAQMRPT.result)(db)(512,512)()
//user defined aggregator type.
  case class Accu(state: String, county: String, year: Int, count: Int, sumOfValue: Int)
//user defined aggregation task
  def aggregateValue: FDAAggrTask[Accu,FDAROW] = (accu,row) => {
    row match {
      case aqmr: AQMRPTModel =>
        if (accu.state == "" || (aqmr.state == accu.state && aqmr.year == accu.year))
          //same condition: inc count and add sum, pass no row downstream
          (Accu(aqmr.state,aqmr.county,aqmr.year,accu.count+1, accu.sumOfValue+aqmr.value),fda_skip)
        else
          //reset accumulator, create a new aggregated row and pass downstream
          (Accu(aqmr.state,aqmr.county,aqmr.year,1, aqmr.value)
            ,fda_next(AQMRPTModel(0,9999,accu.state,accu.county,accu.year
            ,accu.count,accu.sumOfValue/accu.count,true)))
      case FDANullRow =>
          //last row encountered. create and pass new aggregated row
        (Accu(accu.state,accu.county,accu.year,1, 0)
          ,fda_next(AQMRPTModel(0,9999,accu.state,accu.county,accu.year
          ,accu.count,accu.sumOfValue/accu.count,true)))
         //incorrect row type, do nothing
      case _ => (accu,fda_skip)
    }
  }


  aqmrStream.aggregateTask(Accu("","",0,0,0),aggregateValue)
    .appendTask(toAction)
    .appendTask(runActionRow)
    .startRun


}
