前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >datax源码解析-任务拆分机制详解

datax源码解析-任务拆分机制详解

作者头像
用户7634691
发布2021-12-17 21:28:58
2K0
发布2021-12-17 21:28:58
举报

datax源码解析-任务拆分机制详解

写在前面

此次源码分析的版本是3.0。因为插件是datax重要的组成部分,源码分析过程中会涉及到插件部分的源码,为了保持一致性,插件都已大部分人比较熟悉的mysql为例子说明。

本文我们来看看datax的任务拆分机制。

正文

理解datax中关于job和task的关系以及概念。

  • DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  • DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
  • 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  • 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
  • DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

我们这篇文章其实就是关注的第二个步骤,拆分task。

任务拆分的入口函数是com.alibaba.datax.core.job.JobContainer#split,我们来一点点分析这个方法。

代码语言:javascript
复制
//计算needChannelNumber
        this.adjustChannelNumber();

        if (this.needChannelNumber <= 0) {
            this.needChannelNumber = 1;
        }

        //切分读插件,返回包含各个切分后的读插件配置列表,后续一个服务使用一个
        List<Configuration> readerTaskConfigs = this
                .doReaderSplit(this.needChannelNumber);

        ...

首先是计算needChannelNumber这个变量,这个变量是后面执行具体拆分成task的依据。adjustChannelNumber方法如下:

代码语言:javascript
复制
private void adjustChannelNumber() {
        int needChannelNumberByByte = Integer.MAX_VALUE;
        int needChannelNumberByRecord = Integer.MAX_VALUE;

        //是否指定了字节限速,来自任务配置文件
        boolean isByteLimit = (this.configuration.getInt(
                CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 0) > 0);
        if (isByteLimit) {
            //全局的限速字节数
            long globalLimitedByteSpeed = this.configuration.getInt(
                    CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 10 * 1024 * 1024);

            // 在byte流控情况下,单个Channel流量最大值必须设置,否则报错!
            Long channelLimitedByteSpeed = this.configuration
                    .getLong(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE);
            if (channelLimitedByteSpeed == null || channelLimitedByteSpeed <= 0) {
                throw DataXException.asDataXException(
                        FrameworkErrorCode.CONFIG_ERROR,
                        "在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数");
            }

            //计算channel的数量
            needChannelNumberByByte =
                    (int) (globalLimitedByteSpeed / channelLimitedByteSpeed);
            needChannelNumberByByte =
                    needChannelNumberByByte > 0 ? needChannelNumberByByte : 1;
            LOG.info("Job set Max-Byte-Speed to " + globalLimitedByteSpeed + " bytes.");
        }

        //是否指定了记录数量限流
        boolean isRecordLimit = (this.configuration.getInt(
                CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 0)) > 0;
        if (isRecordLimit) {
            long globalLimitedRecordSpeed = this.configuration.getInt(
                    CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 100000);

            Long channelLimitedRecordSpeed = this.configuration.getLong(
                    CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD);
            if (channelLimitedRecordSpeed == null || channelLimitedRecordSpeed <= 0) {
                throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR,
                        "在有总tps限速条件下,单个channel的tps值不能为空,也不能为非正数");
            }

            needChannelNumberByRecord =
                    (int) (globalLimitedRecordSpeed / channelLimitedRecordSpeed);
            needChannelNumberByRecord =
                    needChannelNumberByRecord > 0 ? needChannelNumberByRecord : 1;
            LOG.info("Job set Max-Record-Speed to " + globalLimitedRecordSpeed + " records.");
        }

        // 取较小值
        this.needChannelNumber = needChannelNumberByByte < needChannelNumberByRecord ?
                needChannelNumberByByte : needChannelNumberByRecord;

        // 如果从byte或record上设置了needChannelNumber则退出
        if (this.needChannelNumber < Integer.MAX_VALUE) {
            return;
        }

        //是否直接指定了channel数量
        boolean isChannelLimit = (this.configuration.getInt(
                CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL, 0) > 0);
        if (isChannelLimit) {
            this.needChannelNumber = this.configuration.getInt(
                    CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL);

            LOG.info("Job set Channel-Number to " + this.needChannelNumber
                    + " channels.");

            return;
        }

        throw DataXException.asDataXException(
                FrameworkErrorCode.CONFIG_ERROR,
                "Job运行速度必须设置");
    }

