前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink SQL 如何定义时间属性

Flink SQL 如何定义时间属性

作者头像
smartsi
发布2021-10-08 16:41:08
1.7K0
发布2021-10-08 16:41:08
举报
文章被收录于专栏:SmartSiSmartSi

Flink 版本:1.11

本文将解释如何在 Flink 的 Table API 和 SQL 中为基于时间的操作定义时间属性。

1. 时间属性介绍

基于时间的操作,例如,Table API 和 SQL 查询中的窗口,需要知道时间相关的信息。因此,表需要提供逻辑时间属性以指明时间以及提供访问相应的时间戳。时间属性可以作为表 schema 的一部分,可以在用 CREATE TABLE DDL 语句创建表的时候指定、也可以在 DataStream 中指定、也可以在定义 TableSource 时指定。一旦时间属性定义好,就可以像普通列一样使用,也可以在时间相关的操作中使用。

只要时间属性没有被修改,只是从查询的一部分转发到另一部分,那么仍然是一个有效的时间属性。时间属性的行为类似于常规时间戳,并可用于计算。当在计算中使用时,时间属性被物化为一个标准时间戳。但是,不能使用普通时间戳来代替时间属性,也不能将其转换为时间属性。

2. 如何定义时间属性

Flink 可以根据如下两种时间概念来处理数据:

  • 处理时间是指机器执行相应操作的系统时间(也称为纪元时间,例如 Java 的 System.currentTimeMillis())。
  • 事件时间是指根据每一行中的时间戳来处理数据流。

因此,时间属性可以是基于处理时间的,也可以基于事件时间。此外,时间属性可以作为表 schema 的一部分,可以在用 CREATE TABLE DDL 语句创建表的时候指定、也可以在 DataStream 中指定、也可以在定义 TableSource 时指定。

2.1 处理时间

处理时间是基于机器的本地时间来处理数据,是最简单的一种时间概念,但是它不能提供确定性的结果。不同于事件时间,既不需要从数据里获取时间戳,也不需要生成 watermark。

2.1.1 在 DDL 中定义

处理时间属性可以在用 CREATE TABLE DDL 语句创建表时用计算列的方式定义。可以使用 PROCTIME() 函数定义处理时间,函数的返回类型是 TIMESTAMP_LTZ 类型。

代码语言:javascript
复制
CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time AS PROCTIME() -- 声明一个额外列作为处理时间属性
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

计算列是一个虚拟列,使用 column_name AS computed_column_expression 语法生成,例如,cost AS price * quanitity,其中 price 和 quanitity 是表中的两个实际物理列。

2.1.2 在 DataStream 到 Table 转换时定义

在 DataStream 转换 Table 时,处理时间属性是在 schema 定义时使用 .proctime 属性定义。时间属性只能通过一个额外的逻辑字段来扩展物理 schema。因此,只能在 schema 定义的末尾进行定义。

代码语言:javascript
复制
DataStream<Tuple2<String, String>> stream = ...;

Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").proctime());

WindowedTable windowedTable = table.window(
  Tumble.over(lit(10).minutes())
      .on($("user_action_time"))
      .as("userActionWindow")
);
2.1.3 在 TableSource 中定义

处理时间属性可以在实现了 DefinedProctimeAttribute 的 TableSource 中定义。逻辑时间属性会放在 TableSource 已有物理字段的最后。

代码语言:javascript
复制
// 定义一个由处理时间属性的 table source
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
    @Override
    public TypeInformation<Row> getReturnType() {
        String[] names = new String[] {"user_name" , "data"};
        TypeInformation[] types = new TypeInformation[] { Types.STRING(), Types.STRING()};
        return Types.ROW(names, types);
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
      // create stream
      DataStream<Row> stream = ;
      return stream;
    }

    @Override
    public String getProctimeAttribute() {
      // 这个名字的列会被追加到最后,作为第三列
      return "user_action_time";
    }
}

// register table source
tEnv.registerTableSource("user_actions", new UserActionSource());

WindowedTable windowedTable = tEnv
  .from("user_actions")
  .window(Tumble
      .over(lit(10).minutes())
      .on($("user_action_time"))
      .as("userActionWindow"));

2.2 事件时间

事件时间允许 Table 程序根据每条记录中的时间戳生成结果,即使出现乱序或延迟事件也能获得一致的结果。此外,事件时间可以为在批处理和流环境中的 Table 程序提供统一的语法。流环境中的时间属性可以是批处理环境中一行的常规列。

为了处理乱序事件并区分流中的 on-time 和 late 事件,Flink 需要知道每一行的时间戳,并且还需要知道到目前为止处理进展(通过 Watermark)。

2.2.1 在 DDL 中定义

