前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Flink】 WaterMark 详解

【Flink】 WaterMark 详解

作者头像
857技术社区
发布2022-05-17 15:45:59
1.2K0
发布2022-05-17 15:45:59
举报
文章被收录于专栏:857-Bigdata

概念

「流处理」

流处理,最本质的是在处理数据的时候,接受一条处理一条数据。

批处理,则是累积数据到一定程度在处理。这是他们本质的区别。

在设计上 Flink 认为数据是流式的,批处理只是流处理的特例。同时对数据分为有界数据和无界数据。

  • 有界数据对应批处理,API 对应 Dateset。
  • 无界数据对应流处理,API 对应 DataStream。

「乱序(out-of-order)」

什么是乱序呢?

可以理解为数据到达的顺序和其实际产生时间的排序不一致。导致这的原因有很多,比如延迟,消息积压,重试等等。

我们知道,流处理从事件产生,到流经 source,再到 operator,中间是有一个过程和时间的。虽然大部分情况下,流到 operator 的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order 或者说 late element)。

❝例如: 某数据源中的某些数据由于某种原因(如:网络原因,外部存储自身原因)会有 5 秒的延时,也就是在实际时间的第 1 秒产生的数据有可能在第 5 秒中产生的数据之后到来(比如到 Window 处理节点)。 有 1~10 个事件。 乱序到达的序列是:2,3,4,5,1,6,3,8,9,10,7 ❞

Flink 窗口

对于 Flink,如果来一条消息计算一条,这样是可以的,但是这样计算是非常频繁而且消耗资源,如果想做一些统计这是不可能的。所以对于 Spark 和 Flink 都产生了窗口计算。

比如 是因为我们想看到过去一分钟,过去半小时的访问数据,这时候我们就需要窗口。

Window:Window 是处理无界流的关键,Windows 将流拆分为一个个有限大小的 buckets,可以可以在每一个 buckets 中进行计算。

start_time,end_time:当 Window 时时间窗口的时候,每个 window 都会有一个开始时间和结束时间(前开后闭),这个时间是系统时间。

「窗口生命周期」

简而言之,只要属于此窗口的第一个元素到达,就会创建一个窗口,当时间(事件或处理时间)超过其结束时间戳加上用户指定的允许延迟时,窗口将被完全删除。

窗口有如下组件:

  • Window Assigner:用来决定某个元素被分配到哪个/哪些窗口中去。
  • Trigger:触发器。决定了一个窗口何时能够被计算或清除。触发策略可能类似于“当窗口中的元素数量大于 4”时,或“当水位线通过窗口结束时”。
  • Evictor:它可以在 触发器触发后 & 应用函数之前和/或之后 从窗口中删除元素。

窗口还拥有函数,比如 ProcessWindowFunction,ReduceFunction,AggregateFunction 或 FoldFunction。该函数将包含要应用于窗口内容的计算,而触发器指定窗口被认为准备好应用该函数的条件。

「Keyed vs Non-Keyed Windows」

在定义窗口之前,要指定的第一件事是流是否需要 Keyed,使用 keyBy(...)将无界流分成逻辑的 keyed stream。如果未调用 keyBy(...),则表示流不是 keyed stream。

  • 对于 Keyed 流,可以将传入事件的任何属性用作 key。拥有 Keyed stream 将允许窗口计算由多个任务并行执行,因为每个逻辑 Keyed 流可以独立于其余任务进行处理。相同 Key 的所有元素将被发送到同一个任务。
  • 在 Non-Keyed 流的情况下,原始流将不会被分成多个逻辑流,并且所有窗口逻辑将由单个任务执行,即并行性为 1。

「窗口分类」

窗口分类可以分成:滚动窗口(Tumbling Window,无重叠),滑动窗口(Sliding Window,有重叠),和会话窗口,(Session Window,活动间隙)

滚动窗口

滚动窗口分配器将每个元素分配给固定窗口大小的窗口。滚动窗口大小固定的并且不重叠。例如,如果指定大小为 5 分钟的滚动窗口,则将执行当前窗口,并且每五分钟将启动一个新窗口。

