Akka 使用系列之三: 层次结构和容错机制

这篇文章介绍 Akka 层次结构,以及基于层次结构的容错机制。

1. Akka 的层次结构

我们需要实现一个翻译模块,其功能是输入中文输出多国语言。我们可以让一个 Master Actor 负责接收外界输入,多个 Worker Actor 负责将输入翻译成特定语言,Master Actor 和 Worker Actor 之间是上下级层次关系。下图展示了这种层级结构。

具体代码实现如下所示。

class Master extends Actor with ActorLogging{
    val english2chinese 
        = context.actorOf(Props[English2Chinese],"English2Chinese")
    val english2cat     
        = context.actorOf(Props[English2Cat],"English2Cat")

    def receive = {
        case eng1:String =>{
            english2chinese ! eng1
            english2cat     ! eng1
        }
    }
    
}

class English2Chinese extends Actor with ActorLogging{
    def receive = {
        case eng:String => {
            println("我翻译不出来!")
        }
    }
}

class English2Cat extends Actor with ActorLogging{
    def receive = {
        case eng:String =>{
             println( "喵喵喵!")
        }
    }
}

object Main{
    def main(args:Array[String]):Unit = {
        val sys = ActorSystem("system")
        val master = sys.actorOf(Props[Master],"Master")
        master ! "Hello,world!"
    }
}

我们在 Master Actor 中使用 context.actorOf 实例化 English2Chinese 和 English2Cat,便可以在它们之间形成层次关系。这点通过它们的 actor 地址得到证实。

上面的 Actors 层次结构是我们程序里 Actor 的层次结构。这个层次结构是 Actor System 层次结构的一部分。Actor System 层次结构从根节点出来有两个子节点:UserGuardian 和 SystemGuardian。用户程序产生的所有 Actor 都在 UserGuardian 节点下,SystemGuardian 节点则包含系统中的一些 Actor,比如 deadLetterListener。如果一个 Actor 已经 stop 了,发送给这个 Actor 的消息就会被转送到 deadLetterListener。因此完整的 Actor 层次结构如下所示。

2. Akka 的容错机制

对于分布式系统来说,容错机制是很重要的指标。那么 Akka 是怎么实现容错的呢?Akka 的容错机制是基于层次结构: Akka 在 Actor 加一个监控策略,对其子 Actor 进行监控。下面的代码是给 Actor 加了一个监控策略,其监控策略内容:如果子 Actor 在运行过程中抛出 Exception,对该子 Actor 执行停止动作 (即停止该子 Actor)。

 override val supervisorStrategy = OneForOneStrategy(){
      case _:Exception => Stop
}

Akka 的监控策略一共支持四种动作:Stop, Resume, Restart 和 Escalate。

1. Stop 子 Actor 停止。 2. Resume 子 Actor 忽略引发异常的消息,继续处理后续消息。 3. Restart 子 Actor 停止,重新初始化一个子 Actor 处理后续消息 4. Escalate 错误太严重,自己已经无法处理,将错误信息上报给父 Actor。

Akka 的监控策略分为两种。一种是 OneForOne。这种策略只对抛出 Exception 的子 Actor 执行相应动作。还是拿上面的翻译模块做例子,我们加入一个 OneForOne 的 Stop 的监控策略。

class Master1 extends Actor with ActorLogging{
    val english2Chinese = 
    context.actorOf(Props[English2Chinese1],"English2Chinese")
    val english2Cat     = 
    context.actorOf(Props[English2Cat1], "English2Cat")
    override val supervisorStrategy = OneForOneStrategy(){
      case _:Exception                    => Stop
    }
    override def receive = {
      case eng:String => {
        english2Cat ! eng;
        english2Chinese ! eng;
      }
    }
}
class English2Chinese1 extends Actor with ActorLogging{
  override def receive = {
    case eng:String => {
      println("翻译不出来")
    }
  }
}
class English2Cat1 extends Actor with ActorLogging{
  override def receive = {
    case eng:String => {
      throw new Exception("Exception in English2Cat1")
    }
  }
}

