Scalaz(54)- scalaz-stream: 函数式多线程编程模式-Free Streaming Programming Model

   长久以来,函数式编程模式都被认为是一种学术研究用或教学实验用的编程模式。直到近几年由于大数据和多核CPU的兴起造成了函数式编程模式在一些实际大型应用中的出现,这才逐渐改变了人们对函数式编程无用论的观点。通过一段时间对函数式编程方法的学习,我们了解到Free Monad的算式/算法关注分离(separation of concern)可以是一种很实用的函数式编程模式。用Free Monad编写的程序容易理解并具备良好的可维护性。scalaz-stream的流程控制和多线程运算模式可以实现程序的安全并行运算。把Free Monad和scalaz-stream有机结合起来可以形成一种新的编程模式来支持函数式多线程编程来编制具备安全性、易扩展、易维护的并行运算程序。我们先从一个简单的Free Monad程序开始:

 1 import scalaz._
 2 import Scalaz._
 3 import scalaz.concurrent._
 4 import scalaz.stream._
 5 import scala.language.higherKinds
 6 import scala.language.implicitConversions
 7 object freeStream {
 8 //1. 定义语句
 9  object DSLs {
10    sealed trait Interact[A]
11    case class Ask(q: String) extends Interact[String]
12    case class Tell(m: String) extends Interact[Unit]
13 //2. Free升格
14    implicit def interactToFree[A](ia: Interact[A]) = Free.liftF(ia)
15  }
16  //3. 程序逻辑/算式
17  object PRGs {
18  import DSLs._
19    val prgGetName: Free[Interact,Unit] = for {
20      first <- Ask("What's your first name?")
21      last <- Ask("What's your last name?")
22      _ <- Tell(s"Hello $first $last")
23    } yield ()
24  }
25  //4. 实现方式/算式
26  object IMPs {
27  import DSLs._
28    object InteractConsole extends (Interact ~> Id) {
29       def apply[A](ia: Interact[A]): Id[A] = ia match {
30         case Ask(q) => {println(q); Console.readLine}
31         case Tell(m) => println(m)
32       }
33    }
34  }

在这个程序里我们按照一个固定的框架步骤来实现“定义语句”、“升格Free”、“功能描述”及“实现方式”。这里特别需要注意的是所谓的算式/算法关注分离,即“功能描述”和“实现方式”是互不关联的。这样我们可以提供不同版本的实现方式来进行测试、环境转换等工作。Free Monad的具体运算方式如下:

1 //5. 运算/Run
2 import DSLs._,PRGs._,IMPs._
3 prgGetName.foldMapRec(InteractConsole)

运算结果返回A:对于prgGetName来说就是Unit。不过如果直接运行foldMapRec有可能会产生副作用(siede effect)。这样不符合纯代码要求,无法实现这个程序与其它程序的函数组合。我们需要把这段可能产生副作用的代码放到Task里:

1 val taskGetName = Task.delay { prgGetName.foldMapRec(InteractConsole)}
2 //> taskGetName  : scalaz.concurrent.Task[scalaz.Scalaz.Id[Unit]] = scalaz.concurrent.Task@282ba1e

这样我们就获得了一个异线程的延迟运算。我们可以放心地用这个taskGetName进行函数组合。把这个Free Monad程序转换成scalaz-stream的Process也很容易:

1 val prcGetName = Process.eval(taskGetName)  //> prcGetName  : scalaz.stream.Process[scalaz.concurrent.Task,scalaz.Scalaz.Id[Unit]] = Await(scalaz.concurrent.Task@282ba1e,<function1,<function1>)

我们用Process.eval直接把它转换成Process[Task,Unit]类型。下面我们用scalaz-stream的运算方式来运算这个Free Monad程序:

1 object FreeInteract extends App {
2   import DSLs._,PRGs._,IMPs._
3   val taskGetName = Task.delay { prgGetName.foldMapRec(InteractConsole)}
4   val prcGetName = Process.eval(taskGetName)  
5   prcGetName.run.run    
6 }

运算结果如下:

1 What's your first name?
2 tiger
3 What's your last name?
4 chan
5 Hello, tiger chan!

虽然这个例子看起来很简单,但其中代表的意义却不小:我们潜移默化地实现了函数式多线程编程了。

如果我们需要Free Monad程序返回运算结果的话就调整一下功能描述(算式):

1   val prgGetUserID = for {
2     uid <- ask("Enter User ID:")
3   } yield uid

再运算一下:

 1 object FreeInteract extends App {
 2   import DSLs._,PRGs._,IMPs._
 3   val taskGetName = Task.delay { prgGetName.foldMapRec(InteractConsole)}
 4   val prcGetName = Process.eval(taskGetName)  
 5   //prcGetName.run.run    
 6   Process.eval(Task.delay{prgGetUserID.foldMapRec(InteractConsole)}).runLog.run.map(println)
 7 ...
 8 Enter User ID:
 9 tiger123
10 tiger123

用纯代码方式echo输入:

1   pUserID.evalMap { uid => Task.delay {prgEchoInput(uid).foldMapRec(InteractConsole)} }.run.run
2 ...
3 Enter User ID:
4 user234
5 user234

也可以把结果发送到一个Sink来显示:

val outSink: Sink[Task,String] = Process.constant{x =>Task.delay{prgEchoInput(x).foldMapRec(InteractConsole)}}
(pUserID to outSink).run.run
...
Enter User ID:
jonathon
jonathon

我们试着再加一个Free程序功能:验证用户编号

 1   sealed trait Login[A]
 2   case class CheckID(id: String) extends Login[Boolean]
 3 ...
 4   def prgCheckID(id: String) = for {
 5     b <- Free.liftF(CheckID(id))
 6   } yield b
 7 ...
 8   object UserLogin extends (Login ~> Id) {
 9     def apply[A](la: Login[A]): Id[A] = la match {
10       case CheckID(id) => if (id === "tiger123") true else false
11     }
12   }

stream流程是:先读取用户编号然后验证,跟着在Sink输出结果:

 1  def fCheckID: String => Task[String] = id => Task.delay { prgCheckID(id).foldMapRec(UserLogin) }.map(_.toString)
 2   val chCheckID = channel.lift(fCheckID)
 3   ((pUserID through chCheckID) to outSink).run.run
 4 ...
 5 
 6 Enter User ID:
 7 tiger123
 8 true
 9 ...
10 Enter User ID:
11 johnny234
12 false

不错!Free Monad和scalar-stream可以很好的集成在一起。 我把这节讨论的示范源代码提供给大家:

 1 import scalaz._
 2 import Scalaz._
 3 import scalaz.concurrent._
 4 import scalaz.stream._
 5 object DSLs {
 6   sealed trait Interact[A]
 7   case class Ask(q: String) extends Interact[String]
 8   case class Tell(m: String) extends Interact[Unit]
 9   object Interact {
10     def ask(q: String): Free[Interact, String] = Free.liftF(Ask(q))
11     def tell(m: String): Free[Interact, Unit] = Free.liftF(Tell(m))
12   }
13   sealed trait Login[A]
14   case class CheckID(id: String) extends Login[Boolean]
15 }
16 object PRGs {
17   import DSLs._
18   import Interact._
19   
20   val prgGetName = for {
21     first <- ask("What's your first name?")
22     last <- ask("What's your last name?")
23     _ <- tell(s"Hello, $first $last!")
24   } yield()
25   
26   val prgGetUserID = for {
27     uid <- ask("Enter User ID:")
28   } yield uid
29   
30   def prgEchoInput(m: String) = tell(m)
31 
32   def prgCheckID(id: String) = for {
33     b <- Free.liftF(CheckID(id))
34   } yield b
35 
36 }
37 object IMPs {
38   import DSLs._
39   object InteractConsole extends (Interact ~> Id) {
40     def apply[A](ia: Interact[A]): Id[A] = ia match {
41       case Ask(q) => { println(q); readLine }
42       case Tell(m) => println(m)
43     }
44   }
45   object UserLogin extends (Login ~> Id) {
46     def apply[A](la: Login[A]): Id[A] = la match {
47       case CheckID(id) => if (id === "tiger123") true else false
48     }
49   }
50 }
51 
52 
53 object FreeInteract extends App {
54   import DSLs._,PRGs._,IMPs._
55   val taskGetName = Task.delay { prgGetName.foldMapRec(InteractConsole)}
56   val prcGetName = Process.eval(taskGetName)  
57   //prcGetName.run.run    
58   val pUserID= Process.eval(Task.delay{prgGetUserID.foldMapRec(InteractConsole)})
59   //pUserID.evalMap { uid => Task.delay {prgEchoInput(uid).foldMapRec(InteractConsole)} }.run.run
60   val outSink: Sink[Task,String] = Process.constant { x => Task.delay {prgEchoInput(x).foldMapRec(InteractConsole) } }
61   //(pUserID to outSink).run.run
62   def fCheckID: String => Task[String] = id => Task.delay { prgCheckID(id).foldMapRec(UserLogin) }.map(_.toString)
63   val chCheckID = channel.lift(fCheckID)
64   ((pUserID through chCheckID) to outSink).run.run
65   

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏浪淘沙

Scala学习笔记

大数据框架(处理海量数据/处理实时流式数据) 一:以hadoop2.X为体系的海量数据处理框架         离线数据分析,往往分析的是N+1的数据  ...

1393
来自专栏冰霜之地

如何设计并实现一个线程安全的 Map ?(下篇)

在上篇中,我们已经讨论过如何去实现一个 Map 了,并且也讨论了诸多优化点。在下篇中,我们将继续讨论如何实现一个线程安全的 Map。说到线程安全,需要从概念开始...

1186
来自专栏Spark学习技巧

深入了解Redis内存模型

1765
来自专栏noteless

java集合框架容器 java框架层级 继承图结构 集合框架的抽象类 集合框架主要实现类

java集合框架  框架设计理念  容器 继承层级结构 继承图 集合框架中的抽象类  主要的实现类 实现类特性   集合框架分类 集合框架并发包 并发实现类

862
来自专栏木木玲

Netty 源码解析 ——— writeAndFlush流程分析

2414
来自专栏木木玲

设计模式 ——— 模板方法模式

1022
来自专栏java技术学习之道

JVM初探 -JVM内存模型

1294
来自专栏枕边书

Java高级特性之泛型

864
来自专栏Android机动车

Java 基础(六)——集合源码解析 Queue

Queue继承自 Collection,我们先来看看类结构吧,代码量比较少,我直接贴代码了。

531
来自专栏AI星球

Spark常用的算子以及Scala函数总结

首先,介绍一下scala语言: Scala 是一种把面向对象和函数式编程理念加入到静态类型语言中的混血儿。

742

扫码关注云+社区