前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ClickHouse系列--消费kafka数据

ClickHouse系列--消费kafka数据

作者头像
IT云清
发布2021-12-06 14:16:37
9120
发布2021-12-06 14:16:37
举报
文章被收录于专栏:IT云清IT云清

1.使用方式

主要是使用ClickHouse的表引擎。

代码语言:javascript
复制
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_row_delimiter = 'delimiter_symbol',]
    [kafka_schema = '',]
    [kafka_num_consumers = N,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N,]
    [kafka_commit_every_batch = 0,]
    [kafka_thread_per_consumer = 0]

kafka_broker_list :逗号分隔的brokers地址 (localhost:9092). kafka_topic_list :Kafka 主题列表,多个主题用逗号分隔. kafka_group_name :消费者组. kafka_format – Message format. 比如JSONEachRow、JSON、CSV等等

2.示例

2.1在kafka中创建user_behavior主题,并向该主题写入数据,数据示例为:

代码语言:javascript
复制
{"user_id":63401,"item_id":6244,"cat_id":143,"action":"pv","province":3,"ts":1573445919}
{"user_id":9164,"item_id":2817,"cat_id":611,"action":"fav","province":28,"ts":1573420486}
{"user_id":63401,"item_id":6244,"cat_id":143,"action":"pv","province":3,"ts":1573445919}

在ClickHouse中创建表,选择表引擎为Kafka(),如下:

代码语言:javascript
复制
 CREATE TABLE kafka_user_behavior (
    user_id UInt64 COMMENT '用户id',
    item_id UInt64 COMMENT '商品id',
    cat_id UInt16  COMMENT '品类id',
    action String  COMMENT '行为',
    province UInt8 COMMENT '省份id',
    ts UInt64      COMMENT '时间戳'
  ) ENGINE = Kafka()
    SETTINGS
    kafka_broker_list = 'cdh04:9092',
    kafka_topic_list = 'user_behavior',
    kafka_group_name = 'group1',
    kafka_format = 'JSONEachRow'
;
-- 查询
cdh04 :) select * from kafka_user_behavior ;

-- 再次查看数据,发现数据为空
cdh04 :) select count(*) from kafka_user_behavior;

SELECT count(*)
FROM kafka_user_behavior

┌─count()─┐
│       0 │
└─────────┘

2.2通过物化视图将kafka数据导入ClickHouse

当我们一旦查询完毕之后,ClickHouse会删除表内的数据,其实Kafka表引擎只是一个数据管道,我们可以通过物化视图的方式访问Kafka中的数据。

  • 首先创建一张Kafka表引擎的表,用于从Kafka中读取数据
  • 然后再创建一张普通表引擎的表,比如MergeTree,面向终端用户使用
  • 最后创建物化视图,用于将Kafka引擎表实时同步到终端用户所使用的表中
代码语言:javascript
复制
--  创建Kafka引擎表
 CREATE TABLE kafka_user_behavior_src (
    user_id UInt64 COMMENT '用户id',
    item_id UInt64 COMMENT '商品id',
    cat_id UInt16  COMMENT '品类id',
    action String  COMMENT '行为',
    province UInt8 COMMENT '省份id',
    ts UInt64      COMMENT '时间戳'
  ) ENGINE = Kafka()
    SETTINGS
    kafka_broker_list = 'cdh04:9092',
    kafka_topic_list = 'user_behavior',
    kafka_group_name = 'group1',
    kafka_format = 'JSONEachRow'
;

-- 创建一张终端用户使用的表
 CREATE TABLE kafka_user_behavior (
    user_id UInt64 COMMENT '用户id',
    item_id UInt64 COMMENT '商品id',
    cat_id UInt16  COMMENT '品类id',
    action String  COMMENT '行为',
    province UInt8 COMMENT '省份id',
    ts UInt64      COMMENT '时间戳'
  ) ENGINE = MergeTree()
    ORDER BY user_id
;
-- 创建物化视图,同步数据
CREATE MATERIALIZED VIEW user_behavior_consumer TO kafka_user_behavior
    AS SELECT * FROM kafka_user_behavior_src ;
-- 查询,多次查询,已经被查询的数据依然会被输出
cdh04 :) select * from kafka_user_behavior;

Note: Kafka消费表不能直接作为结果表使用。Kafka消费表只是用来消费Kafka数据,没有真正的存储所有数据。

这里还有一个疑问: 在众多资料中,kafka示例消息都是最简单的json格式,如果消息格式是复杂类型呢?是否支持?

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2021-05-14 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.使用方式
  • 2.示例
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档