object hierarchy1 {
  def main(args:Array[String]):Unit = {
    val system = ActorSystem("system")
    val master = system.actorOf(Props[Master1],"Master")
    master ! "Hello, world"
    Thread.sleep(1000)
    master ! "Hello, world"
  }
}

运行这段代码,我们得到下面结果。从下面的结果,我们可以看出:第一轮 English2Cat1 抛出了 Exception, English2Chinese1 正常工作;第二轮,English2Cat1 已经死了,English2Chinese1 还在正常工作。

另一种是 AllForOne。如果有子 Actor 抛出 Exception,这种监控策略对所有子 Actor 执行动作。

class Master2 extends Actor with ActorLogging{
    val english2Chinese = 
    context.actorOf(Props[English2Chinese2],"English2Chinese")
    val english2Cat     = 
    context.actorOf(Props[English2Cat2], "English2Cat")

    override val supervisorStrategy= AllForOneStrategy() {
      case _: Exception                   => Stop
    }

    override def receive = {
      case eng:String => {
        english2Cat ! eng;
        english2Chinese ! eng;
      }
    }
}

运行这段代码,我们得到下面结果。从下面的结果,我们可以看出:第一轮 English2Cat1 抛出了 Exception, English2Chinese1 正常工作;第二轮,English2Cat1 已经死了,English2Chinese1 也已经死亡了。这个结果说明监控策略已经将 MasterActor 的所有子 Actor 停止了。

3. 总结

我们使用 Akka 开发并行程序时,可以使用层级结构组织 Actors。层次结构不仅比较符合人类直觉,还为容错提供了机制保障。本文的所有代码已经上传到 GitHub 。欢迎关注 AlgorithmDog 公众号,每两周的更新会有推送哦。

Akka 系列系列文章

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Android知识点总结

Java总结IO篇之File类和Properties类

打开颜色选择器 :读流I-->字符串分割-->字符串存入Map-->使用Map对象还原用户配置 修改配置时 :写流O-->创建Map对象-->字符...

1282
来自专栏逍遥剑客的游戏开发

MPQ文件系统优化(续)

1625
来自专栏DOTNET

.Net多线程编程—Parallel LINQ、线程池

Parallel LINQ 1 System.Linq.ParallelEnumerable 重要方法概览: 1)public static ParallelQ...

2907
来自专栏jeremy的技术点滴

JVM的Finalization Delay引起的OOM

3708
来自专栏SDNLAB

POF技术分享(三):Packet处理流程

前言: 之前对POF基本原理、POF交换机源码结构进行解读,但是,要想完成POF交换机的二次开发和拓展,有必要对POF交换机特有的数据包处理流程、POF交换机和...

35712
来自专栏祝威廉

ElasticSearch Aggregations GroupBy 实现源码分析

也就是按newtype 字段进行group by,然后对num求平均值。在我们实际的业务系统中,这种统计需求也是最多的。

2343
来自专栏wannshan(javaer,RPC)

dubbo @Activate 注解使用和实现解析

Activate注解表示一个扩展是否被激活(使用),可以放在类定义和方法上,dubbo用它在spi扩展类定义上,表示这个扩展实现激活条件和时机。先看下定义: @...

4116
来自专栏芋道源码1024

Spring Webflux —— 源码阅读之 handler 包

查找给定请求的handler,如果找不到特定的请求,则返回一个空的Mono。这个方法被getHandler(org.springframework.web.se...

2435
来自专栏我的小碗汤

golang 设置 http response 响应头与坑

之前遇到个问题,在一段代码中这样设置WriteHeader,最后在header中取Name时怎么也取不到。

1203
来自专栏Spark生态圈

[spark] Shuffle Write解析 (Sort Based Shuffle)

从 Spark 2.0 开始移除了Hash Based Shuffle,想要了解可参考Shuffle 过程,本文将讲解 Sort Based Shuffle。

1332

扫码关注云+社区