Scalaz(51) - scalaz-stream: Resource Safety

    scalaz-stream is a stream-processing library. Its use of shared resources, such as open files, network connections and database connections, must be safe: when a stream terminates, a finalizer must run to release those files, network links and database connections. Termination here covers normal completion (End), forced termination (Kill) and abnormal interruption (Exception). Moreover, scalaz-stream guarantees that the upstream Source's finalizer runs whether termination occurs in the data-producing upstream Source or in the data-consuming downstream Process. Its finalizer machinery is designed around the following two requirements:

1. A data-producing stage may acquire resources, so every terminal state of that stage must guarantee that its finalizer runs.

2. When a data-consuming downstream stage terminates, it must run the finalizers defined by the upstream Process.

Let's demonstrate these scenarios with a few examples:

// the data source
val src = Process.emitAll(Seq("a","b","c")).toSource
                //> src  : scalaz.stream.Process[scalaz.concurrent.Task,String] = Emit(List(a, b, c))
// attach the finalizer
val p1 = src.onComplete{Process.suspend{println("---RUN CLEANUP---");Process.halt}}
                //> p1  : scalaz.stream.Process[[x]scalaz.concurrent.Task[x],String] = Append(Emit(List(a, b, c)),Vector(<function1>))
// normal termination
p1.runLog.run   //> ---RUN CLEANUP---
                //| res0: Vector[String] = Vector(a, b, c)
// forced early termination
p1.take(2).runLog.run
                //> ---RUN CLEANUP---
                //| res1: Vector[String] = Vector(a, b)
// abnormal termination (exception)
p1.map{_.toDouble}.runLog.run
                //> ---RUN CLEANUP---
                //| java.lang.NumberFormatException: For input string: "a"

In scalaz-stream we attach a finalizer to a Source with onComplete. The example above shows that the Source runs its own finalizer in all three cases: normal termination, early termination and abnormal termination. Process.onComplete is defined like this:

 /**
   * Run `p2` after this `Process` completes normally, or in the event of an error.
   * This behaves almost identically to `append`, except that `p1 append p2` will
   * not run `p2` if `p1` halts with an `Error` or is killed. Any errors raised by
   * `this` are reraised after `p2` completes.
   *
   * Note that `p2` is made into a finalizer using `asFinalizer`, so we
   * can be assured it is run even when this `Process` is being killed
   * by a downstream consumer.
   */
  final def onComplete[F2[x] >: F[x], O2 >: O](p2: => Process[F2, O2]): Process[F2, O2] =
    this.onHalt { cause => p2.asFinalizer.causedBy(cause) }

/**
   * When this `Process` halts, call `f` to produce the next state.
   * Note that this function may be used to swallow or handle errors.
   */
  final def onHalt[F2[x] >: F[x], O2 >: O](f: Cause => Process[F2, O2]): Process[F2, O2] = {
     val next = (t: Cause) => Trampoline.delay(Try(f(t)))
     this match {
       case (append: Append[F2, O2] @unchecked) => Append(append.head, append.stack :+ next)
       case emt@Emit(_)        => Append(emt, Vector(next))
       case awt@Await(_, _, _) => Append(awt, Vector(next))
       case hlt@Halt(rsn)      => Append(hlt, Vector(next))
     }
  }

/**
   * Mostly internal use function. Ensures this `Process` is run even
   * when being `kill`-ed. Used to ensure resource safety in various
   * combinators.
   */
  final def asFinalizer: Process[F, O] = {
    def mkAwait[F[_], A, O](req: F[A], cln: A => Trampoline[Process[F,Nothing]])(rcv: EarlyCause \/ A => Trampoline[Process[F, O]]) = Await(req, rcv,cln)
    step match {
      case Step(e@Emit(_), cont) => e onHalt {
        case Kill => (halt +: cont).asFinalizer.causedBy(Kill)
        case cause => (Halt(cause) +: cont).asFinalizer
      }
      case Step(Await(req, rcv, cln), cont) => mkAwait(req, cln) {
        case -\/(Kill) => Trampoline.delay(Await(req, rcv, cln).asFinalizer.causedBy(Kill))
        case x => rcv(x).map(p => (p +: cont).asFinalizer)
      }
      case hlt@Halt(_) => hlt
    }
  }

As we can see, onComplete runs a finalizer (p2.asFinalizer) when the current Process reaches a terminal state, normally or otherwise. onHalt appends the finalizer after the current state, so the finalizer runs once that state becomes Halt. asFinalizer guarantees that the finalizer runs even under a forced Kill. But if a downstream Process terminates early, does the finalizer still run?

// downstream terminates normally
(p1 |> process1.filter(_ == true) |> process1.take(10)).runLog.run
                //> ---RUN CLEANUP---
                //| res3: Vector[String] = Vector()
// downstream terminates early
(p1 |> process1.take(2)).runLog.run
                //> ---RUN CLEANUP---
                //| res4: Vector[String] = Vector(a, b)
// a downstream stage one layer further terminates early
(p1 |> process1.id.map{_.toUpperCase} |> process1.take(2)).runLog.run
                //> ---RUN CLEANUP---
                //| res5: Vector[String] = Vector(A, B)
// downstream terminates abnormally
(p1 |> process1.id.map{_.toDouble}).runLog.run
                //> ---RUN CLEANUP---
                //| java.lang.NumberFormatException: For input string: "a"
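The guarantee these runs demonstrate can be caricatured in plain Scala without scalaz-stream: a driver that consumes a producer under try/finally runs the cleanup whether the consumer stops early or throws. Everything below (the object name, the mutable counter) is a hypothetical sketch, not scalaz-stream code:

```scala
object CleanupSketch {
  var cleanups = 0 // counts finalizer runs so the guarantee is observable

  // a "source" paired with a cleanup action, consumed by a "downstream"
  // that may stop early or throw; the cleanup runs on every exit path
  def withSource[A](consume: Iterator[String] => A): A = {
    val it = Iterator("a", "b", "c")
    try consume(it)
    finally cleanups += 1 // analogous to the upstream finalizer
  }

  def main(args: Array[String]): Unit = {
    println(withSource(_.take(2).toVector))    // downstream stops early: Vector(a, b)
    try withSource(_.map(_.toDouble).toVector) // downstream throws
    catch { case _: NumberFormatException => () }
    println(cleanups)                          // 2: the cleanup ran both times
  }
}
```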

The evidence shows that the upstream finalizer runs no matter how the downstream terminates. So how does scalaz-stream manage to run an upstream finalizer from downstream? The answer lies with pipe, the function behind the |> operator:

/**
   * Feed the output of this `Process` as input of `p1`. The implementation
   * will fuse the two processes, so this process will only generate
   * values as they are demanded by `p1`. If `p1` signals termination, `this`
   * is killed with same reason giving it an opportunity to cleanup.
   */
  final def pipe[O2](p1: Process1[O, O2]): Process[F, O2] =
    p1.suspendStep.flatMap({ s1 =>
      s1 match {
        case s@Step(awt1@Await1(rcv1), cont1) =>
          val nextP1 = s.toProcess
          this.step match {
            case Step(awt@Await(_, _, _), cont) => awt.extend(p => (p +: cont) pipe nextP1)
            case Step(Emit(os), cont)           => cont.continue pipe process1.feed(os)(nextP1)
            case hlt@Halt(End)                  => hlt pipe nextP1.disconnect(Kill).swallowKill
            case hlt@Halt(rsn: EarlyCause)      => hlt pipe nextP1.disconnect(rsn)
          }

        case Step(emt@Emit(os), cont)      =>
          // When the pipe is killed from the outside it is killed at the beginning or after emit.
          // This ensures that Kill from the outside is not swallowed.
          emt onHalt {
            case End => this.pipe(cont.continue)
            case early => this.pipe(Halt(early) +: cont).causedBy(early)
          }

        case Halt(rsn)           => this.kill onHalt { _ => Halt(rsn) }
      }
    })

  /** Operator alias for `pipe`. */
  final def |>[O2](p2: Process1[O, O2]): Process[F, O2] = pipe(p2)

pipe's parameter p1 is the downstream Process. When the downstream p1 is in state Halt(rsn), meaning p1 has terminated (early or normally), this.kill forcibly terminates the upstream and runs the upstream's onHalt function. From the analysis above we know that a Source's finalizer runs inside its onHalt, which explains precisely why the finalizer is guaranteed to run in every case. scalaz-stream provides a linesR function in the io object; we can use it to read a file from the file system:

val fileLines = io.linesR(s"/Users/TraverseUsage.scala")
                //> fileLines  : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@6279cee3,<function1>,<function1>)
val lns = fileLines.onComplete(Process.eval[Task,String]{Task.delay{println("--FILE CLOSED--");""}})
                //> lns  : scalaz.stream.Process[[x]scalaz.concurrent.Task[x],String] = Append(Await(scalaz.concurrent.Task@6279cee3,<function1>,<function1>),Vector(<function1>))

lns.take(3).runLog.run
                //> --FILE CLOSED--
                //| res6: Vector[String] = Vector(package scalaz.example, "", object TraverseUsage extends App {)
lns.map {_.toDouble}.runLog.run
                //> --FILE CLOSED--
                //| java.lang.NumberFormatException: empty String caused by: java.lang.NumberFormatException: For input string: "package scalaz.example"

We can see that the file is used safely: whatever way the stream terminates, the open file is closed automatically. In fact linesR specifies how to release the file at the moment it opens it. Here is the source:

/**
   * Creates a `Process[Task,String]` from the lines of a file, using
   * the `iteratorR` combinator to ensure the file is closed
   * when processing the stream of lines is finished.
   */
  def linesR(filename: String)(implicit codec: Codec): Process[Task,String] =
    linesR(Source.fromFile(filename)(codec))

  /**
   * Creates a `Process[Task,String]` from the lines of the `InputStream`,
   * using the `iteratorR` combinator to ensure the `InputStream` is closed
   * when processing the stream of lines is finished.
   */
  def linesR(in: => InputStream)(implicit codec: Codec): Process[Task,String] =
    linesR(Source.fromInputStream(in)(codec))

  /**
   * Creates a `Process[Task,String]` from the lines of the `Source`,
   * using the `iteratorR` combinator to ensure the `Source` is closed
   * when processing the stream of lines is finished.
   */
  def linesR(src: => Source): Process[Task,String] = {
    iteratorR(Task.delay(src))(src => Task.delay(src.close()))(r => Task.delay(r.getLines()))
  }

This iteratorR already supplies the finalizer, src => Task.delay(src.close()):

/**
   * Create a Process from an iterator that is tied to some resource,
   * `R` (like a file handle) that we want to ensure is released.
   * See `linesR` for an example use.
   * @param req acquires the resource
   * @param release releases the resource
   * @param mkIterator creates the iterator from the resource
   * @tparam R is the resource
   * @tparam O is a value in the iterator
   * @return
   */
  def iteratorR[R, O](req: Task[R])(
                     release: R => Task[Unit])(
                     mkIterator: R => Task[Iterator[O]]): Process[Task, O] = {
    bracket[Task, R, O](req)(r => Process.eval_(release(r)))(r => iterator(mkIterator(r)) )
  }

iteratorR takes three parameters, req, release and mkIterator, which respectively acquire the file, release it, and read its data. We can also use iteratorR directly to reproduce the file-reading example above:

val iterLines =
    io.iteratorR(Task.delay{Source.fromFile(s"/Users/TraverseUsage.scala")})(
                src => Task.delay{src.close()})(
                r => Task.delay{r.getLines()})
                //> iterLines  : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@1a0dcaa,<function1>,<function1>)
iterLines.take(5).runLog.run
                //> res7: Vector[String] = Vector(package scalaz.example, "", object TraverseUsage extends App {, "  import scalaz._", "")

This means we could later use iteratorR to work with a database as well, since it lets us specify exactly how to connect to, read from and close the database connection.
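As a sketch of that idea (the JDBC URL, table and column names are hypothetical; only the iteratorR signature quoted above and the standard java.sql API are assumed, and a JDBC driver would need to be on the classpath), streaming rows out of a database could look roughly like this. However the consumer terminates, conn.close() runs, just as src.close() does in linesR:

```scala
import java.sql.DriverManager
import scalaz.concurrent.Task
import scalaz.stream._

// hypothetical sketch: stream the NAME column of a PERSONS table;
// iteratorR guarantees the connection is released on any termination
val dbNames: Process[Task, String] =
  io.iteratorR(Task.delay {
      DriverManager.getConnection("jdbc:h2:mem:demo")        // acquire
    })(conn => Task.delay { conn.close() }                   // release
    )(conn => Task.delay {                                   // mkIterator
      val rs = conn.createStatement().executeQuery("SELECT NAME FROM PERSONS")
      new Iterator[String] {
        private var ready = rs.next()    // ResultSet cursor starts before row 1
        def hasNext = ready
        def next() = { val v = rs.getString(1); ready = rs.next(); v }
      }
    })

// dbNames.take(10).runLog.run  // close() runs even if we stop after 10 rows
```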

The function that actually runs the finalizer is bracket:

/**
   * Resource and preemption safe `await` constructor.
   *
   * Use this combinator, when acquiring resources. This build a process that when run
   * evaluates `req`, and then runs `rcv`. Once `rcv` is completed, fails, or is interrupted, it will run `release`
   *
   * When the acquisition (`req`) is interrupted, neither `release` or `rcv` is run, however when the req was interrupted after
   * resource in `req` was acquired then, the `release` is run.
   *
   * If,the acquisition fails, use `bracket(req)(onPreempt)(rcv).onFailure(err => ???)` code to recover from the
   * failure eventually.
   *
   */
  def bracket[F[_], A, O](req: F[A])(release: A => Process[F, Nothing])(rcv: A => Process[F, O]): Process[F, O] = {
    Await(req,
    { (r: EarlyCause \/ A) => Trampoline.delay(Try(r.fold(Halt.apply, a => rcv(a) onComplete release(a) ))) },
    { a: A => Trampoline.delay(release(a)) })
  }

bracket is a resource- and preemption-safe await constructor. Once req has successfully acquired the resource a, release(a) is guaranteed to run, whether rcv(a) completes normally, fails or is interrupted; the third argument of the Await covers the case where the process is preempted right after acquisition. If req itself fails, neither rcv nor release runs.
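As a final illustration, bracket's contract can be approximated in plain Scala (a hedged sketch of the semantics only, not scalaz-stream's implementation): release runs exactly when acquisition succeeded, whatever happens in the body, while an acquisition failure skips release entirely:

```scala
object BracketSketch {
  // plain-Scala approximation of bracket's contract
  def bracket[R, O](acquire: () => R)(release: R => Unit)(use: R => O): O = {
    val r = acquire()   // if acquisition throws, release never runs
    try use(r)
    finally release(r)  // runs whether use succeeds or fails
  }

  def main(args: Array[String]): Unit = {
    val log = scala.collection.mutable.Buffer[String]()
    // successful use: release still runs afterwards
    val n = bracket(() => { log += "open"; Seq("1", "2") })(_ => log += "close")(_.map(_.toInt).sum)
    // failing use: the exception propagates, but only after release
    try bracket(() => { log += "open"; Seq("x") })(_ => log += "close")(_.map(_.toInt).sum)
    catch { case _: NumberFormatException => log += "failed" }
    println(n)                  // 3
    println(log.mkString(", ")) // open, close, open, close, failed
  }
}
```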

This article is published as part of the Tencent Cloud self-media sharing program.
