专栏首页SmartSiFlink SQL 如何定义时间属性

Flink SQL 如何定义时间属性

Flink 版本:1.11

本文将解释如何在 Flink 的 Table API 和 SQL 中为基于时间的操作定义时间属性。

1. 时间属性介绍

基于时间的操作,例如,Table API 和 SQL 查询中的窗口,需要知道时间相关的信息。因此,表需要提供逻辑时间属性以指明时间以及提供访问相应的时间戳。时间属性可以作为表 schema 的一部分,可以在用 CREATE TABLE DDL 语句创建表的时候指定、也可以在 DataStream 中指定、也可以在定义 TableSource 时指定。一旦时间属性定义好,就可以像普通列一样使用,也可以在时间相关的操作中使用。

只要时间属性没有被修改,只是从查询的一部分转发到另一部分,那么仍然是一个有效的时间属性。时间属性的行为类似于常规时间戳,并可用于计算。当在计算中使用时,时间属性被物化为一个标准时间戳。但是,不能使用普通时间戳来代替时间属性,也不能将其转换为时间属性。

2. 如何定义时间属性

Flink 可以根据如下两种时间概念来处理数据:

  • 处理时间是指机器执行相应操作的系统时间(也称为纪元时间,例如 Java 的 System.currentTimeMillis())。
  • 事件时间是指根据每一行中的时间戳来处理数据流。

因此,时间属性可以是基于处理时间的,也可以基于事件时间。此外,时间属性可以作为表 schema 的一部分,可以在用 CREATE TABLE DDL 语句创建表的时候指定、也可以在 DataStream 中指定、也可以在定义 TableSource 时指定。

2.1 处理时间

处理时间是基于机器的本地时间来处理数据,是最简单的一种时间概念,但是它不能提供确定性的结果。不同于事件时间,既不需要从数据里获取时间戳,也不需要生成 watermark。

2.1.1 在 DDL 中定义

处理时间属性可以在用 CREATE TABLE DDL 语句创建表时用计算列的方式定义。可以使用 PROCTIME() 函数定义处理时间,函数的返回类型是 TIMESTAMP_LTZ 类型。

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time AS PROCTIME() -- 声明一个额外列作为处理时间属性
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

计算列是一个虚拟列,使用 column_name AS computed_column_expression 语法生成,例如,cost AS price * quanitity,其中 price 和 quanitity 是表中的两个实际物理列。

2.1.2 在 DataStream 到 Table 转换时定义

在 DataStream 转换 Table 时,处理时间属性是在 schema 定义时使用 .proctime 属性定义。时间属性只能通过一个额外的逻辑字段来扩展物理 schema。因此,只能在 schema 定义的末尾进行定义。

DataStream<Tuple2<String, String>> stream = ...;

Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").proctime());

WindowedTable windowedTable = table.window(
  Tumble.over(lit(10).minutes())
      .on($("user_action_time"))
      .as("userActionWindow")
);

2.1.3 在 TableSource 中定义

处理时间属性可以在实现了 DefinedProctimeAttribute 的 TableSource 中定义。逻辑时间属性会放在 TableSource 已有物理字段的最后。

// 定义一个由处理时间属性的 table source
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
    @Override
    public TypeInformation<Row> getReturnType() {
        String[] names = new String[] {"user_name" , "data"};
        TypeInformation[] types = new TypeInformation[] { Types.STRING(), Types.STRING()};
        return Types.ROW(names, types);
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
      // create stream
      DataStream<Row> stream = ;
      return stream;
    }

    @Override
    public String getProctimeAttribute() {
      // 这个名字的列会被追加到最后,作为第三列
      return "user_action_time";
    }
}

// register table source
tEnv.registerTableSource("user_actions", new UserActionSource());

WindowedTable windowedTable = tEnv
  .from("user_actions")
  .window(Tumble
      .over(lit(10).minutes())
      .on($("user_action_time"))
      .as("userActionWindow"));

2.2 事件时间

事件时间允许 Table 程序根据每条记录中的时间戳生成结果,即使出现乱序或延迟事件也能获得一致的结果。此外,事件时间可以为在批处理和流环境中的 Table 程序提供统一的语法。流环境中的时间属性可以是批处理环境中一行的常规列。

为了处理乱序事件并区分流中的 on-time 和 late 事件,Flink 需要知道每一行的时间戳,并且还需要知道到目前为止处理进展(通过 Watermark)。

2.2.1 在 DDL 中定义

