泛函编程(38)-泛函Stream IO:IO Process in action

  在前面的几节讨论里我们终于得出了一个概括又通用的IO Process类型Process[F[_],O]。这个类型同时可以代表数据源(Source)和数据终端(Sink)。在这节讨论里我们将针对Process[F,O]的特性通过一些应用实例来示范它的组合性(composibility)和由数据源到接收终端IO全过程的功能完整性。

  我们已经在前面的讨论中对IO Process的各种函数组合进行了调研和尝试,现在我们先探讨一下数据源设计方案:为了实现资源使用的安全性和IO程序的可组合性,我们必须保证无论在完成资源使用或出现异常时数据源都能得到释放,同时所有副作用的产生都必须延后直至Interpreter开始运算IO程序时。

 我们先试试一个读取文件字符内容的组件:

 1  import java.io.{BufferedReader, FileReader}
 2   def readFile(fileName: String): Process[IO,String] =
 3     await[IO,BufferedReader,String](IO{new BufferedReader(new FileReader(fileName))}){
 4         case Left(err) => Halt(err)
 5         case Right(r) => {
 6             lazy val next: Process[IO,String] = await(IO{r.readLine}) {
 7                 case Left(err2) => await(IO{r.close}){_ => Halt[IO,String](err2)}()
 8                 case Right(line) => emit(line, next)
 9             }()
10             next
11         }
12     }()

注意以下几个问题:首先是所有的IO动作都是通过await函数来实现的,再就是所有产生副作用的语句都被包嵌在IO{}里,这是典型的延迟运算。我们先来看看这个await函数:

1   def await[F[_],A,O](req: F[A])(
2      rcvfn: Either[Throwable,A] => Process[F,O] = (a: Either[Throwable,A]) => Halt[F,O](End))
3      (fallback: Process[F,O] = Halt[F,O](End),
4      onError: Process[F,O] = Halt[F,O](End)): Process[F,O] = Await(req,rcvfn,fallback,onError)

req是个F[A],在例子里就是IO[A]。把这个函数精简表述就是: await(IO(iorequest))(ioresult => Process)。从这个函数的款式可以看出:在调用await函数的时候并不会产生副作用,因为产生副作用的语句是包嵌在IO{}里面的。我们需要一个interpreter来运算这个IO{...}产生副作用。await的另一个输入参数是 iorequest => Process:iorequest是运算iorequest返回的结果:可以是A即String或者是个异常Exception。所以我们可以用PartialFunction来表达:

{ case Left(err) => Process[IO,???]; case Right(a) => Process[IO,???] } 

我们用PartialFunction来描述运算iorequest后返回正常结果或者异常所对应的处理办法。

上面的例子readFile就是用这个await函数来打开文件: IO {new BufferedReader(new FileReader(fileName))}

读取一行:IO {r.readLine},如果读取成功则发送emit出去: case Right(line) => emit(line,next), 

如果出现异常则关闭文件:case Left(err) => IO {r.close},注意我们会使用异常End来代表正常完成读取:

1   case object End extends Exception
2   case object Kill extends Exception

以上的Kill是强行终止信号。刚才说过,我们需要用一个interpreter来运算readFile才能正真产生期望的副作用如读写文件。现在我们就来了解一个interpreter:

 1   def collect[O](src: Process[IO,O]): IndexedSeq[O] =  {
 2      val E = java.util.concurrent.Executors.newFixedThreadPool(4)
 3      def go(cur: Process[IO,O], acc: IndexedSeq[O]): IndexedSeq[O] = cur match {
 4         case Halt(e) => acc
 5         case Emit(os,ns) => go(ns, acc ++ os)
 6         case Halt(err) => throw err
 7         case Await(rq,rf,fb,fl) =>
 8             val next =
 9               try rf(Right(unsafePerformIO(rq)(E)))
10               catch { case err: Throwable => rf(Left(err)) }
11             go(next, acc)
12                 
13      }
14      try go(src, IndexedSeq())
15      finally E.shutdown
16   }

首先要注意的是这句:unsafePerformIO(rq)(E)。它会真正产生副作用。当Process[IO,O] src当前状态是Await的时候就会进行IO运算。运算IO产生的结果作为Await的rf函数输入参数,正如我们上面描述的一样。所以,运算IO{iorequest}就是构建一个Await结构把iorequest和转换状态函数rf放进去就像这样:

await(iorequest)(rf)(fb,fl) = Await(ioreques,rf,fb,fl),

然后返回到collect,collect看到src状态是Await就会运算iorequest然后再运行rf。

