Scalaz(54)- scalaz-stream: 函数式多线程编程模式-Free Streaming Programming Model

   长久以来,函数式编程模式都被认为是一种学术研究用或教学实验用的编程模式。直到近几年由于大数据和多核CPU的兴起造成了函数式编程模式在一些实际大型应用中的出现,这才逐渐改变了人们对函数式编程无用论的观点。通过一段时间对函数式编程方法的学习,我们了解到Free Monad的算式/算法关注分离(separation of concern)可以是一种很实用的函数式编程模式。用Free Monad编写的程序容易理解并具备良好的可维护性。scalaz-stream的流程控制和多线程运算模式可以实现程序的安全并行运算。把Free Monad和scalaz-stream有机结合起来可以形成一种新的编程模式来支持函数式多线程编程来编制具备安全性、易扩展、易维护的并行运算程序。我们先从一个简单的Free Monad程序开始:

 1 import scalaz._
 2 import Scalaz._
 3 import scalaz.concurrent._
 4 import scalaz.stream._
 5 import scala.language.higherKinds
 6 import scala.language.implicitConversions
 7 object freeStream {
 8 //1. 定义语句
 9  object DSLs {
10    sealed trait Interact[A]
11    case class Ask(q: String) extends Interact[String]
12    case class Tell(m: String) extends Interact[Unit]
13 //2. Free升格
14    implicit def interactToFree[A](ia: Interact[A]) = Free.liftF(ia)
15  }
16  //3. 程序逻辑/算式
17  object PRGs {
18  import DSLs._
19    val prgGetName: Free[Interact,Unit] = for {
20      first <- Ask("What's your first name?")
21      last <- Ask("What's your last name?")
22      _ <- Tell(s"Hello $first $last")
23    } yield ()
24  }
25  //4. 实现方式/算式
26  object IMPs {
27  import DSLs._
28    object InteractConsole extends (Interact ~> Id) {
29       def apply[A](ia: Interact[A]): Id[A] = ia match {
30         case Ask(q) => {println(q); Console.readLine}
31         case Tell(m) => println(m)
32       }
33    }
34  }

在这个程序里我们按照一个固定的框架步骤来实现“定义语句”、“升格Free”、“功能描述”及“实现方式”。这里特别需要注意的是所谓的算式/算法关注分离,即“功能描述”和“实现方式”是互不关联的。这样我们可以提供不同版本的实现方式来进行测试、环境转换等工作。Free Monad的具体运算方式如下:

1 //5. 运算/Run
2 import DSLs._,PRGs._,IMPs._
3 prgGetName.foldMapRec(InteractConsole)

运算结果返回A:对于prgGetName来说就是Unit。不过如果直接运行foldMapRec有可能会产生副作用(siede effect)。这样不符合纯代码要求,无法实现这个程序与其它程序的函数组合。我们需要把这段可能产生副作用的代码放到Task里:

1 val taskGetName = Task.delay { prgGetName.foldMapRec(InteractConsole)}
2 //> taskGetName  : scalaz.concurrent.Task[scalaz.Scalaz.Id[Unit]] = scalaz.concurrent.Task@282ba1e

这样我们就获得了一个异线程的延迟运算。我们可以放心地用这个taskGetName进行函数组合。把这个Free Monad程序转换成scalaz-stream的Process也很容易:

1 val prcGetName = Process.eval(taskGetName)  //> prcGetName  : scalaz.stream.Process[scalaz.concurrent.Task,scalaz.Scalaz.Id[Unit]] = Await(scalaz.concurrent.Task@282ba1e,<function1,<function1>)

我们用Process.eval直接把它转换成Process[Task,Unit]类型。下面我们用scalaz-stream的运算方式来运算这个Free Monad程序:

1 object FreeInteract extends App {
2   import DSLs._,PRGs._,IMPs._
3   val taskGetName = Task.delay { prgGetName.foldMapRec(InteractConsole)}
4   val prcGetName = Process.eval(taskGetName)  
5   prcGetName.run.run    
6 }

运算结果如下:

1 What's your first name?
2 tiger
3 What's your last name?
4 chan
5 Hello, tiger chan!

虽然这个例子看起来很简单,但其中代表的意义却不小:我们潜移默化地实现了函数式多线程编程了。

如果我们需要Free Monad程序返回运算结果的话就调整一下功能描述(算式):

1   val prgGetUserID = for {
2     uid <- ask("Enter User ID:")
3   } yield uid

再运算一下:

 1 object FreeInteract extends App {
 2   import DSLs._,PRGs._,IMPs._
 3   val taskGetName = Task.delay { prgGetName.foldMapRec(InteractConsole)}
 4   val prcGetName = Process.eval(taskGetName)  
 5   //prcGetName.run.run    
 6   Process.eval(Task.delay{prgGetUserID.foldMapRec(InteractConsole)}).runLog.run.map(println)
 7 ...
 8 Enter User ID:
 9 tiger123
10 tiger123

用纯代码方式echo输入:

1   pUserID.evalMap { uid => Task.delay {prgEchoInput(uid).foldMapRec(InteractConsole)} }.run.run
2 ...
3 Enter User ID:
4 user234
5 user234

也可以把结果发送到一个Sink来显示:

val outSink: Sink[Task,String] = Process.constant{x =>Task.delay{prgEchoInput(x).foldMapRec(InteractConsole)}}
(pUserID to outSink).run.run
...
Enter User ID:
jonathon
jonathon

我们试着再加一个Free程序功能:验证用户编号

 1   sealed trait Login[A]
 2   case class CheckID(id: String) extends Login[Boolean]
 3 ...
 4   def prgCheckID(id: String) = for {
 5     b <- Free.liftF(CheckID(id))
 6   } yield b
 7 ...
 8   object UserLogin extends (Login ~> Id) {
 9     def apply[A](la: Login[A]): Id[A] = la match {
10       case CheckID(id) => if (id === "tiger123") true else false
11     }
12   }

stream流程是:先读取用户编号然后验证,跟着在Sink输出结果:

 1  def fCheckID: String => Task[String] = id => Task.delay { prgCheckID(id).foldMapRec(UserLogin) }.map(_.toString)
 2   val chCheckID = channel.lift(fCheckID)
 3   ((pUserID through chCheckID) to outSink).run.run
 4 ...
 5 
 6 Enter User ID:
 7 tiger123
 8 true
 9 ...
10 Enter User ID:
11 johnny234
12 false

不错!Free Monad和scalar-stream可以很好的集成在一起。 我把这节讨论的示范源代码提供给大家:

 1 import scalaz._
 2 import Scalaz._
 3 import scalaz.concurrent._
 4 import scalaz.stream._
 5 object DSLs {
 6   sealed trait Interact[A]
 7   case class Ask(q: String) extends Interact[String]
 8   case class Tell(m: String) extends Interact[Unit]
 9   object Interact {
10     def ask(q: String): Free[Interact, String] = Free.liftF(Ask(q))
11     def tell(m: String): Free[Interact, Unit] = Free.liftF(Tell(m))
12   }
13   sealed trait Login[A]
14   case class CheckID(id: String) extends Login[Boolean]
15 }
16 object PRGs {
17   import DSLs._
18   import Interact._
19   
20   val prgGetName = for {
21     first <- ask("What's your first name?")
22     last <- ask("What's your last name?")
23     _ <- tell(s"Hello, $first $last!")
24   } yield()
25   
26   val prgGetUserID = for {
27     uid <- ask("Enter User ID:")
28   } yield uid
29   
30   def prgEchoInput(m: String) = tell(m)
31 
32   def prgCheckID(id: String) = for {
33     b <- Free.liftF(CheckID(id))
34   } yield b
35 
36 }
37 object IMPs {
38   import DSLs._
39   object InteractConsole extends (Interact ~> Id) {
40     def apply[A](ia: Interact[A]): Id[A] = ia match {
41       case Ask(q) => { println(q); readLine }
42       case Tell(m) => println(m)
43     }
44   }
45   object UserLogin extends (Login ~> Id) {
46     def apply[A](la: Login[A]): Id[A] = la match {
47       case CheckID(id) => if (id === "tiger123") true else false
48     }
49   }
50 }
51 
52 
53 object FreeInteract extends App {
54   import DSLs._,PRGs._,IMPs._
55   val taskGetName = Task.delay { prgGetName.foldMapRec(InteractConsole)}
56   val prcGetName = Process.eval(taskGetName)  
57   //prcGetName.run.run    
58   val pUserID= Process.eval(Task.delay{prgGetUserID.foldMapRec(InteractConsole)})
59   //pUserID.evalMap { uid => Task.delay {prgEchoInput(uid).foldMapRec(InteractConsole)} }.run.run
60   val outSink: Sink[Task,String] = Process.constant { x => Task.delay {prgEchoInput(x).foldMapRec(InteractConsole) } }
61   //(pUserID to outSink).run.run
62   def fCheckID: String => Task[String] = id => Task.delay { prgCheckID(id).foldMapRec(UserLogin) }.map(_.toString)
63   val chCheckID = channel.lift(fCheckID)
64   ((pUserID through chCheckID) to outSink).run.run
65   

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏青枫的专栏

day54_BOS项目_06

第一步:根据提供的 业务受理.pdm 文件生成建表文件 bos_qp.sql 第二步:由于业务受理.pdm 文件中有伪表,所以我们需要修改生成的建表文件,修改如...

602
来自专栏蓝天

Thrift线程和状态机分析

启动Thrift时,可启动两类线程,一是TNonblockingIOThread,另一是Worker:

1091
来自专栏数据之美

浅谈 python multiprocessing(多进程)下如何共享变量

1、问题: 群中有同学贴了如下一段代码,问为何 list 最后打印的是空值? from multiprocessing import Process, Mana...

3295
来自专栏Java帮帮-微信公众号-技术文章全总结

c3p0,DBPC,Druid三大连接池的区别/性能【面试+工作】

492
来自专栏名山丶深处

springboot集成redis(mybatis、分布式session)

3394
来自专栏为数不多的Android技巧

请不要滥用SharedPreference

SharedPreference是Android上一种非常易用的轻量级存储方式,由于其API及其友好,得到了很多很多开发者的青睐。但是,SharedPrefer...

904
来自专栏Java帮帮-微信公众号-技术文章全总结

Web-第三十天 Activiti工作流【悟空教程】

工作流(Workflow),就是“业务过程的部分或整体在计算机应用环境下的自动化”,它主要解决的是“使在多个参与者之间按照某种预定义的规则传递文档、信息或任务的...

1693
来自专栏Kubernetes

Kubelet PLEG源码分析

A: 同其他Manager类似,PLEG在kubelet调用NewMainKubelet进行初始化时创建的。

1338
来自专栏Java架构师历程

使用Spring Boot,JPA,Hibernate和Postgres的多租户应用程序

多租户是一种方法,应用程序实例由不同的客户使用,从而降低软件开发和部署成本,与单一租户解决方案相比,在这种解决方案中,需要触及多个部分以提供新客户端或更新现有租...

3593
来自专栏技术墨客

Vert.x源码-创建集群 原

在当前的最新版本中,Vert.x官方只实现了利用Hazelcast来创建集群。当然,如果可以的话,也可以通过ClusterManager接口实现或引入需要的集群...

1693

扫码关注云+社区