FunDA(4)- 数据流内容控制:Stream data element control

    上节我们探讨了通过scalaz-stream-fs2来驱动一套数据处理流程,用fs2的Pipe类型来实现对数据流的逐行操作。本篇讨论准备在上节讨论的基础上对数据流的流动和元素操作进行优化完善。如数据流动中增加诸如next、skip、eof功能、内容控制中增加对行元素的append、insert、update、remove等操作方法。但是经过一番对fs2的再次解读,发现这些操作模式并不像我所想象那样的方式,实际上用fs2来实现数据行控制可能会更加简单和直接。这是因为与传统数据库行浏览方式不同的是fs2是一种拖式流(pull-model stream),它的数据行集合是一种泛函不可变集合。每一行一旦读取就等于直接消耗了断(consumed),所以只支持一种向前逐行读取模式。如果形象地描述的话,我们习惯的所谓数据集浏览可能是下面这样的场景:

读取一行数据 >>> (使用或更新行字段值)>>> 向下游发送新的一行数据。只有停止发送动作才代表终止运算。完成对上游的所有行数据读取并不代表终止操作,因为我们还可以不断向下游发送自定义产生的数据行。

我们用fs2模拟一套数据流管道FDAPipeLine,管道中间有不定数量的作业节点FDAWorkNode。作业方式包括从管道上游截取一个数据元素、对其进行处理、然后选择是否向下游的管道接口(FDAPipeJoint)发送。下面是这套模拟的类型:fdapipes/package.scala

 1 package com.bayakala.funda {
 2 
 3   import fs2._
 4 
 5   package object fdapipes {
 6     //数据行类型
 7     trait FDAROW
 8 
 9     //数据处理管道
10     type FDAPipeLine[ROW] = Stream[Task, ROW]
11     //数据作业节点
12     type FDAWorkNode[ROW] = Pipe[Task, ROW, ROW]
13     //数据管道开关阀门,从此处获得管道内数据
14     type FDAValve[ROW] = Handle[Task, ROW]
15     //管道连接器
16     type FDAPipeJoint[ROW] = Pull[Task, ROW, Unit]
17 
18     //作业类型
19     type FDATask[ROW] = ROW => Option[List[ROW]]
20 
21   }
22 
23 }

注意这个FDAROW类型:这是一种泛类型,因为在管道中流动的数据可能有多重类型,如数据行和QueryAction行。

流动控制方法:FDAValves.scala

 1 package com.bayakala.funda.fdapipes
 2 import fs2._
 3 object FDAValves {  //流动控制方法
 4 //跳过本行(不向下游发送)
 5   def fda_skip[ROW] = Some(List[ROW]())
 6 //将本行发送至下游连接管道
 7   def fda_next[ROW](r: ROW) = Some(List[ROW](r))
 8 //终止流动
 9   def fda_break = None
10 
11 }

数据发送方法:FDAPipes.scala

1 package com.bayakala.funda.fdapipes
2 import fs2._
3 object FDAJoints {  //数据发送方法
4 //write rows down the pipeline
5   def fda_pushRow[ROW](row: ROW) = Pull.output1(row)
6   def fda_pushRows[ROW](rows: List[ROW]) = Pull.output(Chunk.seq(rows))
7 }

作业节点工作方法:

 1 package com.bayakala.funda.fdapipes
 2 import FDAJoints._
 3 object FDANodes { //作业节点工作方法
 4  def fda_execUserTask[ROW](task: FDATask[ROW]): FDAWorkNode[ROW] = {
 5    def go: FDAValve[ROW] => FDAPipeJoint[ROW] = h => {
 6      h.receive1Option {
 7        case Some((r, h)) => task(r) match {
 8          case Some(xr) => xr match {
 9            case Nil => go(h)
10            case _ => fda_pushRows(xr) >> go(h)
11          }
12          case None => fda_halt
13        }
14        case None => fda_halt
15      }
16    }
17    in => in.pull(go)
18  }
19 
20 }