注释写得比较详细了,总结下该方法的逻辑是,如果指定字节数限流,则据此计算并发数目A。如果指定记录数限流,则据此计算一个并发数目B。再取A和B两者中最小值作为needChannelNumber变量的值。如果两者限流都没指定,则看是否配置文件指定了channel并发数目。配置的示例是这样的:

代码语言:javascript
复制
{
    "core": {
       "transport" : {
          "channel": {
             "speed": {
                "record": 100,
                "byte": 100
             }
          }
       }
    },
    "job": {
      "setting": {
        "speed": {
          "record": 5000,
          "byte": 10000,
          "channel" : 1
        }
      }
    }
}

或者直接指定了channel数量:

代码语言:javascript
复制
"job": {
      "setting":{
            "speed":{
                "channel":"2"
            }
        }
    }

继续看split代码,

代码语言:javascript
复制
//切分读插件,返回包含各个切分后的读插件配置列表,后续一个服务使用一个
        List<Configuration> readerTaskConfigs = this
                .doReaderSplit(this.needChannelNumber);

        //拆分的任务数量
        int taskNumber = readerTaskConfigs.size();
        //先拆reader,再拆writer
        List<Configuration> writerTaskConfigs = this
                .doWriterSplit(taskNumber);

        ...

这里似乎有点奇怪,为啥reader拆分传入的是needChannelNumber,而writer拆分入参是taskNumber。这是因为datax的执行逻辑就是,必须先切分Reader,然后Writer是根据Reader切分后的数目进行切分的。这个仔细想想也可以理解,毕竟传输的源头是reader,根据reader进行分工是自然的。

深入到doReaderSplit方法继续看,

代码语言:javascript
复制
private List<Configuration> doReaderSplit(int adviceNumber) {
        classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
                PluginType.READER, this.readerPluginName));
        //内部还是调用插件的split
        List<Configuration> readerSlicesConfigs =
                this.jobReader.split(adviceNumber);
        if (readerSlicesConfigs == null || readerSlicesConfigs.size() <= 0) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
                    "reader切分的task数目不能小于等于0");
        }
        LOG.info("DataX Reader.Job [{}] splits to [{}] tasks.",
                this.readerPluginName, readerSlicesConfigs.size());
        classLoaderSwapper.restoreCurrentThreadClassLoader();
        return readerSlicesConfigs;
    }

没啥东西,因为委托给了插件自己的split方法进行拆分,这里以mysql为例,最终调用的是com.alibaba.datax.plugin.rdbms.reader.util.ReaderSplitUtil#doSplit方法,来看下,

