前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Akka之简单的自定义RPC框架(乞丐版)

Akka之简单的自定义RPC框架(乞丐版)

作者头像
天策
发布2018-06-22 14:42:34
1.1K0
发布2018-06-22 14:42:34
举报
文章被收录于专栏:行者悟空行者悟空
关于Akka

Akka 是一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用。它已经成功运用在电信行业。系统几乎不会宕机(高可用性 99.9999999 % 一年只有 31 ms 宕机)。

自定义RPC通信框架(乞丐版)
目标

woker能发送成功注册,并定时发送心跳。

master能成功接收注册,并能接收心跳及完成自检。

大体思路

1、提供一个Master,负责woker的任务分配,注册及销毁。

2、提供一个Woker,负责Master分配的任务。需要定时向Master报告状态

3、Master内部提供自检机制,为其检测过期woker并销毁。

大体思路就是这样。下面开始撸代码:

1,消息模板类

代码语言:javascript
复制
package com.itunic.akka

/**
  * Created by itunic.com on 2016/12/12.
  */
trait RemoteMessage extends Serializable

//注册消息
//worker -> master
case class Register(workerId: String, memorys: Int, cores: Int) extends RemoteMessage

//返回注册成功信息 master的连接信息
//master -> worker
case class Registered(ip: String, port: Int) extends RemoteMessage

//报活
//worker -> master
case class TransferHeartbeat(workerId: String) extends RemoteMessage

//heartbeat ->worker
case object SendHeartbeat

//heartbeat->master
case object CheckTimeOutWorker

//master ->worker 重新注册
case object RegisterAgain

2,Woker实体类

代码语言:javascript
复制
package com.itunic.akka

/**
  * Created by itunic.com on 2016/12/13.
  */
class WorkerInfo(workerId: String, memory: Int, cores: Int) {
 //TODO 最后更新日期
  var lastTime: Long = _
}

3,核心类:Master

代码语言:javascript
复制
package com.itunic.akka

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.collection.mutable

/**
  * Created by root
  */
class Master(host: String, prot: Int) extends Actor {

  println("constructor invoked")
  var workerMap = new mutable.HashMap[String, WorkerInfo]()
 //检测时间
  val CHECK_TIME = 15000

  override def preStart(): Unit = {
    println("preStart invoked")
 //启动线程
 import context.dispatcher
    context.system.scheduler.schedule(0 millis, CHECK_TIME millis, self, CheckTimeOutWorker)
  }

 // 用于接收消息
  override def receive: Receive = {
 case Register(workerId, memorys, cores) => {
      println("a client connected")
 //判断map是否包含此worker
 if (!workerMap.contains(workerId)) {
        val workerInfo = new WorkerInfo(workerId, memorys, cores)
        workerMap += (workerId -> workerInfo)
      }
      sender ! Registered(host, prot)
    }
 case TransferHeartbeat(workerId) => {
 if (workerMap.contains(workerId)) {
        val info = workerMap(workerId)
        info.lastTime = System.currentTimeMillis()
      } else {
        sender ! RegisterAgain
      }
    }
 //检测
 case CheckTimeOutWorker => {
      val thisTime = System.currentTimeMillis()
      val removeWorker = workerMap.filter(x => thisTime - x._2.lastTime > CHECK_TIME)
 for (i <- removeWorker) {
        workerMap -= i._1
      }
      println(workerMap.size)
    }
 case "hello" => {
      println("hello")
    }
  }
}

