Akka(9): 分布式运算:Remoting-远程构建式

   上篇我们讨论了Akka-Remoting。我们说Akka-Remoting是一种点对点的通讯方式,能使两个不同JVM上Akka-ActorSystem上的两个Actor之间可以相互沟通。Akka-Remoting还没有实现完全的Actor位置透明(location transparency),因为一个Actor还必须在获得对方Actor确切地址信息后才能启动与之沟通过程。Akka-Remoting支持“远程查找”和“远程构建”两种沟通方式。由于篇幅所限,我们只介绍了“远程查找”。在这一篇里我们将会讨论“远程构建”方式。

同样,我们先通过项目结构来分析:

lazy val local = (project in file("."))
  .settings(commonSettings)
  .settings(
    name := "remoteCreateDemo"
  ).aggregate(calculator,remote).dependsOn(calculator)

lazy val calculator = (project in file("calculator"))
  .settings(commonSettings)
  .settings(
    name := "calculator"
  )

lazy val remote = (project in file("remote"))
  .settings(commonSettings)
  .settings(
    name := "remoteSystem"
  ).aggregate(calculator).dependsOn(calculator)

远程构建的过程大致是这样的:由local通知remote启动构建Actor;remote从本地库中查找Actor的类定义(class)并把它载入内存。由于驱动、使用远程Actor是在local进行的,所以local,remote项目还必须共享Calculator,包括Calculator的功能消息。这项要求我们在.sbt中用aggregate(calculator)来协同编译。

我们把Calculator的监管supervisor也包括在这个源码文件里。现在这个calculator是个包括监管、功能、消息的完整项目了。Calculator源代码如下:

package remoteCreation.calculator

import akka.actor._
import scala.concurrent.duration._

object Calcultor {
  sealed trait MathOps
  case class Num(dnum: Double) extends MathOps
  case class Add(dnum: Double) extends MathOps
  case class Sub(dnum: Double) extends MathOps
  case class Mul(dnum: Double) extends MathOps
  case class Div(dnum: Double) extends MathOps

  sealed trait CalcOps
  case object Clear extends CalcOps
  case object GetResult extends CalcOps

  def props = Props(new Calcultor)
  def supervisorProps = Props(new SupervisorActor)
}

class Calcultor extends Actor with ActorLogging {
  import Calcultor._

  var result: Double = 0.0   //internal state

  override def receive: Receive = {
    case Num(d) => result = d
    case Add(d) => result += d
    case Sub(d) => result -= d
    case Mul(d) => result *= d
    case Div(d) =>
      val _ = result.toInt / d.toInt   //yield ArithmeticException
      result /= d
    case Clear => result = 0.0
    case GetResult =>
      sender() ! s"Result of calculation is: $result"
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info(s"Restarting calculator: ${reason.getMessage}")
    super.preRestart(reason, message)
  }
}

class SupervisorActor extends Actor {
  def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
    case _: ArithmeticException => SupervisorStrategy.Resume
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){
      decider.orElse(SupervisorStrategy.defaultDecider)
    }

  val calcActor = context.actorOf(Calcultor.props,"calculator")

  override def receive: Receive = {
    case msg@ _ => calcActor.forward(msg)
  }

}

与上一个例子的”远程查找式“相同,remote需要为Remoting公开一个端口。我们可以照搬.conf配置文件内容:remote/src/main/resources/application.conf

akka {
  actor {
    provider = remote
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2552
    }
    log-sent-messages = on
    log-received-messages = on
  }
}

由于远程构建和使用是在local上进行的,在remote上我们只需要启动ActorSystem就行了:

import com.typesafe.config.ConfigFactory
import akka.actor._

object CalculatorRunner extends App {
  val remoteSystem = ActorSystem("remoteSystem",ConfigFactory.load("application"))
  println("Remote system started.")

  scala.io.StdIn.readLine()
  remoteSystem.terminate()

}

Calculator的构建是在localSystem上启动的,我们需要在配置文件中描述远程构建标的(还是未能实现位置透明):local/src/main/resources/application.conf 

