泛函编程(19)-泛函库设计-Parallelism In Action

    上节我们讨论了并行运算组件库的基础设计,实现了并行运算最基本的功能:创建新的线程并提交一个任务异步执行。并行运算类型的基本表达形式如下:

 1 import java.util.concurrent._
 2 object Par {
 3   type Par[A] = ExecutorService => Future[A]
 4   def run[A](es: ExecutorService)(pa: Par[A]): Future[A] = pa(es)
 5                                                   //> run: [A](es: java.util.concurrent.ExecutorService)(pa: ch71.Par.Par[A])java.
 6                                                   //| util.concurrent.Future[A]
 7   def unit[A](a: A): Par[A] = {
 8       es => new Future[A] {
 9           def get = a
10           def get(t: Long, u: TimeUnit) = get
11           def isDone = true
12           def isCancelled = false
13           def cancel(evenIsRunning: Boolean) = false
14       }
15   }                                               //> unit: [A](a: A)ch71.Par.Par[A]
16   def fork[A](pa: Par[A]): Par[A] = {            //注意这里有个错误?
17       es => es.submit(new Callable[A] {
18           def call: A = run(es)(pa).get
19       })
20   }
21   def async[A](a: => A): Par[A] = fork(unit(a))
22   
23 }

实际上我们已经实现了两项最基本的函数:

1、unit[A](a: A): Par[A] : 我们硬生生的按照Par的类型款式造了一个Future实例,这样我们才可以用Future.get的形式读取运算结果值。看看这个例子:unit(42+1),在调用函数unit时由于传入参数是即时计算的,所以在进入unit前已经完成了计算结果43。然后人为的把这个结果赋予Future.get,这样我们就可以和真正的由ExecutorService返回的Future一样用同样的方式读取结果。所以说unit纯粹是一个改变格式的升格函数,没有任何其它作用。

2、async[A](a: => A): Par[A]:这个async函数把表达式a提交到主线程之外的另一个线程。新的线程由ExecutorService提供,我们无须理会,这样可以实现线程管理和并行运算组件库的松散耦合。由于async的传人函数是延后计算类型,所以我们可以把表达式a提交给另一个线程去运算。

那么我们用例子来示范一下:

 1   val es = Executors.newCachedThreadPool()  //线程由jvm提供,我们无须理会
 2                                                   //> es  : java.util.concurrent.ExecutorService = java.util.concurrent.ThreadPool
 3                                                   //| Executor@19dfb72a[Running, pool size = 0, active threads = 0, queued tasks =
 4                                                   //|  0, completed tasks = 0]
 5   val a = unit({println(Thread.currentThread.getName); 42+1})
 6                                                   //> main
 7                                                   //| a  : ch71.Par.Par[Int] = <function1>
 8   val b = async({println(Thread.currentThread.getName); 42+1})
 9                                                   //> main
10                                                   //| b  : ch71.Par.Par[Int] = <function1>
11   run(es)(a).get                                  //> res0: Int = 43
12   run(es)(b).get                                  //> res1: Int = 43
13   es.shutdown()

看到问题了吗?用run运算a,b时没有显示println,而这个println在申明val a, val b 时已经执行了。对unit这可以理解:参数是即时计算的,所以println和结果43都在进入函数之前运算了(然后放到Future.get)。但是async的参数不是延迟计算的吗?我们再看清楚:async(a: => A) >>> fork(unit(a)),到fork函数参数unit(a)就立即计算了。所以 fork(pa: => Par[A])才可以保证在提交任务前都不会计算表达式a。我们必须把fork的函数款式改一下:

1 def fork[A](pa: => Par[A]): Par[A] = {
2         es => es.submit(new Callable[A] {
3          def call: A = run(es)(pa).get
4       })
5   }                                               //> fork: [A](pa: ch71.Par.Par[A])ch71.Par.Par[A]

