
ScalaPB(1): using protobuf in akka

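    Whenever an instance of any type is passed as a message between machines running independent systems, it must be serialized on the sending side and deserialized on the receiving side. Consider this scenario: two connected servers on a network each run their own, independent akka system. If these two akka systems need to exchange messages, every message must go through serialization/deserialization. By default, akka (2.5.x, as used here) serializes user-defined message types with Java object serialization. As mentioned last time, Java object serialization writes the object's class information, its field values and descriptions of the other types it contains into the serialized result. The output is therefore comparatively large, and data transfer is correspondingly less efficient. protobuf is a binary format that essentially carries only the field values, each tagged with its field number, so it transfers data more efficiently. Below we show how to use protobuf serialization in an akka system. Using a custom serializer in akka involves the following steps: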
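To see the overhead concretely, the sketch below serializes a small case class with plain Java serialization. It uses java.io.ObjectOutputStream, the same mechanism akka's default Java serializer relies on. The sketch then checks that the class and field names end up inside the output. AddedMsg is a hypothetical stand-in for a message type. The exact byte count depends on the class name and JVM, so it is printed rather than claimed.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for a user-defined message; case classes are java.io.Serializable.
final case class AddedMsg(nbr1: Int, nbr2: Int)

object JavaSerializationSize {
  // Serialize an object with plain Java serialization and return the raw bytes.
  def javaSerialize(o: AnyRef): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    try oos.writeObject(o) finally oos.close()
    bos.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val bytes = javaSerialize(AddedMsg(18, 38))
    // Class descriptors are written as text, so class and field names appear in the bytes.
    val text = new String(bytes, StandardCharsets.ISO_8859_1)
    println(s"${bytes.length} bytes; class name embedded: ${text.contains("AddedMsg")}; field name embedded: ${text.contains("nbr1")}")
  }
}
```

Two Int values would need only 8 bytes of payload. Everything beyond that is the type metadata described above.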

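1. Define the message types in IDL in a .proto file.

2. Compile the IDL file with ScalaPB to generate Scala source code. The generated code contains the message types and the methods that operate on them.

3. In the akka program modules, import the generated classes and use these types and methods directly.

4. Write a serializer class the way akka requires.

5. In akka's .conf file, register the serializer under actor.serializers and bind the message types to it under actor.serialization-bindings.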

The build.sbt file below describes the program structure:

lazy val commonSettings = Seq(
  name := "AkkaProtobufDemo",
  version := "1.0",
  scalaVersion := "2.12.6",
)

lazy val local = (project in file("."))
  .settings(commonSettings)
  .settings(
    libraryDependencies ++= Seq(
      "com.typesafe.akka"      %% "akka-remote" % "2.5.11",
      "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf"
    ),
    name := "akka-protobuf-demo"
  )

lazy val remote = (project in file("remote"))
  .settings(commonSettings)
  .settings(
    libraryDependencies ++= Seq(
      "com.typesafe.akka"      %% "akka-remote" % "2.5.11"
    ),
    name := "remote-system"
  ).dependsOn(local)

PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value
)

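local and remote are two separate projects, and each runs its own akka system. Note the scalapb-runtime dependency: its "protobuf" scope makes the .proto files bundled with scalapb-runtime (such as scalapb/scalapb.proto) available to the protobuf compiler. PB.targets specifies where the generated source code is written. remote depends on local, so it can use the generated message classes as well. We also need to declare the ScalaPB plugin in project/scalapb.sbt: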

addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")
libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1"

First, we define the messages in a .proto file:

syntax = "proto3";

// Brought in from scalapb-runtime
import "scalapb/scalapb.proto";
import "google/protobuf/wrappers.proto";

package learn.proto;

message Added {

    int32 nbr1 = 1;
    int32 nbr2 = 2;
}

message Subtracted {
    int32 nbr1 = 1;
    int32 nbr2 = 2;
}

message AddedResult {
    int32 nbr1 = 1;
    int32 nbr2 = 2;
    int32 result = 3;
}

message SubtractedResult {
    int32 nbr1 = 1;
    int32 nbr2 = 2;
    int32 result = 3;
}
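For comparison, here is roughly what protobuf puts on the wire for these messages. The sketch hand-encodes int32 fields following the protobuf wire format: a varint key (fieldNumber << 3 | wireType) followed by a varint value. It is for illustration only, since the ScalaPB-generated toByteArray does this for real. WireFormatSketch and encodeInt32Fields are hypothetical names.

```scala
import java.io.ByteArrayOutputStream

// Illustration of the protobuf wire format for messages whose fields are all int32.
// Real code should call the ScalaPB-generated toByteArray instead.
object WireFormatSketch {
  // Unsigned base-128 varint: 7 bits per byte, high bit set on every byte except the last.
  private def writeVarint(out: ByteArrayOutputStream, value: Long): Unit = {
    var v = value
    while ((v & ~0x7FL) != 0L) {
      out.write(((v & 0x7F) | 0x80).toInt)
      v >>>= 7
    }
    out.write(v.toInt)
  }

  // Encode (fieldNumber -> int32 value) pairs; proto3 skips fields equal to the default 0.
  def encodeInt32Fields(fields: Seq[(Int, Int)]): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    for ((fieldNumber, value) <- fields if value != 0) {
      writeVarint(out, (fieldNumber.toLong << 3) | 0L) // key: field number + wire type 0 (varint)
      writeVarint(out, value.toLong)                   // a negative int32 is sign-extended to 64 bits
    }
    out.toByteArray
  }

  def hex(bytes: Array[Byte]): String = bytes.map(b => f"${b & 0xFF}%02x").mkString(" ")

  def main(args: Array[String]): Unit = {
    // Added(nbr1 = 18, nbr2 = 38)
    println(hex(encodeInt32Fields(Seq(1 -> 18, 2 -> 38))))          // 08 12 10 26
    // SubtractedResult(nbr1 = 3, nbr2 = 5, result = -2)
    println(encodeInt32Fields(Seq(1 -> 3, 2 -> 5, 3 -> -2)).length) // 15
  }
}
```

Added(18, 38) takes just 4 bytes. A negative int32 is always encoded as 10 bytes, which matters here because SubtractedResult.result can be negative. If that were common, declaring the field as sint32 (zigzag encoding) would keep small negative values short.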

Now we define a Calculator actor in the remote project:

package akka.protobuf.calculator
import akka.actor._
import com.typesafe.config.ConfigFactory
import learn.proto.messages._

class Calculator extends Actor with ActorLogging {


  override def receive: Receive = {
    case Added(a,b) =>
      log.info("Calculating %d + %d".format(a, b))
      sender() ! AddedResult(a,b,a+b)
    case Subtracted(a,b) =>
      log.info("Calculating %d - %d".format(a, b))
      sender() ! SubtractedResult(a,b,a-b)
  }

}

object Calculator {
  def props = Props(new Calculator)
}

object CalculatorStarter extends App {

  val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=2552")
    .withFallback(ConfigFactory.load())

  val calcSystem = ActorSystem("calcSystem",config)

  calcSystem.actorOf(Calculator.props,"calculator")

  println("press any key to end program ...")

  scala.io.StdIn.readLine()

  calcSystem.terminate()

}
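ScalaPB generates ordinary case classes, so the Calculator's receive block is plain pattern matching. It can be pulled out into a pure function and tested without an ActorSystem. The sketch below assumes simplified stand-in case classes with the same fields as the generated ones; the real classes in learn.proto.messages carry extra members such as toByteArray. CalculatorLogic is a hypothetical name.

```scala
// Hypothetical stand-ins for the ScalaPB-generated message classes.
final case class Added(nbr1: Int, nbr2: Int)
final case class Subtracted(nbr1: Int, nbr2: Int)
final case class AddedResult(nbr1: Int, nbr2: Int, result: Int)
final case class SubtractedResult(nbr1: Int, nbr2: Int, result: Int)

object CalculatorLogic {
  // The body of Calculator.receive as a pure function: request in, reply out.
  // None means the message is not one the calculator understands.
  def reply(msg: Any): Option[AnyRef] = msg match {
    case Added(a, b)      => Some(AddedResult(a, b, a + b))
    case Subtracted(a, b) => Some(SubtractedResult(a, b, a - b))
    case _                => None
  }
}
```

With this in place, the actor's receive only needs to send a Some result back with sender() ! ..., leaving the arithmetic to code that plain unit tests can cover.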

Running CalculatorStarter creates a calculator actor at: akka.tcp://calcSystem@127.0.0.1:2552/user/calculator
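The remote path has the form protocol://system@host:port/path:

* akka.tcp is the transport.
* calcSystem is the ActorSystem name.
* 2552 is the port set in CalculatorStarter.
* /user/calculator is the name passed to actorOf, under the user guardian.

The path follows URI syntax, so java.net.URI can take it apart for illustration. RemotePath and ActorPathParts are hypothetical names.

```scala
import java.net.URI

// Hypothetical container for the parts of a remote actor path.
final case class RemotePath(protocol: String, system: String, host: String, port: Int, actorPath: String)

object ActorPathParts {
  def parse(path: String): RemotePath = {
    val uri = new URI(path)
    // scheme = transport, userInfo = ActorSystem name, then host, port and the actor's path
    RemotePath(uri.getScheme, uri.getUserInfo, uri.getHost, uri.getPort, uri.getPath)
  }

  def main(args: Array[String]): Unit =
    println(parse("akka.tcp://calcSystem@127.0.0.1:2552/user/calculator"))
}
```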

Next, in the local project we start another akka system on port 2551 and call the calculator actor of the akka system running on port 2552:

package akka.protobuf.calcservice
import akka.actor._
import learn.proto.messages._
import scala.concurrent.duration._

class CalcRunner(path: String) extends Actor with ActorLogging {
  sendIdentifyRequest()

  def sendIdentifyRequest(): Unit = {
    context.actorSelection(path) ! Identify(path)
    import context.dispatcher
    context.system.scheduler.scheduleOnce(3.seconds, self, ReceiveTimeout)
  }

  def receive = identifying

  def identifying : Receive = {
    case ActorIdentity(calcPath,Some(calcRef)) if (path.equals(calcPath)) =>
      log.info("Remote calculator started!")
      context.watch(calcRef)
      context.become(calculating(calcRef))
    case ActorIdentity(_,None) =>
      log.info("Remote calculator not found!")
    case ReceiveTimeout =>
      sendIdentifyRequest()
    case s @ _ =>
      log.info(s"Remote calculator not ready. [$s]")
  }

  def calculating(calculator: ActorRef) : Receive = {
    case (op : Added) => calculator ! op
    case (op : Subtracted) => calculator ! op

    case AddedResult(a,b,r)  =>
      log.info(s"$a + $b = $r")
    case SubtractedResult(a,b,r) =>
      log.info(s"$a - $b = $r")

    case Terminated(`calculator`) => // backticks match the watched ref instead of binding a new name
      log.info("Remote calculator terminated, restarting ...")
      sendIdentifyRequest()
      context.become(identifying)

    case ReceiveTimeout => //nothing
  }

}

object CalcRunner {
  def props(path: String) = Props(new CalcRunner(path))
}

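CalcRunner is an actor that first sends an Identify message to the calculator actor in the remote project to obtain its concrete ActorRef. It then uses that ActorRef to interact with the calculator actor. Identify is a message type predefined by akka; all the other messages are generated by ScalaPB from the .proto file. Here is the main program of the local project: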
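The identification handshake in CalcRunner is a small state machine:

* Keep sending Identify until an ActorIdentity with Some(ref) arrives.
* Then watch the ref and switch to calculating.
* If the remote actor terminates, go back to identifying.

The sketch below models those transitions as a pure function, so the retry logic can be checked without starting two ActorSystems. The State, Event and Action types are hypothetical, and plain strings stand in for ActorRef.

```scala
// Pure model of CalcRunner's handshake; all names here are hypothetical.
sealed trait State
case object Identifying extends State
final case class Calculating(ref: String) extends State     // ref stands in for an ActorRef

sealed trait Event
final case class Identity(ref: Option[String]) extends Event // models ActorIdentity
case object Timeout extends Event                            // models the scheduled ReceiveTimeout
case object RemoteTerminated extends Event                   // models Terminated

sealed trait Action
case object SendIdentify extends Action                      // models sendIdentifyRequest()
final case class Watch(ref: String) extends Action           // models context.watch

object Handshake {
  def step(state: State, event: Event): (State, List[Action]) = (state, event) match {
    case (Identifying, Identity(Some(ref))) => (Calculating(ref), List(Watch(ref)))
    case (Identifying, Identity(None))      => (Identifying, Nil) // wait for the timeout to retry
    case (Identifying, Timeout)             => (Identifying, List(SendIdentify))
    case (Calculating(_), RemoteTerminated) => (Identifying, List(SendIdentify))
    case (s, _)                             => (s, Nil) // e.g. a stale Timeout after identification
  }
}
```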

package akka.protobuf.demo
import akka.actor._
import com.typesafe.config.ConfigFactory
import akka.protobuf.calcservice._

import scala.concurrent.duration._
import scala.util._
import learn.proto.messages._

object Main extends App {

  val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=2551")
    .withFallback(ConfigFactory.load())

  val calcSystem = ActorSystem("calcSystem",config)

  val calcPath = "akka.tcp://calcSystem@127.0.0.1:2552/user/calculator"

  val calculator = calcSystem.actorOf(CalcRunner.props(calcPath),"calcRunner")

  println("Calculator started ...")

  import calcSystem.dispatcher

  calcSystem.scheduler.schedule(1.second, 1.second) {
    if (Random.nextInt(100) % 2 == 0)
      calculator ! Added(Random.nextInt(100), Random.nextInt(100))
    else
      calculator ! Subtracted(Random.nextInt(100), Random.nextInt(100))
  }


  scala.io.StdIn.readLine()

  calcSystem.terminate()

}

Configuration file application.conf:

akka {

  actor {
    provider = remote
  }

  remote {
    netty.tcp {
      hostname = "127.0.0.1"
    }
  }

}

Run remote first, then local. Note the following warning in the output:

[akka.serialization.Serialization(akka://calcSystem)] Using the default Java serializer for class [learn.proto.messages.Added] which is not recommended because of performance implications. Use another serializer 

Below is the serializer for the protobuf types:

package akka.protobuf.serializer

import akka.serialization.SerializerWithStringManifest
import learn.proto.messages._


class ProtobufSerializer extends SerializerWithStringManifest{

  def identifier: Int = 101110116 // must be unique among serializers; akka reserves 0-40

  override def manifest(o: AnyRef): String = o.getClass.getName
  final val AddedManifest = classOf[Added].getName
  final val SubtractedManifest = classOf[Subtracted].getName
  final val AddedResultManifest = classOf[AddedResult].getName
  final val SubtractedResultManifest = classOf[SubtractedResult].getName


  override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {

    println("inside fromBinary"+manifest)

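    manifest match {
      case AddedManifest => Added.parseFrom(bytes)
      case SubtractedManifest => Subtracted.parseFrom(bytes)
      case AddedResultManifest => AddedResult.parseFrom(bytes)
      case SubtractedResultManifest => SubtractedResult.parseFrom(bytes)
      // unknown manifest: akka recommends NotSerializableException here
      case other => throw new java.io.NotSerializableException(s"Unknown manifest [$other]")
    }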
  }

  override def toBinary(o: AnyRef): Array[Byte] = {

    println("inside toBinary ")
    o match {
      case a: Added => a.toByteArray
      case s: Subtracted => s.toByteArray
      case aR: AddedResult => aR.toByteArray
      case sR: SubtractedResult => sR.toByteArray
      case other => throw new IllegalArgumentException(s"Cannot serialize [${other.getClass.getName}]")
    }
  }
}
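The serializer above lists every message type in both fromBinary and toBinary. A table-driven variant instead keeps one entry per type in a map keyed by manifest.

The sketch below is self-contained, so it uses a hypothetical Codec trait and a fixed 8-byte layout instead of protobuf. With ScalaPB, the generated companion objects could fill the decoder role, since they extend scalapb.GeneratedMessageCompanion and provide parseFrom. ManifestRegistry and AddedCodec are hypothetical names. Following akka's recommendation, an unknown manifest raises java.io.NotSerializableException.

```scala
import java.io.NotSerializableException
import java.nio.ByteBuffer

// Hypothetical codec interface standing in for toByteArray / parseFrom.
trait Codec[A <: AnyRef] {
  def encode(a: A): Array[Byte]
  def decode(bytes: Array[Byte]): A
}

final case class Added(nbr1: Int, nbr2: Int)

object AddedCodec extends Codec[Added] {
  // Fixed 8-byte layout, only to keep the sketch self-contained (this is not protobuf).
  def encode(a: Added): Array[Byte] = ByteBuffer.allocate(8).putInt(a.nbr1).putInt(a.nbr2).array()
  def decode(bytes: Array[Byte]): Added = { val b = ByteBuffer.wrap(bytes); Added(b.getInt, b.getInt) }
}

// Manifest-keyed registry: one entry per message type instead of one match case per method.
final class ManifestRegistry(entries: (Class[_ <: AnyRef], Codec[_ <: AnyRef])*) {
  private val byManifest: Map[String, Codec[_ <: AnyRef]] =
    entries.map { case (cls, codec) => cls.getName -> codec }.toMap

  def manifest(o: AnyRef): String = o.getClass.getName

  def toBinary(o: AnyRef): Array[Byte] = byManifest.get(manifest(o)) match {
    case Some(codec) => codec.asInstanceOf[Codec[AnyRef]].encode(o) // safe: keyed by o's own class
    case None        => throw new IllegalArgumentException(s"No codec for [${o.getClass.getName}]")
  }

  def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = byManifest.get(manifest) match {
    case Some(codec) => codec.decode(bytes)
    case None        => throw new NotSerializableException(s"Unknown manifest [$manifest]")
  }
}
```

Adding a message type then means adding one registry entry (and one serialization-bindings line).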

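Then we tell the akka system to use this serializer in application.conf; these settings go inside the akka block. The explicit bindings for the four learn.proto.messages classes are what route them to proto. ScalaPB-generated classes extend scalapb.GeneratedMessage rather than com.google.protobuf.Message, so that line does not cover them. "java.io.Serializable" = none removes the default binding of Serializable classes to the Java serializer: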

  actor {

    serializers {

      proto = "akka.protobuf.serializer.ProtobufSerializer"
    }

    serialization-bindings {

      "java.io.Serializable" = none
      "com.google.protobuf.Message" = proto
      "learn.proto.messages.Added" = proto
      "learn.proto.messages.AddedResult" = proto
      "learn.proto.messages.Subtracted" = proto
      "learn.proto.messages.SubtractedResult" = proto

    }
  }
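A message class can match several serialization-bindings, since every case class is also a java.io.Serializable. In that case akka uses the most specific binding, i.e. the bound class of which all other matching bound classes are supertypes. This is why the explicit learn.proto.messages bindings win even over akka's default "java.io.Serializable" = java binding.

The sketch below is a simplified model of that rule, not akka's own code. BindingResolution is a hypothetical name, and "java" stands for akka's default Java serializer binding.

```scala
// Simplified model of picking the most specific serialization binding (not akka's implementation).
object BindingResolution {
  def resolve(target: Class[_], bindings: Map[Class[_], String]): Option[String] = {
    // Every bound class or interface the target can be assigned to.
    val candidates = bindings.keys.filter(_.isAssignableFrom(target)).toList
    // The most specific candidate is a subtype of all the others.
    candidates.find(c => candidates.forall(other => other.isAssignableFrom(c))).map(bindings)
  }
}

final case class Added(nbr1: Int, nbr2: Int) // case classes are java.io.Serializable

object BindingDemo {
  def main(args: Array[String]): Unit = {
    val bindings: Map[Class[_], String] = Map(
      classOf[java.io.Serializable] -> "java",
      classOf[Added] -> "proto"
    )
    println(BindingResolution.resolve(classOf[Added], bindings))                  // Some(proto)
    println(BindingResolution.resolve(classOf[java.util.ArrayList[_]], bindings)) // Some(java)
  }
}
```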

Now run it again:

[INFO] [04/30/2018 18:41:02.348] [calcSystem-akka.actor.default-dispatcher-2] [akka.tcp://calcSystem@127.0.0.1:2551/user/calcRunner] Remote calculator started!
inside toBinary 
inside fromBinarylearn.proto.messages.AddedResult
[INFO] [04/30/2018 18:41:03.234] [calcSystem-akka.actor.default-dispatcher-4] [akka.tcp://calcSystem@127.0.0.1:2551/user/calcRunner] 18 + 38 = 56
inside toBinary 
inside fromBinarylearn.proto.messages.AddedResult
[INFO] [04/30/2018 18:41:04.197] [calcSystem-akka.actor.default-dispatcher-4] [akka.tcp://calcSystem@127.0.0.1:2551/user/calcRunner] 22 + 74 = 96

The system is now using our custom ProtobufSerializer.

Below is the complete source code of this demo:

project/scalapb.sbt

addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")
libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1"

build.sbt

lazy val commonSettings = Seq(
  name := "AkkaProtobufDemo",
  version := "1.0",
  scalaVersion := "2.12.6",
)

lazy val local = (project in file("."))
  .settings(commonSettings)
  .settings(
    libraryDependencies ++= Seq(
      "com.typesafe.akka"      %% "akka-remote" % "2.5.11",
      "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf"
    ),
    name := "akka-protobuf-demo"
  )

lazy val remote = (project in file("remote"))
  .settings(commonSettings)
  .settings(
    libraryDependencies ++= Seq(
      "com.typesafe.akka"      %% "akka-remote" % "2.5.11"
    ),
    name := "remote-system"
  ).dependsOn(local)

PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value
)

resources/application.conf

akka {
  actor {
    provider = remote
  }
  remote {
    netty.tcp {
      hostname = "127.0.0.1"
    }
  }
  actor {
    serializers {
      proto = "akka.protobuf.serializer.ProtobufSerializer"
    }
    serialization-bindings {
      "java.io.Serializable" = none
      "com.google.protobuf.Message" = proto
      "learn.proto.messages.Added" = proto
      "learn.proto.messages.AddedResult" = proto
      "learn.proto.messages.Subtracted" = proto
      "learn.proto.messages.SubtractedResult" = proto

    }
  }
}

main/protobuf/messages.proto

syntax = "proto3";

// Brought in from scalapb-runtime
import "scalapb/scalapb.proto";
import "google/protobuf/wrappers.proto";

package learn.proto;

message Added {

    int32 nbr1 = 1;
    int32 nbr2 = 2;
}

message Subtracted {
    int32 nbr1 = 1;
    int32 nbr2 = 2;
}

message AddedResult {
    int32 nbr1 = 1;
    int32 nbr2 = 2;
    int32 result = 3;
}

message SubtractedResult {
    int32 nbr1 = 1;
    int32 nbr2 = 2;
    int32 result = 3;
}

remote/Calculator.scala

package akka.protobuf.calculator
import akka.actor._
import com.typesafe.config.ConfigFactory
import learn.proto.messages._

class Calculator extends Actor with ActorLogging {


  override def receive: Receive = {
    case Added(a,b) =>
      log.info("Calculating %d + %d".format(a, b))
      sender() ! AddedResult(a,b,a+b)
    case Subtracted(a,b) =>
      log.info("Calculating %d - %d".format(a, b))
      sender() ! SubtractedResult(a,b,a-b)
  }

}

object Calculator {
  def props = Props(new Calculator)
}

object CalculatorStarter extends App {

  val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=2552")
    .withFallback(ConfigFactory.load())

  val calcSystem = ActorSystem("calcSystem",config)

  calcSystem.actorOf(Calculator.props,"calculator")

  println("press any key to end program ...")

  scala.io.StdIn.readLine()

  calcSystem.terminate()

}

CalcService.scala

package akka.protobuf.calcservice
import akka.actor._
import learn.proto.messages._
import scala.concurrent.duration._



class CalcRunner(path: String) extends Actor with ActorLogging {
  sendIdentifyRequest()

  def sendIdentifyRequest(): Unit = {
    context.actorSelection(path) ! Identify(path)
    import context.dispatcher
    context.system.scheduler.scheduleOnce(3.seconds, self, ReceiveTimeout)
  }

  def receive = identifying

  def identifying : Receive = {
    case ActorIdentity(calcPath,Some(calcRef)) if (path.equals(calcPath)) =>
      log.info("Remote calculator started!")
      context.watch(calcRef)
      context.become(calculating(calcRef))
    case ActorIdentity(_,None) =>
      log.info("Remote calculator not found!")
    case ReceiveTimeout =>
      sendIdentifyRequest()
    case s @ _ =>
      log.info(s"Remote calculator not ready. [$s]")
  }

  def calculating(calculator: ActorRef) : Receive = {
    case (op : Added) => calculator ! op
    case (op : Subtracted) => calculator ! op

    case AddedResult(a,b,r)  =>
      log.info(s"$a + $b = $r")
    case SubtractedResult(a,b,r) =>
      log.info(s"$a - $b = $r")

    case Terminated(`calculator`) => // backticks match the watched ref instead of binding a new name
      log.info("Remote calculator terminated, restarting ...")
      sendIdentifyRequest()
      context.become(identifying)

    case ReceiveTimeout => //nothing
  }

}

object CalcRunner {
  def props(path: String) = Props(new CalcRunner(path))
}

Main.scala

package akka.protobuf.demo
import akka.actor._
import com.typesafe.config.ConfigFactory
import akka.protobuf.calcservice._

import scala.concurrent.duration._
import scala.util._
import learn.proto.messages._

object Main extends App {

  val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=2551")
    .withFallback(ConfigFactory.load())

  val calcSystem = ActorSystem("calcSystem",config)

  val calcPath = "akka.tcp://calcSystem@127.0.0.1:2552/user/calculator"

  val calculator = calcSystem.actorOf(CalcRunner.props(calcPath),"calcRunner")


  println("Calculator started ...")

  import calcSystem.dispatcher

  calcSystem.scheduler.schedule(1.second, 1.second) {
    if (Random.nextInt(100) % 2 == 0)
      calculator ! Added(Random.nextInt(100), Random.nextInt(100))
    else
      calculator ! Subtracted(Random.nextInt(100), Random.nextInt(100))
  }


  scala.io.StdIn.readLine()

  calcSystem.terminate()

}

ProtobufferSerializer.scala

package akka.protobuf.serializer

import akka.serialization.SerializerWithStringManifest
import learn.proto.messages._


class ProtobufSerializer extends SerializerWithStringManifest{

  def identifier: Int = 101110116 // must be unique among serializers; akka reserves 0-40

  override def manifest(o: AnyRef): String = o.getClass.getName
  final val AddedManifest = classOf[Added].getName
  final val SubtractedManifest = classOf[Subtracted].getName
  final val AddedResultManifest = classOf[AddedResult].getName
  final val SubtractedResultManifest = classOf[SubtractedResult].getName


  override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {

    println("inside fromBinary"+manifest)

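    manifest match {
      case AddedManifest => Added.parseFrom(bytes)
      case SubtractedManifest => Subtracted.parseFrom(bytes)
      case AddedResultManifest => AddedResult.parseFrom(bytes)
      case SubtractedResultManifest => SubtractedResult.parseFrom(bytes)
      // unknown manifest: akka recommends NotSerializableException here
      case other => throw new java.io.NotSerializableException(s"Unknown manifest [$other]")
    }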
  }

  override def toBinary(o: AnyRef): Array[Byte] = {

    println("inside toBinary ")
    o match {
      case a: Added => a.toByteArray
      case s: Subtracted => s.toByteArray
      case aR: AddedResult => aR.toByteArray
      case sR: SubtractedResult => sR.toByteArray
      case other => throw new IllegalArgumentException(s"Cannot serialize [${other.getClass.getName}]")
    }
  }
}

This article is part of the Tencent Cloud self-media sharing program.
