Akka之简单的自定义RPC框架(乞丐版)

关于Akka

Akka 是一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用。它已经成功运用在电信行业。系统几乎不会宕机(高可用性 99.9999999 % 一年只有 31 ms 宕机)。

自定义RPC通信框架(乞丐版)

目标

woker能发送成功注册,并定时发送心跳。

master能成功接收注册,并能接收心跳及完成自检。

大体思路

1、提供一个Master,负责woker的任务分配,注册及销毁。

2、提供一个Woker,负责Master分配的任务。需要定时向Master报告状态

3、Master内部提供自检机制,为其检测过期woker并销毁。

大体思路就是这样。下面开始撸代码:

1,消息模板类

package com.itunic.akka

/**
  * Created by itunic.com on 2016/12/12.
  */
trait RemoteMessage extends Serializable

//注册消息
//worker -> master
case class Register(workerId: String, memorys: Int, cores: Int) extends RemoteMessage

//返回注册成功信息 master的连接信息
//master -> worker
case class Registered(ip: String, port: Int) extends RemoteMessage

//报活
//worker -> master
case class TransferHeartbeat(workerId: String) extends RemoteMessage

//heartbeat ->worker
case object SendHeartbeat

//heartbeat->master
case object CheckTimeOutWorker

//master ->worker 重新注册
case object RegisterAgain

2,Woker实体类

package com.itunic.akka

/**
  * Created by itunic.com on 2016/12/13.
  */
class WorkerInfo(workerId: String, memory: Int, cores: Int) {
 //TODO 最后更新日期
  var lastTime: Long = _
}

3,核心类:Master

package com.itunic.akka

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.collection.mutable

/**
  * Created by root
  */
class Master(host: String, prot: Int) extends Actor {

  println("constructor invoked")
  var workerMap = new mutable.HashMap[String, WorkerInfo]()
 //检测时间
  val CHECK_TIME = 15000

  override def preStart(): Unit = {
    println("preStart invoked")
 //启动线程
 import context.dispatcher
    context.system.scheduler.schedule(0 millis, CHECK_TIME millis, self, CheckTimeOutWorker)
  }

 // 用于接收消息
  override def receive: Receive = {
 case Register(workerId, memorys, cores) => {
      println("a client connected")
 //判断map是否包含此worker
 if (!workerMap.contains(workerId)) {
        val workerInfo = new WorkerInfo(workerId, memorys, cores)
        workerMap += (workerId -> workerInfo)
      }
      sender ! Registered(host, prot)
    }
 case TransferHeartbeat(workerId) => {
 if (workerMap.contains(workerId)) {
        val info = workerMap(workerId)
        info.lastTime = System.currentTimeMillis()
      } else {
        sender ! RegisterAgain
      }
    }
 //检测
 case CheckTimeOutWorker => {
      val thisTime = System.currentTimeMillis()
      val removeWorker = workerMap.filter(x => thisTime - x._2.lastTime > CHECK_TIME)
 for (i <- removeWorker) {
        workerMap -= i._1
      }
      println(workerMap.size)
    }
 case "hello" => {
      println("hello")
    }
  }
}

object Master {
  def main(args: Array[String]) {

    val host = args(0)
    val port = args(1).toInt
 // 准备配置
    val configStr =
    s"""
       |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
       |akka.remote.netty.tcp.hostname = "$host"
       |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)
 //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的
    val actorSystem = ActorSystem("MasterSystem", config)
 //创建Actor, 起个名字
    val master = actorSystem.actorOf(Props(new Master(host, port)), "Master") //Master主构造器会执行
    master ! "hello" //发送信息
    actorSystem.awaitTermination() //让进程等待着, 先别结束

  }
}

4,核心类:Woker

package com.itunic.akka

import java.util.UUID
import scala.concurrent.duration._

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.language.postfixOps

/**
  * Created by itunic.com on 2016/12/13.
  */
class Worker(val masterHost: String, val masterPort: Int) extends Actor {

  var master: ActorSelection = _
 //worker唯一标识
  val workerId = UUID.randomUUID().toString
 //内存
  val memory = 40000
 //cpu核心数
  val cores = 4
 //心跳时间
  val HEARTBEAT_TIME = 10000

 //建立连接
  override def preStart(): Unit = {
 //在master启动时会打印下面的那个协议, 可以先用这个做一个标志, 连接哪个master
 //继承actor后会有一个context, 可以通过它来连接
    master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master") //需要有/user, Master要和master那边创建的名字保持一致
    master ! Register(workerId, memory, cores)
  }

  override def receive: Receive = {
 //向Master注册。
 case Registered(host, port) => {
      println("a reply form master" + host + ":" + port)
 //启动线程,进行发送心跳
 import context.dispatcher
      context.system.scheduler.schedule(0 millis, HEARTBEAT_TIME millis, self, SendHeartbeat)
    }
 //发送心跳给Master
 case SendHeartbeat => {
      println("from SendHeartbeat by worker")

      master ! TransferHeartbeat(workerId)
    }
 //重新注册
 case RegisterAgain => {
      master ! Register(workerId, memory, cores)
    }
  }
}

object Worker {
  def main(args: Array[String]) {
    val host = args(0)
    val port = args(1).toInt
    val masterHost = args(2)
    val masterPort = args(3).toInt

 // 准备配置
    val configStr =
    s"""
       |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
       |akka.remote.netty.tcp.hostname = "$host"
       |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)
 //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的
    val actorSystem = ActorSystem("WorkerSystem", config)
    actorSystem.actorOf(Props(new Worker(masterHost, masterPort)), "Worker")
    actorSystem.awaitTermination()
  }
}