再运行一下例子:

 1  val es = Executors.newCachedThreadPool()  //线程由jvm提供,我们无须理会
 2                                                   //> es  : java.util.concurrent.ExecutorService = java.util.concurrent.ThreadPool
 3                                                   //| Executor@19dfb72a[Running, pool size = 0, active threads = 0, queued tasks =
 4                                                   //|  0, completed tasks = 0]
 5   val a = unit({println(Thread.currentThread.getName); 42+1})
 6                                                   //> main
 7                                                   //| a  : ch71.Par.Par[Int] = <function1>
 8   val b = async({println(Thread.currentThread.getName); 42+1})
 9                                                   //> b  : ch71.Par.Par[Int] = <function1>
10   run(es)(a).get                                  //> res0: Int = 43
11   run(es)(b).get                                  //> pool-1-thread-1
12                                                   //| res1: Int = 43
13   es.shutdown()

看看结果:unit在主线程main运行,而async则在pool-1-thread-1这个非主线程内运行。

实现异步运算才是并行运算的第一步。并行运算顾名思义就是把一个大任务分解成几个较小任务然后同时异步运算后再把结果结合起来。我们用伪代码描述一下并行运算思路:

1  //伪代码
2   val big10sencondJob = ???      //一个10秒运算
3   val small5sJob1 = split big10sencondJob in half  //分解成两个 5秒运算
4   val small5sJob2 = split big10sencondJob in half  //分解成两个 5秒运算
5   val fa = run small5sJob1      //立即返回future 但开始运算 5 秒
6   val fb = run small5sJob2      //立即返回future 但开始运算 5 秒
7   val sum = fa.get + fb.get     //等待5秒后可以得出结果

看来用以上方式是可以得到并行运算的效果(10秒到5秒区别)。但我们采用了串指令(imperative)方式实现。当然我们必须考虑用泛函方式来实现并行运算的启动及结果抽取。

先用泛函方式启动并行运算。如果我们并行启动两个运算:

1  def map2[A,B,C](pa: Par[A], pb: Par[B])(f: (A,B) => C): Par[C]

map2并行启动pa,pb然后把它们的结果用函数f结合。看起来很美。那么我们先试着把它实现了:

 1  def map2[A,B,C](pa: Par[A], pb: Par[B])(f: (A,B) => C): Par[C] = {
 2     import TimeUnit.NANOSECONDS
 3       es => new Future[C] {
 4           val fa = run(es)(pa)        //在这里按pa的定义来确定在那个线程运行。如果pa是fork Par则在非主线程中运行
 5           val fb = run(es)(pb)
 6           def get = f(fa.get, fb.get)
 7           def get(timeOut: Long, timeUnit: TimeUnit) = {
 8               val start = System.nanoTime
 9               val a = fa.get
10               val end = System.nanoTime
11               //fa.get用去了一些时间。剩下给fb.get的timeout值要减去
12               val b = fb.get(timeOut - timeUnit.convert((end - start), NANOSECONDS) , timeUnit)
13             f(a,b)
14           }
15           def isDone = fa.isDone && fb.isDone
16           def isCancelled = fa.isCancelled && fb.isCancelled
17           def cancel(evenIsRunning: Boolean) = fa.cancel(evenIsRunning) || fb.cancel(evenIsRunning)
18       }
19   }                                               //> map2: [A, B, C](pa: ch71.Par.Par[A], pb: ch71.Par.Par[B])(f: (A, B) => C)ch
20                                                   //| 71.Par.Par[C]

在map2的实现里我们人为地建了个Future[C]。但在建的过程中我们运行了pa,pb的计算。如果我们对pa或pb有运算超时要求的话,就必须计算每次运算所使用的时间。所以Future[C]是符合pa,pb的运算要求的。

