FunDA(17)- 示范:异常处理与事后处理 - Exceptions handling and Finalizers

    作为一个能安全运行的工具库,为了保证占用资源的安全性,对异常处理(exception handling)和事后处理(final clean-up)的支持是不可或缺的。FunDA的数据流FDAPipeLine一般是通过读取数据库数据形成数据源开始的。为了保证每个数据源都能被安全的使用,FunDA提供了事后处理finalizing程序接口来实现数据流使用完毕后的清理及异常处理(error-handling)程序接口来捕获和处理使用过程中出现的异常情况。首先,事后处理程序(finalizer)保证了在任何情况下的FunDA数据流终止运算:包括元素耗尽,强制中断以及异常中断,finalizer都会被调用。在这篇讨论里我们将会测试和示范FunDA Exception-handling和Final-cleanup。下面的样板代码设定了一个静态集合数据源viewState和一个动态数据流streamState:

  val db = Database.forConfig("h2db")
  implicit def toState(row: StateTable#TableElementType) =
    StateModel(row.id,row.name)
  val viewLoader = FDAViewLoader(slick.jdbc.H2Profile)(toState _)
  val streamLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toState _)

  val stateSeq = viewLoader.fda_typedRows(StateQuery.result)(db).toSeq
  val viewState = fda_staticSource(stateSeq)(println("***Finally*** the end of viewState!!!"))
  val streamState = streamLoader.fda_typedStream(StateQuery.result)(db)(64,64)(println("***Finally*** the end of streamState!!!"))

在上面的代码例子里我们可以看到fda_staticSource和fad_typedStream都挂接了事后处理程序,我们简单的用println代表一段完整的程序来证实对事后处理程序的调用。所以说事后处理程序的挂接是在构建view或者stream时进行的。我们先看看它们在正常终止或者强行中断是是否发生调用:

  viewState.startRun
  viewState.take(2).startRun
  streamState.startRun
  streamState.take(3).startRun
  //  ***Finally*** the end of viewState!!!
  //  ***Finally*** the end of viewState!!!
  //  ***Finally*** the end of streamState!!!
  //  ***Finally*** the end of streamState!!!

那么如果在出现了异常中断是是否同样会被调用呢?我们先设计下面两个用户自定义函数:

  def trackRows: FDAUserTask[FDAROW] = row => {
    row match {
      case m@StateModel(id,name) =>
        println(s"State: $id $name")
        println( "----------------")
        fda_next(m)
      case m@_ => fda_next(m)
    }
  }

  def errorRow: FDAUserTask[FDAROW] = row => {
    row match {
      case StateModel(id,name) =>
        val idx = id / (id - 3)
        fda_next(StateModel(idx,name))
      case m@_ => fda_next(m)
    }
  }

trackRows跟踪显示当前数据行,errorRow人为的会在第三行出现异常。我们用streamState来测试一下:

  streamState.appendTask(errorRow).appendTask(trackRows).startRun
//  State: 0 Alabama
//  ----------------
//  State: -2 Alaska
//  ----------------
//  Exception in thread "main" java.lang.ArithmeticException: / by zero
//  at examples.ExceptionsAndFinalizers$$anonfun$errorRow$1.apply(ExceptionsAndFinalizers.scala:46)
//  ...
//  at java.lang.Thread.run(Thread.java:745)
//  ***Finally*** the end of streamState!!!

的确在正常显示了两行数据后,第三行出错中断,直接调用了finalizer。这就保证了无论发生任何情况,当完成使用数据源后都给予编程人员一个空间去进行事后处理如释放资源、中断连接、关闭文件等。

我们可以用onError来挂接异常处理程序,如下:

   val s = streamState.appendTask(errorRow).appendTask(trackRows)
   val s1 = s.onError {case e: Exception => println(s"Caught Error in streamState!!![${e.getMessage}]"); fda_appendRow(FDANullRow)}

注意:onError必须挂接在stream的最尾端以确保所有环节的异常情况都可以正确地得到处理。看看运行结果:

State: 0 Alabama
----------------
State: -2 Alaska
----------------
***Finally*** the end of streamState!!!
Caught Error in streamState!!![/ by zero]

