* * INTERNAL API. */ private[persistence] def limit(toSequenceNr: Long): SnapshotSelectionCriteria = if (toSequenceNr < maxSequenceNr) copy(maxSequenceNr = toSequenceNr) else this /** * INTERNAL API. */ http://www.cnblogs.com/tiger-xc/p/7215898.htmlLong,http://www.cnblogs.com/tiger-xc/p/7215898.html http://www.cnblogs.com/tiger-xc/p/7215898.html minSequenceNr: Long, minTimestamp: Long) = SnapshotSelectionCriteria(maxSequenceNr, maxTimestamp, minSequenceNr, minTimestamp) /** * Java API. */ def latest() = Latest /** * Java API. */ def none() = None }/** * Plugin API: a selected snapshot matching [[SnapshotSelectionCriteria]]. * * @param metadata snapshot metadata. * @param snapshot snapshot. */final case class SelectedSnapshot(metadata: SnapshotMetadata, snapshot: Any)object SelectedSnapshot { /** * Java API, Plugin API. */ def create(metadata: SnapshotMetadata, snapshot: Any): SelectedSnapshot = SelectedSnapshot(metadata, snapshot) }/** * INTERNAL API. * * Defines messages exchanged between persistent actors and a snapshot store. */private[persistence] object SnapshotProtocol { /** Marker trait shared by internal snapshot messages. */ sealed trait Message extends Protocol.Message /** Internal snapshot command. */ sealed trait Request extends Message /** Internal snapshot acknowledgement. */ sealed trait Response extends Message /** * Instructs a snapshot store to load a snapshot. * * @param persistenceId persistent actor id. * @param criteria criteria for selecting a snapshot from which recovery should start. * @param toSequenceNr upper sequence number bound (inclusive) for recovery. */ final case class LoadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long) extends Request /** * Response message to a [[LoadSnapshot]] message. * * @param snapshot loaded snapshot, if any. 
*/ final case class LoadSnapshotResult(snapshot: Option[SelectedSnapshot], toSequenceNr: Long) extends Response /** * Reply message to a failed [[LoadSnapshot]] request. * @param cause failure cause. */ final case class LoadSnapshotFailed(cause: Throwable) extends Response /** * Instructs snapshot store to save a snapshot. * * @param metadata snapshot metadata. * @param snapshot snapshot. */ final case class SaveSnapshot(metadata: SnapshotMetadata, snapshot: Any) extends Request /** * Instructs snapshot store to delete a snapshot. * * @param metadata snapshot metadata. */ final case class DeleteSnapshot(metadata: SnapshotMetadata) extends Request /** * Instructs snapshot store to delete all snapshots that match `criteria`. * * @param persistenceId persistent actor id. * @param criteria criteria for selecting snapshots to be deleted. */ final case class DeleteSnapshots(persistenceId: String, criteria: SnapshotSelectionCriteria) extends Request }
好了,下面我们再分析一下这个事件来源模式(event-sourcing):首先对指令消息进行验证,然后把指令转换成事件,把事件写入日志,最后再更新内部状态。其中验证是为了保证该事件在更新内部状态时不会出现异常。这个验证必须在事件写入日志之前进行,否则会造成异常的事件就会被写入日志,并在恢复时的事件重演中重复引发异常。我们用个例子来示范如何设计指令消息到事件的转换:
import akka.actor._
import akka.persistence._

/** Protocol (commands), journal events and state for the [[Calculator]] persistent actor. */
object Calculator {

  /** Commands accepted by the calculator. */
  sealed trait Command
  case class Operand(x: Int) extends Command
  case class Add(x: Int) extends Command
  case class Sub(x: Int) extends Command
  case class Mul(x: Int) extends Command
  case class Div(x: Int) extends Command
  case class ShowResult(x: Double) extends Command

  /** Events written to the journal; each one mirrors a state-changing command. */
  sealed trait Event
  case class SetNum(x: Int) extends Event
  case class Added(x: Int) extends Event
  case class Subtracted(x: Int) extends Event
  case class Multiplied(x: Int) extends Event
  case class Divided(x: Int) extends Event

  /**
   * Immutable calculator state.
   *
   * @param result the current accumulated value.
   */
  case class State(result: Int) {

    /**
     * Returns the state obtained by applying `evt` to this state.
     *
     * `Divided(0)` would throw `ArithmeticException` here, which is why the
     * command handler must reject `Div(0)` before the event is persisted.
     */
    def updateState(evt: Event): State = evt match {
      case SetNum(x)     => copy(result = x)
      case Added(x)      => copy(result = this.result + x)
      case Subtracted(x) => copy(result = this.result - x)
      case Multiplied(x) => copy(result = this.result * x)
      case Divided(x)    => copy(result = this.result / x)
    }
  }
}