我们先试着同时运算41+2,33+4两个计算:

 1 val es = Executors.newCachedThreadPool()  //线程由jvm提供,我们无须理会
 2                                                   //> es  : java.util.concurrent.ExecutorService = java.util.concurrent.ThreadPoo
 3                                                   //| lExecutor@19dfb72a[Running, pool size = 0, active threads = 0, queued tasks
 4                                                   //|  = 0, completed tasks = 0]
 5   map2(async({println(Thread.currentThread.getName); 41+2}),
 6        async({println(Thread.currentThread.getName); 33+4}))
 7        {(a,b) => {println(Thread.currentThread.getName); a+b}}(es).get
 8                                                   //> pool-1-thread-1
 9                                                   //| pool-1-thread-2
10                                                   //| main
11                                                   //| res0: Int = 80

啊!pa,pb分别在不同的非主线程中运行了。但函数f的运行是在主线程main中运行的。我们试着把这个也放到非主线程中:

1 fork { map2(async({println(Thread.currentThread.getName); 41+2}),
2        async({println(Thread.currentThread.getName); 33+4}))
3        {(a,b) => {println(Thread.currentThread.getName); a+b}}}(es).get
4                                                   //> pool-1-thread-2
5                                                   //| pool-1-thread-3
6                                                   //| pool-1-thread-1
7                                                   //| res0: Int = 80

现在所有的计算都是在不同的非主线程中运算的了,清楚了吧。

两个以上并行运算可以通过map2来实现:

 1   def map3[A,B,C,D](pa: Par[A], pb: Par[B], pc: Par[C])(f: (A,B,C) => D): Par[D] = {
 2       map2(pa,map2(pb,pc){(b,c) => (b,c)}){(a,bc) => {
 3           val (b,c) = bc
 4           f(a,b,c)
 5       }}
 6   }
 7   def map4[A,B,C,D,E](pa: Par[A], pb: Par[B], pc: Par[C], pd: Par[D])(f: (A,B,C,D) => E): Par[E] = { //| 71.Par.Par[C]
 8       map2(pa,map2(pb,map2(pc,pd){(c,d) => (c,d)}){(b,cd) => (b,cd)}){(a,bcd) => {
 9           val (b,(c,d)) = bcd
10           f(a,b,c,d)
11       }}
12   }
13   def map5[A,B,C,D,E,F](pa: Par[A], pb: Par[B], pc: Par[C], pd: Par[D], pe: Par[E])(f: (A,B,C,D,E) => F): Par[F] = { //| 71.Par.Par[C]
14       map2(pa,map2(pb,map2(pc,map2(pd,pe){(d,e) => (d,e)}){(c,de) => (c,de)}){(b,cde) => (b,cde)}){(a,bcde) => {
15           val (b,(c,(d,e))) = bcde
16           f(a,b,c,d,e)
17       }}
18   }

再看个例子:如果一个并行运算的表达式是个List[Int],即 Par[List[Int]]。 如何对内部的List[Int]进行排序?

 1 //我们可以run pa, get list 后进行排序,然后再封装进Future[List[Int]]
 2   def sortPar(pa: Par[List[Int]]): Par[List[Int]] = {
 3     es => {
 4           val l = run(es)(pa).get
 5           new Future[List[Int]] {
 6               def get = l.sorted
 7               def isDone = true
 8               def isCancelled = false
 9               def get(t: Long, u: TimeUnit) = get
10               def cancel(e: Boolean) = false
11           }
12       }
13   }
14  //也可以用map2来实现。因为map2可以启动并行运算,也可以对par内元素进行操作。但操作只针对一个par,
15  //我们用unit(())替代第二个par。现在我们可以对一个par的元素进行操作了
16   def sortedPar(pa: Par[List[Int]]): Par[List[Int]] = {
17       map2(pa,unit(())){(a,_) => a.sorted}
18   }
19   //map是对一个par的元素进行变形操作,我们同样可以用map2实现了
20   def map[A,B](pa: Par[A])(f: A => B): Par[B] = {
21       map2(pa,unit(())){(a,_) => f(a) }
22   }
23   //然后用map去对Par[List[Int]]排序
24   def sortParByMap(pa: Par[List[Int]]): Par[List[Int]] = {
25       map(pa){_.sorted}
26   }

