Akka(7): FSM:通过状态变化来转换运算行为

  在上篇讨论里我们提到了become/unbecome。由于它们本质上是堆栈操作,所以只能在较少的状态切换下才能保证堆栈操作的协调及维持程序的清晰逻辑。对于比较复杂的程序流程,Akka提供了FSM:一种通过状态变化进行功能切换的Actor。FSM模式的状态转变特别适合对应现实情况中的程序流程,我们可以用每一种状态来代表一个程序流程。FSM是个trait,定义如下:

trait FSM[S, D] extends Actor with Listeners with ActorLogging {...}

我们看到:FSM就是一个特殊的Actor。带着两个类型参数:S代表状态类型,D代表状态数据类型。实际上S和D结合起来就是FSM的内部状态,即:SomeState+DataX 和 SomeState+DataY分别代表不同的Actor内部状态,这点从State定义可以得到信息:

/**
   * This captures all of the managed state of the [[akka.actor.FSM]]: the state
   * name, the state data, possibly custom timeout, stop reason and replies
   * accumulated while processing the last message.
   */
  case class State[S, D](stateName: S, stateData: D, timeout: Option[FiniteDuration] = None, stopReason: Option[Reason] = None, replies: List[Any] = Nil) {...}

我们可以用下面的表达式来代表FSM功能:

State(SA) x Event(E) -> Actions (A), State(SB)

意思是:假如在状态SA发生了事件E,那么FSM应该实施操作A并把状态转换到SB。这里面操作Action代表某项功能,事件Event是个新的类型,定义如下:

/**
   * All messages sent to the [[akka.actor.FSM]] will be wrapped inside an
   * `Event`, which allows pattern matching to extract both state and data.
   */
  final case class Event[D](event: Any, stateData: D) extends NoSerializationVerificationNeeded

Event[D]是个包嵌消息和数据的类型。理论上FSM是通过接收Event来确定运行功能和下一个状态转换,就像普通的Actor接收Message原理一样。我们用上一篇里的FillSeasons作为这次FSM示范的例子。首先定义State,Data:

trait Seasons   //States
case object Spring extends Seasons
case object Summer extends Seasons
case object Fall extends Seasons
case object Winter extends Seasons

class SeasonInfo(talks: Int, month: Int)  //Data
case object BeginSeason extends SeasonInfo(0,1)

四个状态分别是:春夏秋冬。SeasonInfo代表数据类型,含被问候次数talks,季中月份month两个参数,每季含1,2,3三个月份。目前我们只支持两种功能消息:

object FillSeasons {
  sealed trait Messages    //功能消息
  case object HowYouFeel extends Messages
  case object NextMonth extends Messages
}

