Akka(15): 持久化模式:AtLeastOnceDelivery-消息保证送达模式

  消息保证送达是指消息发送方保证在任何情况下都会至少一次确定的消息送达。AtleastOnceDelivery是一个独立的trait,主要作用是对不确定已送达的消息进行补发,这是一种自动的操作,无需用户干预。既然涉及到消息的补发,就不可避免地影响发送方和接收方之间消息传递的顺序、接收方重复收到相同的消息等问题,这些用户必须加以关注。从另一个方面,AtleastOnceDelivery模式保证了强韧性Actor系统的不丢失消息,这项要求可能是一些系统的核心要求。

AtleastOnceDelivery模式既然需要保证消息必达,就必须保证自身在出现任何异常情况下都能恢复到原来的状态,这些都可通过状态持久化来实现。与PersistentActor不同而且更复杂的是AtleastOnceDelivery-Actor的状态除自定义的结构外还必须包括未确认收到的消息(outstanding messages)。所以AtleastOnceDelivery提供了自身特殊的事件(event)和快照(snapshot)类型,它们都包括消息送达状态。

AtleastOnceDelivery模式的原理是一套收到确认回复机制,是通过deliver,confirmDelivery两个函数实现的。deliver是消息发送函数:

/**
 * Scala API: Mix-in this trait with your `PersistentActor` to send messages with at-least-once
 * delivery semantics to destinations. It takes care of re-sending messages when they
 * have not been confirmed within a configurable timeout. Use the [[AtLeastOnceDeliveryLike#deliver]] method to
 * send a message to a destination. Call the [[AtLeastOnceDeliveryLike#confirmDelivery]] method when the destination
 * has replied with a confirmation message.
 *
 * At-least-once delivery implies that original message send order is not always retained
 * and the destination may receive duplicate messages due to possible resends.
 *
 * The interval between redelivery attempts can be defined by [[AtLeastOnceDeliveryLike#redeliverInterval]].
 * After a number of delivery attempts a [[AtLeastOnceDelivery.UnconfirmedWarning]] message
 * will be sent to `self`. The re-sending will still continue, but you can choose to call
 * [[AtLeastOnceDeliveryLike#confirmDelivery]] to cancel the re-sending.
 *
 * The `AtLeastOnceDelivery` trait has a state consisting of unconfirmed messages and a
 * sequence number. It does not store this state itself. You must persist events corresponding
 * to the `deliver` and `confirmDelivery` invocations from your `PersistentActor` so that the
 * state can be restored by calling the same methods during the recovery phase of the
 * `PersistentActor`. Sometimes these events can be derived from other business level events,
 * and sometimes you must create separate events. During recovery calls to `deliver`
 * will not send out the message, but it will be sent later if no matching `confirmDelivery`
 * was performed.
 *
 * Support for snapshots is provided by [[AtLeastOnceDeliveryLike#getDeliverySnapshot]] and [[AtLeastOnceDeliveryLike#setDeliverySnapshot]].
 * The `AtLeastOnceDeliverySnapshot` contains the full delivery state, including unconfirmed messages.
 * If you need a custom snapshot for other parts of the actor state you must also include the
 * `AtLeastOnceDeliverySnapshot`. It is serialized using protobuf with the ordinary Akka
 * serialization mechanism. It is easiest to include the bytes of the `AtLeastOnceDeliverySnapshot`
 * as a blob in your custom snapshot.
 *
 * @see [[AtLeastOnceDeliveryLike]]
 * @see [[AbstractPersistentActorWithAtLeastOnceDelivery]] for Java API
 */
trait AtLeastOnceDelivery extends PersistentActor with AtLeastOnceDeliveryLike {

  /**
   * Scala API: Send the message created by the `deliveryIdToMessage` function to
   * the `destination` actor. It will retry sending the message until
   * the delivery is confirmed with [[#confirmDelivery]]. Correlation
   * between `deliver` and `confirmDelivery` is performed with the
   * `deliveryId` that is provided as parameter to the `deliveryIdToMessage`
   * function. The `deliveryId` is typically passed in the message to the
   * destination, which replies with a message containing the same `deliveryId`.
   *
   * The `deliveryId` is a strictly monotonically increasing sequence number without
   * gaps. The same sequence is used for all destinations of the actor, i.e. when sending
   * to multiple destinations the destinations will see gaps in the sequence if no
   * translation is performed.
   *
   * During recovery this method will not send out the message, but it will be sent
   * later if no matching `confirmDelivery` was performed.
   *
   * This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]]
   * if [[#numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]].
   */
  def deliver(destination: ActorPath)(deliveryIdToMessage: Long ⇒ Any): Unit = {
    internalDeliver(destination)(deliveryIdToMessage)
  }