akka {
  actor {
    provider = remote,
    deployment {
      "/calculator" {
        remote = "akka.tcp://remoteSystem@127.0.0.1:2552"
      }
    }
  }
  remote {
    netty.tcp {
      hostname = "127.0.0.1",
      port=2554
    }
  }
}

注意:上面这个/calculator设置实际上指的是SupervisorActor。

现在我们可以在local上开始构建calculator,然后使用它来运算了:

import akka.actor._
import remoteCreation.calculator.Calcultor._
import scala.concurrent.duration._
import akka.pattern._

object RemotingCreate extends App {
  val localSystem = ActorSystem("localSystem")
  val calcActor = localSystem.actorOf(props,
    name = "calculator")   //created SupervisorActor

  import localSystem.dispatcher

  calcActor ! Clear
  calcActor ! Num(13.0)
  calcActor ! Mul(1.5)

  implicit val timeout = akka.util.Timeout(1 second)

  ((calcActor ? GetResult).mapTo[String]) foreach println
  scala.io.StdIn.readLine()

  calcActor ! Div(0.0)
  calcActor ! Div(1.5)
  calcActor ! Add(100.0)
  ((calcActor ? GetResult).mapTo[String]) foreach println

  scala.io.StdIn.readLine()
  localSystem.terminate()

}

从代码上看构建calculator(SupervisorActor)过程与普通的Actor构建没分别,所有细节都放在配置文件里了。但是,要注意actorOf的name必须与配置文档中的设置匹配。

试运行结果与上一个例子相同。值得注意的是实际远程构建的是一个SupervisorActor。Calculator的构建是SupervisorActor构建的其中一部分。从运算结果看:这个SupervisorActor也实现了它的功能。

下面是这次示范的源代码:

local/build.sbt

azy val commonSettings = seq (
  name := "RemoteCreateDemo",
  version := "1.0",
  scalaVersion := "2.11.8",
  libraryDependencies := Seq(
    "com.typesafe.akka" %% "akka-actor" % "2.5.2",
    "com.typesafe.akka" %% "akka-remote" % "2.5.2"
  )
)


lazy val local = (project in file("."))
  .settings(commonSettings)
  .settings(
    name := "remoteCreateDemo"
  ).aggregate(calculator).dependsOn(calculator)

lazy val calculator = (project in file("calculator"))
  .settings(commonSettings)
  .settings(
    name := "calculator"
  )

lazy val remote = (project in file("remote"))
  .settings(commonSettings)
  .settings(
    name := "remoteSystem"
  ).aggregate(calculator).dependsOn(calculator)

calculator/calculator.scala

package remoteCreation.calculator

import akka.actor._
import scala.concurrent.duration._

object Calcultor {
  sealed trait MathOps
  case class Num(dnum: Double) extends MathOps
  case class Add(dnum: Double) extends MathOps
  case class Sub(dnum: Double) extends MathOps
  case class Mul(dnum: Double) extends MathOps
  case class Div(dnum: Double) extends MathOps

  sealed trait CalcOps
  case object Clear extends CalcOps
  case object GetResult extends CalcOps

  def props = Props(new Calcultor)
  def supervisorProps = Props(new SupervisorActor)
}

class Calcultor extends Actor with ActorLogging {
  import Calcultor._

  var result: Double = 0.0   //internal state

  override def receive: Receive = {
    case Num(d) => result = d
    case Add(d) => result += d
    case Sub(d) => result -= d
    case Mul(d) => result *= d
    case Div(d) =>
      val _ = result.toInt / d.toInt   //yield ArithmeticException
      result /= d
    case Clear => result = 0.0
    case GetResult =>
      sender() ! s"Result of calculation is: $result"
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info(s"Restarting calculator: ${reason.getMessage}")
    super.preRestart(reason, message)
  }
}

class SupervisorActor extends Actor {
  def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
    case _: ArithmeticException => SupervisorStrategy.Resume
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){
      decider.orElse(SupervisorStrategy.defaultDecider)
    }

  val calcActor = context.actorOf(Calcultor.props,"calculator")

  override def receive: Receive = {
    case msg@ _ => calcActor.forward(msg)
  }

}

remote/src/main/resources/application.conf

akka {
  actor {
    provider = remote
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2552
    }
    log-sent-messages = on
    log-received-messages = on
  }
}

