clickhouse是一款强大的数据仓库选择,不需要额外的依赖;兼容SQL
,还提供了许多引擎。我们考虑使用,kafka作为分析数据的收集,各个服务节点只要向kafka发送数据,而无需关心数据的落地。
而后,需要用到clickhouse提供的kafka()
表引擎,和物化视图进行落地数据。
一个例子,包含kafka
表,MergeTree
数据表,以及物化视图。
需要创建两个库,kafka
库用来映射kafka的主题,product
库保存实际的数据。
CREATE DATABASE kafka;;
CREATE DATABASE product;;
DROP TABLE IF EXISTS kafka.item_int;;
CREATE TABLE kafka.item_int (
`time_stamp` DateTime,
`uid` Int64,
`item_id` Int32,
`extra` String
) ENGINE = Kafka(
'172.31.1.1:9092,172.31.1.1:9093,172.31.1.1:9094',
'item_int',
'item_int',
'CSV'
);;
选用kafka
引擎,必要传入参数:
CSV
和JSONEachRow
两种格式,默认都是要\n
结束CREATE TABLE product.item_int (
`time_stamp` DateTime,
`uid` Int64,
`item_id` Int32,
`extra` String
) ENGINE = MergeTree() PARTITION BY toYYYYMMDD(time_stamp)
ORDER BY
toYYYYMMDD(time_stamp) SETTINGS index_granularity = 8192;;
数据表是实际保存数据的表,kafka
表只是一个数据的中转。
CREATE MATERIALIZED VIEW product.item_int_load TO product.hxyx_item_int AS
SELECT
*
FROM
kafka.item_int;;
kafka
表提供了三个隐藏的虚拟列:
_topic
: String, 消费的kafka
主题名_offset
: UInt64, 消息的偏移量_partition
: UInt64, 消息消费的分区使用也很简单,只需要再数据表中加入这三个字段:
CREATE TABLE product.item_int (
`_topic` String,
`_offset` UInt64,
`_partition` UInt64,
`time_stamp` DateTime,
`uid` Int64,
`item_id` Int32,
`extra` String
) ENGINE = MergeTree() PARTITION BY toYYYYMMDD(time_stamp)
ORDER BY
toYYYYMMDD(time_stamp) SETTINGS index_granularity = 8192;;
然后,修改物化视图即可:
CREATE MATERIALIZED VIEW product.item_int_load TO product.hxyx_item_int AS
SELECT
_topic,_offset,_partition,*
FROM
kafka.item_int;;
在需要修改表结构时,需要暂停消费,不然会造成数据消费丢失(偏移量变了,而数据为落库)。方法只要detach
kafka表就可以了:
detach table kafka.item_int
重新启动:
attach table kafka.item_int
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。