FunDA(10)- 用户功能函数模式:User Function Model

   前面我们提过:FunDA就像一个管道(PipeLine)。管道内流动着一串数据(Data)或者运算指令(Action)。管道的源头就是能产生纯数据的数据源(Source),跟着在管道的中间会有一些节点(WorkNode),我们可以在这些节点施用(apply)用户提供的功能函数(Task)。用户功能函数可以截取并使用管道中流动的数据或者指令,然后利用一种水龙头开关机制(Valve)来影响流动元素:可以截住、直接传送、传送修改版本、插入新数据。作为FunDA的用户,需要掌握用户功能函数编写模式。我们先从一个简单的用户函数开始介绍:

//定义一个用户作业函数:列印数据,完全不影响数据流
  def printAlbums: FDATask[FDAROW] = row => {
    row match {
      case album: Album =>
        println("____________________")
        println(s"品名:${album.title}")
        println(s"演唱:${album.artist}")
        println(s"年份:${album.year}")
        println(s"发行:${album.publisher}")
//原封不动直接传下去
        fda_next(album)
      case r@ _ => fda_next(r)
    }
  }

上面这个用户函数的类型是FDATask[FDAROW],这是一个函数类型:

    //作业类型
    type FDATask[ROW] = ROW => Option[List[ROW]]

所以我们用lambda来代表函数内容:row => {函数功能}。lambda为用户函数提供了当前元素。我们用下面方式调用这个用户函数:

  val streamLoader = FDAStreamLoader(slick.driver.H2Driver, toTypedRow _)
  val albumStream = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()()
//定义一个用户作业函数:列印数据,完全不影响数据流
  def printAlbums: FDATask[FDAROW] = row => {
    row match {
      case album: Album =>
        println("____________________")
        println(s"品名:${album.title}")
        println(s"演唱:${album.artist}")
        println(s"年份:${album.year}")
        println(s"发行:${album.publisher}")
//原封不动直接传下去
        fda_next(album)
      case r@ _ => fda_next(r)
    }
  }

 albumStream.appendTask(printAlbums).startRun

我们把用户函数printAlbums传入appendTask来对数据流进行施用。我们可以在appendTask后面再接一个用户函数,这个用户函数截取到的数据流元素是原装的数据源,因为在任何情况下printAlbums都会原封不动地把截获的元素用fda_next()传下去。运行一下下面这个就清楚了:

 albumStream.appendTask(printAlbums).appendTask(printAlbums).startRun

相反情况我们只需要做下面的修改把fda_next替换成fda_skip就可以证实了:

//原封不动直接传下去
        fda_skip
//        fda_next(album)

我们也可以根据当前元素情况生成一条FDAActionROW,它的定义是这样的:

  type FDAAction = DBIO[Int]

  case class FDAActionRow(action: FDAAction) extends FDAROW
  def fda_mkActionRow(action: FDAAction): FDAActionRow = FDAActionRow(action)

  class FDAActionRunner(slickProfile: JdbcProfile) {

    import slickProfile.api._

    def fda_execAction(action: FDAAction)(slickDB: Database): Int =
      Await.result(slickDB.run(action), Duration.Inf)
  }
  object FDAActionRunner {
    def apply(slickProfile: JdbcProfile): FDAActionRunner = new FDAActionRunner(slickProfile)
  }

我们可以把一条FDAActionRow传下去:

  def updateYear: FDATask[FDAROW] = row => {
      row match {
        case album: Album => {
          val updateAction = albums.filter(r => r.title === album.title)
          .map(_.year)
          .update(Some(2017))
          fda_next(FDAActionRow(updateAction))
        }
        case others@ _ => fda_next(others)
      }
  }

我们也可以把原数据同时传下去:

  def updateYear: FDATask[FDAROW] = row => {
      row match {
        case album: Album => {
          val updateAction = albums.filter(r => r.title === album.title)
          .map(_.year)
          .update(Some(2017))
          fda_next(FDAActionRow(updateAction))
          fda_next(album)
        }
        case others@ _ => fda_next(others)
      }
  }

我们需要FDAActionRunner来运算action:

val runner = FDAActionRunner(slick.driver.H2Driver)
  def runActions: FDATask[FDAROW] = row => {
    row match {
      case FDAActionRow(action) =>
        runner.fda_execAction(action)(db)
        fda_skip
      case others@ _ => fda_next(others)
    }
  }

现在试试运转这个管道:

  albumStream.appendTask(updateYear).appendTask(runActions).appendTask(printAlbums).startRun

实际上updateYear和runActions可以一步完成。但细化拆分功能就是函数式编程的一个特点,因为能够更自由的进行组合,这其中就包括了并行运算组合。

下面是这篇讨论的示范源代码:

package com.bayakala.funda.fdasources.examples
import slick.driver.H2Driver.api._
import com.bayakala.funda.fdasources.FDADataStream._
import com.bayakala.funda.samples._
import com.bayakala.funda.fdarows._
import com.bayakala.funda.fdapipes._
import FDAValves._
import com.bayakala.funda.fdarows.FDARowTypes._
import scala.concurrent.duration._

object Example2 extends App {
   val albums = SlickModels.albums
   val companies = SlickModels.companies

//数据源query
   val albumsInfo = for {
     (a,c) <- albums join companies on (_.company === _.id)
   } yield (a.title,a.artist,a.year,c.name)

//query结果强类型(用户提供)
  case class Album(title: String, artist: String, year: Int, publisher: String) extends FDAROW
//转换函数(用户提供)
  def toTypedRow(row: (String, String, Option[Int], String)): Album =
    Album(row._1, row._2, row._3.getOrElse(2000), row._4)

  val db = Database.forConfig("h2db")

  val streamLoader = FDAStreamLoader(slick.driver.H2Driver, toTypedRow _)
  val albumStream = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()()
//定义一个用户作业函数:列印数据,完全不影响数据流
  def printAlbums: FDATask[FDAROW] = row => {
    row match {
      case album: Album =>
        println("____________________")
        println(s"品名:${album.title}")
        println(s"演唱:${album.artist}")
        println(s"年份:${album.year}")
        println(s"发行:${album.publisher}")
//原封不动直接传下去
//        fda_skip
        fda_next(album)
      case r@ _ => fda_next(r)
    }
  }

// albumStream.appendTask(printAlbums).appendTask(printAlbums).startRun



  def updateYear: FDATask[FDAROW] = row => {
      row match {
        case album: Album => {
          val updateAction = albums.filter(r => r.title === album.title)
          .map(_.year)
          .update(Some(2017))
          fda_next(FDAActionRow(updateAction))
          fda_next(album)
        }
        case others@ _ => fda_next(others)
      }
  }
  val runner = FDAActionRunner(slick.driver.H2Driver)
  def runActions: FDATask[FDAROW] = row => {
    row match {
      case FDAActionRow(action) =>
        runner.fda_execAction(action)(db)
        fda_skip
      case others@ _ => fda_next(others)
    }
  }

  albumStream.appendTask(updateYear).appendTask(runActions).appendTask(printAlbums).startRun

}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏

Cassandra Java 使用TimeUUIDType

参考地址 http://wiki.apache.org/cassandra/FAQ#working_with_timeuuid_in_java

1033
来自专栏派森公园

docker的reap问题

在使用docker容器的时候,应该了解“PID1僵尸进程reap”问题。如果使用的时候不加注意,可能会导致出现一些意想不到的问题。

893
来自专栏杨建荣的学习笔记

关于分区表的move操作(r2笔记90天)

关于分区表的move操作还是很值得深究的一个问题。如果分区表中含有lob字段,难度还会加大。 对于普通的表而言,做move操作室理所当然,oracle提供的方...

3145
来自专栏牛肉圆粉不加葱

[Spark源码剖析]Spark 延迟调度策略

在 Spark 中,若 task 与其输入数据在同一个 jvm 中,我们称 task 的本地性为 PROCESS_LOCAL,这种本地性(locality le...

763
来自专栏battcn

一起来学SpringBoot | 第十三篇:RabbitMQ延迟队列

初探RabbitMQ消息队列中介绍了 RabbitMQ的简单用法,顺带提及了下延迟队列的作用。所谓 延时消息就是指当消息被发送以后,并不想让消费者立即拿到消息,...

1071
来自专栏微信终端开发团队的专栏

MMKV for Android 多进程设计与实现

MMKV 是基于 mmap 内存映射的移动端通用 key-value 组件,底层序列化/反序列化使用 protobuf 实现,性能高,稳定性强。从 2015 ...

751
来自专栏软件测试经验与教训

一个比较实用的测试方法

3686
来自专栏函数式编程语言及工具

Akka(22): Stream:实时操控:动态管道连接-MergeHub,BroadcastHub and PartitionHub

  在现实中我们会经常遇到这样的场景:有一个固定的数据源Source,我们希望按照程序运行状态来接驳任意数量的下游接收方subscriber、又或者我需要在程序...

2448
来自专栏H2Cloud

C++ 多进程并发框架FFLIB之Tutorial

      FFLIB框架是为简化分布式/多进程并发而生的。它起始于本人尝试解决工作中经常遇到的问题如消息定义、异步、多线程、单元测试、性能优化等。基本介绍可以...

4396
来自专栏互联网杂技

SpringBoot ( 十一 ) :SpringBoot 中 mongodb 的使用

mongodb是最早热门非关系数据库的之一,使用也比较普遍,一般会用做离线数据分析来使用,放到内网的居多。由于很多公司使用了云服务,服务器默认都开放了外网地址,...

972

扫码关注云+社区