  /**
   * Scala API: Send the message created by the `deliveryIdToMessage` function to
   * the `destination` actor. It will retry sending the message until
   * the delivery is confirmed with [[#confirmDelivery]]. Correlation
   * between `deliver` and `confirmDelivery` is performed with the
   * `deliveryId` that is provided as parameter to the `deliveryIdToMessage`
   * function. The `deliveryId` is typically passed in the message to the
   * destination, which replies with a message containing the same `deliveryId`.
   *
   * The `deliveryId` is a strictly monotonically increasing sequence number without
   * gaps. The same sequence is used for all destinations of the actor, i.e. when sending
   * to multiple destinations the destinations will see gaps in the sequence if no
   * translation is performed.
   *
   * During recovery this method will not send out the message, but it will be sent
   * later if no matching `confirmDelivery` was performed.
   *
   * This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]]
   * if [[#numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]].
   */
  def deliver(destination: ActorSelection)(deliveryIdToMessage: Long ⇒ Any): Unit = {
    internalDeliver(destination)(deliveryIdToMessage)
  }

}

deliver自动产生一个deliveryId,这个deliveryId是发送方与接收方沟通的标志。confirmDelivery(deliveryId)用这个id来确认某条消息已经送达:

  /**
   * Call this method when a message has been confirmed by the destination,
   * or to abort re-sending.
   * @see [[#deliver]]
   * @return `true` the first time the `deliveryId` is confirmed, i.e. `false` for duplicate confirm
   */
  def confirmDelivery(deliveryId: Long): Boolean = {
    if (unconfirmed.contains(deliveryId)) {
      unconfirmed -= deliveryId
      true
    } else false
  }

confirmDelivery同时还被用来取消未确认送达消息。系统对超过配置文件中重发次数设置的消息通过自发送一条UnconformedWarning信息,这个信息包嵌了当前未确认送达消息清单:

 /**
   * @see [[AtLeastOnceDeliveryLike#warnAfterNumberOfUnconfirmedAttempts]]
   */
  @SerialVersionUID(1L)
  case class UnconfirmedWarning(unconfirmedDeliveries: immutable.Seq[UnconfirmedDelivery]) {
    /**
     * Java API
     */
    def getUnconfirmedDeliveries: java.util.List[UnconfirmedDelivery] = {
      import scala.collection.JavaConverters._
      unconfirmedDeliveries.asJava
    }
  }

unconfirmedDeliveries是在下面这个函数里形成的:

private def redeliverOverdue(): Unit = {
    val now = System.nanoTime()
    val deadline = now - redeliverInterval.toNanos
    var warnings = Vector.empty[UnconfirmedDelivery]

    unconfirmed
      .iterator
      .filter { case (_, delivery) ⇒ delivery.timestamp <= deadline }
      .take(redeliveryBurstLimit)
      .foreach {
        case (deliveryId, delivery) ⇒
          send(deliveryId, delivery, now)

          if (delivery.attempt == warnAfterNumberOfUnconfirmedAttempts)
            warnings :+= UnconfirmedDelivery(deliveryId, delivery.destination, delivery.message)
      }

    if (warnings.nonEmpty)
      self ! UnconfirmedWarning(warnings)
  }

在状态恢复和消息处理是都会调用这个redeliverOverdue函数:

 override private[akka] def onReplaySuccess(): Unit = {
    redeliverOverdue()
    startRedeliverTask()
    super.onReplaySuccess()
  }

  /**
   * INTERNAL API
   */
  override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit =
    message match {
      case RedeliveryTick ⇒
        redeliverOverdue()

      case x ⇒
        super.aroundReceive(receive, message)
    }

AtLeastOnceDelivery模式的快照snapshot类型定义如下:

object AtLeastOnceDelivery {

