FunDA(8)- Static Source:保证资源使用安全 - Resource Safety

   我们在前面用了许多章节来讨论如何把数据从后台数据库中搬到内存,然后进行逐行操作运算。我们选定的解决方案是把后台数据转换成内存中的数据流。无论在打开数据库表或从数据库读取数据等环节都涉及到对数据库表这项资源的安全使用:我们最起码要保证在完成使用或者使用中途出现错误异常退出时能释放占用的资源。谈到资源使用安全,不得不想到函数式编程通用的那个bracket函数,fs2同样提供了这个函数:

def bracket[F[_],R,A](r: F[R])(use: R => Stream[F,A], release: R => F[Unit]): Stream[F,A] = Stream.mk {
    StreamCore.acquire(r, release andThen (Free.eval)) flatMap { case (_, r) => use(r).get }
  }

这个函数的入参数r,use,release都涉及到了资源占用处理:r一般是打开文件或者库表操作,use是资源使用如读取数据过程,release 顾名思义就是正常完成资源使用后的资源释放清理过程。函数bracket能保证这些过程的正确引用。

我们用几个例子来分析一下这个函数的功能:

val s = Stream.bracket(Task.delay(throw new Exception("Oh no!")))(
  _ => Stream(1,2,3) ++ Stream.fail(new Exception("boom!")) ++ Stream(3,4),
  _ => Task.delay(println("normal end")))         
s.runLog.unsafeRun                                //> java.lang.Exception: Oh no!
                                                  //|     at demo.ws.streams$$anonfun$main$1$$anonfun$1.apply(demo.ws.streams.scal
                                                  //| a:4)
                                                  //|     at demo.ws.streams$$anonfun$main$1$$anonfun$1.apply(demo.ws.streams.scal
                                                  //| a:4)

在上面这个例子里我们人为在两个地方制造了异常。我们可以用onError来截获这些异常: 

val s1 = s.map(_.toString).onError {e => Stream.emit(e.getMessage)}
                                                  
s1.runLog.unsafeRun                               //> res0: Vector[String] = Vector(Oh no!)

必须用toString转换了Stream元素类型后才能把截获的异常信息放进Stream。注意release未调用,因为资源还没有被占用。但是如果除了释放资源外还有其它清理工作的话,我们可以用onFinalize来确保一定可以调用清理程序:

val s5 = s1.onFinalize(Task.delay{println("finally end!")})
 
s5.runLog.unsafeRun                               //> finally end!
                                                  //| res1: Vector[String] = Vector(Oh no!)

如果在使用资源中间出现异常会怎样?

val s3 = Stream.bracket(Task.delay())(
  _ => Stream(1,2,3) ++ Stream.fail(new Exception("boom!")) ++ Stream(3,4),
  _ => Task.delay(println("normal end"))) 
val s4 = s3.map(_.toString).onError {e => Stream.emit(e.getMessage)}
         .onFinalize(Task.delay{println("finally end!")})

s4.runLog.unsafeRun                               //> normal end
                                                  //| finally end!
                                                  //| res2: Vector[String] = Vector(1, 2, 3, boom!)

返回结果res2正确记录了出错地点,而且所有清理过程都得到运行。当然,我们可以不用动Stream元素类型,用attempt:

val s6 = s3.attempt.onError {e => Stream.emit(e.getMessage)}
         .onFinalize(Task.delay{println("finally end!")})
 
s6.runLog.unsafeRun                               //> normal end
                                                  //| finally end!
 //| res3: Vector[Object] = Vector(Right(1), Right(2), Right(3), Left(java.lang.Exception: boom!))

我们在前面FunDA(1)里讨论过运算slick Query Action run返回结果类型是Future[Iterable[ROW]]。Slick获取数据的方式是一次性读入内存,所以本期标题提到的Static-Source就是指这样的一个内存中的集合。那么我们就可以不必考虑开启并占用数据库表这项操作了。我们只需要用FunDA DataRowType.getTypedRow函数获取了Iterable[ROW]结果后直接传给bracket就行了。现在最重要的是如何把Seq[ROW]转换成Stream[F[_],ROW]。我们可以用Seq的fold函数来构建Stream: 

val data = Seq(1,2,3,4)                           //> data  : Seq[Int] = List(1, 2, 3, 4)
val s8 = data.foldLeft(Stream[Task,Int]())((s,a) => s ++ Stream.emit(a))
def log[A](prompt: String): Pipe[Task,A,A] =
    _.evalMap {row => Task.delay{ println(s"$prompt> $row"); row }}
                                                  //> log: [A](prompt: String)fs2.Pipe[fs2.Task,A,A]

s8.through(log("")).run.unsafeRun                 //> > 1
                                                  //| > 2
                                                  //| > 3
                                                  //| > 4

