FunDA(17)- 示范:异常处理与事后处理 - Exceptions handling and Finalizers

    作为一个能安全运行的工具库,为了保证占用资源的安全性,对异常处理(exception handling)和事后处理(final clean-up)的支持是不可或缺的。FunDA的数据流FDAPipeLine一般是通过读取数据库数据形成数据源开始的。为了保证每个数据源都能被安全的使用,FunDA提供了事后处理finalizing程序接口来实现数据流使用完毕后的清理及异常处理(error-handling)程序接口来捕获和处理使用过程中出现的异常情况。首先,事后处理程序(finalizer)保证了在任何情况下的FunDA数据流终止运算:包括元素耗尽,强制中断以及异常中断,finalizer都会被调用。在这篇讨论里我们将会测试和示范FunDA Exception-handling和Final-cleanup。下面的样板代码设定了一个静态集合数据源viewState和一个动态数据流streamState:

  val db = Database.forConfig("h2db")
  implicit def toState(row: StateTable#TableElementType) =
    StateModel(row.id,row.name)
  val viewLoader = FDAViewLoader(slick.jdbc.H2Profile)(toState _)
  val streamLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toState _)

  val stateSeq = viewLoader.fda_typedRows(StateQuery.result)(db).toSeq
  val viewState = fda_staticSource(stateSeq)(println("***Finally*** the end of viewState!!!"))
  val streamState = streamLoader.fda_typedStream(StateQuery.result)(db)(64,64)(println("***Finally*** the end of streamState!!!"))

在上面的代码例子里我们可以看到fda_staticSource和fad_typedStream都挂接了事后处理程序,我们简单的用println代表一段完整的程序来证实对事后处理程序的调用。所以说事后处理程序的挂接是在构建view或者stream时进行的。我们先看看它们在正常终止或者强行中断是是否发生调用:

  viewState.startRun
  viewState.take(2).startRun
  streamState.startRun
  streamState.take(3).startRun
  //  ***Finally*** the end of viewState!!!
  //  ***Finally*** the end of viewState!!!
  //  ***Finally*** the end of streamState!!!
  //  ***Finally*** the end of streamState!!!

那么如果在出现了异常中断是是否同样会被调用呢?我们先设计下面两个用户自定义函数:

  def trackRows: FDAUserTask[FDAROW] = row => {
    row match {
      case m@StateModel(id,name) =>
        println(s"State: $id $name")
        println( "----------------")
        fda_next(m)
      case m@_ => fda_next(m)
    }
  }

  def errorRow: FDAUserTask[FDAROW] = row => {
    row match {
      case StateModel(id,name) =>
        val idx = id / (id - 3)
        fda_next(StateModel(idx,name))
      case m@_ => fda_next(m)
    }
  }

trackRows跟踪显示当前数据行,errorRow人为的会在第三行出现异常。我们用streamState来测试一下:

  streamState.appendTask(errorRow).appendTask(trackRows).startRun
//  State: 0 Alabama
//  ----------------
//  State: -2 Alaska
//  ----------------
//  Exception in thread "main" java.lang.ArithmeticException: / by zero
//  at examples.ExceptionsAndFinalizers$$anonfun$errorRow$1.apply(ExceptionsAndFinalizers.scala:46)
//  ...
//  at java.lang.Thread.run(Thread.java:745)
//  ***Finally*** the end of streamState!!!

的确在正常显示了两行数据后,第三行出错中断,直接调用了finalizer。这就保证了无论发生任何情况,当完成使用数据源后都给予编程人员一个空间去进行事后处理如释放资源、中断连接、关闭文件等。

我们可以用onError来挂接异常处理程序,如下:

   val s = streamState.appendTask(errorRow).appendTask(trackRows)
   val s1 = s.onError {case e: Exception => println(s"Caught Error in streamState!!![${e.getMessage}]"); fda_appendRow(FDANullRow)}

注意:onError必须挂接在stream的最尾端以确保所有环节的异常情况都可以正确地得到处理。看看运行结果:

