数据仓库 ClickHouse

最近更新时间:2023-07-07 16:35:32

我的收藏

介绍

ClickHouse Sink Connector 提供了对 ClickHouse 数据仓库的写入支持。ClickHouse Source Connector 提供了 ClickHouse 作为批数据源和维表的功能。

版本说明

Flink 版本
说明
1.11
支持 Sink
1.13
支持 Source 和 Sink
1.14
支持 Source 和 Sink
1.16
支持 Source 和 Sink

使用范围

ClickHouse 不支持标准的 update 和 delete 操作。作为 Sink 时,若您的任务有 update 和 delete 操作,可以通过 CollapsingMergeTree 来实现。
对于 JAR 作业,Java/Scala 编写的作业可以参考 JDBC 方式写入 ClickHouse,这里不做阐述。

DDL 定义

用作数据目的(不包含更新)(Sink with insert only)

CREATE TABLE clickhouse_sink_table (
`id` INT,
`name` STRING
) WITH (
-- 指定数据库连接参数
'connector' = 'clickhouse', -- 指定使用clickhouse连接器
'url' = 'clickhouse://172.28.28.160:8123', -- 指定集群地址,可以通过ClickHouse集群界面查看
-- 如果ClickHouse集群未配置账号密码可以不指定
--'username' = 'root', -- ClickHouse集群用户名
--'password' = 'root', -- ClickHouse集群的密码
'database-name' = 'db', -- 数据写入目的数据库
'table-name' = 'table', -- 数据写入目的数据表
'sink.batch-size' = '1000' -- 触发批量写的条数
);

包含 update 和 delete 操作的数据目的(Sink with upsert)

CREATE TABLE clickhouse_upsert_sink_table (
`id` INT,
`name` STRING,
PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
) WITH (
-- 指定数据库连接参数
'connector' = 'clickhouse', -- 指定使用clickhouse连接器
'url' = 'clickhouse://172.28.28.160:8123', -- 指定集群地址,可以通过ClickHouse集群界面查看
-- 如果ClickHouse集群未配置账号密码可以不指定
--'username' = 'root', -- ClickHouse集群用户名
--'password' = 'root', -- ClickHouse集群的密码
'database-name' = 'db', -- 数据写入目的数据库
'table-name' = 'table', -- 数据写入目的数据表
'table.collapsing.field' = 'Sign', -- CollapsingMergeTree 类型列字段的名称
'sink.batch-size' = '1000' -- 触发批量写的条数
);
注意
一定要定义 pk 和申明 table.collapsing.field 字段,才会启动该功能。对应的 ClickHouse 建表语句,请参考 常见问题

作为批数据源

CREATE TABLE `clickhosue_batch_source` (
`when` TIMESTAMP,
`userid` BIGINT,
`bytes` FLOAT
) WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://172.28.1.21:8123',
'database-name' = 'dts',
'table-name' = 'download_dist'
-- 'scan.by-part.enabled' = 'false', -- 是否启用读 ClickHouse 表 part。若启用,必须先在所有节点上使用命令 'STOP MERGES''STOP TTL MERGES' 停止表的后台 merge 和基于 TTL 的数据删除操作,否则读取的数据会不正确。
-- 'scan.part.modification-time.lower-bound' = '2021-09-24 16:00:00', -- 用于根据 modification_time 过滤 ClickHouse 表 part 的最小时间(包含),格式 yyyy-MM-dd HH:mm:ss。
-- 'scan.part.modification-time.upper-bound' = '2021-09-17 19:16:26', -- 用于根据 modification_time 过滤 ClickHouse 表 part 的最大时间(不包含),格式 yyyy-MM-dd HH:mm:ss。
-- 'local.read-write' = 'false', -- 是否读本地表,默认 false
-- 'table.local-nodes' = '172.28.1.24:8123,172.28.1.226:8123,172.28.1.109:8123,172.28.1.36:8123' -- local node 列表,需要使用 http port。注意一个 shard 只能配置一个 replica 节点地址,否则会读取到重复数据。
);
注意
MergeTree 系列引擎才支持按 part 读取,且一定要停止待读取表所有节点的后台 merge 和基于 TTL 的数据删除操作,避免 part 变化导致数据读取不准确。本地读时一个 shard 只能配置一个 replica 地址,避免重复数据。

作为维表

CREATE TABLE `clickhouse_dimension` (
`userid` BIGINT,
`comment` STRING
) WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://172.28.1.21:8123',
'database-name' = 'dimension',
'table-name' = 'download_dist',
'lookup.cache.max-rows' = '500', -- 查询缓存(Lookup Cache)中最多缓存的数据条数。
'lookup.cache.ttl' = '10min', -- 查询缓存中每条记录最长的缓存时间。
'lookup.max-retries' = '10' -- 数据库查询失败时,最多重试的次数。
);

