FunDA(3)- 流动数据行操作:FDAPipeLine operations using scalaz-stream-fs2

在上节讨论里我们介绍了数据行流式操作的设想,主要目的是把后台数据库的数据载入前端内存再拆分为强类型的数据行,这样我们可以对每行数据进行使用和处理。形象点描述就是对内存里的一个数据流(data-stream)进行逐行操作。我们在上节用foreach模拟了一个流控来示范数据行的操作处理。在这节我们讨论一下用scalaz-stream-fs2作为数据流管理工具来实现FunDA的数据行流动管理功能。fs2的Stream是一种自然的拖动型(pull-model)数据流。而fs2的Pipe类型则像是管道的阀门(valve),我们可以在Pipe里截获流动中的数据行。我们看看下面的fs2 Stream例子:

1   def log[ROW](prompt: String): Pipe[Task,ROW,ROW] =
2     _.evalMap {row => Task.delay {println(s"$prompt> $row"); row}}
3                                                   //> log: [ROW](prompt: String)fs2.Pipe[fs2.Task,ROW,ROW]
4   Stream.range(1,5).through(log("")).run.unsafeRun//> > 1
5                                                   //| > 2
6                                                   //| > 3
7                                                   //| > 4

函数log是个Pipe类型。我们看到Pipe类型可以截获Stream中的流动元素,在函数log里我们通过evalMap来立即运算了println把当前的元素内容显示出来。所以我们并没有用runLog来收集Stream的元素(runLog也只能在完成所有元素的收集后才能显示结果)。

按照FunDA设计要求:从后台数据库中读取数据、载入内存然后逐行进行处理,那么我们可以用这个Pipe类型来实现数据的逐行处理,包括控制数据流动以及任意插入一些自定义数据元素。下面我们就试试通过定义Pipe类型的不同功能来实现行数据处理:

 1   def stopOn3[ROW]: Pipe[Task,ROW,ROW] = in => {
 2     def go: Handle[Task,ROW] => Pull[Task,ROW,Unit] = h => {
 3       h.receive1Option {
 4         case Some((r,h)) => if ( 3 == r) Pull.done
 5                             else Pull.output1(r) >> go(h)
 6         case None => Pull.done
 7       }
 8     }
 9     in.pull(go)
10   }                                               //> stopOn3: [ROW]=> fs2.Pipe[fs2.Task,ROW,ROW]
11   Stream(4,2,9,3,8,1)
12    .through(log("before"))
13    .through(stopOn3)
14    .through(log("after"))
15    .run
16    .unsafeRun                                     //> before> 4
17                                                   //| after> 4
18                                                   //| before> 2
19                                                   //| after> 2
20                                                   //| before> 9
21                                                   //| after> 9
22                                                   //| before> 3

stopOn3是个自定义Pipe。它的功能是截取当前元素、检查当前元素值、如果遇到3则终止数据流。从运算结果看:当before> 3时数据流停止流动(停止向下游发送元素)。虽然成功地实现了它的目的,函数stopOn3的设计者必须对fs2有较深的了解。而对于FunDA的终端用户来说不要说需要掌握fs2的运算机制,就连那些复杂的fs2类型就已经不可接受了。我想了一下:如果我们提供一个像stopOn3这样的Pipe函数、由用户提供有关的功能函数作为传入参数,这样的方式应该有比较大的接收空间。我们先从类型开始:重新模拟一套简明的与fs2类型相对应的FunDA类型:

1   //数据处理管道
2   type FDAPipeLine[ROW] = Stream[Task,ROW]
3   //数据作业节点
4   type FDAWorkNode[ROW] = Pipe[Task,ROW,ROW]
5   //数据管道开关阀门,从此处获得管道内数据
6   type FDAValve[ROW] = Handle[Task,ROW]
7   //管道连接器
8   type FDAPipeJoint[ROW] = Pull[Task,ROW,Unit]

下面是用这些类型向用户提供的帮助函数(helpers):

 1   //库提供:停止数据流动
 2   def fda_haltFlow = Pull.done                        //> fda_haltFlow: => fs2.Pull[Nothing,Nothing,Nothing]
 3   //库提供:向下游发送一个ROW
 4   def fda_sendRow[ROW](row: ROW) = Pull.output1(row)  //> fda_sendRow: [ROW](row: ROW)fs2.Pull[Nothing,ROW,Unit]
 5   //库提供:处理当前数据。运行用户提供的功能wf
 6   def fda_doWork[ROW](wf: ROW => FDAPipeJoint[ROW]): FDAWorkNode[ROW] = {
 7     def go: FDAValve[ROW] => FDAPipeJoint[ROW] = h => {
 8       h.receive1Option {
 9         case Some((r,h)) => wf(r) >> go(h)
10         case None => fda_haltFlow
11       }
12     }
13     in => in.pull(go)
14   }   //> fda_doWork: [ROW](wf: ROW => demo.ws.FDAPipe.FDAPipeJoint[ROW])demo.ws.FDAPipe.FDAWorkNode[ROW]

现在看来貌似一旦用户可以提供一个ROW => FDAPipeJoint[ROW]函数,就可以用fda_doWork函数来运算这个函数了。我们按上面例子的功能要求来设计一个这样的函数:

 1  //样板用户提供数据处理功能函数
 2   def breakOn3[ROW]: ROW => FDAPipeJoint[ROW] = row => {
 3      if (3 == row ) fda_haltFlow
 4      else fda_sendRow(row)
 5   }                                               //> breakOn3: [ROW]=> ROW => demo.ws.FDAPipe.FDAPipeJoint[ROW]
 6   //测试运算
 7   Stream(4,2,9,3,8,1)
 8    .through(log("before"))
 9    .through(fda_doWork(breakOn3))