State: 0 Alabama
----------------
State: -2 Alaska
----------------
***Finally*** the end of streamState!!!
Caught Error in streamState!!![/ by zero]

以上例子捕获了异常情况,同时在异常中断情况后还是调用了finalizer。

有时我们需要自定义一些特殊情况,我们希望能捕获这些情况的发生。但我们同时希望这些情况发生时不会中断运算。首先我们可以先自定义一个异常行类型:

  case class DivideZeroError(msg: String, e: Exception) extends FDAROW

注意:切不可忘记extends FDAROW。我们把上面的errorRow函数修改成一个自捕获异常的函数:

 def catchError: FDAUserTask[FDAROW] = row => {
    row match {
      case StateModel(id,name) =>
        try {
          val idx = id / (id - 3)
          fda_next(StateModel(idx, name))
        } catch {
          case e: Exception => //pass an error row
            fda_next(DivideZeroError(s"Divide by zero excption at ${id}",e))
        }
      case m@_ => fda_next(m)
    }
  }

必须修改trackRows能分辨DivideZeroError行:

  def trackRows: FDAUserTask[FDAROW] = row => {
    row match {
      case m@StateModel(id,name) =>
        println(s"State: $id $name")
        println( "----------------")
        fda_next(m)
      case DivideZeroError(msg, e) => //error row
        println(s"***Error:$msg***")
        fda_skip
      case m@_ => fda_next(m)
    }
  }

运算下面的程序:

  val s = streamState.take(5).appendTask(catchError).appendTask(trackRows)
  val s1 = s.onError {case e: Exception => println(s"Caught Error in streamState!!![${e.getMessage}]"); fda_appendRow(FDANullRow)}
  s1.startRun

产生下面的结果:

State: 0 Alabama
----------------
State: -2 Alaska
----------------
***Error:Divide by zero excption at 3***
State: 4 Arkansas
----------------
State: 2 California
----------------
***Finally*** the end of streamState!!!

Process finished with exit code 0

没有出现异常中断,捕获并处理了自定义异常,并且调用了事后处理程序finalizer。

下面就是这次示范的源代码:

import slick.jdbc.H2Profile.api._
import com.bayakala.funda.samples.SlickModels._
import com.bayakala.funda._
import api._
import scala.language.implicitConversions

object ExceptionsAndFinalizers extends App {

  val db = Database.forConfig("h2db")
  implicit def toState(row: StateTable#TableElementType) =
    StateModel(row.id,row.name)
  val viewLoader = FDAViewLoader(slick.jdbc.H2Profile)(toState _)
  val streamLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toState _)

  val stateSeq = viewLoader.fda_typedRows(StateQuery.result)(db).toSeq
  val viewState = fda_staticSource(stateSeq)(println("***Finally*** the end of viewState!!!"))
  val streamState = streamLoader.fda_typedStream(StateQuery.result)(db)(64,64)(println("***Finally*** the end of streamState!!!"))

/*
  viewState.startRun
  viewState.take(2).startRun
  streamState.startRun
  streamState.take(3).startRun
  //  ***Finally*** the end of viewState!!!
  //  ***Finally*** the end of viewState!!!
  //  ***Finally*** the end of streamState!!!
  //  ***Finally*** the end of streamState!!!
*/



  def trackRows: FDAUserTask[FDAROW] = row => {
    row match {
      case m@StateModel(id,name) =>
        println(s"State: $id $name")
        println( "----------------")
        fda_next(m)
      case DivideZeroError(msg, e) => //error row
        println(s"***Error:$msg***")
        fda_skip
      case m@_ => fda_next(m)
    }
  }

  def errorRow: FDAUserTask[FDAROW] = row => {
    row match {
      case StateModel(id,name) =>
        val idx = id / (id - 3)
        fda_next(StateModel(idx,name))
      case m@_ => fda_next(m)
    }
  }