  /**
   * Snapshot of current `AtLeastOnceDelivery` state. Can be retrieved with
   * [[AtLeastOnceDeliveryLike#getDeliverySnapshot]] and saved with [[PersistentActor#saveSnapshot]].
   * During recovery the snapshot received in [[SnapshotOffer]] should be set
   * with [[AtLeastOnceDeliveryLike#setDeliverySnapshot]].
   */
  @SerialVersionUID(1L)
  case class AtLeastOnceDeliverySnapshot(currentDeliveryId: Long, unconfirmedDeliveries: immutable.Seq[UnconfirmedDelivery])
    extends Message {

    /**
     * Java API
     */
    def getUnconfirmedDeliveries: java.util.List[UnconfirmedDelivery] = {
      import scala.collection.JavaConverters._
      unconfirmedDeliveries.asJava
    }

  }

可以看到这个类型包括了未确认送达清单UnconfirmedDelivery。快照存写和恢复用下面这两个函数: 

/**
   * Full state of the `AtLeastOnceDelivery`. It can be saved with [[PersistentActor#saveSnapshot]].
   * During recovery the snapshot received in [[SnapshotOffer]] should be set
   * with [[#setDeliverySnapshot]].
   *
   * The `AtLeastOnceDeliverySnapshot` contains the full delivery state, including unconfirmed messages.
   * If you need a custom snapshot for other parts of the actor state you must also include the
   * `AtLeastOnceDeliverySnapshot`. It is serialized using protobuf with the ordinary Akka
   * serialization mechanism. It is easiest to include the bytes of the `AtLeastOnceDeliverySnapshot`
   * as a blob in your custom snapshot.
   */
  def getDeliverySnapshot: AtLeastOnceDeliverySnapshot =
    AtLeastOnceDeliverySnapshot(
      deliverySequenceNr,
      unconfirmed.map { case (deliveryId, d) ⇒ UnconfirmedDelivery(deliveryId, d.destination, d.message) }(breakOut))

  /**
   * If snapshot from [[#getDeliverySnapshot]] was saved it will be received during recovery
   * in a [[SnapshotOffer]] message and should be set with this method.
   */
  def setDeliverySnapshot(snapshot: AtLeastOnceDeliverySnapshot): Unit = {
    deliverySequenceNr = snapshot.currentDeliveryId
    val now = System.nanoTime()
    unconfirmed = snapshot.unconfirmedDeliveries.map(d ⇒
      d.deliveryId → Delivery(d.destination, d.message, now, 0))(breakOut)
  }

我们还是设计个例子来示范保证送达模式:

package atleastonce.calculation
import akka.actor._
import akka.persistence._
import akka.persistence.AtLeastOnceDelivery._
import atleastonce.calculator.Calculator
object CalcAggregator {
  sealed trait Command
  case class Add(x: Int, y: Int) extends Command
  case class Sub(x: Int, y: Int) extends Command
  case class Mul(x: Int, y: Int) extends Command
  case class Div(x: Int, y: Int) extends Command
  case class Result(id: Long, res: Int) extends Command
/*
  sealed trait Event
  case class Added(x: Int, y: Int) extends Event
  case class Substracted(x: Int, y: Int) extends Event
  case class Multiplied(x: Int, y: Int) extends Event
  case class Divided(x: Int, y: Int) extends Event
  case class GotResult(id: Long, res: Int) extends Event

*/
  case class Snap(results: Set[Int], deliverySnapshot: AtLeastOnceDeliverySnapshot)
}

class CalcAggregator(calculators: Map[String,ActorPath]) extends PersistentActor
  with AtLeastOnceDelivery with ActorLogging {
 import CalcAggregator._
  var results: Set[Int] = Set()

  override def persistenceId = "calculation-actor"

  //sending commands and update state only with delivery ack
  def updateState(cmd: Command) = cmd match {
    case Add(x,y) => deliver(calculators("ADD")){id => Calculator.Add(id,x,y)}
    case Sub(x,y) => deliver(calculators("SUB")){id => Calculator.Sub(id,x,y)}
    case Mul(x,y) => deliver(calculators("MUL")){id => Calculator.Mul(id,x,y)}
    case Div(x,y) => deliver(calculators("DIV")){id => Calculator.Div(id,x,y)}
    case Result(id,res) =>
      results += res
      confirmDelivery(id)
  }

  override def receiveCommand: Receive = {
    case cmd: Command => persistAsync(cmd){updateState}

    case UnconfirmedWarning(unconfirmedDeliveries) =>  //cancel outstanding message
      unconfirmedDeliveries.foreach{u => confirmDelivery(u.deliveryId)}
  }

  override def receiveRecover: Receive = {
    case cmd: Command => updateState(cmd)
    case SnapshotOffer(_,snap: Snap) =>
      results = snap.results
      setDeliverySnapshot(snap.deliverySnapshot)
  }

}

以上是一个典型的任务分发器,主要功能是通过发送指令消息驱动其它Calculator Actor进行运算,我们希望保证发出的指令都能送达。首先,指令和事件是一一对应的,无需进行指令事件转换,可以统一直接采用指令。再就是存写指令时无需验证,因为状态results更新是在收到指令接收方回复Results(id,res)之后进行的。从这个例子比较简单的功能操作中我们可明显感觉到写入日志的流量:CalcAggregator好像就是在不断的把经历的指令写入日志然后等待回复,回复时间就是Calculator运算时间。试想下次启动系统进行日志重演时会怎样:启动时间长度等于系统累积运算的时间长度。这很可怕,花几个小时来恢复状态可能是常态。所以必须充分利用快照,采用一套有效的日志、快照维护方式来提高状态恢复效率。下面是加入了日志快照维护功能的新代码:

package atleastonce.calculation
import akka.actor._
import akka.persistence._
import akka.persistence.AtLeastOnceDelivery._
import atleastonce.calculator.Calculator
import scala.util.control.NoStackTrace

object CalcAggregator {
  sealed trait Command
  case class Add(x: Int, y: Int) extends Command
  case class Sub(x: Int, y: Int) extends Command
  case class Mul(x: Int, y: Int) extends Command
  case class Div(x: Int, y: Int) extends Command
  case class Result(id: Long, res: Int) extends Command
/*
  sealed trait Event
  case class Added(x: Int, y: Int) extends Event
  case class Substracted(x: Int, y: Int) extends Event
  case class Multiplied(x: Int, y: Int) extends Event
  case class Divided(x: Int, y: Int) extends Event
  case class GotResult(id: Long, res: Int) extends Event
*/
  case class Snap(results: List[Int],
                  deliverySnapshot: AtLeastOnceDeliverySnapshot)