WITH 参数

参数值
必填
默认值
描述
connector
-
当要使用 ClickHouse 作为数据目的(Sink)需要填写,取值 clickhouse
url
-
ClickHouse 集群连接 url,可以通过集群界面查看,举例 'clickhouse://127.1.1.1:8123'。
username
-
ClickHouse 集群用户名。
password
-
ClickHouse 集群密码。
database-name
-
ClickHouse 集群数据库。
table-name
-
ClickHouse 集群数据表。
sink.batch-size
1000
Connector batch 写入的条数。
sink.flush-interval
1000 (单位 ms)
Connector 异步线程刷新写入 ClickHouse 间隔。
table.collapsing.field
-
CollapsingMergeTree 类型列字段的名称。
sink.max-retries
3
写入失败时的重试次数。
local.read-write
false
是否启用直写本地表功能。默认不开启。
注意:该功能仅供高级用户使用,需配合下面几个参数使用。详见本文后续的 ”写入本地表“ 章节。
table.local-nodes
-
启用写本地表(local.read-writetrue)时,local node 列表,举例 '127.1.1.10:8123,127.1.2.13:8123'(需要使用 http port)。
sink.partition-strategy
balanced
启用写本地表(local.read-writetrue)时,需要设置数据分发策略,支持 balanced/shuffle/hash。如果希望实现数据的动态更新,且表引擎使用 CollapsingMergeTree,则取值必须为 hash,且需要配合 sink.partition-key 一同使用。
取值说明:balanced 轮询模式写入,shuffle 随机挑选节点写入, hash 根据 sink.partition-key 的 hash 值,选择节点写入。
sink.partition-key
-
启用写本地表(local.read-writetrue),且 sink.partition-strategyhash 时需要设置,值为所定义表中的主键。如果主键包含多个字段,则需要指定为第一个字段。
sink.ignore-delete
false
启用该参数后,会过滤所有向 ClickHouse 写入的 DELETE(删除)消息。该选项适用于使用 ReplacingMergeTree 表引擎,并期望实现数据的动态更新的场景。
sink.backpressure-aware
false
当 Flink 日志频繁出现 "Too many parts" 报错,且作业因此崩溃时,启用该参数可以大幅减轻服务端负载,提升整体的吞吐量和稳定性。
sink.reduce-batch-by-key
false
开启该参数后,对于定义了主键的 ClickHouse Sink 表,在给定的刷新周期内,同 Key 的数据会做归并,只取最后一条。
sink.max-partitions-per-insert
20
当clickhouse是分区表,且分区函数CK内置为intHash32、toYYYYMM 或toYYYYMMDD 之一时,Flink写入Clickhouse会通过预先在sink端按分区攒数据buffer,当攒的分区数目到达设定值时会触发往下游clickhouse写入(如果sink.flush-intervalsink.batch-size 先到的话也会先触发写入),极大的提高写入clickhouse的吞吐效率。设置为-1时会关闭分区聚合写入功能。
scan.fetch-size
100
每次从数据库读取时,批量获取的行数。
scan.by-part.enabled
false
是否启用读 ClickHouse 表 part。若启用,必须先在所有节点上使用命令'STOP MERGES'和'STOP TTL MERGES'停止表的后台 merge 和基于 TTL 的数据删除操作,否则读取的数据会不正确。
scan.part.modification-time.lower-bound
-
用于根据 modification_time 过滤 ClickHouse 表 part 的最小时间(包含),格式 yyyy-MM-dd HH:mm:ss。
scan.part.modification-time.upper-bound
-
用于根据 modification_time 过滤 ClickHouse 表 part 的最大时间(不包含),格式 yyyy-MM-dd HH:mm:ss。
lookup.cache.max-rows
查询缓存(Lookup Cache)中最多缓存的数据条数。
lookup.cache.ttl
查询缓存中每条记录最长的缓存时间。
lookup.max-retries
3
数据库查询失败时,最多重试的次数。
注意
定义 WITH 参数时,通常只需要填写必填参数即可。当您启用非必须参数时,请您一定要明确参数含义以及可能对数据写入产生的影响。

类型映射

