DataX二次开发小记

本文为个人理解,如有不对之处,欢迎指正。

前言

之前,工作中使用datax作为数据交换组件。也简单的介绍了下datax和源码的基本导读。具体参见:

数据同步工具DataX概述

。数据开发平台在数据交换同步上,从sqoop、kettle等工具,慢慢地向datax并拢。

挑战

datax的扩展性很好,插件式安装配置。在实际使用中,往往针对实际的场景需要定制自己的读或写插件。关于如何编写插件,datax官网上也做了阐述,这里就不在赘述。详细参见:GitHub。

目标

拿项目中的一个需求:实现mysql的增量数据同步到hive来阐插件开发过程。

思路

数据变更来源:mysql变更,通过收集binlog日志,同步到kafka中进行处理。

目标数据源:hive增量分区表,通过Hbase作为中间表处理数据变更,然后再同步到hive。

数据流走向

插件定制

开发从kafka读插件。

开发kafka到hbase到写插件。

定制hbase读插件。

开发hive写插件。

为什么?见下文

插件说明

kafka读插件

由于mysql的binlog会写入到kafka中,所以数据来源需要增加一个可以从kafka中读取的插件。

插件json定义

kafka写hbase插件

由于数据格式定制化,从公司的kafka中读取pb序列化的数据。需要解析数据加工处理。因此,在写插件中

方法中,读取记录处理过程

进行了定制处理。

hbase读插件

针对分库分表的情况,从kafka读取出来的消息存储与hbase中, 的格式为。所以同步同一张mysql表,hbase的rowkey可能会出现多组。如果是每天同步,可能还会落到不同的表中。这就需要hbase读插件支持多组table,多组rowkey处理。

原始插件json格式

定制版插件json定制

对比,主要的变化就是从单表变为多表数组配置项。

后续,讲以此插件的定义过程,讲解插件开发思路。

hive写插件

由于要支持hive分区处理。所以,原生的datax实现hive的读、写,底层原理是通过直接操作hdfs的方式处理的(使用hdfsreader、hdfswriter)。

这样,就hive表的分区信息一无所知。因此,这里采用作为操作hive数据的接口。关于简单说明:

HCatalog屏蔽了底层数据存储的位置格式等信息,为上层计算处理流程提供统一的、共享的metadata。并且将数据以表的形式呈现给用户(如Pig,MR,Hive,Streaming..),用户只需提供表名就可以访问底层数据,并不需要关心底层数据的位置,模式等信息。

插件josn定义

进入主题

插件开发过程

针对上述定制插件中的hbasereader进行讲述。针对读写插件,总结下来就是。

读写组件

一个配置

就是插件的json配置,最终在代码层次上会抽象为对象。这是任务执行的依据。

2个对象

job:作业信息载体。

task:作业执行载体。

核心方法

Job.split

这是一个任务切分处理逻辑,最终会讲总的json,拆分成最小的执行单元配置传递给task。

针对此定制插件,就是通过split方法,将拆分,退化为最原始插件的配置形式(单个table,单组rowkey)。最终,针对task而言,执行配置处理逻辑也不变。

Task.startRead

这个是读记录过程,也是读插件的核心。经过Job的split处理后,对于task的Configuration处理过程也是和原始的一样。这里要做的就是:是否需要对读记录进行二次处理加工

其他

之后,就会将task提交个任务执行容器框架去处理。另外,如果需要统计处理,也可以在中调用任务收集器进行统计数据收集,如:脏数据条数、过滤数据条数等。

结语

此次开发定制,深入到插件层,对插件的数据走向有了深入了解。同时也对很多组件(kafka、hbase、hive等)有了了解。

实践出真知

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20181018G023YY00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 yunjia_community@tencent.com 删除。

扫码关注云+社区

领取腾讯云代金券