Hi everyone, this is Yige. Doris has become a rising star among MPP databases. Originating at Baidu, Doris aims to cover a wide range of enterprise data-analysis scenarios: it supports multiple data models (detail tables, aggregate tables), multiple import methods (batch among them), and integrates with many existing systems (Spark, Flink, Hive, Elasticsearch).
Today we look at Doris's Routine Load feature, which automatically and continuously imports data from a specified source into Doris; at present, Kafka is the only supported source for routine loads.
01
How It Works
The basic idea of Routine Load: the FE continuously splits a routine load job into a series of small tasks and dispatches them to the BEs; each task consumes a batch of messages from Kafka and commits it as a single transaction, with the FE tracking the consumed offsets, so each batch is imported exactly once.
02
Usage Scenarios
Routine Load mainly serves two application scenarios in a data warehouse:
03
Application Cases
Real-time ingestion of Kafka data currently comes with some usage restrictions:
Operating a routine load is actually quite simple, and a concrete case teaches more than any amount of documentation, so two import cases are given below.
Case 1: data is written continuously, with no update or delete operations
CREATE ROUTINE LOAD ods_drp.rtl_ods_drp_cdc_st_entry_detail_et ON ods_drp_cdc_st_entry_detail_et
COLUMNS(ACCOUNT_LINE_ID, update_time,cdc_op,cdc_time=now())
PROPERTIES
(
"desired_concurrent_number"="1",
"max_batch_interval" = "20",
"max_batch_rows" = "200000",
"max_batch_size" = "104857600",
"strict_mode" = "false",
"strip_outer_array" = "true",
"format" = "json",
"json_root" = "$.data",
"jsonpaths" = "[\"$.ACCOUNT_LINE_ID\",\"$.update_time\",\"$.type\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "192.168.87.107:9092,192.168.87.108:9092,192.168.87.109:9092",
"kafka_topic" = "drds_hana_ods_st_entry_detail_et",
"kafka_partitions" = "0",
"kafka_offsets" = "OFFSET_BEGINNING",
"property.group.id" = "ods_drp_st_entry_detail_et",
"property.client.id" = "doris"
);
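For reference, a Kafka message shaped roughly as follows would satisfy the json_root and jsonpaths settings above (a hypothetical example; the exact layout depends on the CDC tool producing the topic):

```json
{
  "data": [
    {
      "ACCOUNT_LINE_ID": "1001",
      "update_time": "2021-06-01 12:00:00",
      "type": "INSERT"
    },
    {
      "ACCOUNT_LINE_ID": "1002",
      "update_time": "2021-06-01 12:00:05",
      "type": "INSERT"
    }
  ]
}
```

Here "json_root" = "$.data" selects the inner array, "strip_outer_array" = "true" unwraps it into individual rows, and the three jsonpaths fill ACCOUNT_LINE_ID, update_time and cdc_op (the latter from $.type); cdc_time is not read from the message at all but computed as now() at load time.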
Case 2: the source data includes delete and update operations
First, put the target table into batch-delete mode (this adds a hidden delete-flag column to the Unique-Key table):
ALTER TABLE ods_drp.ods_drp_vip_weixin ENABLE FEATURE "BATCH_DELETE";
Then create the load job:
CREATE ROUTINE LOAD ods_drp.rtl_ods_drp_vip_weixin ON ods_drp_vip_weixin
WITH MERGE
COLUMNS(rec_id, vip_user_id, vip_id, vip_code, tel, vip_source, openid, unionid, appid, brand_code, create_user_name, create_user, create_time, modify_user_name, modify_user, modify_time, version, update_time,CDC_OP),
DELETE ON CDC_OP="DELETE"
PROPERTIES
(
"desired_concurrent_number"="1",
"max_batch_interval" = "20",
"max_batch_rows" = "200000",
"max_batch_size" = "104857600",
"strict_mode" = "false",
"strip_outer_array" = "true",
"format" = "json",
"json_root" = "$.data",
"jsonpaths" = "[\"$.rec_id\",\"$.vip_user_id\",\"$.vip_id\",\"$.vip_code\",\"$.tel\",\"$.vip_source\",\"$.openid\",\"$.unionid\",\"$.appid\",\"$.brand_code\",\"$.create_user_name\",\"$.create_user\",\"$.create_time\",\"$.modify_user_name\",\"$.modify_user\",\"$.modify_time\",\"$.version\",\"$.update_time\",\"$.type\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "192.168.87.107:9092,192.168.87.108:9092,192.168.87.109:9092",
"kafka_topic" = "drds_hana_ods_vip_weixin",
"kafka_partitions" = "0",
"kafka_offsets" = "OFFSET_BEGINNING",
"property.group.id" = "ods_drp_vip_weixin",
"property.client.id" = "doris"
);
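Under the same assumed message shape, a delete event for case 2 might look like the snippet below (field values hypothetical; only a few of the 19 mapped fields are shown):

```json
{
  "data": [
    {
      "rec_id": "2001",
      "vip_user_id": "88",
      "vip_code": "V2001",
      "tel": "13800000000",
      "update_time": "2021-06-01 12:10:00",
      "type": "DELETE"
    }
  ]
}
```

Because the job was created WITH MERGE and DELETE ON CDC_OP="DELETE", rows whose $.type maps to "DELETE" are applied as deletes against the table's unique key, while all other rows are upserted.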
Here, the PROPERTIES clause holds the load job's configuration, and the parentheses after FROM KAFKA hold the Kafka connection settings.
Once created, the Routine Load job keeps running in the background. To monitor and inspect its status, switch to the corresponding database and run:
show routine load; -- show the status of all routine load jobs in the current database
pause routine load for xxx; -- pause job xxx
resume routine load for xxx; -- resume a paused job xxx
stop routine load for xxx; -- stop job xxx; a stopped job disappears from the queue and cannot be resumed
ALTER ROUTINE LOAD FOR XX
PROPERTIES
(
"desired_concurrent_number" = "1"
)
FROM kafka
(
"kafka_partitions" = "0",
"kafka_offsets" = "OFFSET_BEGINNING",
"property.group.id" = "xxx_topic",
"property.client.id" = "doris"
); -- modify job parameters and re-consume the Kafka topic from the beginning; note that a job must be in the PAUSED state before it can be altered
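Since ALTER ROUTINE LOAD is only accepted while the job is paused, a typical parameter change runs as the following sequence (job name taken from case 2 above):

```sql
-- pause the job so it can be modified
PAUSE ROUTINE LOAD FOR ods_drp.rtl_ods_drp_vip_weixin;

-- raise the desired task concurrency
ALTER ROUTINE LOAD FOR ods_drp.rtl_ods_drp_vip_weixin
PROPERTIES
(
    "desired_concurrent_number" = "2"
);

-- resume consumption with the new settings
RESUME ROUTINE LOAD FOR ods_drp.rtl_ods_drp_vip_weixin;
```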