前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink中案例学习--State与CheckPoint

Flink中案例学习--State与CheckPoint

作者头像
小勇DW3
发布2019-12-12 15:13:02
5510
发布2019-12-12 15:13:02
举报
文章被收录于专栏:小勇DW3小勇DW3

一、State

在Flink中,按照基本类型,对State做了以下两类的划分:

Keyed State,和Key有关的状态类型,它只能被基于KeyedStream之上的操作,方法所使用。我们可以从逻辑上理解这种状态是一个并行度操作实例和一种Key的对应, <parallel-operator-instance, key>。 Operator State(或者non-keyed state),它是和Key无关的一种状态类型。相应地我们从逻辑上去理解这个概念,它相当于一个并行度实例,对应一份状态数据。因为这里没有涉及Key的概念,所以在并行度(扩/缩容)发生变化的时候,这里会有状态数据的重分布的处理。如下图:

Keyed State 应用示例:

代码示例:

代码语言:javascript
复制
public class StateManager extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * 操作 state 的句柄
     * @param longLongTuple2
     * @param collector
     * @throws Exception
     */

    private transient ValueState<Tuple2<Long, Long>> sum;


    @Override
    public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {

        //获取state值
        Tuple2<Long, Long> currentSum = sum.value();

        currentSum.f0 = currentSum.f0 + 1;
        currentSum.f1 = currentSum.f1 + value.f1;

        //操作state更新
        sum.update(currentSum);

        //输出flatMap的算子结果
        if(currentSum.f0 >= 2)
        {
            out.collect(new Tuple2<Long, Long>(value.f0, currentSum.f1/currentSum.f0));
        }

    }


    @Override
    public void open(Configuration parameters) throws Exception {

        ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<Tuple2<Long, Long>>(
                "average",                                                      //状态的名称
                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),      //状态的类型
                Tuple2.of(0L, 0L)                                               //状态的初始默认值
        );

        sum = getRuntimeContext().getState(descriptor);


    }


}

Operator State 应用示例:

二、checkpoint的应用示例

代码语言:javascript
复制
        //获取flink的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】
        env.enableCheckpointing(1000);
        // 高级选项:
        // 设置模式为exactly-once (这是默认值)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 同一时间只允许进行一个检查点
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint【详细解释见备注】
        //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
        //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink处理程序被cancel后,会删除Checkpoint数据,只有job执行失败的时候才会保存checkpoint
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-12-11 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档