Akka(21): Stream:实时操控:人为中断-KillSwitch

 akka-stream是多线程non-blocking模式的,一般来说,运算任务提交到另外线程后这个线程就会在当前程序控制之外自由运行了。任何时候如果需要终止运行中的数据流就必须采用一种任务柄(handler)方式来控制在其它线程内运行的任务。这个handler可以在提交运算任务时获取。akka-stream提供了KillSwitch trait来支持这项功能:

/**
 * A [[KillSwitch]] allows completion of [[Graph]]s from the outside by completing [[Graph]]s of [[FlowShape]] linked
 * to the switch. Depending on whether the [[KillSwitch]] is a [[UniqueKillSwitch]] or a [[SharedKillSwitch]] one or
 * multiple streams might be linked with the switch. For details see the documentation of the concrete subclasses of
 * this interface.
 */
//#kill-switch
trait KillSwitch {
  /**
   * After calling [[KillSwitch#shutdown()]] the linked [[Graph]]s of [[FlowShape]] are completed normally.
   */
  def shutdown(): Unit
  /**
   * After calling [[KillSwitch#abort()]] the linked [[Graph]]s of [[FlowShape]] are failed.
   */
  def abort(ex: Throwable): Unit
}
//#kill-switch

可以想象:我们必须把这个KillSwitch放在一个流图中间,所以它是一种FlowShape的,这可以从KillSwitch的构建器代码里可以看得到:

object KillSwitches {

  /**
   * Creates a new [[SharedKillSwitch]] with the given name that can be used to control the completion of multiple
   * streams from the outside simultaneously.
   *
   * @see SharedKillSwitch
   */
  def shared(name: String): SharedKillSwitch = new SharedKillSwitch(name)

  /**
   * Creates a new [[Graph]] of [[FlowShape]] that materializes to an external switch that allows external completion
   * of that unique materialization. Different materializations result in different, independent switches.
   *
   * For a Bidi version see [[KillSwitch#singleBidi]]
   */
  def single[T]: Graph[FlowShape[T, T], UniqueKillSwitch] =
    UniqueKillSwitchStage.asInstanceOf[Graph[FlowShape[T, T], UniqueKillSwitch]]

  /**
   * Creates a new [[Graph]] of [[FlowShape]] that materializes to an external switch that allows external completion
   * of that unique materialization. Different materializations result in different, independent switches.
   *
   * For a Flow version see [[KillSwitch#single]]
   */
  def singleBidi[T1, T2]: Graph[BidiShape[T1, T1, T2, T2], UniqueKillSwitch] =
    UniqueBidiKillSwitchStage.asInstanceOf[Graph[BidiShape[T1, T1, T2, T2], UniqueKillSwitch]]
...}

akka-stream提供了single,shared,singleBidi三种KillSwitch的构建方式,它们的形状都是FlowShape。KillSwitches.single返回结果类型是Graph[FlowShape[T,T],UniqueKillSwitch]。因为我们需要获取这个KillSwitch的控制柄,所以要用viaMat来可运算化(materialize)这个Graph,然后后选择右边的类型UniqueKillSwitch。这个类型可以控制一个可运算化FlowShape的Graph,如下:

  val source = Source(Stream.from(1,2)).delay(1.second,DelayOverflowStrategy.backpressure)
  val sink = Sink.foreach(println)
  val killSwitch = source.viaMat(KillSwitches.single)(Keep.right).to(sink).run()

  scala.io.StdIn.readLine()
  killSwitch.shutdown()
  println("terminated!")
  actorSys.terminate()

当然,也可以用异常方式中断运行:

killSwitch.abort(new RuntimeException("boom!"))

source是一个不停顿每秒发出一个数字的数据源。如上所述:必须把KillSwitch放在source和sink中间形成数据流完整链状。运算这个数据流时返回了handle killSwitch,我们可以使用这个killSwitch来shutdown或abort数据流运算。

KillSwitches.shared构建了一个SharedKillSwitch类型。这个类型可以被用来控制多个FlowShape Graph的终止运算。SharedKillSwitch类型里的flow方法可以返回终止运算的控制柄handler:

 /**
   * Returns a typed Flow of a requested type that will be linked to this [[SharedKillSwitch]] instance. By invoking
   * [[SharedKillSwitch#shutdown()]] or [[SharedKillSwitch#abort()]] all running instances of all provided [[Graph]]s by this
   * switch will be stopped normally or failed.
   *
   * @tparam T Type of the elements the Flow will forward
   * @return   A reusable [[Graph]] that is linked with the switch. The materialized value provided is this switch itself.
   */
  def flow[T]: Graph[FlowShape[T, T], SharedKillSwitch] = _flow.asInstanceOf[Graph[FlowShape[T, T], SharedKillSwitch]]

