Akka-Cluster(0) - Some Thoughts on Distributed Application Development

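  When I first encountered akka-cluster, I had a dream: to take full advantage of the fact that actors can be freely distributed and run independently, and use it to build a certain kind of distributed program. In such a program, a computation task is deliberately split into smaller tasks, which are then dispatched to actors spread across several servers. All of these servers belong to the same cluster; each one is a node of the akka-cluster. The number of nodes can be increased or decreased purely through system configuration, according to the computing capacity required, and a distributed program running on the cluster adjusts how its actors are spread over the nodes automatically, without any change to the software, rebalances its computing load, and keeps running unaffected.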

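   Earlier posts in my akka series covered some aspects of akka-cluster, and the recent series on programming in a cluster environment (PICE) discussed how to integrate several different kinds of database services in a cluster through protobuf-gRPC. Because the database services in the cluster are connected with akka-stream, we send program and data together as stream elements, through a Flow, to the corresponding database service for processing. This gave me an idea: once a database service accepts a request (assuming most data processing is time- and resource-consuming), it could split the task into smaller tasks, distribute them to several nodes of its cluster for computation, and then collect and aggregate the results as the computation requires. If servers could then be added or removed freely according to the number of users and the size of the workload, computations of any scale could be handled. Most importantly, resizing the cluster must be a matter of configuration, that is, editing a configuration file, without touching the program code. These requirements are exactly what akka-cluster is particularly good at, so I decided to start a dedicated akka-cluster series to discuss patterns for developing distributed software in a cluster environment.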

The following facilities provided by akka-cluster fit our requirements well:

1. distributed pub/sub - distributed publish/subscribe pattern

2. cluster-singleton - singleton actor pattern

3. cluster-load-balancing - cluster load-balancing pattern

4. cluster-sharding - cluster sharding pattern

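In the following posts of this series we will go through these patterns one by one and discuss the details of using each of them in practice. But first, let us look at how akka-cluster nodes are defined through configuration files, which is what makes it possible to resize the cluster.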

A cluster node goes through the following lifecycle transitions:

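Joining->Up, Up->Leaving, Leaving->Exiting, Exiting->Removed. In addition, the failure detector can flag a node as Unreachable; an unreachable node either becomes reachable again and continues in its current state (e.g. Up), or is marked Down, followed by Down->Removed.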
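To make these transitions concrete, here is a small, self-contained model of them in plain Scala. It is only an illustration under simplifying assumptions: `LifecycleModel`, `canMove`, `isValidPath` and `Node` are hypothetical names, not part of the Akka API (Akka keeps the real status in `akka.cluster.MemberStatus` and has extra states such as WeaklyUp that are left out here), and unreachability is modeled as a flag next to the status rather than as a status of its own.

```scala
// Hypothetical, simplified model of the node lifecycle described above.
// Not the Akka API; WeaklyUp and some less common transitions are omitted.
object LifecycleModel {
  sealed trait NodeStatus
  case object Joining extends NodeStatus
  case object Up      extends NodeStatus
  case object Leaving extends NodeStatus
  case object Exiting extends NodeStatus
  case object Down    extends NodeStatus
  case object Removed extends NodeStatus

  // Statuses reachable in one step: the graceful leave path plus the Down path.
  def next(from: NodeStatus): Set[NodeStatus] = from match {
    case Joining => Set(Up)
    case Up      => Set(Leaving, Down)
    case Leaving => Set(Exiting, Down)
    case Exiting => Set(Removed)
    case Down    => Set(Removed)
    case Removed => Set.empty
  }

  def canMove(from: NodeStatus, to: NodeStatus): Boolean = next(from).contains(to)

  // True if every consecutive pair in the path is an allowed transition.
  def isValidPath(path: List[NodeStatus]): Boolean =
    path.zip(path.drop(1)).forall { case (a, b) => canMove(a, b) }

  // Reachability is tracked as a flag alongside the status.
  final case class Node(address: String, status: NodeStatus, reachable: Boolean)
}
```

With this model, the graceful path `Joining, Up, Leaving, Exiting, Removed` and the forced path `Joining, Up, Down, Removed` are both valid, while jumping from `Removed` back to `Up` is not.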

Below, an actor running on each cluster node subscribes to the system's cluster membership events, so that we can watch every node's state transitions:

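import akka.actor.{Actor, ActorLogging, Props}
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._

class EventListener extends Actor with ActorLogging {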
  import EventListner._

  val cluster = Cluster(context.system)

  override def preStart(): Unit = {
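    // subscribe to every event type handled in receive; ReachableMember and the
    // data-center events are not MemberEvents and would otherwise never arrive
    cluster.subscribe(self, InitialStateAsEvents,
      classOf[MemberEvent], classOf[UnreachableMember], classOf[ReachableMember],
      classOf[UnreachableDataCenter], classOf[ReachableDataCenter])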
    super.preStart()
  }
  override def postStop(): Unit = {
    cluster.unsubscribe(self)
    super.postStop()
  }

  override def receive: Receive = {
    case MemberJoined(member) =>
      log.info("{} is JOINING...", member.address)
    case MemberUp(member) =>
      log.info("{} is UP!", member.address)
    case MemberWeaklyUp(member) =>
      log.info("{} is weakly UP!", member.address)
    case MemberLeft(member) =>
      log.info("{} is LEAVING...", member.address)
    case MemberExited(member) =>
      log.info("{} is EXITING...", member.address)
    case MemberRemoved(member, prevStatus) =>
      log.info("{} is REMOVED! from state {}", member.address, prevStatus)
    case UnreachableMember(member) =>
      log.info("{} is UNREACHABLE!", member.address)
    case ReachableMember(member) =>
      log.info("{} is REACHABLE!", member.address)
    case UnreachableDataCenter(datacenter) =>
      log.info("Data Center {} is UNREACHABLE!", datacenter)
    case ReachableDataCenter(datacenter) =>
      log.info("Data Center {} is REACHABLE!", datacenter)
    case Leave =>
      cluster.leave(cluster.selfAddress)
      log.info("{} is asked to leave cluster.",cluster.selfAddress)
    case Down =>
      cluster.down(cluster.selfAddress)
      log.info("{} is asked to shutdown cluster.",cluster.selfAddress)
  }

}

Leave and Down are custom message types:

object EventListner {
  trait Messages {}
  case object Leave extends Messages
  case object Down extends Messages
  def props = Props(new EventListener)
...
}

The most basic akka-cluster configuration file looks like this:

akka {
  actor {
    provider = "cluster"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "localhost"
      port = 2551
    }
  }
  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@localhost:2551"]
  }
}
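The file is plain HOCON, read by the Typesafe Config library that Akka is built on. As a quick check of which keys this fragment defines, the sketch below parses the same text from a string and reads a few values back. It assumes `com.typesafe:config` is on the classpath (it comes in with Akka); the object name `ReadClusterConfig` is illustrative only.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object ReadClusterConfig {
  // The same settings as the configuration file above, inlined as a string.
  val hocon: String =
    """akka {
      |  actor { provider = "cluster" }
      |  remote {
      |    log-remote-lifecycle-events = off
      |    netty.tcp { hostname = "localhost", port = 2551 }
      |  }
      |  cluster {
      |    seed-nodes = ["akka.tcp://ClusterSystem@localhost:2551"]
      |  }
      |}""".stripMargin

  val config: Config = ConfigFactory.parseString(hocon)

  def main(args: Array[String]): Unit = {
    println(config.getString("akka.actor.provider"))        // the actor provider
    println(config.getInt("akka.remote.netty.tcp.port"))     // the remoting port
    println(config.getStringList("akka.cluster.seed-nodes")) // a java.util.List
    // HOCON accepts on/off as boolean values
    println(config.getBoolean("akka.remote.log-remote-lifecycle-events"))
  }
}
```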

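In fact, hostname, port and seed-nodes can all be set in the program. If we want, the configuration file only needs to state that this is a cluster-mode program, and the remaining parameters can be defined in code: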

akka {
  actor {
    provider = "cluster"
  }
}

The missing cluster parameters can then be supplied in the program:

import akka.actor.{ActorRef, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

object EventListner {
  trait Messages {}
  case object Leave extends Messages
  case object Down extends Messages
  def props = Props(new EventListener)

  // host/port/seednode given here take precedence over akka-cluster-config.conf
  def create(host: String = "localhost", port: Int = 0, seednode: String = ""): ActorRef = {
    // port = 0 lets the operating system pick a free port
    var config: Config = ConfigFactory.parseString(s"akka.remote.netty.tcp.hostname=${host}")
      .withFallback(ConfigFactory.parseString(s"akka.remote.netty.tcp.port=${port}"))
    if (seednode.nonEmpty) {
      // seed-nodes is a list, so the address is quoted inside [...]
      val strConfig = "akka.cluster.seed-nodes=[\"" + seednode + "\"]"
      config = config.withFallback(ConfigFactory.parseString(strConfig))
    }
    // lowest priority: the akka-cluster-config.conf resource on the classpath
    config = config.withFallback(ConfigFactory.load("akka-cluster-config"))
    val clusterSystem = ActorSystem(name = "ClusterSystem", config = config)
    clusterSystem.actorOf(props)
  }

}

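In the create function, ConfigFactory.parseString turns a string into configuration settings, and several settings are combined with withFallback. Note that withFallback gives precedence to the config it is called on: the argument only supplies keys that are still missing, so the hostname and port set in code override any values in the akka-cluster-config file.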
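The precedence rule of withFallback is easy to check in isolation. The sketch below is a minimal example, assuming only the Typesafe Config library: `fromFile` is a hypothetical stand-in for the file-based settings, and `FallbackDemo`/`withSeed` are illustrative names. `withSeed` shows an alternative to concatenating a HOCON string for seed-nodes: `Config.withValue` together with `ConfigValueFactory.fromIterable` sets the list value directly, so no quoting is needed.

```scala
import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import java.util.Collections

object FallbackDemo {
  // Settings supplied in code: these should win.
  val fromCode: Config = ConfigFactory.parseString(
    """akka.remote.netty.tcp.hostname = "127.0.0.1"
      |akka.remote.netty.tcp.port = 0""".stripMargin)

  // Hypothetical stand-in for the settings that would come from the file.
  val fromFile: Config = ConfigFactory.parseString(
    """akka.actor.provider = "cluster"
      |akka.remote.netty.tcp.hostname = "localhost"
      |akka.remote.netty.tcp.port = 2551""".stripMargin)

  // Keys present in fromCode take precedence; fromFile only fills in missing keys.
  val merged: Config = fromCode.withFallback(fromFile)

  // Sets akka.cluster.seed-nodes as a real list value instead of a hand-quoted string.
  def withSeed(cfg: Config, seed: String): Config =
    cfg.withValue("akka.cluster.seed-nodes",
      ConfigValueFactory.fromIterable(Collections.singletonList(seed)))
}
```

After merging, hostname is "127.0.0.1" and port is 0 (from code), while provider is "cluster" (only present in the fallback).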

Here is a test program for EventListener:

import EventListner._
object EventDemo extends App {

  val listner1 = EventListner.create(port = 2551)  //seed node
  scala.io.StdIn.readLine()
  val listner2 = EventListner.create()    //port=0 random port
  scala.io.StdIn.readLine()
  val listner3 = EventListner.create()    //port=0 random port

  scala.io.StdIn.readLine()

  listner3 ! Leave
  scala.io.StdIn.readLine()

  listner2 ! Down
  scala.io.StdIn.readLine()

  listner1 ! Leave
  scala.io.StdIn.readLine()

}
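Because create takes the seed node as a raw string, its format is easy to get wrong. Each seed-nodes entry is an actor-system address of the form `akka.tcp://<system>@<host>:<port>`, and `<system>` has to match the name passed to ActorSystem (ClusterSystem in this post). A minimal sketch of two helpers that build these strings consistently; `SeedNodes`, `seedAddress` and `seedNodesHocon` are hypothetical names, they only format strings, and they assume the classic Netty TCP remoting (`akka.tcp`) used in this post's configuration.

```scala
object SeedNodes {
  // Builds a classic-remoting address such as akka.tcp://ClusterSystem@localhost:2551
  def seedAddress(system: String, host: String, port: Int): String =
    s"akka.tcp://$system@$host:$port"

  // Renders the HOCON setting for akka.cluster.seed-nodes, quoting every entry.
  def seedNodesHocon(seeds: Seq[String]): String =
    seeds.map(s => "\"" + s + "\"").mkString("akka.cluster.seed-nodes=[", ",", "]")
}
```

For example, `SeedNodes.seedAddress("ClusterSystem", "localhost", 2551)` yields the same seed address as the configuration file above, and could be passed as the seednode argument of create.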

The seed node must be started first, because every node needs to contact the seed node when it starts up. Below is the output of each stage:

[INFO] [10/22/2018 18:50:40.888] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:2551] - Started up successfully
[INFO] [10/22/2018 18:50:40.931] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:2551] - Node [akka.tcp://ClusterSystem@localhost:2551] is JOINING itself (with roles [dc-default]) and forming new cluster
[INFO] [10/22/2018 18:50:40.933] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:2551] - Cluster Node [akka.tcp://ClusterSystem@localhost:2551] dc [default] is the new leader
[INFO] [10/22/2018 18:50:40.943] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:2551] - Leader is moving node [akka.tcp://ClusterSystem@localhost:2551] to [Up]
[INFO] [10/22/2018 18:50:41.037] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.tcp://ClusterSystem@localhost:2551/user/$a] akka.tcp://ClusterSystem@localhost:2551 is UP!
[INFO] [10/22/2018 18:50:47.363] [ClusterSystem-akka.actor.default-dispatcher-23] [akka.tcp://ClusterSystem@localhost:51679/user/$a] akka.tcp://ClusterSystem@localhost:51679 is JOINING...
[INFO] [10/22/2018 18:50:47.930] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:2551] - Leader is moving node [akka.tcp://ClusterSystem@localhost:51679] to [Up]
[INFO] [10/22/2018 18:50:47.931] [ClusterSystem-akka.actor.default-dispatcher-2] [akka.tcp://ClusterSystem@localhost:2551/user/$a] akka.tcp://ClusterSystem@localhost:51679 is UP!
[INFO] [10/22/2018 18:50:48.109] [ClusterSystem-akka.actor.default-dispatcher-24] [akka.tcp://ClusterSystem@localhost:51679/user/$a] akka.tcp://ClusterSystem@localhost:51679 is UP!
[INFO] [10/22/2018 18:50:53.765] [ClusterSystem-akka.actor.default-dispatcher-17] [akka.tcp://ClusterSystem@localhost:51681/user/$a] akka.tcp://ClusterSystem@localhost:51681 is JOINING...
[INFO] [10/22/2018 18:50:53.930] [ClusterSystem-akka.actor.default-dispatcher-22] [akka.tcp://ClusterSystem@localhost:51679/user/$a] akka.tcp://ClusterSystem@localhost:51681 is JOINING...
[INFO] [10/22/2018 18:50:54.929] [ClusterSystem-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:2551] - Leader is moving node [akka.tcp://ClusterSystem@localhost:51681] to [Up]
[INFO] [10/22/2018 18:50:54.929] [ClusterSystem-akka.actor.default-dispatcher-21] [akka.tcp://ClusterSystem@localhost:2551/user/$a] akka.tcp://ClusterSystem@localhost:51681 is UP!
[INFO] [10/22/2018 18:52:00.806] [ClusterSystem-akka.actor.default-dispatcher-32] [akka.tcp://ClusterSystem@localhost:51681/user/$a] akka.tcp://ClusterSystem@localhost:51681 is asked to leave cluster.
[INFO] [10/22/2018 18:52:00.807] [ClusterSystem-akka.actor.default-dispatcher-28] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:51681] - Marked address [akka.tcp://ClusterSystem@localhost:51681] as [Leaving]
[INFO] [10/22/2018 18:52:00.808] [ClusterSystem-akka.actor.default-dispatcher-42] [akka.tcp://ClusterSystem@localhost:51681/user/$a] akka.tcp://ClusterSystem@localhost:51681 is LEAVING...
[INFO] [10/22/2018 18:52:00.809] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSystem@localhost:51679/user/$a] akka.tcp://ClusterSystem@localhost:51679 is asked to shutdown cluster.
[INFO] [10/22/2018 18:52:00.809] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:51679] - Marking node [akka.tcp://ClusterSystem@localhost:51679] as [Down]
[INFO] [10/22/2018 18:52:00.810] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@localhost:2551/user/$a] akka.tcp://ClusterSystem@localhost:51681 is LEAVING...
[INFO] [10/22/2018 18:52:00.933] [ClusterSystem-akka.actor.default-dispatcher-22] [akka.tcp://ClusterSystem@localhost:51679/user/$a] akka.tcp://ClusterSystem@localhost:51681 is LEAVING...
[INFO] [10/22/2018 18:52:01.101] [ClusterSystem-akka.actor.default-dispatcher-24] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:51679] - Shutting down myself
[INFO] [10/22/2018 18:52:01.102] [ClusterSystem-akka.actor.default-dispatcher-24] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:51679] - Shutting down...
[INFO] [10/22/2018 18:52:01.104] [ClusterSystem-akka.actor.default-dispatcher-24] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:51679] - Successfully shut down
[INFO] [10/22/2018 18:52:01.110] [ClusterSystem-akka.actor.default-dispatcher-23] [akka.tcp://ClusterSystem@localhost:51679/user/$a] akka.tcp://ClusterSystem@localhost:2551 is REMOVED! from state Up
[INFO] [10/22/2018 18:52:01.110] [ClusterSystem-akka.actor.default-dispatcher-23] [akka.tcp://ClusterSystem@localhost:51679/user/$a] akka.tcp://ClusterSystem@localhost:51679 is REMOVED! from state Down
[INFO] [10/22/2018 18:52:01.111] [ClusterSystem-akka.actor.default-dispatcher-23] [akka.tcp://ClusterSystem@localhost:51679/user/$a] akka.tcp://ClusterSystem@localhost:51681 is REMOVED! from state Leaving
[INFO] [10/22/2018 18:52:02.925] [ClusterSystem-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:2551] - Leader is moving node [akka.tcp://ClusterSystem@localhost:51681] to [Exiting]
[INFO] [10/22/2018 18:52:02.926] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@localhost:2551/user/$a] akka.tcp://ClusterSystem@localhost:51681 is EXITING...
[INFO] [10/22/2018 18:52:02.927] [ClusterSystem-akka.actor.default-dispatcher-18] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:51681] - Exiting, starting coordinated shutdown
[INFO] [10/22/2018 18:52:02.927] [ClusterSystem-akka.actor.default-dispatcher-21] [akka.tcp://ClusterSystem@localhost:51681/user/$a] akka.tcp://ClusterSystem@localhost:51681 is EXITING...
[INFO] [10/22/2018 18:52:02.934] [ClusterSystem-akka.actor.default-dispatcher-41] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:51681] - Exiting completed