object Master {
  def main(args: Array[String]) {

    val host = args(0)
    val port = args(1).toInt
 // 准备配置
    val configStr =
    s"""
       |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
       |akka.remote.netty.tcp.hostname = "$host"
       |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)
 //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的
    val actorSystem = ActorSystem("MasterSystem", config)
 //创建Actor, 起个名字
    val master = actorSystem.actorOf(Props(new Master(host, port)), "Master") //Master主构造器会执行
    master ! "hello" //发送信息
    actorSystem.awaitTermination() //让进程等待着, 先别结束

  }
}

4,核心类:Woker

代码语言:javascript
复制
package com.itunic.akka

import java.util.UUID
import scala.concurrent.duration._

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.language.postfixOps

/**
  * Created by itunic.com on 2016/12/13.
  */
class Worker(val masterHost: String, val masterPort: Int) extends Actor {

  var master: ActorSelection = _
 //worker唯一标识
  val workerId = UUID.randomUUID().toString
 //内存
  val memory = 40000
 //cpu核心数
  val cores = 4
 //心跳时间
  val HEARTBEAT_TIME = 10000

 //建立连接
  override def preStart(): Unit = {
 //在master启动时会打印下面的那个协议, 可以先用这个做一个标志, 连接哪个master
 //继承actor后会有一个context, 可以通过它来连接
    master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master") //需要有/user, Master要和master那边创建的名字保持一致
    master ! Register(workerId, memory, cores)
  }

  override def receive: Receive = {
 //向Master注册。
 case Registered(host, port) => {
      println("a reply form master" + host + ":" + port)
 //启动线程,进行发送心跳
 import context.dispatcher
      context.system.scheduler.schedule(0 millis, HEARTBEAT_TIME millis, self, SendHeartbeat)
    }
 //发送心跳给Master
 case SendHeartbeat => {
      println("from SendHeartbeat by worker")

      master ! TransferHeartbeat(workerId)
    }
 //重新注册
 case RegisterAgain => {
      master ! Register(workerId, memory, cores)
    }
  }
}

object Worker {
  def main(args: Array[String]) {
    val host = args(0)
    val port = args(1).toInt
    val masterHost = args(2)
    val masterPort = args(3).toInt

 // 准备配置
    val configStr =
    s"""
       |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
       |akka.remote.netty.tcp.hostname = "$host"
       |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)
 //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的
    val actorSystem = ActorSystem("WorkerSystem", config)
    actorSystem.actorOf(Props(new Worker(masterHost, masterPort)), "Worker")
    actorSystem.awaitTermination()
  }
}

5,Maven pom.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>

 <groupId>com.itunic.akka</groupId>
 <artifactId>my-rpc</artifactId>
 <version>1.0-SNAPSHOT</version>
 <properties>
 <maven.compiler.source>1.7</maven.compiler.source>
 <maven.compiler.target>1.7</maven.compiler.target>
 <encoding>UTF-8</encoding>
 <scala.version>2.11.8</scala.version>
 <scala.compat.version>2.11</scala.compat.version>
 </properties>

 <dependencies>
 <dependency>
 <groupId>org.scala-lang</groupId>
 <artifactId>scala-library</artifactId>
 <version>${scala.version}</version>
 </dependency>

 <dependency>
 <groupId>com.typesafe.akka</groupId>
 <artifactId>akka-actor_2.11</artifactId>
 <version>2.3.15</version>
 </dependency>


 <dependency>
 <groupId>com.typesafe.akka</groupId>
 <artifactId>akka-remote_2.11</artifactId>
 <version>2.3.15</version>
 </dependency>

 </dependencies>

 <build>
 <sourceDirectory>src/main/scala</sourceDirectory>
 <testSourceDirectory>src/test/scala</testSourceDirectory>
 <plugins>
 <plugin>
 <groupId>net.alchim31.maven</groupId>
 <artifactId>scala-maven-plugin</artifactId>
 <version>3.2.2</version>
 <executions>
 <execution>
 <goals>
 <goal>compile</goal>
 <goal>testCompile</goal>
 </goals>
 <configuration>
 <args>
 <!--<arg>-make:transitive</arg>-->
 <arg>-dependencyfile</arg>
 <arg>${project.build.directory}/.scala_dependencies</arg>
 </args>
 </configuration>
 </execution>
 </executions>
 </plugin>

 <plugin>
 <groupId>org.apache.maven.plugins</groupId>
 <artifactId>maven-shade-plugin</artifactId>
 <version>2.4.3</version>
 <executions>
 <execution>
 <phase>package</phase>
 <goals>
 <goal>shade</goal>
 </goals>
 <configuration>
 <filters>
 <filter>
 <artifact>*:*</artifact>
 <excludes>
 <exclude>META-INF/*.SF</exclude>
 <exclude>META-INF/*.DSA</exclude>
 <exclude>META-INF/*.RSA</exclude>
 </excludes>
 </filter>
 </filters>
 <transformers>
 <transformer
 implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
 <resource>reference.conf</resource>
 </transformer>
 <transformer
 implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
 <mainClass>com.itunic.akka.Master</mainClass>
 </transformer>
 </transformers>
 </configuration>
 </execution>
 </executions>
 </plugin>
 </plugins>
 </build>

</project>
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2016年12月14日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 关于Akka
  • 自定义RPC通信框架(乞丐版)
    • 目标
      • 大体思路
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档