Scalaz(53)- scalaz-stream: 程序运算器-application scenario

    从上面多篇的讨论中我们了解到scalaz-stream代表一串连续无穷的数据或者程序。对这个数据流的处理过程就是一个状态机器(state machine)的状态转变过程。这种模式与我们通常遇到的程序流程很相似:通过程序状态的变化来推进程序进展。传统OOP式编程可能是通过一些全局变量来记录当前程序状态,而FP则是通过函数组合来实现状态转变的。这个FP模式讲起来有些模糊和抽象,但实际上通过我们前面长时间对FP编程的学习了解到FP编程讲究避免使用任何局部中间变量,更不用说全局变量了。FP程序的数据A是包嵌在算法F[A]内的。FP编程模式提供了一整套全新的数据更新方法来实现对F[A]中数据A的操作。对许多编程人员来讲,FP的这种编程方式会显得很别扭、不容易掌握。如果我们仔细观察分析,会发觉scalaz-stream就是一种很好的FP编程工具:它的数据也是不可变的(immutable),并且是包嵌在高阶类型结构里的,是通过Process状态转变来标示数据处理过程进展的。scalaz-stream的数据处理是有序流程,这样可以使我们更容易分析理解程序的运算过程,它的三个大环节包括:数据源(source),数据传换(transducer)及数据终点(Sink/Channel)可以很形象地描绘一个程序运算的全过程。scalaz-stream在运算过程中的并行运算方式(parallel computaion)、安全资源使用(resource safety)和异常处理能力(exception handling)是实现泛函多线程编程最好的支持。我们先来看看scalaz-stream里的一个典型函数:

/**
   * Await the given `F` request and use its result.
   * If you need to specify fallback, use `awaitOr`
   */
  def await[F[_], A, O](req: F[A])(rcv: A => Process[F, O]): Process[F, O] =
    awaitOr(req)(Halt.apply)(rcv)
/**
   * Await a request, and if it fails, use `fb` to determine the next state.
   * Otherwise, use `rcv` to determine the next state.
   */
  def awaitOr[F[_], A, O](req: F[A])(fb: EarlyCause => Process[F, O])(rcv: A => Process[F, O]): Process[F, O] =
    Await(req,(r: EarlyCause \/ A) => Trampoline.delay(Try(r.fold(fb,rcv))))

这个await函数可以说是一个代表完整程序流程的典范。注意,awaitOr里的Await是个数据结构。这样我们在递归运算await时可以避免StackOverflowError的发生。req: F[A]代表与外界交互的一个运算,如从外部获取输入、函数rcv对这个req产生的运算结果进行处理并设定程序新的状态。

1 import scalaz.stream._
2 import scalaz.concurrent._
3 object streamApps {
4 import Process._
5   def getInput: Task[Int] = Task.delay { 3 }      //> getInput: => scalaz.concurrent.Task[Int]
6   val prg = await(getInput)(i => emit(i * 3))     //> prg  : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await(scalaz.concurrent.Task@4973813a,<function1>,<function1>)
7   prg.runLog.run                                  //> res0: Vector[Int] = Vector(9)
8 }

这是一个一步计算程序。我们可以再加一步:

1  val add10 = await1[Int].flatMap{i => emit(i + 10)}
2                                                   //> add10  : scalaz.stream.Process[[x]scalaz.stream.Process.Env[Int,Any]#Is[x],Int] = Await(Left,<function1>,<function1>)
3   val prg1 = await(getInput)(i => emit(i * 3) |> add10)
4                                                   //> prg1  : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await(scalaz.concurrent.Task@6737fd8f,<function1>,<function1>)
5   prg1.runLog.run                                 //> res0: Vector[Int] = Vector(19)

add10是新增的一个运算步骤,是个transducer所以调用了Process1的函数await1,并用pipe(|>)来连接。实际上我们可以用组合方式(compose)把add10和prg组合起来:

1 val prg3 = prg |> add10                         //> prg3  : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Append(Halt(End) ,Vector(<function1>))
2   prg3.runLog.run                               //> res1: Vector[Int] = Vector(19)

我们同样可以增加一步输出运算:

1  val outResult: Sink[Task,Int] = sink.lift { i => Task.delay{println(s"the result is: $i")}}
2                                                   //> outResult  : scalaz.stream.Sink[scalaz.concurrent.Task,Int] = Append(Emit(Vector(<function1>)),Vector(<function1>))
3   val prg4 = prg1 to outResult                    //> prg4  : scalaz.stream.Process[[x]scalaz.concurrent.Task[x],Unit] = Append(Halt(End),Vector(<function1>, <function1>))
4   prg4.run.run                                    //> the result is: 19

scalaz-stream的输出类型是Sink,我们用to来连接。那么如果需要不断重复运算呢:

 1 import scalaz._
 2 import Scalaz._
 3 import scalaz.concurrent._
 4 import scalaz.stream._
 5 import Process._
 6 object streamAppsDemo extends App {
 7   def putLine(line: String) = Task.delay { println(line) }
 8   def getLine = Task.delay { Console.readLine }
 9   val readL = putLine("Enter:>").flatMap {_ => getLine}
10   val readLines =  repeatEval(readL)
11   val echoLine = readLines.flatMap {line => eval(putLine(line))}  
12   echoLine.run.run
13 }