  case class DivideZeroError(msg: String, e: Exception) extends FDAROW
  def catchError: FDAUserTask[FDAROW] = row => {
    row match {
      case StateModel(id,name) =>
        try {
          val idx = id / (id - 3)
          fda_next(StateModel(idx, name))
        } catch {
          case e: Exception => //pass an error row
            fda_next(DivideZeroError(s"Divide by zero excption at ${id}",e))
        }
      case m@_ => fda_next(m)
    }
  }



  /*
  streamState.appendTask(errorRow).appendTask(trackRows).startRun
//  State: 0 Alabama
//  ----------------
//  State: -2 Alaska
//  ----------------
//  Exception in thread "main" java.lang.ArithmeticException: / by zero
//  at examples.ExceptionsAndFinalizers$$anonfun$errorRow$1.apply(ExceptionsAndFinalizers.scala:46)
//  ...
//  at java.lang.Thread.run(Thread.java:745)
//  ***Finally*** the end of streamState!!!
*/
  /*
   val v = viewState.appendTask(errorRow).appendTask(trackRows)
   val v1 = v.onError {case e: Exception => println(s"Caught Error in viewState!!![${e.getMessage}]"); fda_appendRow(FDANullRow)}
   v1.startRun

   val s = streamState.appendTask(errorRow).appendTask(trackRows)
   val s1 = s.onError {case e: Exception => println(s"Caught Error in streamState!!![${e.getMessage}]"); fda_appendRow(FDANullRow)}
   s1.startRun
  */

  val s = streamState.take(5).appendTask(catchError).appendTask(trackRows)
  val s1 = s.onError {case e: Exception => println(s"Caught Error in streamState!!![${e.getMessage}]"); fda_appendRow(FDANullRow)}
  s1.startRun


}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏崔庆才的专栏

Scrapy框架的使用之Spider Middleware的用法

2134
来自专栏企鹅号快讯

聊一聊 Spring 中的线程安全性

Spring与线程安全 Spring作为一个IOC/DI容器,帮助我们管理了许许多多的“bean”。但其实,Spring并没有保证这些对象的线程安全,需要由开发...

1906
来自专栏张高兴的博客

张高兴的 Windows 10 IoT 开发笔记:使用 MAX7219 驱动数码管

3435
来自专栏开发技术

结合ThreadLocal来看spring事务源码,感受下清泉般的洗涤!

  在我的博客spring事务源码解析中,提到了一个很关键的点:将connection绑定到当前线程来保证这个线程中的数据库操作用的是同一个connection...

981
来自专栏Spark生态圈

[spark] TaskScheduler 任务提交与调度源码解析

在DAGScheduler划分为Stage并以TaskSet的形式提交给TaskScheduler后,再由TaskScheduler通过TaskSetMagag...

1563
来自专栏码匠的流水账

聊聊Spring Data Auditable接口的变化

spring-data-commons-1.12.8.RELEASE-sources.jar!/org/springframework/data/domain/...

642
来自专栏服务端技术杂谈

InheritableThreadLocal源码阅读

在进行多线程编程时,我们经常需要线程池子线程和父线程进行ThreadLocal信息传递,实现一些业务处理。 先看一个例子 public class App { ...

2724
来自专栏XAI

微信公众号发送模板消息 Java实现。

本博文是测试公众号调用模板接口测试。请不要完全复制我的代码。里面的测试代码中有本人测试号的微信模板id。麻烦替换成自己的可以吗? 第一步:创建模板信息 ? ? ...

51511
来自专栏码匠的流水账

聊聊spring cloud gateway的PrefixPath及StripPrefix功能

本文主要研究一下spring cloud gateway的PrefixPath及StripPrefix功能

1032
来自专栏Spark生态圈

[spark] 数据本地化及延迟调度

Spark数据本地化即移动计算而不是移动数据,而现实又是残酷的,不是想要在数据块的地方计算就有足够的资源提供,为了让task能尽可能的以最优本地化级别(Loca...

1332

扫码关注云+社区