以上例子捕获了异常情况,同时在异常中断情况后还是调用了finalizer。

有时我们需要自定义一些特殊情况,我们希望能捕获这些情况的发生。但我们同时希望这些情况发生时不会中断运算。首先我们可以先自定义一个异常行类型:

  case class DivideZeroError(msg: String, e: Exception) extends FDAROW

注意:切不可忘记extends FDAROW。我们把上面的errorRow函数修改成一个自捕获异常的函数:

 def catchError: FDAUserTask[FDAROW] = row => {
    row match {
      case StateModel(id,name) =>
        try {
          val idx = id / (id - 3)
          fda_next(StateModel(idx, name))
        } catch {
          case e: Exception => //pass an error row
            fda_next(DivideZeroError(s"Divide by zero excption at ${id}",e))
        }
      case m@_ => fda_next(m)
    }
  }

必须修改trackRows能分辨DivideZeroError行:

  def trackRows: FDAUserTask[FDAROW] = row => {
    row match {
      case m@StateModel(id,name) =>
        println(s"State: $id $name")
        println( "----------------")
        fda_next(m)
      case DivideZeroError(msg, e) => //error row
        println(s"***Error:$msg***")
        fda_skip
      case m@_ => fda_next(m)
    }
  }

运算下面的程序:

  val s = streamState.take(5).appendTask(catchError).appendTask(trackRows)
  val s1 = s.onError {case e: Exception => println(s"Caught Error in streamState!!![${e.getMessage}]"); fda_appendRow(FDANullRow)}
  s1.startRun

产生下面的结果:

State: 0 Alabama
----------------
State: -2 Alaska
----------------
***Error:Divide by zero excption at 3***
State: 4 Arkansas
----------------
State: 2 California
----------------
***Finally*** the end of streamState!!!

Process finished with exit code 0

没有出现异常中断,捕获并处理了自定义异常,并且调用了事后处理程序finalizer。

下面就是这次示范的源代码:

import slick.jdbc.H2Profile.api._
import com.bayakala.funda.samples.SlickModels._
import com.bayakala.funda._
import api._
import scala.language.implicitConversions

object ExceptionsAndFinalizers extends App {

  val db = Database.forConfig("h2db")
  implicit def toState(row: StateTable#TableElementType) =
    StateModel(row.id,row.name)
  val viewLoader = FDAViewLoader(slick.jdbc.H2Profile)(toState _)
  val streamLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toState _)

  val stateSeq = viewLoader.fda_typedRows(StateQuery.result)(db).toSeq
  val viewState = fda_staticSource(stateSeq)(println("***Finally*** the end of viewState!!!"))
  val streamState = streamLoader.fda_typedStream(StateQuery.result)(db)(64,64)(println("***Finally*** the end of streamState!!!"))

/*
  viewState.startRun
  viewState.take(2).startRun
  streamState.startRun
  streamState.take(3).startRun
  //  ***Finally*** the end of viewState!!!
  //  ***Finally*** the end of viewState!!!
  //  ***Finally*** the end of streamState!!!
  //  ***Finally*** the end of streamState!!!
*/



  def trackRows: FDAUserTask[FDAROW] = row => {
    row match {
      case m@StateModel(id,name) =>
        println(s"State: $id $name")
        println( "----------------")
        fda_next(m)
      case DivideZeroError(msg, e) => //error row
        println(s"***Error:$msg***")
        fda_skip
      case m@_ => fda_next(m)
    }
  }

  def errorRow: FDAUserTask[FDAROW] = row => {
    row match {
      case StateModel(id,name) =>
        val idx = id / (id - 3)
        fda_next(StateModel(idx,name))
      case m@_ => fda_next(m)
    }
  }

  case class DivideZeroError(msg: String, e: Exception) extends FDAROW
  def catchError: FDAUserTask[FDAROW] = row => {
    row match {
      case StateModel(id,name) =>
        try {
          val idx = id / (id - 3)
          fda_next(StateModel(idx, name))
        } catch {
          case e: Exception => //pass an error row
            fda_next(DivideZeroError(s"Divide by zero excption at ${id}",e))
        }
      case m@_ => fda_next(m)
    }
  }



