Akka(5): ConsistentHashing Router - 可选定Routee的任务分配模式

    上一篇讨论里我们介绍了几种任务分配(Routing)模式。Akka提供的几种现成智能化Routing模式大多数是通过对用户屏蔽具体的运算Routee选择方式来简化Router使用,提高智能程度,所以我们提到Router的运算是一种无序的运算,消息之间绝对不容许任何形式的依赖,因为向Router发送的消息可能在任何Routee上运算。但是,如果我们能够把运算任务按照任务的类型分配给专门负责处理此等类型任务的Routee,那么我们就可以充分利用Routing模式所带来的运算拓展能力来提高整体运算效率。Akka的ConsistentHashingRouter就是为了满足这样的需求而提供的。ConsistentHashingRouter是通过消息的特征来分辨消息类型,然后自动构建和管理处理各种类型消息的Routees。当然,这就要求系统的消息必须具备预先设定的特征,使ConsistentHashingRouter可以正确分辨并分配给指定的Routee去运算。如果我们确定只有一个Routee负责处理一种类型消息的话,甚至可以在这个Routee中维护某种状态。我们可以设计一个场景来示范ConsistentHashingRouter的应用:模拟一个多货币的存钱盒,分n次随意从盒里取出钱币然后统计各种货币的总额。这个场景中的特征很明显:就是货币种类了,我们把抽出的货币按币种、金额合成消息发给ConsistentHashingRouter。例子里的Routee应该是按照币种由Router自动构建的,维护各种货币当前总额作为内部状态。向ConsistentHashingRouter发送的消息被分配给相应币种的Routee去登记更新货币当前总额。这个统计金额的Routee可以如下定义:

import akka.actor._

val currencies = List("RMB","USD","EUR","JPY","GBP","DEM","HKD","FRF","CHF")

object MoneyCounter {
  sealed trait Counting
  case class OneHand(cur: String, amt: Double) extends Counting
  case class ReportTotal(cur: String) extends Counting
}
class MoneyCounter extends Actor with ActorLogging {
  import MoneyCounter._
  var currency: String = "RMB"
  var amount: Double = 0

  override def receive: Receive = {
    case OneHand(cur,amt) => 
      currency = cur
      amount += amt
      log.info(s"${self.path.name} received one hand of $amt$cur")
    case ReportTotal(_) => 
      log.info(s"${self.path.name} has a total of $amount$currency")
  }
}

MoneyCounter支持两项功能:一是统计某种货币收到的总额,二是按指令汇报当前总额。我们在前一篇讨论里了解到如果MoneyCounter是Routee类型,那它们应该被视为具相同功能的Actor。而且用户无法分辨或者直接面对某个特定的Routee。任何MoneyCounter都可以收到一手任何货币,不同的货币金额相加结果是错误的。所以我们要用Akka提供的ConsistentHashingRouter来解决这个问题。ConsistentHashingRouter的主要特点是能够分辨消息类型,然后按照消息类型对应到选定的Routee。在我们上面的例子里每个Routee负责一种货币,这样就可以保证每个Routee里的金额总数都是正确的了。ConsistentHashingRouter有三种分辨消息的方法:

1、定义ConsistentHashingRouter的hashMapping函数:这是个PartialFunction[Any,Any],如下:

object HashingRouter extends App {
 import MoneyCounter._

  val currencies = List("RMB","USD","EUR","JPY","GBP","DEM","HKD","FRF","CHF")

  val routerSystem = ActorSystem("routerSystem")

  def mcHashMapping: PartialFunction[Any,Any] = {
    case OneHand(cur,_) => cur
    case ReportTotal(cur) => cur
  }

  val router = routerSystem.actorOf(ConsistentHashingPool(
    nrOfInstances = 5,hashMapping = mcHashMapping,virtualNodesFactor = 2)
  .props(MoneyCounter.props),name = "moneyCounter" )


  router ! OneHand("RMB",10.00)
  router ! OneHand("USD",10.00)
  router ! OneHand("HKD",10.00)
  router ! OneHand("RMB",10.00)
  router ! OneHand("CHF",10.00)

  router ! ReportTotal("RMB")
  router ! ReportTotal("USD")

  scala.io.StdIn.readLine()

  routerSystem.terminate()
}

我们在定义router时直接把mcHashingMapping传到ConsistentHashingPool的构建器里就行了。特别要注意nrOfInstances,这个参数必须比消息类型的数量大才行,否则Router会错误引导消息。测试运算结果显示如下:

INFO] [06/05/2017 15:20:09.334] [routerSystem-akka.actor.default-dispatcher-6] [akka://routerSystem/user/moneyCounter/$e] $e received one hand of 10.0RMB
[INFO] [06/05/2017 15:20:09.334] [routerSystem-akka.actor.default-dispatcher-3] [akka://routerSystem/user/moneyCounter/$b] $b received one hand of 10.0USD
[INFO] [06/05/2017 15:20:09.334] [routerSystem-akka.actor.default-dispatcher-7] [akka://routerSystem/user/moneyCounter/$d] $d received one hand of 10.0CHF
[INFO] [06/05/2017 15:20:09.334] [routerSystem-akka.actor.default-dispatcher-2] [akka://routerSystem/user/moneyCounter/$a] $a received one hand of 10.0HKD
[INFO] [06/05/2017 15:20:09.334] [routerSystem-akka.actor.default-dispatcher-6] [akka://routerSystem/user/moneyCounter/$e] $e received one hand of 10.0RMB
[INFO] [06/05/2017 15:20:09.337] [routerSystem-akka.actor.default-dispatcher-2] [akka://routerSystem/user/moneyCounter/$b] $b has a total of 10.0USD
[INFO] [06/05/2017 15:20:09.337] [routerSystem-akka.actor.default-dispatcher-6] [akka://routerSystem/user/moneyCounter/$e] $e has a total of 20.0RMB

Router自动调用了e,b,d,a4个Routees,并且能把消息引导到正确的Routee。

2、可以让消息继承ConsistentHashable,如此我们要在消息里实现函数constentHashKey, 如下:

object MoneyCounter {
  sealed class Counting(cur: String) extends ConsistentHashable {
    override def consistentHashKey: Any = cur
  }
  case class OneHand(cur: String, amt: Double) extends Counting(cur)
  case class ReportTotal(cur: String) extends Counting(cur)
  def props = Props(new MoneyCounter)
}

现在消息都是ConsistentHashable类型的了。构建新的Router来测试效果:

  val router = routerSystem.actorOf(ConsistentHashingPool(
    nrOfInstances = 5, virtualNodesFactor = 2).props(
    MoneyCounter.props),name = "moneyCounter")


  router ! OneHand("RMB",10.00)
  router ! OneHand("USD",10.00)
  router ! OneHand("HKD",10.00)
  router ! OneHand("RMB",10.00)
  router ! OneHand("CHF",10.00)

  router ! ReportTotal("RMB")
  router ! ReportTotal("USD")

运算结果同样正确:

[INFO] [06/05/2017 15:36:29.746] [routerSystem-akka.actor.default-dispatcher-7] [akka://routerSystem/user/moneyCounter/$e] $e received one hand of 10.0RMB
[INFO] [06/05/2017 15:36:29.746] [routerSystem-akka.actor.default-dispatcher-5] [akka://routerSystem/user/moneyCounter/$b] $b received one hand of 10.0USD
[INFO] [06/05/2017 15:36:29.746] [routerSystem-akka.actor.default-dispatcher-6] [akka://routerSystem/user/moneyCounter/$a] $a received one hand of 10.0HKD
[INFO] [06/05/2017 15:36:29.746] [routerSystem-akka.actor.default-dispatcher-4] [akka://routerSystem/user/moneyCounter/$d] $d received one hand of 10.0CHF
[INFO] [06/05/2017 15:36:29.746] [routerSystem-akka.actor.default-dispatcher-7] [akka://routerSystem/user/moneyCounter/$e] $e received one hand of 10.0RMB
[INFO] [06/05/2017 15:36:29.749] [routerSystem-akka.actor.default-dispatcher-7] [akka://routerSystem/user/moneyCounter/$e] $e has a total of 20.0RMB
[INFO] [06/05/2017 15:36:29.749] [routerSystem-akka.actor.default-dispatcher-4] [akka://routerSystem/user/moneyCounter/$b] $b has a total of 10.0USD

3、直接把消息包在ConsistentHashableEnvelope里:

  router ! ConsistentHashableEnvelope(message = OneHand("RMB",23.00),hashKey = "RMB")

这种方式需要用户手工指定Routee,如果用这种方式,我们其实不必用Router,直接把消息传给专职的Actor就行了。

看来还是第二种方法比较合适。因为比起第一种方法多了类型安全和与Router的松散耦合。下面就是一个用第二种方法的完整示范源代码:

import akka.actor._
import akka.routing.ConsistentHashingRouter.{ConsistentHashMapping, ConsistentHashable, ConsistentHashableEnvelope}
import akka.routing._


object MoneyCounter {
  sealed class Counting(cur: String) extends ConsistentHashable {
    override def consistentHashKey: Any = cur
  }
  case class OneHand(cur: String, amt: Double) extends Counting(cur)
  case class ReportTotal(cur: String) extends Counting(cur)
  def props = Props(new MoneyCounter)
}
class MoneyCounter extends Actor with ActorLogging {
  import MoneyCounter._
  var currency: String = "RMB"
  var amount: Double = 0

  override def receive: Receive = {
    case OneHand(cur,amt) =>
      currency = cur
      amount += amt
      log.info(s"${self.path.name} received one hand of $amt$cur")
    case ReportTotal(_) =>
      log.info(s"${self.path.name} has a total of $amount$currency")
  }
}
object HashingRouter extends App {
 import MoneyCounter._
  import scala.util.Random

  val currencies = List("RMB","USD","EUR","JPY","GBP","DEM","HKD","FRF","CHF")

  val routerSystem = ActorSystem("routerSystem")

  val router = routerSystem.actorOf(ConsistentHashingPool(
    nrOfInstances = currencies.size+1, virtualNodesFactor = 2).props(
    MoneyCounter.props),name = "moneyCounter")

  (1 to 20).toList foreach (_ => router ! OneHand(
    currencies(Random.nextInt(currencies.size-1))
  ,Random.nextInt(100) * 1.00))

  currencies foreach (c => router ! ReportTotal(c))

  scala.io.StdIn.readLine()

  routerSystem.terminate()
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏猿人谷

C++命名规则

如果想要有效的管理一个稍微复杂一点的体系,针对其中事物的一套统一、带层次结构、清晰明了的命名准则就是必不可少而且非常好用的工具。 活跃在生物学、化学、军队、监狱...

1906
来自专栏java一日一条

在Java 8下更好地利用枚举

在我们的云使用分析API中,返回了格式化过的分析数据(这里指生成分析图)。最近,我们添加了一个特性,允许用户选择时间段(最开始只可以按天选择)。问题是,代码中每...

531
来自专栏大内老A

事件(Event),绝大多数内存泄漏(Memory Leak)的元凶[上篇]

最近这两天一直在忙着为一个项目检查内存泄漏(Memory Leak)的问题,对相关的知识进行了一下简单的学习和探索,其间也有了一些粗浅的经验积累,今天特意写一篇...

2196
来自专栏编程

Ruby和Python 分析器是如何工作的?

你好! 我作为一名编写Ruby profiler的先驱,我想对现有的Ruby和Python profiler如何工作进行一次调查。 这也有助于回答很多人的问题:...

1669
来自专栏JadePeng的技术博客

Angular快速学习笔记(4) -- Observable与RxJS

2202
来自专栏京东技术

Android Architecture Paging Library详解 | Google I/O大会上的最新发布

Android高级工程师,6年以上开发经验,有丰富的代码重构和架构设计经验,负责京东商城我的京东的开发工作,热衷于学习和研究新技术。

1772
来自专栏何俊林

Android Multimedia框架总结(九)Stagefright框架之数据处理及到OMXCodec过程

不知不觉到第九篇了,感觉还有好多好多没有写,路漫漫其修远兮 ,吾将上下而求索。先说福利吧,此前在关于我, ? 曾说过,不定期搞活动,vip,书啥的,都可以有,...

1916
来自专栏华章科技

Ruby 和 Python 分析器是如何工作的?

你好! 我作为一名编写Ruby profiler的先驱,我想对现有的Ruby和Python profiler如何工作进行一次调查。 这也有助于回答很多人的问题:...

922
来自专栏三流程序员的挣扎

Android 优化——内存优化

在 GC 的过程中,其它在工作的线程会暂停,包括负责绘制的 UI 线程,并且在不同区域的内存释放速度也有一定的差异,但不管在哪个区域,都要到这次 GC 内存回收...

521
来自专栏非著名程序员

Android进阶:Android内存管理之道

对于移动应用开发,不管是Android还是IOS,内存都是永远的痛。但是合理的编写代码,会避免OOM的出现。 相信一步步走过来的Android从业者,每个人都...

1959

扫码关注云+社区