滑动窗口

滑动窗口与滚动窗口的区别就是滑动窗口有重复的计算部分。

滑动窗口分配器将每个元素分配给固定窗口大小的窗口。类似于滚动窗口分配器,窗口的大小由窗口大小参数配置。另外一个窗口滑动参数控制滑动窗口的启动频率(how frequently a sliding window is started)。因此,如果滑动大小小于窗口大小,滑动窗可以重叠。在这种情况下,元素被分配到多个窗口。

例如,你可以使用窗口大小为 10 分钟的窗口,滑动大小为 5 分钟。这样,每 5 分钟会生成一个窗口,包含最后 10 分钟内到达的事件。

会话窗口

会话窗口分配器通过活动会话分组元素。与滚动窗口和滑动窗口相比,会话窗口不会重叠,也没有固定的开始和结束时间。相反,当会话窗口在一段时间内没有接收到元素时会关闭。

例如,不活动的间隙时。会话窗口分配器配置会话间隙,定义所需的不活动时间长度(defines how long is the required period of inactivity)。当此时间段到期时,当前会话关闭,后续元素被分配到新的会话窗口。

「Flink 中的时间」

Flink 在流处理程序支持不同的时间概念。分别为 Event Time/Processing Time/Ingestion Time,也就是事件时间、处理时间、提取时间。

从时间序列角度来说,发生的先后顺序是:

代码语言:javascript
复制
事件时间(Event Time)--> 提取时间(Ingestion Time)--> 处理时间(Processing Time)
  • Event Time 是事件在现实世界中发生的时间,它通常由事件中的时间戳描述。
  • Ingestion Time 是数据进入 Apache Flink 流处理系统的时间,也就是 Flink 读取数据源时间。(Source)
  • Processing Time 是数据流入到具体某个算子 (消息被计算处理) 时候相应的系统时间。也就是 Flink 程序处理该事件时当前系统时间。

Watermark

Watermark 是 Apache Flink 为了处理 EventTime 窗口计算提出的一种机制,本质上也是一种时间戳。watermark 是用于处理乱序事件或延迟数据的,这通常用 watermark 机制结合 window 来实现(Watermarks 用来触发 window 窗口计算)。

「实战一」

代码语言:javascript
复制
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrderness = 3000; // 3.0 seconds

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        // 生成 watermark
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

效果解析:

图中是一个 10s 大小的窗口,10000 ~ 20000 为一个窗口。当 eventTime 为 23000 的数据到来,生成的 watermark 的时间戳为 20000,>= window_end_time,会触发窗口计算。

「实战二」

代码语言:javascript
复制
public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxTimeLag = 3000; // 3 seconds

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current time minus the maximum time lag
        return new Watermark(System.currentTimeMillis() - maxTimeLag);
    }
}

效果解析:

只是简单的用当前系统时间减去最大延迟时间生成 Watermark ,当 Watermark 为 20000 时,>= 窗口的结束时间,会触发 10000 ~ 20000 窗口计算。再当 eventTime 为 19500 的数据到来,它本应该是属于窗口 10000 ~ 20000 窗口的,但这个窗口已经触发计算了,所以此数据会被丢弃。

「如何设置最大乱序时间」

虽说水位线表明着早于它的事件不应该再出现,,接收到水位线以前的的消息是不可避免的,这就是所谓的迟到事件。实际上迟到事件是乱序事件的特例,和一般乱序事件不同的是它们的乱序程度超出了水位线的预计,导致窗口在它们到达之前已经关闭。

迟到事件出现时窗口已经关闭并产出了计算结果,因此处理的方法有 3 种:

重新激活已经关闭的窗口并重新计算以修正结果。将迟到事件收集起来另外处理。将迟到事件视为错误消息并丢弃。Flink 默认的处理方式是第 3 种直接丢弃,其他两种方式分别使用 Side Output 和 Allowed Lateness。

Side Output 机制可以将迟到事件单独放入一个数据流分支,这会作为 window 计算结果的副产品,以便用户获取并对其进行特殊处理。

