以下是代码:
import akka.persistence._
import akka.actor.{Actor, ActorRef, ActorSystem, Props, ActorLogging}
class Counter extends PersistentActor with ActorLogging {
import Counter._
var state: State = new State(0)
override def receiveRecover: Receive = {
case RecoveryCompleted => println("Recovery completed.")
case SnapshotOffer(_, snapshot: State) => state = snapshot
case op: Operation => updateState(op)
}
override def persistenceId: String = "counter-persistent"
override def receiveCommand: Receive = {
case op: Operation =>
println(s"Counter receive ${op}")
persist(op) {
op => updateState(op)
}
case "print" => println(s"The current state of couter is ${state}")
case SaveSnapshotFailure(_, reason) => println(s"save snapshot failed, reason: ${reason}")
case SaveSnapshotSuccess(_) => println(s"snapshot saved")
}
def updateState(op: Operation): Unit = op match {
case Increment(n) =>
state = state.inc(n)
takeSnapshot
case Decrement(n) =>
state = state.dec(n)
takeSnapshot
}
def takeSnapshot: Unit = {
// if (state % 5 == 0) saveSnapshot()
saveSnapshot()
}
}
object Counter {
sealed trait Operation {
val count: Int
}
case class Increment(override val count: Int) extends Operation
case class Decrement(override val count: Int) extends Operation
final case class State(n: Int) {
def inc(x: Int) = State(n + x)
def dec(x: Int) = State(n - x)
}
}
object Persistent extends App {
import Counter._
val system = ActorSystem("persistent-actors")
val counter = system.actorOf(Props[Counter])
counter ! Increment(3)
counter ! Increment(5)
counter ! Decrement(3)
counter ! "print"
Thread.sleep(1000)
system.terminate()
}配置(application.conf):
akka {
persistence {
journal {
plugin = "akka.persistence.journal.leveldb",
leveldb {
dir = "target/example/journal",
native = false
}
},
snapshot-store {
plugin = "akka.persistence.snapshot-store.local",
local {
dir = "target/example/snapshots"
}
}
}
}运行该应用程序两次表明该状态根本不持久:
Recovery completed.
Counter receive Increment(3)
Counter receive Increment(5)
Counter receive Decrement(3)
The current state of couter is State(5)
snapshot saved
snapshot saved
snapshot saved
Recovery completed.
Counter receive Increment(3)
Counter receive Increment(5)
Counter receive Decrement(3)
The current state of couter is State(5)
snapshot saved
snapshot saved
snapshot saved为什么?
发布于 2016-04-03 08:18:59
这里的问题是,在参与者接收到的每一条操作消息之后,您都要获取快照,但是在获取快照时,您并没有保存它的状态。如果仔细查看您的takeSnapshot代码:
def takeSnapshot: Unit = {
// if (state % 5 == 0) saveSnapshot()
saveSnapshot()
}对saveSnapshot()的调用不会快照您的状态,因为没有传递给它的参数。
您需要修改takeSnapshot方法,如下所示:
def takeSnapshot: Unit = {
// if (state % 5 == 0) saveSnapshot()
saveSnapshot(state) // Pass the states you need to store while taking a snapshot.
}这会管用的。
https://stackoverflow.com/questions/36313962
复制相似问题