  case object ShowResults
  case object Boom
  case object ClearJournal


  def props(calculators: Map[String,ActorRef],keepJournalNr: Int) =
    Props(new CalcAggregator(calculators,keepJournalNr))
}

class CalcAggregator(calculators: Map[String,ActorRef],keepJournalNr: Int)
  extends PersistentActor with AtLeastOnceDelivery with ActorLogging {
  import CalcAggregator._
  var results: List[Int] = List()
  var resultsId: Int = 0
  override def persistenceId = "calculation-actor023"

  //sending commands and update state only with delivery ack
  def updateState(cmd: Command) = {
    if (!recoveryRunning && !cmd.isInstanceOf[Result])
        log.info(s"Sending command message: $cmd at: $lastSequenceNr")

    cmd match {
      case Add(x,y) => deliver(calculators("ADD").path){id => Calculator.Add(id,x,y)}
      case Sub(x,y) => deliver(calculators("SUB").path){id => Calculator.Sub(id,x,y)}
      case Mul(x,y) => deliver(calculators("MUL").path){id => Calculator.Mul(id,x,y)}
      case Div(x,y) => deliver(calculators("DIV").path){id => Calculator.Div(id,x,y)}
      case Result(id,res) =>
        log.info(s"Receive calculation result $res with ack id: $id")
        if ( res != 0) {
          results = res :: results
          confirmDelivery(id)
          log.info(s"Current state updated to: $results at $lastSequenceNr")
          resultsId += 1
          if (resultsId % keepJournalNr == 0) {
            resultsId = 0
            saveSnapshot(Snap(results, getDeliverySnapshot))
            log.info(s"Saving snapshot with state $results, snapshot: $getDeliverySnapshot")
          }
        }
    }
  }

  override def receiveCommand: Receive = {
    case cmd: Command => persist(cmd){updateState}
    case ack: Calculator.Ack =>
      updateState(Result(ack.id,ack.x))
      
    case UnconfirmedWarning(unconfirmedDeliveries) =>  //cancel outstanding message
      log.info(s"UnconfirmedWarning: $unconfirmedDeliveries ...")
      unconfirmedDeliveries.foreach{u =>
        log.info(s"Cancelling unconfirmedDeliveris $u")
        confirmDelivery(u.deliveryId)}

    case SaveSnapshotSuccess(m) =>
      log.info(s"Sucessfull saving snapshot: ${m} at: $lastSequenceNr")
      //clear journal and snapshot
      deleteSnapshots(SnapshotSelectionCriteria(maxSequenceNr = m.sequenceNr - 1))
      deleteMessages(m.sequenceNr)
    case SaveSnapshotFailure(m,cause) =>
      log.info(s"Saving snapshot failed because: ${cause}")
    case DeleteMessagesSuccess(toSeq) =>
      log.info(s"Succefull deleting journal upto: $toSeq")
    case DeleteMessagesFailure(cause,toSeq) =>
      log.info(s"Failed to delete journal upto: $toSeq because: $cause")
    case DeleteSnapshotsSuccess(crit) =>
      log.info(s"Successful delete snapshots for $crit")
    case DeleteSnapshotSuccess(m) =>
      log.info(s"Successful delete snapshot upto: ${m.sequenceNr}")
    case DeleteSnapshotsFailure(crit,cause) =>
      log.info(s"Failed to delete snapshots $crit because: $cause")
    case DeleteSnapshotFailure(m,cause) =>
      log.info(s"Failed to delete snapshot upto: ${m.sequenceNr} because: $cause")

    case ShowResults =>
      log.info(s"Show Current State: $results and lastSequenceNr : $lastSequenceNr")

    case "TakeSnapshot" =>
      log.info(s"Saving snapshot with state: $results ...")
      saveSnapshot(Snap(results, getDeliverySnapshot))

    case Boom =>
      log.info("Boom!")
      throw new RuntimeException("boom") with NoStackTrace
    case ClearJournal =>
      deleteMessages(lastSequenceNr)
      deleteSnapshots(SnapshotSelectionCriteria(maxSequenceNr = lastSequenceNr))

  }

  override def receiveRecover: Receive = {
    case cmd: Command => updateState(cmd)
      log.info(s"Replaying command: $cmd")
    case SnapshotOffer(md,snap: Snap) =>
      log.info(s"Loading snapshot at: ${md.sequenceNr} with state: ${snap.results}")
      results = snap.results
      setDeliverySnapshot(snap.deliverySnapshot)
      log.info(s"Updated state to $results with snapshot")
    case RecoveryCompleted =>
      log.info(s"Recovery compeleted with State: $results and lastSequenceNr=$lastSequenceNr")
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info(s"Aggregator restarting with reason: ${reason.getMessage}")
    super.preRestart(reason, message)
  }

  override def warnAfterNumberOfUnconfirmedAttempts = 1
}

当收到Calculator运算结果次数大于等于保留日志长度keepJournal则把快照写入一次: 