代码语言:javascript
复制
public static List<Configuration> doSplit(
            Configuration originalSliceConfig, int adviceNumber) {
        //默认isTableMode是true
        boolean isTableMode = originalSliceConfig.getBool(Constant.IS_TABLE_MODE).booleanValue();
        int eachTableShouldSplittedNumber = -1;
        if (isTableMode) {
            // adviceNumber这里是channel数量大小, 即datax并发task数量
            // eachTableShouldSplittedNumber是单表应该切分的份数, 向上取整可能和adviceNumber没有比例关系了已经
            eachTableShouldSplittedNumber = calculateEachTableShouldSplittedNumber(
                    adviceNumber, originalSliceConfig.getInt(Constant.TABLE_NUMBER_MARK));
        }

        //从配置文件获取列信息
        String column = originalSliceConfig.getString(Key.COLUMN);
        //从配置文件获取where设置,如果配置文件没有指定就是空
        String where = originalSliceConfig.getString(Key.WHERE, null);

        //数据库连接信息,这里仅指reader的连接信息
        List<Object> conns = originalSliceConfig.getList(Constant.CONN_MARK, Object.class);

        List<Configuration> splittedConfigs = new ArrayList<Configuration>();

        for (int i = 0, len = conns.size(); i < len; i++) {
            Configuration sliceConfig = originalSliceConfig.clone();

            Configuration connConf = Configuration.from(conns.get(i).toString());
            String jdbcUrl = connConf.getString(Key.JDBC_URL);
            sliceConfig.set(Key.JDBC_URL, jdbcUrl);

            // 抽取 jdbcUrl 中的 ip/port 进行资源使用的打标,以提供给 core 做有意义的 shuffle 操作
            sliceConfig.set(CommonConstant.LOAD_BALANCE_RESOURCE_MARK, DataBaseType.parseIpFromJdbcUrl(jdbcUrl));

            sliceConfig.remove(Constant.CONN_MARK);

            Configuration tempSlice;

            // 说明是配置的 table 方式
            if (isTableMode) {
                // 已在之前进行了扩展和`处理,可以直接使用
                List<String> tables = connConf.getList(Key.TABLE, String.class);

                Validate.isTrue(null != tables && !tables.isEmpty(), "您读取数据库表配置错误.");

                //要不要根据主键进一步拆分,如果配置文件没有指定就不需要拆分
                String splitPk = originalSliceConfig.getString(Key.SPLIT_PK, null);

                //最终切分份数不一定等于 eachTableShouldSplittedNumber
                boolean needSplitTable = eachTableShouldSplittedNumber > 1
                        && StringUtils.isNotBlank(splitPk);
                //是否需要对单表进行拆分
                //当满足并发数要求较高,并且配置了splitPk(表分割的主键)参数时,则要求进行单表拆分
                if (needSplitTable) {
                    if (tables.size() == 1) {
                        //原来:如果是单表的,主键切分num=num*2+1
                        // splitPk is null这类的情况的数据量本身就比真实数据量少很多, 和channel大小比率关系时,不建议考虑
                        //eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 2 + 1;// 不应该加1导致长尾

                        //考虑其他比率数字?(splitPk is null, 忽略此长尾)
                        //eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 5;

                        //为避免导入hive小文件 默认基数为5,可以通过 splitFactor 配置基数
                        // 最终task数为(channel/tableNum)向上取整*splitFactor
                        Integer splitFactor = originalSliceConfig.getInt(Key.SPLIT_FACTOR, Constant.SPLIT_FACTOR);
                        eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * splitFactor;
                    }
                    // 尝试对每个表,切分为eachTableShouldSplittedNumber 份
                    for (String table : tables) {
                        tempSlice = sliceConfig.clone();
                        tempSlice.set(Key.TABLE, table);

                        List<Configuration> splittedSlices = SingleTableSplitUtil
                                .splitSingleTable(tempSlice, eachTableShouldSplittedNumber);

                        splittedConfigs.addAll(splittedSlices);
                    }
                } else {
                    for (String table : tables) {
                        tempSlice = sliceConfig.clone();
                        tempSlice.set(Key.TABLE, table);
                        String queryColumn = HintUtil.buildQueryColumn(jdbcUrl, table, column);
                        //sql的示例:select col1,col2,col3 from table1
                        tempSlice.set(Key.QUERY_SQL, SingleTableSplitUtil.buildQuerySql(queryColumn, table, where));
                        splittedConfigs.add(tempSlice);
                    }
                }
            } else {
                // 说明是配置的 querySql 方式
                List<String> sqls = connConf.getList(Key.QUERY_SQL, String.class);

                // TODO 是否check 配置为多条语句??
                for (String querySql : sqls) {
                    tempSlice = sliceConfig.clone();
                    tempSlice.set(Key.QUERY_SQL, querySql);
                    splittedConfigs.add(tempSlice);
                }
            }

        }

        return splittedConfigs;
    }

这个方法比较长,我加了比较详细的注释。其实就是先判断是否需要进行单表切分,当满足并发数要求较高,并且配置了splitPk(表分割的主键)参数时,则要求进行单表拆分,拆分个数前面已经经过计算得出,如果不需要就是几张表开启几个并发。拆分之后会返回一个Configuration的List,每个Configuration代表原先总配置文件中需要同步的数据的一部分。并加入到总配置文件存储,为后续调用提供配置的支持。

然后继续看writer的拆分方法,最终调用的是com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil#doSplit方法,来看下,