10    .through(log("after"))
11    .run
12    .unsafeRun                                     //> before> 4
13                                                   //| after> 4
14                                                   //| before> 2
15                                                   //| after> 2
16                                                   //| before> 9
17                                                   //| after> 9
18                                                   //| before> 3

成功实现功能。下面是这篇讨论中的示范代码:

 1 import fs2._
 2 object FDAPipe {
 3   def log[ROW](prompt: String): Pipe[Task,ROW,ROW] =
 4     _.evalMap {row => Task.delay {println(s"$prompt> $row"); row}}
 5   Stream.range(1,5).through(log("")).run.unsafeRun
 6   def stopOn3[ROW]: Pipe[Task,ROW,ROW] = in => {
 7     def go: Handle[Task,ROW] => Pull[Task,ROW,Unit] = h => {
 8       h.receive1Option {
 9         case Some((r,h)) => if ( 3 == r) Pull.done
10                             else Pull.output1(r) >> go(h)
11         case None => Pull.done
12       }
13     }
14     in.pull(go)
15   }
16   Stream(4,2,9,3,8,1)
17    .through(log("before"))
18    .through(stopOn3)
19    .through(log("after"))
20    .run
21    .unsafeRun
22   //数据处理管道
23   type FDAPipeLine[ROW] = Stream[Task,ROW]
24   //数据作业节点
25   type FDAWorkNode[ROW] = Pipe[Task,ROW,ROW]
26   //数据管道开关阀门,从此处获得管道内数据
27   type FDAValve[ROW] = Handle[Task,ROW]
28   //管道连接器
29   type FDAPipeJoint[ROW] = Pull[Task,ROW,Unit]
30   
31   //库提供:停止数据流动
32   def fda_haltFlow = Pull.done
33   //库提供:向下游发送一个ROW
34   def fda_sendRow[ROW](row: ROW) = Pull.output1(row)
35   //库提供:处理当前数据。运行用户提供的功能wf
36   def fda_doWork[ROW](wf: ROW => FDAPipeJoint[ROW]): FDAWorkNode[ROW] = {
37     def go: FDAValve[ROW] => FDAPipeJoint[ROW] = h => {
38       h.receive1Option {
39         case Some((r,h)) => wf(r) >> go(h)
40         case None => fda_haltFlow
41       }
42     }
43     in => in.pull(go)
44   }
45   //用户提供数据处理功能函数
46   def breakOn3[ROW]: ROW => FDAPipeJoint[ROW] = row => {
47      if (3 == row ) fda_haltFlow
48      else fda_sendRow(row)
49   }
50   //测试运算
51   Stream(4,2,9,3,8,1)
52    .through(log("before"))
53    .through(fda_doWork(breakOn3))
54    .through(log("after"))
55    .run
56    .unsafeRun
57 }

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏数据和云

按图索骥:SQL中数据倾斜问题的处理思路与方法

数据倾斜即表中某个字段的值分布不均匀,比如有100万条记录,其中字段A中有90万都是相同的值。这种情况下,字段A作为过滤条件时,可能会引起一些性能问题。 本文...

3236
来自专栏互联网开发者交流社区

数据库的总结

1114
来自专栏北京马哥教育

数据库基础知识:数据库中的约束和三大范式

? 一.数据库中的范式: 范式, 英文名称是 Normal Form,它是英国人 E.F.Codd(关系数据库的老祖宗)在上个世纪70年代提出关系数据库模型后...

2737
来自专栏林欣哲

MySQL数据库开发规范知识点速查

数据库设计规范 命名规范 基本设计规范 索引设计规范 字段设计规范 SQL开发规范 操作行为规范 命名规范 对象名称使用小写字母并用下划线分割 禁止使用MySQ...

34511
来自专栏分布式系统进阶

Kafka的Request和Response

每个Request和Response都由RequestHeader(ResponseHeader) + 具体的消费体构成;

592
来自专栏数据库

按图索骥:SQL中数据倾斜问题的处理思路与方法

数据倾斜即表中某个字段的值分布不均匀,比如有100万条记录,其中字段A中有90万都是相同的值。这种情况下,字段A作为过滤条件时,可能会引起一些性能问题。 本文通...

1789
来自专栏伦少的博客

利用Spark实现Oracle到Hive的历史数据同步

和上一篇文章Spark通过修改DataFrame的schema给表字段添加注释一样,通过Spark将关系型数据库(以Oracle为例)的表同步的Hive,这里讲...

593
来自专栏程序你好

怎么编写容易读懂的SQL查询

1062
来自专栏极客慕白的成长之路

MySQL的实战系列:大字段如何优化

除特别注明外,本站所有文章均为慕白博客原创,转载请注明出处来自https://geekmubai.com/programming/747.html

613
来自专栏java达人

Java开发者编写SQL语句时常见的10种错误

Java开发者对于面向对象编程思维与命令行编程思维的协调程度,取决于他们如下几种能力的水平: 1. 技巧(任何人都可以编写命令行形式的代码) 2. 教条(有的...

2105

扫码关注云+社区