Scalaz (52) - scalaz-stream: parallel processing concurrently by merging

   If scalaz-stream is to be a practical tool for stream programming, it must be able to read from multiple sources and write to multiple sinks concurrently, and above all it should support flexible multithreaded computation. But we have said that a Process represents a possibly infinite sequence of elements, i.e. elements arranged in order. Given a Process(a,b,c), we can only compute sequentially: b is processed only after a has been processed. That is reasonable in itself, since it makes the evaluation of a Process easy to follow. So, given this property, how do we run scalaz-stream computations in parallel? In many scenarios we do not actually care about the order of the results, only about their content. For example, when querying an inventory table for all items priced above 100, we do not care in what order the records come back, only about each record's price. If the items come from several sources (tables), we can read those sources in parallel. scalaz-stream achieves this through merge: it reads from several sources at once and produces a single combined stream. Because each source lags by a different amount, the order of merge's output is nondeterministic. The following example demonstrates ways to read lines from three files concurrently and combine them into one multi-line stream:
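Before the file example, the nondeterminism of merging can be illustrated with plain Scala Futures (a stdlib sketch only, not scalaz-stream; all names here are made up): two concurrent sources complete according to their own latency, not their declaration order, while the combined content stays intact.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// completion order is recorded as each "source" finishes
val arrived = new ConcurrentLinkedQueue[String]()

def source(name: String, delayMs: Int): Future[Unit] =
  Future { Thread.sleep(delayMs); arrived.add(name); () }

// "slow" is declared first but finishes last
Await.result(Future.sequence(List(source("slow", 200), source("fast", 10))), 5.seconds)

val order = Iterator.continually(arrived.poll()).takeWhile(_ != null).toList
// the merged content is complete; only the order is latency-dependent
assert(order.toSet == Set("slow", "fast"))
```

This is exactly the trade-off merge makes: completeness of content is guaranteed, ordering is not.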

val p1 = io linesR s"/Users/Tiger/Process.scala"
//> p1  : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@7494e528,<function1>,<function1>)
val p2 = io linesR s"/Users/Tiger/Wye.scala"
//> p2  : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@1f554b06,<function1>,<function1>)
val p3 = io linesR s"/Users/Tiger/Tee.scala"
//> p3  : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@694e1548,<function1>,<function1>)

p1, p2 and p3 are three sources, reading from Process.scala, Wye.scala and Tee.scala respectively. We can simulate the delays that might occur while reading:

// simulate a nondeterministic read delay
def readDelay(i: Int) = Thread.sleep( i/10 )      //> readDelay: (i: Int)Unit
val pa = p1.map{ s => readDelay(s.length); s}     //> pa  : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@7494e528,<function1>,<function1>)
val pb = p2.map{ s => readDelay(s.length); s}     //> pb  : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@1f554b06,<function1>,<function1>)
val pc = p3.map{ s => readDelay(s.length); s}     //> pc  : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@694e1548,<function1>,<function1>)

Now pa, pb and pc each delay in proportion to the length of every line they read. Let's first time each Process on its own:

val pa_start = System.currentTimeMillis           //> pa_start  : Long = 1470051661503
val palines= pa.runFoldMap(_ => 1).run            //> palines  : Int = 1616
println(s"reading p1 $palines lines in ${System.currentTimeMillis - pa_start}ms")
                                                  //> reading p1 1616 lines in 6413ms
val pb_start = System.currentTimeMillis           //> pb_start  : Long = 1470051667917
val pblines=pb.runFoldMap(_ => 1).run             //> pblines  : Int = 901
println(s"reading p2 $pblines lines in ${System.currentTimeMillis - pb_start}ms")
                                                  //> reading p2 901 lines in 3275ms
val pc_start = System.currentTimeMillis           //> pc_start  : Long = 1470051671192
val pclines=pc.runFoldMap(_ => 1).run             //> pclines  : Int = 306
println(s"reading p3 $pclines lines in ${System.currentTimeMillis - pc_start}ms")
                                                  //> reading p3 306 lines in 1181ms
println(s"reading all ${palines+pblines+pclines} lines in ${System.currentTimeMillis - pa_start}ms")
                                                  //> reading all 2823 lines in 10870ms

The three files total 2823 lines, read in 10870ms. Next we run them back to back with append:

val pl_start = System.currentTimeMillis           //> pl_start  : Long = 1470051672373
val plines = (pa ++ pb ++ pc).runFoldMap(_ => 1).run
                                                  //> plines  : Int = 2823
println(s"continue reading $plines in ${System.currentTimeMillis - pl_start}ms")
                                                  //> continue reading 2823 in 10501ms

The sequential run takes 10501ms, slightly less than running them separately. What if we use merge to run them in parallel?

val par_start = System.currentTimeMillis          //> par_start  : Long = 1470051682874
val parlines = (pa merge pb merge pc).runFoldMap(_ => 1).run
                                                  //> parlines  : Int = 2823
println(s"parallel reading $parlines in ${System.currentTimeMillis - par_start}ms")
                                                  //> parallel reading 2823 in 6278ms

Now the whole run takes only 6278ms, roughly 60% of the sequential time. With more sources to read from, merge would of course yield an even bigger gain. But since a stream may be infinite, what we really need is to process the elements of a single stream in parallel. In the example above, merge combined three sources into one longer stream; if we then process each record (extraction, filtering, comparison, and so on), the running time still grows linearly with the stream's length. For example, building on the code above, suppose we want to compute the frequency of vowels in the merged data. We can filter the vowels out of each line and then add up the per-line counts to obtain the totals:

// true if c is a vowel
def vowls(c: Char): Boolean = List('A','E','I','O','U').contains(c)
                                                  //> vowls: (c: Char)Boolean

// returns a Map of per-character frequencies; scalaz.Lens is used for demonstration
def vowlCount(text: String): Map[Char,Int] = {
    text.toUpperCase.toList.filter(vowls).foldLeft(Map[Char,Int]()) { (b,a) =>
      if ((Lens.mapVLens(a) get b) == None) Lens.mapVLens(a) set(b,1.some)
      else Lens.mapVLens(a).set(b, (Lens.mapVLens(a) get b).map(_ + 1))
    }
 }                                                //> vowlCount: (text: String)Map[Char,Int]
// the same thing with the Scala standard library
def stdVowlsCount(text: String): Map[Char,Int] =
  text.toUpperCase.toList.filter(vowls).groupBy(s => s).mapValues(_.size)
                                                  //> stdVowlsCount: (text: String)Map[Char,Int]
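As a quick sanity check of the standard-library version, here is a self-contained copy of the two functions above applied to a sample string (the input string is ours, not from the original):

```scala
def vowls(c: Char): Boolean = List('A','E','I','O','U').contains(c)

// groupBy + size per group, written to work on both Scala 2.12 and 2.13+
def stdVowlsCount(text: String): Map[Char, Int] =
  text.toUpperCase.toList.filter(vowls).groupBy(identity).map { case (c, cs) => c -> cs.size }

val counts = stdVowlsCount("Parallel Stream")
// "PARALLEL STREAM" contains A three times and E twice
assert(counts == Map('A' -> 3, 'E' -> 2))
```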

Let's first compute the result sequentially:

// provide a Monoid[Map[Char,Int]] instance for runFoldMap
implicit object mapMonoid extends Monoid[Map[Char,Int]]  {
   def zero: Map[Char,Int] = Map()
   def append(m1: Map[Char,Int], m2: => Map[Char,Int]): Map[Char,Int] = {
     (m1.keySet ++ m2.keySet).map { k =>
       (k, m1.getOrElse(k,0) + m2.getOrElse(k,0))
     }.toMap
   }
}

val cnt_start = System.currentTimeMillis          //> cnt_start  : Long = 1470197392016
val merged = (pa merge pb merge pc)
  .map(vowlCount)
  .runFoldMap(identity).run                       //> merged  : Map[Char,Int] = Map(E -> 7330, U -> 1483, A -> 4531, I -> 4393, O -> 3748)
println(s"calc vowl frequency in ${System.currentTimeMillis - cnt_start}ms")
                                                  //> calc vowl frequency in 28646ms
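Isolated from scalaz, the append of mapMonoid is nothing more than key-wise addition of two count maps; a minimal stdlib sketch of the same semantics:

```scala
// key-wise addition: identical semantics to mapMonoid.append above
def addCounts(m1: Map[Char, Int], m2: Map[Char, Int]): Map[Char, Int] =
  (m1.keySet ++ m2.keySet).map { k =>
    k -> (m1.getOrElse(k, 0) + m2.getOrElse(k, 0))
  }.toMap

val combined = addCounts(Map('A' -> 2, 'E' -> 1), Map('E' -> 3, 'O' -> 5))
assert(combined == Map('A' -> 2, 'E' -> 4, 'O' -> 5))
```

Because this operation is associative with the empty map as identity, runFoldMap can fold the per-line maps in any grouping, which is what makes the parallel version below safe.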

The whole computation takes 28646ms. None of this work actually depends on a record's position in the stream, so running it in parallel should improve throughput. scalaz-stream provides merge.mergeN for parallel processing over a stream. Its signature looks like this:

/**
   * Merges non-deterministically processes that are output of the `source` process.
   *
   * Merging stops when all processes generated by source have stopped, and all source process stopped as well.
   * Merging will also stop when resulting process terminated. In that case the cleanup of all `source`
   * processes is run, followed by cleanup of resulting process.
   *
   * When one of the source processes fails the mergeN process will fail with that reason.
   *
   * Merging is non-deterministic, but is fair in sense that every process is consulted, once it has `A` ready.
   * That means processes that are `faster` provide it's `A` more often than slower processes.
   *
   * Internally mergeN keeps small buffer that reads ahead up to `n` values of `A` where `n` equals to number
   * of active source streams. That does not mean that every `source` process is consulted in this read-ahead
   * cache, it just tries to be as much fair as possible when processes provide their `A` on almost the same speed.
   *
   */
  def mergeN[A](source: Process[Task, Process[Task, A]])(implicit S: Strategy): Process[Task, A] =
    scalaz.stream.nondeterminism.njoin(0, 0)(source)(S)

  /**
   * MergeN variant, that allows to specify maximum of open `source` processes.
   * If, the maxOpen is <= 0 it acts like standard mergeN, where the number of processes open is not limited.
   * However, when the maxOpen > 0, then at any time only `maxOpen` processes will be running at any time
   *
   * This allows for limiting the eventual concurrent processing of opened streams not only by supplied strategy,
   * but also by providing a `maxOpen` value.
   *
   *
   * @param maxOpen   Max number of open (running) processes at a time
   * @param source    source of processes to merge
   */
  def mergeN[A](maxOpen: Int)(source: Process[Task, Process[Task, A]])(implicit S: Strategy): Process[Task, A] =
    scalaz.stream.nondeterminism.njoin(maxOpen, maxOpen)(source)(S)

The source parameter of mergeN has type Process[Task, Process[Task, A]]: a Process of Processes, where the inner Processes are the ones run in parallel. One way to read this shape: the inner Process fetches records (data), while the outer Process represents the database connections, so we can read from many sources at once. Applied to our example, the inner Process is the vowlCount job, since that is the work we want parallelized per record. First we need to convert from Process[Task, A] to Process[Task, Process[Task, A]]:

val merged = (pa merge pb merge pc)               //> merged  : scalaz.stream.Process[scalaz.concurrent.Task,String] = Append(Halt(End),Vector(<function1>))
val par = merged.map {text => Task {vowlCount(text)} }
          .map {task => Process.eval(task)}       //> par  : scalaz.stream.Process[scalaz.concurrent.Task,scalaz.stream.Process[scalaz.concurrent.Task,Map[Char,Int]]] = Append(Halt(End),Vector(<function1>))

par now has the shape we need. Let's see how mergeN performs:

val cnt_start = System.currentTimeMillis          //> cnt_start  : Long = 1470204623562
val merged = (pa merge pb merge pc)               //> merged  : scalaz.stream.Process[scalaz.concurrent.Task,String] = Append(Halt(End),Vector(<function1>))
val par = merged.map {text => Task {vowlCount(text)} }
          .map {task => Process.eval(task)}       //> par  : scalaz.stream.Process[scalaz.concurrent.Task,scalaz.stream.Process[scalaz.concurrent.Task,Map[Char,Int]]] = Append(Halt(End),Vector(<function1>))
val resm = merge.mergeN(par).runFoldMap(identity).run
                                                  //> resm  : Map[Char,Int] = Map(E -> 7330, U -> 1483, A -> 4531, I -> 4393, O -> 3748)
println(s"parallel calc vowl frequency in ${System.currentTimeMillis - cnt_start}ms")
                                                  //> parallel calc vowl frequency in 6922ms

Look at that result: from 28646ms down to 6922ms, roughly a 4x speedup, which is quite significant. The same pattern applies to real database work: for instance, counting purchases of items within a given price range across several tables can be implemented as exactly this kind of parallel computation in scalaz-stream.
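As a rough sketch of that closing scenario (table contents, names and the price range are all invented here, and plain Futures stand in for scalaz-stream processes): one concurrent task per table counts purchases of items in a price range, and the results are merged by summation, analogous to mergeN followed by runFoldMap over an Int monoid.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// hypothetical in-memory "tables" of (itemPrice, purchaseCount) rows;
// a real version would stream rows from the database instead
val tables: Map[String, List[(Double, Int)]] = Map(
  "tableA" -> List((50.0, 3), (120.0, 7)),
  "tableB" -> List((200.0, 2), (80.0, 4)),
  "tableC" -> List((150.0, 1))
)

// one Future per table, merged by summing the per-table counts
def purchasesInRange(lo: Double, hi: Double): Int = {
  val perTable = tables.values.map { rows =>
    Future(rows.collect { case (price, n) if price >= lo && price <= hi => n }.sum)
  }
  Await.result(Future.sequence(perTable).map(_.sum), 5.seconds)
}

val total = purchasesInRange(100.0, 250.0)
assert(total == 10) // 7 (tableA) + 2 (tableB) + 1 (tableC)
```

Because integer addition is associative and commutative, it does not matter which table finishes first, just as record order did not matter for the vowel counts.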