下面我们就示范这个工具库的具体使用方法:examples/Example1.scala 设置示范环境:

 1 package com.bayakala.funda.fdapipes.examples
 2 import fs2._
 3 import com.bayakala.funda.fdapipes._
 4 import FDANodes._
 5 import FDAValves._
 6 import Helpers._
 7 object Example1 extends App {
 8 
 9 
10   case class Employee(id: Int, name: String, age: Int, salary: BigDecimal) extends FDAROW
11 // test data set
12   val r1 = Employee(1, "John", 23, 100.00)
13   val r2 = Employee(2, "Peter", 25,100.00)
14   val r3 = Employee(3, "Kay", 35,100.00)
15   val r4 = Employee(4, "Cain", 45,100.00)
16   val r5 = Employee(5, "Catty", 35,100.00)
17   val r6 = Employee(6, "Little", 19,80.00)

注意Employee是一种行类型,因为它extends FDAROW。

我们再写一个跟踪显示当前流动数据行的函数:examples/Helpers.scala

1 package com.bayakala.funda.fdapipes.examples
2 import com.bayakala.funda.fdapipes._
3 import fs2.Task
4 object Helpers {
5   def log[ROW](prompt: String): FDAWorkNode[ROW] =
6     _.evalMap {row => Task.delay{ println(s"$prompt> $row"); row }}
7 }

下面我们就用几个有不同要求的例子来示范流动控制和数据处理功能,这些例子就是给最终用户的标准编程示范版本,然后由用户照版编写:

1、根据每条数据状态逐行进行处理:

 1 // 20 - 30岁加10%, 30岁> 加20%,其它加 5%
 2   def raisePay: FDATask[FDAROW] = row => {
 3     row match {
 4       case emp: Employee => {
 5         val cur = emp.age match {
 6           case a if ((a >= 20) && (a < 30)) => emp.copy(salary = emp.salary * 1.10)
 7           case a if ((a >= 30)) => emp.copy(salary = emp.salary * 1.20)
 8           case _ => emp.copy(salary = emp.salary * 1.05)
 9         }
10         fda_next(cur)
11       }
12       case _ => fda_skip
13     }
14   }

用户提供的功能函数类型必须是FDATask[FDAROW]。类型参数FDAROW代表数据行通用类型。如果用户指定了FDATask[Employee]函数类型,那么必须保证管道中流动的数据行只有Employee一种类型。完成对当前行数据的处理后用fda_next(emp)把它发送到下一节连接管道。我们用下面的组合函数来进行运算:

  Stream(r1,r2,r3,r4,r5,r6)
    .through(log("加薪前>"))
      .through(fda_execUserTask[FDAROW](raisePay))
      .through(log("加薪后>"))
    .run.unsafeRun
-----
运算结果:
加薪前>> Employee(1,John,23,100.0)
加薪后>> Employee(1,John,23,110.00)
加薪前>> Employee(2,Peter,25,100.0)
加薪后>> Employee(2,Peter,25,110.00)
加薪前>> Employee(3,Kay,35,100.0)
加薪后>> Employee(3,Kay,35,120.00)
加薪前>> Employee(4,Cain,45,100.0)
加薪后>> Employee(4,Cain,45,120.00)
加薪前>> Employee(5,Catty,35,100.0)
加薪后>> Employee(5,Catty,35,120.00)
加薪前>> Employee(6,Little,19,80.0)
加薪后>> Employee(6,Little,19,84.000)

2、在一组数据行内根据每条数据状态进行筛选:

  // 筛选40岁以上员工
  def filter40: FDATask[FDAROW] = row => {
    row match {
      case emp: Employee => {
        if (emp.age > 40)
          Some(List(emp))
        else fda_skip[Employee]
      }
      case _ => fda_break
    }
  }
  println("---------")
  Stream(r1,r2,r3,r4,r5,r6)
    .through(log("年龄>"))
    .through(fda_execUserTask[FDAROW](filter40))
    .through(log("合格>"))
    .run.unsafeRun
---
运算结果:
年龄>> Employee(1,John,23,100.0)
年龄>> Employee(2,Peter,25,100.0)
年龄>> Employee(3,Kay,35,100.0)
年龄>> Employee(4,Cain,45,100.0)
合格>> Employee(4,Cain,45,100.0)
年龄>> Employee(5,Catty,35,100.0)
年龄>> Employee(6,Little,19,80.0)
-

3、根据当前数据行状态终止作业:

 1   // 浏览至第一个30岁以上员工,跳出
 2   def stopOn30: FDATask[Employee] = emp => {
 3         if (emp.age > 30)
 4           fda_break
 5         else
 6           Some(List(emp))
 7   }
 8   println("---------")
 9   Stream(r1,r2,r3,r4,r5,r6)
10     .through(log("当前员工>"))
11     .through(fda_execUserTask[Employee](stopOn30))
12     .through(log("选入名单>"))
13     .run.unsafeRun
14 ---
15 运算结果:
16 当前员工>> Employee(1,John,23,100.0)
17 选入名单>> Employee(1,John,23,100.0)
18 当前员工>> Employee(2,Peter,25,100.0)
19 选入名单>> Employee(2,Peter,25,100.0)
20 当前员工>> Employee(3,Kay,35,100.0)

在这个例子里用户指定了行类型统一为Employee。

我们还可以把多个功能串接起来。像下面这样把1和2两个功能连起来:

  Stream(r1,r2,r3,r4,r5,r6)
    .through(log("加薪前>"))
    .through(fda_execUserTask[FDAROW](raisePay))
    .through(log("加薪后>"))
    .through(log("年龄>"))
    .through(fda_execUserTask[FDAROW](filter40))
    .through(log("合格>"))
    .run.unsafeRun
---
运算结果:
加薪前>> Employee(1,John,23,100.0)
加薪后>> Employee(1,John,23,110.00)
年龄>> Employee(1,John,23,110.00)
加薪前>> Employee(2,Peter,25,100.0)
加薪后>> Employee(2,Peter,25,110.00)
年龄>> Employee(2,Peter,25,110.00)
加薪前>> Employee(3,Kay,35,100.0)
加薪后>> Employee(3,Kay,35,120.00)
年龄>> Employee(3,Kay,35,120.00)
加薪前>> Employee(4,Cain,45,100.0)
加薪后>> Employee(4,Cain,45,120.00)
年龄>> Employee(4,Cain,45,120.00)
合格>> Employee(4,Cain,45,120.00)
加薪前>> Employee(5,Catty,35,100.0)
加薪后>> Employee(5,Catty,35,120.00)
年龄>> Employee(5,Catty,35,120.00)
加薪前>> Employee(6,Little,19,80.0)
加薪后>> Employee(6,Little,19,84.000)
年龄>> Employee(6,Little,19,84.000)

下面我把完整的示范代码提供给大家:

package com.bayakala.funda.fdapipes.examples
import fs2._
import com.bayakala.funda.fdapipes._
import FDANodes._
import FDAValves._
import Helpers._
object Example1 extends App {


  case class Employee(id: Int, name: String, age: Int, salary: BigDecimal) extends FDAROW
// test data set
  val r1 = Employee(1, "John", 23, 100.00)
  val r2 = Employee(2, "Peter", 25,100.00)
  val r3 = Employee(3, "Kay", 35,100.00)
  val r4 = Employee(4, "Cain", 45,100.00)
  val r5 = Employee(5, "Catty", 35,100.00)
  val r6 = Employee(6, "Little", 19,80.00)



// 20 - 30岁加10%, 30岁> 加20%,其它加 5%
  def raisePay: FDATask[FDAROW] = row => {
    row match {
      case emp: Employee => {
        val cur = emp.age match {
          case a if ((a >= 20) && (a < 30)) => emp.copy(salary = emp.salary * 1.10)
          case a if ((a >= 30)) => emp.copy(salary = emp.salary * 1.20)
          case _ => emp.copy(salary = emp.salary * 1.05)
        }
        fda_next(cur)
      }
      case _ => fda_skip
    }
  }

  Stream(r1,r2,r3,r4,r5,r6)
    .through(log("加薪前>"))
      .through(fda_execUserTask[FDAROW](raisePay))
      .through(log("加薪后>"))
    .run.unsafeRun


  // 筛选40岁以上员工
  def filter40: FDATask[FDAROW] = row => {
    row match {
      case emp: Employee => {
        if (emp.age > 40)
          Some(List(emp))
        else fda_skip[Employee]
      }
      case _ => fda_break
    }
  }
  println("---------")
  Stream(r1,r2,r3,r4,r5,r6)
    .through(log("年龄>"))
    .through(fda_execUserTask[FDAROW](filter40))
    .through(log("合格>"))
    .run.unsafeRun

  // 浏览至第一个30岁以上员工,跳出
  def stopOn30: FDATask[Employee] = emp => {
        if (emp.age > 30)
          fda_break
        else
          Some(List(emp))
  }
  println("---------")
  Stream(r1,r2,r3,r4,r5,r6)
    .through(log("当前员工>"))
    .through(fda_execUserTask[Employee](stopOn30))
    .through(log("选入名单>"))
    .run.unsafeRun


  println("---------")
  Stream(r1,r2,r3,r4,r5,r6)
    .through(log("加薪前>"))
    .through(fda_execUserTask[FDAROW](raisePay))
    .through(log("加薪后>"))
    .through(log("年龄>"))
    .through(fda_execUserTask[FDAROW](filter40))
    .through(log("合格>"))
    .run.unsafeRun

}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java架构解析

网上的人说 Java 的性能已经达到甚至超过 C++,是真的吗?

好多Java程序员都说由于JIT技术的引入,Java的性能已经和C++一样了,而且Java的开发效率极高,可以省下60%的时间。请问事实真的是这样吗?我平常也都...

831
来自专栏令仔很忙

设计模式总结之一三五

通过学习设计模式来提高写出的代码的可维护性、可复用性、可扩展性和灵活性。也就是说让系统能够达到“高内聚、低耦合”的状态。

712
来自专栏Java帮帮-微信公众号-技术文章全总结

数据库三大范式【面试+工作】

设计良好结构的数据库,可以有效减小数据冗余,减少增删改中出现的问题。深入理解数据库设计的三范式,对于设计“健壮的数据库“十分有必要。数据库三范式是设计数据库 时...

3334
来自专栏顶级程序员

什么才是Java的基础知识?

近日里,很多人邀请我回答各种j2ee开发的初级问题,我无一都强调java初学者要先扎实自己的基础知识,那什么才是java的基础知识?又怎么样才算掌握了java的...

914
来自专栏牛客网

cvte面经

一面:现场面去的很早明显焦虑并问不到面经(50分钟) (1)自我介绍这里介绍完提到自己熟悉的知识,项目 (2)项目介绍项目中遇到的难点如何解决的 (3)集合框架...

3677
来自专栏Java架构

每个 JavaScript 工程师都应当知道的 10 个面试题以人为本1. 能说出来两种对于 JavaScript 工程师很重要的编程范式么?2. 什么是函数式编程?3. 类继承和原型继承有什么区别?

1726
来自专栏牛客网

Java面经:小米暑期实习+秋招真题分享一面 1小时7分钟总结一面二面三面总结

秋招结束,总结了一下从寒假回来开始的实习生招聘和秋招面经,过来回馈一下牛客网。 上学期寒假回来就开始投简历,找人内推的小米,过几天后约时间面试,部门未知。 一面...

5346
来自专栏陈满iOS

iOS面试经验总结(某PA金融科技篇)

也许面试官自己也没自己实现过,毕竟有些东西苹果为什么这样设计,若不是苹果公司的工程师无法知道。

533
来自专栏青玉伏案

设计模式(一):“穿越火线”中的“策略模式”(Strategy Pattern)

在前段时间呢陆陆续续的更新了一系列关于重构的文章。在重构我们既有的代码时,往往会用到设计模式。在之前重构系列的博客中,我们在重构时用到了“工厂模式”、“策略模式...

1896
来自专栏程序人生

[技术] 谈谈编程思想

这段时间又攒了很多答应了,但还未动手的文章。大概一两周前,有个读者留言:「程序君,能发篇文章有关编程思想的吗?我是编程初学者,对编程思想没啥概念,求传授点经验!...

3436

扫码关注云+社区