前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink中Watermark定时生成源码分析

Flink中Watermark定时生成源码分析

作者头像
Flink实战剖析
发布2022-04-18 13:19:14
6100
发布2022-04-18 13:19:14
举报
文章被收录于专栏:Flink实战剖析Flink实战剖析

watermark的生成策略有两种:一种是周期性生成,另外一种是根据特定标记生成。在实际使用中大多数情况下会选择周期性生成方式也就是AssignerWithPeriodicWatermarks方式,使用方式如下:

代码语言:javascript
复制
//指定为evenTime时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//生成watermark的周期
env.getConfig.setAutoWatermarkInterval(watermarkInterval)
//指定方式
dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Element](Time.seconds(allowDealy)) {
   override def extractTimestamp(element: Element): Long = element.dT
  })

BoundedOutOfOrdernessTimestampExtractor 是Flink内置提供的允许乱序最大延时的watermark生成方式,只需要重写其extractTimestamp方法即可。

assignTimestampsAndWatermarks 可以理解为是一个算子转换操作,等同于map/window一样理解,可以为其设置并行度、名称,也是一个transformation/operator,

代码语言:javascript
复制
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
      AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {

    final int inputParallelism = getTransformation().getParallelism();
    final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);

    TimestampsAndPeriodicWatermarksOperator<T> operator =
        new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);

    return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
        .setParallelism(inputParallelism);
  }

在生成的jobGraph中,也是作为其中的一部分:

默认的名称就是 Timestamps/Watermarks。

接下来深入分析其使用的StreamOperator类型TimestampsAndPeriodicWatermarksOperator,其继承了AbstractUdfStreamOperator,实现了OneInputStreamOperator接口与ProcessingTimeCallback接口,具体包含的方法: open方法:

代码语言:javascript
复制
public void open() throws Exception {
    super.open();
    //初始化默认当前watermark
    currentWatermark = Long.MIN_VALUE;
    //生成watermark周期时间配置
    watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
    //注册定时其配置
    if (watermarkInterval > 0) {
      long now = getProcessingTimeService().getCurrentProcessingTime();
      getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    }
  }

最重要的就是getProcessingTimeService().registerTimer

注册一个watermarkInterval后触发的定时器,传入回调参数是this,也就是会调用当前对象的onProcessingTime方法(关于这部分知识可以查看Flink的定时系列)。

processElement方法:

代码语言:javascript
复制
public void processElement(StreamRecord<T> element) throws Exception {
    final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
        element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

    output.collect(element.replace(element.getValue(), newTimestamp));
  }

提取当前的事件时间,在BoundedOutOfOrdernessTimestampExtractor中会保存当前最大的事件时间。

onProcessingTime方法:

代码语言:javascript
复制
public void onProcessingTime(long timestamp) throws Exception {    // register next timer
    Watermark newWatermark = userFunction.getCurrentWatermark();
    //当新的watermark大于当前的watermark
    if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
      currentWatermark = newWatermark.getTimestamp();
      //将符合要求的watermark发送出去
      output.emitWatermark(newWatermark);
    }
    //注册下一次触发时间
    long now = getProcessingTimeService().getCurrentProcessingTime();
    getProcessingTimeService().registerTimer(now + watermarkInterval, this);
  }

该方法表示的就是定时回调的方法,将符合要求的watermark发送出去并且注册下一个定时器。另外该方法与processElement方法是两个互斥的方法,内部使用了同一把锁做同步控制。

processWatermark方法:

代码语言:javascript
复制
public void processWatermark(Watermark mark) throws Exception {
    if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
      currentWatermark = Long.MAX_VALUE;
      output.emitWatermark(mark);
    }
  }

用来处理上游发送过来的watermark,可以认为不做任何处理,下游的watermark只与其上游最近的生成方式相关。

—END—

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-02-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Flink实战剖析 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档