在很多应用场景中都会出现在系统中需要某类Actor的唯一实例(only instance)。这个实例在集群环境中可能在任何一个节点上,但保证它是唯一的。Akka的Cluster-Singleton提供对这种Singleton Actor模式的支持,能做到当这个实例所在节点出现问题需要脱离集群时自动在另一个节点上构建一个同样的Actor,并重新转交控制。当然,由于涉及了一个新构建的Actor,内部状态会在这个过程中丢失。Single-Actor的主要应用包括某种对外部只能支持一个接入的程序接口,或者一种带有由多个其它Actor运算结果产生的内部状态的累积型Actor(aggregator)。当然,如果使用一种带有内部状态的Singleton-Actor,可以考虑使用PersistenceActor来实现内部状态的自动恢复。如此Cluster-Singleton变成了一种非常实用的模式,可以在许多场合下应用。

  Cluster-Singleton模式也恰恰因为它的唯一性特点存在着一些隐忧,需要特别关注。唯一性容易造成的隐忧包括:容易造成超负荷、无法保证稳定在线、无法保证消息投递。这些需要用户在编程时增加特别处理。

好了,我们设计个例子来了解Cluster-Singleton,先看看Singleton-Actor的功能:

移动开发培训,Android培训,安卓培训,手机开发培训,手机维修培训,手机软件培训

class SingletonActor extends PersistentActor with ActorLogging {
  import SingletonActor._
  val cluster = Cluster(context.system)  var freeHoles = 0
  var freeTrees = 0
  var ttlMatches = 0

  override def persistenceId = self.path.parent.name + "-" + self.path.name

  def updateState(evt: Event): Unit = evt match {    case AddHole =>      if (freeTrees > 0) {
        ttlMatches += 1
        freeTrees -= 1
      } else freeHoles += 1
    case AddTree =>      if (freeHoles > 0) {
        ttlMatches += 1
        freeHoles -= 1
      } else freeTrees += 1

  }  override def receiveRecover: Receive = {    case evt: Event => updateState(evt)    case SnapshotOffer(_,ss: State) =>
      freeHoles = ss.nHoles
      freeTrees = ss.nTrees
      ttlMatches = ss.nMatches
  }  override def receiveCommand: Receive = {    case Dig =>
      persist(AddHole){evt =>
        updateState(evt)
      }
      sender() ! AckDig   //notify sender message received
      log.info(s"State on ${cluster.selfAddress}:freeHoles=$freeHoles,freeTrees=$freeTrees,ttlMatches=$ttlMatches")    case Plant =>
      persist(AddTree) {evt =>
        updateState(evt)
      }
      sender() ! AckPlant   //notify sender message received
      log.info(s"State on ${cluster.selfAddress}:freeHoles=$freeHoles,freeTrees=$freeTrees,ttlMatches=$ttlMatches")    case Disconnect =>  //this node exits cluster. expect switch to another node
      log.info(s"${cluster.selfAddress} is leaving cluster ...")
      cluster.leave(cluster.selfAddress)    case CleanUp =>      //clean up ...
      self ! PoisonPill
  }

}

移动开发培训,Android培训,安卓培训,手机开发培训,手机维修培训,手机软件培训

这个SingletonActor就是一种特殊的Actor,它继承了PersistentActor,所以需要实现PersistentActor的抽象函数。SingletonActor维护了几个内部状态,分别是各类运算的当前累积结果freeHoles,freeTrees,ttlMatches。SingletonActor模拟的是一个种树场景:当收到Dig指令后产生登记树坑AddHole事件,在这个事件中更新当前状态值;当收到Plant指令后产生AddTree事件并更新状态。因为Cluster-Singleton模式无法保证消息安全投递所以应该加个回复机制AckDig,AckPlant让消息发送者可用根据情况补发消息。我们是用Cluster.selfAddress来确认当前集群节点的转换。

我们需要在所有承载SingletonActor的集群节点上构建部署ClusterSingletonManager,如下:

移动开发培训,Android培训,安卓培训,手机开发培训,手机维修培训,手机软件培训

  def create(port: Int) = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
        .withFallback(ConfigFactory.parseString("akka.cluster.roles=[singleton]"))
        .withFallback(ConfigFactory.load())
    val singletonSystem = ActorSystem("SingletonClusterSystem",config)

    startupSharedJournal(singletonSystem, (port == 2551), path =
        ActorPath.fromString("akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/store"))

    val singletonManager = singletonSystem.actorOf(ClusterSingletonManager.props(
      singletonProps = Props[SingletonActor],
      terminationMessage = CleanUp,
      settings = ClusterSingletonManagerSettings(singletonSystem).withRole(Some("singleton"))
    ), name = "singletonManager")

  }

移动开发培训,Android培训,安卓培训,手机开发培训,手机维修培训,手机软件培训

可以看的出来,ClusterSingletonManager也是一种Actor,通过ClusterSingletonManager.props配置其所管理的SingletonActor。我们的目的主要是去求证当前集群节点出现故障需要退出集群时,这个SingletonActor是否能够自动转移到其它在线的节点上。ClusterSingletonManager的工作原理是首先在所有选定的集群节点上构建和部署,然后在最先部署的节点上启动SingletonActor,当这个节点不可使用时(unreachable)自动在次先部署的节点上重新构建部署SingletonActor。

