Akka(26): Stream: Exception handling

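akka-stream is built on the Actor model, so it inherits the Actor model's resilience: for any failure there is a consistent overall exception-handling strategy and a concrete way to apply it. The official akka-stream documentation covers all of this in detail, with examples. This post has no better ideas or examples to offer, so it mostly translates that material and works through it. akka-stream provides the following practical tools for handling exceptions: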

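1. recover: an operator that, when upstream fails, emits one final element and then completes the stream gracefully

2. recoverWithRetries: also an operator; after an upstream failure it switches to a backup Source as the new upstream and keeps running

3. Backoff restart strategy: offered through RestartSource, RestartFlow and RestartSink, which restart the wrapped stage with progressively longer delays

4. Supervision strategy: an attribute of stream stages that tells a failing Stage how to handle the exception

The code examples below demonstrate each of them: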

1. recover: the signature of Flow[T].recover is:

  /**
   * Recover allows to send last element on failure and gracefully complete the stream
   * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
   * This stage can recover the failure signal, but not the skipped elements, which will be dropped.
   *
   * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically.
   *
   * '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element
   *
   * '''Backpressures when''' downstream backpressures
   *
   * '''Completes when''' upstream completes or upstream failed with exception pf can handle
   *
   * '''Cancels when''' downstream cancels
   *
   */
  def recover[T >: Out](pf: PartialFunction[Throwable, T]): Repr[T] = via(Recover(pf))

Here is a usage example:

  Source(0 to 10).map { n =>
    if (n < 5) n.toString
    else throw new Exception("Boooommm!")
  }.recover{
    case e: Exception => s"truncate stream: ${e.getMessage}"
  }.runWith(Sink.foreach(println))

Output:

0
1
2
3
4
truncate stream: Boooommm!

2. recoverWithRetries: its signature is:

 /**
   * RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after
   * a failure has been recovered up to `attempts` number of times so that each time there is a failure
   * it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't
   * attempt to recover at all.
   *
   * A negative `attempts` number is interpreted as "infinite", which results in the exact same behavior as `recoverWith`.
   *
   * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
   * This stage can recover the failure signal, but not the skipped elements, which will be dropped.
   *
   * Throwing an exception inside `recoverWithRetries` _will_ be logged on ERROR level automatically.
   *
   * '''Emits when''' element is available from the upstream or upstream is failed and element is available
   * from alternative Source
   *
   * '''Backpressures when''' downstream backpressures
   *
   * '''Completes when''' upstream completes or upstream failed with exception pf can handle
   *
   * '''Cancels when''' downstream cancels
   *
   * @param attempts Maximum number of retries or -1 to retry indefinitely
   * @param pf Receives the failure cause and returns the new Source to be materialized if any
   * @throws IllegalArgumentException if `attempts` is a negative number other than -1
   *
   */
  def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]): Repr[T] =
    via(new RecoverWith(attempts, pf))

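attempts is the maximum number of times the stream will try to recover after a failure. 0 means no recovery at all, so the stream fails immediately; -1 means unlimited attempts. Note that the @throws line above says any other negative value is rejected with an IllegalArgumentException. A usage example: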

  val backupSource = Source(List("five","six","seven","eight","nine"))
  Source(0 to 10).map { n =>
    if (n < 5) n.toString
    else throw new RuntimeException("Boooommm!")
  }.recoverWithRetries(attempts = 1, {
      case e: RuntimeException => backupSource
    }
  ).runWith(Sink.foreach(println))

Output:

0
1
2
3
4
five
six
seven
eight
nine
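One way to picture how the attempts counter behaves is a small plain-Scala model that uses iterators in place of Sources. This is only a sketch under stated assumptions, not Akka's implementation: RetryModel and drain are hypothetical names, and the model ignores backpressure and asynchrony entirely.

```scala
import scala.util.control.NonFatal

// Hypothetical model of the `attempts` counter (not Akka code).
object RetryModel {
  // Drains `primary`; on failure, switches to the iterator returned by `pf`
  // as long as attempts remain. Returns the emitted elements plus the
  // failure that ended the run, if any.
  def drain[T](primary: Iterator[T], attempts: Int)(
      pf: PartialFunction[Throwable, Iterator[T]]): (Vector[T], Option[Throwable]) = {
    val emitted = Vector.newBuilder[T]
    var current = primary
    var remaining = attempts // a negative value models "unlimited"
    var failure: Option[Throwable] = None
    var done = false
    while (!done) {
      try {
        while (current.hasNext) emitted += current.next()
        done = true
      } catch {
        case NonFatal(e) if remaining != 0 && pf.isDefinedAt(e) =>
          if (remaining > 0) remaining -= 1
          current = pf(e) // switch to the alternative upstream
        case NonFatal(e) =>
          failure = Some(e) // no attempts left, or pf does not match
          done = true
      }
    }
    (emitted.result(), failure)
  }
}
```

In this model, attempts = 1 reproduces the output above ("0" to "4" followed by the backup elements), while attempts = 0 emits "0" to "4" and then ends with the RuntimeException.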

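3. Backoff restart strategy: akka-stream ships RestartSource, RestartFlow and RestartSink to apply a backoff restart strategy at Source, Flow and Sink stages. Each restart is delayed progressively longer, which keeps multiple processes from contending for the same resource at the same moment. The three objects are defined as follows: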

/**
 * A RestartSource wraps a [[Source]] that gets restarted when it completes or fails.
 *
 * They are useful for graphs that need to run for longer than the [[Source]] can necessarily guarantee it will, for
 * example, for [[Source]] streams that depend on a remote server that may crash or become partitioned. The
 * RestartSource ensures that the graph can continue running while the [[Source]] restarts.
 */
object RestartSource {

  /**
   * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential
   * backoff.
   *
   * This [[Source]] will never emit a complete or failure, since the completion or failure of the wrapped [[Source]]
   * is always handled by restarting it. The wrapped [[Source]] can however be cancelled by cancelling this [[Source]].
   * When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted.
   * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
   * after this [[Source]] in the graph.
   *
   * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
   *
   * @param minBackoff minimum (initial) duration until the child actor will
   *   started again, if it is terminated
   * @param maxBackoff the exponential back-off is capped to this duration
   * @param randomFactor after calculation of the exponential back-off an additional
   *   random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
   *   In order to skip this additional delay pass in `0`.
   * @param sourceFactory A factory for producing the [[Source]] to wrap.
   */
  def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = {
    Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor))
  }
}

/**
 * A RestartFlow wraps a [[Flow]] that gets restarted when it completes or fails.
 *
 * They are useful for graphs that need to run for longer than the [[Flow]] can necessarily guarantee it will, for
 * example, for [[Flow]] streams that depend on a remote server that may crash or become partitioned. The
 * RestartFlow ensures that the graph can continue running while the [[Flow]] restarts.
 */
object RestartFlow {

  /**
   * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential
   * backoff.
   *
   * This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
   * completed. Any termination by the [[Flow]] before that time will be handled by restarting it. Any termination
   * signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's running, and then the [[Flow]]
   * will be allowed to terminate without being restarted.
   *
   * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
   * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
   * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
   *
   * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
   *
   * @param minBackoff minimum (initial) duration until the child actor will
   *   started again, if it is terminated
   * @param maxBackoff the exponential back-off is capped to this duration
   * @param randomFactor after calculation of the exponential back-off an additional
   *   random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
   *   In order to skip this additional delay pass in `0`.
   * @param flowFactory A factory for producing the [[Flow]] to wrap.
   */
  def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(flowFactory: () ⇒ Flow[In, Out, _]): Flow[In, Out, NotUsed] = {
    Flow.fromGraph(new RestartWithBackoffFlow(flowFactory, minBackoff, maxBackoff, randomFactor))
  }
}

/**
 * A RestartSink wraps a [[Sink]] that gets restarted when it completes or fails.
 *
 * They are useful for graphs that need to run for longer than the [[Sink]] can necessarily guarantee it will, for
 * example, for [[Sink]] streams that depend on a remote server that may crash or become partitioned. The
 * RestartSink ensures that the graph can continue running while the [[Sink]] restarts.
 */
object RestartSink {

  /**
   * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential
   * backoff.
   *
   * This [[Sink]] will never cancel, since cancellation by the wrapped [[Sink]] is always handled by restarting it.
   * The wrapped [[Sink]] can however be completed by feeding a completion or error into this [[Sink]]. When that
   * happens, the [[Sink]], if currently running, will terminate and will not be restarted. This can be triggered
   * simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this [[Sink]] in the
   * graph.
   *
   * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
   * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
   * sent may have been lost.
   *
   * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
   *
   * @param minBackoff minimum (initial) duration until the child actor will
   *   started again, if it is terminated
   * @param maxBackoff the exponential back-off is capped to this duration
   * @param randomFactor after calculation of the exponential back-off an additional
   *   random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
   *   In order to skip this additional delay pass in `0`.
   * @param sinkFactory A factory for producing the [[Sink]] to wrap.
   */
  def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sinkFactory: () ⇒ Sink[T, _]): Sink[T, NotUsed] = {
    Sink.fromGraph(new RestartWithBackoffSink(sinkFactory, minBackoff, maxBackoff, randomFactor))
  }
}
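The three parameters minBackoff, maxBackoff and randomFactor are easier to reason about with numbers. The sketch below is a simplified plain-Scala model of an exponential backoff delay, written from the parameter descriptions in the Scaladoc above; it is not Akka's code. BackoffModel and delay are hypothetical names, and the random draw rnd is passed in explicitly (an assumption made here so the result is deterministic).

```scala
import scala.concurrent.duration._

// Hypothetical model of an exponential backoff delay (not Akka code).
object BackoffModel {
  // restartCount: number of restarts so far (0 for the first restart)
  // rnd: a random draw in [0, 1), supplied by the caller
  def delay(restartCount: Int,
            minBackoff: FiniteDuration,
            maxBackoff: FiniteDuration,
            randomFactor: Double,
            rnd: Double): FiniteDuration = {
    // Double the delay on every restart, capped at maxBackoff.
    val exponential = minBackoff.toMillis * math.pow(2, restartCount)
    val capped = math.min(maxBackoff.toMillis.toDouble, exponential)
    // Add up to randomFactor (e.g. 0.2 = 20%) of extra delay as jitter.
    val jitter = 1.0 + rnd * randomFactor
    (capped * jitter).toLong.millis
  }
}
```

With minBackoff = 3.seconds, maxBackoff = 30.seconds and no jitter (rnd = 0), this model yields 3 s, 6 s, 12 s, 24 s and then stays at 30 s. The jitter term is what spreads out restarts of several streams that failed at the same moment.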

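Note the sourceFactory, flowFactory and sinkFactory parameters of these withBackoff methods: they are what build the wrapped stage, and they are called again on every restart. Below is a made-up stream assembled from a RestartSource, a RestartFlow and a RestartSink: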

  val backoffSource = RestartSource.withBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2
  ){ () => Source(List("FileA","FileB","FileC"))}

  val backoffFlow = RestartFlow.withBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2
  ){ () => Flow[String].map(_.toUpperCase())}

  val backoffSink = RestartSink.withBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2
  ){ () => Sink.foreach(println)}

  backoffSource.via(backoffFlow).to(backoffSink).run()

In real applications each of these stages would typically hold some resource, such as a database or a network service. The output:

FILEA
FILEB
FILEC
FILEA
FILEB
FILEC
FILEA
FILEB
FILEC

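This stream repeats indefinitely, because the wrapped Source is restarted every time it completes. The only way to stop it is manually, through a KillSwitch: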

  val killSwitch = backoffSource.viaMat(KillSwitches.single)(Keep.right)
    .viaMat(backoffFlow)(Keep.left)
    .toMat(backoffSink)(Keep.left)
    .run()

  Thread.sleep(1000)
  killSwitch.shutdown()

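4. Supervision strategy: this mechanism is inspired by Actor supervision strategies and is implemented on some akka-stream Stages. For some Stages it simply does not apply, for example Stages that connect to external systems, because whatever caused the failure is likely to cause it again. For a failing stream, a supervision strategy offers three directives:

Stop: terminate the stream with the exception

Resume: drop the current element and continue

Restart: drop the current element, clear any internal state of the Stage, and continue

The default in akka-stream is Stop: the stream is terminated immediately with the exception.

A supervision strategy can be set with ActorMaterializerSettings(...).withSupervisionStrategy(decider) or with Flow[T].withAttributes(ActorAttributes.supervisionStrategy(decider)). The following example sets it through ActorMaterializerSettings: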

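  // Resume on ArithmeticException, stop the stream on anything else
  val decider: Supervision.Decider = {
    case _: ArithmeticException => Supervision.Resume
    case _ => Supervision.Stop
  }

  implicit val mat2 = ActorMaterializer(
    ActorMaterializerSettings(sys).withSupervisionStrategy(decider)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )

  // ArithmeticException => Resume: element 3 is dropped, the stream continues
  Source(1 to 5).map { n =>
    if (n != 3) n.toString
    else throw new ArithmeticException("no 3 please!")
  }.runWith(Sink.foreach(println))

  Thread.sleep(1000)
  println("")
  Thread.sleep(1000)

  // Any other exception => Stop: the stream fails at element 5
  Source(1 to 5).map { n =>
    if (n != 5) n.toString
    else throw new Exception("no 5 please!")
  }.runWith(Sink.foreach(println))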

The two streams above show the effect of Resume and Stop respectively:

1
2
4
5

1
2
3
4

The next example sets the supervision strategy in the Attributes of a Flow:

  val decider : Supervision.Decider = {
    case _: IllegalArgumentException => Supervision.Restart
    case _ => Supervision.Stop
  }
  val flow = Flow[Int]
    .scan(0) { (acc, elem) =>
      if (elem < 0) throw new IllegalArgumentException("negative not allowed")
      else acc + elem
    }.withAttributes(ActorAttributes.supervisionStrategy(decider))

  Source(List(1, 3, -1, 5, 7)).via(flow)
    .runWith(Sink.foreach(println))

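This example handles the exception with Restart. The output below confirms that Restart clears the stage's internal state: after the failing element, scan starts over from its initial value 0: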

0
1
4
0
5
12
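To see why the output contains a second 0, it can help to step through the three directives on an ordinary List. The sketch below is a plain-Scala model of a supervised scan, written to match the output above; it is not Akka's implementation. ScanRestartModel, scanSupervised and the Directive objects are hypothetical names introduced here for illustration.

```scala
import scala.util.control.NonFatal

// Hypothetical model of a supervised scan (not Akka code).
object ScanRestartModel {
  sealed trait Directive
  case object Stop extends Directive
  case object Resume extends Directive
  case object Restart extends Directive

  def scanSupervised(input: List[Int], zero: Int)(f: (Int, Int) => Int)(
      decider: Throwable => Directive): List[Int] = {
    val out = List.newBuilder[Int]
    out += zero // scan emits its initial value first
    var acc = zero
    var stopped = false
    val it = input.iterator
    while (!stopped && it.hasNext) {
      val elem = it.next()
      try {
        acc = f(acc, elem)
        out += acc
      } catch {
        case NonFatal(e) =>
          decider(e) match {
            case Resume =>
              () // drop the element, keep the accumulated state
            case Restart =>
              acc = zero // drop the element and reset the state
              out += zero
            case Stop =>
              stopped = true // end the run at the failing element
          }
      }
    }
    out.result()
  }
}
```

For the input List(1, 3, -1, 5, 7) and the same summing function as above, this model produces 0, 1, 4, 0, 5, 12 under Restart, 0, 1, 4, 9, 16 under Resume, and 0, 1, 4 under Stop.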

Here is the complete source code for this demonstration:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.duration._

object ExceptionHandling extends App {
  implicit val sys = ActorSystem("demoSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(sys)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )

/*
  Source(0 to 10).map { n =>
    if (n < 5) n.toString
    else throw new Exception("Boooommm!")
  }.recover{
    case e: Exception => s"truncate stream: ${e.getMessage}"
  }.runWith(Sink.foreach(println))
*/
/*
  val backupSource = Source(List("five","six","seven","eight","nine"))
  Source(0 to 10).map { n =>
    if (n < 5) n.toString
    else throw new RuntimeException("Boooommm!")
  }.recoverWithRetries(attempts = 1, { // attempts = 0 would disable recovery and fail the stream
      case e: RuntimeException => backupSource
    }
  ).runWith(Sink.foreach(println))

  val backoffSource = RestartSource.withBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2
  ){ () => Source(List("FileA","FileB","FileC"))}

  val backoffFlow = RestartFlow.withBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2
  ){ () => Flow[String].map(_.toUpperCase())}

  val backoffSink = RestartSink.withBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2
  ){ () => Sink.foreach(println)}

  //backoffSource.via(backoffFlow).to(backoffSink).run()

  val killSwitch = backoffSource.viaMat(KillSwitches.single)(Keep.right)
    .viaMat(backoffFlow)(Keep.left)
    .toMat(backoffSink)(Keep.left)
    .run()

  Thread.sleep(1000)
  killSwitch.shutdown()
*/
  /*
  val decider: Supervision.Decider = {
    case _: ArithmeticException => Supervision.Resume
    case _ => Supervision.Stop
  }

  implicit val mat2 = ActorMaterializer(
    ActorMaterializerSettings(sys).withSupervisionStrategy(decider)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )

  Source(1 to 5).map { n =>
    if (n != 3) n.toString
    else throw new ArithmeticException("no 3 please!")
  }.runWith(Sink.foreach(println))

  Thread.sleep(1000)
  println("")
  Thread.sleep(1000)

  Source(1 to 5).map { n =>
    if (n != 5) n.toString
    else throw new Exception("no 5 please!")
  }.runWith(Sink.foreach(println))
*/
  val decider : Supervision.Decider = {
    case _: IllegalArgumentException => Supervision.Restart
    case _ => Supervision.Stop
  }
  val flow = Flow[Int]
    .scan(0) { (acc, elem) =>
      if (elem < 0) throw new IllegalArgumentException("negative not allowed")
      else acc + elem
    }.withAttributes(ActorAttributes.supervisionStrategy(decider))

  Source(List(1, 3, -1, 5, 7)).via(flow)
    .runWith(Sink.foreach(println))


  scala.io.StdIn.readLine()
  sys.terminate()

}