我们的下一个问题是如何把文件里的内容一行一行读入而不是一次性预先全部搬进内存,这样我们可以读一行,处理一行,占用最少内存。我们再仔细看看readFile的这个部分:

 1   def readFile(fileName: String): Process[IO,String] =
 2     await[IO,BufferedReader,String](IO{new BufferedReader(new FileReader(fileName))}){
 3         case Left(err) => Halt(err)
 4         case Right(r) => {
 5             lazy val next: Process[IO,String] = await(IO{r.readLine}) {
 6                 case Left(err2) => Halt[IO,String](err2) //await(IO{r.close}){_ => Halt[IO,String](err2)}()
 7                 case Right(line) => emit(line, next)
 8             }()
 9             next
10         }
11     }()

如果成功创建BufferedReader,运算IO产生Right(r)结果;运算IO{r.readLine}后返回最终结果next。next可能是Halt(err)或者Emit(line,next)。如果这样分析那么整个readFile函数也就会读入文件的第一行然后emit输出。记着泛函编程特点除了递归算法之外还有状态机器(state machine)方式的程序运算。在以上例子里的运算结果除输出值line外还有下一个状态next。再看看以下这个组件:

1   //状态进位,输出Process[F,O2]
2   final def drain[O2]: Process[F,O2] = this match {
3       case Halt(e) => Halt(e)      //终止  
4       case Emit(os,ns) => ns.drain  //运算下一状态ns,输出
5       case Await(rq,rf,fb,cl) => Await(rq, rf andThen (_.drain)) //仍旧输出Await
6   }

这个drain组件实际上起到了一个移动状态的作用。如果我们这样写:readFile("myfile.txt").drain 那么在我们上面的例子里readFile返回Emit(line,next);drain接着readFile输出状态就会运算next,这样程序又回到readFile next的IO{r.readline}运算中了。如果我们在drain组件前再增加一些组件:

1 readFile("farenheit.txt").filter(line => !line.startsWith("#").map(line => line.toUpperCase).drain

那么我们就会得到读取一行字符;过滤起始为#的行;转成大写字符;返回再读一行交替循环这样的效果了。

很明显readFile实在太有针对性了。函数类型款式变的复杂可读性低。我们需要一种更概括的形式来实现泛函编程语言的简练而流畅表达形式。

我们首先应该把IO运算方式重新定义一下。用await函数显得太复杂:

1   //await 的精简表达形式
2   def eval[F[_],A](fa: F[A]): Process[F,A] = //运算F[A]
3     await[F,A,A](fa){
4     case Left(err) => Halt(err)
5     case Right(a) => emit(a, Halt(End))
6   }()
7   def evalIO[A](ioa: IO[A]) = eval[IO,A](ioa)   //运算IO[A]
8   //确定终结的运算
9   def eval_[F[_],A,B](fa: F[A]): Process[F,B] = eval[F,A](fa).drain[B] //运算F[A]直到终止

如此运算IO只需要这样写:eval(iorequest),是不是精简多了。

再来一个通用安全的IO资源使用组件函数:

 1  def resource[R,O](   //通用IO程序运算函数
 2       acquire: IO[R])(  //获取IO资源。open file
 3       use: R => Process[IO,O])(  //IO运算函数  readLine
 4       release: R => Process[IO,O]): Process[IO,O] = //释放资源函数 close file
 5     eval(acquire) flatMap { r => use(r).onComplete(release(r)) }
 6   
 7   def resource_[R,O](   //与resource一样,只是运算realease直至终止
 8       acquire: IO[R])(  //获取IO资源。open file
 9       use: R => Process[IO,O])(  //IO运算函数  readLine
10       release: R => IO[Unit]): Process[IO,O] = //释放资源函数 close file
11     resource(acquire)(use)(release andThen (eval_[IO,Unit,O]))

以下是个套用resource组件的例子:从一个文件里逐行读出,在完成读取或出现异常时主动释放资源:

 1   def lines(fileName: String): Process[IO,String] = //从fileName里读取
 2      resource 
 3      {IO {io.Source.fromFile(fileName)}}  //占用资源
 4      {src =>  //使用资源。逐行读取
 5        lazy val iter = src.getLines 
 6        def nextLine = if (iter.hasNext) Some(iter.next) else None //下一行
 7        lazy val getLines: Process[IO,String] =  //读取
 8          eval(IO{nextLine}) flatMap {  //运算IO
 9              case None => Halt(End)   //无法继续读取:完成或者异常
10              case Some(line) => emit(line, getLines) //读取然后发送
11          }
12        getLines
13      }
14      {src => eval_ (IO{src.close})} //释放资源

现在我们应该可以很简练但又不失清楚详尽地描述一段IO程序:

打开文件fahrenheit.txt 

读取一行字符

过滤空行或者以#开始的字行,可通过的字行代表亨氏温度数

把亨氏温度转换成摄氏温度数

这里面的温度转换函数如下:

1  def fahrenheitToCelsius(f: Double): Double =
2     (f - 32) * 5.0/9.0

那么整个程序就可以这样写了:

1       lines("fahrenheit.txt").
2       filter(line => !line.startsWith("#") && !line.trim.isEmpty).
3       map(line => fahrenheitToCelsius(line.toDouble).toString).
4       drain

这段代码是不是很清晰的描述了其所代表的功能,不错!

现在到了了解IO过程的另一端:Sink的时候了。我们如果需要通过Process来实现输出功能的话,也就是把Source[O]的这个O发送输出到一个Sink。实际上我们也可以用Process来表达Sink,先看一个简单版本的Sink如下:

1   type SimpleSink[F[_],O] = Process[F,O => F[Unit]]        

SimpleSink就是一个IO Process,它的输出是一个 O => F[Unit]函数,用一个例子来解释:

1   def simpleWriteFile(fileName: String, append: Boolean = false) : SimpleSink[IO, String] =
2     resource[FileWriter, String => IO[Unit]] 
3     {IO {new FileWriter(fileName,append)}}   //acquire
4     {w => IO{(s:String) => IO{w.write(s)}}}  //use
5     {w => IO{w.close}}  //release

下面是个可使用的Sink:

 1     type Sink[F[_],O] = Process[F, O => Process[F,Unit]]
 2 
 3     import java.io.FileWriter
 4 
 5     def fileW(file: String, append: Boolean = false): Sink[IO,String] =
 6       resource[FileWriter, String => Process[IO,Unit]]
 7         { IO { new FileWriter(file, append) }}
 8         { w => stepWrite { (s: String) => eval[IO,Unit](IO(w.write(s))) }} //重复循环逐行写
 9         { w => eval_(IO(w.close)) }
10 
11     /* 一个无穷循环恒量stream. */
12     def stepWrite[A](a: A): Process[IO,A] =
13       eval(IO(a)).flatMap { a => Emit(a, stepWrite(a)) } 通过Emit的下一状态重复运算IO(a)

我们需要实现逐行输出,所以用这个stepWrite来运算IO。stepWrite是通过返回Emit来实现无穷循环的。

下一步是把Sink和Process对接起来。我们可以用以下的to组件来连接:

1     def to[O2](sink: Sink[F,O]): Process[F,Unit] =
2       join { (this zipWith sink)((o,f) => f(o)) } 

join组件就是标准的monadic组件,因为我们需要把 Process[F,Process[F,Unit]]打平为Process[F,Unit]:

1     def join[F[_],A](p: Process[F,Process[F,A]]): Process[F,A] = 
2       p.flatMap(pa => pa)

现在我们可以在前面例子里的Process过程中再增加一个写入celsius.txt的组件:

1     val converter: Process[IO,Unit] =
2       lines("fahrenheit.txt"). //读取
3       filter(line => !line.startsWith("#") && !line.trim.isEmpty). //过滤
4       map(line => fahrenheitToCelsius(line.toDouble).toString).  //温度转换
5       pipe(intersperse("\n")).  //加end of line
6       to(fileW("celsius.txt")).  //写入
7       drain     //继续循环

上面的Sink类型运算IO后不返回任何结果(Unit)。但有时我们希望IO运算能返回一些东西,如运算数据库query之后返回结果集,那我们需要一个新的类型:

1     type Channel[F[_],I,O] = Process[F, I => Process[F,O]]

Channel和Sink非常相似,差别只在Process[F,O]和Process[F,Unit]。

我们用Channel来描述一个数据库查询:

 1     import java.sql.{Connection, PreparedStatement, ResultSet}
 2 
 3     def query(conn: IO[Connection]):
 4         Channel[IO, Connection => PreparedStatement, Map[String,Any]] = //Map === Row
 5       resource_     //I >>> Connection => PreparedStatement
 6         { conn }  //打开connection
 7         { conn => constant { (q: Connection => PreparedStatement) => //循环查询
 8           resource_
 9             { IO {    //运行query
10                 val rs = q(conn).executeQuery   
11                 val ncols = rs.getMetaData.getColumnCount
12                 val cols = (1 to ncols).map(rs.getMetaData.getColumnName)
13                 (rs, cols)
14             }}
15             { case (rs, cols) =>   //读取纪录Row
16                 def step =
17                   if (!rs.next) None
18                   else Some(cols.map(c => (c, rs.getObject(c): Any)).toMap)
19                 lazy val rows: Process[IO,Map[String,Any]] = //循环读取
20                   eval(IO(step)).flatMap {
21                     case None => Halt(End)
22                     case Some(row) => Emit(row, rows)  //循环运算rows函数
23                   }
24                 rows
25             }
26             { p => IO { p._1.close } } // close the ResultSet
27         }}
28         { c => IO(c.close) }

以下提供更多的应用示范:

从一个文件里读取存放亨氏温度的文件名后进行温度转换并存放到celsius.txt中

1     val convertAll: Process[IO,Unit] = (for {
2       out <- fileW("celsius.txt").once  // out的类型是String => Process[IO,Unit]
3       file <- lines("fahrenheits.txt") //fahrenheits.txt里保存了一串文件名
4       _ <- lines(file).  //动态打开文件读取温度记录
5            map(line => fahrenheitToCelsius(line.toDouble)). //温度系统转换
6            flatMap(celsius => out(celsius.toString)) //输出
7     } yield ()) drain  //继续循环

输出到多个.celsius文件:

1    val convertMultisink: Process[IO,Unit] = (for {
2       file <- lines("fahrenheits.txt") //读取文件名称
3       _ <- lines(file). //打开文件读取温度数据
4            map(line => fahrenheitToCelsius(line.toDouble)). //温度系统转换
5            map(_ toString). 
6            to(fileW(file + ".celsius")) //写入文件
7     } yield ()) drain

我们可以按需要在处理过程中增加处理组件:

 1    val convertMultisink2: Process[IO,Unit] = (for {
 2       file <- lines("fahrenheits.txt")
 3       _ <- lines(file).
 4            filter(!_.startsWith("#")). //过滤#开始字串
 5            map(line => fahrenheitToCelsius(line.toDouble)).
 6            filter(_ > 0). // 过滤0度以下温度
 7            map(_ toString).
 8            to(fileW(file + ".celsius"))
 9     } yield ()) drain
10  

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Jackson0714

好用的SQL TVP~~独家赠送[增-删-改-查]的例子

1384
来自专栏calmound

多线程-互斥变量

第一个 CreateMutex 函数功能:创建互斥量(注意与事件Event的创建函数对比) 函数原型: HANDLE  CreateMutex(   LPSEC...

2434
来自专栏程序员宝库

JDK 源码中的一些“小技巧”

均摘选自JDK源码 1 i++ vs i-- String源码的第985行,equals方法中: while (n--!= 0) { if...

3335
来自专栏醒者呆

编程语言的基础——搞定JavaIO

关键字:IO基础,JUnit生命周期,字节流,字符流,字符编码,对象流,序列化,反序列化 Java I/O 流是一组有顺序的,有起点和终点的字节集合。是对设...

3745
来自专栏云瓣

JS 装饰器解析

随着 ES6 和 TypeScript 中类的引入,在某些场景需要在不改变原有类和类属性的基础上扩展些功能,这也是装饰器出现的原因。 装饰器简介 作为一种可以动...

2975
来自专栏Android 研究

Retrofit解析9之流程解析

我们讲解Retrofit整体流程,就依据官方给的demo来吧,代码如下: 代码如下:

1133
来自专栏java 成神之路

Java 序列化 之 Serializable

35013
来自专栏desperate633

设计模式之代理模式(Proxy模式)代理模式的引入代理模式的实例程序代理模式分析

Proxy是代理人的意思,指的是代替别人进行工作的人。当不一定需要本人亲自去做的工作的时候,就可以寻找代理人去完成。 但在代理模式中,往往是相反的,通常是代理...

492
来自专栏函数式编程语言及工具

Scalaz(50)- scalaz-stream: 安全的无穷运算-running infinite stream freely

scalaz-stream支持无穷数据流(infinite stream),这本身是它强大的功能之一,试想有多少系统需要通过无穷运算才能得以实现。这是因为外界...

1796
来自专栏大内老A

Enterprise Library Policy Injection Application Block 之三:PIAB的扩展—创建自定义CallHandler(提供Source Code下载)

本系列的第一部分对PIAB使用场景进行了简单的介绍,作中阐述了通过PI(Policy Injection)的方式实现了Business Logic和Non-Bu...

28410

扫码关注云+社区