有奖捉虫:行业应用 & 管理与支持文档专题 HOT
本文介绍如何从 Kafka 中实时消费数据到云数据仓库 TCHouse-C。

前提条件

数据源 Kafka 集群和目的端云数据仓库 TCHouse-C 集群必须在同一个 VPC 下。

操作步骤

1. 登录 云数据仓库 TCHouse-C 集群,创建 Kafka 消费表。
CREATE TABLE queue (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka
SETTINGS
kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic',
kafka_group_name = 'group',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 1,
kafka_max_block_size = 65536,
kafka_skip_broken_messages = 0,
kafka_auto_offset_reset = 'latest';
常用参数说明如下:
名称
是否必选
说明
kafka_broker_list
Kafka 服务的 broker 列表,用逗号分隔,这里建议用 Ip:port, 不要用域名(可能存在 DNS 解析问题)。
kafka_topic_list
Kafka topic,多个 topic 用逗号分隔。
kafka_group_name
Kafka 的消费组名称。
kafka_format
Kafka 数据格式
kafka_row_delimiter
行分隔符,用于分割不同的数据行。默认为“\\n”,您也可以根据数据写入的实际分割格式进行设置。
kafka_num_consumers
单个 Kafka Engine 的消费者数量,通过增加该参数,可以提高消费数据吞吐,但总数不应超过对应 topic 的 partitions 总数。
kafka_max_block_size
Kafka Engine 批量从 Kafka 读取数据的行数。
kafka_skip_broken_messages
表示忽略解析异常的 Kafka 数据的条数。如果出现了 N 条异常后,后台线程结束 默认值为0。
kafka_commit_every_batch
执行 Kafka commit 的频率,取值如下:0:完全写入一整个Block数据块的数据后才执行commit;1:每写完一个Batch批次的数据就执行一次commit。
kafka_auto_offset_reset
从哪个 offset 开始读取 Kafka 数据。取值范围:earlist,latest。
2. 创建 TCHouse-C 本地表(目标表)。
如果您的集群是单副本版:
CREATE TABLE daily on cluster default_cluster
(
  day Date,
   level String,
  total UInt64
)
engine = SummingMergeTree()
order by int_id;
如果您的集群是双副本版:
create table daily on cluster default_cluster
(
  day Date,
   level String,
  total UInt64
)
engine = ReplicatedSummingMergeTree('/clickhouse/tables/test/test/{shard}', '{replica}')
order by int_id;
创建分布式表:
create table daily_dis on cluster default_cluster
AS test.test
engine = Distributed('default_cluster', 'default', 'daily', rand());
3. 创建物化视图,把 Kafka 消费表消费到的数据同步到目的表。
CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
FROM queue GROUP BY day, level;
4. 查询。
SELECT level, sum(total) FROM daily GROUP BY level;

其他

如果要停止接收主题数据或更改转换逻辑,可以进行 detach 和 attach 视图操作。
DETACH TABLE consumer;
ATTACH TABLE consumer;