DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、SQL Server、Oracle、PostgreSQL、HDFS、Hive、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。
DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。
DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
此前已经开源DataX1.0版本,此次介绍为阿里云开源全新版本DataX3.0,有了更多更强大的功能和更好的使用体验。Github主页地址:https://github.com/alibaba/DataX
DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
经过几年积累,DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入。DataX目前支持数据如下:
类型 | 数据源 | Reader(读) | Writer(写) | 文档 |
---|---|---|---|---|
RDBMS 关系型数据库 | MySQL | √ | √ | 读 、写 |
Oracle | √ | √ | 读 、写 | |
SQLServer | √ | √ | 读 、写 | |
PostgreSQL | √ | √ | 读 、写 | |
DRDS | √ | √ | 读 、写 | |
达梦 | √ | √ | 读 、写 | |
通用RDBMS(支持所有关系型数据库) | √ | √ | 读 、写 | |
阿里云数仓数据存储 | ODPS | √ | √ | 读 、写 |
ADS | √ | 写 | ||
OSS | √ | √ | 读 、写 | |
OCS | √ | √ | 读 、写 | |
NoSQL数据存储 | OTS | √ | √ | 读 、写 |
Hbase0.94 | √ | √ | 读 、写 | |
Hbase1.1 | √ | √ | 读 、写 | |
MongoDB | √ | √ | 读 、写 | |
Hive | √ | √ | 读 、写 | |
无结构化数据存储 | TxtFile | √ | √ | 读 、写 |
FTP | √ | √ | 读 、写 | |
HDFS | √ | √ | 读 、写 | |
Elasticsearch | √ | 写 |
DataX Framework提供了简单的接口与插件交互,提供简单的插件接入机制,只需要任意加上一种插件,就能无缝对接其他数据源。详情请看:DataX数据源指南
DataX 3.0 开源版本支持单机多线程模式完成同步作业运行,本小节按一个DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。
举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:
datax使用插件式开发,官方参考文档如下:https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md
描述:streaming reader—>streaming writer (官网例子)
[root@hadoop01 home]# cd /usr/local/datax/
[root@hadoop01 datax]# vi ./job/first.json
内容如下:
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello,你好,世界-DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
}
}
运行job:
[root@hadoop01 datax]# python ./bin/datax.py ./job/first.json
运行结果如下:
…(省略很多)
描述:mysql reader----> hdfs writer
[root@hadoop01 datax]# vi ./job/mysql2hdfs.json
内容如下:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"user_id",
"user_name",
"trade_time"
],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://master:3306/test"],
"table": ["user"]
}
],
"password": "root",
"username": "root"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://hd/",
"hadoopConfig":{
"dfs.nameservices": "hd",
"dfs.ha.namenodes.hd": "n1,n2",
"dfs.namenode.rpc-address.hd.n1": "master:9000",
"dfs.namenode.rpc-address.hd.n2": "hdp01:9000",
"dfs.client.failover.proxy.provider.hd":
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
},
"fileType": "orc",
"path": "/datax/mysql2hdfs/mytest",
"fileName": "m2h01",
"column":[
{
"name": "user_id",
"type": "BIGINT"
},
{
"name": "user_name",
"type": "string"
},
{
"name": "trade_time",
"type": "DATE"
}
],
"writeMode": "nonConflict",
"fieldDelimiter": "\t",
"compress":"NONE"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
注:
运行前,需提前创建好输出目录:
[root@hadoop01 datax]# hdfs dfs -mkdir -p /datax/mysql2hdfs/orcfull
运行job:
[root@hadoop01 datax]# python ./bin/datax.py ./job/mysql2hdfs.json
运行结果如下:
然后建表看一下
"fileType": "orc"
"fieldDelimiter": "\t"
文件类型是orc
create table orcfull(
id bigint,
name string
)
row format delimited fields terminated by "\t"
stored as orc
location '/datax/mysql2hdfs/orcfull';
描述:hdfs reader----> mysql writer
[root@hadoop01 datax]# vi ./job/hdfs2mysql.json
内容如下:
{
"job": {
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/datax/mysql2hdfs/mytest/*",
"defaultFS": "hdfs://hd/",
"hadoopConfig":{
"dfs.nameservices": "hd",
"dfs.ha.namenodes.hd": "n1,n2",
"dfs.namenode.rpc-address.hd.n1": "master:9000",
"dfs.namenode.rpc-address.hd.n2": "hdp01:9000",
"dfs.client.failover.proxy.provider.hd": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
},
"column":[
{
"index": "0",
"type": "LONG"
},
{
"index": "1",
"type": "string"
},
{
"index": "2",
"type": "DATE"
}
],
"fileType": "orc",
"encoding": "UTF-8",
"fieldDelimiter": ","
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [
"user_id",
"user_name",
"trade_time"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://master:3306/test",
"table": ["user1"]
}
],
"password": "root",
"username": "root"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
注:
运行前,需提前创建好输出的stu1表:
CREATE TABLE `stu1` (
'user_id' int(11) DEFAULT NULL,
'user_name' varchar(32) DEFAULT NULL
'trade_time' varchar(32) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
运行job:
[root@hadoop01 datax]# python ./bin/datax.py ./job/hdfs2mysql.json
运行结果如下:
注意,列的类型,如果名称或者类型不对会有错误,我这儿采用所有列读写。
到此为止,dataX的学习就暂时结束。