Akka 是一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用。它已经成功运用在电信行业。系统几乎不会宕机(高可用性 99.9999999 % 一年只有 31 ms 宕机)。
woker能发送成功注册,并定时发送心跳。
master能成功接收注册,并能接收心跳及完成自检。
1、提供一个Master,负责woker的任务分配,注册及销毁。
2、提供一个Woker,负责Master分配的任务。需要定时向Master报告状态
3、Master内部提供自检机制,为其检测过期woker并销毁。
大体思路就是这样。下面开始撸代码:
1,消息模板类
package com.itunic.akka
/**
* Created by itunic.com on 2016/12/12.
*/
trait RemoteMessage extends Serializable
//注册消息
//worker -> master
case class Register(workerId: String, memorys: Int, cores: Int) extends RemoteMessage
//返回注册成功信息 master的连接信息
//master -> worker
case class Registered(ip: String, port: Int) extends RemoteMessage
//报活
//worker -> master
case class TransferHeartbeat(workerId: String) extends RemoteMessage
//heartbeat ->worker
case object SendHeartbeat
//heartbeat->master
case object CheckTimeOutWorker
//master ->worker 重新注册
case object RegisterAgain
2,Woker实体类
package com.itunic.akka
/**
* Created by itunic.com on 2016/12/13.
*/
class WorkerInfo(workerId: String, memory: Int, cores: Int) {
//TODO 最后更新日期
var lastTime: Long = _
}
3,核心类:Master
package com.itunic.akka
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.collection.mutable
/**
* Created by root
*/
class Master(host: String, prot: Int) extends Actor {
println("constructor invoked")
var workerMap = new mutable.HashMap[String, WorkerInfo]()
//检测时间
val CHECK_TIME = 15000
override def preStart(): Unit = {
println("preStart invoked")
//启动线程
import context.dispatcher
context.system.scheduler.schedule(0 millis, CHECK_TIME millis, self, CheckTimeOutWorker)
}
// 用于接收消息
override def receive: Receive = {
case Register(workerId, memorys, cores) => {
println("a client connected")
//判断map是否包含此worker
if (!workerMap.contains(workerId)) {
val workerInfo = new WorkerInfo(workerId, memorys, cores)
workerMap += (workerId -> workerInfo)
}
sender ! Registered(host, prot)
}
case TransferHeartbeat(workerId) => {
if (workerMap.contains(workerId)) {
val info = workerMap(workerId)
info.lastTime = System.currentTimeMillis()
} else {
sender ! RegisterAgain
}
}
//检测
case CheckTimeOutWorker => {
val thisTime = System.currentTimeMillis()
val removeWorker = workerMap.filter(x => thisTime - x._2.lastTime > CHECK_TIME)
for (i <- removeWorker) {
workerMap -= i._1
}
println(workerMap.size)
}
case "hello" => {
println("hello")
}
}
}
object Master {
def main(args: Array[String]) {
val host = args(0)
val port = args(1).toInt
// 准备配置
val configStr =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
val config = ConfigFactory.parseString(configStr)
//ActorSystem老大,辅助创建和监控下面的Actor,他是单例的
val actorSystem = ActorSystem("MasterSystem", config)
//创建Actor, 起个名字
val master = actorSystem.actorOf(Props(new Master(host, port)), "Master") //Master主构造器会执行
master ! "hello" //发送信息
actorSystem.awaitTermination() //让进程等待着, 先别结束
}
}
4,核心类:Woker
package com.itunic.akka
import java.util.UUID
import scala.concurrent.duration._
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.language.postfixOps
/**
* Created by itunic.com on 2016/12/13.
*/
class Worker(val masterHost: String, val masterPort: Int) extends Actor {
var master: ActorSelection = _
//worker唯一标识
val workerId = UUID.randomUUID().toString
//内存
val memory = 40000
//cpu核心数
val cores = 4
//心跳时间
val HEARTBEAT_TIME = 10000
//建立连接
override def preStart(): Unit = {
//在master启动时会打印下面的那个协议, 可以先用这个做一个标志, 连接哪个master
//继承actor后会有一个context, 可以通过它来连接
master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master") //需要有/user, Master要和master那边创建的名字保持一致
master ! Register(workerId, memory, cores)
}
override def receive: Receive = {
//向Master注册。
case Registered(host, port) => {
println("a reply form master" + host + ":" + port)
//启动线程,进行发送心跳
import context.dispatcher
context.system.scheduler.schedule(0 millis, HEARTBEAT_TIME millis, self, SendHeartbeat)
}
//发送心跳给Master
case SendHeartbeat => {
println("from SendHeartbeat by worker")
master ! TransferHeartbeat(workerId)
}
//重新注册
case RegisterAgain => {
master ! Register(workerId, memory, cores)
}
}
}
object Worker {
def main(args: Array[String]) {
val host = args(0)
val port = args(1).toInt
val masterHost = args(2)
val masterPort = args(3).toInt
// 准备配置
val configStr =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
val config = ConfigFactory.parseString(configStr)
//ActorSystem老大,辅助创建和监控下面的Actor,他是单例的
val actorSystem = ActorSystem("WorkerSystem", config)
actorSystem.actorOf(Props(new Worker(masterHost, masterPort)), "Worker")
actorSystem.awaitTermination()
}
}
5,Maven pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.itunic.akka</groupId>
<artifactId>my-rpc</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.8</scala.version>
<scala.compat.version>2.11</scala.compat.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.3.15</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.11</artifactId>
<version>2.3.15</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<!--<arg>-make:transitive</arg>-->
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.itunic.akka.Master</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>