前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Scalaz(54)- scalaz-stream: 函数式多线程编程模式-Free Streaming Programming Model

Scalaz(54)- scalaz-stream: 函数式多线程编程模式-Free Streaming Programming Model

作者头像
用户1150956
发布2018-01-05 10:48:32
5410
发布2018-01-05 10:48:32
举报

   长久以来,函数式编程模式都被认为是一种学术研究用或教学实验用的编程模式。直到近几年由于大数据和多核CPU的兴起造成了函数式编程模式在一些实际大型应用中的出现,这才逐渐改变了人们对函数式编程无用论的观点。通过一段时间对函数式编程方法的学习,我们了解到Free Monad的算式/算法关注分离(separation of concern)可以是一种很实用的函数式编程模式。用Free Monad编写的程序容易理解并具备良好的可维护性。scalaz-stream的流程控制和多线程运算模式可以实现程序的安全并行运算。把Free Monad和scalaz-stream有机结合起来可以形成一种新的编程模式来支持函数式多线程编程来编制具备安全性、易扩展、易维护的并行运算程序。我们先从一个简单的Free Monad程序开始:

代码语言:javascript
复制
 1 import scalaz._
 2 import Scalaz._
 3 import scalaz.concurrent._
 4 import scalaz.stream._
 5 import scala.language.higherKinds
 6 import scala.language.implicitConversions
 7 object freeStream {
 8 //1. 定义语句
 9  object DSLs {
10    sealed trait Interact[A]
11    case class Ask(q: String) extends Interact[String]
12    case class Tell(m: String) extends Interact[Unit]
13 //2. Free升格
14    implicit def interactToFree[A](ia: Interact[A]) = Free.liftF(ia)
15  }
16  //3. 程序逻辑/算式
17  object PRGs {
18  import DSLs._
19    val prgGetName: Free[Interact,Unit] = for {
20      first <- Ask("What's your first name?")
21      last <- Ask("What's your last name?")
22      _ <- Tell(s"Hello $first $last")
23    } yield ()
24  }
25  //4. 实现方式/算式
26  object IMPs {
27  import DSLs._
28    object InteractConsole extends (Interact ~> Id) {
29       def apply[A](ia: Interact[A]): Id[A] = ia match {
30         case Ask(q) => {println(q); Console.readLine}
31         case Tell(m) => println(m)
32       }
33    }
34  }

在这个程序里我们按照一个固定的框架步骤来实现“定义语句”、“升格Free”、“功能描述”及“实现方式”。这里特别需要注意的是所谓的算式/算法关注分离,即“功能描述”和“实现方式”是互不关联的。这样我们可以提供不同版本的实现方式来进行测试、环境转换等工作。Free Monad的具体运算方式如下:

代码语言:javascript
复制
1 //5. 运算/Run
2 import DSLs._,PRGs._,IMPs._
3 prgGetName.foldMapRec(InteractConsole)

运算结果返回A:对于prgGetName来说就是Unit。不过如果直接运行foldMapRec有可能会产生副作用(siede effect)。这样不符合纯代码要求,无法实现这个程序与其它程序的函数组合。我们需要把这段可能产生副作用的代码放到Task里:

代码语言:javascript
复制
1 val taskGetName = Task.delay { prgGetName.foldMapRec(InteractConsole)}
2 //> taskGetName  : scalaz.concurrent.Task[scalaz.Scalaz.Id[Unit]] = scalaz.concurrent.Task@282ba1e

这样我们就获得了一个异线程的延迟运算。我们可以放心地用这个taskGetName进行函数组合。把这个Free Monad程序转换成scalaz-stream的Process也很容易:

代码语言:javascript
复制
1 val prcGetName = Process.eval(taskGetName)  //> prcGetName  : scalaz.stream.Process[scalaz.concurrent.Task,scalaz.Scalaz.Id[Unit]] = Await(scalaz.concurrent.Task@282ba1e,<function1,<function1>)

我们用Process.eval直接把它转换成Process[Task,Unit]类型。下面我们用scalaz-stream的运算方式来运算这个Free Monad程序:

代码语言:javascript
复制
1 object FreeInteract extends App {
2   import DSLs._,PRGs._,IMPs._
3   val taskGetName = Task.delay { prgGetName.foldMapRec(InteractConsole)}
4   val prcGetName = Process.eval(taskGetName)  
5   prcGetName.run.run    
6 }

运算结果如下:

代码语言:javascript
复制
1 What's your first name?
2 tiger
3 What's your last name?
4 chan
5 Hello, tiger chan!

虽然这个例子看起来很简单,但其中代表的意义却不小:我们潜移默化地实现了函数式多线程编程了。

如果我们需要Free Monad程序返回运算结果的话就调整一下功能描述(算式):

代码语言:javascript
复制
1   val prgGetUserID = for {
2     uid <- ask("Enter User ID:")
3   } yield uid

再运算一下:

代码语言:javascript
复制
 1 object FreeInteract extends App {
 2   import DSLs._,PRGs._,IMPs._
 3   val taskGetName = Task.delay { prgGetName.foldMapRec(InteractConsole)}
 4   val prcGetName = Process.eval(taskGetName)  
 5   //prcGetName.run.run    
 6   Process.eval(Task.delay{prgGetUserID.foldMapRec(InteractConsole)}).runLog.run.map(println)
 7 ...
 8 Enter User ID:
 9 tiger123
10 tiger123

用纯代码方式echo输入:

代码语言:javascript
复制
1   pUserID.evalMap { uid => Task.delay {prgEchoInput(uid).foldMapRec(InteractConsole)} }.run.run
2 ...
3 Enter User ID:
4 user234
5 user234

也可以把结果发送到一个Sink来显示:

代码语言:javascript
复制
val outSink: Sink[Task,String] = Process.constant{x =>Task.delay{prgEchoInput(x).foldMapRec(InteractConsole)}}
(pUserID to outSink).run.run
...
Enter User ID:
jonathon
jonathon

我们试着再加一个Free程序功能:验证用户编号

代码语言:javascript
复制
 1   sealed trait Login[A]
 2   case class CheckID(id: String) extends Login[Boolean]
 3 ...
 4   def prgCheckID(id: String) = for {
 5     b <- Free.liftF(CheckID(id))
 6   } yield b
 7 ...
 8   object UserLogin extends (Login ~> Id) {
 9     def apply[A](la: Login[A]): Id[A] = la match {
10       case CheckID(id) => if (id === "tiger123") true else false
11     }
12   }

stream流程是:先读取用户编号然后验证,跟着在Sink输出结果:

代码语言:javascript
复制
 1  def fCheckID: String => Task[String] = id => Task.delay { prgCheckID(id).foldMapRec(UserLogin) }.map(_.toString)
 2   val chCheckID = channel.lift(fCheckID)
 3   ((pUserID through chCheckID) to outSink).run.run
 4 ...
 5 
 6 Enter User ID:
 7 tiger123
 8 true
 9 ...
10 Enter User ID:
11 johnny234
12 false

不错!Free Monad和scalar-stream可以很好的集成在一起。 我把这节讨论的示范源代码提供给大家:

代码语言:javascript
复制
 1 import scalaz._
 2 import Scalaz._
 3 import scalaz.concurrent._
 4 import scalaz.stream._
 5 object DSLs {
 6   sealed trait Interact[A]
 7   case class Ask(q: String) extends Interact[String]
 8   case class Tell(m: String) extends Interact[Unit]
 9   object Interact {
10     def ask(q: String): Free[Interact, String] = Free.liftF(Ask(q))
11     def tell(m: String): Free[Interact, Unit] = Free.liftF(Tell(m))
12   }
13   sealed trait Login[A]
14   case class CheckID(id: String) extends Login[Boolean]
15 }
16 object PRGs {
17   import DSLs._
18   import Interact._
19   
20   val prgGetName = for {
21     first <- ask("What's your first name?")
22     last <- ask("What's your last name?")
23     _ <- tell(s"Hello, $first $last!")
24   } yield()
25   
26   val prgGetUserID = for {
27     uid <- ask("Enter User ID:")
28   } yield uid
29   
30   def prgEchoInput(m: String) = tell(m)
31 
32   def prgCheckID(id: String) = for {
33     b <- Free.liftF(CheckID(id))
34   } yield b
35 
36 }
37 object IMPs {
38   import DSLs._
39   object InteractConsole extends (Interact ~> Id) {
40     def apply[A](ia: Interact[A]): Id[A] = ia match {
41       case Ask(q) => { println(q); readLine }
42       case Tell(m) => println(m)
43     }
44   }
45   object UserLogin extends (Login ~> Id) {
46     def apply[A](la: Login[A]): Id[A] = la match {
47       case CheckID(id) => if (id === "tiger123") true else false
48     }
49   }
50 }
51 
52 
53 object FreeInteract extends App {
54   import DSLs._,PRGs._,IMPs._
55   val taskGetName = Task.delay { prgGetName.foldMapRec(InteractConsole)}
56   val prcGetName = Process.eval(taskGetName)  
57   //prcGetName.run.run    
58   val pUserID= Process.eval(Task.delay{prgGetUserID.foldMapRec(InteractConsole)})
59   //pUserID.evalMap { uid => Task.delay {prgEchoInput(uid).foldMapRec(InteractConsole)} }.run.run
60   val outSink: Sink[Task,String] = Process.constant { x => Task.delay {prgEchoInput(x).foldMapRec(InteractConsole) } }
61   //(pUserID to outSink).run.run
62   def fCheckID: String => Task[String] = id => Task.delay { prgCheckID(id).foldMapRec(UserLogin) }.map(_.toString)
63   val chCheckID = channel.lift(fCheckID)
64   ((pUserID through chCheckID) to outSink).run.run
65   
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2016-08-19 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档