Allowed Lateness 机制允许用户设置一个允许的最大迟到时长。Flink 会在窗口关闭后一直保存窗口的状态直至超过允许迟到时长,这期间的迟到事件不会被丢弃,而是默认会触发窗口重新计算。因为保存窗口状态需要额外内存,并且如果窗口计算使用了 ProcessWindowFunction API 还可能使得每个迟到事件触发一次窗口的全量计算,代价比较大,所以允许迟到时长不宜设得太长,迟到事件也不宜过多,否则应该考虑降低水位线提高的速度或者调整算法。

这里总结机制为:

  • 窗口 window 的作用是为了周期性的获取数据。
  • watermark 的作用是防止数据出现乱序(经常),事件时间内获取不到指定的全部数据,而做的一种保险方法。
  • allowLateNess 是将窗口关闭时间再延迟一段时间。
  • sideOutPut 是最后兜底操作,所有过期延迟数据,指定窗口已经彻底关闭了,就会把数据放到侧输出流。

「实战」

代码语言:javascript
复制
public class TumblingEventWindowExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
//        env.getConfig().setAutoWatermarkInterval(100);
        DataStream<String> socketStream = env.socketTextStream("localhost", 9999);
        DataStream<Tuple2<String, Long>> resultStream = socketStream
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(3)) {
                    @Override
                    public long extractTimestamp(String element) {
                        long eventTime = Long.parseLong(element.split(" ")[0]);
                        System.out.println(eventTime);
                        return eventTime;
                    }
                })
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        return Tuple2.of(value.split(" ")[1], 1L);
                    }
                })
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .allowedLateness(Time.seconds(2)) // 允许延迟处理2秒
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                    }
                });
        resultStream.print();
        env.execute();
    }
}

效果解析:

当 watermark 为 21000 时,触发了[10000, 20000)窗口计算,由于设置了allowedLateness(Time.seconds(2))即允许两秒延迟处理,watermark < window_end_time + lateTime公式得到满足,因此随后 10000 和 12000 进入窗口时,依然能触发窗口计算;随后 watermark 增加到 22000,watermark < window_end_time + lateTime 不再满足,因此 11000 再次进入窗口时,窗口不再进行计算

「延迟数据重定向」

❝流的返回值必须是 SingleOutputStreamOperator,其是 DataStream 的子类。通过 getSideOutput 方法获取延迟数据。可以将延迟数据重定向到其他流或者进行输出。 ❞

代码语言:javascript
复制
public class TumblingEventWindowExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        DataStream<String> socketStream = env.socketTextStream("localhost", 9999);
        //保存被丢弃的数据
        OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late-data"){};
        //注意,由于getSideOutput方法是SingleOutputStreamOperator子类中的特有方法,所以这里的类型,不能使用它的父类dataStream。
        SingleOutputStreamOperator<Tuple2<String, Long>> resultStream = socketStream
                // Time.seconds(3)有序的情况修改为0
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(3)) {
                    @Override
                    public long extractTimestamp(String element) {
                        long eventTime = Long.parseLong(element.split(" ")[0]);
                        System.out.println(eventTime);
                        return eventTime;
                    }
                })
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        return Tuple2.of(value.split(" ")[1], 1L);
                    }
                })
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .sideOutputLateData(outputTag) // 收集延迟大于2s的数据
                .allowedLateness(Time.seconds(2)) //允许2s延迟
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                    }
                });
        resultStream.print();
        //把迟到的数据暂时打印到控制台,实际中可以保存到其他存储介质中
        DataStream<Tuple2<String, Long>> sideOutput = resultStream.getSideOutput(outputTag);
        sideOutput.print();
        env.execute();
    }
}

Flink WaterMark 常见面试问题

  • Flink 流处理应用中,常见的处理需求/应对方案是什么?
  • Watermark 本质是什么?
  • Watermark 是如何解决问题?
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-10-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 857Hub 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概念
  • Flink 窗口
  • Watermark
  • Flink WaterMark 常见面试问题
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档