这是一个无穷运算程序:不停地把键盘输入回响到显示器上。下面是一些测试结果:

1 Enter:>
2 hello world!
3 hello world!
4 Enter:>
5 how are you?
6 how are you?
7 Enter:>

当然,我们也可以把上面的程序表达的更形象些:

1   val outLine: Sink[Task,String] = constant(putLine _).toSource
2   val echoInput: Process[Task,Unit] = readLines to outLine
3   //echoLine.run.run
4   echoInput.run.run 

用to Sink来表述可能更形象。这个程序没有任何控制:甚至无法有意识地退出。我们试着加一些控制机制:

 1   def lines: Process[Task,String] = {
 2     def go(line: String): Process[Task,String] = 
 3         line.toUpperCase match {
 4           case "QUIT" => halt
 5           case _ => emit(line) ++ await(readL)(go)
 6         } 
 7     await(readL)(go)
 8   }
 9   
10   val prg = lines to outLine
11   prg.run.run 

在rcv函数里检查输入是否quit,如果是就halt,否则重复运算await。现在可以控制终止程序了。

下面再示范一下异常处理机制:看看能不能有效的捕捉到运行时的错误:

1   def mul(i: Int) = await1[String].flatMap { s => emit((s.toDouble * i).toString) }.repeat
2   val prg = (lines |> mul(5)) to outLine  
3   prg.run.run 

加了个transducer mul(5),如果输入是可转变为数字类型的就乘5否者会异常退出。下面是一些测试场景:

 1 Enter:>
 2 5
 3 25.0
 4 Enter:>
 5 6
 6 30.0
 7 Enter:>
 8 six
 9 Exception in thread "main" java.lang.NumberFormatException: For input string: "six"
10     at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2043)

我们可以用onFailure来捕捉任何错误:

1   def mul(i: Int) = await1[String].flatMap { s => emit((s.toDouble * i).toString) }.repeat
2 //val prg = (lines |> mul(5)) to outLine  
3   val prg = (lines |> mul(5)).onFailure { e => emit("invalid input!!!") } to outLine
4   prg.run.run 

现在运算结果变成了下面这样:

1 Enter:>
2 5
3 25.0
4 Enter:>
5 6
6 30.0
7 Enter:>
8 six
9 invalid input!!!

 证明我们捕捉并处理了错误。一个完整安全的程序还必须具备自动事后清理的功能。这项可以通过onComplete来实现:

1   def mul(i: Int) = await1[String].flatMap { s => emit((s.toDouble * i).toString) }.repeat
2 //val prg = (lines |> mul(5)) to outLine  
3   val prg = (lines |> mul(5)).onFailure { e => emit("invalid input!!!") }
4   val prg1 = prg.onComplete{ Process.eval(Task.delay {println("end of program"); ""}) } to outLine
5   prg1.run.run 

测试结果如下:

 1 Enter:>
 2 5
 3 25.0
 4 Enter:>
 5 6
 6 30.0
 7 Enter:>
 8 six
 9 invalid input!!!
10 end of program

再有一个值得探讨的就是这些程序的组合集成。scalaz-stream就是存粹的泛函类型,那么基于scalaz-stream的程序就自然具备组合的能力了。我们可以用两个独立的程序来示范Process程序组合:

 1 import scalaz._
 2 import Scalaz._
 3 import scalaz.concurrent._
 4 import scalaz.stream._
 5 import Process._
 6 object prgStream extends App {
 7   def prompt(prmpt: String) = Task.delay { print(prmpt) }
 8   def putLine(line: String) = Task.delay { println(line) }
 9   def getLine = Task.delay { Console.readLine }
10   val readLine1 = prompt("Prg1>:").flatMap {_ => getLine}
11   val readLine2 = prompt("Prg2>:").flatMap {_ => getLine}
12   val stdOutput = constant(putLine _).toSource
13   def multiplyBy(n: Int) = await1[String].flatMap {line => 
14       if (line.isEmpty) halt
15       else emit((line.toDouble * n).toString)
16   }.repeat
17   val prg1: Process[Task,String] =  {
18     def go(line: String): Process[Task,String] = line.toUpperCase match {
19       case "QUIT" => halt
20       case _ => emit(line) ++ await(readLine1)(go)
21     }
22     await(readLine1)(go)   
23   }.onComplete{ Process.eval(Task.delay {println("end of program1"); ""}) }
24   val prg2: Process[Task,String] =  {
25     def go(line: String): Process[Task,String] = line.toUpperCase match {
26       case "QUIT" => halt
27       case _ => emit(line) ++ await(readLine2)(go)
28     }
29     await(readLine2)(go)   
30   }.onComplete{ Process.eval(Task.delay {println("end of program2"); ""}) } 
31   val program1 = (prg1 |> multiplyBy(3) to stdOutput)
32   val program2 = (prg2 |> multiplyBy(5) to stdOutput)
33   
34   (program1 ++ program2).run.run
35   
36 } 