    case Result(id,res) =>
        log.info(s"Receive calculation result $res with ack id: $id")
        if ( res != 0) {
          results = res :: results
          confirmDelivery(id)
          log.info(s"Current state updated to: $results at $lastSequenceNr")
          resultsId += 1
          if (resultsId % keepJournalNr == 0) {
            resultsId = 0
            saveSnapshot(Snap(results, getDeliverySnapshot))
            log.info(s"Saving snapshot with state $results, snapshot: $getDeliverySnapshot")
          }
        }

每次快照写入成功则把之前的Journal和Snapshot都删除:

  case SaveSnapshotSuccess(m) =>
      log.info(s"Sucessfull saving snapshot: ${m} at: $lastSequenceNr")
      //clear journal and snapshot
      deleteSnapshots(SnapshotSelectionCriteria(maxSequenceNr = m.sequenceNr - 1))
      deleteMessages(m.sequenceNr)

使用了下面这段代码来测试:

package atleastonce.demo
import atleastonce.calculation.CalcAggregator
import atleastonce.calculation.CalcAggregator._
import atleastonce.calculator.Calculator
import akka.actor._

object AtLeastOnceDemo extends App {
  val atLeastOnceSystem = ActorSystem("atleastonceSystem")

  val addActor = atLeastOnceSystem.actorOf(Calculator.props,"addActor")
  val subActor = atLeastOnceSystem.actorOf(Calculator.props,"subActor")
  val mulActor = atLeastOnceSystem.actorOf(Calculator.props,"mulActor")
  val divActor = atLeastOnceSystem.actorOf(Calculator.props,"divActor")
  var actors = Map[String,ActorRef]()
  actors += ("ADD" -> addActor)
  actors += ("SUB" -> subActor)
  actors += ("MUL" -> mulActor)
  actors += ("DIV" -> divActor)


