首页
学习
活动
专区
圈层
工具
发布
34 篇文章
1
腾讯云流计算 Oceanus 最佳实践&解决方案汇总
2
腾讯云流计算 Oceanus Connector 使用示例汇总
3
Flink 实践教程:进阶11-SQL 关联:Regular Join
4
Flink 实践教程:进阶10-自定义聚合函数(UDAF)
5
Flink 实践教程:进阶9-自定义表值函数(UDTF)
6
Flink 实践教程:进阶8-自定义标量函数(UDF)
7
Flink 实践教程:进阶7-基础运维
8
Flink 实践教程:进阶6-CEP 复杂事件处理
9
Flink 实践教程:进阶5-乱序调整
10
Flink 实践教程:进阶4-窗口 TOP N
11
Flink 实践教程:进阶3-窗口操作
12
Flink 实践教程:进阶2-复杂格式数据抽取
13
Flink 实践教程:进阶1-维表关联
14
Flink 实践教程:入门10-Python作业的使用
15
Flink 实践教程:入门9-Jar 作业开发
16
Flink 实践教程:入门8-简单 ETL 作业
17
Flink 实践教程:入门7-消费 Kafka 数据写入 PG
18
Flink 实践教程:入门6-读取 PG 数据写入 ClickHouse
19
Flink 实践教程:入门5-写入 ClickHouse
20
Flink 实践教程:入门4-读取 MySQL 数据写入 ES
21
Flink 实践教程:入门3-读取 MySQL 数据
22
Flink 实践教程:入门2-写入 Elasticsearch
23
Flink 实践教程:入门 1-零基础用户实现简单 Flink 任务
24
Oceanus 实践-从0到1接入 CKafka SQL 作业
25
Oceanus 实践-从0到1开发ClickHouse SQL作业
26
Oceanus 实践-从0到1开发PG SQL作业
27
基于腾讯云Oceanus实现MySQL和Hbase维表到数据仓库ClickHouse的实时分析
28
基于流计算 Oceanus 和 Elasticsearch Service 构建百亿级实时监控系统
29
Flink社区 | Flink CDC 2.0 正式发布,核心改进详解
30
用Python进行实时计算——PyFlink快速入门
31
实时数据湖:Flink CDC流式写入Hudi
32
专家带你吃透 Flink 架构:一个 新版 Connector 的实现
33
Apache Flink 1.12.2集成Hudi 0.9.0运行指南
34
Flink Metrics&REST API 介绍和原理解析

Oceanus 实践-从0到1开发ClickHouse SQL作业

实时即未来,最近在腾讯云流计算 Oceanus 进行实时计算服务,分享给大家~

1. 环境搭建

1.1. 创建 Oceanus 集群

在 Oceanus 控制台的【集群管理】->【新建集群】页面创建集群,选择地域、可用区、VPC、日志、存储,设置初始密码等。

若之前未使用过VPC,日志(CLS),存储(COS)这些组件,需要先进行创建。

VPC及子网需要和下面的ClickHouse集群使用同一个,否则需要手动打通(如对等连接)。

创建完后的集群如下:

Oceanus集群

1.2 创建CDW ClickHouse集群

在云数据仓库控制台创建ClickHouse集群,这里为了简单,选择了与Oceanus同一个地域,同可用区。网络选择也选择与上面同样的VPC。

创建完后的集群如下:

CDW ClickHouse集群

登录集群,创建clickhouse表:通过同网段(同VPC,无需同子网)的云服务器(CVM)安装clickhouse-client进行登录。

clickhouse-client -m -h 172.28.28.85 --port 9000

创建表语句如下:

代码语言:txt
复制
   create database testdb;
   use testdb;
   CREATE TABLE IF NOT EXISTS testdb.clickhouse_sink ON CLUSTER default_cluster
   (
   id UInt64,
   name String,
   sign Int8
   )
   ENGINE = CollapsingMergeTree(sign)
   ORDER BY id;

至此,环境准备完毕。

2. 作业创建

2.1 创建SQL作业

在Oceanus控制台【作业管理】->【新建作业】-> SQL作业,选择刚刚新建的集群创建作业。然后在作业的【开发调试】->【作业参数】里面添加必要的connector,如clickhouse connector。

作业参数

2.2 创建Source端

此处选择Datagen来随机生成一些数据。

代码语言:txt
复制
-- Datagen Connector 可以随机生成一些数据用于测试
   -- 参见 [https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/datagen.html](https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/datagen.html)

CREATE TABLE random_source ( 
 f_sequence INT, 
 f_random INT, 
 f_random_str VARCHAR 
 ) WITH ( 
 'connector' = 'datagen', 
 'rows-per-second'='1',  -- 每秒产生的数据条数
 'fields.f_sequence.kind'='sequence',   -- 有界序列(结束后自动停止输出)
 'fields.f_sequence.start'='1',         -- 序列的起始值
 'fields.f_sequence.end'='10000',       -- 序列的终止值 
 'fields.f_random.kind'='random',       -- 无界的随机数
 'fields.f_random.min'='1',             -- 随机数的最小值
 'fields.f_random.max'='1000',          -- 随机数的最大值
 'fields.f_random_str.length'='10'      -- 随机字符串的长度
   );

2.3 创建Sink端

此处sink的表信息需要和CDW ClickHouse中表信息完全对应。

代码语言:txt
复制
   -- ClickHouse Sink (不完全支持upsert,详见说明文档)
   CREATE TABLE clickhouse_sink (
   `id` INTEGER,
   `name` STRING,
   `sign` INTEGER--,
   --PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
   ) WITH (
   -- 指定数据库连接参数
   'connector' = 'clickhouse',
   'url' = 'clickhouse://172.28.28.165:8123',
   -- 如果ClickHouse集群未配置账号密码可以不指定
   --'username' = 'root',
   --'password' = 'root',
   'database-name' = 'testdb',
   'table-name' = 'clickhouse_sink',
   'table.collapsing.field' = 'sign'   -- CollapsingMergeTree 类型列字段的名称
   );

2.4 算子操作

此处只做了简单的数据插入,没有进行复杂计算。

代码语言:txt
复制
   insert into clickhouse_sink select f_sequence as id, f_random_str as name, 1 as sign from random_source;

3. 验证总结

查询clickhouse数据,数据是否插入成功。

代码语言:txt
复制
select * from testddb.clickhouse_sink;
下一篇
举报
领券