关于 ClickHouse 支持的数据类型定义及其使用,可参考 ClickHouse data-types,这里列举了常用的数据类型,及其与 Flink 类型的对应关系。无特殊情况,建议按下表映射 Flink 数据类型,以避免意想不到的结果。特别注意:
DateTime:由于在 ClickHouse 中时间精度为 1 秒,且 ClickHouse 默认的配置项 date_time_input_format basic,只能解析 YYYY-MM-DD HH:MM:SS 或者 YYYY-MM-DD 格式的时间,如果您的作业出现时间解析异常(例如 java.sql.SQLException: Code: 6. DB::Exception: Cannot parse string '2023-05-24 14:34:55.166' as DateTime),请参考下表调整 Flink 中对应的数据类型为 TIMESTAMP(0),或者调整 ClickHouse 集群的 date_time_input_format 值为 best_effort。此外 ClickHouse 支持以整数格式插入 DateTime 数据,因此您也可以在 Flink 中映射类型为 INTEGER,但不推荐。
DateTime64:ClickHouse 支持以整数和 DECIMAL 格式插入 DateTime64 数据,因此您可能会在 Flink 中以 BIGINT 或者 DECIMAL 映射。如果您映射为 BIGINT 的 Flink 数据类型,请特别注意写入的 BIGINT 值要与 DateTime64 的精度匹配。例如 ClickHouse 中创建表时 DateTime64 默认的精度为毫秒,即 DateTime64(3),写入的 BIGINT 值也需要为毫秒级,示例 1672542671000(2023-01-01 11:11:11.000)。为了避免相关问题,建议参考下表映射为 TIMESTAMP 数据类型。
ClickHouse 数据类型
Flink 数据类型
Java 数据类型
String
VARCHAR/STRING
String
FixedString(N)
VARCHAR/STRING
String
Bool
BOOLEAN
Byte
Int8
TINYINT
Byte
UInt8
SMALLINT
Short
Int16
SMALLINT
Short
UInt16
INTEGER
Integer
Int32
INTEGER
Integer
UInt32
BIGINT
Long
Int64
BIGINT
Long
UInt64
BIGINT
Long
Int128
DECIMAL
BigInteger
UInt128
DECIMAL
BigInteger
Int256
DECIMAL
BigInteger
UInt256
DECIMAL
BigInteger
Float32
FLOAT
Float
Float64
DOUBLE
Double
Decimal(P,S)/Decimal32(S)/Decimal64(S)/Decimal128(S)/Decimal256(S)
DECIMAL
BigDecimal
Date
DATE
LocalDateTime
DateTime([timezone])
TIMESTAMP(0)
LocalDateTime
DateTime64(precision, [timezone])
TIMESTAMP(precision)
LocalDateTime
Array(T)
ARRAY<T>
T[]
Map(K, V)
MAP<K, V>
Map<?, ?>
Tuple(T1, T2, ...)
ROW<f1 T1, f2 T2, ...>
List<Object>

代码示例

CREATE TABLE datagen_source_table (
id INT,
name STRING
) WITH (
'connector' = 'datagen',
'rows-per-second'='1' -- 每秒产生的数据条数
);
CREATE TABLE clickhouse_sink_table (
`id` INT,
`name` STRING
) WITH (
-- 指定数据库连接参数
'connector' = 'clickhouse', -- 指定使用clickhouse连接器
'url' = 'clickhouse://172.28.28.160:8123', -- 指定集群地址,可以通过ClickHouse集群界面查看
-- 如果ClickHouse集群未配置账号密码可以不指定
--'username' = 'root', -- ClickHouse集群用户名
--'password' = 'root', -- ClickHouse集群的密码
'database-name' = 'db', -- 数据写入目的数据库
'table-name' = 'table', -- 数据写入目的数据表
'sink.batch-size' = '1000' -- 触发批量写的条数
);
insert into clickhouse_sink_table select * from datagen_source_table;

常见问题

关于数据更新(Upsert)和删除(Delete)语义

对于需要动态更新和删除数据的场景,由于 ClickHouse 并不完全支持 Upsert 语义,我们通常使用 ReplacingMergeTree 或 CollapsingMergeTree 来模拟更新或删除操作,他们有各自的适用场景。

ReplacingMergeTree 表引擎

如果 ClickHouse 底层表引擎使用 ReplacingMergeTree,则可以设置 sink.ignore-delete 参数为 true。此后 Flink 会自动过滤所有的删除(DELETE)消息,并将插入(INSERT)和更新(UPDATE_AFTER)消息则统一转为插入(INSERT)消息。随后,ClickHouse 底层会自动使用最新写入的记录来覆盖之前同主键的旧记录,从而实现数据的更新。
需要注意的是,此模式只支持插入、更新,而不支持删除数据。因此如果对于 CDC 等需要精确同步的简单 ETL 场景,则使用下面介绍的 CollapsingMergeTree 表引擎会有更好的效果。

CollapsingMergeTree 表引擎

如果 ClickHouse 底层表引擎使用 CollapsingMergeTree,则可以通过 table.collapsing.field 参数来指定 Sign 字段。它的原理是通过发送内容相同,但符号(Sign)相反的消息,来实现旧数据的删除(抵消),以及新数据的插入。
此外,在生产环境中,一般会使用 ReplicatedCollapsingMergeTree,而 ReplicatedMergeTree 的自动去重可能会使得短期内多次写入到 ClickHouse 的数据被判断为重复数据,导致数据丢失。此时,可在建表(或者修改表)时,指定 replicated_deduplication_window=0,以关闭自动去重功能。
例如:
CREATE TABLE testdb.testtable on cluster default_cluster (`id` Int32,`name` Nullable(String),`age` Nullable(Int32),`weight` Nullable(Float64),`Sign` Int8) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/testdb/testtable', '{replica}', Sign) ORDER BY id SETTINGS replicated_deduplication_window = 0;
自动去重更多详情可参见 Data Replication

