A Look at Time Attributes in the flink Table API

Original article
Author: code4it
Published: 2019-02-01 14:31:31
Column: 码匠的流水账

This article takes a look at Time Attributes in the flink Table API, based on the flink-table 1.7.0 sources quoted further down.

Processing time

Defining via fromDataStream

DataStream<Tuple2<String, String>> stream = ...;

// declare an additional logical field as a processing time attribute
Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");

WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
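  • When a Table is created from a DataStream, the processing time attribute is declared directly in fromDataStream, by appending .proctime to an additional logical field in the field expression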
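
To make the Tumble.over("10.minutes") window in the snippet above more concrete, here is a minimal plain-Java sketch (no Flink dependency) of how a timestamp could be mapped to its tumbling window. The class and method names are hypothetical, and the sketch assumes windows aligned to the epoch with zero offset; it illustrates the idea rather than reproducing Flink's code.

```java
import java.time.Duration;

/** Plain-Java sketch of tumbling-window assignment; not part of Flink. */
final class TumblingWindowSketch {

    /** Inclusive start of the tumbling window containing the timestamp (assumption: zero offset). */
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        // Math.floorMod keeps the result correct for negative timestamps as well.
        return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
    }

    /** Exclusive end of the tumbling window containing the timestamp. */
    static long windowEnd(long timestampMillis, long windowSizeMillis) {
        return windowStart(timestampMillis, windowSizeMillis) + windowSizeMillis;
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(10).toMillis(); // corresponds to "10.minutes"
        // For a processing time attribute, the timestamp is the wall clock of the machine
        // at the moment the row is processed.
        long now = System.currentTimeMillis();
        System.out.println("[" + windowStart(now, size) + ", " + windowEnd(now, size) + ")");
    }
}
```

Because processing time comes from the local clock, two runs over the same data generally place rows into different windows, which is the main trade-off compared with event time.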

Defining via TableSource

// define a table source with a processing attribute
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {

    @Override
    public TypeInformation<Row> getReturnType() {
        String[] names = new String[] {"Username", "Data"};
        TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
        return Types.ROW(names, types);
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        // create stream
        DataStream<Row> stream = ...;
        return stream;
    }

    @Override
    public String getProctimeAttribute() {
        // field with this name will be appended as a third field
        return "UserActionTime";
    }
}

// register table source
tEnv.registerTableSource("UserActions", new UserActionSource());

WindowedTable windowedTable = tEnv
    .scan("UserActions")
    .window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
  • When a Table is created from a TableSource, the processing time attribute is defined by implementing the DefinedProctimeAttribute interface

Event time

Defining via fromDataStream

// Option 1:

// extract timestamp and assign watermarks based on knowledge of the stream
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// declare an additional logical field as an event time attribute
Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime");


// Option 2:

// extract timestamp from first field, and assign watermarks based on knowledge of the stream
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// the first field has been used for timestamp extraction, and is no longer necessary
// replace first field with a logical event time attribute
Table table = tEnv.fromDataStream(stream, "UserActionTime.rowtime, Username, Data");

// Usage:

WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
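  • When a Table is created from a DataStream, the event time attribute is likewise declared in fromDataStream, in one of two ways: by appending an additional logical field, or by replacing an existing field (one that has already been used for timestamp extraction)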
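
The difference between the two options comes down to where the .rowtime field sits relative to the physical fields of the stream. The following plain-Java sketch models only that positional rule; it is not Flink's parser, the class RowtimeFieldSketch and its members are hypothetical names, and it assumes at most one .rowtime field per expression.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of how a ".rowtime" field relates to the physical fields; not part of Flink. */
final class RowtimeFieldSketch {

    /** Outcome of matching a field expression against a stream with a given number of fields. */
    static final class Resolved {
        final List<String> fieldNames;       // logical field names of the resulting table
        final int rowtimeIndex;              // position of the rowtime attribute, -1 if absent
        final boolean replacesPhysicalField; // true when the attribute takes over an existing field

        Resolved(List<String> fieldNames, int rowtimeIndex, boolean replacesPhysicalField) {
            this.fieldNames = fieldNames;
            this.rowtimeIndex = rowtimeIndex;
            this.replacesPhysicalField = replacesPhysicalField;
        }
    }

    static Resolved resolve(int physicalArity, String fieldExpression) {
        final String suffix = ".rowtime";
        List<String> names = new ArrayList<>();
        int rowtimeIndex = -1;
        String[] parts = fieldExpression.split(",");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i].trim();
            if (part.endsWith(suffix)) {
                if (rowtimeIndex >= 0) {
                    throw new IllegalArgumentException("this sketch models a single rowtime attribute");
                }
                rowtimeIndex = i;
                part = part.substring(0, part.length() - suffix.length());
            }
            names.add(part);
        }
        // Inside the physical arity: the attribute replaces that field (Option 2).
        // Beyond the physical arity: the attribute is appended as a logical field (Option 1).
        boolean replaces = rowtimeIndex >= 0 && rowtimeIndex < physicalArity;
        return new Resolved(names, rowtimeIndex, replaces);
    }

    public static void main(String[] args) {
        Resolved option1 = resolve(2, "Username, Data, UserActionTime.rowtime");
        Resolved option2 = resolve(3, "UserActionTime.rowtime, Username, Data");
        System.out.println(option1.fieldNames + " replaces=" + option1.replacesPhysicalField);
        System.out.println(option2.fieldNames + " replaces=" + option2.replacesPhysicalField);
    }
}
```

In both cases the table ends up with three logical fields; only the origin of UserActionTime differs.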

Defining via TableSource

// define a table source with a rowtime attribute
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {

    @Override
    public TypeInformation<Row> getReturnType() {
        String[] names = new String[] {"Username", "Data", "UserActionTime"};
        TypeInformation[] types =
            new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
        return Types.ROW(names, types);
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        // create stream
        // ...
        // assign watermarks based on the "UserActionTime" attribute
        DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
        return stream;
    }

    @Override
    public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
        // Mark the "UserActionTime" attribute as event-time attribute.
        // We create one attribute descriptor of "UserActionTime".
        RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
            "UserActionTime",
            new ExistingField("UserActionTime"),
            new AscendingTimestamps());
        List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);
        return listRowtimeAttrDescr;
    }
}

// register the table source
tEnv.registerTableSource("UserActions", new UserActionSource());

WindowedTable windowedTable = tEnv
    .scan("UserActions")
    .window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
  • When a Table is created from a TableSource, the event time attribute is defined by implementing the DefinedRowtimeAttributes interface
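
The descriptor above pairs the ExistingField extractor with an AscendingTimestamps watermark strategy. As a rough illustration of what an ascending-timestamp strategy does, here is a plain-Java sketch with no Flink dependency. The class name is hypothetical, and the rule that the watermark trails the largest timestamp seen by one millisecond is an assumption about the strategy's behavior, not a copy of Flink's source.

```java
/** Plain-Java sketch of an ascending-timestamps watermark strategy; not part of Flink. */
final class AscendingWatermarkSketch {

    // Largest event timestamp observed so far; starts one above MIN_VALUE so that
    // subtracting 1 in currentWatermark() cannot overflow.
    private long maxTimestamp = Long.MIN_VALUE + 1;

    /** Record the event timestamp of an incoming row. */
    void observe(long timestampMillis) {
        if (timestampMillis > maxTimestamp) {
            maxTimestamp = timestampMillis;
        }
    }

    /** Assumption: the watermark is the largest timestamp seen so far minus one millisecond. */
    long currentWatermark() {
        return maxTimestamp - 1;
    }

    public static void main(String[] args) {
        AscendingWatermarkSketch strategy = new AscendingWatermarkSketch();
        strategy.observe(1000L);
        strategy.observe(1500L);
        strategy.observe(1200L); // out of order: does not move the watermark backwards
        System.out.println(strategy.currentWatermark()); // prints 1499
    }
}
```

Such a strategy only suits sources whose timestamps really are ascending; rows arriving behind the watermark would be treated as late by time-based operations.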

definedTimeAttributes

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/sources/definedTimeAttributes.scala

/**
  * Extends a [[TableSource]] to specify a processing time attribute.
  */
trait DefinedProctimeAttribute {

  /**
    * Returns the name of a processing time attribute or null if no processing time attribute is
    * present.
    *
    * The referenced attribute must be present in the [[TableSchema]] of the [[TableSource]] and of
    * type [[Types.SQL_TIMESTAMP]].
    */
  @Nullable
  def getProctimeAttribute: String
}

/**
  * Extends a [[TableSource]] to specify rowtime attributes via a
  * [[RowtimeAttributeDescriptor]].
  */
trait DefinedRowtimeAttributes {

  /**
    * Returns a list of [[RowtimeAttributeDescriptor]] for all rowtime attributes of the table.
    *
    * All referenced attributes must be present in the [[TableSchema]] of the [[TableSource]] and of
    * type [[Types.SQL_TIMESTAMP]].
    *
    * @return A list of [[RowtimeAttributeDescriptor]].
    */
  def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor]
}

/**
  * Describes a rowtime attribute of a [[TableSource]].
  *
  * @param attributeName The name of the rowtime attribute.
  * @param timestampExtractor The timestamp extractor to derive the values of the attribute.
  * @param watermarkStrategy The watermark strategy associated with the attribute.
  */
class RowtimeAttributeDescriptor(
  val attributeName: String,
  val timestampExtractor: TimestampExtractor,
  val watermarkStrategy: WatermarkStrategy) {

  /** Returns the name of the rowtime attribute. */
  def getAttributeName: String = attributeName

  /** Returns the [[TimestampExtractor]] for the attribute. */
  def getTimestampExtractor: TimestampExtractor = timestampExtractor

  /** Returns the [[WatermarkStrategy]] for the attribute. */
  def getWatermarkStrategy: WatermarkStrategy = watermarkStrategy

  override def equals(other: Any): Boolean = other match {
    case that: RowtimeAttributeDescriptor =>
        Objects.equals(attributeName, that.attributeName) &&
        Objects.equals(timestampExtractor, that.timestampExtractor) &&
        Objects.equals(watermarkStrategy, that.watermarkStrategy)
    case _ => false
  }

  override def hashCode(): Int = {
    Objects.hash(attributeName, timestampExtractor, watermarkStrategy)
  }
}
  • DefinedProctimeAttribute declares the getProctimeAttribute method, which returns a String naming the processing time field (or null if there is none); DefinedRowtimeAttributes declares the getRowtimeAttributeDescriptors method, which returns a List of RowtimeAttributeDescriptor; each RowtimeAttributeDescriptor has three properties: attributeName, timestampExtractor, and watermarkStrategy
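
The Scaladoc above requires every referenced time attribute to be present in the TableSchema of the TableSource. A small plain-Java sketch of that presence check follows. It is hypothetical and Flink-free: the schema is modeled as a plain list of field names, the helper name missingTimeAttributes is invented for illustration, and the SQL_TIMESTAMP type requirement is deliberately left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Plain-Java sketch of the "attribute must be present in the schema" rule; not part of Flink. */
final class TimeAttributeValidationSketch {

    /**
     * Returns the time attribute names that are not found in the schema.
     *
     * @param schemaFields      field names of the table schema (modeled as plain strings)
     * @param proctimeAttribute processing time attribute name, or null if none is defined
     * @param rowtimeAttributes names taken from the rowtime attribute descriptors
     */
    static List<String> missingTimeAttributes(
            List<String> schemaFields, String proctimeAttribute, List<String> rowtimeAttributes) {
        List<String> missing = new ArrayList<>();
        // A null proctime attribute means "no processing time attribute" and is valid.
        if (proctimeAttribute != null && !schemaFields.contains(proctimeAttribute)) {
            missing.add(proctimeAttribute);
        }
        for (String rowtime : rowtimeAttributes) {
            if (!schemaFields.contains(rowtime)) {
                missing.add(rowtime);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> schema = Arrays.asList("Username", "Data", "UserActionTime");
        System.out.println(missingTimeAttributes(
            schema, null, Collections.singletonList("UserActionTime")));
        System.out.println(missingTimeAttributes(
            Arrays.asList("Username", "Data"), "UserActionTime", Collections.<String>emptyList()));
    }
}
```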

Summary

  • Time Attributes can be specified when creating a Table from a DataStream or a TableSource; once specified, they can be used as regular fields or in time-based operations
  • For processing time: when creating a Table from a DataStream, declare it in fromDataStream; when creating a Table from a TableSource, implement the DefinedProctimeAttribute interface, whose getProctimeAttribute method returns a String naming the processing time field
  • For event time: when creating a Table from a DataStream, declare it in fromDataStream, either by appending an additional field or by replacing an existing one; when creating a Table from a TableSource, implement the DefinedRowtimeAttributes interface, whose getRowtimeAttributeDescriptors method returns a List of RowtimeAttributeDescriptor, each with three properties: attributeName, timestampExtractor, and watermarkStrategy

doc

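Original-work statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com for removal.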
