前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >2021年大数据Flink(二十八):Flink 容错机制 自动重启策略和恢复

2021年大数据Flink(二十八):Flink 容错机制 自动重启策略和恢复

作者头像
Lansonli
发布2021-10-09 17:41:38
2.1K0
发布2021-10-09 17:41:38
举报
文章被收录于专栏:Lansonli技术博客Lansonli技术博客

自动重启策略和恢复

重启策略配置方式

  • 配置文件中

在flink-conf.yml中可以进行配置,示例如下:

代码语言:javascript
复制
restart-strategy: fixed-delay

restart-strategy.fixed-delay.attempts: 3

restart-strategy.fixed-delay.delay: 10 s
  • 代码中

还可以在代码中针对该任务进行配置,示例如下:

代码语言:javascript
复制
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 重启次数

Time.of(10, TimeUnit.SECONDS) // 延迟时间间隔

 ))

重启策略分类

  • 默认重启策略

如果配置了Checkpoint,而没有配置重启策略,那么代码中出现了非致命错误时,程序会无限重启

  • 无重启策略

 Job直接失败,不会尝试进行重启  设置方式1:flink-conf.yaml  restart-strategy: none  ​  设置方式2:  无重启策略也可以在程序中设置  val env = ExecutionEnvironment.getExecutionEnvironment()  env.setRestartStrategy(RestartStrategies.noRestart())

  • 固定延迟重启策略--开发中使用

 设置方式1:  重启策略可以配置flink-conf.yaml的下面配置参数来启用,作为默认的重启策略:  例子:  restart-strategy: fixed-delay  restart-strategy.fixed-delay.attempts: 3  restart-strategy.fixed-delay.delay: 10 s  ​  设置方式2:  也可以在程序中设置:  val env = ExecutionEnvironment.getExecutionEnvironment()  env.setRestartStrategy(RestartStrategies.fixedDelayRestart(    3, // 最多重启3次数    Time.of(10, TimeUnit.SECONDS) // 重启时间间隔  ))  上面的设置表示:如果job失败,重启3次, 每次间隔10

  • 失败率重启策略--开发偶尔使用

 设置方式1:  失败率重启策略可以在flink-conf.yaml中设置下面的配置参数来启用:  例子:  restart-strategy:failure-rate  restart-strategy.failure-rate.max-failures-per-interval: 3  restart-strategy.failure-rate.failure-rate-interval: 5 min  restart-strategy.failure-rate.delay: 10 s  ​  设置方式2:  失败率重启策略也可以在程序中设置:  val env = ExecutionEnvironment.getExecutionEnvironment()  env.setRestartStrategy(RestartStrategies.failureRateRestart(    3, // 每个测量时间间隔最大失败次数    Time.of(5, TimeUnit.MINUTES), //失败率测量的时间间隔    Time.of(10, TimeUnit.SECONDS) // 两次连续重启的时间间隔  ))  上面的设置表示:如果5分钟内job失败不超过三次,自动重启, 每次间隔10s (如果5分钟内程序失败超过3次,则程序退出)

​​​​​​​代码演示

代码语言:javascript
复制
package cn.it.checkpoint;

import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

/**
 * Author lanson
 * Desc 演示Checkpoint+重启策略
 */
public class CheckpointDemo02_RestartStrategy {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //===========Checkpoint参数设置====
        //===========类型1:必须参数=============
        //设置Checkpoint的时间间隔为1000ms做一次Checkpoint/其实就是每隔1000ms发一次Barrier!
        env.enableCheckpointing(1000);
        //设置State状态存储介质
        /*if(args.length > 0){
            env.setStateBackend(new FsStateBackend(args[0]));
        }else {
            env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
        }*/
        if(SystemUtils.IS_OS_WINDOWS){
            env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
        }else{
            env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink-checkpoint/checkpoint"));
        }
        //===========类型2:建议参数===========
        //设置两个Checkpoint 之间最少等待时间,如设置Checkpoint之间最少是要等 500ms(为了避免每隔1000ms做一次Checkpoint的时候,前一次太慢和后一次重叠到一起去了)
        //如:高速公路上,每隔1s关口放行一辆车,但是规定了两车之前的最小车距为500m
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//默认是0
        //设置如果在做Checkpoint过程中出现错误,是否让整体任务失败:true是  false不是
        //env.getCheckpointConfig().setFailOnCheckpointingErrors(false);//默认是true
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);//默认值为0,表示不容忍任何检查点失败
        //设置是否清理检查点,表示 Cancel 时是否需要保留当前的 Checkpoint,默认 Checkpoint会在作业被Cancel时被删除
        //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:true,当作业被取消时,删除外部的checkpoint(默认值)
        //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:false,当作业被取消时,保留外部的checkpoint
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //===========类型3:直接使用默认的即可===============
        //设置checkpoint的执行模式为EXACTLY_ONCE(默认)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //设置checkpoint的超时时间,如果 Checkpoint在 60s内尚未完成说明该次Checkpoint失败,则丢弃。
        env.getCheckpointConfig().setCheckpointTimeout(60000);//默认10分钟
        //设置同一时间有多少个checkpoint可以同时执行
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//默认为1

        //=============重启策略===========
        //-1.默认策略:配置了Checkpoint而没有配置重启策略默认使用无限重启
        //-2.配置无重启策略
        //env.setRestartStrategy(RestartStrategies.noRestart());
        //-3.固定延迟重启策略--开发中使用!
        //重启3次,每次间隔10s
        /*env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, //尝试重启3次
                Time.of(10, TimeUnit.SECONDS))//每次重启间隔10s
        );*/
        //-4.失败率重启--偶尔使用
        //5分钟内重启3次(第3次不包括,也就是最多重启2次),每次间隔10s
        /*env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, // 每个测量时间间隔最大失败次数
                Time.of(5, TimeUnit.MINUTES), //失败率测量的时间间隔
                Time.of(10, TimeUnit.SECONDS) // 每次重启的时间间隔
        ));*/

        //上面的能看懂就行,开发中使用下面的代码即可
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

        //2.Source
        DataStream<String> linesDS = env.socketTextStream("node1", 9999);

        //3.Transformation
        //3.1切割出每个单词并直接记为1
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                //value就是每一行
                String[] words = value.split(" ");
                for (String word : words) {
                    if(word.equals("bug")){
                        System.out.println("手动模拟的bug...");
                        throw new RuntimeException("手动模拟的bug...");
                    }
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        //3.2分组
        //注意:批处理的分组是groupBy,流处理的分组是keyBy
        KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOneDS.keyBy(t -> t.f0);
        //3.3聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);

        //4.sink
        result.print();

        //5.execute
        env.execute();
    }
}

​​​​​​​手动重启并恢复-了解

1.把程序打包

2.启动Flink集群(本地单机版,集群版都可以)

/export/server/flink/bin/start-cluster.sh

3.访问webUI

http://node1:8081/#/overview

http://node2:8081/#/overview

4.使用FlinkWebUI提交

cn.checkpoint.CheckpointDemo01

5.取消任务

6.重新启动任务并指定从哪恢复

cn.itcast.checkpoint.CheckpointDemo01

hdfs://node1:8020/flink-checkpoint/checkpoint/9e8ce00dcd557dc03a678732f1552c3a/chk-34

7.关闭/取消任务

8.关闭集群

/export/server/flink/bin/stop-cluster.sh

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2021-04-30 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 自动重启策略和恢复
    • 重启策略配置方式
      • 重启策略分类
        • ​​​​​​​代码演示
        • ​​​​​​​手动重启并恢复-了解
          • 1.把程序打包
            • 2.启动Flink集群(本地单机版,集群版都可以)
              • 3.访问webUI
                • 4.使用FlinkWebUI提交
                  • 5.取消任务
                    • 6.重新启动任务并指定从哪恢复
                      • 7.关闭/取消任务
                        • 8.关闭集群
                        相关产品与服务
                        大数据
                        全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
                        领券
                        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档