用flow构建的SharedKillSwitch实例就像immutable对象,我们可以在多个数据流中插入SharedKillSwitch,然后用这一个共享的handler去终止使用了这个SharedKillSwitch的数据流运算。下面是SharedKillSwitch的使用示范:

  val sharedKillSwitch = KillSwitches.shared("multi-ks")
  val source2 = Source(Stream.from(1)).delay(2.second,DelayOverflowStrategy.backpressure)

  source2.via(sharedKillSwitch.flow).to(sink).run()
  source.via(sharedKillSwitch.flow).to(sink).run()

  scala.io.StdIn.readLine()
  killSwitch.shutdown()
  sharedKillSwitch.shutdown()

注意:我们先构建了一个SharedKillSwitch实例,然后在source2,source数据通道中间加入了这个实例。因为我们已经获取了sharedKillSwitch控制柄,所以不必理会返回结果,直接用via和to来连接上下游节点(默认为Keep.left)。

还有一个KillSwitches.singleBidi类型,这种KillSwitch是用来终止双流向数据流运算的。我们将在下篇讨论里介绍。

下面是本次示范的源代码:

import akka.stream.scaladsl._
import akka.stream._
import akka.actor._
import scala.concurrent.duration._
object KillSwitchDemo extends App {
  implicit val actorSys = ActorSystem("sys")
  implicit val ec = actorSys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(actorSys)
      .withInputBuffer(16,16)
  )

  val source = Source(Stream.from(1,2)).delay(1.second,DelayOverflowStrategy.backpressure)
  val sink = Sink.foreach(println)
  val killSwitch = source.viaMat(KillSwitches.single)(Keep.right).to(sink).run()

  val sharedKillSwitch = KillSwitches.shared("multi-ks")
  val source2 = Source(Stream.from(1)).delay(2.second,DelayOverflowStrategy.backpressure)

  source2.via(sharedKillSwitch.flow).to(sink).run()
  source.via(sharedKillSwitch.flow).to(sink).run()


  scala.io.StdIn.readLine()
  killSwitch.shutdown()
  sharedKillSwitch.shutdown()
  println("terminated!")
  actorSys.terminate()
  

}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏用户画像

3.2.3页面置换算法

进程运行时,若其访问的页面不在内存而徐将其调入,但内存已无空闲时间时,就需要从内存中调出一页程序或数据,送入磁盘的对换区。 而选择调入页面的算法就称为页面置...

902
来自专栏落影的专栏

iOS开发-OpenGLES进阶教程3

教程 OpenGLES入门教程1-Tutorial01-GLKit OpenGLES入门教程2-Tutorial02-shader入门 OpenGLES入门...

3407
来自专栏决胜机器学习

PHP数据结构(十一) ——图的连通性问题与最小生成树算法(2)

PHP数据结构(十一)——图的连通性问题与最小生成树算法(2) (原创内容,转载请注明来源,谢谢) 再次遇到微信公众号限制字数3000字的问题。因此将...

36610
来自专栏AILearning

Apache Spark 2.2.0 中文文档 - GraphX Programming Guide | ApacheCN

GraphX Programming Guide 概述 入门 属性 Graph 示例属性 Graph Graph 运算符 运算符的汇总表 P...

3828
来自专栏向治洪

java的双缓冲技术

Java的强大特性让其在游戏编程和多媒体动画处理方面也毫不逊色。在Java游戏编程和动画编程中最常见的就是对于屏幕闪烁的处理。本文从J2SE的一个再现了屏幕闪...

2398
来自专栏菩提树下的杨过

Flash/Flex学习笔记(22):滤镜学习

Silverlight中称之为“效果(Effect)”的东东,在Flash里叫“滤镜(Filter)",而且Flash里内置的滤镜要比Silverlight丰富...

1939
来自专栏郭诗雅的专栏

Three.js 粒子系统学习小记:礼花效果实现

在3D建模过程中,当我们需要创建很多细小的物体时,并不会一个个地创建这些物体,而是通过创建粒子,粒子可以模拟很多效果,例如烟花、火焰、雨滴、雪花、云朵等等。Th...

4.2K3
来自专栏向治洪

FLAnimatedImage -ios gif图片加载框架介绍

简介 FLAnimatedImage 是 Flipboard 团队开发的在它们 App 中渲染 GIF 图片使用的库。 后来 Flipboard 将 FLAni...

5169
来自专栏向治洪

FLAnimatedImage -ios gif图片加载框架介绍

简介 FLAnimatedImage 是 Flipboard 团队开发的在它们 App 中渲染 GIF 图片使用的库。 后来 Flipboard 将 FLAni...

2557
来自专栏腾讯Bugly的专栏

Android自绘动画实现与优化实战——以Tencent OS录音机波形动画为实例

前言 我们所熟知的,Android 的图形绘制主要是基于 View 这个类实现。 每个 View 的绘制都需要经过 onMeasure、onLayout、onD...

3284

扫码关注云+社区