Akka(4): Routers - 智能任务分配

    Actor模式最大的优点就是每个Actor都是一个独立的任务运算器。这种模式让我们很方便地把一项大型的任务分割成若干细小任务然后分配给不同的Actor去完成。优点是在设计时可以专注实现每个Actor的功能,在实际运算时由于每个Actor都在独立的线程里运行,能充分利用多核CPU的优势实现相似并行运算的效率。我们同样可以把一个独立的功能按不同的输入分配给多个Actor去完成以达到同样的效率提高目的,这就是Akka的Routing模式了。Routing模式的特点是所有运算Actor的运算逻辑都是相同的,分别对不同的输入进行相同的运算。不过我们应该知道运算结果的顺序是无法预计的,毕竟Actor模式是典型的无序运算。Routing模式由Router和Routee组成:Routee是负责具体运算的Actor(因为运算逻辑必须在Actor的receive里实现),Router的主要功能是把外界发来的运算指令按照某种指定的方式分配给Routee去运算。可以说Router不是标准的Actor,因为它不需要实现任何其它的功能,基本功能是预设嵌入的。Router的信箱直接代表了任务分配逻辑,与标准Actor逐个运算信箱中消息相比,能大大提高任务分配效率。Akka自带许多现成的任务分配模式,以不同的算法来满足不同的任务分配要求。这些算法的配置可以在配置文件或者代码中定义。Router又可分Pool和Group两种模式:在Router-Pool模式中Router负责构建所有的Routee。如此所有Routee都是Router的直属子级Actor,可以实现Router对Routees的直接监管。由于这种直接的监管关系,Router-Pool又可以按运算负载自动增减Routee,能更有效地分配利用计算资源。Router-Group模式中的Routees由外界其它Actor产生,特点是能实现灵活的Routee构建和监控,可以用不同的监管策略来管理一个Router下的Routees,比如可以使用BackoffSupervisor。从另一方面来讲,Router-Group的缺点是Routees的构建和管理复杂化了,而且往往需要人为干预。

下面我们先做个示范:

import akka.actor._
import akka.routing._
import scala.annotation.tailrec

object FibonacciRoutee {
  case class FibonacciNumber(nbr: Int)
  def props = Props[FibonacciRoutee]
}
class FibonacciRoutee extends Actor with ActorLogging {
 import FibonacciRoutee._

  override def receive: Receive = {
    case FibonacciNumber(nbr) =>
      val answer = fibonacci(nbr)
      log.info(s"${self.path.name}'s answer: Fibonacci($nbr)=$answer")
  }
  private def fibonacci(n: Int): Int = {
    @tailrec
    def fib(n: Int, b: Int, a: Int): Int = n match {
      case 0 => a
      case _ =>
        fib(n - 1, a + b, b)
    }
    fib(n, 1, 0)
  }
}
object RouterDemo extends App {
  import FibonacciRoutee._
  val routingSystem = ActorSystem("routingSystem")
  val router = routingSystem.actorOf(
    FromConfig.props(FibonacciRoutee.props)
    ,"balance-pool-router")

  router ! FibonacciNumber(10)
  router ! FibonacciNumber(13)
  router ! FibonacciNumber(15)
  router ! FibonacciNumber(17)

  scala.io.StdIn.readLine()

  routingSystem.terminate()

}

在这个例子里我们用3个Routees来根据指示计算Fibonacci。FibonacciRoutee只有一项功能:就是按输入计算Fibonacci数。我们看到,Router构建过程十分简单。在我们的例子里只需要读出配置文件内容就可以了。balance-pool-router是配置文件里的一个定义项:

akka {
  prio-dispatcher {
    mailbox-type = "PriorityMailbox"
  }
  actor {
    deployment {
      /balance-pool-router {
        router = balancing-pool
        nr-of-instances = 3
        pool-dispatcher {
          executor = "fork-join-executor"
          # Configuration for the fork join pool
          fork-join-executor {
            # Min number of threads to cap factor-based parallelism number to
            parallelism-min = 3
            # Parallelism (threads) ... ceil(available processors * factor)
            parallelism-factor = 2.0
            # Max number of threads to cap factor-based parallelism number to
            parallelism-max = 3
          }
          # Throughput defines the maximum number of messages to be
          # processed per actor before the thread jumps to the next actor.
          # Set to 1 for as fair as possible.
          throughput = 1
        }
      }
    }
  }

}

