泛函编程(18)-泛函库设计-并行运算组件库

    作为专业的编程人员,我们经常会因为工作需要建立一些工具库。所谓工具库就是针对工作上经常会遇到的一些共性问题预先编制的由一整套函数所组成的函数库。通常这些工具库的功能都是在特别定制的一些数据类型支持下由一系列函数围绕着这些数据类型进行运算而实现的。在泛函编程范畴内也不例外。但在泛函工具库里的函数则更重视函数的组合能力(functional composition);因而泛函的工具库一般称为组件库(combinator library),库内函数则被称之为组件(combinator)。组件库的设计者对函数设计有着共通的最基本目标:通过对组件进行各种函数组合可以实现更大的功能。泛函组件库设计一般针对特别的功能需求或课题:首先尝试用一些数据类型来表述课题需求,然后围绕这些特制的数据类型设计一系列函数针对课题各个最基本需求范畴提供解决方法。我们在这节讨论中从一个并行运算组件库的设计过程来介绍泛函组件库设计模式。

    我们设计这个并行运算组件库的目的:可以把一个普通运算放到另外一个独立的线程(thread)中去运行。这样我们可以同时把多个运算分别放到多个线程中同时运行从而达到并行运算的目的。问题简单明确,但如何对这些在各自独立运行空间的运算进行组合(composition)、变形(transformation)则值得仔细思量。

先从数据类型着手:一个并行运算应该像是一个容器,把一个普通运算封装在里面。我们来随便造个结构出来:Par[A],A是普通运算返回的结果类型。这个Par类型很像我们前面接触的高阶类型,那个承载A类型元素的管子类型。如果这样去想的话,我们可以用前面所有针对高阶类型的函数对管子内的元素A进行操作处理。那么如果一个运算是封装在Par里在另一个线程中运算完成后总是需要一个方法把结果取出来。这样我们可以先得出两个最基本的函数:

1 def unit[A](a: A): Par[A]    //把一个普通运算注入Par。把A升格到一个并行运算
2 def get[A](pa: Par[A]): A    //把并行运行结果抽取出来

下一个问题是运行线程控制:是由程序员来决定一个运算该放到一个新的线程里还是固定每一个运算都用新的独立线程?假设我们选择用由程序员调用一个函数来确定产生新线程。这样有两个优越:1、可以有更灵活的并行运算策略(有些已经确定很快完成的运算可能没有必要用新的线程,独立线程运算可能消耗更多的资源);2、独立线程机制和并行运算是松散耦合的:Par的实现中不需要了解线程管理机制。这个函数的款式如下:

def fork[A](pa: Par[A]): Par[A]  //为pa设定一个新的运行空间。并不改变pa,还是返回Par[A]

那么把一个运算放到一个新的线程里运行可以用这个函数表达:

def async[A](a: => A): Par[A] = fork(unit(a))  //不需要了解任何关于Par的信息。知道fork会为这个运算设定新的运行空间。注意还是返回Par[A]

因为我们追求的是线程机制和并行运算的松散耦合,那么我们就不会在Par里实际进行并行运算的运行,那么Par就只是对一个并行运算的描述。fork的返回还是Par,只是增加了对运算环境的描述,也不会真正运行算法。这样来说Par如果是一个运算描述,那么我们就需要一个真正的运行机制来获取运算结果了:

1 def run[A](pa: Par[A]): A    //由于Par的意义从容器变成运算描述,我们把get重新命名为run

我们就需要在run的函数实现方法里进行线程管理、计算运行等真正Par的运行了。

现在Par的表达形式包括如下:

1 def unit[A](a: A): Par[A]                      //把一个普通运算注入Par。把A升格到一个并行运算描述
2 def fork[A](pa: Par[A]): Par[A]                //为pa设定一个新的运行空间。返回的结果Par必须经run来运行并获取结果
3 def async[A](a: => A): Par[A] = fork(unit(a))  //不需要了解任何关于Par的信息。注意还是返回Par[A]
4 def run[A](pa: Par[A]): A                      //运行pa并抽取运算结果

应该是在v1.6以后吧,java API包含了java.util.concurrent包,其中包括了ExecutorService类提供线程管理方面的支持。ExecutorService和Future类翻译成scala如下: 

class ExecutorService {
  def submit[A](a: Callable[A]): Future[A]
}
trait Future[A] {
  def get: A
  def get(timeout: Long, unit: TimeUnit): A
  def cancel(evenIfRunning: Boolean): Boolean
  def isDone: Boolean
  def isCancelled: Boolean
}

我们不需要进入多线程编程底层细节,用java Concurrent ExecutorService足够了。ExecutorService提供了以Callable形式向系统提交需运算任务方式;系统立即返回Future,我们可以用Future.get以锁定线程方式读取运算。由于运算结果读取是以锁定线程(blocking)形式进行的,那么使用get的时间节点就很重要了:如果提交一个运算后下一步直接get就会立即锁定线程直至运算完成,那我们就无法得到任何并行运算效果了。Future还提供了运行状态和中断运行等功能为编程人员提供更强大灵活的运算控制。为了获取更灵活的控制,Par的返回值应该从直接锁定线程读取A改成不会产生锁定线程效果的Future:

1 type Par[A] = ExecutorService => Future[A]
2 def run[A](es: ExecutorService)(pa: Par[A]): Future[A] = pa(es)

现在Par的含义又从一个数据类型变成了一个函数描述了:传入一个ExecutorService,返回Future。我们可以用run来运行这个函数,系统会立即返回Future,无需任何等待。