表面上看好像没什么问题,但仔细分析:Seq[ROW]可以是个超大的集合,而foldLeft是个递归函数,无论是否尾递归都有可能造成堆栈溢出错误(StackOverflowError)。看来还是用freemonad,它可以把每步运算都存放在内存结构里,可以在固定的堆栈空间运算。下面的函数用fs2.Pull类型结构可以把Seq[ROW]转换成Stream[F[_],ROW]:

 def pullSeq[ROW](h: Seq[ROW]): Pull[Task, ROW, Unit] = {
    val it = h.iterator
    def go(it: Iterator[ROW]): Pull[Task, ROW, Unit] = for {
      res <- Pull.eval(Task.delay({ if (it.hasNext) Some(it.next()) else None }))
      next <- res.fold[Pull[Task, ROW, Unit]](Pull.done)(o => Pull.output1(o) >> go(it))
    } yield next
    go(it)
  }                                                  
 def streamSeq[ROW](h: Seq[ROW]): Stream[Task, ROW] =
    pullSeq(h).close

虽然go是个递归函数,但因为Pull是个freemonad,每个flapMap循环(>>)会把新的Iterable it状态存放在heap内存里。由于每个步骤都是存放在内存结构里的,而运算这些步骤的模式是靠下游拖动逐步运算的,也就是说按下游拖动每次产生一个元素。pullSeq返回Pull,Pull.close >>> Stream,这就是streamSeq函数的作用了。现在我们可以直接用bracket来安全构建Stream:

 val s9 = Stream.bracket(Task.delay(data))(streamSeq, _ => Task.delay())                                                
 s9.through(log("")).run.unsafeRun               //> > 1
                                                  //| > 2
                                                  //| > 3
                                                  //| > 4

现在可以放心了。但我们的目的是为大众编程人员提供一个最低门槛的工具库,他们不需要了解Task, onError,onFinalize。。。我们必须把bracket函数使用方式搞得更直白点,让用户可以更容易调用:

  type FDAStream[A] = Stream[Task,A]
  implicit val strategy = Strategy.fromFixedDaemonPool(4)
                                                  //> strategy  : fs2.Strategy = Strategy

  def fda_staticSource[ROW](acquirer: => Seq[ROW],
                            releaser: => Unit = (),
                            errhandler: Throwable => FDAStream[ROW] = null,
                            finalizer: => Unit = ()): FDAStream[ROW] = {
     val s = Stream.bracket(Task(acquirer))(r => streamSeq(r), r => Task(releaser))
     if (errhandler != null)
       s.onError(errhandler).onFinalize(Task.delay(finalizer))
     else
       s.onFinalize(Task.delay(finalizer))
  }                                               //> fda_staticSource: [ROW](acquirer: => Seq[ROW], releaser: => Unit, errhandle
                                                  //| r: Throwable => demo.ws.streams.FDAStream[ROW], finalizer: => Unit)demo.ws.
                                                  //| streams.FDAStream[ROW]

如果完整调用fda_staticSource可以如下这样:

  val s10 = fda_staticSource(data,
     println("endofuse"), e => { println(e.getMessage);Stream.emit(-99) },
     println("finallyend")) 
  s10.through(log("")).run.unsafeRun              //> > 1
                                                  //| > 2
                                                  //| > 3
                                                  //| > 4
                                                  //| endofuse
                                                  //| finallyend

最简单直接的方式如下:

  val s11 = fda_staticSource(acquirer = data) 
  s11.through(log("")).run.unsafeRun              //> > 1
                                                  //| > 2
                                                  //| > 3
                                                  //| > 4

又或者带异常处理过程的调用方法:

  val s12 = fda_staticSource(acquirer = data, errhandler = {e => println(e.getMessage);Stream()})
 
  s12.through(log("")).run.unsafeRun              //> > 1
                                                  //| > 2
                                                  //| > 3
                                                  //| > 4

下面是这次讨论示范的源代码:

import fs2._
object streams {
val s = Stream.bracket(Task.delay(throw new Exception("Oh no!")))(
  _ => Stream(1,2,3) ++ Stream.fail(new Exception("boom!")) ++ Stream(3,4),
  _ => Task.delay(println("normal end")))
//s.runLog.unsafeRun
val s1 = s.map(_.toString).onError {e => Stream.emit(e.getMessage)}
s1.runLog.unsafeRun
val s5 = s1.onFinalize(Task.delay{println("finally end!")})
s5.runLog.unsafeRun

val s3 = Stream.bracket(Task.delay())(
  _ => Stream(1,2,3) ++ Stream.fail(new Exception("boom!")) ++ Stream(3,4),
  _ => Task.delay(println("normal end")))
val s4 = s3.map(_.toString).onError {e => Stream.emit(e.getMessage)}
         .onFinalize(Task.delay{println("finally end!")})
s4.runLog.unsafeRun
val s6 = s3.attempt.onError {e => Stream.emit(e.getMessage)}
         .onFinalize(Task.delay{println("finally end!")})

s6.runLog.unsafeRun
                                                  
val data = Seq(1,2,3,4)
val s8 = data.foldLeft(Stream[Task,Int]())((s,a) => s ++ Stream.emit(a))
def log[A](prompt: String): Pipe[Task,A,A] =
    _.evalMap {row => Task.delay{ println(s"$prompt> $row"); row }}

s8.through(log("")).run.unsafeRun
                                              
