前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink-Checkpoint&State案例测试

Flink-Checkpoint&State案例测试

作者头像
火之高兴
发布2024-07-25 15:49:39
500
发布2024-07-25 15:49:39
举报
文章被收录于专栏:大数据应用技术

###测试环境 hadoop10伪分布式:flink hdfs

测试代码
代码语言:javascript
复制
package day160616;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckPointTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);



        //------------------------checkpoint-start-------------------

        env.enableCheckpointing(1000);  //开启checkpoint,每隔1s执行checkpoint操作
        //状态后端
        env.setStateBackend(new FsStateBackend("hdfs://hadoop10:8020/flink-checkpoint"));
        //取消作业时,不删除检查点目录
        env.getCheckpointConfig()
                .enableExternalizedCheckpoints(CheckpointConfig
                        .ExternalizedCheckpointCleanup
                        .RETAIN_ON_CANCELLATION);
        //通过webui的cancel按钮,取消flink的job时,不删除HDFS的checkpoint目录
        //------------------------checkpoint-end-----------------------
        DataStreamSource<String> ds = env.socketTextStream("hadoop10", 9999);
//        将输入数据流转换为 Tuple2<String, Integer> 类型的流,其中第一个字段为城市名称,第二个字段为数字。
//        使用 map 函数将输入数据字符串解析为 Tuple2 对象,并按照城市名称进行分组。
        KeyedStream<Tuple2<String, Integer>, String> ds2 = ds.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] arr = value.split(",");
                return Tuple2.of(arr[0], Integer.parseInt(arr[1]));
            }
        }).keyBy(v -> v.f0);
//          在分组后的流上应用 map 函数,使用 RichMapFunction 实现对每个元素的处理逻辑
        ds2.map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            //          在 open 方法中,初始化一个 ValueState<Integer> 状态变量,用于存储当前城市的最大值。
            ValueState<Integer> state;

            @Override
            public void open(Configuration parameters) throws Exception {
                ValueStateDescriptor<Integer> valueStateDescriptor
                        = new ValueStateDescriptor<Integer>("vs1", Integer.class);
                state = getRuntimeContext().getState(valueStateDescriptor);
            }

            /**
             * 城市名称,数字
             * 输入                   输出
             * 北京,20               北京,20
             * 北京,30               北京,30
             * 北京,10               北京,30
             */
            @Override
            public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
                //1.获取状态中的值
                //2.状态中有值
                //  输入的值和状态的值进行比较大小,谁大返回谁
                //3.状态中没有值
                //  返回输入的值
                //4.更新状态

//                在 map 方法中,首先获取状态中存储的旧值。然后,根据以下条件更新最大值:
//                  如果状态中没有值或者输入值大于状态值,则将输入值设为最大值。
//                  如果状态中已经有值且输入值小于或等于状态值,则保持状态值不变
                Integer oldValue = state.value();

                if (oldValue == null || oldValue < value.f1) {
                    oldValue = value.f1;
                }
                state.update(oldValue);
                return Tuple2.of(value.f0, oldValue);//返回城市名称和最大值作为输出。
            }

        }).print();
//该代码实现了在流中根据城市名称进行分组,并计算每个城市的最大值。
// 在每个元素到达时,它会检查当前城市的最大值,并更新状态。
// 然后将城市名称和最大值作为输出打印出来。
        env.execute();
    }
}
测试流程

将代码打包,上传至hadoop10 /opt/app/flink,jar; 执行命令运行jar包: [root@hadoop10 app]# flink run -c day160616.CheckPointTest /opt/app/flink.jar Job has been submitted with JobID ee5811b41a5e8c5d7dd052ed78db14b4;

在这里插入图片描述
在这里插入图片描述

在webui界面查看运行任务;

在这里插入图片描述
在这里插入图片描述

代码运行逻辑为:9999端口输入城市,温度,代码将记录下当前城市的最高温度在stdout进行打印,如绿色框线示意。若下次的温度低于当前的最高温度,则继续输出曾经记录的最高温度。直到新的最高温度高于当前的最高温度,重新记录最高温度输出;

在这里插入图片描述
在这里插入图片描述

模拟取消任务,验证Checkpoint机制。现在取消该job,然后再重新运行[root@hadoop10 app]# flink run -c day160616.CheckPointTest /opt/app/flink.jar Job has been submitted with JobID 77861182fcb0c82677eabd40629a91ff

在这里插入图片描述
在这里插入图片描述

再次通过9999端口发送测试案例,发现state并没有将设置好的checkpoint镜像读出,而是又重新计算了当前城市的最大值。

在这里插入图片描述
在这里插入图片描述

关闭该作业,重新输入启动命令[root@hadoop10 app]# flink run -c day160616.CheckPointTest -s hdfs://hadoop10:8020/flink-checkpoint/77861182fcb0c82677eabd40629a91ff/chk-434 /opt/app/flink.jar Job has been submitted with JobID 6a5034e14fb103e594a266ca554e2558 ,注意红色框线部分的checkpoint恢复镜像位置以及指令的填写;

在这里插入图片描述
在这里插入图片描述

再次进入webui的任务管理界面,并且通过9999端口发送新的测试案例。本次checkpoint测试成功,恢复了从上一轮取消的作业中记录的城市状态,zhoukou和zhengzhou的最大值,由绿色框线示意。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-06-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 测试代码
  • 测试流程
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档