*
   * INTERNAL API.   */
  private[persistence] def limit(toSequenceNr: Long): SnapshotSelectionCriteria =    if (toSequenceNr < maxSequenceNr) copy(maxSequenceNr = toSequenceNr) else this

  /**
   * INTERNAL API.   */ http://www.cnblogs.com/tiger-xc/p/7215898.htmlLong,http://www.cnblogs.com/tiger-xc/p/7215898.html
  
  
  
  
  
  http://www.cnblogs.com/tiger-xc/p/7215898.html
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
             minSequenceNr: Long, minTimestamp: Long) =
    SnapshotSelectionCriteria(maxSequenceNr, maxTimestamp, minSequenceNr, minTimestamp)  /**
   * Java API.   */
  def latest() = Latest  /**
   * Java API.   */
  def none() = None
}/**
 * Plugin API: a selected snapshot matching [[SnapshotSelectionCriteria]].
 *
 * @param metadata snapshot metadata.
 * @param snapshot snapshot. */final case class SelectedSnapshot(metadata: SnapshotMetadata, snapshot: Any)object SelectedSnapshot {  /**
   * Java API, Plugin API.   */
  def create(metadata: SnapshotMetadata, snapshot: Any): SelectedSnapshot =
    SelectedSnapshot(metadata, snapshot)
}/**
 * INTERNAL API.
 *
 * Defines messages exchanged between persistent actors and a snapshot store. */private[persistence] object SnapshotProtocol {  /** Marker trait shared by internal snapshot messages. */
  sealed trait Message extends Protocol.Message  /** Internal snapshot command. */
  sealed trait Request extends Message  /** Internal snapshot acknowledgement. */
  sealed trait Response extends Message  /**
   * Instructs a snapshot store to load a snapshot.
   *
   * @param persistenceId persistent actor id.
   * @param criteria criteria for selecting a snapshot from which recovery should start.
   * @param toSequenceNr upper sequence number bound (inclusive) for recovery.   */
  final case class LoadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long)
    extends Request  /**
   * Response message to a [[LoadSnapshot]] message.
   *
   * @param snapshot loaded snapshot, if any.   */
  final case class LoadSnapshotResult(snapshot: Option[SelectedSnapshot], toSequenceNr: Long)
    extends Response  /**
   * Reply message to a failed [[LoadSnapshot]] request.
   * @param cause failure cause.   */
  final case class LoadSnapshotFailed(cause: Throwable) extends Response  /**
   * Instructs snapshot store to save a snapshot.
   *
   * @param metadata snapshot metadata.
   * @param snapshot snapshot.   */
  final case class SaveSnapshot(metadata: SnapshotMetadata, snapshot: Any)
    extends Request  /**
   * Instructs snapshot store to delete a snapshot.
   *
   * @param metadata snapshot metadata.   */
  final case class DeleteSnapshot(metadata: SnapshotMetadata)
    extends Request  /**
   * Instructs snapshot store to delete all snapshots that match `criteria`.
   *
   * @param persistenceId persistent actor id.
   * @param criteria criteria for selecting snapshots to be deleted.   */
  final case class DeleteSnapshots(persistenceId: String, criteria: SnapshotSelectionCriteria)
    extends Request
}

大学生就业培训,高中生培训,在职人员转行培训,企业团训

好了,下面我们再分析一下这个事件来源模式(event-sourcing):首先对指令消息进行验证,然后把指令转换成事件,把事件写入日志然后再更新内部状态。其中验证是为了保证该事件在更新内部状态时不会出现异常。这个验证必须在事件写入日志前,否则造成异常的事件就会被写入日志,就会在恢复时的事件重演中重复发生异常。我们用个例子来示范如何来设计指令消息到事件的转换的:

大学生就业培训,高中生培训,在职人员转行培训,企业团训

import akka.actor._
import akka.persistence._object Calculator {  sealed trait Command  case class Operand(x: Int) extends Command  case class Add(x: Int) extends Command  case class Sub(x: Int) extends Command  case class Mul(x: Int) extends Command  case class Div(x: Int) extends Command  case class ShowResult(x: Double) extends Command  sealed trait Event  case class SetNum(x: Int) extends Event  case class Added(x: Int) extends Event  case class Subtracted(x: Int) extends Event  case class Multiplied(x: Int) extends Event  case class Divided(x: Int) extends Event  case class State(result: Int) {
    def updateState(evt: Event): State = evt match {      case SetNum(x) => copy(result = x)      case Added(x) => copy(result = this.result + x)      case Subtracted(x) => copy(result = this.result - x)      case Multiplied(x) => copy(result = this.result * x)      case Divided(x) => copy(result = this.result / x)
    }
  }
}class Calculator extends PersistentActor with ActorLogging {
import Calculator._  var state: State = State(0)  override def persistenceId: String = "persistence-actor"
  override def receiveCommand: Receive = {    case Operand(x) =>
      persist(SetNum(x)){evt => state = state.updateState(evt)}    case Add(x) =>
      persist(Added(x)){evt => state = state.updateState(evt)}    case Sub(x) =>
      persist(Subtracted(x)){evt => state = state.updateState(evt)}    case Mul(x) =>
      persist(Multiplied(x)){evt => state = state.updateState(evt)}    case Divided(x) if (x != 0) =>
      persist(Added(x)){evt => state = state.updateState(evt)}
  }  override def receiveRecover: Receive = {    case evt: Event => state = state.updateState(evt)    case SnapshotOffer(_, sts: State) => state = sts.copy(sts.result)
  }
}

