专栏首页SmartSiFlink 从Checkpoint中恢复作业

Flink 从Checkpoint中恢复作业

Flink 1.11 版本

1. 配置

如果我们的任务已经执行很长时间,突然遇到故障停止,那么中间过程处理结果就会全部丢失,重启后需要重新从上一次开始的位置消费,这会花费我们很长的时间。这种结局显示我们不能接受,我们希望的是作业在故障失败重启后能保留之前的状态并能从失败的位置继续消费。可以通过如下配置保存处理状态:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 开启Checkpoint
env.enableCheckpointing(1000);
// 设置状态后端
env.setStateBackend(new FsStateBackend("hdfs://localhost:9000/flink/checkpoint"));
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);

作业停止后 CheckPoint 数据默认会自动删除,所以需要如下配置来设置在作业失败被取消后 CheckPoint 数据不被删除:

env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

2. 验证

我们使用经典的 WordCount 实例来验证从 Checkpoint 中恢复作业并能沿用之前的状态信息。为了模拟作业失败并能恢复,我们判断当我们输入是 “ERROR” 时,抛出异常迫使作业失败:

public void flatMap(String value, Collector out) {
    // 失败信号
    if (Objects.equals(value, "ERROR")) {
        throw new RuntimeException("custom error flag, restart application");
    }
    ...
}

为了确保作业在失败后能自动恢复,我们设置了重启策略,失败后最多重启3次,每次重启间隔10s:

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));

我们看一下详细的代码:

public class RestoreCheckpointExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 配置Checkpoint
        env.enableCheckpointing(1000);
        env.setStateBackend(new FsStateBackend("hdfs://localhost:9000/flink/checkpoint"));
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 配置失败重启策略:失败后最多重启3次 每次重启间隔10s
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));

        DataStream<String> source = env.socketTextStream("localhost", 9100, "\n")
                .name("MySourceFunction");
        DataStream<Tuple2<String, Integer>> wordsCount = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector out) {
                // 失败信号
                if (Objects.equals(value, "ERROR")) {
                    throw new RuntimeException("custom error flag, restart application");
                }
                // 拆分单词
                for (String word : value.split("\\s")) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        }).name("MyFlatMapFunction");

        DataStream<Tuple2<String, Integer>> windowCount = wordsCount
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> tuple) throws Exception {
                        return tuple.f0;
                    }
                })
                .sum(1).name("MySumFunction");

        windowCount.print().setParallelism(1).name("MyPrintFunction");
        env.execute("RestoreCheckpointExample");
    }
}

代码地址

下面我们具体操作进行验证。首先启动一个 nc 服务:

wy:opt wy$ nc -lk 9100

端口号为:9100

然后启动 RestoreCheckpointExample 作业:

wy:~ wy$ flink run -c com.flink.example.stream.state.checkpoint.RestoreCheckpointExample  ~/study/code/data-example/flink-example/target/flink-example-1.0.jar

下表是从 nc 服务输出测试数据,从 Flink Web 页面输出结果数据的详细信息:

序号

输入

输出

备注

1

a

(a,1)

2

a

(a,2)

3

b

(b,1)

4

ERROR

作业重启

5

b

(b,2)

6

a

(a,3)

7

ERROR

作业重启

8

a

(a,4)

9

ERROR

作业重启

10

b

(b,3)

11

ERROR

作业失败

从上面信息可以看出作业恢复后,计算结果也是基于作业失败前保存的状态上计算的。我们设置最多可以重启三次,当我们第四次输入 “ERROR” 数据时,程序彻底失败。

3. 作业状态变化

发送 ERROR 信号后,flatMap 算子抛出异常,由 RUNNING 状态切换为 FAILED,导致作业被取消:

2020-12-26 20:48:12,967 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: MySourceFunction -> MyFlatMapFunction (1/1) (be8abffb0f6815889929dc9b605b7ae5) switched from RUNNING to FAILED.
java.lang.RuntimeException: custom error flag, restart application
	at com.flink.example.stream.state.checkpoint.RestoreCheckpointExample$1.flatMap(RestoreCheckpointExample.java:39) ~[blob_p-353721c11ae1acd403dc8be3b663e9a60854d5c3-b6237955a73f418e6d7b272281b64594:?]
	at com.flink.example.stream.state.checkpoint.RestoreCheckpointExample$1.flatMap(RestoreCheckpointExample.java:34) ~[blob_p-353721c11ae1acd403dc8be3b663e9a60854d5c3-b6237955a73f418e6d7b272281b64594:?]
	at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:111) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