代码语言:javascript
复制
public static List<Configuration> doSplit(Configuration simplifiedConf,
                                              int adviceNumber) {

        List<Configuration> splitResultConfigs = new ArrayList<Configuration>();

        int tableNumber = simplifiedConf.getInt(Constant.TABLE_NUMBER_MARK);

        //处理单表的情况
        if (tableNumber == 1) {
            //由于在之前的  master prepare 中已经把 table,jdbcUrl 提取出来,所以这里处理十分简单
            for (int j = 0; j < adviceNumber; j++) {
                splitResultConfigs.add(simplifiedConf.clone());
            }

            return splitResultConfigs;
        }
        ...

其中adviceNumber传入的是根据reader切分的任务数,simplifiedConf是从配置文件获取的writer相关的配置。为了做到Reader、Writer任务数对等,这里要求Writer插件必须按照源端的切分数进行切分。否则会报错,

代码语言:javascript
复制
if (tableNumber != adviceNumber) {
            throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
                    String.format("您的配置文件中的列配置信息有误. 您要写入的目的端的表个数是:%s , 但是根据系统建议需要切分的份数是:%s. 请检查您的配置并作出修改.",
                            tableNumber, adviceNumber));
        }

拆分完reader和writer之后,接下来有一行代码:

代码语言:javascript
复制
List<Configuration> transformerList = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER);

这个是做什么的呢?我举个例子,我们定义任务配置的时候可以指定转换的规则,比如:

代码语言:javascript
复制
{
"job": {
 "setting": {
   "speed": {
     "channel": 2
   },
   "errorLimit": {
     "record": 10000,
     "percentage": 1
   }
 },
 "content": [
   {
     // 字段转换部分
     "transformer": [
       {
         // 使用字段截取转换
         "name": "dx_substr",
         "parameter": {
           // 操作读取出来的record的第一列
           "columnIndex": 0,
           // 意思是截取第0到4个字符
           "paras": ["0","4"]
         }
       }
     ],
     ...

如下图所示,在数据同步、传输过程中,存在用户对于数据传输进行特殊定制化的需求场景,包括裁剪列、转换列等工作,可以借助ETL的T过程实现(Transformer)。DataX包含了完整的E(Extract)、T(Transformer)、L(Load)支持。

最后是合并配置,方法是mergeReaderAndWriterTaskConfigs

代码语言:javascript
复制
private List<Configuration> mergeReaderAndWriterTaskConfigs(
            List<Configuration> readerTasksConfigs,
            List<Configuration> writerTasksConfigs,
            List<Configuration> transformerConfigs) {
        //reader和writer切分的数量要相等
        if (readerTasksConfigs.size() != writerTasksConfigs.size()) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
                    String.format("reader切分的task数目[%d]不等于writer切分的task数目[%d].",
                            readerTasksConfigs.size(), writerTasksConfigs.size())
            );
        }

        List<Configuration> contentConfigs = new ArrayList<Configuration>();
        for (int i = 0; i < readerTasksConfigs.size(); i++) {
            Configuration taskConfig = Configuration.newDefault();

            //reader相关的配置
            taskConfig.set(CoreConstant.JOB_READER_NAME,
                    this.readerPluginName);
            taskConfig.set(CoreConstant.JOB_READER_PARAMETER,
                    readerTasksConfigs.get(i));

            //writer相关的配置
            taskConfig.set(CoreConstant.JOB_WRITER_NAME,
                    this.writerPluginName);
            taskConfig.set(CoreConstant.JOB_WRITER_PARAMETER,
                    writerTasksConfigs.get(i));

            //transform相关的配置,可以为空
            if(transformerConfigs!=null && transformerConfigs.size()>0){
                taskConfig.set(CoreConstant.JOB_TRANSFORMER, transformerConfigs);
            }

            taskConfig.set(CoreConstant.TASK_ID, i);
            contentConfigs.add(taskConfig);
        }

        return contentConfigs;
    }

这个其实就是把任务整合后输出,输出的配置文件可以在task中使用。

参考:

  • https://github.com/alibaba/DataX/blob/master/introduction.md
  • https://www.jianshu.com/p/6b4173d3fc74
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-12-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 犀牛的技术笔记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 写在前面
  • 正文
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档