前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大数据-数据源离线同步工具DataX的使用

大数据-数据源离线同步工具DataX的使用

作者头像
码客说
发布2022-10-04 21:36:17
1.6K0
发布2022-10-04 21:36:17
举报
文章被收录于专栏:码客码客

前言

官方网址https://github.com/alibaba/DataX

DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

93b7fc1c-6927-11e6-8cda-7cf8420fc65f
93b7fc1c-6927-11e6-8cda-7cf8420fc65f

为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

设计理念

ec7e36f4-6927-11e6-8f5f-ffc43d6a468b
ec7e36f4-6927-11e6-8f5f-ffc43d6a468b

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

支持的数据类型

类型

数据源

Reader(读)

Writer(写)

文档

RDBMS 关系型数据库

MySQL

读 、写

Oracle

读 、写

OceanBase

读 、写

SQLServer

读 、写

PostgreSQL

读 、写

DRDS

读 、写

通用RDBMS(支持所有关系型数据库)

读 、写

阿里云数仓数据存储

ODPS

读 、写

ADS

OSS

读 、写

OCS

NoSQL数据存储

OTS

读 、写

Hbase0.94

读 、写

Hbase1.1

读 、写

Phoenix4.x

读 、写

Phoenix5.x

读 、写

MongoDB

读 、写

Hive

读 、写

Cassandra

读 、写

无结构化数据存储

TxtFile

读 、写

FTP

读 、写

HDFS

读 、写

Elasticsearch

时间序列数据库

OpenTSDB

TSDB

读 、写

TDengine

读 、写

DataX3.0核心架构

DataX 3.0 开源版本支持单机多线程模式完成同步作业运行,本小节按一个DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。

aa6c95a8-6891-11e6-94b7-39f0ab5af3b4
aa6c95a8-6891-11e6-94b7-39f0ab5af3b4

举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。

DataX的调度决策思路是:

  1. DataXJob根据分库分表切分成了100个Task。
  2. 根据20个并发,DataX计算共需要分配4个TaskGroup。
  3. 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

工具部署

环境

直接下载编译后的DataX工具包:DataX下载地址

下载后解压至本地某个目录,进入bin目录,即可运行同步作业:

解压到目录

/data/tools/bigdata/datax3

配置环境变量

创建配置文件

代码语言:javascript
复制
vi /etc/profile.d/datax.sh

内容设置为

代码语言:javascript
复制
# DataX
export DATAX_HOME=/data/tools/bigdata/datax3
export PATH=$PATH:$DATAX_HOME/bin

配置生效

代码语言:javascript
复制
source /etc/profile

查看是否生效

代码语言:javascript
复制
echo $DATAX_HOME

运行方式

代码语言:javascript
复制
cd $DATAX_HOME/bin/
python datax.py YOUR_JOB.json

自检脚本:

代码语言:javascript
复制
python $DATAX_HOME/bin/datax.py $DATAX_HOME/job/job.json

定时任务

crontab 命令

  • -l 在标准输出上显示当前的crontab。
  • -r 删除当前的crontab文件。
  • -e 使用VISUAL或者EDITOR环境变量所指的编辑器编辑当前的crontab文件。当结束编辑离开时,编辑后的文件将自动安装。

格式如下:

代码语言:javascript
复制
minute hour day-of-month month-of-year day-of-week commands

合法值

代码语言:javascript
复制
00-59 00-23 01-31 01-12 0-6 (0 is sunday)

除了数字还有几个个特殊的符号就是*/-,*代表所有的取值范围内的数字

  • /代表每的意思,/5表示每5个单位
  • -代表从某个数字到某个数字
  • ,分开几个离散的数字

查看定时任务

代码语言:javascript
复制
crontab -l

添加定时任务

代码语言:javascript
复制
crontab -e

添加如下

代码语言:javascript
复制
5 1 * * * python $DATAX_HOME/bin/datax.py $DATAX_HOME/config/oracle2oracle.json  >>$DATAX_HOME/log/datax_log.`date +\%Y\%m\%d\%H\%M\%S`  2>&1

或者自己添加配置文件

代码语言:javascript
复制
crontab /data/cron/mysqlRollBack.cron

配置

配置项

  • job 下面有两个配置项,content 和 setting,其中 content 用来描述该任务的源和目的端的信息,setting 用来描述任务本身的信息;
  • content 又分为两部分,reader 和 writer,分别用来描述源端和目的端的信息;
  • setting 中的 speed 项表示同时起几个并发执行该任务;
  • job.setting.speed(流量控制) Job 支持用户对速度的自定义控制,channel 的值可以控制同步时的并发数,byte 的值可以控制同步时的速度。
  • job.setting.errorLimit(脏数据控制) Job 支持用户对于脏数据的自定义监控和告警,包括对脏数据最大记录数阈值(record 值)或者脏数据占比阈值(percentage 值),当 Job 传输过程出现的脏数据大于用户指定的数量/百分比,DataX Job 报错退出。

示例

代码语言:javascript
复制
{
  "job": {
    "setting": {
      "speed": {
        "channel": 3
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "root",
            "column": [
              "id",
              "name"
            ],
            "splitPk": "db_id",
            "connection": [
              {
                "table": [
                  "table"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://127.0.0.1:3306/database"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print":true
          }
        }
      }
    ]
  }
}

测试

Stream ==> Stream

使用 streamreader + streamwriter(这种情况常用于测试) 配置文件:stream2stream.json

代码语言:javascript
复制
{
  "job": {
    "content": [{
      "reader": {
        "name": "streamreader",
        "parameter": {
          "sliceRecordCount": 10,
          "column": [{
            "type": "String",
            "value": "hello DataX"
          }, {
            "type": "string",
            "value": "DataX Stream To Stream"
          }, {
            "type": "string",
            "value": "数据迁移工具"
          }]
        }
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "encoding": "GBK",
          "print": true
        }
      }
    }],
    "setting": {
      "speed": {
        "channel": 2
      }
    }
  }
}

输入执行命令:

代码语言:javascript
复制
python $DATAX_HOME/bin/datax.py ../job/stream2stream.json
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-09-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 设计理念
  • 支持的数据类型
  • DataX3.0核心架构
  • 工具部署
  • 定时任务
  • 配置
  • 测试
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档