首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Flink窗口聚合方法在时间戳上的失败

Flink窗口聚合方法在时间戳上的失败
EN

Stack Overflow用户
提问于 2022-11-22 06:32:28
回答 2查看 33关注 0票数 0

我们数据库里有一张表A。我们使用flink SQL JdbcCatalog将该表加载到Flink中。

下面是我们如何加载数据

val catalog = new JdbcCatalog("my_catalog", "database_name", username, password, url)

streamTableEnvironment.registerCatalog("my_catalog", catalog) streamTableEnvironment.useCatalog("my_catalog")

val query = "select timestamp, count from A"

val sourceTable = streamTableEnvironment.sqlQuery(query) streamTableEnvironment.createTemporaryView("innerTable", sourceTable)

val aggregationQuery = select window_end, sum(count) from TABLE(TUMBLE(TABLE innerTable, DESCRIPTOR(timestamp), INTERVAL '10' minutes)) group by window_end

它抛出以下错误Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. The window function TUMBLE(TABLE table_name, DESCRIPTOR(timecol), datetime interval[, datetime interval]) requires the timecol is a time attribute type, but is TIMESTAMP(6).

简而言之,我们希望在已经存在的列上应用窗口聚合。我们该怎么做?这是批处理。

EN

回答 2

Stack Overflow用户

发布于 2022-11-22 11:00:07

在Flink中用作时间属性的时间戳列必须是时间戳(3)或TIMESTAMP_LTZ(3)。

票数 0
EN

Stack Overflow用户

发布于 2022-11-22 20:29:35

列应该是时间戳(3)或TIMESTAMP_LTZ(3),但该列也应该标记为ROWTIME。

在代码中键入这一行

代码语言:javascript
运行
复制
sourceTable.printSchema();

检查结果。该列应标记为ROWTIME,如下所示。

代码语言:javascript
运行
复制
(
  `deviceId` STRING,
  `dataStart` BIGINT,
  `recordCount` INT,
  `time_Insert` BIGINT,
  `time_Insert_ts` TIMESTAMP(3) *ROWTIME*
)

你可以在下面找到我的样本。

代码语言:javascript
运行
复制
        Table tableCpuDataCalculatedTemp = tableEnv.fromDataStream(streamCPUDataCalculated, Schema.newBuilder()
                        .column("deviceId", DataTypes.STRING())
                        .column("dataStart", DataTypes.BIGINT())
                        .column("recordCount", DataTypes.INT())
                        .column("time_Insert", DataTypes.BIGINT())
                        .column("time_Insert_ts", DataTypes.TIMESTAMP(3))
                        .watermark("time_Insert_ts", "time_Insert_ts")
                        .build());

水印方法使其具有时效性

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/74528345

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档