前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ClickHouse实战-ClickHouse整合Kafka

ClickHouse实战-ClickHouse整合Kafka

原创
作者头像
程序猿梦工厂
修改2020-12-18 15:02:29
3.1K0
修改2020-12-18 15:02:29
举报
文章被收录于专栏:程序猿梦工厂程序猿梦工厂

要将数据从Kafka主题读取到ClickHouse表,我们需要三件事:

  • 一个目标MergeTree表,以提供接收数据的宿主
  • Kafka引擎表,使主题看起来像ClickHouse表
  • 物化视图,可将数据自动从Kafka移动到目标表

创建存储消费数据表


创建kafka_readings用于接收Kafka的数据,登录到ClickHouse并执行以下SQL

代码语言:txt
复制
CREATE TABLE kafka_readings (
    id String,
    platForm String,
    appname String,
    time DateTime
) Engine = MergeTree
PARTITION BY toYYYYMMDD(time)
ORDER BY (time);
  • MergeTree 指定创建表的引擎
  • PARTITION BY 指定我们的分区数据,我们使用时间转换为ymd格式
  • ORDER BY 指定我们的排序规则,当然也可以不指定

创建消费Kafka数据表


使用Kafka引擎创建一个表以连接到主题并读取数据。该引擎将使用消费主题test和消费者组test_consumer_group1从kafka的集群中读取数据。输入格式为JSONEachRow

请注意,我们省略了time列。这是目标表中的别名,将从time列自动填充。

登录到ClickHouse并执行以下SQL

代码语言:txt
复制
CREATE TABLE kafka_readings_queue (
    id String,
    platForm String,
    appname String,
    time DateTime
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka-cluster-001:9092,kafka-cluster-002:9092',
       kafka_topic_list = 'test',
       kafka_group_name = 'readings_consumer_group1',
       kafka_format = 'JSONEachRow';
  • kafka_broker_list kafka消费集群的broker列表
  • kafka_topic_list 消费kafka的Topic
  • kafka_group_name kafka消费组
  • kafka_format 消费数据的格式化类型,当然还有其他格式的数据详见Formats for Input and Output Data
    • JSONEachRow表示每行一条数据的json格式。一般如果是json格式的话,设置JSONEachRow即可
    • 如果需要输入嵌套的json,请设置input_format_import_nested_json=1

创建物化视图合并表传输数据


我们已经创建了本地数据表和消费Kafka表,最后需要创建视图表方便把数据导入到ClickHouse,登录到ClickHouse并执行以下SQL

代码语言:txt
复制
CREATE MATERIALIZED VIEW kafka_readings_view TO kafka_readings AS
SELECT id, platForm, appname, time
FROM kafka_readings_queue;

测试各个数据表


我们使用以下SQL分别去测试查询数据

  • 查询kafka_readings表,会返回相关数据总数
代码语言:txt
复制
select count(1) from kafka_readings;
代码语言:txt
复制
SELECT count(1)
FROM kafka_readings

Query id: 25375f84-9271-4e7b-bf32-cf1ccadef78e

┌─count(1)─┐
│     8834 │
└──────────┘

1 rows in set. Elapsed: 0.003 sec.
  • 查询kafka_readings_queue表,会返回当前Kafka新增消费数据总数(连接kafka会有些慢)
代码语言:txt
复制
select count(1) from kafka_readings_queue;
代码语言:txt
复制
SELECT count(1)
FROM kafka_readings_queue

Query id: 420b4430-673e-411c-8db9-509fcc23feef

┌─count(1)─┐
│        0 │
└──────────┘

1 rows in set. Elapsed: 7.215 sec.

如果有新增数据的话那么这里的count(1)就是非0数据,这里的数会出现变化,根据新的数据而定

  • 查询kafka_readings_view表,一般得到的数据和kafka_readings相差无几,除非实时数据很多
代码语言:txt
复制
select count(1) from kafka_readings_view;
代码语言:txt
复制
SELECT count(1)
FROM kafka_readings_view

Query id: 302e3306-dc38-4668-9469-edd109430e39

┌─count(1)─┐
│     8861 │
└──────────┘

1 rows in set. Elapsed: 0.003 sec. Processed 8.86 thousand rows, 35.44 KB (3.13 million rows/s., 12.50 MB/s.)

创建分布式表


代码语言:txt
复制
CREATE TABLE kafka_readings_distributed ON CLUSTER mycluster_1
(
    id String,
    platForm String,
    appname String,
    time DateTime
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/replicated/{shard}/kafka_readings_distributed', '{replica}')
PARTITION BY ymd
ORDER BY id

这里也可以去创建Distributed表,看情况而定

  • 创建视图的转换
代码语言:txt
复制
CREATE MATERIALIZED VIEW kafka_readings_distributed_view TO kafka_readings_distributed AS
SELECT id, platForm, appname, time
FROM kafka_readings_queue;

此时去查询该表数据即可。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 创建存储消费数据表
  • 创建消费Kafka数据表
  • 创建物化视图合并表传输数据
  • 测试各个数据表
  • 创建分布式表
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档