大家好,我是百思不得小赵。
创作时间:2022 年 5 月 25 日 博客主页: 🔍点此进入博客主页 —— 新时代的农民工 🙊 —— 换一种思维逻辑去看待这个世界 👀
“时间”在我们日常的开发学习过程中是特别常见的一个名词,例如:Java中的日期处理类、获取系统的当前时间、毫秒级的时间戳等等。接下来让我们来看看在Flink框架中,对时间不同的概念。Flink框架中有三个时间的语义:事件时间(Event Time )、摄入时间(Ingestion Time)、系统处理时间(Processing Time)。
在Flink流处理真实场景中,大部分的业务需求都会使用事件时间语义,但还是以具体的业务需求择选不同的时间语义。Flink引入EventTime方式如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 设置全局事件时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
当 Flink 以 Event Time 模式处理数据流时,它会根据数据里的时间戳来处理基于时间的算子.
在Flink数据处理过程中,数据从产生到计算到输出结果,是需要一个过程时间,在正常的情况下数据往往都是按照事件产生的时间顺序进行的,由于网络、分布式部署等原因会导致数据产生乱序问题,相当于Flink接收到的数据的先后顺序不是按照时间的事件时间顺序排列进行的。乱序数据会让窗口计算不准确.。如何避免这个问题呢?
如果一个数据的时间戳达到了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口
Watermark概念
Watermark 可以理解成一个延迟触发机制,我们可以设置 Watermark 的延时时长 t,每次系统会校验已经到达的数据中最大的maxEventTime,然后认定 eventTime小于 maxEventTime - t 的所有数据都已经到达,如果有窗口的停止时间等于maxEventTime – t,那么这个窗口被触发执行。
当 Flink 接收到数据时,会按照一定的规则去生成 Watermark,这条 Watermark就等于当前所有到达数据中的 maxEventTime - 延迟时长,也就是说,Watermark 是基于数据携带的时间戳生成的,一旦 Watermark 比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行。由于 event time 是由数据携带的,因此,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发。
Watermark 就是触发前一窗口的“关窗时间”,一旦触发关门那么以当前时刻为准在窗口范围内的所有所有数据都会收入窗中。只要没有达到水位那么不管现实中的时间推进了多久都不会触发关窗。
举个栗子:如下图为一个乱序的 数据,将Watermark设置为2
如上图,将最大的延时时间设置为2秒,所以时间戳为 7s 的事件对应的 Watermark 是 5s,时间戳为 12s 的事件的 Watermark 是 10s,如果我们的窗口 1是 1s~5s,窗口 2 是 6s~10s,那么时间戳为 7s 的事件到达时的 Watermarker 恰好触发窗口 1,时间戳为 12s 的事件到达时的 Watermark 恰好触发窗口 2。
Watermark的特点
Watermark的使用
对于排序好的数据,不需要延迟触发,只需要指定时间戳即可。
代码如下:
public class WindowTest3_EventTimeWindow {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 设置全局事件时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(100);
DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] field = line.split(",");
return new SensorReading(field[0], new Long(field[1]), new Double(field[2]));
})
// 升序数据设置事件时间和watermark
// 无需设置延时时间 把当前的getTimestamp提取出当成当前的事件时间
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SensorReading>() {
@Override
public long extractAscendingTimestamp(SensorReading element) {
return element.getTimestamp() * 1000;
}
});
// 基于事件时间的开窗聚合 统计15秒内温度的最小值
SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id")
.timeWindow(Time.seconds(15))
.minBy("temperature");
minTempStream.print("minTemp");
env.execute();
}
}
处理乱序数据需要制定对应的延时时间。
代码如下:
public class WindowTest3_EventTimeWindow {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 设置全局事件时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(100);
DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] field = line.split(",");
return new SensorReading(field[0], new Long(field[1]), new Double(field[2]));
})
// 乱序数据设置事件时间和watermark
// 需要设置延时时间
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
@Override
public long extractTimestamp(SensorReading element) {
return element.getTimestamp() * 1000;
}
});
// 基于事件时间的开窗聚合 统计15秒内温度的最小值
SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id")
.timeWindow(Time.seconds(15))
.minBy("temperature");
minTempStream.print("minTemp");
env.execute();
}
}
TimestampAssigner
Flink日供了TimestampAssigner接口,我们可以自定义去实现从数据中抽取时间戳的规则以及生成Watermark的规则。有如下两种类型:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setAutoWatermarkInterval(100);
public class WindowTest3_EventTimeWindow {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 设置全局事件时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(100);
DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] field = line.split(",");
return new SensorReading(field[0], new Long(field[1]), new Double(field[2]));
})
// 乱序数据设置事件时间和watermark
// 需要设置延时时间
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
@Override
public long extractTimestamp(SensorReading element) {
return element.getTimestamp() * 1000;
}
});
OutputTag<SensorReading> tag = new OutputTag<>("late");
// 基于事件时间的开窗聚合 统计15秒内温度的最小值
SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id")
.timeWindow(Time.seconds(15))
// 设置窗口延时
.allowedLateness(Time.seconds(1))
// 将窗口关闭后的数据输出到侧流
.sideOutputLateData(tag)
.minBy("temperature");
minTempStream.print("minTemp");
minTempStream.getSideOutput(tag).print("late");
env.execute();
}
}