2020-12-26 20:48:12,978 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Attempting to cancel task MySumFunction -> Sink: MyPrintFunction (1/1) (d464321ae464046684fd28d37bdcc3d7).
2020-12-26 20:48:12,978 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - MySumFunction -> Sink: MyPrintFunction (1/1) (d464321ae464046684fd28d37bdcc3d7) switched from RUNNING to CANCELING.
...
2020-12-26 20:48:12,979 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - MySumFunction -> Sink: MyPrintFunction (1/1) (d464321ae464046684fd28d37bdcc3d7) switched from CANCELING to CANCELED.

由于我们设置了重启策略,重启间隔为10s,所以作业在10s之后重启,经过 CREATED -> DEPLOYING -> RUNNING 状态,作业被重启:

2020-12-26 20:48:22,997 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: MySourceFunction -> MyFlatMapFunction (1/1) (223b777dfc69013852e9ab37d3cc078e) switched from CREATED to DEPLOYING.
...
2020-12-26 20:48:22,998 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: MySourceFunction -> MyFlatMapFunction (1/1) (223b777dfc69013852e9ab37d3cc078e) switched from DEPLOYING to RUNNING.
2020-12-26 20:48:22,999 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - MySumFunction -> Sink: MyPrintFunction (1/1) (53e45aa6b16f0b82d1bde8325f0cfbaf) switched from CREATED to DEPLOYING.
...
2020-12-26 20:48:23,000 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - MySumFunction -> Sink: MyPrintFunction (1/1) (53e45aa6b16f0b82d1bde8325f0cfbaf) switched from DEPLOYING to RUNNING.

由于我们设置了最多重启三次,所以第四次发出 ERROR 信号后,作业彻底失败:

2020-12-26 21:05:29,294 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: MySourceFunction -> MyFlatMapFunction (1/1) (223b777dfc69013852e9ab37d3cc078e) switched from RUNNING to FAILED.
java.lang.RuntimeException: custom error flag, restart application
	at com.flink.example.stream.state.checkpoint.RestoreCheckpointExample$1.flatMap(RestoreCheckpointExample.java:39) ~[blob_p-353721c11ae1acd403dc8be3b663e9a60854d5c3-b6237955a73f418e6d7b272281b64594:?]
	at com.flink.example.stream.state.checkpoint.RestoreCheckpointExample$1.flatMap(RestoreCheckpointExample.java:34) ~[blob_p-353721c11ae1acd403dc8be3b663e9a60854d5c3-b6237955a73f418e6d7b272281b64594:?]
	at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
