Scalaz(56)- scalaz-stream: fs2-安全运算,fs2 resource safety

    fs2在处理异常及资源使用安全方面也有比较大的改善。fs2 Stream可以有几种方式自行引发异常:直接以函数式方式用fail来引发异常、在纯代码里隐式引发异常或者在运算中引发异常,举例如下:

1 /函数式
2 val err = Stream(1,2,3) ++ Stream.fail(new Exception("oh,no..."))
3   //> err  : fs2.Stream[Nothing,Int] = append(Segment(Emit(Chunk(1, 2, 3))), Segment(Emit(Chunk(()))).flatMap(<function1>))
4 //隐式转换
5 val err1 = Stream(1,2,3) ++ (throw new Exception("oh my god!"))
6   //> err1  : fs2.Stream[Nothing,Int] = append(Segment(Emit(Chunk(1, 2, 3))), Segment(Emit(Chunk(()))).flatMap(<function1>))
7 //运算中
8 val err2 = Stream.eval(Task.delay { throw new Exception("it suck!")})
9   //> err2  : fs2.Stream[fs2.Task,Nothing] = attemptEval(Task).flatMap(<function1>)

我们可以用非函数式方式处理异常:

1 try err.toList catch {case e => println(e.getMessage)}
2                                                   //> oh,no...
3                                                   //| res0: Any = ()
4 try err2.run.unsafeRun catch {case e => println(e.getMessage)}
5                                                   //> it suck!

当然,我们会选择用纯代码方式处理异常:

1 err.map(_.toString).onError { case e => Stream.emit(e.getMessage) }.toList
2                                //> res1: List[String] = List(1, 2, 3, oh,no...)
3 err1.map(_.toString).onError { case e => Stream.emit(e.getMessage) }.toList
4                                //> res2: List[String] = List(1, 2, 3, oh my god!)
5 err2.onError { case e => Stream.emit(e.getMessage) }.runLog.unsafeRun
6                                //> res3: Vector[String] = Vector(it suck!)

我们在上一篇讨论中介绍过fs2提供了一个bracket函数来保证资源的安全使用。bracket函数款式是这样的:

def bracket[F[_],R,A](r: F[R])(use: R => Stream[F,A], release: R => F[Unit]) = Stream.mk {
    StreamCore.acquire(r, release andThen (Free.eval)) flatMap (use andThen (_.get))
}

r代表一个获取资源R的运算,use是对资源R的使用操作,release是事后对R的处理。安全使用资源就是无论use是正常完成或者异常中断,release都会保证得到运行。我们看看下面的例子:

 1 val counter = new java.util.concurrent.atomic.AtomicLong(0)
 2                                                   //> counter  : java.util.concurrent.atomic.AtomicLong = 0
 3 val acquire = Task.delay { println(s"acquiring:${counter.incrementAndGet}") }
 4                                                   //> acquire  : fs2.Task[Unit] = Task
 5 val release = Task.delay { println(s"releasing:${counter.decrementAndGet}") }
 6                                                   //> release  : fs2.Task[Unit] = Task
 7 Stream.bracket(acquire)(_ => Stream(4,5,6) ++ err, _ => release).run.unsafeRun
 8                                                   //> acquiring:1
 9                                                   //| releasing:0
10                                                   //| java.lang.Exception: oh,no...
11                                                   //|     at fs2Safety$$anonfun$main$1$$anonfun$3.apply(fs2Safety.scala:4)
12                                                   //|     at fs2Safety$$anonfun$main$1$$anonfun$3.apply(fs2Safety.scala:4)

在上面的例子里use会引发异常中断,但release还是得到运行。我们可以用onError来把错误信息截住: 

1  s1.map(_.toString).onError {case e => Stream.emit(e.getMessage)}.runLog.unsafeRun
2                                                   //> acquiring:1
3                                                   //| releasing:0
4                                                   //| res4: Vector[String] = Vector(4, 5, 6, 1, 2, 3, oh,no...)

我们也可以用attempt来获取所有运算结果:

1 s1.attempt.runLog.unsafeRun                       //> acquiring:1
2                                                   //| releasing:0
3                                                   //| res5: Vector[fs2.util.Attempt[Int]] = Vector(Right(4), Right(5), Right(6), Right(1), Right(2), Right(3), Left(java.lang.Exception: oh,no...))