  val aggregator = atLeastOnceSystem.actorOf(CalcAggregator.props(actors,5), "aggregator")


  aggregator ! Sub(0,0)
  aggregator ! Add(6,3)
  aggregator ! Sub(8,0)
  aggregator ! Mul(3,2)
  aggregator ! Boom
  aggregator ! Div(12,3)
  Thread.sleep(10000)
  aggregator ! ShowResults

 // aggregator ! ClearJournal


  scala.io.StdIn.readLine()
  atLeastOnceSystem.terminate()


}

连续运算几次,状态都恢复正确。 下面是本次示范的源代码: build.sbt

name := "atleastonce-delivery"

version := "1.0"

scalaVersion := "2.11.9"

sbtVersion := "0.13.5"

libraryDependencies ++= Seq(
  "com.typesafe.akka"           %% "akka-actor"       % "2.5.3",
  "com.typesafe.akka"           %% "akka-persistence" % "2.5.3",
  "ch.qos.logback" % "logback-classic" % "1.1.7",
  "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.54",
  "com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % "0.54" % Test
)

application.conf

akka {
  persistence {
    journal.plugin = "cassandra-journal"
    snapshot-store.plugin = "cassandra-snapshot-store"
  }
}
akka.actor.warn-about-java-serializer-usage = off
akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts = 1

Calculator.scala

package atleastonce.calculator
import akka.actor._
import scala.util.Random
import scala.concurrent.duration._
object Calculator {
  sealed trait Math
  case class Add(id: Long, x: Int, y: Int) extends Math
  case class Sub(id: Long, x: Int, y: Int) extends Math
  case class Mul(id: Long, x: Int, y: Int) extends Math
  case class Div(id: Long, x: Int, y: Int) extends Math

  case class Ack(id: Long, x: Int)
  case class CalcMath(expr: Math)

  def props = Props(new Calculator)

}

class Calculator extends Actor with ActorLogging {
  import Calculator._
  import context.dispatcher
  override def receive: Receive = {
    case CalcMath(expr) => {
      val delay: FiniteDuration = (100 millis) * Random.nextInt(10)
      context.system.scheduler.scheduleOnce(delay,self,expr)
    }
    case add: Add => sender() ! Ack(add.id,add.x + add.y)
    case sub: Sub => sender() ! Ack(sub.id,sub.x - sub.y)
    case mul: Mul => sender() ! Ack(mul.id,mul.x * mul.y)
    case div: Div => sender() ! Ack(div.id,div.x / div.y)
  }
}

CalcAggregator

package atleastonce.calculation
import akka.actor._
import akka.persistence._
import akka.persistence.AtLeastOnceDelivery._
import atleastonce.calculator.Calculator
import scala.util.control.NoStackTrace

object CalcAggregator {
  sealed trait Command
  case class Add(x: Int, y: Int) extends Command
  case class Sub(x: Int, y: Int) extends Command
  case class Mul(x: Int, y: Int) extends Command
  case class Div(x: Int, y: Int) extends Command
  case class Result(id: Long, res: Int) extends Command
/*
  sealed trait Event
  case class Added(x: Int, y: Int) extends Event
  case class Substracted(x: Int, y: Int) extends Event
  case class Multiplied(x: Int, y: Int) extends Event
  case class Divided(x: Int, y: Int) extends Event
  case class GotResult(id: Long, res: Int) extends Event
*/
  case class Snap(results: List[Int],
                  deliverySnapshot: AtLeastOnceDeliverySnapshot)


  case object ShowResults
  case object Boom
  case object ClearJournal


