Akka-Cluster(1)- Cluster Singleton 单例节点

关于cluster-singleton我在前面的博文已经介绍过,在这篇我想回顾一下它的作用和使用方法。首先,cluster-singleton就是集群某个节点上的一个actor。任何时间在集群内保证只会有一个这种actor的实例。它可以是在任何节点上,具体位置由akka-cluster系统的leader节点根据一定规则选定。当cluster-singleton所处的节点停止运作时leader会选择另一个节点,然后系统会将cluster-singleton迁移到新的节点上来保证集群中一定有一个活着的cluster-singleton实例,不过值得注意的是迁移的actor会丢失它的内部状态。在编程实践中常常会需要保证一项程序功能只能由唯一的actor来运行的情况,比如我们需要保证某种运算的顺序,这时在集群环境里就可以使用cluster-singleton了。下面是cluster-singleton可能的一些使用场景:

1、在集群中的一个单点运算决策角色,或者是集群各节点交互运算的协调角色

2、集群与外界软件唯一的接口点

3、单一主角,多个从属

4、中央命名机制,或者中央路由逻辑

cluster-singleton的工作原理是:在集群的所有节点上都部署一个能产生、启动某singleton类型的ClusterSingletonManager,这样可以保证singleton可以迁移到任何节点。集群中的leader节点动态决定singleton的具体生存节点并指示该节点上的ClusterSingletonManager创建singleton实例。其它actor与singleton的交互是通过这个singleton类型的ClusterSingletonProxy进行的,这是cluster系统提供的一项与singleton交互的渠道服务,在需要接触singleton时可以创建ClusterSingletonProxy实例,然后把它当作目标发送操作消息,这样就可以保证cluster-singleton的位置透明特性了。

下面我们就用个简单的例子来示范cluster-singleton的使用看看它的唯一性和自动迁移特点:

构建一个简单的actor:

class SingletonActor extends Actor with ActorLogging {
  import SingletonActor._
  import akka.cluster._
  override def receive: Receive = {
    case Greeting(msg) =>
      log.info("*********got {} from {}********", msg, sender().path.address)
    case Relocate =>
      log.info("*********I'll move from {}********", self.path.address)
      val cluster = Cluster(context.system)
      cluster.leave(cluster.selfAddress)
    case Die =>
      log.info("*******I'm shutting down ... ********")
      self ! PoisonPill
  }
}

把SingletonActor包嵌在ClusterSingletonManager里:

bject SingletonActor {
  trait SingletonMsg {}
  case class Greeting(msg: String) extends SingletonMsg
  case object Relocate extends SingletonMsg
  case object Die extends SingletonMsg

  def props = Props(new SingletonActor)

  def createSingleton(port: Int) = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
      .withFallback(ConfigFactory.parseString("akka.cluster.roles=[singleton]"))
      .withFallback(ConfigFactory.load())
    val singletonSystem = ActorSystem("ClusterSingletonSystem",config)

    val singletonManager = singletonSystem.actorOf(ClusterSingletonManager.props(
      singletonProps = props,
      terminationMessage = Die,
      settings = ClusterSingletonManagerSettings(singletonSystem)
        .withRole(Some("singleton"))   //只部署在角色是singleton的节点上
    ),
      name = "singletonManager"
    )
  }

}

注意:singletonManager就是一个actor,所以是用actorOf(...)来构建的。现在这个singletonManager只能部署在singleton角色的节点上。

调用SingletonActor是通过ClusterSingletonProxy进行的:

object SingletonUser {
  import SingletonActor._
  def sendToSingleton(msg: SingletonMsg) = {
    val config = ConfigFactory.parseString("akka.cluster.roles=[greeter]")
      .withFallback(ConfigFactory.load())

    val system = ActorSystem("ClusterSingletonSystem",config)
    val singletonProxy = system.actorOf(ClusterSingletonProxy.props(
      "user/singletonManager",
      ClusterSingletonProxySettings(system).withRole(None)
    ))

    singletonProxy ! msg
  }
}

withRole(None)代表singletonProxy可以部署在任何节点上。下面是测试代码:

import SingletonActor._
import SingletonUser._
object SingletonDemo extends App {

  createSingleton(2551)    //seednode
  createSingleton(2552)
  createSingleton(2553)
  createSingleton(2554)

  scala.io.StdIn.readLine()

  sendToSingleton(Greeting("hello from tiger"))
  scala.io.StdIn.readLine()

  sendToSingleton(Relocate)
  scala.io.StdIn.readLine()

  sendToSingleton(Greeting("hello from cat"))
  scala.io.StdIn.readLine()

  sendToSingleton(Die)
  scala.io.StdIn.readLine()

}

检验一下输出简要:

[INFO] [10/25/2018 13:41:25.642] [ClusterSingletonSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSingletonSystem@127.0.0.1:51660/user/$a] Singleton identified at [akka.tcp://ClusterSingletonSystem@127.0.0.1:2551/user/singletonManager/singleton]
[INFO] [10/25/2018 13:41:25.654] [ClusterSingletonSystem-akka.actor.default-dispatcher-20] [akka.tcp://ClusterSingletonSystem@127.0.0.1:2551/user/singletonManager/singleton] *********got hello from tiger from akka.tcp://ClusterSingletonSystem@127.0.0.1:51660********


[INFO] [10/25/2018 13:43:10.839] [ClusterSingletonSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSingletonSystem@127.0.0.1:2551/user/singletonManager/singleton] *********I'll move from akka://ClusterSingletonSystem********
INFO] [10/25/2018 13:43:18.885] [ClusterSingletonSystem-akka.actor.default-dispatcher-2] [akka.tcp://ClusterSingletonSystem@127.0.0.1:51670/user/$a] Singleton identified at [akka.tcp://ClusterSingletonSystem@127.0.0.1:2552/user/singletonManager/singleton]
[INFO] [10/25/2018 13:43:18.886] [ClusterSingletonSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSingletonSystem@127.0.0.1:2552/user/singletonManager/singleton] *********got hello from cat from akka.tcp://ClusterSingletonSystem@127.0.0.1:51670********

[INFO] [10/25/2018 13:43:18.156] [ClusterSingletonSystem-akka.actor.default-dispatcher-16] [akka.tcp://ClusterSingletonSystem@127.0.0.1:2551/user/singletonManager/singleton] *******I'm shutting down ... ********
[INFO] [10/25/2018 13:43:18.177] [ClusterSingletonSystem-akka.remote.default-remote-dispatcher-18] [akka.tcp://ClusterSingletonSystem@127.0.0.1:2551/system/remoting-terminator] Shutting down remote daemon.
[INFO] [10/25/2018 13:43:18.178] [ClusterSingletonSystem-akka.remote.default-remote-dispatcher-18] [akka.tcp://ClusterSingletonSystem@127.0.0.1:2551/system/remoting-terminator] Remote daemon shut down; proceeding with flushing remote transports.
[INFO] [10/25/2018 13:43:18.215] [ClusterSingletonSystem-akka.remote.default-remote-dispatcher-14] [akka.tcp://ClusterSingletonSystem@127.0.0.1:2551/system/remoting-terminator] Remoting shut down.

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏nnngu

经典Java面试题收集(二)

经典的Java面试题(第二部分),这部分主要是与Java Web和Web Service相关的面试题。 96、阐述Servlet和CGI的区别? 答:Servl...

5078
来自专栏xingoo, 一个梦想做发明家的程序员

如何在Java应用中提交Spark任务?

最近看到有几个Github友关注了Streaming的监控工程——Teddy,所以思来想去还是优化下代码,不能让别人看笑话啊。于是就想改一下之前觉得最丑陋的一...

6446
来自专栏Python专栏

无所不能的Python,这次把手机APP也攻占了

2594
来自专栏技术博文

Memcache

Memcached概念:     Memcached是一个免费开源的,高性能的,具有分布式对象的缓存系统,它可以用来保存一些经常存取的对象或数据,保存的数据像一...

4014
来自专栏大数据挖掘DT机器学习

Python爬虫:抓取手机APP的传输数据

原文 http://my.oschina.net/jhao104/blog/606922 大多数APP里面返回的是json格式数据,或者一堆加密过的数据 。这里...

3924
来自专栏老码农专栏

原 荐 RESTFul 服务测试自动化的艺术

1573
来自专栏JAVA高级架构

2017 年你不能错过的 Java 类库

各位读者好, 这篇文章是在我看过 Andres Almiray 的一篇介绍文后,整理出来的。 因为内容非常好,我便将它整理成参考列表分享给大家, 同时附上各个库...

2888
来自专栏gnnngu

经典Java面试题收集(二)

答:Servlet与CGI的区别在于Servlet处于服务器进程中,它通过多线程方式运行其service()方法,一个实例可以服务于多个请求,并且其实例一般不会...

94020
来自专栏函数式编程语言及工具

Akka-Cluster(0)- 分布式应用开发的一些想法

  当我初接触akka-cluster的时候,我有一个梦想,希望能充分利用actor自由分布、独立运行的特性实现某种分布式程序。这种程序的计算任务可以进行人为的...

1353
来自专栏java、Spring、技术分享

记一次unable to create new native thread错误处理过程

unable to create new native thread,看到这里,首先想到的是让运维搞一份线上的线程堆栈(可能通过jstack命令搞定的)。...

8361

扫码关注云+社区

领取腾讯云代金券