remote/CalculatorRunner.scala

package remoteCreation.remote
import com.typesafe.config.ConfigFactory
import akka.actor._

object CalculatorRunner extends App {
  val remoteSystem = ActorSystem("remoteSystem",ConfigFactory.load("application"))
  println("Remote system started.")

  scala.io.StdIn.readLine()
  remoteSystem.terminate()

}

local/src/main/resources/application.conf

akka {
  actor {
    provider = remote,
    deployment {
      "/calculator" {
        remote = "akka.tcp://remoteSystem@127.0.0.1:2552"
      }
    }
  }
  remote {
    netty.tcp {
      hostname = "127.0.0.1",
      port=2554
    }
  }
}

local/RemotingCreation.scala

import akka.actor._
import remoteCreation.calculator.Calcultor._
import scala.concurrent.duration._
import akka.pattern._

object RemotingCreate extends App {
  val localSystem = ActorSystem("localSystem")
  val calcActor = localSystem.actorOf(props,
    name = "calculator")  //created SupervisorActor

  import localSystem.dispatcher

  calcActor ! Clear
  calcActor ! Num(13.0)
  calcActor ! Mul(1.5)

  implicit val timeout = akka.util.Timeout(1 second)

  ((calcActor ? GetResult).mapTo[String]) foreach println
  scala.io.StdIn.readLine()

  calcActor ! Div(0.0)
  calcActor ! Div(1.5)
  calcActor ! Add(100.0)
  ((calcActor ? GetResult).mapTo[String]) foreach println

  scala.io.StdIn.readLine()
  localSystem.terminate()

}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏杨建荣的学习笔记

一次数据库无法登陆的问题及排查 (r3笔记第99天)

今天在中午的时候,收到客户的邮件,说数据库访问有问题了,赶紧连到生产环境查看。 结果在尝试登录的时候报了listener的错误,感觉像是listener停了一样...

2965
来自专栏杨建荣的学习笔记

物化视图刷新结合ADG的尝试 (r8笔记第47天)

最近开发提了几个需求,需要把几个线上的分布式的表整合到统计系统中方便统计,看来分久必合,合久必分,当时的分开考虑,肯定没有想到以后会整合起来,这 可对我们是一些...

35010
来自专栏跟着阿笨一起玩NET

plsql developer的一些使用

512
来自专栏青枫的专栏

day28_Struts2综合案例

a、拷贝必要的jar包(图中黄色框框) 和 与数据库操作有关的jar包与配置文件(图中绿色框框)

341
来自专栏大数据学习笔记

Hadoop基础教程-第10章 HBase:Hadoop数据库(10.7 HBase 批量导入)

第10章 HBase:Hadoop数据库 10.7 HBase 批量导入 10.7.1 批量导入数据的方法 向HBase表中导入一条数据可以使用HBase Sh...

1685
来自专栏Bug生活2048

Spring Boot学习笔记(五)整合MyBatis实现数据库访问

这里主要依赖两个,一个是连接MySql的`mysql-connector-java`,还一个是SpringBoot整合MyBatis的核心依赖`mybatis-...

562
来自专栏杨建荣的学习笔记

一次数据库宕机问题的分析(r6笔记第5天)

今天来到办公室,发现有一台服务器中的数据库实例停掉了。这种情况真是意料之外,尤其是我还不是很熟悉这台机器的服务。 赶紧查看数据库日志,可以看到数据库在昨晚停掉了...

3495
来自专栏乐沙弥的世界

Oracle 实例恢复

Oracle实例失败多为实例非一致性关闭所致,通常称为崩溃(crash)。实例失败的结果等同于shutdown abort。

805
来自专栏Hadoop实操

如何使用Sentry通过视图实现Impala的行级授权

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。 Fayson的github:https://github.com/fayson/cdhproje...

4049
来自专栏杨建荣的学习笔记

分分钟搭建MySQL Group Replication测试环境(r11笔记第82天)

最近看了下MySQL 5.7中的闪亮特性Group Replication,也花了不少做了些测试,发现有些方面的表现确实不赖。当然要模拟这么一套环境还是需...

3397

扫码关注云+社区