FunDA(17)- 示范:异常处理与事后处理 - Exceptions handling and Finalizers

    作为一个能安全运行的工具库,为了保证占用资源的安全性,对异常处理(exception handling)和事后处理(final clean-up)的支持是不可或缺的。FunDA的数据流FDAPipeLine一般是通过读取数据库数据形成数据源开始的。为了保证每个数据源都能被安全的使用,FunDA提供了事后处理finalizing程序接口来实现数据流使用完毕后的清理及异常处理(error-handling)程序接口来捕获和处理使用过程中出现的异常情况。首先,事后处理程序(finalizer)保证了在任何情况下的FunDA数据流终止运算:包括元素耗尽,强制中断以及异常中断,finalizer都会被调用。在这篇讨论里我们将会测试和示范FunDA Exception-handling和Final-cleanup。下面的样板代码设定了一个静态集合数据源viewState和一个动态数据流streamState:

  val db = Database.forConfig("h2db")
  implicit def toState(row: StateTable#TableElementType) =
    StateModel(row.id,row.name)
  val viewLoader = FDAViewLoader(slick.jdbc.H2Profile)(toState _)
  val streamLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toState _)

  val stateSeq = viewLoader.fda_typedRows(StateQuery.result)(db).toSeq
  val viewState = fda_staticSource(stateSeq)(println("***Finally*** the end of viewState!!!"))
  val streamState = streamLoader.fda_typedStream(StateQuery.result)(db)(64,64)(println("***Finally*** the end of streamState!!!"))

在上面的代码例子里我们可以看到fda_staticSource和fad_typedStream都挂接了事后处理程序,我们简单的用println代表一段完整的程序来证实对事后处理程序的调用。所以说事后处理程序的挂接是在构建view或者stream时进行的。我们先看看它们在正常终止或者强行中断是是否发生调用:

  viewState.startRun
  viewState.take(2).startRun
  streamState.startRun
  streamState.take(3).startRun
  //  ***Finally*** the end of viewState!!!
  //  ***Finally*** the end of viewState!!!
  //  ***Finally*** the end of streamState!!!
  //  ***Finally*** the end of streamState!!!

那么如果在出现了异常中断是是否同样会被调用呢?我们先设计下面两个用户自定义函数:

  def trackRows: FDAUserTask[FDAROW] = row => {
    row match {
      case m@StateModel(id,name) =>
        println(s"State: $id $name")
        println( "----------------")
        fda_next(m)
      case m@_ => fda_next(m)
    }
  }

  def errorRow: FDAUserTask[FDAROW] = row => {
    row match {
      case StateModel(id,name) =>
        val idx = id / (id - 3)
        fda_next(StateModel(idx,name))
      case m@_ => fda_next(m)
    }
  }

trackRows跟踪显示当前数据行,errorRow人为的会在第三行出现异常。我们用streamState来测试一下:

  streamState.appendTask(errorRow).appendTask(trackRows).startRun
//  State: 0 Alabama
//  ----------------
//  State: -2 Alaska
//  ----------------
//  Exception in thread "main" java.lang.ArithmeticException: / by zero
//  at examples.ExceptionsAndFinalizers$$anonfun$errorRow$1.apply(ExceptionsAndFinalizers.scala:46)
//  ...
//  at java.lang.Thread.run(Thread.java:745)
//  ***Finally*** the end of streamState!!!

的确在正常显示了两行数据后,第三行出错中断,直接调用了finalizer。这就保证了无论发生任何情况,当完成使用数据源后都给予编程人员一个空间去进行事后处理如释放资源、中断连接、关闭文件等。

我们可以用onError来挂接异常处理程序,如下:

   val s = streamState.appendTask(errorRow).appendTask(trackRows)
   val s1 = s.onError {case e: Exception => println(s"Caught Error in streamState!!![${e.getMessage}]"); fda_appendRow(FDANullRow)}

注意:onError必须挂接在stream的最尾端以确保所有环节的异常情况都可以正确地得到处理。看看运行结果:

State: 0 Alabama
----------------
State: -2 Alaska
----------------
***Finally*** the end of streamState!!!
Caught Error in streamState!!![/ by zero]

以上例子捕获了异常情况,同时在异常中断情况后还是调用了finalizer。

有时我们需要自定义一些特殊情况,我们希望能捕获这些情况的发生。但我们同时希望这些情况发生时不会中断运算。首先我们可以先自定义一个异常行类型:

  case class DivideZeroError(msg: String, e: Exception) extends FDAROW

注意:切不可忘记extends FDAROW。我们把上面的errorRow函数修改成一个自捕获异常的函数:

 def catchError: FDAUserTask[FDAROW] = row => {
    row match {
      case StateModel(id,name) =>
        try {
          val idx = id / (id - 3)
          fda_next(StateModel(idx, name))
        } catch {
          case e: Exception => //pass an error row
            fda_next(DivideZeroError(s"Divide by zero excption at ${id}",e))
        }
      case m@_ => fda_next(m)
    }
  }

必须修改trackRows能分辨DivideZeroError行:

  def trackRows: FDAUserTask[FDAROW] = row => {
    row match {
      case m@StateModel(id,name) =>
        println(s"State: $id $name")
        println( "----------------")
        fda_next(m)
      case DivideZeroError(msg, e) => //error row
        println(s"***Error:$msg***")
        fda_skip
      case m@_ => fda_next(m)
    }
  }

运算下面的程序:

  val s = streamState.take(5).appendTask(catchError).appendTask(trackRows)
  val s1 = s.onError {case e: Exception => println(s"Caught Error in streamState!!![${e.getMessage}]"); fda_appendRow(FDANullRow)}
  s1.startRun

产生下面的结果:

State: 0 Alabama
----------------
State: -2 Alaska
----------------
***Error:Divide by zero excption at 3***
State: 4 Arkansas
----------------
State: 2 California
----------------
***Finally*** the end of streamState!!!

Process finished with exit code 0

没有出现异常中断,捕获并处理了自定义异常,并且调用了事后处理程序finalizer。

下面就是这次示范的源代码:

import slick.jdbc.H2Profile.api._
import com.bayakala.funda.samples.SlickModels._
import com.bayakala.funda._
import api._
import scala.language.implicitConversions

object ExceptionsAndFinalizers extends App {

  val db = Database.forConfig("h2db")
  implicit def toState(row: StateTable#TableElementType) =
    StateModel(row.id,row.name)
  val viewLoader = FDAViewLoader(slick.jdbc.H2Profile)(toState _)
  val streamLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toState _)

  val stateSeq = viewLoader.fda_typedRows(StateQuery.result)(db).toSeq
  val viewState = fda_staticSource(stateSeq)(println("***Finally*** the end of viewState!!!"))
  val streamState = streamLoader.fda_typedStream(StateQuery.result)(db)(64,64)(println("***Finally*** the end of streamState!!!"))

/*
  viewState.startRun
  viewState.take(2).startRun
  streamState.startRun
  streamState.take(3).startRun
  //  ***Finally*** the end of viewState!!!
  //  ***Finally*** the end of viewState!!!
  //  ***Finally*** the end of streamState!!!
  //  ***Finally*** the end of streamState!!!
*/



  def trackRows: FDAUserTask[FDAROW] = row => {
    row match {
      case m@StateModel(id,name) =>
        println(s"State: $id $name")
        println( "----------------")
        fda_next(m)
      case DivideZeroError(msg, e) => //error row
        println(s"***Error:$msg***")
        fda_skip
      case m@_ => fda_next(m)
    }
  }

  def errorRow: FDAUserTask[FDAROW] = row => {
    row match {
      case StateModel(id,name) =>
        val idx = id / (id - 3)
        fda_next(StateModel(idx,name))
      case m@_ => fda_next(m)
    }
  }

  case class DivideZeroError(msg: String, e: Exception) extends FDAROW
  def catchError: FDAUserTask[FDAROW] = row => {
    row match {
      case StateModel(id,name) =>
        try {
          val idx = id / (id - 3)
          fda_next(StateModel(idx, name))
        } catch {
          case e: Exception => //pass an error row
            fda_next(DivideZeroError(s"Divide by zero excption at ${id}",e))
        }
      case m@_ => fda_next(m)
    }
  }



