Akka(24): Stream:从外部系统控制数据流-control live stream from external system

 在数据流应用的现实场景中常常会遇到与外界系统对接的需求。这些外部系统可能是Actor系统又或者是一些其它类型的系统。与这些外界系统对接的意思是在另一个线程中运行的数据流可以接收外部系统推送的事件及做出行为改变的响应。

如果一个外界系统需要控制一个运行中数据流的功能环节GraphStage,首先必须在这个GraphStage内部构建一个控制函数,这样才能接触并更改GraphStage的内部状态。外部系统可以通过调用这个控制函数来向GraphStage发送信息,控制GraphStage行为。akka-stream是多线程异步模式的程序,所以这个函数只能是一个异步运行的回调callback。akka-stream提供了一个函数getAsyncCallback函数,能够把一个函数放到另一个线程里并返回它的callback:

  /**
   * Obtain a callback object that can be used asynchronously to re-enter the
   * current [[GraphStage]] with an asynchronous notification. The [[invoke()]] method of the returned
   * [[AsyncCallback]] is safe to be called from other threads and it will in the background thread-safely
   * delegate to the passed callback function. I.e. [[invoke()]] will be called by the external world and
   * the passed handler will be invoked eventually in a thread-safe way by the execution environment.
   *
   * This object can be cached and reused within the same [[GraphStageLogic]].
   */
  final def getAsyncCallback[T](handler: T ⇒ Unit): AsyncCallback[T] = {
    new AsyncCallback[T] {
      override def invoke(event: T): Unit =
        interpreter.onAsyncInput(GraphStageLogic.this, event, handler.asInstanceOf[Any ⇒ Unit])
    }
  }

getAsyncCallback把一个函数T=>Unit变成了异步运算函数并通过AsyncCallback返回它的回调callback。下面是getAsyncCallback的一个用例: 

//external system
object Injector {
  var callback: AsyncCallback[String] = null
   def inject(m: String) = {
     if (callback != null)
     callback.invoke(m)
   }
}
class InjectControl(injector: Injector.type) extends GraphStage[FlowShape[String,String]] {
  val inport = Inlet[String]("input")
  val outport = Outlet[String]("output")
  val shape = FlowShape.of(inport,outport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var extMessage = ""
      override def preStart(): Unit = {
        val callback = getAsyncCallback[String] { m =>
          if (m.length > 0)
             m match {
               case "Stop" => completeStage()
               case s: String => extMessage = s
             }

        }
        injector.callback = callback
      }

      setHandler(inport, new InHandler {
        override def onPush(): Unit =
          if (extMessage.length > 0) {
            push(outport,extMessage)
            extMessage=""
          }
        else
          push(outport, grab(inport))
      })
      setHandler(outport, new OutHandler {
        override def onPull(): Unit = pull(inport)
      })
    }

}

上面这个例子里的object Injector模拟一个外部系统。我们重写了GraphStage InjectControl.createLogic里的preStart()函数,在这里把一个String=>Unit函数的callback登记在Injector里。这个callback函数能接受传入的String并更新内部状态extMessage,或者当传入String==“Stop"时终止数据流。在onPush()里extMessage最终会被当作流元素插入到数据流中。下面我们就构建这个GraphStage的测试运行程序:

object InteractWithStreams extends App {
  implicit val sys = ActorSystem("demoSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(sys)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )

  val source = Source(Stream.from(1)).map(_.toString).delay(1.second,DelayOverflowStrategy.backpressure)
  val graph = new InjectControl(Injector)
  val flow = Flow.fromGraph(graph)

  source.via(flow).to(Sink.foreach(println)).run()
  Thread.sleep(2000)
  Injector.inject("hello")
  Thread.sleep(2000)
  Injector.inject("world!")
  Thread.sleep(2000)
  Injector.inject("Stop")

  scala.io.StdIn.readLine()
  sys.terminate()

}

试运行结果显示:

1
2
hello
4
5
6
world!
8
9
10


Process finished with exit code 0

正确地把"hello world!"插入了一个正在运行中的数据流中并在最后终止了这个数据流。

另外,一个GraphStage也可以被外界当作一种Actor来进行交流。我们可以在GraphStage内部构建一个(ActorRef,Any)=>Unit款式的函数,然后用getStageActor(func).ref以一种ActorRef形式返回这个函数:

 /**
   * Initialize a [[StageActorRef]] which can be used to interact with from the outside world "as-if" an [[Actor]].
   * The messages are looped through the [[getAsyncCallback]] mechanism of [[GraphStage]] so they are safe to modify
   * internal state of this stage.
   *
   * This method must (the earliest) be called after the [[GraphStageLogic]] constructor has finished running,
   * for example from the [[preStart]] callback the graph stage logic provides.
   *
   * Created [[StageActorRef]] to get messages and watch other actors in synchronous way.
   *
   * The [[StageActorRef]]'s lifecycle is bound to the Stage, in other words when the Stage is finished,
   * the Actor will be terminated as well. The entity backing the [[StageActorRef]] is not a real Actor,
   * but the [[GraphStageLogic]] itself, therefore it does not react to [[PoisonPill]].
   *
   * @param receive callback that will be called upon receiving of a message by this special Actor
   * @return minimal actor with watch method
   */
  // FIXME: I don't like the Pair allocation :(
  @ApiMayChange
  final protected def getStageActor(receive: ((ActorRef, Any)) ⇒ Unit): StageActor = {
    _stageActor match {
      case null ⇒
        val actorMaterializer = ActorMaterializerHelper.downcast(interpreter.materializer)
        _stageActor = new StageActor(actorMaterializer, getAsyncCallback, receive)
        _stageActor
      case existing ⇒
        existing.become(receive)
        existing
    }
  }
...
    /**
     * The ActorRef by which this StageActor can be contacted from the outside.
     * This is a full-fledged ActorRef that supports watching and being watched
     * as well as location transparent (remote) communication.
     */
    def ref: ActorRef = functionRef

下面是receive:((ActorRef,Any))=>Unit这个函数的实现例子:

      def behavior(m:(ActorRef,Any)): Unit = {
        val (sender, msg) = m

        msg.asInstanceOf[String] match {
          case "Stop" => completeStage()
          case s@ _ => extMessage = s
        }
      }

这个函数的输入参数(sender,msg)代表发送消息的Actor和发送的消息。与上个例子一样,作为一个GraphStage的内部函数,它可以使用、更新GraphStage内部状态。GraphStage的实现如下:

class StageAsActor(extActor: ActorRef) extends GraphStage[FlowShape[String,String]] {
  val inport = Inlet[String]("input")
  val outport = Outlet[String]("output")
  val shape = FlowShape.of(inport,outport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var extMessage = ""
      override def preStart(): Unit = {
        extActor ! getStageActor(behavior).ref
      }

      def behavior(m:(ActorRef,Any)): Unit = {
        val (sender, msg) = m

        msg.asInstanceOf[String] match {
          case "Stop" => completeStage()
          case s@ _ => extMessage = s
        }
      }

      setHandler(inport, new InHandler {
        override def onPush(): Unit =
          if (extMessage.length > 0) {
            push(outport,extMessage)
            extMessage=""
          }
          else
            push(outport, grab(inport))
      })
      setHandler(outport, new OutHandler {
        override def onPull(): Unit = pull(inport)
      })
    }

}

参数extActor就是外部的控制Actor。在creatLogic.preStart()里我们先把StageActor传给extActor。外部系统就可以通过extActor来控制数据流行为:

class Messenger extends Actor with ActorLogging {
  var stageActor: ActorRef = _
  override def receive: Receive = {
    case r: ActorRef =>
      stageActor = r
      log.info("received stage actorRef")
    case s: String => stageActor forward s
      log.info(s"forwarding message:$s")

  }
}
object GetStageActorDemo extends App {
  implicit val sys = ActorSystem("demoSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(sys)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )
  

  val stageActorMessenger = sys.actorOf(Props[Messenger],"forwarder")

  val source = Source(Stream.from(1)).map(_.toString).delay(1.second,DelayOverflowStrategy.backpressure)
  val graph = new StageAsActor(stageActorMessenger)
  val flow = Flow.fromGraph(graph)

  source.via(flow).to(Sink.foreach(println)).run()

   Thread.sleep(2000)
  stageActorMessenger ! "Hello"
  Thread.sleep(1000)
  stageActorMessenger ! "World!"
  Thread.sleep(2000)
  stageActorMessenger ! "Stop"
  

  scala.io.StdIn.readLine()
  sys.terminate()
  

}

Messenger就是一个存粹的中介,把控制消息通过StageActor转发给运行中的数据流。

下面是本次示范的源代码:

GetAsyncCallBack.scala

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import scala.concurrent.duration._

//external system
object Injector {
  var callback: AsyncCallback[String] = null
   def inject(m: String) = {
     if (callback != null)
     callback.invoke(m)
   }
}
class InjectControl(injector: Injector.type) extends GraphStage[FlowShape[String,String]] {
  val inport = Inlet[String]("input")
  val outport = Outlet[String]("output")
  val shape = FlowShape.of(inport,outport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var extMessage = ""
      override def preStart(): Unit = {
        val callback = getAsyncCallback[String] { m =>
          if (m.length > 0)
             m match {
               case "Stop" => completeStage()
               case s: String => extMessage = s
             }

        }
        injector.callback = callback
      }

      setHandler(inport, new InHandler {
        override def onPush(): Unit =
          if (extMessage.length > 0) {
            push(outport,extMessage)
            extMessage=""
          }
        else
          push(outport, grab(inport))
      })
      setHandler(outport, new OutHandler {
        override def onPull(): Unit = pull(inport)
      })
    }

}

object GetAsyncCallbackDemo extends App {
  implicit val sys = ActorSystem("demoSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(sys)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )

  val source = Source(Stream.from(1)).map(_.toString).delay(1.second,DelayOverflowStrategy.backpressure)
  val graph = new InjectControl(Injector)
  val flow = Flow.fromGraph(graph)

  source.via(flow).to(Sink.foreach(println)).run()
  Thread.sleep(2000)
  Injector.inject("hello")
  Thread.sleep(2000)
  Injector.inject("world!")
  Thread.sleep(2000)
  Injector.inject("Stop")

  scala.io.StdIn.readLine()
  sys.terminate()


}

GetStageActorDemo.scala

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import scala.concurrent.duration._


class StageAsActor(extActor: ActorRef) extends GraphStage[FlowShape[String,String]] {
  val inport = Inlet[String]("input")
  val outport = Outlet[String]("output")
  val shape = FlowShape.of(inport,outport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var extMessage = ""
      override def preStart(): Unit = {
        extActor ! getStageActor(behavior).ref
      }

      def behavior(m:(ActorRef,Any)): Unit = {
        val (sender, msg) = m

        msg.asInstanceOf[String] match {
          case "Stop" => completeStage()
          case s@ _ => extMessage = s
        }
      }

      setHandler(inport, new InHandler {
        override def onPush(): Unit =
          if (extMessage.length > 0) {
            push(outport,extMessage)
            extMessage=""
          }
          else
            push(outport, grab(inport))
      })
      setHandler(outport, new OutHandler {
        override def onPull(): Unit = pull(inport)
      })
    }

}

class Messenger extends Actor with ActorLogging {
  var stageActor: ActorRef = _
  override def receive: Receive = {
    case r: ActorRef =>
      stageActor = r
      log.info("received stage actorRef")
    case s: String => stageActor forward s
      log.info(s"forwarding message:$s")

  }
}
object GetStageActorDemo extends App {
  implicit val sys = ActorSystem("demoSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(sys)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )


  val stageActorMessenger = sys.actorOf(Props[Messenger],"forwarder")

  val source = Source(Stream.from(1)).map(_.toString).delay(1.second,DelayOverflowStrategy.backpressure)
  val graph = new StageAsActor(stageActorMessenger)
  val flow = Flow.fromGraph(graph)

  source.via(flow).to(Sink.foreach(println)).run()

   Thread.sleep(2000)
  stageActorMessenger ! "Hello"
  Thread.sleep(1000)
  stageActorMessenger ! "World!"
  Thread.sleep(2000)
  stageActorMessenger ! "Stop"


  scala.io.StdIn.readLine()
  sys.terminate()


}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏扎心了老铁

springboot与thrift集成实现服务端和客户端

我们这里用一个简单的小功能来演示一下如何使用springboot集成thrift 这个功能是,判断hdfs路径存在。 1、先解决依赖 <dependencie...

1.6K9
来自专栏智能大石头

老瓶装新酒 - C#调用WM手机发送短信(源码)

一些系统,需要能够发送短信,量很小,平均每日10条。 运营商平台太贵,白名单很严格,小额只能发省内; 各短信平台有各种限制,大事件前后会关闭; 飞信以前可以用W...

1775
来自专栏Jed的技术阶梯

SparkStreaming 写数据到 HBase,由于共用连接造成的数据丢失问题

有如下程序,SparkStreaming 读取 Kafka 中的数据,经过处理后,把数据写入到 Hbase 中

852
来自专栏Spark学习技巧

Spark 下操作 HBase(1.0.0 新 API)

hbase1.0.0版本提供了一些让人激动的功能,并且,在不牺牲稳定性的前提下,引入了新的API。虽然 1.0.0 兼容旧版本的 API,不过还是应该尽早地来熟...

732
来自专栏JadePeng的技术博客

基于spring security 实现前后端分离项目权限控制

前后端分离的项目,前端有菜单(menu),后端有API(backendApi),一个menu对应的页面有N个API接口来支持,本文介绍如何基于spring se...

680
来自专栏ASP.NET MVC5 后台权限管理系统

ASP.NET MVC5+EF6+EasyUI 后台管理系统(53)-工作流设计-我的批阅

前言:由于工作原因工作流一直没时间更新,虽然没有更新,但是批阅和申请差不多,改变一下数据的状态字段就行,有几个园友已经率先完成了 说句实话,一个工作流用文章表达...

27610
来自专栏函数式编程语言及工具

SDP(9):MongoDB-Scala - data access and modeling

    MongoDB是一种文件型数据库,对数据格式没有硬性要求,所以可以实现灵活多变的数据存储和读取。MongoDB又是一种分布式数据库,与传统关系数据库不同...

3334
来自专栏向治洪

android优化之省电

Android程序中耗电最多的地方在以下几个方面 : 1、 大数据量的传输。 2、 不停的在网络间切换。 3、 解析大量的文本数据。 那么我们怎么样来改...

16810
来自专栏大内老A

ASP.NET MVC的Razor引擎:RazorViewEngine

基于Web Form引擎的WebFormViewEngine和针对Razor引擎的RazorViewEngine都是抽象类型BuildManagerViewEn...

1949
来自专栏草根专栏

Rx.NET 简介

官网: http://reactivex.io/ 它支持基本所有的主流语言. 这里我简单介绍一下Rx.NET. 基本概念和RxJS是一样的. 下面开始切入正题....

2939

扫码关注云+社区