专栏首页维C果糖Akka 指南 之「容错」

Akka 指南 之「容错」

文章目录

容错

依赖

容错(fault tolerance)概念与 Actor 相关,为了使用这些概念,需要在项目中添加如下依赖:

<!-- Maven -->
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-actor_2.12</artifactId>
  <version>2.5.21</version>
</dependency>

<!-- Gradle -->
dependencies {
  compile group: 'com.typesafe.akka', name: 'akka-actor_2.12', version: '2.5.21'
}

<!-- sbt -->
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.21"

简介

正如在「Actor System」中所解释的,每个 Actor 都是其子级的监督者,因此每个 Actor 定义了故障处理的监督策略。这一策略不能在 Actor 系统启动之后改变,因为它是 Actor 系统结构的一个组成部分。

实践中的故障处理

首先,让我们看一个示例,它演示了处理数据存储错误的一种方法,这是现实应用程序中的典型故障源。当然,这取决于实际的应用程序,当数据存储不可用时可以做什么,但是在这个示例中,我们使用了一种尽最大努力的重新连接方法。

阅读以下源代码。内部的注释解释了故障处理的不同部分以及添加它们的原因。强烈建议运行此示例,因为很容易跟踪日志输出以了解运行时发生的情况。

创建监督策略

以下章节将更深入地解释故障处理机制和备选方案。

为了演示,让我们考虑以下策略:

private static SupervisorStrategy strategy =
    new OneForOneStrategy(
        10,
        Duration.ofMinutes(1),
        DeciderBuilder.match(ArithmeticException.class, e -> SupervisorStrategy.resume())
            .match(NullPointerException.class, e -> SupervisorStrategy.restart())
            .match(IllegalArgumentException.class, e -> SupervisorStrategy.stop())
            .matchAny(o -> SupervisorStrategy.escalate())
            .build());

@Override
public SupervisorStrategy supervisorStrategy() {
  return strategy;
}

我们选择了一些众所周知的异常类型,以演示在「supervision」中描述的故障处理指令的应用。首先,一对一策略(one-for-one strategy)意味着每个子级都被单独对待(这和all-for-one策略的效果非常相似,唯一的区别是all-for-one策略中任何决定都适用于监督者的所有子级,而不仅仅是失败的子级)。在上面的示例中,10Duration.create(1, TimeUnit.MINUTES)分别传递给maxNrOfRetrieswithinTimeRange参数,这意味着策略每分钟重新启动一个子级最多10次。如果在withinTimeRange持续时间内重新启动计数超过maxNrOfRetries,则子 Actor 将停止。

此外,这些参数还有一些特殊的值。如果你指定:

  • -1maxNrOfRetriesDuration.Inf()withinTimeRange
    • 总是无限制地重新启动子级
  • -1maxNrOfRetries,有限的DurationwithinTimeRange
    • maxNrOfRetries被视为1
  • 非负数到maxNrOfRetriesDuration.Inf()withinTimeRange
    • withinTimeRange被视为无限持续(即无论需要多长时间),一旦重新启动计数超过maxNrOfRetries,子 Actor 将停止。

构成主体的match语句,由DeciderBuildermatch方法返回的PFBuilder组成,其中builderbuild方法完成。这是将子故障类型映射到相应指令的部分。

  • 注释:如果策略在监督者 Actor(而不是单独的类)中声明,则其决策者可以线程安全方式访问 Actor 的所有内部状态,包括获取对当前失败的子级的引用,可用作失败消息的getSender()

默认监督策略

如果定义的策略不包括引发的异常,则使用升级(escalate)。

如果没有为 Actor 定义监督策略,则默认情况下会处理以下异常:

  • ActorInitializationException将停止失败的子 Actor
  • ActorKilledException将停止失败的子 Actor
  • DeathPactException将停止失败的子 Actor
  • Exception将重新启动失败的子 Actor
  • 其他类型的Throwable将升级到父级 Actor

如果异常一直升级到根守护者,它将以与上面定义的默认策略相同的方式处理它。

停止监督策略

更接Erlang的方法是在子级失败时采取措施阻止他们,然后在DeathWatch显示子级死亡时由监督者采取纠正措施。此策略还预打包为SupervisorStrategy.stoppingStrategy,并附带一个StoppingSupervisorStrategy配置程序,以便在你希望/user监护人应用它时使用。