  /*
  streamState.appendTask(errorRow).appendTask(trackRows).startRun
//  State: 0 Alabama
//  ----------------
//  State: -2 Alaska
//  ----------------
//  Exception in thread "main" java.lang.ArithmeticException: / by zero
//  at examples.ExceptionsAndFinalizers$$anonfun$errorRow$1.apply(ExceptionsAndFinalizers.scala:46)
//  ...
//  at java.lang.Thread.run(Thread.java:745)
//  ***Finally*** the end of streamState!!!
*/
  /*
   val v = viewState.appendTask(errorRow).appendTask(trackRows)
   val v1 = v.onError {case e: Exception => println(s"Caught Error in viewState!!![${e.getMessage}]"); fda_appendRow(FDANullRow)}
   v1.startRun

   val s = streamState.appendTask(errorRow).appendTask(trackRows)
   val s1 = s.onError {case e: Exception => println(s"Caught Error in streamState!!![${e.getMessage}]"); fda_appendRow(FDANullRow)}
   s1.startRun
  */

  val s = streamState.take(5).appendTask(catchError).appendTask(trackRows)
  val s1 = s.onError {case e: Exception => println(s"Caught Error in streamState!!![${e.getMessage}]"); fda_appendRow(FDANullRow)}
  s1.startRun


}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Kirito的技术分享

Spring Data Redis(二)--序列化

默认序列化方案 在上一篇文章《Spring Data Redis(一)》中,我们执行了这样一个操作: redisTemplate.opsForValue().s...

59711
来自专栏.NET开发那点事

IoC原理-使用反射/Emit来实现一个最简单的IoC容器

从Unity到Spring.Net,到Ninject,几年来陆陆续续用过几个IoC框架。虽然会用,但也没有一直仔细的研究过IoC实现的过程。最近花了点时间,下了...

19410
来自专栏码匠的流水账

聊聊HystrixCircuitBreaker

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/HystrixCircuitBreaker.java

531
来自专栏搜云库

Spring Boot 中使用 Java API 调用 lucene

Lucene是apache软件基金会4 jakarta项目组的一个子项目,是一个开放源代码的全文检索引擎工具包,但它不是一个完整的全文检索引擎,而是一个全文检索...

3635
来自专栏大内老A

ASP.NET MVC基于标注特性的Model验证:一个Model,多种验证规则

对于Model验证,理想的设计应该是场景驱动的,而不是Model(类型)驱动的,也就是对于同一个Model对象,在不同的使用场景中可能具有不同的验证规则。举个简...

16310
来自专栏猿天地

Netty粘包拆包解决方案

前言 本篇文章是Netty专题的第六篇,前面五篇文章如下: 高性能NIO框架Netty入门篇 高性能NIO框架Netty-对象传输 高性能NIO框架Netty...

3837
来自专栏函数式编程语言及工具

FunDA(16)- 示范:整合并行运算 - total parallelism solution

   在对上两篇讨论中我们介绍了并行运算的两种体现方式:并行构建数据源及并行运算用户自定义函数。我们分别对这两部分进行了示范。本篇我准备示范把这两种情况集成一体...

18110
来自专栏函数式编程语言及工具

FunDA(14)- 示范:并行运算,并行数据库读取 - parallel data loading

   FunDA的并行数据库读取功能是指在多个线程中同时对多个独立的数据源进行读取。这些独立的数据源可以是在不同服务器上的数据库表,又或者把一个数据库表分成几个...

1889
来自专栏Kirito的技术分享

深入理解RPC之序列化篇--总结

上一篇《深入理解RPC之序列化篇--Kryo》,介绍了序列化的基础概念,并且详细介绍了Kryo的一系列特性,在这一篇中,简略的介绍其他常用的序列化器,并对它们进...

8687
来自专栏偏前端工程师的驿站

GridView实战二:使用ObjectDataSource数据源控件

前言:   ObjectDataSource数据源控件优点甚多,确实令人爱不惜手,但不支持重绑定这一项确实让人失望。下面的实战二将通过ObjectDataSou...

22710

扫码关注云+社区