前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ClickHouse整合Kafka(读数据)

ClickHouse整合Kafka(读数据)

原创
作者头像
程序猿梦工厂
修改2020-12-23 18:04:21
2K2
修改2020-12-23 18:04:21
举报
文章被收录于专栏:程序猿梦工厂程序猿梦工厂

本篇文章我们主要讲解ClickHouse读取Kafka数据的实例。

重读Kafka数据


默认从Kafka Topic的开始位置开始,并在到达消息时对其进行读取。这是正常的方式,但是有时重新读取消息很有用。例如,您可能想在修复架构中的错误或重新加载备份后重新读取消息。幸运的是,这很容易做到。我们只是在消费者组中重置偏移量。

  • 假设我们丢失了读数表中的所有消息,并希望从Kafka重新加载它们。首先,让我们使用TRUNCATE命令重载数据。
代码语言:txt
复制
TRUNCATE TABLE kafka_readings;
  • 在重置分区上的偏移之前,我们需要关闭消息使用。通过在ClickHouse中分离kafka_readings_queue表来执行此操作,如下所示。
代码语言:txt
复制
DETACH TABLE kafka_readings_queue;
  • 接下来,使用以下Kafka命令在用于kafka_readings_queue表的使用者组中重置分区偏移量。

注意:改命令需要在Kafka中进行操作。

代码语言:txt
复制
kafka-consumer-groups --bootstrap-server kafka-cluster-001:9092,kafka-cluster-002:9092 \
 --topic test --group readings_consumer_group1 \
 --reset-offsets --to-earliest --execute
  • 登录到ClickHouse,重新连接kafka_readings_queue
代码语言:txt
复制
ATTACH TABLE kafka_readings_queue;

等待几秒钟,丢失的记录将被恢复。此时可以使用SELECT进行查询。

添加数据列


显示原始Kafka信息作为行通常很有用,Kafka表引擎也定义了虚拟列,以下更改数据表以显示Topic分区和偏移量的方法。

  • 分离Kafka表来禁用消息使用。不影响数据的生产
代码语言:txt
复制
DETACH TABLE kafka_readings_queue;
  • 依次执行以下SQL命令来更改目标表和实例化视图

注意:我们只是重新创建实例化视图,而我们更改了目标表,该表保留了现有数据。

代码语言:txt
复制
ALTER TABLE kafka_readings
  ADD COLUMN name String;
  • 删除并重新构建视图表
代码语言:txt
复制
DROP TABLE kafka_readings_view;

CREATE MATERIALIZED VIEW kafka_readings_distributed_view TO kafka_readings_distributed AS
SELECT id, platForm, appname, time, name
FROM kafka_readings_queue;
  • 重新连接kafka_readings_queue表来再次启用消息使用
代码语言:txt
复制
ATTACH TABLE readings_queue
  • 查询数据表信息
代码语言:txt
复制
select * from kafka_readings_view;
代码语言:txt
复制
Query id: 52a27f19-eab1-4932-bc13-51792557809d

┌─id─────────────────────────────┬─platForm──┬─appname─┬────────────────time─┬───name─┐
│ 20201123123337811-840028       │ kafka     │         │ 2020-11-23 12:33:42 │  test  │
└────────────────────────────────┴───────────┴─────────┴─────────────────────┴────────┘

1 rows in set. Elapsed: 0.003 sec. Processed 2.51 thousand rows, 176.96 KB (985.20 thousand rows/s., 69.38 MB/s.)

注意:kafka源数据中需要包含新的字段列,否则数据就是null

消息格式更改时升级架构的方法不变。同样,物化视图提供了一种非常通用的方式来使Kafka消息适应目标表数据。您甚至可以定义多个实例化视图,以将消息流拆分到不同的目标表中。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 重读Kafka数据
  • 添加数据列
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档