记录 Actor 的失败

默认情况下,除非升级,否则SupervisorStrategy会记录故障。升级的故障应该在层次结构中更高的级别处理并记录下来。

通过在实例化时将loggingEnabled设置为false,可以将SupervisorStrategy的默认日志设置为静音。定制的日志记录可以在Decider内完成。请注意,当在监督者 Actor 内部声明SupervisorStrategy时,对当前失败的子级的引用可用作sender

你还可以通过重写logFailure方法自定义自己的SupervisorStrategy中的日志记录。

顶级 Actor 的监督者

顶级 Actor 是指使用system.actorOf()创建的 Actor,它们是「User Guardian」的子代。守护者应用配置的策略,在这种情况下没有应用特殊的规则。

测试应用

下面的部分展示了不同指令在实践中的效果,其中需要一个测试设置。首先,我们需要一个合适的监督者:

import akka.japi.pf.DeciderBuilder;
import akka.actor.SupervisorStrategy;

static class Supervisor extends AbstractActor {

  private static SupervisorStrategy strategy =
      new OneForOneStrategy(
          10,
          Duration.ofMinutes(1),
          DeciderBuilder.match(ArithmeticException.class, e -> SupervisorStrategy.resume())
              .match(NullPointerException.class, e -> SupervisorStrategy.restart())
              .match(IllegalArgumentException.class, e -> SupervisorStrategy.stop())
              .matchAny(o -> SupervisorStrategy.escalate())
              .build());

  @Override
  public SupervisorStrategy supervisorStrategy() {
    return strategy;
  }


  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            Props.class,
            props -> {
              getSender().tell(getContext().actorOf(props), getSelf());
            })
        .build();
  }
}

这个监督者将被用来创建一个子级,我们可以用它进行实验:

static class Child extends AbstractActor {
  int state = 0;

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            Exception.class,
            exception -> {
              throw exception;
            })
        .match(Integer.class, i -> state = i)
        .matchEquals("get", s -> getSender().tell(state, getSelf()))
        .build();
  }
}

通过使用「TestKit」中描述的实用程序,测试更容易,其中TestProbe提供了一个 Actor 引用,可用于接收和检查回复。

import akka.testkit.TestProbe;
import akka.testkit.ErrorFilter;
import akka.testkit.EventFilter;
import akka.testkit.TestEvent;
import static java.util.concurrent.TimeUnit.SECONDS;
import static akka.japi.Util.immutableSeq;
import scala.concurrent.Await;

public class FaultHandlingTest extends AbstractJavaTest {
  static ActorSystem system;
  scala.concurrent.duration.Duration timeout =
      scala.concurrent.duration.Duration.create(5, SECONDS);

  @BeforeClass
  public static void start() {
    system = ActorSystem.create("FaultHandlingTest", config);
  }

  @AfterClass
  public static void cleanup() {
    TestKit.shutdownActorSystem(system);
    system = null;
  }

  @Test
  public void mustEmploySupervisorStrategy() throws Exception {
    // code here
  }
}

让我们创建 Actor:

Props superprops = Props.create(Supervisor.class);
ActorRef supervisor = system.actorOf(superprops, "supervisor");
ActorRef child =
    (ActorRef) Await.result(ask(supervisor, Props.create(Child.class), 5000), timeout);

第一个测试将演示Resume指令,因此我们通过在 Actor 中设置一些非初始状态进行尝试,并使其失败:

child.tell(42, ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(42);
child.tell(new ArithmeticException(), ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(42);

如你所见,值42保留了故障处理指令。现在,如果我们将失败更改为更严重的NullPointerException,情况将不再如此:

child.tell(new NullPointerException(), ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(0);

最后,如果发生致命的IllegalArgumentException,监督者将终止该子级:

final TestProbe probe = new TestProbe(system);
probe.watch(child);
child.tell(new IllegalArgumentException(), ActorRef.noSender());
probe.expectMsgClass(Terminated.class);

到目前为止,监督者完全不受子级失败的影响,因为指令集(directives set)确实处理了它。如果出现Exception情况,则情况不再如此,监督者会将失败升级。

child = (ActorRef) Await.result(ask(supervisor, Props.create(Child.class), 5000), timeout);
probe.watch(child);
assert Await.result(ask(child, "get", 5000), timeout).equals(0);
child.tell(new Exception(), ActorRef.noSender());
probe.expectMsgClass(Terminated.class);

监管者本身由ActorSystem提供的顶级 Actor 进行监督,它具有在所有异常情况下重新启动的默认策略(ActorInitializationExceptionActorKilledException的显著异常)。因为重启时的默认指令是杀死所有的子级,所以我们不希望子级在这次失败中幸存。

如果不需要这样做(这取决于用例),我们需要使用一个不同的监督者来覆盖这个行为。

static class Supervisor2 extends AbstractActor {

  private static SupervisorStrategy strategy =
      new OneForOneStrategy(
          10,
          Duration.ofMinutes(1),
          DeciderBuilder.match(ArithmeticException.class, e -> SupervisorStrategy.resume())
              .match(NullPointerException.class, e -> SupervisorStrategy.restart())
              .match(IllegalArgumentException.class, e -> SupervisorStrategy.stop())
              .matchAny(o -> SupervisorStrategy.escalate())
              .build());

  @Override
  public SupervisorStrategy supervisorStrategy() {
    return strategy;
  }


  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            Props.class,
            props -> {
              getSender().tell(getContext().actorOf(props), getSelf());
            })
        .build();
  }

  @Override
  public void preRestart(Throwable cause, Optional<Object> msg) {
    // do not kill all children, which is the default here
  }
}

使用此父级,子级可以在升级的重新启动后存活,如上一个测试所示:

superprops = Props.create(Supervisor2.class);
supervisor = system.actorOf(superprops);
child = (ActorRef) Await.result(ask(supervisor, Props.create(Child.class), 5000), timeout);
child.tell(23, ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(23);
child.tell(new Exception(), ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(0);

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Akka 指南 之「集群中的分布式发布订阅」

    为了使用分布式发布订阅(Distributed Publish Subscribe),你需要将以下依赖添加到你的项目中:

    CG国斌
  • 效率编程 之「泛型」

    每种泛型都定义了一组参数化类型,其构成格式为:先是类或者接口的名称,接着用尖括号(<>)把对应于泛型形式类型参数的实际类型参数列表括起来。例如,List<Str...

    CG国斌
  • 详述 IntelliJ IDEA 遇到 java -source 1.3 中不支持某某操作的解决方法

    在一个新的 Mac Pro 电脑中,安装 IntelliJ IDEA,并且配置了 JDK 1.8,打开测试项目,运行后,报出如下问题:

    CG国斌
  • 边缘计算:赢家是软还是硬

    今天想和大家谈的是边缘计算节点的最佳存在形态,是硬核的一体化盒子,还是基于开放硬件,用软件去定义。

    边缘计算
  • 使用PyInstaller将python转成可执行文件exe笔记

    首先需要下载PyInstaller和UPX,UPX是用来压缩exe的,点击超链接下载吧,目前稳定版本是1.3,注意选择你使用的操作系统。

    IT派
  • GNS3 配置介绍

    D:\progra~1\GNS3\SecureCRT\SecureCRT.EXE /script D:\progra~1\GNS3\SecureCRT\Scri...

    py3study
  • P05_kafka_2.9.2-0.8.1集群搭建

    安装scala 2.11.4 1、将课程提供的scala-2.11.4.tgz使用WinSCP拷贝到sparkproject1的/usr/local目录下。 ...

    Albert陈凯
  • 软件公司如何应对软件开发人员因涨薪“叛逃”的现象?

    正直风口上的移动互联网、大数据和人工智能,让软件开发人才炙手可热,工资待遇水涨船高。从“我们万事俱备,只差一个软件开发人员”,到“创业如何寻找技术合伙人”,技术...

    西安弈聪软件公司
  • 建筑和软件中模式之异同

    特别是中国传统建筑,那是很讲模式的,这些都是传统文化使然,比如京剧 一招一式都有套路;中国画,也有套路,树应该怎么画法?有几种画法?艺术大家通常是创造出自己的套...

    物流IT圈
  • Flutter 学习笔记 16 - Hero 动画

    七适散人

扫码关注云+社区

领取腾讯云代金券