首页
学习
活动
专区
圈层
工具
发布
34 篇文章
1
腾讯云流计算 Oceanus 最佳实践&解决方案汇总
2
腾讯云流计算 Oceanus Connector 使用示例汇总
3
Flink 实践教程:进阶11-SQL 关联:Regular Join
4
Flink 实践教程:进阶10-自定义聚合函数(UDAF)
5
Flink 实践教程:进阶9-自定义表值函数(UDTF)
6
Flink 实践教程:进阶8-自定义标量函数(UDF)
7
Flink 实践教程:进阶7-基础运维
8
Flink 实践教程:进阶6-CEP 复杂事件处理
9
Flink 实践教程:进阶5-乱序调整
10
Flink 实践教程:进阶4-窗口 TOP N
11
Flink 实践教程:进阶3-窗口操作
12
Flink 实践教程:进阶2-复杂格式数据抽取
13
Flink 实践教程:进阶1-维表关联
14
Flink 实践教程:入门10-Python作业的使用
15
Flink 实践教程:入门9-Jar 作业开发
16
Flink 实践教程:入门8-简单 ETL 作业
17
Flink 实践教程:入门7-消费 Kafka 数据写入 PG
18
Flink 实践教程:入门6-读取 PG 数据写入 ClickHouse
19
Flink 实践教程:入门5-写入 ClickHouse
20
Flink 实践教程:入门4-读取 MySQL 数据写入 ES
21
Flink 实践教程:入门3-读取 MySQL 数据
22
Flink 实践教程:入门2-写入 Elasticsearch
23
Flink 实践教程:入门 1-零基础用户实现简单 Flink 任务
24
Oceanus 实践-从0到1接入 CKafka SQL 作业
25
Oceanus 实践-从0到1开发ClickHouse SQL作业
26
Oceanus 实践-从0到1开发PG SQL作业
27
基于腾讯云Oceanus实现MySQL和Hbase维表到数据仓库ClickHouse的实时分析
28
基于流计算 Oceanus 和 Elasticsearch Service 构建百亿级实时监控系统
29
Flink社区 | Flink CDC 2.0 正式发布,核心改进详解
30
用Python进行实时计算——PyFlink快速入门
31
实时数据湖:Flink CDC流式写入Hudi
32
专家带你吃透 Flink 架构:一个 新版 Connector 的实现
33
Apache Flink 1.12.2集成Hudi 0.9.0运行指南
34
Flink Metrics&REST API 介绍和原理解析

Flink 实践教程:入门5-写入 ClickHouse

流计算 Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

本文将为您详细介绍如何使用 Datagen Connector 模拟生成客户视频点击量数据,并利用滚动窗口函数对每分钟内客户的视频点击量进行聚合分析,最后将数据输出到 ClickHouse 的流程。

前置准备

创建流计算 Oceanus 集群

活动购买链接 1 元购买 Oceanus 集群

进入流计算 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,完成 Oceanus 集群的创建。具体可参考 Oceanus 官方文档创建独享集群[2]。

创建 ClickHouse 集群

进入 ClickHouse 控制台[3],点击左上角【新建集群】,完成 ClickHouse 集群的创建。具体可参考 ClickHouse 快速入门[4]。

注意:创建 Oceanus 集群和 ClickHouse 集群时所选的 VPC 必须相同。

创建 ClickHouse 表:

  1. 进入与 ClickHouse 集群同 VPC 的某一台 CVM 下,安装 ClickHouse 客户端(下载该客户端需连通外网),具体操作步骤参考 ClickHouse 快速入门[4]。
代码语言:shell
复制
 # 下载 ClickHouse-Client 命令
wget https://repo.yandex.ru/clickhouse/rpm/stable/x86\_64/clickhouse-client-20.7.2.30-2.noarch.rpm
wget https://repo.yandex.ru/clickhouse/rpm/stable/x86\_64/clickhouse-common-static-20.7.2.30-2.x86\_64.rpm

2. 安装客户端

代码语言:shell
复制
rpm -ivh \*.rpm

3. 使用 tcp 端口登陆 ClickHouse 集群,IP 地址可通过控制台查看

代码语言:shell
复制
clickhouse-client -hxxx.xxx.xxx.xxx --port 9000

4. 登陆 ClickHouse 集群,建表。

代码语言:sql
复制
CREATE TABLE default.datagen_to_ck on cluster default_cluster (
win_start     TIMESTAMP,
win_end       TIMESTAMP,
user_id       String,
amount_total  Int16,
Sign          Int8  )
ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/default/datagen_to_ck', '{replica}',Sign)
ORDER BY (win_start,win_end,user_id);

流计算 Oceanus 作业

1. 创建 Source

代码语言:sql
复制
CREATE TABLE random_source ( 
    user_id   VARCHAR,
    amount    INT,
    pre_time  AS CURRENT_TIMESTAMP,
    WATERMARK FOR pre_time AS pre_time - INTERVAL '3' SECOND
  ) WITH ( 
  'connector' = 'datagen', 
  'rows-per-second' = '5',            -- 每秒产生的数据条数
  'fields.user_id.length' = '1',      -- 随机字符串的长度
  'fields.amount.kind' = 'random',    -- 无界的随机数
  'fields.amount.min' = '1',          -- 随机数的最小值
  'fields.amount.max' = '10'          -- 随机数的最大值
);

2. 创建 Sink

代码语言:sql
复制
CREATE TABLE clickhouse (
    win_start     TIMESTAMP(3),
    win_end       TIMESTAMP(3),
    user_id       VARCHAR,
    amount_total  BIGINT,
    PRIMARY KEY (win_start,win_end,user_id) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://10.0.0.178:8123',
    --'username' = 'root',     -- 如果ClickHouse集群未配置账号密码可以不指定
    --'password' = 'root',
    'database-name' = 'default',
    'table-name' = 'datagen_to_ck',
    'table.collapsing.field' = 'Sign'   -- CollapsingMergeTree 类型列字段的名称
);

3. 编写业务 SQL

代码语言:sql
复制
INSERT INTO clickhouse
SELECT
    TUMBLE_START(pre_time,INTERVAL '1' MINUTE) AS win_start,
    TUMBLE_END(pre_time,INTERVAL '1' MINUTE) AS win_end,
    user_id,
    CAST(SUM(amount) AS BIGINT) AS amount_total
FROM random_source
GROUP BY TUMBLE(pre_time,INTERVAL '1' MINUTE),user_id;

4. 选择 Connector

点击【作业参数】,在【内置 Connector】选择 flink-connector-clickhouse,点击【保存】>【发布草稿】运行作业。

新版 Flink 1.13 集群不需要用户自己选择内置 Connector

总结

本示例使用 datagen Connecor 模拟产生随机数据,使用 TUMBLE WINDOW(滚动窗口)统计各用户(user_id)每分钟的视频点击量(amount_total),然后将数据存储在 ClickHouse 中。 更多时间窗口函数示例请参考 Oceanus 官方文档 5。

参考链接

1 Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview

2 创建独享集群:https://cloud.tencent.com/document/product/849/48298

3 ClickHouse 控制台:https://console.cloud.tencent.com/cdwch?region=ap-guangzhou

4 ClickHouse 快速入门:https://cloud.tencent.com/document/product/1299/49824

5 Oceanus 窗口函数官方文档:https://cloud.tencent.com/document/product/849/18077

下一篇
举报
领券