我们再举个在bracket在中间环节里占用资源的例子:

 1  def logBracket[A]: A => Stream[Task,A] = a => {
 2    Stream.bracket(Task.delay { println(s"acquiring $a"); a })(
 3     _ => Stream.emit(a),
 4     _ => Task.delay { println(s"releasing $a") })
 5  }             //> logBracket: [A]=> A => fs2.Stream[fs2.Task,A]
 6  Stream(3).flatMap(logBracket).map{ n =>
 7    if (n>2) sys.error("oh no...") else n }.run.unsafeAttemptRun
 8               //> acquiring 3
 9               //| releasing 3
10               //| res6: fs2.util.Attempt[Unit] = Left(java.lang.RuntimeException: oh no...)

应该注意到:在任何情况下releasing都会运行。

实际上所谓安全的资源使用(resource safety)主要是指在任何形式的运算终止情况下运算的事后处理程序都能保证得到运行。运算的终止形式有以下几种:

1、正常终止。如Stream(1,2,3)的运算:在发出一节Chunk(1,2,3)后终止

2、异常终止。在运算过程中发生异常中途终止

3、强迫终止。用户强制终止,如:Stream.range(1,5).take(1),在发出元素1后就立刻终止

我们要注意的是第三种情况。先看个例子:

 1 val s5 = (Stream(1) ++ Stream.fail(new Exception("oh no...")))
 2 //> s5  : fs2.Stream[Nothing,Int] = append(Segment(Emit(Chunk(1))), Segment(Emit(Chunk(()))).flatMap(<function1>))
 3 s5.map(_.toString).onError {case e => Stream.emit(e.getMessage)}.toList
 4                                                   //> res7: List[String] = List(1, oh no...)
 5 s5.take(1).toList                                 //> res8: List[Int] = List(1)
 6 (Stream("a") ++ Stream("bc")).onComplete(Stream.emit("completed!")).toList
 7                                                   //> res9: List[String] = List(a, bc, completed!)
 8 s5.map(_.toString).onComplete(Stream.emit("completed!")).take(1).toList
 9                                                   //> res10: List[String] = List(1)
10 s5.covary[Task].map(_.toString).onFinalize(Task.delay { println("finalized!")})
11  .take(1).runLog.unsafeRun                        //> finalized!
12                                                   //| res11: Vector[String] = Vector(1)

我们看到:虽然s5会引发异常,可以用onError来捕获异常。但奇怪的是用take(1)后不会发生异常。这是因为take(1)是用户强制终止操作,即在发出一个元素后即刻终止。此时还没开始处理fail。值得注意的是运算遭到强制终止后onComplete是不会运行的,onFinalize则在任何情况下都能得到运行。 说到运算安全,FP的运行方式以递归算法为主:flatMap就是一个递归算法,那么在fs2里能不能保证运算的安全呢?下面的测试程序可以成为最具代表性的示范:

 1 // Sanity tests - not run as part of unit tests, but these should run forever
 2 // at constant memory.
 3 //
 4 object ResourceTrackerSanityTest extends App {
 5   val big = Stream.constant(1).flatMap { n =>
 6     Stream.bracket(Task.delay(()))(_ => Stream.emits(List(1, 2, 3)), _ => Task.delay(()))
 7   }
 8   big.run.unsafeRun
 9 }