Routing模式设置的完整标识是akka.actor.deployment{/balance-pool-router}。完成构建router后我们直接向router发送计算指令,运算结果如下:

[INFO] [05/30/2017 20:13:52.323] [routingSystem-BalancingPool-/balance-pool-router-5] [akka://routingSystem/user/balance-pool-router/$b] $b's answer: Fibonacci(13)=233
[INFO] [05/30/2017 20:13:52.323] [routingSystem-BalancingPool-/balance-pool-router-7] [akka://routingSystem/user/balance-pool-router/$a] $a's answer: Fibonacci(10)=55
[INFO] [05/30/2017 20:13:52.323] [routingSystem-BalancingPool-/balance-pool-router-6] [akka://routingSystem/user/balance-pool-router/$c] $c's answer: Fibonacci(15)=610
[INFO] [05/30/2017 20:13:52.323] [routingSystem-BalancingPool-/balance-pool-router-6] [akka://routingSystem/user/balance-pool-router/$c] $c's answer: Fibonacci(17)=1597

我们看到,router按配置自动构建了3个FibonacciRoutee。Routee的构建过程是无法人工干预的。向router发送的计算指令被分配给b,a,c,c去运算了。从显示顺序可以证明每个参与的Actor占用运算时间不同,产生了无序的运算结果。

下面我们在Routee里加一个延迟效应。这样运算结果显示会更自然些:

object FibonacciRoutee {
  case class FibonacciNumber(nbr: Int, msDelay: Int)  //增加延迟参数
  case class GetAnswer(nbr: Int)

  class RouteeException extends Exception

  def props = Props[FibonacciRoutee]
}
class FibonacciRoutee extends Actor with ActorLogging {
 import FibonacciRoutee._
 import context.dispatcher

  override def receive: Receive = {
    case FibonacciNumber(nbr,ms) =>
      context.system.scheduler.scheduleOnce(ms second,self,GetAnswer(nbr))
    case GetAnswer(nbr) =>
      if (Random.nextBoolean())
        throw new RouteeException
      else {
        val answer = fibonacci(nbr)
        log.info(s"${self.path.name}'s answer: Fibonacci($nbr)=$answer")
      }
  }
  private def fibonacci(n: Int): Int = {
    @tailrec
    def fib(n: Int, b: Int, a: Int): Int = n match {
      case 0 => a
      case _ =>
        fib(n - 1, a + b, b)
    }
    fib(n, 1, 0)
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info(s"Restarting ${self.path.name} on ${reason.getMessage}")
    message foreach {m => self ! m}
    super.preRestart(reason, message)
  }

  override def postRestart(reason: Throwable): Unit = {
    log.info(s"Restarted ${self.path.name} on ${reason.getMessage}")
    super.postRestart(reason)
  }

  override def postStop(): Unit = {
    log.info(s"Stopped ${self.path.name}!")
    super.postStop()
  }

}

因为在Actor内部不能使用Thread.sleep,所以我们用了个scheduleOnce在延迟时间后向自己发送一个唤醒消息。注意,scheduleOnce是无阻塞non-blocking代码,调用后程序不会停留等待计划动作。在上面修改后的代码里增加了监管策略SupervisorStrategy的使用测试。Router的默认监管策略是Esculate,即把某个Routee发生的异常提交给Router的直属父级处理。如果Router直属父级对Routee异常的处理方式是重启的话,那么首先重启Router,然后是作为直属子级的所有Routees都会被重启,结果并不是我们想要的。所以必须人为的设定Router监管策略。由于Router的SupervisorStrategy无法在设置文件中定义,所以这次我们只有用代码方式来设置routing模式了:

object RouterDemo extends App {
  import FibonacciRoutee._
  import scala.concurrent.ExecutionContext.Implicits.global
  val routingSystem = ActorSystem("routingSystem")
  /* cannot set SupervisorStrategy in config file
  val router = routingSystem.actorOf(
    FromConfig.props(FibonacciRoutee.props)
    ,"balance-pool-router")
    */
  val routingDecider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
    case _: RouteeException => SupervisorStrategy.Restart
  }
  val routerSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)(
    routingDecider.orElse(SupervisorStrategy.defaultDecider)
  )
  val router = routingSystem.actorOf(
    BalancingPool(nrOfInstances = 3
      ,supervisorStrategy=routerSupervisorStrategy    //set SupervisorStrategy here
      ).withDispatcher("akka.pool-dispatcher")
      .props(FibonacciRoutee.props)
    ,"balance-pool-router"
  )

  router ! FibonacciNumber(10,5)
  router ! FibonacciNumber(13,2)
  router ! FibonacciNumber(15,3)
  router ! FibonacciNumber(17,1)

  scala.io.StdIn.readLine()

  routingSystem.terminate()

}

注意:我们在FibonacciRoutee的preRestart接口中增加了向自己补发产生异常消息的过程。运算结果显示:虽然出现了多次异常,router重启了f发生异常的Routee,所有消息都得到了处理。

Akka中有些routing模式支持Router-Pool Routee的自动增减。由于BalancingPool不支持此项功能,下面我们就用RoundRobinPool来做个示范。由于需要定义监管策略,只有在代码中设置Resizer了:

 val resizer = DefaultResizer(
    lowerBound = 2, upperBound = 5, pressureThreshold = 1
    ,rampupRate = 1, backoffRate = 0.25
    ,backoffThreshold = 0.25, messagesPerResize = 1
  )
  val router = routingSystem.actorOf(
    RoundRobinPool(nrOfInstances = 2
    ,resizer = Some(resizer)
    ,supervisorStrategy = routerSupervisorStrategy)
      .props(FibonacciRoutee.props)
    ,"roundrobin-pool-router"
  )

以上resizer设置为:Routee最少2个,可以自动增加到5个。运行后routingSystem自动增加了两个Routee: c,d。

下面是本次示范的完整源代码:

import akka.actor._
import akka.routing._
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.util.Random

object FibonacciRoutee {
  case class FibonacciNumber(nbr: Int, msDelay: Int)  //增加延迟参数
  case class GetAnswer(nbr: Int)

  class RouteeException extends Exception

  def props = Props[FibonacciRoutee]
}
class FibonacciRoutee extends Actor with ActorLogging {
 import FibonacciRoutee._
 import context.dispatcher

  override def receive: Receive = {
    case FibonacciNumber(nbr,ms) =>
      context.system.scheduler.scheduleOnce(ms second,self,GetAnswer(nbr))
    case GetAnswer(nbr) =>
      if (Random.nextBoolean())
        throw new RouteeException
      else {
        val answer = fibonacci(nbr)
        log.info(s"${self.path.name}'s answer: Fibonacci($nbr)=$answer")
      }
  }
  private def fibonacci(n: Int): Int = {
    @tailrec
    def fib(n: Int, b: Int, a: Int): Int = n match {
      case 0 => a
      case _ =>
        fib(n - 1, a + b, b)
    }
    fib(n, 1, 0)
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info(s"Restarting ${self.path.name} on ${reason.getMessage}")
    message foreach {m => self ! m}
    super.preRestart(reason, message)
  }

  override def postRestart(reason: Throwable): Unit = {
    log.info(s"Restarted ${self.path.name} on ${reason.getMessage}")
    super.postRestart(reason)
  }

  override def postStop(): Unit = {
    log.info(s"Stopped ${self.path.name}!")
    super.postStop()
  }

}
object RouterDemo extends App {
  import FibonacciRoutee._
  import scala.concurrent.ExecutionContext.Implicits.global
  val routingSystem = ActorSystem("routingSystem")
  /* cannot set SupervisorStrategy in config file
  val router = routingSystem.actorOf(
    FromConfig.props(FibonacciRoutee.props)
    ,"balance-pool-router")
    */
  val routingDecider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
    case _: RouteeException => SupervisorStrategy.Restart
  }
  val routerSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)(
    routingDecider.orElse(SupervisorStrategy.defaultDecider)
  )
  /* does not support resizing routees
  val router = routingSystem.actorOf(
    BalancingPool(nrOfInstances = 3
      ,supervisorStrategy=routerSupervisorStrategy    //set SupervisorStrategy here
      ).withDispatcher("akka.pool-dispatcher")
      .props(FibonacciRoutee.props)
    ,"balance-pool-router"
  ) */

  val resizer = DefaultResizer(
    lowerBound = 2, upperBound = 5, pressureThreshold = 1
    ,rampupRate = 1, backoffRate = 0.25
    ,backoffThreshold = 0.25, messagesPerResize = 1
  )
  val router = routingSystem.actorOf(
    RoundRobinPool(nrOfInstances = 2
    ,resizer = Some(resizer)
    ,supervisorStrategy = routerSupervisorStrategy)
      .props(FibonacciRoutee.props)
    ,"roundrobin-pool-router"
  )

  router ! FibonacciNumber(10,5)
  router ! FibonacciNumber(13,2)
  router ! FibonacciNumber(15,3)
  router ! FibonacciNumber(17,1)
  router ! FibonacciNumber(27,1)
  router ! FibonacciNumber(37,1)
  router ! FibonacciNumber(47,1)

  scala.io.StdIn.readLine()

  routingSystem.terminate()

}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏IT派

使用 Pandas 处理亿级数据

在数据分析领域,最热门的莫过于Python和R语言,此前有一篇文章《别老扯什么Hadoop了,你的数据根本不够大》指出:只有在超过5TB数据量的规模下,Hado...

1004
来自专栏数据库

单机数据库优化

数据库优化有很多可以讲,按照支撑的数据量来分可以分为两个阶段:单机数据库和分库分表,前者一般可以支撑500W或者10G以内的数据,超过这个值则需要考虑分库分表。...

1867
来自专栏进击的程序猿

Linearizability versus Serializability

原文 Linearizability和Serializability是在数据库和分布式系统中重要的两个概念,而且比较容易混淆,这篇文章试着对两个概念的不同进行...

733
来自专栏IT米粉

数据库的使用你可能忽略了这些

数据库的管理是一个非常专业的事情,对数据库的调优、监控一般是由数据库工程师完成,但是开发人员也经常与数据库打交道,即使是简单的增删改查也是有很多窍门,这里,一起...

2765
来自专栏Java技术栈

6 道 BATJ 必考的 Java 面试题

请对比 Exception 和 Error,另外,运行时异常与一般异常有什么区别?

761
来自专栏美团技术团队

HDFS NameNode内存详解

前言 《HDFS NameNode内存全景》中,我们从NameNode内部数据结构的视角,对它的内存全景及几个关键数据结构进行了简单解读,并结合实际场景介绍了N...

3936
来自专栏人工智能LeadAI

专业工程师看过来~ | RDD、DataFrame和DataSet的细致区别

RDD、DataFrame和DataSet是容易产生混淆的概念,必须对其相互之间对比,才可以知道其中异同。 RDD和DataFrame 上图直观地体现了Data...

3137
来自专栏Java学习网

Java细粒度锁实现的3种方式

Java细粒度锁实现的3种方式 最近在工作上碰见了一些高并发的场景需要加锁来保证业务逻辑的正确性,并且要求加锁后性能不能受到太大的影响。初步的想法是通过数据的时...

2699
来自专栏腾讯大数据的专栏

腾讯大数据之TDW计算引擎解析——Shuffle

腾讯分布式数据仓库(Tencent distributed Data Warehouse, 简称TDW)基于开源软件Hadoop和Hive进行构建,并且根据公司...

2727
来自专栏Golang语言社区

【go语言】Goroutines 并发模式(一)

摘要 这一篇主要是对GO语言中的并发编程模式做一个粗略的归纳总结,文中示例参考自golang conference中的一些演讲和博客,go涉及到的Go语言的语法...

3146

扫码关注云+社区