前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ClickHouse 实战笔记 第01期:Kafka 数据同步到 ClickHouse

ClickHouse 实战笔记 第01期:Kafka 数据同步到 ClickHouse

作者头像
数据库交流
发布2022-04-25 08:37:24
2.1K0
发布2022-04-25 08:37:24
举报
文章被收录于专栏:悦专栏

作者简介

马听,多年 DBA 实战经验,对 MySQL、 Redis、ClickHouse 等数据库有一定了解,专栏《一线数据库工程师带你深入理解 MySQL》、《Redis 运维实战》作者。

从这一节开始,将通过几期跟各位分享我的一些 ClickHouse 实战笔记。

这一期首先聊聊 Kafka 数据同步到 ClickHouse 的其中一个方案:通过 Kafka 引擎方式同步,下面进入实际操作过程(环境:CentOS7.4):

1 Kafka 基础环境搭建

因为主要是为了测试数据同步,因此 Kafka 只简单安装了单机版本。
1.1 安装 JDK
代码语言:javascript
复制
cd /usr/src

选择合适的 JDK 版本,并下载。

代码语言:javascript
复制
tar zxvf jdk-8u261-linux-x64.tar.gz
mv jdk1.8.0_261/ java

编辑 /etc/profile

代码语言:javascript
复制
vim /etc/profile

加入以下内容:

代码语言:javascript
复制
JAVA_HOME=/usr/src/java
PATH=$PATH:$JAVA_HOME/bin
export JAVA_HOME PATH

执行

代码语言:javascript
复制
source /etc/profile
1.2 安装 kafka
代码语言:javascript
复制
cd /usr/src

在这里[http://archive.apache.org/dist/kafka/2.0.0/]选择合适的 kafka 版本,并下载。

代码语言:javascript
复制
tar zxvf kafka_2.11-2.0.0.tgz
mv kafka_2.11-2.0.0 kafka
1.3 启动 zk

cd /usr/src/kafka

./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

1.4 启动 kafka

nohup ./bin/kafka-server-start.sh config/server.properties &

1.5 创建 topics

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

1.6 查看 topics

./bin/kafka-topics.sh --list --zookeeper localhost:2181

1.7 产生消息

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

执行完上面命令后,会出现下面的窗口:

然后在 > 后面输入需要产生的消息,如下:

1.8 消费消息

另外开一个连接窗口,执行:

代码语言:javascript
复制
cd /usr/src/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

可以看到在 1.7 步骤生成的消息。

2 安装 ClickHouse

ClickHouse 单机版安装参考:https://clickhouse.tech/docs/zh/getting-started/install/

3 创建消费表

在 ClickHouse 上创建 kafka 消费表

登录 ClickHouse

clickhouse-client

进行建库建表操作:

代码语言:javascript
复制
create database kafka_data;
use kafka_data;

create table kafka_queue(id UInt32,code String,name String)engine =Kafka() settings kafka_broker_list = 'localhost:9092',kafka_topic_list='test',kafka_group_name='group1',kafka_format='JSONEachRow',kafka_skip_broken_messages=100;

注:

  • kafka_broker_list:kafka 的连接地址和端口。
  • kafka_topic_list:kafka 的 topic 名。
  • kafka_group_name:kafka 的组名。
  • kafka_format:表示用于解析消息的数据格式,消息发送端必须按此格式发送消息。
  • kafka_skip_broken_messages:当解析数据出现错误时,运行跳过失败的数据行数。

4 创建存储表

因为 Kafka 消费表不能直接作为结果表使用。Kafka 消费表只是用来消费Kafka数据,没有真正的存储所有数据,只要查询一次,数据就会清空。因此需要在 ClickHouse 中创建存储表保存数据。

在 ClickHouse 上创建存储表:

create table kafka_table(id UInt32,code String,name String) engine=MergeTree() order by id

5 创建数据同步视图

创建 view 把 kafka 消费表消费到的数据导入 ClickHouse 存储表:

代码语言:javascript
复制
create materialized view consumer to kafka_table as select id,code,name from kafka_queue

6 测试数据同步

代码语言:javascript
复制
/usr/src/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

输入:

代码语言:javascript
复制
{"id":2,"code":"two","name":"aa"}

确认 ClickHouse 存储表是否能正常获取到数据

代码语言:javascript
复制
select * from kafka_table

7 其他维护操作

停止数据同步,可以删除视图

代码语言:javascript
复制
drop table consumer

或者卸载

代码语言:javascript
复制
detach table consumer

卸载之后,如果想再次恢复,可以使用:

代码语言:javascript
复制
attach materialized view consumer to kafka_table(id UInt32,code String,name String)as select id,code,name from kafka_queue

8 存在的问题

通过 Kafka 引擎进行数据同步的方式尽管很方便,但是在实战过程中发现,Kafka 吐出来的数据不一定会是 {"id":2,"code":"two","name":"aa"} 这类格式,这种情况可以考虑使用另外一种方案:借助 Flume 实现 Kafka 到 CH 的同步,这个方案将在后续的文章中进行介绍。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-10-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 悦专栏 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 Kafka 基础环境搭建
    • 因为主要是为了测试数据同步,因此 Kafka 只简单安装了单机版本。
      • 1.1 安装 JDK
        • 1.2 安装 kafka
          • 1.3 启动 zk
            • 1.4 启动 kafka
              • 1.5 创建 topics
                • 1.6 查看 topics
                  • 1.7 产生消息
                    • 1.8 消费消息
                    • 可以看到在 1.7 步骤生成的消息。
                    • 2 安装 ClickHouse
                    • 3 创建消费表
                    • 4 创建存储表
                    • 5 创建数据同步视图
                    • 6 测试数据同步
                    • 7 其他维护操作
                    相关产品与服务
                    云数据库 MySQL
                    腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档