前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >数据分析实战:kafka+clickhouse数据收集

数据分析实战:kafka+clickhouse数据收集

原创
作者头像
yield9tk
发布2022-04-20 23:26:04
1.5K0
发布2022-04-20 23:26:04
举报
文章被收录于专栏:小叶的coding记录

clickhouse是一款强大的数据仓库选择,不需要额外的依赖;兼容SQL,还提供了许多引擎。我们考虑使用,kafka作为分析数据的收集,各个服务节点只要向kafka发送数据,而无需关心数据的落地。

而后,需要用到clickhouse提供的kafka()表引擎,和物化视图进行落地数据。

简单实例

一个例子,包含kafka表,MergeTree数据表,以及物化视图。

1. 创建数据库

需要创建两个库,kafka库用来映射kafka的主题,product库保存实际的数据。

代码语言:sql
复制
CREATE DATABASE kafka;;

CREATE DATABASE product;;

2. kafka主题映射表

代码语言:sql
复制
DROP TABLE IF EXISTS kafka.item_int;;
CREATE TABLE kafka.item_int (
    `time_stamp` DateTime,
    `uid` Int64,
    `item_id` Int32,
    `extra` String
  
) ENGINE = Kafka(
  '172.31.1.1:9092,172.31.1.1:9093,172.31.1.1:9094',
  'item_int',
  'item_int',
  'CSV'
);;

选用kafka引擎,必要传入参数:

  • 第一个参数:kafka集群的地址
  • 第二个参数:消费的主题名
  • 第三个参数:消费组id,如果想多个主题数据顺序,需要设置一样的组id
  • 第四个参数:解析数据的格式,支持CSVJSONEachRow两种格式,默认都是要\n结束

3. 创建数据表

代码语言:sql
复制
CREATE TABLE product.item_int (
    `time_stamp` DateTime,
    `uid` Int64,
    `item_id` Int32,
    `extra` String
) ENGINE = MergeTree() PARTITION BY toYYYYMMDD(time_stamp)
ORDER BY
  toYYYYMMDD(time_stamp) SETTINGS index_granularity = 8192;;

数据表是实际保存数据的表,kafka表只是一个数据的中转。

4. 物化视图监控更改

代码语言:sql
复制
CREATE MATERIALIZED VIEW product.item_int_load TO product.hxyx_item_int  AS
SELECT
  *
FROM
  kafka.item_int;;

使用虚拟列

kafka表提供了三个隐藏的虚拟列:

  • _topic: String, 消费的kafka主题名
  • _offset: UInt64, 消息的偏移量
  • _partition: UInt64, 消息消费的分区

使用也很简单,只需要再数据表中加入这三个字段:

代码语言:sql
复制
CREATE TABLE product.item_int (
    `_topic` String,
    `_offset` UInt64,
    `_partition` UInt64,
    `time_stamp` DateTime,
    `uid` Int64,
    `item_id` Int32,
    `extra` String
) ENGINE = MergeTree() PARTITION BY toYYYYMMDD(time_stamp)
ORDER BY
  toYYYYMMDD(time_stamp) SETTINGS index_granularity = 8192;;

然后,修改物化视图即可:

代码语言:sql
复制
CREATE MATERIALIZED VIEW product.item_int_load TO product.hxyx_item_int AS
SELECT
  _topic,_offset,_partition,*
FROM
  kafka.item_int;;

暂停消费

在需要修改表结构时,需要暂停消费,不然会造成数据消费丢失(偏移量变了,而数据为落库)。方法只要detachkafka表就可以了:

代码语言:sql
复制
detach table kafka.item_int

重新启动:

代码语言:sql
复制
attach table kafka.item_int

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简单实例
    • 1. 创建数据库
      • 2. kafka主题映射表
        • 3. 创建数据表
          • 4. 物化视图监控更改
          • 使用虚拟列
          • 暂停消费
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档