5,Maven pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>

 <groupId>com.itunic.akka</groupId>
 <artifactId>my-rpc</artifactId>
 <version>1.0-SNAPSHOT</version>
 <properties>
 <maven.compiler.source>1.7</maven.compiler.source>
 <maven.compiler.target>1.7</maven.compiler.target>
 <encoding>UTF-8</encoding>
 <scala.version>2.11.8</scala.version>
 <scala.compat.version>2.11</scala.compat.version>
 </properties>

 <dependencies>
 <dependency>
 <groupId>org.scala-lang</groupId>
 <artifactId>scala-library</artifactId>
 <version>${scala.version}</version>
 </dependency>

 <dependency>
 <groupId>com.typesafe.akka</groupId>
 <artifactId>akka-actor_2.11</artifactId>
 <version>2.3.15</version>
 </dependency>


 <dependency>
 <groupId>com.typesafe.akka</groupId>
 <artifactId>akka-remote_2.11</artifactId>
 <version>2.3.15</version>
 </dependency>

 </dependencies>

 <build>
 <sourceDirectory>src/main/scala</sourceDirectory>
 <testSourceDirectory>src/test/scala</testSourceDirectory>
 <plugins>
 <plugin>
 <groupId>net.alchim31.maven</groupId>
 <artifactId>scala-maven-plugin</artifactId>
 <version>3.2.2</version>
 <executions>
 <execution>
 <goals>
 <goal>compile</goal>
 <goal>testCompile</goal>
 </goals>
 <configuration>
 <args>
 <!--<arg>-make:transitive</arg>-->
 <arg>-dependencyfile</arg>
 <arg>${project.build.directory}/.scala_dependencies</arg>
 </args>
 </configuration>
 </execution>
 </executions>
 </plugin>

 <plugin>
 <groupId>org.apache.maven.plugins</groupId>
 <artifactId>maven-shade-plugin</artifactId>
 <version>2.4.3</version>
 <executions>
 <execution>
 <phase>package</phase>
 <goals>
 <goal>shade</goal>
 </goals>
 <configuration>
 <filters>
 <filter>
 <artifact>*:*</artifact>
 <excludes>
 <exclude>META-INF/*.SF</exclude>
 <exclude>META-INF/*.DSA</exclude>
 <exclude>META-INF/*.RSA</exclude>
 </excludes>
 </filter>
 </filters>
 <transformers>
 <transformer
 implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
 <resource>reference.conf</resource>
 </transformer>
 <transformer
 implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
 <mainClass>com.itunic.akka.Master</mainClass>
 </transformer>
 </transformers>
 </configuration>
 </execution>
 </executions>
 </plugin>
 </plugins>
 </build>

</project>

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Golang语言社区

【Golang语言社区】GO1.9 map并发安全测试

var m sync.Map //全局 func maintest() { // 第一个 YongHuomap := make(map[st...

4688
来自专栏跟着阿笨一起玩NET

c#实现打印功能

2632
来自专栏Ceph对象存储方案

Luminous版本PG 分布调优

Luminous版本开始新增的balancer模块在PG分布优化方面效果非常明显,操作也非常简便,强烈推荐各位在集群上线之前进行这一操作,能够极大的提升整个集群...

3095
来自专栏转载gongluck的CSDN博客

cocos2dx 打灰机

#include "GamePlane.h" #include "PlaneSprite.h" #include "BulletNode.h" #include...

5346
来自专栏一个会写诗的程序员的博客

Spring Reactor 项目核心库Reactor Core

Non-Blocking Reactive Streams Foundation for the JVM both implementing a Reactiv...

2132
来自专栏java 成神之路

使用 NIO 实现 echo 服务器

4537
来自专栏陈仁松博客

ASP.NET Core 'Microsoft.Win32.Registry' 错误修复

今天在发布Asp.net Core应用到Azure的时候出现错误InvalidOperationException: Cannot find compilati...

4818
来自专栏张善友的专栏

Miguel de Icaza 细说 Mix 07大会上的Silverlight和DLR

Mono之父Miguel de Icaza 详细报道微软Mix 07大会上的Silverlight和DLR ,上面还谈到了Mono and Silverligh...

2697
来自专栏一个爱瞎折腾的程序猿

sqlserver使用存储过程跟踪SQL

USE [master] GO /****** Object: StoredProcedure [dbo].[sp_perfworkload_trace_s...

2000
来自专栏张善友的专栏

Silverlight + Model-View-ViewModel (MVVM)

     早在2005年,John Gossman写了一篇关于Model-View-ViewModel模式的博文,这种模式被他所在的微软的项目组用来创建Expr...

2938

扫码关注云+社区