看看运行结果: 

1 sortPar(async({println(Thread.currentThread.getName); List(4,1,2,3)}))(es).get
2                                                   //> pool-1-thread-1
3                                                   //| res3: List[Int] = List(1, 2, 3, 4)
4  sortParByMap(async({println(Thread.currentThread.getName); List(4,1,2,3)}))(es).get
5                                                   //> pool-1-thread-1
6                                                   //| res4: List[Int] = List(1, 2, 3, 4)

实际上map2做了两件事:启动了两个并行运算、对运算结果进行了处理。这样说map2是可以被分解成更基本的组件函数:

 1 //启动两项并行运算
 2   def product[A,B](pa: Par[A], pb: Par[B]): Par[(A,B)] = {
 3       es => unit((run(es)(pa).get, run(es)(pb).get))(es)
 4   }                                               //> product: [A, B](pa: ch71.Par.Par[A], pb: ch71.Par.Par[B])ch71.Par.Par[(A, B
 5                                                   //| )]
 6   //处理运算结果
 7   def map[A,B](pa: Par[A])(f: A => B): Par[B] = {
 8       es => unit(f(run(es)(pa).get))(es)
 9   }                                               //> map: [A, B](pa: ch71.Par.Par[A])(f: A => B)ch71.Par.Par[B]
10   //再组合map2
11   def map2_pm[A,B,C](pa: Par[A], pb: Par[B])(f: (A,B) => C): Par[C] = {
12       map(product(pa, pb)){a => f(a._1, a._2)}
13   }                                               //> map2_pm: [A, B, C](pa: ch71.Par.Par[A], pb: ch71.Par.Par[B])(f: (A, B) => C
14                                                   //| )ch71.Par.Par[C]

我们还可以把函数A => B转换成A => Par[B],意思是把对A的运算变成并行运算Par[B]:

1   def asyncF[A,B](f: A => B): A => Par[B] = a => async(f(a))
2                                                   //> asyncF: [A, B](f: A => B)A => ch71.Par.Par[B]

用asyncF应该可以把对一个List的处理函数变成并行运算:

1 def parMap[A,B](as: List[A])(f: A => B): Par[List[B]]

用 map(as){asyncF(f)}可以得到List[Par[B]]。再想办法List[Par[B]] >>> Par[List[B]],这不就是我们经常遇到的那个sequence函数的类型款式吗。那我们就先实现了par的sequence函数吧:

 1  //用递归法实现
 2   def sequence_r[A](lp: List[Par[A]]): Par[List[A]] = {
 3       lp match {
 4           case Nil => unit(List())
 5           case h::t => map2(h,fork(sequence_r(t))){_ :: _}
 6       }
 7   }                                               //> sequence_r: [A](lp: List[ch71.Par.Par[A]])ch71.Par.Par[List[A]]
 8   //用foldLeft
 9   def sequenceByFoldLeft[A](lp: List[Par[A]]): Par[List[A]] = {
10       lp.foldLeft(unit[List[A]](Nil)){(t,h) => map2(h,t){_ :: _}}
11   }                                               //> sequenceByFoldLeft: [A](lp: List[ch71.Par.Par[A]])ch71.Par.Par[List[A]]
12   //用foldRight
13   def sequenceByFoldRight[A](lp: List[Par[A]]): Par[List[A]] = {
14       lp.foldRight(unit[List[A]](Nil)){(h,t) => map2(h,t){_ :: _}}
15   }                                               //> sequenceByFoldRight: [A](lp: List[ch71.Par.Par[A]])ch71.Par.Par[List[A]]
16   //用IndexedSeq切成两半来实现
17   def sequenceBalanced[A](as: IndexedSeq[Par[A]]): Par[IndexedSeq[A]] = {
18     if (as.isEmpty) unit(Vector())
19     else if (as.length == 1) map(as.head){a => Vector(a)}
20     else {
21         val (l,r) = as.splitAt(as.length / 2)
22         map2(sequenceBalanced(l),sequenceBalanced(r)){_ ++ _}
23     }  
24   }                                               //> sequenceBalanced: [A](as: IndexedSeq[ch71.Par.Par[A]])ch71.Par.Par[IndexedS
25   def sequence[A](lp: List[Par[A]]): Par[List[A]] = { //| eq[A]]
26     map(sequenceBalanced(lp.toIndexedSeq)){_.toList}
27   }

