前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink水印不能触发窗口计算问题详解

Flink水印不能触发窗口计算问题详解

作者头像
大数据真好玩
发布2022-04-27 09:20:30
1.5K0
发布2022-04-27 09:20:30
举报
文章被收录于专栏:暴走大数据暴走大数据

文章目录

前言 1.watermark特点 2.窗口 触发的条件 窗口的划分 窗口及水印触发的解释 3.代码 4.测试数据源 5.遇到的问题 6.问题排查 7.问题解决

前言

先说下水印的基本概念,对后面理解有帮助

1.watermark特点

  1. watermark并不是event的一个属性, 而是一条特殊的数据记录(只用来触发窗口结束,不参与数据计算)
  2. watermark 必须单调递增,以确保任务的事件时间时钟在向前推进,而不是在后退(就算延迟数据到来了,比目前的水印小,他也不会倒退了,而是直接不赋值,return掉)

在org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.emitWatermark方法中

  1. watermark 与数据的时间戳相关

2.窗口

触发的条件

(1)在[window_start_time,window_end_time)窗口中有数据存在

(2)watermark时间 >= window_end_time;

窗口的划分

代码语言:javascript
复制
窗口的划分是基于来的eventtime所在的那一分钟,吧0-60s划分成窗口大小的等分
如果Window大小是3秒,那么1分钟内会把Window划分为如下的形式:
[00:00:00,00:00:03)
[00:00:03,00:00:06)
[00:00:03,00:00:09)
...
[00:00:57,00:00:60)

窗口及水印触发的解释

假如我们设置10s的时间窗口(window),那么010s,1020s都是一个窗口,以0~10s为例,0位start-time,10为end-time。假如有4个数据的event-time分别是8(A),12.5(B),9©,13.5(D),我们设置Watermarks为当前所有到达数据event-time的最大值减去延迟值3.5秒。当A到达的时候,Watermarks为max{8}-3.5=8-3.5 = 4.5 < 10,不会触发计算;当B到达的时候,Watermarks为max(12.5,8)-3.5=12.5-3.5 = 9 < 10,不会触发计算;当C到达的时候,Watermarks为max(12.5,8,9)-3.5=12.5-3.5 = 9 < 10,不会触发计算;当D到达的时候,Watermarks为max(13.5,12.5,8,9)-3.5=13.5-3.5 = 10 = 10,触发计算;触发计算的时候,会将AC(因为他们都小于10)都计算进去。通过上面这种方式,我们就将迟到的C计算进去了。这里的延迟3.5s是我们假设一个数据到达的时候,比他早3.5s的数据肯定也都到达了,这个是需要根据经验推算的,加入D到达以后有到达了一个E,event-time=6,但是由于0~10的时间窗口已经开始计算了,所以E就丢了。

3.代码

