Scalaz(59)- scalaz-stream: fs2-程序并行运算,fs2 running effects in parallel

    scalaz-stream-fs2是一种函数式的数据流编程工具。fs2的类型款式是:Stream[F[_],O],F[_]代表一种运算模式,O代表Stream数据元素的类型。实际上F就是一种延迟运算机制:F中间包含的类型如F[A]的A是一个可能会产生副作用不纯代码(impure code)的运算结果类型,我们必须用F对A运算的延迟机制才能实现编程过程中的函数组合(compositionality),这是函数式编程的标准做法。如果为一个Stream装备了F[A],就代表这个Stream会在处理数据元素O的过程中对O施用运算A,如果这个运算A会与外界交互(interact with outside world)如:文件、数据库、网络等的读写操作,那么这个Stream有数据元素I/O功能的需求。我们可以通过fs2 Stream的状态机器特性(state machine)及F[A]与外界交互功能来编写完整的数据处理(data processing)程序。如果能够在数据库程序编程中善用fs2的多线程运算模式来实现对数据库存取的并行运算,将会大大提高数据处理的效率。我们将在本篇着重讨论fs2在实现I/O程序中的有关方式方法。

首先,我们需要以整体Stream为程序运算框架,把与外界交互的运算A串联起来,然后通过Stream的节点来代表程序状态。我们首先需要某种方式把F[A]与Stream[F,A]关联起来,也就是我们所说的把一个F[A]升格成Stream[F,A]。fs2提供了Stream.eval函数,我们看看它的类型款式:

def eval[F[_], A](fa: F[A]): Stream[F, A] = attemptEval(fa) flatMap { _ fold(fail, emit) }

很明显,提供一个F[A],eval返回Stream[F,A]。这个返回结果Stream[F,A]的元素A是通过运算F[A]获取的:在一个数据库程序应用场景里这个A可能是个数据库连接(connection),那么F[A]就是一个连接数据库的操作函数,返回的A是个连接connection。这次我们来模拟一个对数据库表进行新纪录存储的场景。一般来说我们会按以下几个固定步骤进行:

1、连接数据库,获取connection连接

2、产生新数据(在其它场景里可能是读取数据然后更新)。这可能是一个循环的操作

3、将数据写入数据库

这三个步骤可以用Stream的三种状态来表示:一个源头(source)、传转(pipe transducer)、终点(sink)。

我们先示范如何构建源头:这是一种占用资源的操作,会产生副作用,所以我们必须用延迟运算方式来编程:

1 //用Map模拟数据库表
2 import scala.collection.mutable.Map
3 type DataStore = Map[Long, String]
4 val dataStore: DataStore = Map()       //> dataStore  : fs2Eval.DataStore = Map()
5 case class Connection(id: String, store: DataStore)
6 def src(producer: String): Stream[Task,Connection] =
7   Stream.eval(Task.delay { Connection(producer,dataStore)})
8                 //> src: (producer: String)fs2.Stream[fs2.Task,fs2Eval.Connection]

这个示范用了一个mutable map类型来模拟会产生副作用的数据库表。我们把具体产生数据的源头用Connection.id传下去便于在并行运算示范里进行跟踪。在这个环节里我们模拟了连接数据库dataStore操作。

产生数据是在内存里进行的,不会使用到connection,但我们依然需要把这个connection传递到下个环节:

1 case class Row(conn: Connection, key: Long, value: String)
2 val recId = new java.util.concurrent.atomic.AtomicLong(1)
3                                          //> recId  : java.util.concurrent.atomic.AtomicLong = 1
4 def createData(conn: Connection): Row =
5    Row(conn, recId.incrementAndGet, s"Producer $conn.id: at ${System.currentTimeMillis}")
6                                          //> createData: (conn: fs2Eval.Connection)fs2Eval.Row
7 val trans: Pipe[Task,Connection,Row] = _.map {conn => createData(conn)}
8                  //> trans  : fs2.Pipe[fs2.Task,fs2Eval.Connection,fs2Eval.Row] = <function1>

trans是个Pipe。我们可以用through把它连接到src。

向数据库读写都会产生副作用。下一个环节我们模拟把trans传递过来的Row写入数据库。这里我们需要用延迟运算机制:

1 def log: Pipe[Task, Row, Row] = _.evalMap { r =>
2  Task.delay {println(s"saving row pid:${r.conn.id}, rid:${r.key}"); r}}
3 def saveRow(row: Row) = row.conn.store += (row.key -> row.value)
4 
5 val snk: Sink[Task,Row] = _.evalMap { r =>
6   Task.delay { saveRow(r); () } }

增加了个跟踪函数log。从上面的代码可以看出:实际上Sink就是Pipe,只不过返回了()。

