Scalaz(58)- scalaz-stream: fs2-并行运算示范,fs2 parallel processing

    从表面上来看,Stream代表一连串无穷数据元素。一连串的意思是元素有固定的排列顺序,所以对元素的运算也必须按照顺序来:完成了前面的运算再跟着进行下一个元素的运算。这样来看,Stream应该不是很好的并行运算工具。但是,fs2所支持的并行运算方式不是以数据元素而是以Stream为运算单位的:fs2支持多个Stream同时进行运算,如merge函数。所以fs2使Stream的并行运算成为了可能。

一般来说,我们可能在Stream的几个状态节点要求并行运算:

1、同时运算多个数据源头来产生不排序的数据元素

2、同时对获取的一连串数据元素进行处理,如:map(update),filter等等

3、同时将一连串数据元素无序存入终点(Sink)

我们可以创建一个例子来示范fs2的并行运算:模拟从3个文件中读取字串,然后统计在这3个文件中母音出现的次数。假设文件读取和母音统计是有任意时间延迟的(latency),我们看看如何进行并行运算及并行运算能有多少效率上的提升。我们先设定一些跟踪和模拟延迟的帮助函数:

1 def log[A](prompt: String): Pipe[Task,A,A] = _.evalMap { a => Task.delay{ println(s"$prompt>"); a }}
2                                                   //> log: [A](prompt: String)fs2.Pipe[fs2.Task,A,A]
3 def randomDelay[A](max: FiniteDuration): Pipe[Task,A,A] = _.evalMap { a =>
4   val delay: Task[Int] = Task.delay { scala.util.Random.nextInt(max.toMillis.toInt) }
5   delay.flatMap {d => Task.now(a).schedule(d.millis) }
6 }                                                 //> randomDelay: [A](max: scala.concurrent.duration.FiniteDuration)fs2.Pipe[fs2.

log是个跟踪函数,randomDelay是个延迟模拟函数,模拟在max内的任意时间延迟。

与scalaz-stream-0.8不同,fs2重新实现了文件操作功能:不再依赖java的字串(string)处理功能。也不再依赖scodec的二进制数据转换功能。下面是fs2的文件读取方法示范:

1 val s1 = io.file.readAll[Task](java.nio.file.Paths.get("/Users/tiger-macpro/basic/BasicBackend.scala"),1024)
2   //> s1  : fs2.Stream[fs2.Task,Byte] = evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>)
3 val s2 = io.file.readAll[Task](java.nio.file.Paths.get("/Users/tiger-macpro/basic/DatabaseConfig.scala"),1024)
4   //> s2  : fs2.Stream[fs2.Task,Byte] = evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>)
5 val s3 = io.file.readAll[Task](java.nio.file.Paths.get("/Users/tiger-macpro/basic/BasicProfile.scala"),1024)
6   //> s3  : fs2.Stream[fs2.Task,Byte] = evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>)

fs2.io.file.readAll函数的款式如下:

def readAll[F[_]](path: Path, chunkSize: Int)(implicit F: Effect[F]): Stream[F, Byte] ={...}

readAll分批(by chunks)从文件中读取Byte类型数据(当返回数据量小于chunkSize代表完成读取),返回结果类型是Stream[F,Byte]。我们需要进行Byte>>>String转换及分行等处理。fs2在text对象里提供了相关函数:

object text {
  private val utf8Charset = Charset.forName("UTF-8")

  /** Converts UTF-8 encoded byte stream to a stream of `String`. */
  def utf8Decode[F[_]]: Pipe[F, Byte, String] =
    _.chunks.through(utf8DecodeC)

