前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink之处理函数

Flink之处理函数

作者头像
丁D
发布2023-10-20 08:26:55
1930
发布2023-10-20 08:26:55
举报
文章被收录于专栏:老铁丁D

摘要处理函数(ProcessFunction)了。处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”。而且处理函数继承了 AbstractRichFunction 抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑;同时也是整个 DataStream API 的底层基础。

在我们之前学习的API,不管事聚合、转换或者开窗操作都是基于DataStream进行操作的,我们统称DataSream API. 但是我们知道这些API无法访问时间戳或者当前事件的事件时间。 因此Flink还提供了更低层API让我们直面数据流的基本元素:数据事件、状态、及时间让我们对流有完全的控制权,我们称这一层接口叫“处理函数”(ProcessFunction)

处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”。 同样可以访问状态(state)和其他运行时信息。此外,处理函数还可以直接将数据输出到侧输出流(side output)中。 用法:stream.process(new MyProcessFunction()) 调用process方法传入一个 ProcessFunction 作为参数,用来定义处理逻辑。

Flink提供了8个不同的处理函数: ProcessFunction KeyedProcessFunction ProcessWindowFunction ProcessAllWindowFunction CoProcessFunction ProcessJoinFunction BroadcastProcessFunction KeyedBroadcastProcessFunction

注意:注意定时器timer 只能在keyed streams 上面使用。

处理函数提供了两个方法: 抽象方法:public abstract void processElement(I value, Context ctx, Collector<O> out)每个元素进来都会调用一次 value输入的值,ctx上下文可以获取时间用来注册定时器,out用来输出 非抽象方法:public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)timestamp时间戳,触发的时间如果是事件语义就是水位线

TimerService会以键(key)和时间戳为标准,对定时器进行去重;也就是说对于每个key和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次。 这样一来,我们在代码中就方便了很多,可以肆无忌惮地对一个key注册定时器,而不用担心重复定义——因为一个时间戳上的定时器只会触发一次。 利用这个特性,有时我们可以故意降低时间戳的精度,来减少定时器的数量,从而提高处理性能。比如我们可以在设置定时器时只保留整秒数,那么定时器的触发频率就是最多1秒一次。 这里注意定时器的时间戳必须是毫秒数,所以我们得到整秒之后还要乘以1000。定时器默认的区分精度是毫秒。

下面是一个KeyedProcessFunction的案例:10s温度连续上升就预警

代码语言:javascript
复制
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction

K表示分组的类型

I表示输入的类型

O表示输出的类型

代码语言:javascript
复制
package _8processFunction;
import dto.SensorReadingDTO;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import util.DateUtil;
//10s温度连续上升就预警
public class ProcessFunction_1_KeyedProcessFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> streamSource = env.socketTextStream("127.0.0.1", 10008);
DataStream<SensorReadingDTO> dataStream = streamSource
.map(new MapFunction<String, SensorReadingDTO>() {
@Override
public SensorReadingDTO map(String input) throws Exception {
if (StringUtils.isNotBlank(input)) {
String[] infoArray = input.split(",");
SensorReadingDTO sensorReadingDTO = new SensorReadingDTO();
sensorReadingDTO.setId(infoArray[0]);
sensorReadingDTO.setTimestamp(Long.valueOf(infoArray[1]) * 1000);
sensorReadingDTO.setTemperature(Double.valueOf(infoArray[2]));
sensorReadingDTO.setTimestampStr(DateUtil.format(sensorReadingDTO.getTimestamp()));
return sensorReadingDTO;
}
return null;
}
});
dataStream.keyBy(SensorReadingDTO::getId).process(new MyProcess(10000)).print();
env.execute();
}
public static class MyProcess extends KeyedProcessFunction<String, SensorReadingDTO, String> {
private Integer interval;
private ValueState<Double> tempValueState;
private ValueState<Long> timerTimestampValueState;//只是为了清除定时器的时候用
public MyProcess(Integer interval) {
this.interval = interval;
}
@Override
public void processElement(SensorReadingDTO sensorReadingDTO, Context context,
Collector<String> collector) throws Exception {
Double curTemp = sensorReadingDTO.getTemperature();
double lastTemp = tempValueState.value() != null ? tempValueState.value() : curTemp;
if (lastTemp > curTemp) {
//温度出现下降 重新计算,所以删除定时器(但是温度还要设置)
context.timerService().deleteProcessingTimeTimer(timerTimestampValueState.value());
timerTimestampValueState.clear();
} else if (lastTemp <= curTemp && timerTimestampValueState.value() == null) {
//温度上升,没有定时器要注册 如 温度 10 20 30 20的时候是不用注册定时器的
long warningTimestamp = context.timerService().currentProcessingTime() + interval;
context.timerService().registerProcessingTimeTimer(warningTimestamp);
timerTimestampValueState.update(warningTimestamp);
}
tempValueState.update(curTemp);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
throws Exception {
out.collect("传感器" + ctx.getCurrentKey() + "温度值连续" + interval + "ms上升");
timerTimestampValueState.clear();
}
@Override
public void open(Configuration parameters) throws Exception {
tempValueState = getRuntimeContext()
.getState(new ValueStateDescriptor("last-temp", Double.class));
timerTimestampValueState = getRuntimeContext()
.getState(new ValueStateDescriptor("timer_timestamp", Long.class));
}
@Override
public void close() throws Exception {
tempValueState.clear();
timerTimestampValueState.clear();
}
}
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-06-13 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档