10 
11 object RepeatPullSanityTest extends App {
12   def id[A]: Pipe[Pure, A, A] = _ repeatPull Pull.receive1 { case h #: t => Pull.output1(h) as t }
13   Stream.constant(1).covary[Task].throughPure(id).run.unsafeRun
14 }

运行以上两个程序都不会产生StackOverflowError错误。

从上面的讨论里我们知道了bracket函数是fs2建议的安全运算机制。我们可以用bracket来读取我们自定义的资源,如:数据库或者一些外设,这样我们可以确定当运算终止后事后处理机制一定会发生作用。fs2在io.file对象里提供了自身的文件读写功能,这些函数都具备了资源使用安全机制。也就是说当对fs2.file的使用终止后,事后处理机制运行是得到保证的。下面我们分享一个fs2.file的经典例子:

 1 def fahrenheitToCelsius(f: Double): Double =
 2   (f - 32.0) * (5.0/9.0)                          //> fahrenheitToCelsius: (f: Double)Double
 3 
 4 val converter: Task[Unit] =
 5   io.file.readAll[Task](java.nio.file.Paths.get("/users/tiger-macpro/fahrenheit.txt"), 4096)
 6     .through(text.utf8Decode)
 7     .through(text.lines)
 8     .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
 9     .map(line => fahrenheitToCelsius(line.toDouble).toString)
10     .intersperse("\n")
11     .through(text.utf8Encode)
12     .through(io.file.writeAll(java.nio.file.Paths.get("/users/tiger-macpro/celsius.txt")))
13     .run                                          //> converter  : fs2.Task[Unit] = Task
14   converter.unsafeRun

首先在这个例子里可以肯定所有使用的文件(fahrenheit.txt, celsius.txt)在任何情况下都会得到释放。readAll的函数款式是这样的:

def readAll[F[_]](path: Path, chunkSize: Int)(implicit F: Effect[F]): Stream[F, Byte] = {...}

值得注意的是readAll是按批次逐批从文件里读入的,这样可以避免一次性把所有内容全部搬到内存里。上面的例子是按4K字节读取的。readAll返回结果类型是Byte,我们要用个transducer把Byte转成String,这些转换函数可以在text对象里发现。text.utf8Decode的函数类型如下:

/** Converts UTF-8 encoded byte stream to a stream of `String`. */
  def utf8Decode[F[_]]: Pipe[F, Byte, String] =
    _.chunks.through(utf8DecodeC)

utf8Decode是个Pipe:从Byte转到String。同样如果从String转成Byte的话可以用utf8Encode。当我们需要把String写入文件时就需要utf8Encode来转换Byte了。writeAll的函数款式如下:

def writeAll[F[_]](path: Path, flags: Seq[StandardOpenOption] = List(StandardOpenOption.CREATE))(implicit F: Effect[F]): Sink[F, Byte] = {...}

writeAll的结果类型是Sink[F,Byte],代表输入是Stream[F,Byte],所以我们必须用utf8Encode先把String转成Byte。text.lines是fs2自带的文字型iterator:在fs2里不再使用java的iterator了。另外interperse的作用是在元素由String转换成Byte之前先进行分行。在这篇讨论里我们主要介绍的是pipe对象中的函数。我们将会在下次关于多线程运算的讨论里介绍pipe2。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏PHP技术

MySQL的语句执行顺序

MySQL的语句一共分为11步,如下图所标注的那样,最先执行的总是 FROM操作,最后执行的是LIMIT操作。其中每一个操作都会产生一张虚拟的表,这个虚拟的表作...

38210
来自专栏跟着阿笨一起玩NET

Winform TreeView 查找下一个节点

var tn = _Tv.NextNodes().FirstOrDefault(x => Regex.IsMatch(x.Text, "(?i)" + txtK...

582
来自专栏名山丶深处

springaop——AspectJ不可不知的细节

2615
来自专栏小灰灰

java之的读取文件大全

java之的读取文件大全 使用java进行文件读写,因为使用的频率不高,加上写起来也没那么简单,经常容易忘记,然后就得去翻阅以前的笔记,或者找寻之前写的文件读...

27810
来自专栏蓝天

一个简单的支持MySQL和SQLite3的DB接口

simple_db.zip 相关联代码:https://github.com/eyjian/mooon/tree/master/common_library/...

812
来自专栏人工智能LeadAI

深度学习之主流数据库 | MySQL基础

这篇文章主要是讲一下常见的MySQL的安装,和基本操作。适合完全没有MySQL知识但是又急需一些MySQL知识的童靴作为快速入门使用。 背景与安装 背景不用多说...

4326
来自专栏生信宝典

DESeq2差异基因分析和批次效应移除

85110
来自专栏开发与安全

算法:Solutions for the Maximum Subsequence Sum Problem

The maximum subarray problem is the task of finding the contiguous subarray wit...

1848
来自专栏Android点滴积累

SharedPreferences 详解(多进程,存取数组解决方案)

一、SharedPreferences基本概念 文件保存路径:/data/data/<包名>/shared_prefs目录下目录下生成了一个SP.xml文件 S...

2109
来自专栏名山丶深处

springaop——AspectJ不可不知的细节

1217

扫码关注云+社区