  /** Converts UTF-8 encoded `Chunk[Byte]` inputs to `String`. */
  def utf8DecodeC[F[_]]: Pipe[F, Chunk[Byte], String] = {
    /**
      * Returns the number of continuation bytes if `b` is an ASCII byte or a
      * leading byte of a multi-byte sequence, and -1 otherwise.
      */
    def continuationBytes(b: Byte): Int = {
      if      ((b & 0x80) == 0x00) 0 // ASCII byte
      else if ((b & 0xE0) == 0xC0) 1 // leading byte of a 2 byte seq
      else if ((b & 0xF0) == 0xE0) 2 // leading byte of a 3 byte seq
      else if ((b & 0xF8) == 0xF0) 3 // leading byte of a 4 byte seq
      else                        -1 // continuation byte or garbage
    }
...
/** Encodes a stream of `String` in to a stream of bytes using the UTF-8 charset. */
  def utf8Encode[F[_]]: Pipe[F, String, Byte] =
    _.flatMap(s => Stream.chunk(Chunk.bytes(s.getBytes(utf8Charset))))

  /** Encodes a stream of `String` in to a stream of `Chunk[Byte]` using the UTF-8 charset. */
  def utf8EncodeC[F[_]]: Pipe[F, String, Chunk[Byte]] =
    _.map(s => Chunk.bytes(s.getBytes(utf8Charset)))

  /** Transforms a stream of `String` such that each emitted `String` is a line from the input. */
  def lines[F[_]]: Pipe[F, String, String] = {
...

utf8Encode,utf8Decode,lines这几个函数正是我们需要的,它们都是Pipe类型。我们可以把这几个Pipe直接用through接到Stream上:

 1 val startTime = System.currentTimeMillis         //> startTime  : Long = 1472444756321
 2  val s1lines = s1.through(text.utf8Decode).through(text.lines)
 3      .through(randomDelay(10 millis)).runFold(0)((b,_) => b + 1).unsafeRun
 4                                                   //> s1lines  : Int = 479
 5  println(s"reading s1 $s1lines lines in ${System.currentTimeMillis - startTime}ms")
 6                                                   //> reading s1 479 lines in 5370ms
 7  
 8  val startTime2 = System.currentTimeMillis        //> startTime2  : Long = 1472444761691
 9  val s2lines = s2.through(text.utf8Decode).through(text.lines)
10    .through(randomDelay(10 millis)).runFold(0)((b,_) => b + 1).unsafeRun
11                                                   //> s2lines  : Int = 174
12  println(s"reading s2 $s2lines lines in ${System.currentTimeMillis - startTime2}ms")
13                                                   //> reading s2 174 lines in 1923ms
14  val startTime3 = System.currentTimeMillis        //> startTime3  : Long = 1472444763614
15  val s3lines = s3.through(text.utf8Decode).through(text.lines)
16    .through(randomDelay(10 millis)).runFold(0)((b,_) => b + 1).unsafeRun
17                                                   //> s3lines  : Int = 174
18 println(s"reading s3 $s3lines lines in ${System.currentTimeMillis - startTime3}ms")
19                                                   //> reading s3 174 lines in 1928ms
20 println(s"reading all three files ${s1lines+s2lines+s3lines} total lines in ${System.currentTimeMillis - startTime}ms")
21                                                   //> reading all three files 827 total lines in 9221ms

在以上的例子里我们用runFold函数统计文件的文字行数并在读取过程中用randomDelay来制造了随意长度的拖延。上面3个文件的字串读取和转换处理一共877行、9221ms。

我们知道fs2的并行运算函数concurrent.join函数类型款式是这样的:

def join[F[_],O](maxOpen: Int)(outer: Stream[F,Stream[F,O]])(implicit F: Async[F]): Stream[F,O] = {...}

join运算的对象outer是个两层Stream(Streams of Stream):Stream[F,Stream[F,P]],我们需要先进行类型款式调整:

1 val lines1 = s1.through(text.utf8Decode).through(text.lines).through(randomDelay(10 millis))
2   //> lines1  : fs2.Stream[fs2.Task,String] = evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>).flatMap(<function1>)
3 val lines2 = s2.through(text.utf8Decode).through(text.lines).through(randomDelay(10 millis))
4   //> lines2  : fs2.Stream[fs2.Task,String] = evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>).flatMap(<function1>)
5 val lines3 = s3.through(text.utf8Decode).through(text.lines).through(randomDelay(10 millis))
6   //> lines3  : fs2.Stream[fs2.Task,String] = evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>).flatMap(<function1>)
7 val ss: Stream[Task,Stream[Task,String]] = Stream(lines1,lines2,lines3)
8   //> ss  : fs2.Stream[fs2.Task,fs2.Stream[fs2.Task,String]] = Segment(Emit(Chunk(evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>).flatMap(<function1>), evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>).flatMap(<function1>), evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>).flatMap(<function1>))))

现在这个ss的类型复合我们的要求。我们可以测试一下并行运算的效率:

1 val ss_start = System.currentTimeMillis           //> ss_start  : Long = 1472449962698
2 val ss_lines = fs2.concurrent.join(3)(ss).runFold(0)((b,_) => b + 1).unsafeRun
3                                                   //> ss_lines  : Int = 827
4 println(s"parallel reading all files ${ss_lines} total lines in ${System.currentTimeMillis - ss_start}ms")
5                                                   //> parallel reading all files 827 total lines in 5173ms

读取同等行数但只用了5173ms,与之前的9221ms相比,大约有成倍的提速。

join(3)(ss)返回了一个合并的Stream,类型是Stream[Task,String]。我们可以运算这个Stream里母音出现的频率。我们先设计这个统计函数:

1 //c 是个vowl
2 def vowls(c: Char): Boolean = List('A','E','I','O','U').contains(c)
3                                                   //> vowls: (c: Char)Boolean
4 //直接用scala标准库实现
5 def pipeVowlsCount: Pipe[Task,String,Map[Char,Int]] =
6   _.evalMap (text => Task.delay{
7      text.toUpperCase.toList.filter(vowls).groupBy(s => s).mapValues(_.size)
8      }.schedule((text.length / 10).millis))       //> pipeVowlsCount: => fs2.Pipe[fs2.Task,String,Map[Char,Int]]

注意我们使用了text => Task.delay{...}.schedule(d),实际上我们完全可以用 text => Thread.sleep(d),但是这样会造成了不纯代码,所以我们用evalMap来实现纯代码运算。试试统计全部字串内母音出现的总数:

 1 import scalaz.{Monoid}
 2 //为runFold提供一个Map[Char,Int]Monoid实例
 3 implicit object mapMonoid extends Monoid[Map[Char,Int]]  {
 4    def zero: Map[Char,Int] = Map()
 5    def append(m1: Map[Char,Int], m2: => Map[Char,Int]): Map[Char,Int] = {
 6      (m1.keySet ++ m2.keySet).map { k =>
 7        (k, m1.getOrElse(k,0) + m2.getOrElse(k,0))
 8      }.toMap
 9    }
10 }
11 val vc_start = System.currentTimeMillis           //> vc_start  : Long = 1472464772465
12 val vowlsLine = fs2.concurrent.join(3)(ss).through(pipeVowlsCount)
13     .runFold(Map[Char,Int]())(mapMonoid.append(_,_)).unsafeRun
14   //> vowlsLine  : scala.collection.immutable.Map[Char,Int] = Map(E -> 3381, U - 838, A -> 2361, I -> 2031, O -> 1824)
15 println(s"parallel reading all files and counted vowls sequencially in ${System.currentTimeMillis - vc_start}ms")
16   //> parallel reading all files and counted vowls sequencially in 10466ms

我们必须为runFold提供一个Monoid[Map[Char,Int]]实例mapMonoid。

那我们又如何实现统计功能的并行运算呢? fs2.concurrent.join(maxOpen)(...)函数能把一个Stream截成maxOpen数的子Stream,然后对这些子Stream进行并行运算。那么我们又如何转换Stream[F,Stream[F,O]]类型呢?我们必须把Stream[F,O]的O升格成Stream[F,O]。我们先用一个函数来把O转换成Map[Char,Int],然后把这个函数升格成Stream[Task,Map[Char,Int],这个可以用Stream.eval实现:

1 def fVowlsCount(text: String): Map[Char,Int] =
2   text.toUpperCase.toList.filter(vowls).groupBy(s => s).mapValues(_.size)
3                                                   //> fVowlsCount: (text: String)Map[Char,Int]
4 val parVowlsLine: Stream[Task,Stream[Task,Map[Char,Int]]] = fs2.concurrent.join(3)(ss)
5     .map {text => Stream.eval(Task {fVowlsCount(text)}.schedule((text.length / 10).millis))}
6     //> parVowlsLine  : fs2.Stream[fs2.Task,fs2.Stream[fs2.Task,Map[Char,Int]]] = attemptEval(Task).flatMap(<function1>).flatMap(<function1>).mapChunks(<function1>)

我们来检查一下运行效率:

1 val parvc_start = System.currentTimeMillis        //> parvc_start  : Long = 1472465844694
2 fs2.concurrent.join(8)(parVowlsLine)
3   .runFold(Map[Char,Int]())(mapMonoid.append(_,_)).unsafeRun
4   //> res0: scala.collection.immutable.Map[Char,Int] = Map(E -> 3381, U -> 838, A-> 2361, I -> 2031, O -> 1824)
5 println(s"parallel reading all files and counted vowls in ${System.currentTimeMillis - parvc_start}ms")
6   //> parallel reading all files and counted vowls in 4984ms

并行运算只需要4985ms,而流程运算需要10466+(9221-5173)=14xxx,这里有3,4倍的速度提升。

下面是这次讨论的示范源代码:

 1 import fs2._
 2 import scala.language.{higherKinds,implicitConversions,postfixOps}
 3 import scala.concurrent.duration._
 4 object fs2Merge {
 5 implicit val strategy = Strategy.fromFixedDaemonPool(4)
 6 implicit val scheduler = Scheduler.fromFixedDaemonPool(2)
 7 def log[A](prompt: String): Pipe[Task,A,A] = _.evalMap { a => Task.delay{ println(s"$prompt>"); a }}
 8 def randomDelay[A](max: FiniteDuration): Pipe[Task,A,A] = _.evalMap { a =>
 9   val delay: Task[Int] = Task.delay { scala.util.Random.nextInt(max.toMillis.toInt) }
10   delay.flatMap {d => Task.now(a).schedule(d.millis) }
11 }
12      
13  val s1 = io.file.readAll[Task](java.nio.file.Paths.get("/Users/tiger-macpro/basic/BasicBackend.scala"),1024)
14  val s2 = io.file.readAll[Task](java.nio.file.Paths.get("/Users/tiger-macpro/basic/DatabaseConfig.scala"),1024)
15  val s3 = io.file.readAll[Task](java.nio.file.Paths.get("/Users/tiger-macpro/basic/BasicProfile.scala"),1024)
16  
17 
18  val startTime = System.currentTimeMillis
19  val s1lines = s1.through(text.utf8Decode).through(text.lines)
20      .through(randomDelay(10 millis)).runFold(0)((b,_) => b + 1).unsafeRun
21  println(s"reading s1 $s1lines lines in ${System.currentTimeMillis - startTime}ms")
22  
23  val startTime2 = System.currentTimeMillis
24  val s2lines = s2.through(text.utf8Decode).through(text.lines)
25    .through(randomDelay(10 millis)).runFold(0)((b,_) => b + 1).unsafeRun
26  println(s"reading s2 $s2lines lines in ${System.currentTimeMillis - startTime2}ms")
27  val startTime3 = System.currentTimeMillis
28  val s3lines = s3.through(text.utf8Decode).through(text.lines)
29    .through(randomDelay(10 millis)).runFold(0)((b,_) => b + 1).unsafeRun
30 println(s"reading s3 $s3lines lines in ${System.currentTimeMillis - startTime3}ms")
31 println(s"reading all three files ${s1lines+s2lines+s3lines} total lines in ${System.currentTimeMillis - startTime}ms")
32 val lines1 = s1.through(text.utf8Decode).through(text.lines).through(randomDelay(10 millis))
33 val lines2 = s2.through(text.utf8Decode).through(text.lines).through(randomDelay(10 millis))
34 val lines3 = s3.through(text.utf8Decode).through(text.lines).through(randomDelay(10 millis))
35 val ss: Stream[Task,Stream[Task,String]] = Stream(lines1,lines2,lines3)
36 val ss_start = System.currentTimeMillis
37 val ss_lines = fs2.concurrent.join(3)(ss).runFold(0)((b,_) => b + 1).unsafeRun
38 println(s"parallel reading all files ${ss_lines} total lines in ${System.currentTimeMillis - ss_start}ms")
39 
40 //c 是个vowl
41 def vowls(c: Char): Boolean = List('A','E','I','O','U').contains(c)
42 //直接用scala标准库实现
43 def pipeVowlsCount: Pipe[Task,String,Map[Char,Int]] =
44   _.evalMap (text => Task.delay{
45      text.toUpperCase.toList.filter(vowls).groupBy(s => s).mapValues(_.size)
46      }.schedule((text.length / 10).millis))
47   
48 import scalaz.{Monoid}
49 //为runFold提供一个Map[Char,Int]Monoid实例
50 implicit object mapMonoid extends Monoid[Map[Char,Int]]  {
51    def zero: Map[Char,Int] = Map()
52    def append(m1: Map[Char,Int], m2: => Map[Char,Int]): Map[Char,Int] = {
53      (m1.keySet ++ m2.keySet).map { k =>
54        (k, m1.getOrElse(k,0) + m2.getOrElse(k,0))
55      }.toMap
56    }
57 }
58 val vc_start = System.currentTimeMillis
59 val vowlsLine = fs2.concurrent.join(3)(ss).through(pipeVowlsCount)
60     .runFold(Map[Char,Int]())(mapMonoid.append(_,_)).unsafeRun
61 println(s"parallel reading all files and counted vowls sequencially in ${System.currentTimeMillis - vc_start}ms")
62 def fVowlsCount(text: String): Map[Char,Int] =
63   text.toUpperCase.toList.filter(vowls).groupBy(s => s).mapValues(_.size)
64 val parVowlsLine: Stream[Task,Stream[Task,Map[Char,Int]]] = fs2.concurrent.join(3)(ss)
65     .map {text => Stream.eval(Task {fVowlsCount(text)}.schedule((text.length / 10).millis))}
66 val parvc_start = System.currentTimeMillis
67 fs2.concurrent.join(8)(parVowlsLine)
68   .runFold(Map[Char,Int]())(mapMonoid.append(_,_)).unsafeRun
69 println(s"parallel reading all files and counted vowls in ${System.currentTimeMillis - parvc_start}ms") 
70 }

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏码匠的流水账

聊聊storm的window trigger

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentPr...

814
来自专栏小樱的经验随笔

getline函数(精华版)

在我的印象中,getline函数经常出现在自己的视野里,模糊地记得它经常用来读取字符串 。但是又对它的参数不是很了解,今天又用到了getline函数,现在来细细...

2834
来自专栏柠檬先生

JavaScript 基础(七) 箭头函数 generator Date JSON

ES6 标准新增了一种新的函数: Arrow Function(箭头函数)。     x => x *x     上面的箭头相当于:   ...

1975
来自专栏函数式编程语言及工具

Scalaz(56)- scalaz-stream: fs2-安全运算,fs2 resource safety

    fs2在处理异常及资源使用安全方面也有比较大的改善。fs2 Stream可以有几种方式自行引发异常:直接以函数式方式用fail来引发异常、在纯代码里隐式...

1885
来自专栏HTML5学堂

JS 计时器参数剖析与真题

HTML5学堂-码匠:计时器的第一个参数,包含几种不同的书写方法,可以是函数名,匿名函数,JS代码字符串,还有一些面试题当中会出现“函数调用”的书写方式。 那么...

2714
来自专栏偏前端工程师的驿站

MyBatis魔法堂:ResultMap详解

一、前言                                  MyBatis是基于“数据库结构不可控”的思想建立的,也就是我们希望数据库遵循第三范...

1867
来自专栏于晓飞的专栏

Java 注解 学习笔记

我们平常写Java代码,对其中的注解并不是很陌生,比如说写继承关系的时候经常用到@Override来修饰方法。但是@Override是用来做什么的,为什么写继承...

1281
来自专栏牛肉圆粉不加葱

Spark的位置优先: TaskSetManager 的有效 Locality Levels

在Spark Application Web UI的 Stages tag 上,我们可以看到这个的表格,描述的是某个 stage 的 tasks 的一些信息,其...

793
来自专栏xdecode

Spring之AOP

之前在另外一篇博文里介绍过AOP的概念, 这边不在赘述, 可以参考 AOP与动态代理 本文主要介绍Spring中AOP的应用. 切入点: 具体实现类的方法 ...

18810
来自专栏carven

js原生函数之call和apply,bind

call 和 apply 和 bind 都是为了改变某个函数运行时的 context 即上下文而存在的,换句话说,就是为了改变函数体内部 this 的指向。

630

扫码关注云+社区