这种普通Actor的消息类型对应到FSM的Event类型中的event:Any。也就是说FSM在收到功能消息后需要构建一个Event类型实例并把消息包嵌在里面。因为FSM继承了Actor,所以它必须实现receive函数。下面是FSM.receive的源代码:

 /*
   * *******************************************
   *       Main actor receive() method
   * *******************************************
   */
  override def receive: Receive = {
    case TimeoutMarker(gen) ⇒
      if (generation == gen) {
        processMsg(StateTimeout, "state timeout")
      }
    case t @ Timer(name, msg, repeat, gen) ⇒
      if ((timers contains name) && (timers(name).generation == gen)) {
        if (timeoutFuture.isDefined) {
          timeoutFuture.get.cancel()
          timeoutFuture = None
        }
        generation += 1
        if (!repeat) {
          timers -= name
        }
        processMsg(msg, t)
      }
    case SubscribeTransitionCallBack(actorRef) ⇒
      // TODO Use context.watch(actor) and receive Terminated(actor) to clean up list
      listeners.add(actorRef)
      // send current state back as reference point
      actorRef ! CurrentState(self, currentState.stateName)
    case Listen(actorRef) ⇒
      // TODO Use context.watch(actor) and receive Terminated(actor) to clean up list
      listeners.add(actorRef)
      // send current state back as reference point
      actorRef ! CurrentState(self, currentState.stateName)
    case UnsubscribeTransitionCallBack(actorRef) ⇒
      listeners.remove(actorRef)
    case Deafen(actorRef) ⇒
      listeners.remove(actorRef)
    case value ⇒ {
      if (timeoutFuture.isDefined) {
        timeoutFuture.get.cancel()
        timeoutFuture = None
      }
      generation += 1
      processMsg(value, sender())
    }

除timer,subscription等特殊功能外,case value => ... 就是处理自定义消息的地方了。我们看到FSM是用processMsg(value, sender())来处理消息的。processMsg又调用了processEvent:

  private def processMsg(value: Any, source: AnyRef): Unit = {
    val event = Event(value, currentState.stateData)
    processEvent(event, source)
  }

  private[akka] def processEvent(event: Event, source: AnyRef): Unit = {
    val stateFunc = stateFunctions(currentState.stateName)
    val nextState = if (stateFunc isDefinedAt event) {
      stateFunc(event)
    } else {
      // handleEventDefault ensures that this is always defined
      handleEvent(event)
    }
    applyState(nextState)
  }

在processEvent里的stateFunction是个Map,以stateName为主键存放StateFunction:

 /*
   * State definitions
   */
  private val stateFunctions = mutable.Map[S, StateFunction]()

而StateFuction是:

 type StateFunction = scala.PartialFunction[Event, State]

FSM的receive函数在收到消息后把消息包嵌入新构建的Event然后在processEvent里通过stateName取出相应的StateFunction后传入Event产生新的状态State。用户提供的StateFunction是通过FSM的when函数压进stateFunction Map里的:

  /**
   * Insert a new StateFunction at the end of the processing chain for the
   * given state. If the stateTimeout parameter is set, entering this state
   * without a differing explicit timeout setting will trigger a StateTimeout
   * event; the same is true when using #stay.
   *
   * @param stateName designator for the state
   * @param stateTimeout default state timeout for this state
   * @param stateFunction partial function describing response to input
   */
  final def when(stateName: S, stateTimeout: FiniteDuration = null)(stateFunction: StateFunction): Unit =
    register(stateName, stateFunction, Option(stateTimeout))

  private def register(name: S, function: StateFunction, timeout: Timeout): Unit = {
    if (stateFunctions contains name) {
      stateFunctions(name) = stateFunctions(name) orElse function
      stateTimeouts(name) = timeout orElse stateTimeouts(name)
    } else {
      stateFunctions(name) = function
      stateTimeouts(name) = timeout
    }
  }

我们看到when调用了register在stateFunction Map中按stateName放置StateFunction。FSM的这个stateFunction Map解决了become/unbecome产生的堆栈问题。FSM有个比较规范的结构,拿上面例子的FeelingSeasons结构做个示范:

class FillSeasons extends FSM[Seasons,SeasonInfo] with ActorLogging {
  import FillSeasons._
  startWith(Spring,SeasonInfo(0,1))  //起始状态
  when(Spring) {   //状态在春季
    case Event(HowYouFeel,seasonInfo) => ...
  }
  when(Summer) {  //夏季状态
    case Event(HowYouFeel,_) =>
  }
  when(Fall) {  //秋季状态
    case Event(HowYouFeel,SeasonInfo(tks,mnth)) =>
  }
  when(Winter) {  //冬季状态
    case Event(HowYouFeel,si@ SeasonInfo(tks,_)) =>
  }
  whenUnhandled {  //所有状态未处理的Event
    case Event(NextMonth,seasonInfo) =>
  }
  onTransition {
    case Spring -> Summer => log.info("Season changed from Spring to Summer month 1")
    case Summer -> Fall => log.info("Season changed from Summer to Fall month 1")
    case Fall -> Winter => log.info("Season changed from Fall to Winter month 1")
    case Winter -> Spring => log.info("Season changed from Winter to Spring month 1")
  }

  initialize()  //设定起始状态
 }

基本上是按照各状态定义事件处理函数StateFunction的。也可以包括状态转换处理函数TransitionHandler:

  type TransitionHandler = PartialFunction[(S, S), Unit]

最后,initialize()确定起始状态是否安排正确:

  /**
   * Verify existence of initial state and setup timers. This should be the
   * last call within the constructor, or [[akka.actor.Actor#preStart]] and
   * [[akka.actor.Actor#postRestart]]
   *
   * An initial `currentState -> currentState` notification will be triggered by calling this method.
   *
   * @see [[#startWith]]
   */
  final def initialize(): Unit =
    if (currentState != null) makeTransition(currentState)
    else throw new IllegalStateException("You must call `startWith` before calling `initialize`")

完整的FSM FeelingSeasons定义如下: 

class FillSeasons extends FSM[Seasons,SeasonInfo] with ActorLogging {
  import FillSeasons._
  startWith(Spring,SeasonInfo(0,1))  //起始状态
  when(Spring) {   //状态在春季
    case Event(HowYouFeel,seasonInfo) =>
      val numtalks = seasonInfo.talks + 1
      log.info(s"It's ${stateName.toString}, feel so gooood! You've asked me ${numtalks}times.")
      stay using seasonInfo.copy(talks = numtalks)
  }
  when(Summer) {  //夏季状态
    case Event(HowYouFeel,_) =>
      val numtalks = stateData.talks + 1
      log.info(s"It's ${stateName.toString}, it's so hot! You've asked me ${numtalks}times")
      stay().using(stateData.copy(talks = numtalks))
  }
  when(Fall) {  //秋季状态
    case Event(HowYouFeel,SeasonInfo(tks,mnth)) =>
      val numtalks = tks + 1
      log.info(s"It's ${stateName.toString}, it's no so bad. You've asked me ${numtalks}times.")
      stay using SeasonInfo(numtalks,mnth)
  }
  when(Winter) {  //冬季状态
    case Event(HowYouFeel,si@ SeasonInfo(tks,_)) =>
      val numtalks = tks + 1
      log.info(s"It's ${stateName.toString}, it's freezing cold! You've asked me ${numtalks}times.")
      stay using si.copy(talks = numtalks)
  }
  whenUnhandled {  //所有状态未处理的Event
    case Event(NextMonth,seasonInfo) =>
      val mth = seasonInfo.month
      if (mth <= 3) {
        log.info(s"It's month ${mth+1} of ${stateName.toString}")
        stay using seasonInfo.copy(month = mth + 1)
      }
      else {
        goto(nextSeason(stateName)) using SeasonInfo(0,1)
      }
  }
  onTransition {
    case Spring -> Summer => log.info(s"Season changed from Spring to Summer month ${nextStateData.month}")
    case Summer -> Fall => log.info(s"Season changed from Summer to Fall month ${nextStateData.month}")
    case Fall -> Winter => log.info(s"Season changed from Fall to Winter month ${nextStateData.month}")
    case Winter -> Spring => log.info(s"Season changed from Winter to Spring month ${nextStateData.month}")
  }

  initialize()  //设定起始状态
  log.info(s"It's month 1 of ${stateName.toString}")


  //季节转换顺序
  def nextSeason(season: Seasons): Seasons =
    season match {
      case Spring => Summer
      case Summer => Fall
      case Fall => Winter
      case Winter => Spring
    }
}

首先注意StateFunction中SeasonInfo的各种意思同等的表达方式及nextStateData。FSM状态数据用不可变对象(immutable object)最安全,所以在更新时必须用case class 的copy或直接构建新的SeasonInfo实例。

我们再来看看processEvent的作业流程:

 private[akka] def processEvent(event: Event, source: AnyRef): Unit = {
    val stateFunc = stateFunctions(currentState.stateName)
    val nextState = if (stateFunc isDefinedAt event) {
      stateFunc(event)
    } else {
      // handleEventDefault ensures that this is always defined
      handleEvent(event)
    }
    applyState(nextState)
  }

先运算用户定义的StateFunction处理事件Event获取新的状态State。然后调用applyState运算makeTransition处理状态转换(currentState = nextState):

  private[akka] def applyState(nextState: State): Unit = {
    nextState.stopReason match {
      case None ⇒ makeTransition(nextState)
      case _ ⇒
        nextState.replies.reverse foreach { r ⇒ sender() ! r }
        terminate(nextState)
        context.stop(self)
    }
  }

  private[akka] def makeTransition(nextState: State): Unit = {
    if (!stateFunctions.contains(nextState.stateName)) {
      terminate(stay withStopReason Failure("Next state %s does not exist".format(nextState.stateName)))
    } else {
      nextState.replies.reverse foreach { r ⇒ sender() ! r }
      if (currentState.stateName != nextState.stateName || nextState.notifies) {
        this.nextState = nextState
        handleTransition(currentState.stateName, nextState.stateName)
        gossip(Transition(self, currentState.stateName, nextState.stateName))
        this.nextState = null
      }
      currentState = nextState

      def scheduleTimeout(d: FiniteDuration): Some[Cancellable] = {
        import context.dispatcher
        Some(context.system.scheduler.scheduleOnce(d, self, TimeoutMarker(generation)))
      }

      currentState.timeout match {
        case SomeMaxFiniteDuration                    ⇒ // effectively disable stateTimeout
        case Some(d: FiniteDuration) if d.length >= 0 ⇒ timeoutFuture = scheduleTimeout(d)
        case _ ⇒
          val timeout = stateTimeouts(currentState.stateName)
          if (timeout.isDefined) timeoutFuture = scheduleTimeout(timeout.get)
      }
    }
  }

我们用FSM DSL的stay, goto,using来取得新的FSM状态和数据:

 /**
   * Produce transition to other state.
   * Return this from a state function in order to effect the transition.
   *
   * This method always triggers transition events, even for `A -> A` transitions.
   * If you want to stay in the same state without triggering an state transition event use [[#stay]] instead.
   *
   * @param nextStateName state designator for the next state
   * @return state transition descriptor
   */
  final def goto(nextStateName: S): State = FSM.State(nextStateName, currentState.stateData)

  /**
   * Produce "empty" transition descriptor.
   * Return this from a state function when no state change is to be effected.
   *
   * No transition event will be triggered by [[#stay]].
   * If you want to trigger an event like `S -> S` for `onTransition` to handle use `goto` instead.
   *
   * @return descriptor for staying in current state
   */
  final def stay(): State = goto(currentState.stateName).withNotification(false) // cannot directly use currentState because of the timeout field

stay,goto返回结果都是State[S,D]类型。using是State类型的一个方法:

   /**
     * Modify state transition descriptor with new state data. The data will be
     * set when transitioning to the new state.
     */
    def using(@deprecatedName('nextStateDate) nextStateData: D): State[S, D] = {
      copy(stateData = nextStateData)
    }

我们看到using的主要作用是把当前状态数据替换成新状态的数据。

Akka的FSM是一个功能强大的Actor类型,所以配备了一套完整的DSL来方便FSM编程。FSM的DSL语句包括:

  final def startWith(stateName: S, stateData: D, timeout: Timeout = None): Unit =
    currentState = FSM.State(stateName, stateData, timeout)

  final def goto(nextStateName: S): State = FSM.State(nextStateName, currentState.stateData)

  final def stay(): State = goto(currentState.stateName).withNotification(false) 

  final def stop(): State = stop(Normal)

  final def transform(func: StateFunction): TransformHelper = new TransformHelper(func)

  ...

State[S,D]也提供了一些比较实用的方法函数:

case class State[S, D](stateName: S, stateData: D, timeout: Option[FiniteDuration] = None, stopReason: Option[Reason] = None, replies: List[Any] = Nil) {
   ...

    // defined here to be able to override it in SilentState
    def copy(stateName: S = stateName, stateData: D = stateData, timeout: Option[FiniteDuration] = timeout, stopReason: Option[Reason] = stopReason, replies: List[Any] = replies): State[S, D] = {
      new State(stateName, stateData, timeout, stopReason, replies)
    }

    /**
     * Modify state transition descriptor to include a state timeout for the
     * next state. This timeout overrides any default timeout set for the next
     * state.
     *
     * Use Duration.Inf to deactivate an existing timeout.
     */
    def forMax(timeout: Duration): State[S, D] = timeout match {
      case f: FiniteDuration ⇒ copy(timeout = Some(f))
      case Duration.Inf      ⇒ copy(timeout = SomeMaxFiniteDuration) // we map the Infinite duration to a special marker,
      case _                 ⇒ copy(timeout = None) // that means "cancel stateTimeout". This marker is needed
    } // so we do not have to break source/binary compat.
    // TODO: Can be removed once we can break State#timeout signature to `Option[Duration]`

    /**
     * Send reply to sender of the current message, if available.
     *
     * @return this state transition descriptor
     */
    def replying(replyValue: Any): State[S, D] = {
      copy(replies = replyValue :: replies)
    }

    /**
     * Modify state transition descriptor with new state data. The data will be
     * set when transitioning to the new state.
     */
    def using(@deprecatedName('nextStateDate) nextStateData: D): State[S, D] = {
      copy(stateData = nextStateData)
    }

    ...
    }

FSM DSL中的transform是这样定义的:

  final class TransformHelper(func: StateFunction) {
    def using(andThen: PartialFunction[State, State]): StateFunction =
      func andThen (andThen orElse { case x ⇒ x })
  }

  final def transform(func: StateFunction): TransformHelper = new TransformHelper(func)

我们看到TransformHelper用using对入参func:StateFunction施用用户提供的andThen: PartialFunction[State,State]后返回新的状态State。这个using与State.using是不同的。下面是一个transform用法例子:

when(Running) {
    transform {
      case Event(m, Target(Actor.noSender)) =>
        goto(Uninitialised) using NoConfig
 
      case Event(m, Target(ref)) =>
        ref ! m
        stay
    } using targetTransformer
  }
 
  def targetTransformer: PartialFunction[State, State] = {
    case s @ State(stateName, Target(ref), _, _, _) if ref.path.name.startsWith("testActor") =>
      log.debug("Setting target to dead letters")
      s.using(Target(Actor.noSender))
  }

transform{...}产生的State传给了targetTransformer然后经过模式匹配拆分出properties后用s.using更新stateData。

与become/unbecome相同,我们也可以在FSM里使用stashing。下面是一个用例:

when(Uninitialised) {
    case Event(Config(ref), _) =>
      goto(Running) using Target(ref)
 
    case Event(_, _) =>
      stash
      stay
  }
 
  when(Running) {
    case Event(m, Target(ref)) =>
      ref ! m
      stay
  }
   
  onTransition {
    case Uninitialised -> Running => unstashAll()
  }

当然,还有如stop,setTimer,replying,forMax,onTermination等方法和函数,这里就不一一详述了,有兴趣可以直接查询Akka/actor/FSM.scala。

下面是本次讨论的示范源码:

import akka.actor._

sealed trait Seasons   //States
case object Spring extends Seasons
case object Summer extends Seasons
case object Fall extends Seasons
case object Winter extends Seasons

//sealed trait SeasonData  //Data
case class SeasonInfo(talks: Int, month: Int)


object FillSeasons {
  sealed trait Messages    //功能消息
  case object HowYouFeel extends Messages
  case object NextMonth extends Messages

  def props = Props(new FillSeasons)
}

class FillSeasons extends FSM[Seasons,SeasonInfo] with ActorLogging {
  import FillSeasons._
  startWith(Spring,SeasonInfo(0,1))  //起始状态
  when(Spring) {   //状态在春季
    case Event(HowYouFeel,seasonInfo) =>
      val numtalks = seasonInfo.talks + 1
      log.info(s"It's ${stateName.toString}, feel so gooood! You've asked me ${numtalks} times.")
      stay using seasonInfo.copy(talks = numtalks)
  }
  when(Summer) {  //夏季状态
    case Event(HowYouFeel,_) =>
      val numtalks = stateData.talks + 1
      log.info(s"It's ${stateName.toString}, it's so hot! You've asked me ${numtalks} times")
      stay().using(stateData.copy(talks = numtalks))
  }
  when(Fall) {  //秋季状态
    case Event(HowYouFeel,SeasonInfo(tks,mnth)) =>
      val numtalks = tks + 1
      log.info(s"It's ${stateName.toString}, it's no so bad. You've asked me ${numtalks} times.")
      stay using SeasonInfo(numtalks,mnth)
  }
  when(Winter) {  //冬季状态
    case Event(HowYouFeel,si@ SeasonInfo(tks,_)) =>
      val numtalks = tks + 1
      log.info(s"It's ${stateName.toString}, it's freezing cold! You've asked me ${numtalks} times.")
      stay using si.copy(talks = numtalks)
  }
  whenUnhandled {  //所有状态未处理的Event
    case Event(NextMonth,seasonInfo) =>
      val mth = seasonInfo.month
      if (mth <= 3) {
        log.info(s"It's month ${mth+1} of ${stateName.toString}")
        stay using seasonInfo.copy(month = mth + 1)
      }
      else {
        goto(nextSeason(stateName)) using SeasonInfo(0,1)
      }
  }
  onTransition {
    case Spring -> Summer => log.info(s"Season changed from Spring to Summer month ${nextStateData.month}")
    case Summer -> Fall => log.info(s"Season changed from Summer to Fall month ${nextStateData.month}")
    case Fall -> Winter => log.info(s"Season changed from Fall to Winter month ${nextStateData.month}")
    case Winter -> Spring => log.info(s"Season changed from Winter to Spring month ${nextStateData.month}")
  }

  initialize()  //设定起始状态
  log.info(s"It's month 1 of ${stateName.toString}")


  //季节转换顺序
  def nextSeason(season: Seasons): Seasons =
    season match {
      case Spring => Summer
      case Summer => Fall
      case Fall => Winter
      case Winter => Spring
    }
}

object FSMDemo extends App {
  import scala.util.Random
  val fsmSystem = ActorSystem("fsmSystem")
  val fsmActor = fsmSystem.actorOf(FillSeasons.props,"fsmActor")

  (1 to 15).foreach { _ =>
    (1 to Random.nextInt(3)).foreach{ _ =>
      fsmActor ! FillSeasons.HowYouFeel
    }
    fsmActor ! FillSeasons.NextMonth
  }


  scala.io.StdIn.readLine()
  fsmSystem.terminate()
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏草根专栏

RxJS速成 (上)

What is RxJS? RxJS是ReactiveX编程理念的JavaScript版本。ReactiveX是一种针对异步数据流的编程。简单来说,它将一切数据...

2534
来自专栏Android先生

【漫画技术】Android跨进程通信

Tips:4个环节,共计约9小时的精心打磨完成上线,同时也非常感谢参与审稿的同学。

882
来自专栏函数式编程语言及工具

Akka(3): Actor监管 - 细述BackoffSupervisor

    在上一篇讨论中我们谈到了监管:在Akka中就是一种直属父子监管树结构,父级Actor负责处理直属子级Actor产生的异常。当时我们把BackoffSup...

2306
来自专栏Winter漫聊技术

Retrofit进阶

什么是Retrofit? 这类文章太多了,这里就不多做介绍,贴个官方链接: http://square.github.io/retrofit/

802
来自专栏斑斓

Spray中的Authentication和JMeter测试

Spray Authentication 在Spray中,如果需要对REST API添加认证,可以使用Spray提供的Authenticate功能。本质上,Au...

3519
来自专栏大内老A

WCF后续之旅(16): 消息是如何分发到Endpoint的--消息筛选(Message Filter)

在介绍终结点的ListenUriMode时,我们提到了两个特殊的对象ChannelDispatcher和ChannelListener。这两个对象在整个WCF的...

1857
来自专栏斑斓

AKKA中的事件流

在《企业应用集成模式》一书中,定义了许多与消息处理有关的模式,其中运用最为广泛的模式为Publisher-Subscriber模式,尤其是在异步处理场景下。 基...

3444
来自专栏领域驱动设计DDD实战进阶

DDD实战进阶第一波(十一):开发一般业务的大健康行业直销系统(实现经销商代注册用例与登录令牌分发)

752
来自专栏博客园

mongo 监听指定语句

252
来自专栏张善友的专栏

WCF服务中操作FormsAuthentication的Cookie

在asp.net 应用程序和WCF服务之间共享FormsAuthentication,默认是不支持的,设置一下非常的简单,只需要两步就可以了: 1、在web.c...

1985

扫码关注云+社区