前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink之状态编程

Flink之状态编程

作者头像
丁D
发布2023-10-20 08:30:01
2470
发布2023-10-20 08:30:01
举报
文章被收录于专栏:老铁丁D老铁丁D

摘要本文将从状态的概念入手,详细介绍 Flink 中的状态分类、状态的使用、持久化及状态后端的配置。

一、Flink状态概念

Flink的处理机制核心:有状态的流式计算,那么什么是有状态,什么是无状态呢?

在流式处理中,数据是连续不断的到来和处理的,每个任务在计算的时候,可以基于当前数据直接转换就能得到结果如map,filter(无状态), 也可以是依赖上一个数据才能得到结果,这个时候我们就需要将上一个结果记录下来如sum(有状态)。

下面的几个场景都需要使用流处理的状态功能: 1、数据流中的数据有重复,我们想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重。 2、检查输入流是否符合某个特定的模式,需要将之前流入的元素以状态的形式缓存下来。比如,判断一个温度传感器数据流中的温度是否在持续上升。 3、对一个时间窗口内的数据进行聚合分析,分析一个小时内某项指标的75分位或99分位的数值。

有状态的算子处理流程如下: 1、接收到上游数据 2、通过上下文获取当前状态 3、根据业务逻辑计算,更新状态 4、将处理结果输出给下游

Flink的算子任务,可以设置并行度,从而在不同的slot运行多个实例,我们把这个实例叫成“并行子任务”或者“算子子任务”。

二、状态分类

1、托管状态(推荐):由flink统一管理 存储、故障恢复、重组等 2、原始状态: 需要我们自定义,一般不用除非托管搞不定

重点介绍托管状态 我们知道 Flink一个算子任务,可以分为多个并行子任务,分配在不同的任务槽(task slot)中运行,而这些slot的计算资源是物理隔离的, 所以flink管理的的状态是在不同的并行子任务是无法共享的,基于这个想法我们可以将状态分为 算子状态和按键状态

算子状态:状态的作用在一个并行子任务,也就是一个算子子任务,所有这个子任务处理的数据共享一个状态 按键状态:我们的流可以根据keyby进行分组成keyedStream,这个时候同一个key共享一个状态

值得注意的是无论是keyed state还是operator state,他们都是在本地实例上进行维护的,也就是说每一个并行子任务维护着对应的状态 算子子任务之间的状态并不能共享。

算子状态的实际应用场景不如 Keyed State 多,一般用在 Source 或 Sink 等与外部系统连接的算子上,或者完全没有 key 定义的场景。比如 Flink 的 Kafka 连接器中,就用到了算子状态。在我们给 Source 算子设置并行度后,Kafka 消费者的每一个并行实例,都会为对应的主题(topic)分区维护一个偏移量, 作为算子状态保存起来。这在保证 Flink 应用“精确一次”(exactly-once)状态一致性时非常有用。

三、状态数据结构

按键状态数据结构分为5种: 1、值状态(ValueState) 2、列表状态(ListState) 3、映射状态(MapState) 4、归约状态(ReducingState) 5、聚合状态(AggregatingState)

算子状态数据结构分为3种 1、列表状态(ListState) 2、联合列表状态(UnionListState) 3、广播状态(BroadcastState): 有时我们希望算子并行子任务都保持同一份“全局”状态,用来做统一的配置和规则设定。这时所有分区的所有数据都会访问到同一个状态,状态就像被“广播”到所有分区一样,这种特殊的算子状态,就叫作广播状态(BroadcastState)。

状态的注册,主要是通过“状态描述器”(StateDescriptor)来实现的。状态描述器中最重要的内容,就是状态的名称(name)和类型(type)。

代码语言:javascript
复制
public void open(Configuration parameters) throws Exception {
lastTemperatureValueState = getRuntimeContext()
.getState(new ValueStateDescriptor("last-temp", Double.class));
}

一般来说我们在生命周期方法.open()中获取状态对象。但这个变量不应该在 open 中声明——应该在外面直接把它定义为类的属性, 这样就可以在不同的方法中通用了。而在外部又不能直接获取状态,因为编译时是无法拿到运行时上下文的。 所以最终的解决方案就变成了:在外部声明状态对象,在 open 生命周期方法中通过运行时上下文获取状态。

四、状态具体使用demo

代码语言:javascript
复制
import dto.SensorReadingDTO;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import util.DateUtil;
//如果传感器的温度差大于10度就预警
//使用状态记录上一次的状态
public class Status_1_KeyedState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> streamSource = env.socketTextStream("127.0.0.1", 10008);
DataStream<SensorReadingDTO> dataStream = streamSource
.map(new MapFunction<String, SensorReadingDTO>() {
@Override
public SensorReadingDTO map(String input) throws Exception {
if (StringUtils.isNotBlank(input)) {
String[] infoArray = input.split(",");
SensorReadingDTO sensorReadingDTO = new SensorReadingDTO();
sensorReadingDTO.setId(infoArray[0]);
sensorReadingDTO.setTimestamp(Long.valueOf(infoArray[1]) * 1000);
sensorReadingDTO.setTemperature(Double.valueOf(infoArray[2]));
sensorReadingDTO.setTimestampStr(DateUtil.format(sensorReadingDTO.getTimestamp()));
return sensorReadingDTO;
}
return null;
}
});
//使用flatMap 可以输出0,1个或多个,没有超过10度的,就不要输出
//但是使用map 只能输入1个,必须输出一个,所以不符合
DataStream<Tuple3<String,Double,Double>> checkDataStream = dataStream.keyBy(SensorReadingDTO::getId)
.flatMap(new MyMapper(Double.valueOf(10)));
checkDataStream.print();
env.execute();
}
public static class MyMapper extends RichFlatMapFunction<SensorReadingDTO, Tuple3<String,Double,Double>> {
private ValueState<Double> lastTemperatureValueState;
private final Double threshold;
public MyMapper(Double threshold) {
this.threshold = threshold;
}
@Override
public void open(Configuration parameters) throws Exception {
lastTemperatureValueState = getRuntimeContext()
.getState(new ValueStateDescriptor("last-temp", Double.class));
}
@Override
public void close() throws Exception {
//释放
lastTemperatureValueState.clear();
}
@Override
public void flatMap(SensorReadingDTO sensorReadingDTO, Collector<Tuple3<String,Double,Double>> out) throws Exception {
//第一次 为空记录当前温度
Double lastTemp = lastTemperatureValueState.value();
Double curTemp = sensorReadingDTO.getTemperature();
// 如果不为空,判断是否温差超过阈值,超过则报警
if (lastTemp != null) {
if (Math.abs(curTemp - lastTemp) >= threshold) {
out.collect(new Tuple3<>(sensorReadingDTO.getId(), lastTemp, curTemp));
}
}
// 更新保存的"上一次温度"
lastTemperatureValueState.update(curTemp);
}
}
}

五、状态后端

1、MemoryStateBackend 内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在TaskManager的JVM堆上,而将checkpoint存储在JobManager的内存中 特点:快速、低延迟,但不稳定 2、FsStateBackend(默认) 将checkpoint存到远程的持久化文件系统(FileSystem)上,而对于本地状态,跟MemoryStateBackend一样,也会存在TaskManager的JVM堆上 同时拥有内存级的本地访问速度,和更好的容错保证 3、RocksDBStateBackend 将所有状态序列化后,存入本地的RocksDB中存储

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-06-20 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、Flink状态概念
  • 二、状态分类
  • 三、状态数据结构
  • 四、状态具体使用demo
  • 五、状态后端
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档