我们试试把这几个步骤连接起来运算一下:

1 val sprg = src("001").through(trans).repeat.take(3).through(log).to(snk)
2     //> sprg  : fs2.Stream[fs2.Task,Unit] = evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>).flatMap(<function1>).flatMap(<function1>).flatMap(<function1>)
3 sprg.run.unsafeRun                                //> saving row pid:001, rid:2
4                                                   //| saving row pid:001, rid:3
5                                                   //| saving row pid:001, rid:4
6 println(dataStore)       //> Map(2 -> Connection(001,Map()).id: at 1472605736214, 4 -> Connection(001,Map(2 -> Connection(001,Map()).id: at 1472605736214, 3 -> Connection(001,Map(2 -> Connection(001,Map()).id: at 1472605736214)).id: at 1472605736245)).id : at 1472605736248, 3 -> Connection(001,Map(2 -> Connection(001,Map()).id:  at 1472605736214)).id: at 1472605736245)

我们看到mutable map dataStore内容有变化了。

如果我们把以上的例子用并行运算方式来实现的话,应该如何调整?为方便观察结果,我们先在几个环节增加一些时间延迟:

1 implicit val strategy = Strategy.fromFixedDaemonPool(4)
2 implicit val scheduler = Scheduler.fromFixedDaemonPool(2)
3 def src(producer: String): Stream[Task,Connection] =
4   Stream.eval(Task.delay { Connection(producer,dataStore)}
5   .schedule(3.seconds))
6 
7 val trans: Pipe[Task,Connection,Row] = _.evalMap {conn => 
8  Task.delay{createData(conn)}.schedule(1.second)}

下面我们把一些类型调整成Stream[Task,Stream[Row]],然后把concurrent.join函数掺进去:

 1 val srcs = concurrent.join(3)(Stream(src("001"),src("002"),src("003"),src("004")))
 2     //> srcs  : fs2.Stream[fs2.Task,fs2Eval.Connection] = attemptEval(Task).flatMap
 3 <function1>).flatMap(<function1>)
 4 val recs: Pipe[Task,Connection,Row] = src => {
 5     concurrent.join(4)(src.map { conn =>
 6       Stream.repeatEval(Task {createData(conn)}.schedule(1.second)) })
 7   }        //> recs  : fs2.Pipe[fs2.Task,fs2Eval.Connection,fs2Eval.Row] = <function1>
 8   
 9 def saveRows(row: Row) = { row.conn.store += (row.key -> row.value); row}
10                                          //> saveRows: (row: fs2Eval.Row)fs2Eval.Row
11 val snks: Pipe[Task,Row,Row] = rs => {
12     concurrent.join(4)(rs.map { r =>
13       Stream.eval(Task {saveRows(r)}.schedule(1.second)) })
14   }                                      //> snks  : fs2.Pipe[fs2.Task,fs2Eval.Row,fs2Eval.Row] = <function1>

我们试着把它们连接起来进行运算:

 1 val par = srcs.through(recs).take(10).through(log("before")).through(chnn).through(log("after"))
 2    //> par  : fs2.Stream[fs2.Task,fs2Eval.Row] = attemptEval(Task).flatMap(<function1>).flatMap(<function1>).flatMap(<function1>)
 3 par.run.unsafeRun                                 //> before saving pid:001, rid:3
 4                                                   //| before saving pid:003, rid:2
 5                                                   //| before saving pid:002, rid:4
 6                                                   //| before saving pid:001, rid:5
 7                                                   //| after saving pid:001, rid:3
 8                                                   //| after saving pid:003, rid:2
 9                                                   //| before saving pid:003, rid:6
10                                                   //| after saving pid:002, rid:4
11                                                   //| before saving pid:002, rid:7
12                                                   //| after saving pid:001, rid:5
13                                                   //| before saving pid:001, rid:8
14                                                   //| before saving pid:003, rid:9
15                                                   //| after saving pid:003, rid:6
16                                                   //| after saving pid:002, rid:7
17                                                   //| before saving pid:002, rid:10
18                                                   //| before saving pid:004, rid:11
19                                                   //| after saving pid:001, rid:8
20                                                   //| after saving pid:003, rid:9
21                                                   //| after saving pid:002, rid:10
22                                                   //| after saving pid:004, rid:11

从跟踪函数显示可以看出before,after是交叉发生的,这就代表已经实现了并行运算。