事件时间属性可以用 CREATE TABLE DDL 语句创建表时用 WATERMARK 语句定义。WATERMARK 语句在现有事件时间字段上定义 WATERMARK 生成表达式,将事件时间字段标记为事件时间属性。Flink 支持在 TIMESTAMP 列和 TIMESTAMP_LTZ 列上定义事件时间属性。如果 Source 中的时间戳数据为年-月-日-时-分-秒这种格式,一般是没有时区信息的字符串值,例如,2020-04-15 20:13:40.564,建议将事件时间属性定义为 TIMESTAMP 列:

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- declare user_action_time as event time attribute and use 5 seconds delayed watermark strategy
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

如果 Source 数据中的时间戳数据是一个纪元 (epoch) 时间,一般是一个 Long 值,例如,1618989564564,建议将事件时间属性定义在 TIMESTAMP_LTZ 列上:

CREATE TABLE user_actions (
 user_name STRING,
 data STRING,
 ts BIGINT,
 time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
 -- declare time_ltz as event time attribute and use 5 seconds delayed watermark strategy
 WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
 ...
);

SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);

2.2.2 在 DataStream 到 Table 转换时定义

在 DataStream 转换 Table 时,事件时间属性是在 schema 定义时使用 .rowtime 属性定义。在转换之前,时间戳和 watermark 在 DataStream 必须先设置好。在转换过程中,由于 DataStream 没有时区概念,因此 Flink 总是将 rowtime 属性解析成 TIMESTAMP WITHOUT TIME ZONE 类型,并且将所有事件时间的值都视为 UTC 时区的值。

在从 DataStream 到 Table 转换时定义事件时间属性有两种方式。取决于指定的 .rowtime 字段名称是否已经存在于 DataStream 的 schema 中,事件时间字段可以是:

  • 在 schema 结尾追加一个新的字段
  • 替换一个已经存在的字段。

不管在哪种情况下,事件时间戳字段都会保存 DataStream 事件的时间戳。

// (1) 追加一个新的字段

// 提取时间戳并分配watermarks
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// 声明一个额外逻辑字段作为事件时间属性
// 在 schema 的末尾使用 user_action_time.rowtime 定义事件时间属性
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());

// (2) 替换一个已经存在的字段

// 从第一个字段提取时间戳并分配watermarks
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// 第一个字段已经用来提取时间戳,因此不在必须,可以直接使用事件时间属性替换这个字段
// replace first field with a logical event time attribute
Table table = tEnv.fromDataStream(stream, $("user_action_time").rowtime(), $("user_name"), $("data"));

// Usage:

WindowedTable windowedTable = table.window(Tumble
       .over(lit(10).minutes())
       .on($("user_action_time"))
       .as("userActionWindow"));

2.2.3 在 TableSource 中定义

事件时间属性可以在实现了 DefinedRowTimeAttributes 的 TableSource 中定义。getRowtimeAttributeDescriptors() 方法返回一个 RowtimeAttributeDescriptor 列表,包含了事件时间属性名字、用来计算属性值的时间戳提取器以及 watermark 生成策略等信息。

需要确保 getDataStream() 方法返回的 DataStream 与定义的时间属性对齐。只有在定义了 StreamRecordTimestamp 时间戳分配器的时候,才认为 DataStream 有时间戳(由 TimestampAssigner 分配的时间戳)。只有定义了 PreserveWatermarks watermark 生成策略,DataStream 的 watermark 才会被保留。否则,只有时间字段的值是生效的。

// 定义一个有事件时间属性的 table source
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {
  @Override
  public TypeInformation<Row> getReturnType() {
    String[] names = new String[] {"user_name", "data", "user_action_time"};
    TypeInformation[] types =
        new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
    return Types.ROW(names, types);
  }

  @Override
  public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    // 构造 DataStream
    // ...
    // 基于 "user_action_time" 定义 watermark
    DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
    return stream;
  }

  @Override
  public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
    // 标记 "user_action_time" 字段是事件时间字段
    // 给 "user_action_time" 构造一个时间属性描述符
    RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
        "user_action_time",
        new ExistingField("user_action_time"),
        new AscendingTimestamps()
    );
    List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);
    return listRowtimeAttrDescr;
  }
}

// register the table source
tEnv.registerTableSource("user_actions", new UserActionSource());

WindowedTable windowedTable = tEnv
.from("user_actions")
.window(Tumble.over(lit(10).minutes()).on($("user_action_time")).as("userActionWindow"));

