关于 DataX
DataX 是阿里开源的通用离线数据导入工具,在业界有比较广泛的使用。DataX 实现了包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构数据源之间高效的数据同步功能。
使用 Datax 导入数据到 Doris 需要使用 DataX doriswriter 插件,这个插件是利用 Doris 的 Stream Load 功能进行数据导入的,需要配合 DataX 服务一起使用。
使用手册
DataX doriswriter 插件代码在 这里。这里包含插件代码以及 DataX 项目的开发环境。
doriswriter 插件依赖 DataX 代码中的一些模块。而这些模块并没有在 Maven 官方仓库中。所以我们在开发 doriswriter 插件时,需要下载完整的 DataX 代码库,才能进行插件的编译和开发。
目录结构
doriswriter/
这个目录是 doriswriter 插件的代码目录。这个目录中的所有代码,都托管在 Apache Doris 的代码库中。doriswriter 插件帮助文档在:
doriswriter/doc
init-env.sh
这个脚本主要用于构建 DataX 开发环境,主要进行了以下操作:
1. 将 DataX 代码库 clone 到本地。
2. 将
doriswriter/
目录软链到 DataX/doriswriter
目录。3. 在
DataX/pom.xml
文件中添加 <module>doriswriter</module>
模块。4. 将
DataX/core/pom.xml
文件中的 httpclient 版本从 4.5 改为 4.5.13。说明
httpclient v4.5 在处理 307 转发时有 bug。
这个脚本执行后,开发者就可以进入
DataX/
目录开始开发或编译了。因为做了软链,所以任何对 DataX/doriswriter
目录中文件的修改,都会反映到 doriswriter/
目录中,方便开发者提交代码。编译
1. 运行
init-env.sh
。2. 按需修改
DataX/doriswriter
中的代码。3. 编译 doriswriter:
3.1 单独编译 doriswriter 插件:
mvn clean install -pl plugin-rdbms-util,doriswriter -DskipTests
3.2 编译整个 DataX 项目:
mvn package assembly:assembly -Dmaven.test.skip=true
产出在
target/datax/datax/
.说明
hdfsreader,hdfswriter 和 oscarwriter 这三个插件需要额外的 jar 包。如果您并不需要这些插件,可以在
DataX/pom.xml
中删除这些插件的模块。3.3 编译错误
如遇到如下编译错误:
Could not find artifact com.alibaba.datax:datax-all:pom:0.0.1-SNAPSHOT ...
可尝试以下方式解决:
3.3.1 下载 alibaba-datax-maven-m2-20210928.tar.gz。
3.3.2 解压后,将得到的
alibaba/datax/
目录,拷贝到所使用的 maven 对应的 .m2/repository/com/alibaba/
下。3.3.3 再次尝试编译。
3.3.4 按需提交修改。
示例
Stream 读取数据后导入至 Doris
Mysql 读取数据后导入Doris
1. Mysql 表结构
CREATE TABLE `t_test`(`id`bigint(30) NOT NULL,`order_code` varchar(30) DEFAULT NULL COMMENT '',`line_code` varchar(30) DEFAULT NULL COMMENT '',`remark` varchar(30) DEFAULT NULL COMMENT '',`unit_no` varchar(30) DEFAULT NULL COMMENT '',`unit_name` varchar(30) DEFAULT NULL COMMENT '',`price` decimal(12,2) DEFAULT NULL COMMENT '',PRIMARY KEY(`id`) USING BTREE)ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC COMMENT='';
2. Doris 表结构
CREATE TABLE `ods_t_test` (`id`bigint(30) NOT NULL,`order_code` varchar(30) DEFAULT NULL COMMENT '',`line_code` varchar(30) DEFAULT NULL COMMENT '',`remark` varchar(30) DEFAULT NULL COMMENT '',`unit_no` varchar(30) DEFAULT NULL COMMENT '',`unit_name` varchar(30) DEFAULT NULL COMMENT '',`price` decimal(12,2) DEFAULT NULL COMMENT '')ENGINE=OLAPUNIQUE KEY(`id`, `order_code`)DISTRIBUTED BY HASH(`order_code`) BUCKETS 1PROPERTIES ("replication_allocation" = "tag.location.default: 3","in_memory" = "false","storage_format" = "V2");
3. 创建 datax 脚本
import_t_test.json
{"job": {"setting": {"speed": {"channel": 1},"errorLimit": {"record": 0,"percentage": 0}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "xxx","password": "xxx","column": ["id","order_code","line_code","remark","unit_no","unit_name","price"],"connection": [ { "table": [ "t_test" ], "jdbcUrl": [ "jdbc:mysql://10.10.10.1:3306/demo" ] } ] }},"writer": {"name": "doriswriter","parameter": {"feLoadUrl": ["127.**.**.1:8030","127.**.**.2:8030"],"beLoadUrl": ["127.**.**.3:8040","127.**.**.4:8040","127.**.**.5:8040"],"jdbcUrl": "jdbc:mysql://127.**.**.1:9030/","database": "demo","table": "ods_t_test","column": ["id","order_code","line_code","remark","unit_no","unit_name","price"],"username": "*****","password": "*****","postSql": [],"preSql": [],"loadProps": {},"maxBatchRows" : 300000,"maxBatchByteSize" : 20971520}}}]}}
4. 执行 datax 任务:python datax.py import_t_test.json。具体用法参考 datax 官网。