Akka之简单的自定义RPC框架(乞丐版)

关于Akka

Akka 是一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用。它已经成功运用在电信行业。系统几乎不会宕机(高可用性 99.9999999 % 一年只有 31 ms 宕机)。

自定义RPC通信框架(乞丐版)

目标

woker能发送成功注册,并定时发送心跳。

master能成功接收注册,并能接收心跳及完成自检。

大体思路

1、提供一个Master,负责woker的任务分配,注册及销毁。

2、提供一个Woker,负责Master分配的任务。需要定时向Master报告状态

3、Master内部提供自检机制,为其检测过期woker并销毁。

大体思路就是这样。下面开始撸代码:

1,消息模板类

package com.itunic.akka

/**
  * Created by itunic.com on 2016/12/12.
  */
trait RemoteMessage extends Serializable

//注册消息
//worker -> master
case class Register(workerId: String, memorys: Int, cores: Int) extends RemoteMessage

//返回注册成功信息 master的连接信息
//master -> worker
case class Registered(ip: String, port: Int) extends RemoteMessage

//报活
//worker -> master
case class TransferHeartbeat(workerId: String) extends RemoteMessage

//heartbeat ->worker
case object SendHeartbeat

//heartbeat->master
case object CheckTimeOutWorker

//master ->worker 重新注册
case object RegisterAgain

2,Woker实体类

package com.itunic.akka

/**
  * Created by itunic.com on 2016/12/13.
  */
class WorkerInfo(workerId: String, memory: Int, cores: Int) {
 //TODO 最后更新日期
  var lastTime: Long = _
}

3,核心类:Master

package com.itunic.akka

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.collection.mutable

/**
  * Created by root
  */
class Master(host: String, prot: Int) extends Actor {

  println("constructor invoked")
  var workerMap = new mutable.HashMap[String, WorkerInfo]()
 //检测时间
  val CHECK_TIME = 15000

  override def preStart(): Unit = {
    println("preStart invoked")
 //启动线程
 import context.dispatcher
    context.system.scheduler.schedule(0 millis, CHECK_TIME millis, self, CheckTimeOutWorker)
  }

 // 用于接收消息
  override def receive: Receive = {
 case Register(workerId, memorys, cores) => {
      println("a client connected")
 //判断map是否包含此worker
 if (!workerMap.contains(workerId)) {
        val workerInfo = new WorkerInfo(workerId, memorys, cores)
        workerMap += (workerId -> workerInfo)
      }
      sender ! Registered(host, prot)
    }
 case TransferHeartbeat(workerId) => {
 if (workerMap.contains(workerId)) {
        val info = workerMap(workerId)
        info.lastTime = System.currentTimeMillis()
      } else {
        sender ! RegisterAgain
      }
    }
 //检测
 case CheckTimeOutWorker => {
      val thisTime = System.currentTimeMillis()
      val removeWorker = workerMap.filter(x => thisTime - x._2.lastTime > CHECK_TIME)
 for (i <- removeWorker) {
        workerMap -= i._1
      }
      println(workerMap.size)
    }
 case "hello" => {
      println("hello")
    }
  }
}

object Master {
  def main(args: Array[String]) {

    val host = args(0)
    val port = args(1).toInt
 // 准备配置
    val configStr =
    s"""
       |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
       |akka.remote.netty.tcp.hostname = "$host"
       |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)
 //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的
    val actorSystem = ActorSystem("MasterSystem", config)
 //创建Actor, 起个名字
    val master = actorSystem.actorOf(Props(new Master(host, port)), "Master") //Master主构造器会执行
    master ! "hello" //发送信息
    actorSystem.awaitTermination() //让进程等待着, 先别结束

  }
}

4,核心类:Woker

package com.itunic.akka

import java.util.UUID
import scala.concurrent.duration._

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.language.postfixOps

/**
  * Created by itunic.com on 2016/12/13.
  */
class Worker(val masterHost: String, val masterPort: Int) extends Actor {

  var master: ActorSelection = _
 //worker唯一标识
  val workerId = UUID.randomUUID().toString
 //内存
  val memory = 40000
 //cpu核心数
  val cores = 4
 //心跳时间
  val HEARTBEAT_TIME = 10000

 //建立连接
  override def preStart(): Unit = {
 //在master启动时会打印下面的那个协议, 可以先用这个做一个标志, 连接哪个master
 //继承actor后会有一个context, 可以通过它来连接
    master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master") //需要有/user, Master要和master那边创建的名字保持一致
    master ! Register(workerId, memory, cores)
  }

  override def receive: Receive = {
 //向Master注册。
 case Registered(host, port) => {
      println("a reply form master" + host + ":" + port)
 //启动线程,进行发送心跳
 import context.dispatcher
      context.system.scheduler.schedule(0 millis, HEARTBEAT_TIME millis, self, SendHeartbeat)
    }
 //发送心跳给Master
 case SendHeartbeat => {
      println("from SendHeartbeat by worker")

      master ! TransferHeartbeat(workerId)
    }
 //重新注册
 case RegisterAgain => {
      master ! Register(workerId, memory, cores)
    }
  }
}

object Worker {
  def main(args: Array[String]) {
    val host = args(0)
    val port = args(1).toInt
    val masterHost = args(2)
    val masterPort = args(3).toInt

 // 准备配置
    val configStr =
    s"""
       |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
       |akka.remote.netty.tcp.hostname = "$host"
       |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)
 //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的
    val actorSystem = ActorSystem("WorkerSystem", config)
    actorSystem.actorOf(Props(new Worker(masterHost, masterPort)), "Worker")
    actorSystem.awaitTermination()
  }
}