/**
 * Event-sourced calculator: every command is translated into exactly one event,
 * the event is persisted, and only then is the in-memory state updated.
 */
class Calculator extends PersistentActor with ActorLogging {
  import Calculator._

  var state: State = State(0)

  override def persistenceId: String = "persistence-actor"

  /** Applies an event to the in-memory state (used for both live handling and replay). */
  private def applyEvent(evt: Event): Unit =
    state = state.updateState(evt)

  override def receiveCommand: Receive = {
    case Operand(x) => persist(SetNum(x))(applyEvent)
    case Add(x)     => persist(Added(x))(applyEvent)
    case Sub(x)     => persist(Subtracted(x))(applyEvent)
    case Mul(x)     => persist(Multiplied(x))(applyEvent)
    // Validate BEFORE persisting: a Divided(0) event in the journal would throw on
    // every replay. Div(0) matches no case here, so it is never persisted.
    // (Fix: the original matched the event `Divided` and persisted `Added`.)
    case Div(x) if x != 0 => persist(Divided(x))(applyEvent)
  }

  override def receiveRecover: Receive = {
    case evt: Event                   => applyEvent(evt)
    // State is immutable, so the offered snapshot can be adopted as-is.
    case SnapshotOffer(_, sts: State) => state = sts
  }
}
以上代码基本上进行了Command和Event的直接对应。这是一种比较直观的关系对应方式。我们注意到只有在收到Div(x)指令时才进行了指令验证(x != 0)。因为这个例子比较简单,所以我们可以肯定只有指令Div对状态进行施用时才有可能造成异常。这样才能使我们比较直观地进行Command与Event关系对应。假如对内部状态的更新涉及到一些非常复杂的算法,我们无法肯定哪个指令会产生异常,那我们只有先执行指令的运算得出一个结果,然后用这个结果直接替换状态,这个动作肯定是安全的了。按这个逻辑,我们把上面的例子调整一下:
import akka.actor._
import akka.persistence._
import scala.util.{Failure, Success, Try}

/** Protocol (commands), journal events and state for the [[Calculator]] persistent actor. */
object Calculator {

  /** Commands accepted by the calculator. */
  sealed trait Command
  case class Operand(x: Int) extends Command
  case class Add(x: Int) extends Command
  case class Sub(x: Int) extends Command
  case class Mul(x: Int) extends Command
  case class Div(x: Int) extends Command
  case class ShowResult(x: Int) extends Command

  /** Events written to the journal. */
  sealed trait Event

  /** The only event: replaces the current result with an already-computed value. */
  case class SetResult(x: Int) extends Event

  /**
   * Computes the result of applying `cmd` to the current result `res`.
   *
   * @param res current result.
   * @param cmd command to evaluate.
   * @return the new result; `0` for any command that is not an arithmetic command.
   * @throws ArithmeticException for `Div(0)`.
   */
  def getResult(res: Int, cmd: Command): Int = cmd match {
    case Operand(x) => x
    case Add(x)     => res + x
    case Sub(x)     => res - x
    case Mul(x)     => res * x
    case Div(x)     => res / x
    case _          => 0
  }

  /**
   * Immutable calculator state.
   *
   * @param result the current accumulated value.
   */
  case class State(result: Int) {

    /** Returns the state obtained by applying `evt`; replacing the value cannot fail. */
    def updateState(evt: Event): State = evt match {
      case SetResult(x) => copy(result = x)
    }
  }
}

/**
 * Event-sourced calculator that evaluates each command first and persists only the
 * resulting value, so replaying the journal can never throw.
 */
class Calculator extends PersistentActor with ActorLogging {
  import Calculator._

  var state: State = State(0)

  override def persistenceId: String = "persistence-actor"

  /**
   * Evaluates `cmd` against the current state and persists the outcome.
   *
   * The evaluation runs inside `Try` because it happens before `persist` is
   * called: an exception thrown here (e.g. `Div(0)`) is not a journal failure and
   * would otherwise escape `receiveCommand`. A failed evaluation is logged and
   * nothing is persisted.
   */
  private def evaluateAndPersist(cmd: Command): Unit =
    Try(getResult(state.result, cmd)) match {
      case Success(result) =>
        persist(SetResult(result))(evt => state = state.updateState(evt))
      case Failure(cause) =>
        log.warning("Command {} rejected: {}", cmd, cause.getMessage)
    }