...
2020-12-26 21:05:29,332 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Attempting to cancel task MySumFunction -> Sink: MyPrintFunction (1/1) (53e45aa6b16f0b82d1bde8325f0cfbaf).
2020-12-26 21:05:29,332 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - MySumFunction -> Sink: MyPrintFunction (1/1) (53e45aa6b16f0b82d1bde8325f0cfbaf) switched from RUNNING to CANCELING.
...
2020-12-26 21:05:29,334 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - MySumFunction -> Sink: MyPrintFunction (1/1) (53e45aa6b16f0b82d1bde8325f0cfbaf) switched from CANCELING to CANCELED.
...
2020-12-26 21:05:29,353 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job a78621726e80e5bde6f936a177f0d052 from job leader monitoring.
2020-12-26 21:05:29,353 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Close JobManager connection for job a78621726e80e5bde6f936a177f0d052.

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Flink Checkpoint 原理流程以及常见失败原因分析

    目前有赞实时任务主要以 Flink 为主,为了保证实时任务的容错恢复以及停止重启时的状态恢复,几乎所有的实时任务都会开启 Checkpoint 或者触发 Sav...

    有赞coder
  • Flink 清理过期 Checkpoint 目录的正确姿势

    本博客是笔者在生产环境使用 Flink 遇到的 Checkpoint 相关故障后,整理输出,价值较高的 实战采坑记,本文会带你更深入的了解 Flink 实现增量...

    zhisheng
  • Flink Savepoints和Checkpoints的3个不同点

    在本文中,我们将解释什么是 Savepoint,什么会使用它们,并就它们与 Checkpoint 的区别进行对比分析。

    smartsi
  • Apache Flink 管理大型状态之增量 Checkpoint 详解

    Apache Flink 是一个有状态的流计算框架,状态是作业算子中已经处理过的内存状态,供后续处理时使用。状态在流计算很多复杂场景中非常重要,比如:

    zhisheng
  • Apache Flink 管理大型状态之增量 Checkpoint 详解

    作者 | Stefan Ricther & Chris Ward 翻译 | 邱从贤(山智)

    用户6259908
  • Flink 管理大型状态之增量 Checkpoint

    Apache Flink 是一个有状态的流处理框架。什么是流处理应用程序的状态呢?你可以理解状态为应用程序算子中的内存。状态在流计算很多复杂场景中非常重要,比如...

    smartsi
  • Flink状态管理和容错机制介绍

    本文整理自去年8月11日在北京举行的 Flink Meetup 会议,分享嘉宾施晓罡,目前在阿里大数据团队部从事Blink方面的研发,现在主要负责Blink状...

    zhisheng
  • Flink Checkpoint机制原理剖析与参数配置

    在Flink状态管理详解这篇文章中,我们介绍了Flink的状态都是基于本地的,而Flink又是一个部署在多节点的分布式引擎,分布式系统经常出现进程被杀、节点宕机...

    PP鲁
  • Flink DataStream—— 状态(State)&检查点(Checkpoint)&保存点(Savepoint)原理

    ​ 最近一次项目当中需要将大量数据保存再Flink程序当中用作缓存数据一共后续数据使用,隧对最近使用到的状态、检查点、保存点等原理和使用进行一个总结

    俺也想起舞
  • Flink如何管理Kafka的消费偏移量

    在这篇文章中我们将结合例子逐步讲解 Flink 是如何与 Kafka 工作来确保将 Kafka Topic 中的消息以 Exactly-Once 语义处理。

    smartsi
  • Flink 状态管理和容错机制介绍

    计算任务的结果不仅仅依赖于输入,还依赖于它的当前状态,其实大多数的计算都是有状态的计算。比如wordcount,给一些word,其计算它的count,这是一个很...

    smartsi
  • 昨天面试别人说他熟悉Flink,结果我问了他Flink是如何实现exactly-once语义的?

    什么是状态呢?比如我们在平时的开发中,需要对数据进行count,sum,max等操作,这些中间的结果(即是状态)是需要保存的,因为要不断的更新,这些值或者变量就...

    王知无-import_bigdata
  • Flink 1.11 新特性详解:【非对齐】Unaligned Checkpoint 优化高反压

    问题导读 1.Barrier 对齐会造成什么问题? 2.Barrier 对齐是否会造成反压? 3.如何理解Unaligned Checkpoint ? 作为...

    zhisheng
  • 图解 Flink Checkpoint 原理及在 1.11 版本的优化

    上次发文,提到了 Flink 可以非常高效的进行有状态流的计算,通过使用 Flink 内置的 Keyed State 和 Operator State,保存每个...

    kk大数据
  • Apache Flink 零基础入门(一):基础概念解析

    Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行快速计...

    Java帮帮
  • 一文搞懂 Flink 的 Exactly Once 和 At Least Once

    An Overview of End-to-End Exactly-Once Processing in Apache Flink (with Apache K...

    zhisheng
  • 端到端Exactly-Once是分布式系统最大挑战?Flink是如何解决的?

    某条数据投递到某个流处理系统后,该系统对这条数据只处理一次,提供Exactly-Once的保障是一种理想的情况。如果系统不出任何故障,那简直堪称完美。然而现实世...

    PP鲁
  • Flink Exactly-Once 投递实现浅析

    随着近来越来越多的业务迁移到 Flink 上,对 Flink 作业的准确性要求也随之进一步提高,其中最为关键的是如何在不同业务场景下保证 exactly-onc...

    王知无-import_bigdata
  • 原理解析 | Apache Flink 结合 Kafka 构建端到端的 Exactly-Once 处理

    阿里巴巴技术专家,四川大学硕士,2010年毕业后加入阿里搜索事业部,从事搜索离线平台的研发工作,参与将搜索后台数据处理架构从MapReduce到Flink的重构...

    zhisheng

扫码关注云+社区

领取腾讯云代金券