下面让我们把这些最基本的函数都实现了:

 1 object par {
 2 import java.util.concurrent._
 3 
 4 type Par[A] = ExecutorService => Future[A]
 5 def run[A](es: ExecutorService)(pa: Par[A]): Future[A] = pa(es)
 6                                                   //> run: [A](es: java.util.concurrent.ExecutorService)(pa: ch7.par.Par[A])java.u
 7                                                   //| til.concurrent.Future[A]
 8 
 9 def unit[A](a: A): Par[A] = es => {
10     new Future[A] {
11         def get: A = a
12         def isDone = true
13         def isCancelled = false
14         def get(timeOut: Long, timeUnit: TimeUnit): A = get
15         def cancel(evenIfRunning: Boolean): Boolean = false
16     }
17 }                                                 //> unit: [A](a: A)ch7.par.Par[A]
18 def fork[A](pa: Par[A]): Par[A] = es => {
19     es.submit[A](new Callable[A] {
20       def call: A = run(es)(pa).get
21     })
22 }                                                 //> fork: [A](pa: ch7.par.Par[A])ch7.par.Par[A]
23 def async[A](a: => A): Par[A] = fork(unit(a))     //> async: [A](a: => A)ch7.par.Par[A]
24 
25 val a = unit(4+7)                                 //> a  : ch7.par.Par[Int] = <function1>
26 val b = async(2+1)                                //> b  : ch7.par.Par[Int] = <function1>
27 val es = Executors.newCachedThreadPool()          //> es  : java.util.concurrent.ExecutorService = java.util.concurrent.ThreadPool
28                                                   //| Executor@71be98f5[Running, pool size = 0, active threads = 0, queued tasks =
29                                                   //|  0, completed tasks = 0]
30 run(es)(b).get                                    //> res0: Int = 3
31 run(es)(a).get                                    //> res1: Int = 11
32 es.shutdown()
33 
34 }

从应用例子里我们可以了解线程的管理是由现有的java工具提供的(Executors.newCachedThreadPool),我们无须了解线程管理细节。我们同时确定了线程的管理机制与我们设计的并行运算Par是松散耦合的。

注意:unit并没有使用ExecutorService es, 而是直接返回一个注明完成运算(isDone=true)的Future,这个Future的get就是unit的传入参数a。如果我们再用这个Future的get来得取表达式的运算结果的话,这个运算是在当前主线程中运行的。async通过fork选择新的线程;并向新的运行环境提交了运算任务。我们来分析一下运算流程:

1、val a = unit(4+7),unit构建了一个完成的 new Future; isDone=true,设置了 Future.get = 4 + 7,run(es)(a)在主线程中对表达式 4+7 进行了运算并得取结果 11。

2、val b = async(2+1) >>> fork(unit(2+1)), run(es)(b) >>> submit(new Callable), 注意 def call = run(es)(b).get : 这里提交的运算run(es)(b).get实际上又提交了一次运算并直接锁定线程(blocking)等待读取运算结果。第一次提交Callable又需要锁定线程等待提交运算完成计算。如果线程池只能提供一个线程的话,第一次提交了Callable会占用这个唯一的线程并等待第二次提交运算得出的结果,由于没有线程可以提供给二次提交运算,这个运算永远无法得到结果,那么run(es)(b).get就会产生死锁了(dead lock)。

    我们在这节介绍了一个简单的泛函并行组件库设计,可以把一个运算放到主线程之外的另一个新的线程中计算。但是抽取运算结果却还是会锁定线程(blocking)。我们下一节将会讨论如何通过一些算法函数来实现并行运算。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏喔家ArchiSelf

从构造函数看线程安全

线程是编程中常用而且强大的手段,在使用过程中,我们经常面对的就是线程安全问题了。对于Java中常见的数据结构而言,一般的,ArrayList是非线程安全的,Ve...

832
来自专栏编程

哪些Python操作是原子性的?

与同事的一次对话使我意识到一个事实,那就是Python中相当大一部分操作都是原子的,即使像字典和类成员赋值这样的操作也是原子的。 为了完成像哈希表插入这样的操作...

2996
来自专栏云霄雨霁

死锁、饥饿和活锁

1554
来自专栏Android知识点总结

O1-开源框架使用之EventBus

说明使用POSTING,发布与订阅在同一个线程,也就是子线程,更新UI会崩 说明使用MAIN,不管发布者在哪,订阅者都在main线程,可更新UI,但不能耗时操...

682
来自专栏数据结构笔记

实战:爬取简书之多线程爬取(一)

在上上篇我们编写了一个简单的程序框架来爬取简书的文章信息,10分钟左右爬取了 1万 5千条数据。

983
来自专栏Java编程技术

JDK8中新增原子性操作类LongAdder

LongAdder类似于AtomicLong是原子性递增或者递减类,AtomicLong已经通过CAS提供了非阻塞的原子性操作,相比使用阻塞算法的同步器来说性能...

581
来自专栏阿杜的世界

《七周七并发模型》阅读笔记(一)一、线程与锁——第一天二、线程与锁——第二天三、线程与锁——第三天

线程与锁模型其实是对底层硬件运行过程的形式化,这种形式化既是该模型最大的优点,也是它最大的缺点。我们借助Java语言来学习线程与锁模型,不过内容也适用于其他语言...

802
来自专栏我叫刘半仙

【JDK并发包基础】并发容器详解

      Java.util.concurrent 包是专为 Java并发编程而设计的包,它下有很多编写好的工具,使用这些更高等的同步工具来编写代码,让我们的...

3478
来自专栏fixzd

java多线程系列:Semaphore和Exchanger

Semaphore思想在分布式中也有应用,分布式限流就是典型的案例。现在举个小例子来使用Semaphore

782
来自专栏微信公众号:Java团长

从并发编程到分布式系统——如何处理海量数据(上)

在这里想写写自己在学习并发处理的学习思路,也会聊聊自己遇到的那些坑,以此为记,希望鞭策自己不断学习、永不放弃!

511

扫码关注云+社区