因为program的类型是Process[Task,String],所以我们可以用++把它们连接起来。同时我们应该看到在program的形成过程中transducer multiplyBy是如何用|>与prg组合的。现在我们看看测试运算结果:

 1 Prg1>:3
 2 9.0
 3 Prg1>:4
 4 12.0
 5 Prg1>:quit
 6 end of program1
 7 Prg2>:5
 8 25.0
 9 Prg2>:6
10 30.0
11 Prg2>:quit
12 end of program2

我们看到程序是按照流程走的。下面再试个流程控制程序分发(dispatching)的例子:

 1  val program1 = (prg1 |> multiplyBy(3) observe stdOutput)
 2   val program2 = (prg2 |> multiplyBy(5) observe stdOutput)
 3   
 4   //(program1 ++ program2).run.run
 5   val getOption = prompt("Enter your choice>:").flatMap {_ => getLine }
 6   val mainPrg: Process[Task,String] = {
 7      def go(input: String): Process[Task,String] = input.toUpperCase match {
 8        case "QUIT" => halt
 9        case "P1" => program1 ++ await(getOption)(go)
10        case "P2" => program2 ++ await(getOption)(go)
11        case _ => await(getOption)(go)
12      }
13      await(getOption)(go)
14   }.onComplete{ Process.eval(Task.delay {println("end of main"); ""}) } 
15   
16   mainPrg.run.run

我们先把program1和program2的终点类型Sink去掉。用observe来实现数据复制分流。这样program1和program2的结果类型才能与await的类型相匹配。我们可以测试运行一下:

 1 Enter your choice>:p2
 2 Prg2>:3
 3 15.0
 4 Prg2>:5
 5 25.0
 6 Prg2>:quit
 7 end of program2
 8 Enter your choice>:p1
 9 Prg1>:3
10 9.0
11 Prg1>:6
12 18.0
13 Prg1>:quit
14 end of program1
15 Enter your choice>:wat
16 Enter your choice>:oh no
17 Enter your choice>:quit
18 end of main

scalaz-stream是一种泛函类型。我们在上面已经示范了它的函数组合能力。当然,如果程序的类型是Process,那么我们可以很容易地用merge来实现并行运算。

scalaz-stream作为一种程序运算框架可以轻松实现FP程序的组合,那么它成为一种安全稳定的泛函多线程编程工具就会是很好的选择。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Android随笔

LeakCanary笔记

RefWatcher 的代理类。通过注册 ActivityLifecycleCallbacks 回调,当 Activity 调用 onDestroy() 时进行...

712
来自专栏DOTNET

.Net多线程编程—Parallel LINQ、线程池

Parallel LINQ 1 System.Linq.ParallelEnumerable 重要方法概览: 1)public static ParallelQ...

2787
来自专栏高爽的专栏

Java线程(十):CAS

前言        在Java并发包中有这样一个包,java.util.concurrent.atomic,该包是对Java部分数据类型的原子封装,在原有数据类...

1780
来自专栏Java Web

Java I/O不迷茫,一文为你导航!

学习过计算机相关课程的童鞋应该都知道,I/O 即输入Input/ 输出Output的缩写,最容易让人联想到的就是屏幕这样的输出设备以及键盘鼠标这一类的输入设备,...

542
来自专栏函数式编程语言及工具

泛函编程(38)-泛函Stream IO:IO Process in action

  在前面的几节讨论里我们终于得出了一个概括又通用的IO Process类型Process[F[_],O]。这个类型同时可以代表数据源(Source)和数据终端...

1787
来自专栏xingoo, 一个梦想做发明家的程序员

漫谈Java IO之普通IO流与BIO服务器

今天来复习一下基础IO,也就是最普通的IO。 网络IO的基本知识与概念 普通IO以及BIO服务器 NIO的使用与服务器Hello world Netty入...

2455
来自专栏一个会写诗的程序员的博客

并发编程之CAS(Compare and Swap)原理Unsafe类

AQS,非阻塞数据结构和原子变量类(java.util.concurrent.atomic包中的类),这些concurrent包中的基础类都是使用这种模式来实现...

761
来自专栏我是攻城师

理解Java轻量级并发包Atom系列工具类的设计

他们的主要功能是提供轻量级的同步能力从而帮助我们避免内存一致性错误,从源码中观察这些工具类其设计主要利用了CAS原语+volatile的功能。我们知道volat...

614
来自专栏java达人

ConcurrentHashMap使用示例

作者:mononite 链接:https://my.oschina.net/mononite/blog/144329(点击文末阅读原文前往) Concurren...

1958
来自专栏菩提树下的杨过

java学习:JMM(java memory model)、volatile、synchronized、AtomicXXX理解

一、JMM(java memory model)内存模型 从网上淘来二张图: ? 上面这张图说的是,在多核CPU的系统中,每个核CPU自带高速缓存,然后计算机主...

18110

扫码关注云+社区