同样作为一种Actor,ClusterSingletonProxy是通过与ClusterSingletonManager消息沟通来调用SingletonActor的。ClusterSingletonProxy动态跟踪在线的SingletonActor,为用户提供它的ActorRef。我们可以通过下面的代码来具体调用SingletonActor:

移动开发培训,Android培训,安卓培训,手机开发培训,手机维修培训,手机软件培训

object SingletonUser {
  def create = {
    val config = ConfigFactory.parseString("akka.cluster.roles=[frontend]")
      .withFallback(ConfigFactory.load())
    val suSystem = ActorSystem("SingletonClusterSystem",config)

    val singletonProxy = suSystem.actorOf(ClusterSingletonProxy.props(
      singletonManagerPath = "/user/singletonManager",
      settings = ClusterSingletonProxySettings(suSystem).withRole(None)
    ), name= "singletonUser")


    import suSystem.dispatcher    //send Dig messages every 2 seconds to SingletonActor through prox
    suSystem.scheduler.schedule(0.seconds,3.second,singletonProxy,SingletonActor.Dig)    //send Plant messages every 3 seconds to SingletonActor through prox
    suSystem.scheduler.schedule(1.seconds,2.second,singletonProxy,SingletonActor.Plant)    //send kill message to hosting node every 30 seconds
    suSystem.scheduler.schedule(10.seconds,15.seconds,singletonProxy,SingletonActor.Disconnect)
  }

}

移动开发培训,Android培训,安卓培训,手机开发培训,手机维修培训,手机软件培训

我们分不同的时间段通过ClusterSingletonProxy向SingletonActor发送Dig和Plant消息。然后每隔30秒向SingletonActor发送一个Disconnect消息通知它所在节点开始脱离集群。然后我们用下面的代码来试着运行:

移动开发培训,Android培训,安卓培训,手机开发培训,手机维修培训,手机软件培训

package clustersingleton.demo

import clustersingleton.sa.SingletonActor
import clustersingleton.frontend.SingletonUserobject ClusterSingletonDemo extends App {

  SingletonActor.create(2551)    //seed-node  
  SingletonActor.create(0)   //ClusterSingletonManager node
  SingletonActor.create(0)
  SingletonActor.create(0)
  SingletonActor.create(0)

  SingletonUser.create     //ClusterSingletonProxy node}

移动开发培训,Android培训,安卓培训,手机开发培训,手机维修培训,手机软件培训

运算结果如下:

移动开发培训,Android培训,安卓培训,手机开发培训,手机维修培训,手机软件培训

[INFO] [07/09/2017 20:17:28.210] [main] [akka.remote.Remoting] Starting remoting
[INFO] [07/09/2017 20:17:28.334] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:2551][INFO] [07/09/2017 20:17:28.489] [main] [akka.remote.Remoting] Starting remoting
[INFO] [07/09/2017 20:17:28.493] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55839][INFO] [07/09/2017 20:17:28.514] [main] [akka.remote.Remoting] Starting remoting
[INFO] [07/09/2017 20:17:28.528] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55840][INFO] [07/09/2017 20:17:28.566] [main] [akka.remote.Remoting] Starting remoting
[INFO] [07/09/2017 20:17:28.571] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55841][INFO] [07/09/2017 20:17:28.595] [main] [akka.remote.Remoting] Starting remoting
[INFO] [07/09/2017 20:17:28.600] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55842][INFO] [07/09/2017 20:17:28.620] [main] [akka.remote.Remoting] Starting remoting
[INFO] [07/09/2017 20:17:28.624] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55843][INFO] [07/09/2017 20:17:28.794] [SingletonClusterSystem-akka.actor.default-dispatcher-15] [akka.tcp://SingletonClusterSystem@127.0.0.1:55843/user/singletonUser] Singleton identified at [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton][INFO] [07/09/2017 20:17:28.817] [SingletonClusterSystem-akka.actor.default-dispatcher-15] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=0,freeTrees=0,ttlMatches=0[INFO] [07/09/2017 20:17:29.679] [SingletonClusterSystem-akka.actor.default-dispatcher-14] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=1,freeTrees=0,ttlMatches=0...
[INFO] [07/09/2017 20:17:38.676] [SingletonClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] akka.tcp://SingletonClusterSystem@127.0.0.1:2551 is leaving cluster ...[INFO] [07/09/2017 20:17:39.664] [SingletonClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=0,freeTrees=1,ttlMatches=4[INFO] [07/09/2017 20:17:40.654] [SingletonClusterSystem-akka.actor.default-dispatcher-21] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=0,freeTrees=2,ttlMatches=4[INFO] [07/09/2017 20:17:41.664] [SingletonClusterSystem-akka.actor.default-dispatcher-17] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=0,freeTrees=1,ttlMatches=5[INFO] [07/09/2017 20:17:42.518] [SingletonClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://SingletonClusterSystem@127.0.0.1:55843/user/singletonUser] Singleton identified at [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton][INFO] [07/09/2017 20:17:43.653] [SingletonClusterSystem-akka.actor.default-dispatcher-19] 
http://www.cnblogs.com/tiger-xc/p/7144879.html