有了sequence就可以从List[Par[A]]到Par[List[A]],实现parMap应该没问题了:

 1  def parMap[A,B](as: List[A])(f: A => B): Par[List[B]] = fork {
 2       val lps = as.map{asyncF(f)}
 3       sequence(lps)
 4   }                                               //> parMap: [A, B](as: List[A])(f: A => B)ch71.Par.Par[List[B]]
 5  fork(parMap(List(1,2,3,4,5)){ _ + 10 })(es).get  //> pool-1-thread-1
 6                                                   //| pool-1-thread-2
 7                                                   //| pool-1-thread-3
 8                                                   //| pool-1-thread-4
 9                                                   //| pool-1-thread-5
10                                                   //| pool-1-thread-6
11                                                   //| pool-1-thread-8
12                                                   //| pool-1-thread-7
13                                                   //| pool-1-thread-9
14                                                   //| pool-1-thread-10
15                                                   //| pool-1-thread-14
16                                                   //| pool-1-thread-12
17                                                   //| pool-1-thread-15
18                                                   //| pool-1-thread-11
19                                                   //| pool-1-thread-13
20                                                   //| res3: List[Int] = List(11, 12, 13, 14, 15)

现在我们的并行计算组件库已经能够提供一些基本的并行运算功能了。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Android机动车

RxJava从入门到不离不弃(一)——基本概念和使用

RxJava的编程思想已经在Android开发者中变得越来越流行。有个不好的点就是上手不太容易,尤其是大部分人之前都是使用命令式编程语言。

692
来自专栏Android开发与分享

【Android】RxJava的使用(一)基本用法

3297
来自专栏BaronTalk

RxJava系列二(基本概念及使用介绍)

前言 上一篇的示例代码中大家一定发现了Observable这个类。从纯Java的观点看,Observable类源自于经典的观察者模式。RxJava的异步实现正是...

34110
来自专栏Scott_Mr 个人专栏

RxSwift 系列(二) -- Subject

3195
来自专栏一直在跳坑然后爬坑

RxJava2操作符之“Buffer”

这里贴一下观察者,为了能更清晰的看到发射出来的内容,我们将每一个item都打印出来

511
来自专栏积累沉淀

Java设计模式(十五)----观察者模式

观察者模式 一、定义 二、结构 具体案例 推模型和拉模型 三、Java提供的对观察者模式的支持 Observer接口 Observ...

2068
来自专栏DOTNET

.Net多线程编程—Parallel LINQ、线程池

Parallel LINQ 1 System.Linq.ParallelEnumerable 重要方法概览: 1)public static ParallelQ...

2787
来自专栏QQ空间开发团队的专栏

RxJava && Agera 从源码简要分析基本调用流程(1)

相信很多做Android或是Java研发的同学对RxJava应该都早有耳闻了,尤其是在Android开发的圈子里,RxJava渐渐开始广为流行。同样有很多同学已...

8.5K1
来自专栏分布式系统进阶

Kafka中的时间轮Kafka源码分析-汇总

将TimerTask对象绑定到 TimerTaskEntry上 如果这个TimerTask对象之前已经绑定到了一个 TimerTaskEntry上, 先调用t...

561
来自专栏余林丰

利用Java提供的Observer接口和Observable类实现观察者模式

对于观察者模式,其实Java已经为我们提供了已有的接口和类。对于订阅者(Subscribe,观察者)Java为我们提供了一个接口,JDK源码如下: 1 pack...

1828

扫码关注云+社区