  override def receiveCommand: Receive = {
    case opr: Operand => evaluateAndPersist(opr)
    case add: Add     => evaluateAndPersist(add)
    case sub: Sub     => evaluateAndPersist(sub)
    case mul: Mul     => evaluateAndPersist(mul)
    case div: Div     => evaluateAndPersist(div)
  }

  /** Logs the cause when the journal reports that persisting `event` failed. */
  override def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = {
    log.info(s"Persistence Error: ${cause.getMessage}")
  }

  override def receiveRecover: Receive = {
    case evt: Event                   => state = state.updateState(evt)
    // State is immutable, so the offered snapshot can be adopted as-is.
    case SnapshotOffer(_, sts: State) => state = sts
  }
}
现在指令的运算在写入日志之前就已完成,会产生异常的运算结果不会被写入日志。注意:onPersistFailure只在日志写入失败时才会被调用;运算过程中抛出的异常(例如除数为零)发生在调用persist之前,不会进入onPersistFailure,需要在指令处理过程中自行捕获处理。
下面我们就在Calculator里增加一些消息发布机制把恢复过程、消息处理过程和当前状态都发布出去:
/**
 * Event-sourced calculator that publishes its progress (event handling, snapshotting
 * and recovery) to the actor system's event stream as `LogMessage`s.
 */
class Calculator extends PersistentActor with ActorLogging {
  import Calculator._

  var state: State = State(0)

  override def persistenceId: String = "persistence-actor"

  /** A snapshot is saved whenever the last sequence number is a multiple of this value. */
  val snapShotInterval = 5

  /** Publishes `text` to the system event stream wrapped in a `LogMessage`. */
  private def publishLog(text: String): Unit =
    context.system.eventStream.publish(LogMessage(text))

  override def receiveCommand: Receive = {
    case Operand(x) => persist(SetNum(x))(handleEvent)
    case Add(x)     => persist(Added(x))(handleEvent)
    case Sub(x)     => persist(Subtracted(x))(handleEvent)
    case Mul(x)     => persist(Multiplied(x))(handleEvent)
    // Validate before persisting: Div(0) matches no case and is never written to the journal.
    case Div(x) if (x != 0) => persist(Divided(x))(handleEvent)
    case ShowResult =>
      publishLog(s"Current state: $state")
    case BackupResult =>
      saveSnapshot(state)
      publishLog(s"Manual saving snapshot: $state")
    case SaveSnapshotSuccess(metadata) =>
      publishLog(s"Successfully saved state: $state")
    case SaveSnapshotFailure(metadata, reason) =>
      // Keep the failure cause visible instead of silently dropping it.
      log.error(reason, "Failed to save snapshot {}", metadata)
      publishLog(s"Saving state: $state failed!")
  }

  /**
   * Applies a persisted event to the state, publishes progress, and saves a
   * snapshot every `snapShotInterval` events.
   */
  def handleEvent(evt: Event): Unit = {
    state = state.updateState(evt)
    publishLog(s"Logged event: $evt")
    if (lastSequenceNr % snapShotInterval == 0 && lastSequenceNr != 0) {
      saveSnapshot(state)
      publishLog(s"Saving snapshot: $state after $snapShotInterval events")
    }
  }

  override def receiveRecover: Receive = {
    case evt: Event =>
      state = state.updateState(evt)
      publishLog(s"Restoring event: $evt")
    case SnapshotOffer(mdata, sts: State) =>
      state = sts.copy(sts.result)
      publishLog(s"Restoring snapshot: $mdata")
    case RecoveryCompleted =>
      log.info(s"Recovery completed with starting state: $state")
  }

  /** Logs the cause when the journal rejects `event`. */
  override def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = {
    log.info(s"Persistence Rejected: ${cause.getMessage}")
  }

  /** Logs the cause when persisting `event` fails. */
  override def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = {
    // Fix: the original string literal was broken across two lines and did not compile.
    log.info(s"Persistence Error: ${cause.getMessage}")
  }
}
发布消息内容已经可以说明每段代码的作用。
下面是测试运算Calculator的代码:
package persistence.demo import akka.actor._ import persistence.calculator.Calculator import persistence.tracker.EventTrackerobject persistenceDemo extends App { val persistenceSystem = ActorSystem("persistenceSystem")
http://www.cnblogs.com/tiger-xc/p/7215898.html