前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ClickHouse For Kafka

ClickHouse For Kafka

原创
作者头像
jasong
发布2022-06-17 11:58:18
3.1K2
发布2022-06-17 11:58:18
举报
文章被收录于专栏:ClickHouseClickHouse

为使用ClickHouse 消费Kafka 实时数据的同学提供一些参考

一 架构流程图:

可以看到ClickHouse 内置Kafka 消费引擎,不需要我们业务方写新的消费程序,再往ClickHouse 导入数据

二 前提条件:

  • 已创建Kafka集群,且在生产数据
  • 已创建云数据库 CDW-ClickHouse集群

三 使用限制:

Kafka集群和ClickHouse集群需要在同一VPC下。

四 操作步骤:

这里忽略Kafka 集群本身的一些操作,以上三个步骤是可以调整顺序的

  1. Kafka Table Engine: 在ClickHouse 内部创建Kafka消费表(这里可以理解为 消费了一部分Kafka 表的一个队列,存储消费Kafka Topic的一部分数据)
  2. MergeTree Table Engine: 在ClickHouse 内部创建 Kafka 数据存储表
  3. MATERIALIZED Table: 在ClickHouse 内部创建 Kafka 消费表,这里可以理解为它是一个搬运者,将 Kafka Table Engine 挪到 MergeTree Table Engine

五 操作步骤详解:

1 Kafka Table Engine
代码语言:javascript
复制
CREATE TABLE IF NOT EXISTS data_sync.test_queue(
  name String,
  age int,
  gongzhonghao String,
  my_time DateTime64(3, 'UTC')
) ENGINE = Kafka
SETTINGS
kafka_broker_list = '172.16.16.4:9092',
kafka_topic_list = 'lemonCode',
kafka_group_name = 'lemonNan',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\n',
kafka_schema = '',
kafka_num_consumers = 1

名称

是否必选

说明

kafka_broker_list

Kafka 服务的 broker 列表,用逗号分隔,这里建议用 Ip:port, 不要用域名(可能存在 DNS 解析问题)。

kafka_topic_list

Kafka topic,多个 topic 用逗号分隔。

kafka_group_name

Kafka 的消费组名称。

kafka_format

Kafka 数据格式, ClickHouse 支持的 Format, 详见 文档 可选参数。

kafka_row_delimiter

行分隔符,用于分割不同的数据行。默认为“\n”,您也可以根据数据写入的实际分割格式进行设置。

kafka_num_consumers

单个 Kafka Engine 的消费者数量,通过增加该参数,可以提高消费数据吞吐,但总数不应超过对应 topic 的 partitions 总数。

kafka_max_block_size

Kafka 数据写入目标表的 Block 大小,超过该数值后,就将数据刷盘;单位:Byte,默认值为65536 Byte。

kafka_skip_broken_messages

表示忽略解析异常的 Kafka 数据的条数。如果出现了 N 条异常后,后台线程结束 默认值为0。

kafka_commit_every_batch

执行 Kafka commit 的频率,取值如下: 0:完全写入一整个Block数据块的数据后才执行commit; 1:每写完一个Batch批次的数据就执行一次commit。

kafka_auto_offset_reset

从哪个 offset 开始读取 Kafka 数据。取值范围:earlist,latest。

2 MergeTree Table Engine
代码语言:javascript
复制
CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
PARTITION BY toYYYYMMDD(date)
ORDER BY toYYYYMMDD(date);
3 MATERIABLIZED Table Engine
代码语言:javascript
复制
CREATE MATERIALIZED VIEW IF NOT EXISTS product.kafka_load ON CLUSTER cluster_emr TO product.orders AS
SELECT *
FROM kafka.consumer;

六 如何维护

1 停止消费Kafka 数据
代码语言:javascript
复制
DETACH TABLE cppla.kafka_readings_view ;
2 恢复消费Kafka 数据
代码语言:javascript
复制
ATTACH TABLE cppla.kafka_readings_view ;

七 分布式写入

1 写入ClickHouse 分布式表

2 Kafka Engine 消费不同分区

八 数据高可用方案

1 ClickHouse ReplicateMergeTree 内部机制保证:

2 ClickHouse 双写保证

九 更新

采用 MergeTree + MATERIALIZED + AggregatingMergeTree

1 MergeTree
代码语言:javascript
复制
CREATE TABLE base
(
 `i` Int64,
 `s` String,
 `v` DateTime64,
 `id` Int64 DEFAULT 0,
 `tag` String DEFAULT '',
 `level` Int32 DEFAULT 0
)
ENGINE = MergeTree 
PARTITION BY (i % 256)
ORDER BY (i, s)
2 Aggregating MergeTree
代码语言:javascript
复制
CREATE TABLE updating
(
 `i` Int64,
 `s` String,
 `version` AggregateFunction(max, DateTime64),
 `id` AggregateFunction(argMaxIf, Int64, DateTime64, UInt8),
 `tag` AggregateFunction(argMaxIf, String, DateTime64, UInt8),
 `level` AggregateFunction(argMaxIf, Int32, DateTime64, UInt8)
)
ENGINE = AggregatingMergeTree
PARTITION BY (i % 256)
ORDER BY (i, s)
3 MATERIALIZED
代码语言:javascript
复制
CREATE MATERIALIZED VIEW mv TO updating AS
SELECT
 i,
 s,
 maxState(v) AS version,
 argMaxIfState(id, v, id != 0) AS id,
 argMaxIfState(tag, v, tag != '') AS tag,
 argMaxIfState(level, v, level != 0) AS level
FROM base
GROUP BY (i, s)
4 查询
代码语言:javascript
复制
SELECT
 i,
 s,
 maxMerge(version),
 argMaxIfMerge(id),
 argMaxIfMerge(tag),
 argMaxIfMerge(level)
FROM updating
GROUP BY
 i,
 s

希望对阅读的您有所帮助

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一 架构流程图:
  • 二 前提条件:
  • 三 使用限制:
  • 四 操作步骤:
  • 五 操作步骤详解:
    • 1 Kafka Table Engine
      • 2 MergeTree Table Engine
        • 3 MATERIABLIZED Table Engine
        • 六 如何维护
          • 1 停止消费Kafka 数据
            • 2 恢复消费Kafka 数据
            • 七 分布式写入
              • 1 写入ClickHouse 分布式表
                • 2 Kafka Engine 消费不同分区
                • 八 数据高可用方案
                  • 1 ClickHouse ReplicateMergeTree 内部机制保证:
                    • 2 ClickHouse 双写保证
                    • 九 更新
                      • 1 MergeTree
                        • 2 Aggregating MergeTree
                          • 3 MATERIALIZED
                            • 4 查询
                            相关产品与服务
                            数据库
                            云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
                            领券
                            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档