前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink 实践教程-进阶(3):窗口操作

Flink 实践教程-进阶(3):窗口操作

作者头像
腾讯云大数据
发布2021-12-22 09:40:21
5420
发布2021-12-22 09:40:21
举报
文章被收录于专栏:腾讯云大数据腾讯云大数据

作者:腾讯云流计算 Oceanus 团队

流计算 Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

本文将为您详细介绍如何实时获取 CKafka 中的 JSON 格式数据,经过 HOP WINDOW(滑动窗口)函数聚合分析后存入 ClickHouse 中。

前置准备

创建流计算 Oceanus 集群

进入 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [2]。

创建消息队列 CKafka

进入 CKafka 控制台 [3],点击左上角【新建】,即可完成 CKafka 的创建,具体可参考 CKafka 创建实例 [4]。

创建 Topic: 进入 CKafka 实例,点击【topic 管理】>【新建】,即可完成 Topic 的创建,具体可参考 CKafka 创建 Topic [5]。

数据准备: 进入同子网的 CVM 下,启动 Kafka 客户端,模拟发送数据,具体操作参见 运行 Kafka 客户端 [6]。

代码语言:javascript
复制
# Kafka 客户端启动生产者命令bash kafka-console-producer.sh --broker-list 10.0.0.29:9092 --topic oceanus_advanced3_input --producer.config ../config/producer.properties
代码语言:javascript
复制
// 数据格式{  "id": 1,  "amount": 10,  "times": "2021-10-01 10:00:00"}

创建 ClickHouse 集群

进入 ClickHouse 控制台 [7],点击左上角【新建集群】,完成 ClickHouse 集群的创建。具体可参考 ClickHouse 快速入门 [8]。

注意:创建 Oceanus 集群和 ClickHouse 集群时所选的 VPC 建议相同。

创建 ClickHouse 表:

1.进入与 ClickHouse 集群同 VPC 的某一台 CVM 下,安装 ClickHouse 客户端(下载该客户端需连通外网),具体操作步骤参考 ClickHouse 快速入门 [8]。

代码语言:javascript
复制
# 下载 ClickHouse-Client 命令wget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-client-20.7.2.30-2.noarch.rpmwget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-common-static-20.7.2.30-2.x86_64.rpm
# 安装客户端rpm -ivh *.rpm
# 使用 tcp 端口登陆 ClickHouse 集群,IP 地址可通过控制台查看clickhouse-client -hxx.xx.xx.xx --port 9000 -m

2.登陆 ClickHouse 集群,建表。

代码语言:javascript
复制
CREATE TABLE default.oceanus_advanced3_output1 on cluster default_cluster (    win_start   TIMESTAMP,    win_end     TIMESTAMP,    id          Int8,    amount_all  Int16,    Sign        Int8) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/default/oceanus_advanced3_output1', '{replica}',Sign) ORDER BY (win_start,win_end,id);

Oceanus 作业

1. 创建 Source
代码语言:javascript
复制
CREATE TABLE `kafka_json_source_table` (  `id`      INT,  `amount`  INT,  `times`   TIMESTAMP(3),    WATERMARK FOR times AS times - INTERVAL '3' SECOND ) WITH (  'connector' = 'kafka',  'topic' = 'oceanus_advanced3_input',      -- 替换为您要消费的 Topic  'scan.startup.mode' = 'earliest-offset',  -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种  'properties.bootstrap.servers' = '10.0.0.29:9092',  -- 替换为您的 Kafka 连接地址  'properties.group.id' = 'testGroup',      -- 必选参数, 一定要指定 Group ID  'format' = 'json',  'json.fail-on-missing-field' = 'false',   -- 如果设置为 false, 则遇到缺失字段不会报错。  'json.ignore-parse-errors' = 'true'       -- 如果设置为 true,则忽略任何解析报错。);
2. 创建 Sink
代码语言:javascript
复制
CREATE TABLE `clickhouse_sink` (    `win_start`  TIMESTAMP(3),    `win_end`    TIMESTAMP(3),    `id`         INT,    `amount_all` INT,     PRIMARY KEY (win_start,win_end,id) NOT ENFORCED) WITH (    'connector' = 'clickhouse',    'url' = 'clickhouse://10.0.0.178:8123',    'database-name' = 'default',    'table-name' = 'oceanus_advanced3_output1',    'table.collapsing.field' = 'Sign'   -- CollapsingMergeTree 类型列字段的名称);
3. 编写业务 SQL
代码语言:javascript
复制
INSERT INTO clickhouse_sinkSELECTHOP_START(times,INTERVAL '30' SECOND,INTERVAL '1' MINUTE) AS win_start,  -- 滑动窗口的开始时间HOP_END(times,INTERVAL '30' SECOND,INTERVAL '1' MINUTE)   AS win_end,    -- 滑动窗口的结束时间id,SUM(amount)  AS amount_allFROM kafka_json_source_table-- 这里使用滑动窗口函数和用户 id 进行分组聚合,统计了每分钟各用户的视频点击量,每30s更新一次。GROUP BY HOP(times,INTERVAL '30' SECOND,INTERVAL '1' MINUTE),id;

新版 Flink 1.13 集群 SQL 作业不需要用户自己选择内置 Connector

总结

HOP WINDOW(滑动窗口)将元素分配到固定长度的窗口中,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。HOP WINDOW(滑动窗口)保持窗口大小(Size:INTERVAL '1' MINUTE)不变,每次滑动指定的时间周期(Slide:INTERVAL '30' SECOND),因而允许窗口之间的相互重叠。

Slide 的大小决定了 Flink 创建新窗口的频率。

  • 当 Slide 小于 Size 时,相邻窗口会重叠,一个时间会被分配到多个窗口。
  • 当 Slide 大于 Size 时,可能会导致有些事件被丢弃。
  • 当 Slide 等于 Size 时,等于是 TUMBLE WINDOW(滚动窗口)。

更多时间窗口函数请参考 Oceanus 官方文档 [9]。

参考链接

[1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview

[2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298

[3] CKafka 控制台:https://console.cloud.tencent.com/ckafka/index?rid=1

[4] CKafka 创建实例:https://cloud.tencent.com/document/product/597/54839

[5] Ckafka 创建 Topic:https://cloud.tencent.com/document/product/597/54854

[6] 运行 Kafka 客户端:https://cloud.tencent.com/document/product/597/56840

[7] ClickHouse 控制台:https://console.cloud.tencent.com/cdwch?region=ap-guangzhou

[8] ClickHouse 快速入门:https://cloud.tencent.com/document/product/1299/49824

[9] 时间窗口函数:https://cloud.tencent.com/document/product/849/18077

流计算 Oceanus 限量秒杀专享活动火爆进行中↓↓

点击文末「阅读原文」,了解腾讯云流计算 Oceanus 更多信息~

腾讯云大数据

长按二维码 关注我们

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-12-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 腾讯云大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 流计算 Oceanus 简介
  • 前置准备
    • 创建流计算 Oceanus 集群
      • 创建消息队列 CKafka
        • 创建 ClickHouse 集群
          • 创建 ClickHouse 表:
          • 1. 创建 Source
          • 2. 创建 Sink
          • 3. 编写业务 SQL
      • Oceanus 作业
      • 总结
      • 参考链接
      相关产品与服务
      流计算 Oceanus
      流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档