  def pullSeq[ROW](h: Seq[ROW]): Pull[Task, ROW, Unit] = {
    val it = h.iterator
    def go(it: Iterator[ROW]): Pull[Task, ROW, Unit] = for {
      res <- Pull.eval(Task.delay({ if (it.hasNext) Some(it.next()) else None }))
      next <- res.fold[Pull[Task, ROW, Unit]](Pull.done)(o => Pull.output1(o) >> go(it))
    } yield next
    go(it)
  }
  def streamSeq[ROW](h: Seq[ROW]): Stream[Task, ROW] =
    pullSeq(h).close
  val s9 = Stream.bracket(Task.delay(data))(streamSeq, _ => Task.delay())
  s9.through(log("")).run.unsafeRun
  
  type FDAStream[A] = Stream[Task,A]
  implicit val strategy = Strategy.fromFixedDaemonPool(4)

  def fda_staticSource[ROW](acquirer: => Seq[ROW],
                            releaser: => Unit = (),
                            errhandler: Throwable => FDAStream[ROW] = null,
                            finalizer: => Unit = ()): FDAStream[ROW] = {
     val s = Stream.bracket(Task(acquirer))(r => streamSeq(r), r => Task(releaser))
     if (errhandler != null)
       s.onError(errhandler).onFinalize(Task.delay(finalizer))
     else
       s.onFinalize(Task.delay(finalizer))
  }
  val s10 = fda_staticSource(data,
     println("endofuse"), e => { println(e.getMessage);Stream.emit(-99) },
     println("finallyend"))
  s10.through(log("")).run.unsafeRun
   val s11 = fda_staticSource(acquirer = data)
   s11.through(log("")).run.unsafeRun
   val s12 = fda_staticSource(acquirer = data, errhandler = {e => println(e.getMessage);Stream()})
   s12.through(log("")).run.unsafeRun
 
 }

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏web前端教室

【JS高程】第3章 3.4.5(3)NaN(节选)

NaN,它的全称是 Not a Number,即非数值。用来表示“一个本来要返回数值的操作数,却未返回数值时的情况”。这样就不会报错了嘛。 在ECMAScrip...

3227
来自专栏Python

Django中Q查询及Q()对象

问题 一般我们在Django程序中查询数据库操作都是在QuerySet里进行进行,例如下面代码: >>> q1 = Entry.objects.filter(h...

2045
来自专栏一个会写诗的程序员的博客

Integer 与 Long 数字类型的比较:Java与Kotlin的细节不同

我们在数学中,123 == 123 , 直觉上是一目了然的。但是到了计算机编程语言中, 问题就显得有点“傻瓜”化了。

912
来自专栏前端架构与工程

【译】《Understanding ECMAScript6》- 第一章-基础知识(二)

块绑定 JavaScript中使用var进行变量声明的机制非常怪异。在大多数C系列的编程语言中,变量的创建是在被声明的时刻同时进行的。但是JavaScript并...

2525
来自专栏深度学习之tensorflow实战篇

hive中数据类型的转化CAST

在《Hive内置数据类型》文章中,我们提到了Hive内置数据类型由基本数据类型和复杂数据类型组成。今天的话题是Hive数据类型之间的转换。同Java语言一样,H...

6443
来自专栏撸码那些事

【封装那些事】 未利用封装

1424
来自专栏一个会写诗的程序员的博客

13.11 Scala混用Java的集合类调用scala的foreach遍历问题13.11 Scala混用Java的集合类调用scala的foreach遍历问题问题描述原因分析解决方案

由于都运行在JVM上,Java与Scala之间基本能做到无缝的集成,区别主要在于各自的API各有不同。由于Scala为集合提供了更多便捷的函数,因此,Java与...

564
来自专栏大闲人柴毛毛

三分钟理解“策略模式”——设计模式轻松掌握

实际问题: 由于超市隔三差五就要办促销活动,但每次促销活动的方式不一样,所以需要开发一个软件,营业员只要输入原价再选择活动类型后,就能计算出折扣以后的价钱。...

32914
来自专栏Play & Scala 技术分享

Java字符串处理技巧

2705
来自专栏码匠的流水账

聊聊flink StreamOperator的initializeState方法

本文主要研究一下flink StreamOperator的initializeState方法

703

扫码关注云+社区