前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink之Watermark实践

Flink之Watermark实践

作者头像
actionzhang
发布2022-11-30 17:11:09
3960
发布2022-11-30 17:11:09
举报

我们知道实时计算中,数据时间比较敏感,有eventTime和processTime区分,一般来说eventTime是从原始的消息中提取过来的,processTime是Flink自己提供的,Flink中一个亮点就是可以基于eventTime计算,这个功能很有用,因为实时数据可能会经过比较长的链路,多少会有延时,并且有很大的不确定性,对于一些需要精确体现事件变化趋势的场景中,单纯使用processTime显然是不合理的。

       在Flink中基于eventTime计算,需要注意两点,首先要设置数据流的时间特征,下面的代码的意思是基于eventTime处理数据,

代码语言:javascript
复制
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

其次,需要提取eventTime和设置WaterMark,因为数据格式不相同,设置warterMark的方式也有多种建议大家参考官网(https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/event_timestamps_watermarks.html), 下面我们具体分析一下,eventTime结合Watermark的工作方式。

window划分

window是flink中划分数据一个基本单位,window的划分方式是固定的,默认会根据自然时间划分window,并且划分方式是前闭后开,如下表格是window为10s和1分钟的划分方式,从00:00:00点开始划分3个window。统一的窗口划分方式,方便决定数据到底归属于哪个window,比如数据1时间是00:00:01.345 这个数据点在window=10s的时候,归属于w1,window= 60s时,归属于w1,而数据2时间是00:01:02.152,window=10s归属于w7,window = 60s时归属于w2. 如果这个时候,时间特征是eventTime,那么就会基于从原始数据提取到的eventTime,将eventTime划分到不同window中,同样适用于Ingestion Time   和 process Time。

window划分

w1

w2

w3

10s

[00:00:00~00:00:10)

[00:00:10~00:00:20)

[00:00:20~00:00:30)

60s

[00:00:00~00:01:00)

[00:01:00~00:02:00)

[00:03:00~00:03:00)

划分了window之后,触发window的计算,就可以得到这个window中的聚合结果了,其实基于eventTime和基于processTime计算最大的不同点就是在触发window的计算实际上不相同,通常数据流基于processTime,在window的endTime等于当前时间的时候就会触发计算,而eventTime因为数据有可能是乱序的,所以需要watermark的协助,完成window计算的触发。

Watermark 

提取WaterMark的方式两类,一类是定时提取watermark,对应AssignerWithPeriodicWatermarks,这种方式会定时提取更新wartermark,另一类伴随event的到来就提取watermark,就是每一个event到来的时候,就会提取一次Watermark,对应AssignerWithPunctuatedWatermarks,这样的方式当然设置watermark更为精准,但是当数据量大的时候,频繁的更新wartermark会比较影响性能。通常情况下采用定时提取就足够了。需要注意的是watermark的提取工作在taskManager中完成,意味着这项工作是并行进行的的,而watermark是一个全局的概念,就是一个整个Flink作业之后一个warkermark。那么warkermark一般是怎么提取呢,这里引用官网的两个例子来说明。在第一个例子中extractTimestamp方法,在每一个event到来之后就会被调用,这里其实就是为了设置watermark的值,关键代码在于Math.max(timestamp,currentMaxTimestamp),意思是在当前的水位和当前事件时间中选取一个较大值,来让watermark流动。为什么要选取最大值,因为理想状态下,消息的事件时间肯定是递增的,实际处理中,消息乱序是大概率事件,所以为了保证watermark递增,要取最大值。而getCurrentWatermarker会被定时调用,可以看到方法中减了一个常量,这个原因在下面阐述。就这样,不断从eventTime中提取并更新watermark。第二个例子,并没有在提取eventTime的时候更新watermark的值,而是直接取系统当前时间减去一个常量,作为新的watermark。

代码语言:javascript
复制
/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}
代码语言:javascript
复制
/**
 * This generator generates watermarks that are lagging behind processing time by a fixed amount.
 * It assumes that elements arrive in Flink after a bounded delay.
 */
public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

	private final long maxTimeLag = 5000; // 5 seconds

	@Override
	public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
		return element.getCreationTime();
	}

	@Override
	public Watermark getCurrentWatermark() {
		// return the watermark as current time minus the maximum time lag
		return new Watermark(System.currentTimeMillis() - maxTimeLag);
	}
}

可以上面两种代码中,提取watermark的时候都要减去一个常量,为了理解这么做的原因,需要了解,watermark的工作方式,上文提到在基于eventTime的计算中,需要watermark的协助来触发window的计算,触发规则是watermark大于等于window的结束时间,并且这个窗口中有数据的时候,就会触发window计算。 举个例子说明其工作方式,当前window为10s,设想理想情况下消息都没有延迟,那么eventTime等于系统当前时间,假如设置watermark等于eventTIme的时候,当watermark = 00:00:10的时候,就会触发w1的计算,这个时后因为消息都没有延迟,watermark之前的消息(00:00:00~00:00:10)都已经落入到window中,所以会计算window中全量的数据。那么假如有一条消息data1,eventTime是00:00:01 应该属于w1,在00:00:11才到达,因为假设消息没有延迟,那么watermark等于当前时间,00:00:11,这个时候w1已经计算完毕,那么这条消息就会被丢弃,没有加入计算,这样就会出现问题。这是已经可以理解,代码中为什么要减去一个常量作为watermark,假设每次提取eventTime的时后,减去2s,那么当data1在00:00:11到达的时候,watermark才是00:00:09这个时候,w1还没有触发计算,那么data1会被加入w1,这个时候计算完全没有问题,所以减去一个常量是为了对延时的消息进行容错的。

实践中遇到问题

在实际的工作中,会遇到各种各样的数据,最近在工作中遇到一类数据,需求是基于eventTime,按照原始数据中某几个key做keyby操作,然后window为1分钟,对指标做sum。按照一般的做法我选取了类似官网中的第一种生成watermark的方式来处理数据,同时也考虑到延时,看了部分数据的延时,又咨询的业务方,确定了一个延时时间,但在验数(通过离线明细验一天的数)的时候发现,有几个key明显统计小了,经过分析之后,得出结论这些key的数据严重延迟,倒是整体数据严重乱序,watermark设置的太浅了,开始设置延时为10秒,数据中,我发现有一些数据的延时很小只有1s,而这些key的延时竟然达到了60s。因为我是根据eventTime结合延时常量去更新watermark,那些延时很小的key的数据将watermark来到最新,导致延时大的key可能数据刚到,不到10s,watermark已经到达window的end time,直接触发了这个window的计算,导致这些延时太大的key,在window中丢失很多数据。

结论

对于严重乱序的数据,需要严格统计数据最大延迟时间,才能保证计算的数据准确,延时设置太小会影响数据准确性,延时设置太大不经影响数据的实时性,更加会加重Flink作业的负担,不是对eventTime要求特别严格的数据,尽量不要采用eventTime方式来处理,会有丢数据的风险。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-07-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • window划分
  • Watermark 
  • 实践中遇到问题
  • 结论
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档