原文:Time Attributes

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Flink 1.12 Release 文档解读

    本文的 Release 文档描述了在 Flink 1.11 和 Flink 1.12 之间更改的重要方面,例如配置,行为或依赖项。如果您打算将 Flink 版本...

    zhisheng
  • Flink 1.11:更好用的流批一体 SQL 引擎

    许多的数据科学家,分析师和 BI 用户依赖交互式 SQL 查询分析数据。Flink SQL 是 Flink 的核心模块之一。作为一个分布式的 SQL 查询引擎。...

    数据社
  • Flink SQL 如何实现数据流的 Join?

    无论在 OLAP 还是 OLTP 领域,Join 都是业务常会涉及到且优化规则比较复杂的 SQL 语句。对于离线计算而言,经过数据库领域多年的积累,Join 语...

    zhisheng
  • Flink重点难点:Flink Table&SQL必知必会(一)

    Flink本身是批流统一的处理框架,所以Table API和SQL,就是批流统一的上层处理API。目前功能尚未完善,处于活跃的开发阶段。

    王知无-import_bigdata
  • Flink SQL 客户端如何使用

    Flink 的 Table & SQL API 可以处理 SQL 语言编写的查询语句,但是这些查询需要嵌入用 Java 或 Scala 编写的 Table 程序...

    smartsi
  • Flink Table&SQL必知必会(干货建议收藏)

    Flink本身是批流统一的处理框架,所以Table API和SQL,就是批流统一的上层处理API。目前功能尚未完善,处于活跃的开发阶段。

    大数据老哥
  • Flink SQL DDL 和 窗口函数实战

    2019 年 8 月 22 日,Flink 发布了 1.9 版本,社区版本的 Flink 新增 了一个 SQL DDL 的新特性,但是暂时还不支持流式的一些概念...

    kk大数据
  • Flink流之动态表详解

    问题导读 1.动态表有什么特点? 2.流处理与批处理转换为表后有什么相同之处? 3.动态表和连续查询是什么关系? 4.连续查询本文列举了什么例子? 5.Flin...

    用户1410343
  • Flink从1.7到1.12版本升级汇总

    最进再看官方flink提供的视频教程,发现入门版本因为时间关系都是基于1.7.x讲解的. 在实际操作中跟1.12.x版本还是有差距的, 所以整理一下从1.7 版...

    王知无-import_bigdata
  • Flink on Hive构建流批一体数仓

    Flink使用HiveCatalog可以通过批或者流的方式来处理Hive中的表。这就意味着Flink既可以作为Hive的一个批处理引擎,也可以通过...

    大数据老哥
  • 腾讯基于 Flink SQL 的功能扩展与深度优化实践

    摘要:本文由腾讯高级工程师杜立分享,主要介绍腾讯实时计算平台针对 Flink SQL 所做的优化,内容包括:

    Spark学习技巧
  • Flink集成Iceberg小小实战

    Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds ...

    大数据真好玩
  • 前沿 | 深入解读 Flink SQL 1.13

    摘要:本文由社区志愿者陈政羽整理,Apache Flink 社区在 5 月份发布了 1.13 版本,带来了很多新的变化。文章整理自徐榜江(雪尽) 5 月 22 ...

    公众号:大数据羊说
  • Apache Flink 1.12.0 正式发布,流批一体真正统一运行!

    Apache Flink 社区很荣幸地宣布 Flink 1.12.0 版本正式发布!近 300 位贡献者参与了 Flink 1.12.0 的开发,提交了超过 1...

    zhisheng
  • CODING 敏捷开发:如何自定义属性

    CODING 承载了最先进的敏捷研发理论,能够帮助您和您的团队快速入门敏捷研发,并通过标准化的流程和完整的信息统计成为企业实践敏捷研发的好工具。在上一篇视频指南...

    CODING
  • Flink 最锋利的武器:Flink SQL 入门和实战

    Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。

    王知无-import_bigdata
  • Flink最锋利的武器:Flink SQL入门和实战 | 附完整实现代码

    Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。

    AI科技大本营
  • Apache-Flink-持续查询(ContinuousQueries)

    摘要:实际问题 我们知道在流计算场景中,数据是源源不断的流入的,数据流永远不会结束,那么计算就永远不会结束,如果计算永远不会结束的话,那么计算结果何时输出呢?本...

    王知无-import_bigdata
  • 大数据云原生系列| 微信 Flink on Kubernetes 实战总结

    涂小刚,微信高级开发工程师,负责微信大数据平台开发及建设。 王玉君,腾讯云后台高级开发工程师,负责腾讯云原生系统开发及建设。 前言 架构转型,拥抱云原生服务...

    腾讯云原生

扫码关注云+社区

领取腾讯云代金券