  def props(calculators: Map[String,ActorRef],keepJournalNr: Int) =
    Props(new CalcAggregator(calculators,keepJournalNr))
}

class CalcAggregator(calculators: Map[String,ActorRef],keepJournalNr: Int)
  extends PersistentActor with AtLeastOnceDelivery with ActorLogging {
  import CalcAggregator._
  var results: List[Int] = List()
  var resultsId: Int = 0
  override def persistenceId = "calculation-actor023"

  //sending commands and update state only with delivery ack
  def updateState(cmd: Command) = {
    if (!recoveryRunning && !cmd.isInstanceOf[Result])
        log.info(s"Sending command message: $cmd at: $lastSequenceNr")

    cmd match {
      case Add(x,y) => deliver(calculators("ADD").path){id => Calculator.Add(id,x,y)}
      case Sub(x,y) => deliver(calculators("SUB").path){id => Calculator.Sub(id,x,y)}
      case Mul(x,y) => deliver(calculators("MUL").path){id => Calculator.Mul(id,x,y)}
      case Div(x,y) => deliver(calculators("DIV").path){id => Calculator.Div(id,x,y)}
      case Result(id,res) =>
        log.info(s"Receive calculation result $res with ack id: $id")
        if ( res != 0) {
          results = res :: results
          confirmDelivery(id)
          log.info(s"Current state updated to: $results at $lastSequenceNr")
          resultsId += 1
          if (resultsId % keepJournalNr == 0) {
            resultsId = 0
            saveSnapshot(Snap(results, getDeliverySnapshot))
            log.info(s"Saving snapshot with state $results, snapshot: $getDeliverySnapshot")
          }
        }
    }
  }

  override def receiveCommand: Receive = {
    case cmd: Command => persist(cmd){updateState}
    case ack: Calculator.Ack =>
      updateState(Result(ack.id,ack.x))

    case UnconfirmedWarning(unconfirmedDeliveries) =>  //cancel outstanding message
      log.info(s"UnconfirmedWarning: $unconfirmedDeliveries ...")
      unconfirmedDeliveries.foreach{u =>
        log.info(s"Cancelling unconfirmedDeliveris $u")
        confirmDelivery(u.deliveryId)}

    case SaveSnapshotSuccess(m) =>
      log.info(s"Sucessfull saving snapshot: ${m} at: $lastSequenceNr")
      //clear journal and snapshot
      deleteSnapshots(SnapshotSelectionCriteria(maxSequenceNr = m.sequenceNr - 1))
      deleteMessages(m.sequenceNr)
    case SaveSnapshotFailure(m,cause) =>
      log.info(s"Saving snapshot failed because: ${cause}")
    case DeleteMessagesSuccess(toSeq) =>
      log.info(s"Succefull deleting journal upto: $toSeq")
    case DeleteMessagesFailure(cause,toSeq) =>
      log.info(s"Failed to delete journal upto: $toSeq because: $cause")
    case DeleteSnapshotsSuccess(crit) =>
      log.info(s"Successful delete snapshots for $crit")
    case DeleteSnapshotSuccess(m) =>
      log.info(s"Successful delete snapshot upto: ${m.sequenceNr}")
    case DeleteSnapshotsFailure(crit,cause) =>
      log.info(s"Failed to delete snapshots $crit because: $cause")
    case DeleteSnapshotFailure(m,cause) =>
      log.info(s"Failed to delete snapshot upto: ${m.sequenceNr} because: $cause")

    case ShowResults =>
      log.info(s"Show Current State: $results and lastSequenceNr : $lastSequenceNr")

    case "TakeSnapshot" =>
      log.info(s"Saving snapshot with state: $results ...")
      saveSnapshot(Snap(results, getDeliverySnapshot))

    case Boom =>
      log.info("Boom!")
      throw new RuntimeException("boom") with NoStackTrace
    case ClearJournal =>
      deleteMessages(lastSequenceNr)
      deleteSnapshots(SnapshotSelectionCriteria(maxSequenceNr = lastSequenceNr))

  }

  override def receiveRecover: Receive = {
    case cmd: Command => updateState(cmd)
      log.info(s"Replaying command: $cmd")
    case SnapshotOffer(md,snap: Snap) =>
      log.info(s"Loading snapshot at: ${md.sequenceNr} with state: ${snap.results}")
      results = snap.results
      setDeliverySnapshot(snap.deliverySnapshot)
      log.info(s"Updated state to $results with snapshot")
    case RecoveryCompleted =>
      log.info(s"Recovery compeleted with State: $results and lastSequenceNr=$lastSequenceNr")
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info(s"Aggregator restarting with reason: ${reason.getMessage}")
    super.preRestart(reason, message)
  }

  override def warnAfterNumberOfUnconfirmedAttempts = 1
}

AtLeastOnceDemo.scala

package atleastonce.demo
import atleastonce.calculation.CalcAggregator
import atleastonce.calculation.CalcAggregator._
import atleastonce.calculator.Calculator
import akka.actor._

object AtLeastOnceDemo extends App {
  val atLeastOnceSystem = ActorSystem("atleastonceSystem")