下面是本篇示范源代码:

 1 import fs2._
 2 import scala.concurrent.duration._
 3 object fs2Eval {
 4 
 5 //用Map模拟数据库表
 6 import scala.collection.mutable.Map
 7 type DataStore = Map[Long, String]
 8 val dataStore: DataStore = Map()
 9 case class Connection(id: String, store: DataStore)
10 implicit val strategy = Strategy.fromFixedDaemonPool(4)
11 implicit val scheduler = Scheduler.fromFixedDaemonPool(2)
12 def src(producer: String): Stream[Task,Connection] =
13   Stream.eval(Task.delay { Connection(producer,dataStore)}
14   .schedule(3.seconds))
15 case class Row(conn: Connection, key: Long, value: String)
16 val recId = new java.util.concurrent.atomic.AtomicLong(1)
17 def createData(conn: Connection): Row =
18    Row(conn, recId.incrementAndGet, s"$conn.id: at ${System.currentTimeMillis}")
19 val trans: Pipe[Task,Connection,Row] = _.evalMap {conn =>
20  Task.delay{createData(conn)}.schedule(1.second)}
21 
22 def log(pfx: String): Pipe[Task, Row, Row] = _.evalMap { r =>
23  Task.delay {println(s"$pfx saving pid:${r.conn.id}, rid:${r.key}"); r}}
24 def saveRow(row: Row) = row.conn.store += (row.key -> row.value)
25 
26 val snk: Sink[Task,Row] = _.evalMap { r =>
27   Task.delay { saveRow(r); () } }
28 
29 val sprg = src("001").through(trans).repeat.take(3).through(log("")).to(snk)
30 //sprg.run.unsafeRun
31 //println(dataStore)
32 
33 val srcs = concurrent.join(3)(Stream(src("001"),src("002"),src("003"),src("004")))
34 val recs: Pipe[Task,Connection,Row] = src => {
35     concurrent.join(4)(src.map { conn =>
36       Stream.repeatEval(Task {createData(conn)}.schedule(1.second)) })
37   }
38   
39 def saveRows(row: Row) = { row.conn.store += (row.key -> row.value); row}
40 val chnn: Pipe[Task,Row,Row] = rs => {
41     concurrent.join(4)(rs.map { r =>
42       Stream.eval(Task {saveRows(r)}.schedule(1.second)) })
43   }
44 
45 
46 val par = srcs.through(recs).repeat.take(10).through(log("before")).through(chnn).through(log("after"))
47 par.run.unsafeRun

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏日常分享

Java 垃圾回收机制(早期版本)

Java 垃圾回收机制在我们普通理解来看,应该视为一种低优先级的后台进程来实现的,其实早期版本的Java虚拟机并非以这种方式实现的。

581
来自专栏性能与架构

Redis 实现队列优先级

通常使用一个list来实现队列操作,这样有一个小限制,所以的任务统一都是先进先出,如果想优先处理某个任务就不太好处理了 这就需要让队列有优先级的概念,我们就可...

4195
来自专栏木可大大

漫谈虚拟内存

如上图,程序1、程序2、程序3装入到内存,而程序2运行完成被换出,内存空闲出20k,然后进来程序4,大小为25K,此时,只有两处空闲块,10K和20K,没有一处...

4814
来自专栏JackieZheng

AngularJS in Action读书笔记3——走近Services

  试着想想这些问题:如果一个controller只关心自己所控制的view页面,那么对于整个application来说,你如何调用想要的function;如果...

2009
来自专栏微信公众号:Java团长

面试的角度诠释Java工程师(一)

我相信每一个程序员都是为了生活而努力着的。很多人因为兴趣,从此踏上了这条‘烧脑大行动’的金桥;也有很多人因为梦想和执着,奋不顾身融入这个职业;还有很多人因为被现...

561
来自专栏木木玲

对 volatile、compareAndSet、weakCompareAndSet 的一些思考

1815
来自专栏公有云大数据平台弹性MapReduce

yarn UI中appliaction展示个数分析

客户在使用我们的EMR产品时一天大概提交2000个appliaction,但是yarn的UI界面仅仅展示出了100多个历史application信息,影响了客户...

3855
来自专栏逆向技术

win32进程概念之句柄表,以及内核对象.

我们知道.我们使用CreateProcess 的时候会返回一个进程句柄.以及线程句柄. 其实在调用CreateProcess的时候.内核中会新建一个EPROCE...

612
来自专栏炸天帮5

win32进程概念之句柄表,以及内核对象.

我们知道.我们使用CreateProcess 的时候会返回一个进程句柄.以及线程句柄. 其实在调用CreateProcess的时候.内核中会新建一个EPROCE...

291
来自专栏网络

玩转 React 服务器端渲染

【编者按】React 生态提供了很多选择方案,这里我们选用 Redux 和 react-router 来做说 React 提供了两个方法renderToStri...

2258

扫码关注云+社区