有奖捉虫:办公协同&微信生态&物联网文档专题 HOT

关于 DataX

DataX 是阿里开源的通用离线数据导入工具,在业界有比较广泛的使用。DataX 实现了包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构数据源之间高效的数据同步功能。 使用 Datax 导入数据到 Doris 需要使用 DataX doriswriter 插件,这个插件是利用 Doris 的 Stream Load 功能进行数据导入的,需要配合 DataX 服务一起使用。

使用手册

DataX doriswriter 插件代码在 这里。这里包含插件代码以及 DataX 项目的开发环境。 doriswriter 插件依赖 DataX 代码中的一些模块。而这些模块并没有在 Maven 官方仓库中。所以我们在开发 doriswriter 插件时,需要下载完整的 DataX 代码库,才能进行插件的编译和开发。

目录结构

doriswriter/

这个目录是 doriswriter 插件的代码目录。这个目录中的所有代码,都托管在 Apache Doris 的代码库中。doriswriter 插件帮助文档在:doriswriter/doc

init-env.sh

这个脚本主要用于构建 DataX 开发环境,主要进行了以下操作:
1. 将 DataX 代码库 clone 到本地。
2. doriswriter/ 目录软链到 DataX/doriswriter 目录。
3. DataX/pom.xml 文件中添加 <module>doriswriter</module> 模块。
4. DataX/core/pom.xml 文件中的 httpclient 版本从 4.5 改为 4.5.13。
说明
httpclient v4.5 在处理 307 转发时有 bug。
这个脚本执行后,开发者就可以进入 DataX/ 目录开始开发或编译了。因为做了软链,所以任何对 DataX/doriswriter 目录中文件的修改,都会反映到 doriswriter/ 目录中,方便开发者提交代码。

编译

1. 运行 init-env.sh
2. 按需修改 DataX/doriswriter 中的代码。
3. 编译 doriswriter:
3.1 单独编译 doriswriter 插件:
mvn clean install -pl plugin-rdbms-util,doriswriter -DskipTests
3.2 编译整个 DataX 项目:
mvn package assembly:assembly -Dmaven.test.skip=true
产出在 target/datax/datax/.
说明
hdfsreader,hdfswriter 和 oscarwriter 这三个插件需要额外的 jar 包。如果您并不需要这些插件,可以在 DataX/pom.xml 中删除这些插件的模块。
3.3 编译错误 如遇到如下编译错误:
Could not find artifact com.alibaba.datax:datax-all:pom:0.0.1-SNAPSHOT ...
可尝试以下方式解决:
3.3.2 解压后,将得到的 alibaba/datax/ 目录,拷贝到所使用的 maven 对应的 .m2/repository/com/alibaba/ 下。
3.3.3 再次尝试编译。
3.3.4 按需提交修改。

示例

Stream 读取数据后导入至 Doris

该示例插件的使用说明请参阅 这里

Mysql 读取数据后导入Doris

1. Mysql 表结构
CREATE TABLE `t_test`(
`id`bigint(30) NOT NULL,
`order_code` varchar(30) DEFAULT NULL COMMENT '',
`line_code` varchar(30) DEFAULT NULL COMMENT '',
`remark` varchar(30) DEFAULT NULL COMMENT '',
`unit_no` varchar(30) DEFAULT NULL COMMENT '',
`unit_name` varchar(30) DEFAULT NULL COMMENT '',
`price` decimal(12,2) DEFAULT NULL COMMENT '',
PRIMARY KEY(`id`) USING BTREE
)ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC COMMENT='';
2. Doris 表结构
CREATE TABLE `ods_t_test` (
`id`bigint(30) NOT NULL,
`order_code` varchar(30) DEFAULT NULL COMMENT '',
`line_code` varchar(30) DEFAULT NULL COMMENT '',
`remark` varchar(30) DEFAULT NULL COMMENT '',
`unit_no` varchar(30) DEFAULT NULL COMMENT '',
`unit_name` varchar(30) DEFAULT NULL COMMENT '',
`price` decimal(12,2) DEFAULT NULL COMMENT ''
)ENGINE=OLAP
UNIQUE KEY(`id`, `order_code`)
DISTRIBUTED BY HASH(`order_code`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"in_memory" = "false",
"storage_format" = "V2"
);
3. 创建 datax 脚本import_t_test.json
{
"job": {
"setting": {
"speed": {
"channel": 1
},
"errorLimit": {
"record": 0,
"percentage": 0
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "xxx",
"password": "xxx",
"column": ["id","order_code","line_code","remark","unit_no","unit_name","price"],
"connection": [ { "table": [ "t_test" ], "jdbcUrl": [ "jdbc:mysql://10.10.10.1:3306/demo" ] } ] }
},
"writer": {
"name": "doriswriter",
"parameter": {
"feLoadUrl": ["127.0.0.1:8030","127.0.0.2:8030"],
"beLoadUrl": ["127.0.0.3:8040","127.0.0.4:8040","127.0.0.5:8040"],
"jdbcUrl": "jdbc:mysql://127.0.0.1:9030/",
"database": "demo",
"table": "ods_t_test",
"column": ["id","order_code","line_code","remark","unit_no","unit_name","price"],
"username": "xxx",
"password": "xxx",
"postSql": [],
"preSql": [],
"loadProps": {
},
"maxBatchRows" : 300000,
"maxBatchByteSize" : 20971520
}
}
}
]
}
}
4. 执行 datax 任务:python datax.py import_t_test.json。具体用法参考 datax 官网