5,Maven pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>

 <groupId>com.itunic.akka</groupId>
 <artifactId>my-rpc</artifactId>
 <version>1.0-SNAPSHOT</version>
 <properties>
 <maven.compiler.source>1.7</maven.compiler.source>
 <maven.compiler.target>1.7</maven.compiler.target>
 <encoding>UTF-8</encoding>
 <scala.version>2.11.8</scala.version>
 <scala.compat.version>2.11</scala.compat.version>
 </properties>

 <dependencies>
 <dependency>
 <groupId>org.scala-lang</groupId>
 <artifactId>scala-library</artifactId>
 <version>${scala.version}</version>
 </dependency>

 <dependency>
 <groupId>com.typesafe.akka</groupId>
 <artifactId>akka-actor_2.11</artifactId>
 <version>2.3.15</version>
 </dependency>


 <dependency>
 <groupId>com.typesafe.akka</groupId>
 <artifactId>akka-remote_2.11</artifactId>
 <version>2.3.15</version>
 </dependency>

 </dependencies>

 <build>
 <sourceDirectory>src/main/scala</sourceDirectory>
 <testSourceDirectory>src/test/scala</testSourceDirectory>
 <plugins>
 <plugin>
 <groupId>net.alchim31.maven</groupId>
 <artifactId>scala-maven-plugin</artifactId>
 <version>3.2.2</version>
 <executions>
 <execution>
 <goals>
 <goal>compile</goal>
 <goal>testCompile</goal>
 </goals>
 <configuration>
 <args>
 <!--<arg>-make:transitive</arg>-->
 <arg>-dependencyfile</arg>
 <arg>${project.build.directory}/.scala_dependencies</arg>
 </args>
 </configuration>
 </execution>
 </executions>
 </plugin>

 <plugin>
 <groupId>org.apache.maven.plugins</groupId>
 <artifactId>maven-shade-plugin</artifactId>
 <version>2.4.3</version>
 <executions>
 <execution>
 <phase>package</phase>
 <goals>
 <goal>shade</goal>
 </goals>
 <configuration>
 <filters>
 <filter>
 <artifact>*:*</artifact>
 <excludes>
 <exclude>META-INF/*.SF</exclude>
 <exclude>META-INF/*.DSA</exclude>
 <exclude>META-INF/*.RSA</exclude>
 </excludes>
 </filter>
 </filters>
 <transformers>
 <transformer
 implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
 <resource>reference.conf</resource>
 </transformer>
 <transformer
 implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
 <mainClass>com.itunic.akka.Master</mainClass>
 </transformer>
 </transformers>
 </configuration>
 </execution>
 </executions>
 </plugin>
 </plugins>
 </build>

</project>

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏分布式系统进阶

KafkaController分析2-NetworkClient分析InFlightRequests类

671
来自专栏编码小白

tomcat源码解读四 tomcat中的processer

     Processor是一个接口,针对于不同协议下具有不同的具体实现类,其实现类的具体功能是处理http请求,主要是对协议进行解析,状态处理以及响应。然后...

4097
来自专栏码匠的流水账

聊聊NettyConnector的start及shutdown

reactor-netty-0.7.6.RELEASE-sources.jar!/reactor/ipc/netty/NettyConnector.java

581
来自专栏java、Spring、技术分享

Netty中Channel与Unsafe源码解读

  Channel是netty网络操作抽象类,包括网络的读,写,链路关闭,发起连接等。我们拿出NioServerSocketChannel来进行分析,NioSe...

943
来自专栏大闲人柴毛毛

Java并发编程的艺术(九)——批量获取多条线程的执行结果

当向线程池提交callable任务后,我们可能需要一次性获取所有返回结果,有三种处理方法。 方法一:自己维护返回结果 // 创建一个线程池 ExecutorSe...

3486
来自专栏软件开发 -- 分享 互助 成长

java多线程(内附实例:窗口售票问题、人和叉子的问题)

java多线程的开发有两种方法: (1)实现Runnable接口; (2)继承Thread类; 区别: (1)由于java中一个类只能继承一个父类,但是可以实现...

1847
来自专栏Golang语言社区

go的websocket实现原理与用法详解

本文实例讲述了go的websocket实现原理与用法。分享给大家供大家参考,具体如下: websocket分为握手和数据传输阶段,即进行了HTTP握手 + 双工...

3957
来自专栏腾讯云数据库团队的专栏

Elasticsearch查询解析

       Elasticsearch(ES)可用于全文检索、日志分析、指标分析、APM等众多场景,而且搭建部署容易,后期弹性扩容、故障处理简单。ES在一定程...

3854
来自专栏智能大石头

线程池ThreadPool及Task调度机制分析

近1年,偶尔发生应用系统启动时某些操作超时的问题,特别在使用4核心Surface以后。笔记本和台式机比较少遇到,服务器则基本上没有遇到过。

650
来自专栏码匠的流水账

FluxInterval实例及解析

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/FluxInterval.java

661

扫码关注云+社区