事件时间属性可以用 CREATE TABLE DDL 语句创建表时用 WATERMARK 语句定义。WATERMARK 语句在现有事件时间字段上定义 WATERMARK 生成表达式,将事件时间字段标记为事件时间属性。Flink 支持在 TIMESTAMP 列和 TIMESTAMP_LTZ 列上定义事件时间属性。如果 Source 中的时间戳数据为年-月-日-时-分-秒这种格式,一般是没有时区信息的字符串值,例如,2020-04-15 20:13:40.564,建议将事件时间属性定义为 TIMESTAMP 列:

代码语言:javascript
复制
CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- declare user_action_time as event time attribute and use 5 seconds delayed watermark strategy
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

如果 Source 数据中的时间戳数据是一个纪元 (epoch) 时间,一般是一个 Long 值,例如,1618989564564,建议将事件时间属性定义在 TIMESTAMP_LTZ 列上:

代码语言:javascript
复制
CREATE TABLE user_actions (
 user_name STRING,
 data STRING,
 ts BIGINT,
 time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
 -- declare time_ltz as event time attribute and use 5 seconds delayed watermark strategy
 WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
 ...
);

SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
2.2.2 在 DataStream 到 Table 转换时定义

在 DataStream 转换 Table 时,事件时间属性是在 schema 定义时使用 .rowtime 属性定义。在转换之前,时间戳和 watermark 在 DataStream 必须先设置好。在转换过程中,由于 DataStream 没有时区概念,因此 Flink 总是将 rowtime 属性解析成 TIMESTAMP WITHOUT TIME ZONE 类型,并且将所有事件时间的值都视为 UTC 时区的值。

在从 DataStream 到 Table 转换时定义事件时间属性有两种方式。取决于指定的 .rowtime 字段名称是否已经存在于 DataStream 的 schema 中,事件时间字段可以是:

  • 在 schema 结尾追加一个新的字段
  • 替换一个已经存在的字段。

不管在哪种情况下,事件时间戳字段都会保存 DataStream 事件的时间戳。

代码语言:javascript
复制
// (1) 追加一个新的字段

// 提取时间戳并分配watermarks
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// 声明一个额外逻辑字段作为事件时间属性
// 在 schema 的末尾使用 user_action_time.rowtime 定义事件时间属性
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());

// (2) 替换一个已经存在的字段

// 从第一个字段提取时间戳并分配watermarks
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// 第一个字段已经用来提取时间戳,因此不在必须,可以直接使用事件时间属性替换这个字段
// replace first field with a logical event time attribute
Table table = tEnv.fromDataStream(stream, $("user_action_time").rowtime(), $("user_name"), $("data"));

// Usage:

WindowedTable windowedTable = table.window(Tumble
       .over(lit(10).minutes())
       .on($("user_action_time"))
       .as("userActionWindow"));
2.2.3 在 TableSource 中定义

事件时间属性可以在实现了 DefinedRowTimeAttributes 的 TableSource 中定义。getRowtimeAttributeDescriptors() 方法返回一个 RowtimeAttributeDescriptor 列表,包含了事件时间属性名字、用来计算属性值的时间戳提取器以及 watermark 生成策略等信息。

需要确保 getDataStream() 方法返回的 DataStream 与定义的时间属性对齐。只有在定义了 StreamRecordTimestamp 时间戳分配器的时候,才认为 DataStream 有时间戳(由 TimestampAssigner 分配的时间戳)。只有定义了 PreserveWatermarks watermark 生成策略,DataStream 的 watermark 才会被保留。否则,只有时间字段的值是生效的。

代码语言:javascript
复制
// 定义一个有事件时间属性的 table source
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {
  @Override
  public TypeInformation<Row> getReturnType() {
    String[] names = new String[] {"user_name", "data", "user_action_time"};
    TypeInformation[] types =
        new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
    return Types.ROW(names, types);
  }

  @Override
  public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    // 构造 DataStream
    // ...
    // 基于 "user_action_time" 定义 watermark
    DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
    return stream;
  }

  @Override
  public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
    // 标记 "user_action_time" 字段是事件时间字段
    // 给 "user_action_time" 构造一个时间属性描述符
    RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
        "user_action_time",
        new ExistingField("user_action_time"),
        new AscendingTimestamps()
    );
    List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);
    return listRowtimeAttrDescr;
  }
}

// register the table source
tEnv.registerTableSource("user_actions", new UserActionSource());

WindowedTable windowedTable = tEnv
.from("user_actions")
.window(Tumble.over(lit(10).minutes()).on($("user_action_time")).as("userActionWindow"));

原文:Time Attributes

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2021-10-022,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 时间属性介绍
  • 2. 如何定义时间属性
    • 2.1 处理时间
      • 2.1.1 在 DDL 中定义
      • 2.1.2 在 DataStream 到 Table 转换时定义
      • 2.1.3 在 TableSource 中定义
    • 2.2 事件时间
      • 2.2.1 在 DDL 中定义
      • 2.2.2 在 DataStream 到 Table 转换时定义
      • 2.2.3 在 TableSource 中定义
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档