  val addActor = atLeastOnceSystem.actorOf(Calculator.props,"addActor")
  val subActor = atLeastOnceSystem.actorOf(Calculator.props,"subActor")
  val mulActor = atLeastOnceSystem.actorOf(Calculator.props,"mulActor")
  val divActor = atLeastOnceSystem.actorOf(Calculator.props,"divActor")
  var actors = Map[String,ActorRef]()
  actors += ("ADD" -> addActor)
  actors += ("SUB" -> subActor)
  actors += ("MUL" -> mulActor)
  actors += ("DIV" -> divActor)


  val aggregator = atLeastOnceSystem.actorOf(CalcAggregator.props(actors,5), "aggregator")


  aggregator ! Sub(0,0)
  aggregator ! Add(6,3)
  aggregator ! Sub(8,0)
  aggregator ! Mul(3,2)
  aggregator ! Boom
  aggregator ! Div(12,3)
  Thread.sleep(10000)
  aggregator ! ShowResults

 // aggregator ! ClearJournal



  scala.io.StdIn.readLine()
  atLeastOnceSystem.terminate()


}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏老马说编程

(61) 内存映射文件及其应用 - 实现一个简单的消息队列 / 计算机程序的思维逻辑

本节介绍内存映射文件,内存映射文件不是Java引入的概念,而是操作系统提供的一种功能,大部分操作系统都支持。 我们先来介绍内存映射文件的基本概念,它是什么,能解...

2265
来自专栏扎心了老铁

springboot使用zookeeper(curator)实现注册发现与负载均衡

最简单的实现服务高可用的方法就是集群化,也就是分布式部署,但是分布式部署会带来一些问题。比如: 1、各个实例之间的协同(锁) 2、负载均衡 3、热删除 这里通过...

1.8K10
来自专栏菩提树下的杨过

Silverlight数据绑定/IValueConverter学习笔记

先回忆一下aspx中的处理: 在aspx中,可以直接在后台定义一个变量,然后前台就可以用<%=xxx%>来将其"绑定"html控件上,比如下面这样,实在是很方便...

2527
来自专栏大内老A

我的WCF之旅(5):面向服务架构(SOA)和面向对象编程(OOP)的结合——如何实现Service Contract的重载(Overloading)

对于.NET重载(Overloading)——定义不同参数列表的同名方法(顺便提一下,我们但可以在参数列表上重载方法,我们甚至可以在返回类型层面来重载我们需要的...

2286
来自专栏cmazxiaoma的架构师之路

SpringBoot之路(二)之Web进阶

2084
来自专栏大内老A

WCF后续之旅(3): WCF Service Mode Layer 的中枢—Dispatcher

在本系列的第一部分、第二部分中,我们对WCF的channel layer进行了深入的讨论。我们接下来继续讨论WCF的service mode layer。本篇文...

1717
来自专栏大内老A

WCF技术剖析之十二:数据契约(Data Contract)和数据契约序列化器(DataContractSerializer)

大部分的系统都是以数据为中心的(Data Central),功能的实现表现在对相关数据的正确处理。而数据本身,是有效信息的载体,在不同的环境具有不同的表示。一个...

3388
来自专栏https://www.cnblogs.com/L

Hadoop源码篇---解读Mapprer源码outPut输出

上次讲完MapReduce的输入后,这次开始讲MapReduce的输出。注意MapReduce的原语很重要:

943
来自专栏技术记录

shiro权限控制(二):分布式架构中shiro的实现

前言: 前段时间在搭建公司游戏框架安全验证的时候,就想到之前web最火的shiro框架,虽然后面实践发现在netty中不太适用,最后自己模仿shiro写了一个缩...

4567
来自专栏分布式系统和大数据处理

Go语言Tips

转型到Go以后,因为语言的不熟悉,以往很常见的一些操作有时候也需要去Google一下。这里将一些结果记录下来,方便日后查阅。

1102

扫码关注云+社区