大学生就业培训,高中生培训,在职人员转行培训,企业团训

以上代码基本上进行了Command和Event的直接对应。这是一种比较直观的关系对应方式。我们注意到只有在收到Div(x)指令时才进行了指令验证(x == 0)。因为这个例子比较简单,所以我们可以肯定只有指令Div对状态进行施用时才有可能造成异常。这样才能使我们比较直观地进行Command与Event关系对应。假如对内部状态的更新涉及到一些非常复杂的算法,我们无法肯定哪个指令会产生异常,那我们只有先进行运算指令得出一个结果,然后直接替换状态,这个动作肯定是安全的了。按这个逻辑,我们把上面的例子调整一下: 

大学生就业培训,高中生培训,在职人员转行培训,企业团训

import akka.actor._
import akka.persistence._object Calculator {  sealed trait Command  case class Operand(x: Int) extends Command  case class Add(x: Int) extends Command  case class Sub(x: Int) extends Command  case class Mul(x: Int) extends Command  case class Div(x: Int) extends Command  case class ShowResult(x: Int) extends Command  sealed trait Event  case class SetResult(x: Int) extends Event

  def getResult(res: Int, cmd: Command): Int = cmd match {    case Operand(x) => x    case Add(x) => res + x    case Sub(x) => res - x    case Mul(x) => res * x    case Div(x) => res / x    case _ => 0
  }  case class State(result: Int) {
    def updateState(evt: Event): State = evt match {      case SetResult(x) => copy(result = x)
    }
  }
}class Calculator extends PersistentActor with ActorLogging {
import Calculator._  var state: State = State(0)  override def persistenceId: String = "persistence-actor"
  override def receiveCommand: Receive = {    case opr: Operand =>
      persist(SetResult(getResult(state.result,opr)))(evt => state = state.updateState(evt))    case add: Add =>
      persist(SetResult(getResult(state.result,add)))(evt => state = state.updateState(evt))    case sub: Sub =>
      persist(SetResult(getResult(state.result,sub)))(evt => state = state.updateState(evt))    case mul: Mul =>
      persist(SetResult(getResult(state.result,mul)))(evt => state = state.updateState(evt))    case div: Div =>
      persist(SetResult(getResult(state.result,div)))(evt => state = state.updateState(evt))

  }  override def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = {
    log.info(s"Persistence Error: ${cause.getMessage}")
  }  override def receiveRecover: Receive = {    case evt: Event => state = state.updateState(evt)    case SnapshotOffer(_, sts: State) => state = sts.copy(sts.result)
  }
}

大学生就业培训,高中生培训,在职人员转行培训,企业团训

现在在写入日志前进行了验证。注意如果验证结果异常可以在onPersistFailure中处理。

下面我们就在Calculator里增加一些消息发布机制把恢复过程、消息处理过程和当前状态都发布出去:

大学生就业培训,高中生培训,在职人员转行培训,企业团训

class Calculator extends PersistentActor with ActorLogging {
  import Calculator._  var state: State = State(0)  override def persistenceId: String = "persistence-actor"
  val snapShotInterval = 5
  override def receiveCommand: Receive = {    case Operand(x) => persist(SetNum(x))(handleEvent)    case Add(x) => persist(Added(x))(handleEvent)    case Sub(x) => persist(Subtracted(x))(handleEvent)    case Mul(x) => persist(Multiplied(x))(handleEvent)    case Div(x) if (x != 0) => persist(Divided(x))(handleEvent)    case ShowResult =>
      context.system.eventStream.publish(LogMessage(s"Current state: $state"))    case BackupResult =>
      saveSnapshot(state)
      context.system.eventStream.publish(LogMessage(s"Manual saving snapshot: $state"))    case SaveSnapshotSuccess(metadata) =>
      context.system.eventStream.publish(LogMessage(s"Successfully saved state: $state"))    case SaveSnapshotFailure(metadata, reason) =>
      context.system.eventStream.publish(LogMessage(s"Saving state: $state failed!"))

  }
  def handleEvent(evt: Event) = {   //update state and publish progress
    state = state.updateState(evt)
    context.system.eventStream.publish(LogMessage(s"Logged event: $evt"))    if (lastSequenceNr % snapShotInterval == 0 && lastSequenceNr != 0) {
      saveSnapshot(state)
      context.system.eventStream.publish(LogMessage(s"Saving snapshot: $state after $snapShotInterval events"))
    }
  }  override def receiveRecover: Receive = {    case evt: Event => {
      state = state.updateState(evt)
      context.system.eventStream.publish(LogMessage(s"Restoring event: $evt"))
    }    case SnapshotOffer(mdata, sts: State) => {
      state = sts.copy(sts.result)
      context.system.eventStream.publish(LogMessage(s"Restoring snapshot: $mdata"))
    }    case RecoveryCompleted => log.info(s"Recovery completed with starting state: $state")
  }  override def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = {
    log.info(s"Persistence Rejected: ${cause.getMessage}")
  }  override def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = {
    log.info(s"Persistence Error: ${cause.getMessage}")
  }
}

大学生就业培训,高中生培训,在职人员转行培训,企业团训

发布消息内容已经可以说明每段代码的作用。

下面是测试运算Calculator的代码:

大学生就业培训,高中生培训,在职人员转行培训,企业团训

package persistence.demo
import akka.actor._
import persistence.calculator.Calculator
import persistence.tracker.EventTrackerobject persistenceDemo extends App {
  val persistenceSystem = ActorSystem("persistenceSystem")

http://www.cnblogs.com/tiger-xc/p/7215898.html