前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >数仓实战|实时同步Kafka数据到Doris

数仓实战|实时同步Kafka数据到Doris

作者头像
数据社
发布2021-08-27 16:29:31
4.2K0
发布2021-08-27 16:29:31
举报
文章被收录于专栏:数据社数据社

大家好,我是一哥,Doris成为MPP数据库新贵。Doris起源于百度,致力于满足企业用户的多种数据分析场景,支持多种数据模型(明细表, 聚合表), 多种导入方式(批量), 可整合和接入多种现有系统(Spark, Flink, Hive, ElasticSearch)。

今天分享一下Doris的例行导入(Routine Load)功能为用户提供了一种自动从指定数据源进行数据导入的功能,当前仅支持从 Kafka 系统进行例行导入。

01

基本原理

Routine Load 的基本原理:

  1. Client 向 FE 提交一个例行导入作业
  2. FE 通过 JobScheduler 将一个导入作业拆分成若干个 Task。每个 Task 负责导入指定的一部分数据。Task 被 TaskScheduler 分配到指定的 BE 上执行
  3. 在 BE 上,一个 Task 被视为一个普通的导入任务,通过 Stream Load 的导入机制进行导入。导入完成后,向 FE 汇报
  4. FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或者对失败的 Task 进行重试。整个例行导入作业通过不断的产生新的 Task,来完成数据不间断的导入

02

使用场景

Routine Load 在数据仓库中主要有两种应用场景:

  1. 接口数据导入。由于批处理抽取数据存在大量重复抽取的情况,越来越多的交易系统采用binlog或者直接提供接口更新数据到Kafka的方式来完成接口数据的对接。针对binlog日志或者Kafka消息队列,批处理程序是无法抽取的,所以需要采用流式数据写入。
  2. 实时数仓结果数据导入。根据Lambda架构,实时数据通过Kafka对接以后,继续经由Flink加工,加工完的数据继续写回Kafka,然后由Routine Load加载到Doris数据库,即可直接供数据分析应用读取数据。这种应用场景还不太成熟。

03

应用案例

实时接入kafka数据目前是有一些使用限制:

  1. 支持无认证的 Kafka 访问,以及通过 SSL 方式认证的 Kafka 集群。
  2. 支持的消息格式为 csv, json 文本格式。csv 每一个 message 为一行,且行尾不包含换行符。
  3. 仅支持 Kafka 0.10.0.0(含) 以上版本。

下面讲解两个例行导入任务案例:

例行导入的操作其实很简单,看再多的说明,不如一个案例来得实际。接下来直接给出两个导入案例。

案例一:数据源源不断的写入,不作更新或者删除操作

代码语言:javascript
复制
CREATE ROUTINE LOAD ods_drp.rtl_ods_drp_cdc_st_entry_detail_et ON ods_drp_cdc_st_entry_detail_et 
COLUMNS(ACCOUNT_LINE_ID, update_time,cdc_op,cdc_time=now()) 
    PROPERTIES
    (
    "desired_concurrent_number"="1",
    "max_batch_interval" = "20",
    "max_batch_rows" = "200000",
    "max_batch_size" = "104857600",
    "strict_mode" = "false",
    "strip_outer_array" = "true",
    "format" = "json",
    "json_root" = "$.data",
    "jsonpaths" = "[\"$.ACCOUNT_LINE_ID\",\"$.update_time\",\"$.type\"]"
     )
FROM KAFKA
    (
    "kafka_broker_list" = "192.168.87.107:9092,192.168.87.108:9092,192.168.87.109:9092",
    "kafka_topic" = "drds_hana_ods_st_entry_detail_et",
    "kafka_partitions" = "0",
    "kafka_offsets" = "OFFSET_BEGINNING",
    "property.group.id" = "ods_drp_st_entry_detail_et",
    "property.client.id" = "doris"
);

案例二:接口数据存在删除和更新操作

首先需要设置目标表为支持批量删除的模式:

代码语言:javascript
复制
ALTER TABLE ods_drp.ods_drp_vip_weixin ENABLE FEATURE "BATCH_DELETE";

然后创建导入任务:

代码语言:javascript
复制
CREATE ROUTINE LOAD ods_drp.rtl_ods_drp_vip_weixin ON ods_drp_vip_weixin
WITH MERGE
COLUMNS(rec_id, vip_user_id, vip_id, vip_code, tel, vip_source, openid, unionid, appid, brand_code, create_user_name, create_user, create_time, modify_user_name, modify_user, modify_time, version, update_time,CDC_OP),
DELETE ON CDC_OP="DELETE"
    PROPERTIES
    (
    "desired_concurrent_number"="1",
    "max_batch_interval" = "20",
    "max_batch_rows" = "200000",
    "max_batch_size" = "104857600",
    "strict_mode" = "false",
    "strip_outer_array" = "true",
    "format" = "json",
    "json_root" = "$.data",
    "jsonpaths" = "[\"$.rec_id\",\"$.vip_user_id\",\"$.vip_id\",\"$.vip_code\",\"$.tel\",\"$.vip_source\",\"$.openid\",\"$.unionid\",\"$.appid\",\"$.brand_code\",\"$.create_user_name\",\"$.create_user\",\"$.create_time\",\"$.modify_user_name\",\"$.modify_user\",\"$.modify_time\",\"$.version\",\"$.update_time\",\"$.type\"]"
     )
FROM KAFKA
    (
    "kafka_broker_list" = "192.168.87.107:9092,192.168.87.108:9092,192.168.87.109:9092",
    "kafka_topic" = "drds_hana_ods_vip_weixin",
    "kafka_partitions" = "0",
    "kafka_offsets" = "OFFSET_BEGINNING",
    "property.group.id" = "ods_drp_vip_weixin",
    "property.client.id" = "doris"
);

其中,前面的PROPERTIES括号里面存放的是加载的配置信息,KAFKA后面的括号里面存放的是KAFKA的配置信息。

例行导入常用操作

创建完 Routine Load 任务以后, Routine Load 会在后台持续运行。为了监控和检查 Routine Load 任务状态,我们需要进入对应的数据库schema下执行命令查看任务。

代码语言:javascript
复制
show routine load; --用于显示所有的例行导入任务状态
pause routine load for xxx;  --暂停xxx导入任务
resume routine load for xxx;  --重启xxx导入任务
stop routine load for xxx;  --停止xxx导入任务,停止以后任务会从队列中消失
ALTER ROUTINE LOAD FOR XX
PROPERTIES
(
    "desired_concurrent_number" = "1"
)
FROM kafka
(
    "kafka_partitions" = "0",
    "kafka_offsets" = "OFFSET_BEGINNING",
    "property.group.id" = "xxx_topic",
    "property.client.id" = "doris"
);    --修改任务参数,从kafka队列最开始重新读取

end

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-08-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据社 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 下面讲解两个例行导入任务案例:
  • 例行导入常用操作
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档