ClickHouse 分布式表的 sharding_key

创建分布式表时,语句中的 ENGINE = Distributed(cluster_name, database_name, table_name[, sharding_key]); 中 sharding_key 需为 Flink SQL 的 sink 表中的主键(Primary Key),以保证同一个 Primary Key 的记录写入到同一个节点中
各个参数的含义分别如下:
cluster_name:集群名称,与集群配置中的自定义名称相对应。
database_name:数据库名称。
table_name:表名称。
sharding_key:选填,用于分片的 key 值,在数据写入的过程中,分布式表会依据分片 key 的规则,将数据分布到各个节点的本地表。

写入本地表(高级内容)

如果将 local.read-write 参数设置为 true,则 Flink 可以直接写入本地表。
注意
启用写本地表功能后,吞吐量可能会大幅提升。但是如果后续 ClickHouse 集群发生扩容、缩容、节点替换时,可能造成数据分布不均匀甚至写入失败、数据无法更新。因此仅供高级用户使用。
如果表含有主键,且使用了 UPDATE 和 DELETE 语义,建议建表时使用 CollapsingMergeTree 表引擎。我们可以通过 table.collapsing.field 参数来指定 Sign 字段,并设置 sink.partition-strategyhash 以令相同主键的数据落在同一个 shard 上,并将 sink.partition-key 参数设置为主键字段(对于多字段的混合主键,则设置为第一个字段)。

关于 null 数据

若数据的某些字段可能为空,则需要在 ClickHouse 的建表语句(DDL)中,把字段声明改为 Nullable,否则会导致数据写入异常。
CREATE TABLE testdb.testtable on cluster default_cluster (`id` Int32,`name` Nullable(String),`age` Nullable(Int32),`weight` Nullable(Float64),`Sign` Int8) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/testdb/testtable', '{replica}', Sign) ORDER BY id ;

分区表写入优化

Oceanus 在写入到 Clickhouse 分区表时,如果 clickhouse 的分区定义中使用的函数在 Oceanus 的支持范围内(intHash32、toYYYYMM、toYYYYMMDD),写入时默认会开启按分区预先攒 buffer 数据再写入的功能。启用后,Oceanus 单批次写入数据会包含尽量少的分区数量(业务分区数据吞吐足够大时,每个批次仅包含一个分区),从而提高 ClickHouse 的 merge 性能。如果单个分区数据不够批大小时,多个分区数据合并成一个批次写入 ClickHouse。详细的配置项可以参见上文中sink.max-partitions-per-insert 参数。

监控指标说明

Oceanus 为 ClickHouse Connector 增加了很多实用的统计指标。单击 Flink UI 的运行图中的 ClickHouse Sink 算子,即可搜索并查看指标:
numberOfInsertRecords:获取输出的 +I 消息数。
numberOfDeleteRecords:获取输出的 -D 消息数。
numberOfUpdateBeforeRecords:获取输出的 -U 消息数。
numberOfUpdateAfterRecords:获取输出的 +U 消息数。

示例:ClickHouse 建表语句

支持 UPDATE、DELETE 的 CollapsingMergeTree 建表语句

-- 创建数据库
CREATE DATABASE test ON cluster default_cluster;

-- 创建本地表
CREATE TABLE test.datagen ON cluster default_cluster (`id` Int32,`name` Nullable(String),`age` Nullable(Int32),`weight` Nullable(Float64),`Sign` Int8) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/test/datagen', '{replica}', Sign) ORDER BY id SETTINGS replicated_deduplication_window = 0;

-- 基于本地表,创建分布式表
CREATE TABLE test.datagen_all ON CLUSTER default_cluster AS test.datagen ENGINE = Distributed(default_cluster, test, datagen, id);

仅包含 INSERT 的普通 MergeTree 建表语句

-- 创建数据库
CREATE DATABASE test ON cluster default_cluster;

-- 创建本地表
CREATE TABLE test.datagen ON cluster default_cluster (`id` Int32,`name` Nullable(String),`age` Nullable(Int32),`weight` Nullable(Float64) ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/test/datagen', '{replica}') ORDER BY id SETTINGS replicated_deduplication_window = 0;

-- 基于本地表,创建分布式表
CREATE TABLE test.datagen_all ON CLUSTER default_cluster AS test.datagen ENGINE = Distributed(default_cluster, test, datagen, id);