本文为个人理解,如有不对之处,欢迎指正。
前言
之前,工作中使用datax作为数据交换组件。也简单的介绍了下datax和源码的基本导读。具体参见:
数据同步工具DataX概述
。数据开发平台在数据交换同步上,从sqoop、kettle等工具,慢慢地向datax并拢。
挑战
datax的扩展性很好,插件式安装配置。在实际使用中,往往针对实际的场景需要定制自己的读或写插件。关于如何编写插件,datax官网上也做了阐述,这里就不在赘述。详细参见:GitHub。
目标
拿项目中的一个需求:实现mysql的增量数据同步到hive来阐插件开发过程。
思路
数据变更来源:mysql变更,通过收集binlog日志,同步到kafka中进行处理。
目标数据源:hive增量分区表,通过Hbase作为中间表处理数据变更,然后再同步到hive。
数据流走向
插件定制
开发从kafka读插件。
开发kafka到hbase到写插件。
定制hbase读插件。
开发hive写插件。
为什么?见下文
插件说明
kafka读插件
由于mysql的binlog会写入到kafka中,所以数据来源需要增加一个可以从kafka中读取的插件。
插件json定义
kafka写hbase插件
由于数据格式定制化,从公司的kafka中读取pb序列化的数据。需要解析数据加工处理。因此,在写插件中
方法中,读取记录处理过程
进行了定制处理。
hbase读插件
针对分库分表的情况,从kafka读取出来的消息存储与hbase中, 的格式为。所以同步同一张mysql表,hbase的rowkey可能会出现多组。如果是每天同步,可能还会落到不同的表中。这就需要hbase读插件支持多组table,多组rowkey处理。
原始插件json格式
定制版插件json定制
对比,主要的变化就是从单表变为多表数组配置项。
后续,讲以此插件的定义过程,讲解插件开发思路。
hive写插件
由于要支持hive分区处理。所以,原生的datax实现hive的读、写,底层原理是通过直接操作hdfs的方式处理的(使用hdfsreader、hdfswriter)。
这样,就hive表的分区信息一无所知。因此,这里采用作为操作hive数据的接口。关于简单说明:
HCatalog屏蔽了底层数据存储的位置格式等信息,为上层计算处理流程提供统一的、共享的metadata。并且将数据以表的形式呈现给用户(如Pig,MR,Hive,Streaming..),用户只需提供表名就可以访问底层数据,并不需要关心底层数据的位置,模式等信息。
插件josn定义
进入主题
插件开发过程
针对上述定制插件中的hbasereader进行讲述。针对读写插件,总结下来就是。
读写组件
一个配置
就是插件的json配置,最终在代码层次上会抽象为对象。这是任务执行的依据。
2个对象
job:作业信息载体。
task:作业执行载体。
核心方法
Job.split
这是一个任务切分处理逻辑,最终会讲总的json,拆分成最小的执行单元配置传递给task。
针对此定制插件,就是通过split方法,将拆分,退化为最原始插件的配置形式(单个table,单组rowkey)。最终,针对task而言,执行配置处理逻辑也不变。
Task.startRead
这个是读记录过程,也是读插件的核心。经过Job的split处理后,对于task的Configuration处理过程也是和原始的一样。这里要做的就是:是否需要对读记录进行二次处理加工。
其他
之后,就会将task提交个任务执行容器框架去处理。另外,如果需要统计处理,也可以在中调用任务收集器进行统计数据收集,如:脏数据条数、过滤数据条数等。
结语
此次开发定制,深入到插件层,对插件的数据走向有了深入了解。同时也对很多组件(kafka、hbase、hive等)有了了解。
实践出真知
领取专属 10元无门槛券
私享最新 技术干货