代码语言:javascript
复制
   public static void main(String[] args) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        //1.准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置使用事件时间
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //周期性的生成 watermark:系统会周期性的将 watermark 插入到流中 默认周期是200毫秒
        env.getConfig().setAutoWatermarkInterval(1000L);
        //设置并行度
        env.setParallelism(1);
        //2.获取数据
        DataStreamSource<String> streamSource = env.socketTextStream("linux121", 7777);
        //3.逻辑代码
        SingleOutputStreamOperator<Tuple2<String, Long>> maped = streamSource.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                String[] split = value.split(",");
                return new Tuple2<String, Long>(split[0], Long.valueOf(split[1]));
            }
        });

        //定义水印
        SingleOutputStreamOperator<Tuple2<String, Long>> watermarks = maped.assignTimestampsAndWatermarks(new WatermarkStrategy<Tuple2<String, Long>>() {
            @Override
            public WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new WatermarkGenerator<Tuple2<String, Long>>() {
                    private long maxTimeStamp = Long.MIN_VALUE;

                    @Override
                    public void onEvent(Tuple2<String, Long> event, long eventTimestamp, WatermarkOutput output) {
                        maxTimeStamp = Math.max(maxTimeStamp, event.f1);
                        System.out.println("maxTimeStamp:" + maxTimeStamp + "...format:" + sdf.format(maxTimeStamp));
                    }

                    @Override
                    public void onPeriodicEmit(WatermarkOutput output) {
                      //  System.out.println(".....onPeriodicEmit....");
                        long maxOutOfOrderness = 3000l;
                        System.out.println(maxTimeStamp - maxOutOfOrderness);
                        output.emitWatermark(new Watermark(maxTimeStamp - maxOutOfOrderness));
                    }
                };
            }
        }.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                return element.f1;
            }
        }));
        //分组
        KeyedStream<Tuple2<String, Long>, String> keyedStream = watermarks.keyBy(value -> value.f0);
        //定义窗口
        WindowedStream<Tuple2<String, Long>, String, TimeWindow> timeWindow = keyedStream.timeWindow(Time.seconds(4));
        // apply是窗口的应用函数,即apply里的函数将应用在此窗口的数据上。进去的是基于key并且窗口的(就是相同的key,在规定的窗口时间进去的数据)
        SingleOutputStreamOperator<Object> result = timeWindow.apply(new WindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>() {
            /**
             *  s     输入的原始数据
             *  window 窗口、环境相关的数据
             *  input 计算后的拥有相同key的数据集合(keyBy后的keyedStream数据)
             *  out 发送器
             */
            @Override
            public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<Object> out) throws Exception {
                System.out.println("..." + sdf.format(window.getStart()));
                String key = s;
                Iterator<Tuple2<String, Long>> iterator = input.iterator();
                ArrayList<Long> list = new ArrayList<>();
                while (iterator.hasNext()) {
                    Tuple2<String, Long> next = iterator.next();
                    list.add(next.f1);
                }
                Collections.sort(list);
                String result = "key:" + key + "..." + "list.size:" + list.size() + "...list.first:" + sdf.format(list.get(0)) + "...list.last:" + sdf.format(list.get(list.size() - 1)) + "...window.start:" + sdf.format(window.getStart()) + "..window.end:" + sdf.format(window.getEnd());
                out.collect(result);
            }
        });
        result.print();
        env.execute();


    }

4.测试数据源

代码语言:javascript
复制
数据源:
01,1586489566000
01,1586489567000
01,1586489568000
01,1586489569000
01,1586489570000
01,1586489571000
01,1586489572000
01,1586489573000

对应的时间:
2020-04-10 11:32:46
2020-04-10 11:32:47
2020-04-10 11:32:48
2020-04-10 11:32:49
2020-04-10 11:32:50

打开对应服务器  nc -lp 7777

5.遇到的问题

按道理 窗口为4s,允许最大延迟是3s,那么当第一条数据01,1586489566000–2020-04-10 11:32:46进来后的窗口是 44s-48s。然后当01,1586489571000–2020-04-10 11:32:51 进来后,水印时间11:32:48>=48s的结束窗口,那么窗口应该结束,进行计算的。但是有时候测试并不会触发。

6.问题排查

后面断点调试发现代码有问题,在我们初始化水印时间的时候使用了private long maxTimeStamp = Long.MIN_VALUE; 当没数据来的时候,代码运行到这里给他赋值,然后他在减去允许迟到的时间,这时候该值就变成了 9223372036854772808 导致在emitWatermark 吧该值赋值给了currentWatermark,所以后面再来数据的的时候,数据的eventtime的水印时间都要比这个小,所以都不会触发窗口了。

所以就出现了当代码初始化还没运行到下面的的时候就来了数据的话,就能正常结束,当数据比代码运行到此处来的慢的时候就一直不会触发窗口结束。

7.问题解决

代码语言:javascript
复制
将 
private long maxTimeStamp = Long.MIN_VALUE;
替换成
private long maxTimeStamp = 0l;
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-04-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.watermark特点
  • 2.窗口
  • 3.代码
  • 4.测试数据源
  • 5.遇到的问题
  • 6.问题排查
  • 7.问题解决
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档