  /*
  streamState.appendTask(errorRow).appendTask(trackRows).startRun
//  State: 0 Alabama
//  ----------------
//  State: -2 Alaska
//  ----------------
//  Exception in thread "main" java.lang.ArithmeticException: / by zero
//  at examples.ExceptionsAndFinalizers$$anonfun$errorRow$1.apply(ExceptionsAndFinalizers.scala:46)
//  ...
//  at java.lang.Thread.run(Thread.java:745)
//  ***Finally*** the end of streamState!!!
*/
  /*
   val v = viewState.appendTask(errorRow).appendTask(trackRows)
   val v1 = v.onError {case e: Exception => println(s"Caught Error in viewState!!![${e.getMessage}]"); fda_appendRow(FDANullRow)}
   v1.startRun

   val s = streamState.appendTask(errorRow).appendTask(trackRows)
   val s1 = s.onError {case e: Exception => println(s"Caught Error in streamState!!![${e.getMessage}]"); fda_appendRow(FDANullRow)}
   s1.startRun
  */

  val s = streamState.take(5).appendTask(catchError).appendTask(trackRows)
  val s1 = s.onError {case e: Exception => println(s"Caught Error in streamState!!![${e.getMessage}]"); fda_appendRow(FDANullRow)}
  s1.startRun


}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏

即时通信服务器架构的一些思考

对于一个即时通信服务器来说,在用户量少的时候,一台服务器就足以提供所有的服务。而这种架构也最简单,举个例子,用户A与用户B互为好友,A向B发消息,服务器接收到消...

1855
来自专栏JavaQ

分布式锁那点事

分布式锁是控制分布式系统之间同步访问共享资源的一种方式。本篇将详细分析分布式锁的三种实现方式及其优化方案,分析其优缺点。 为什么要使用分布式锁 为了保证一个方...

36412
来自专栏涤生的博客

服务框架之注册中心,你不知道的内幕

前一篇服务框架技术栈粗略分析了服务框架需要的各个核心模块,首先提到的就是注册中心,注册中心实现了服务注册和发现的功能,在服务框架中也发挥着重要的作用。今天主要围...

712
来自专栏PingCAP的专栏

TiDB 源码阅读系列文章(十八)tikv-client(上)

在整个 SQL 执行过程中,需要经过 Parser,Optimizer,Executor,DistSQL 这几个主要的步骤,最终数据的读写是通过 tikv-cl...

140
来自专栏IT技术精选文摘

架构师详解:Nginx 架构

1044
来自专栏JAVA同学会

MongoDB之分片集群(Sharding)

分片(sharding)是一个通过多台机器分配数据的方法。MongoDB使用分片支持大数据集和高吞吐量的操作。大数据集和高吞吐量的数据库系统挑战着单一服务的性能...

563
来自专栏magicsoar

mysql复制

一、复制的意义 mysql的复制功能是构建基于MySql大规模,高性能应用的基础,我们可以通过为服务器配置一个或多个备库来进行数据同步;复制功能不仅有利于构建高...

1887
来自专栏Java技术分享

集群分片

复制的问题  由于复制中,每个数据库都是拥有完整的数据,因此复制的总数据存储量受限于内存最小的数据库节点,如果数据量过大,复制就无能为力了。 分片 分片(Pat...

1678
来自专栏程序员的SOD蜜

分布式系统的消息&服务模式简单总结

分布式系统的消息&服务模式简单总结 在一个分布式系统中,有各种消息的处理,有各种服务模式,有同步异步,有高并发问题甚至应对高并发问题的Actor编程模型,本文尝...

3447
来自专栏

即时通信服务器架构的一些思考

对于一个即时通信服务器来说,在用户量少的时候,一台服务器就足以提供所有的服务。而这种架构也最简单,举个例子,用户A与用户B互为好友,A向B发消息,服务器接收到消...

1819

扫码关注云+社区