前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >datax源码解析-启动类分析

datax源码解析-启动类分析

作者头像
用户7634691
发布2021-12-17 21:26:55
7450
发布2021-12-17 21:26:55
举报

写在前面

此次源码分析的版本是3.0。因为插件是datax重要的组成部分,源码分析过程中会涉及到插件部分的源码,为了保持一致性,插件都已大部分人比较熟悉的mysql为例子说明。

我所使用的任务模版的json文件是:

代码语言:javascript
复制
{
    "job":{
        "content":[
            {
                "reader":{
                    "name":"mysqlreader",
                    "parameter":{
                        "column":[
                            "id",
                            "name",
                            "age"
                        ],
                        "connection":[
                            {
                                "jdbcUrl":[
                                    "jdbc:mysql://127.0.0.1:3306/test"
                                ],
                                "table":[
                                    "t_datax_test"
                                ]
                            }
                        ],
                        "password":"11111111",
                        "username":"root"
                    }
                },
                "writer":{
                    "name":"mysqlwriter",
                    "parameter":{
                        "column":[
                            "id",
                            "name",
                            "age"
                        ],
                        "connection":[
                            {
                                "jdbcUrl":"jdbc:mysql://127.0.0.1:3306/test2",
                                "table":[
                                    "t_datax_test"
                                ]
                            }
                        ],
                        "password":"11111111",
                        "username":"root"
                    }
                }
            }
        ],
        "setting":{
            "speed":{
                "channel":"2"
            }
        }
    }
}

另外就是,我在阅读源码的时候加入了很多注释,大家看我贴出来的代码要注意看注释哦。贴出来的代码有些可能只是包含核心的片段。

启动类分析

datax的启动类是com.alibaba.datax.core.Engine,通过main方法启动datax进程。

代码语言:javascript
复制
public static void main(String[] args) throws Exception {
        int exitCode = 0;
        try {
            Engine.entry(args);
        } catch (Throwable e) {
            exitCode = 1;
            LOG.error("\n\n经DataX智能分析,该任务最可能的错误原因是:\n" + ExceptionTracker.trace(e));
            ...

继续看entry方法,

代码语言:javascript
复制
public static void entry(final String[] args) throws Throwable {
        Options options = new Options();
        options.addOption("job", true, "Job config.");
        options.addOption("jobid", true, "Job unique id.");
        options.addOption("mode", true, "Job runtime mode.");

        BasicParser parser = new BasicParser();
        CommandLine cl = parser.parse(options, args);

        //datax运行目录/xxx.json
        String jobPath = cl.getOptionValue("job");

        // 如果用户没有明确指定jobid, 则 datax.py 会指定 jobid 默认值为-1
        String jobIdString = cl.getOptionValue("jobid");
        RUNTIME_MODE = cl.getOptionValue("mode");

        Configuration configuration = ConfigParser.parse(jobPath);

        long jobId;
        if (!"-1".equalsIgnoreCase(jobIdString)) {
            jobId = Long.parseLong(jobIdString);
        } else {
            // only for dsc & ds & datax 3 update
            String dscJobUrlPatternString = "/instance/(\\d{1,})/config.xml";
            String dsJobUrlPatternString = "/inner/job/(\\d{1,})/config";
            String dsTaskGroupUrlPatternString = "/inner/job/(\\d{1,})/taskGroup/";
            List<String> patternStringList = Arrays.asList(dscJobUrlPatternString,
                    dsJobUrlPatternString, dsTaskGroupUrlPatternString);
            jobId = parseJobIdFromUrl(patternStringList, jobPath);
        }

        boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE);
        if (!isStandAloneMode && jobId == -1) {
            // 如果不是 standalone 模式,那么 jobId 一定不能为-1
            throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, "非 standalone 模式必须在 URL 中提供有效的 jobId.");
        }
        configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);

        //打印vmInfo
        VMInfo vmInfo = VMInfo.getVmInfo();
        if (vmInfo != null) {
            LOG.info(vmInfo.toString());
        }

        LOG.info("\n" + Engine.filterJobConfiguration(configuration) + "\n");

        LOG.debug(configuration.toJSON());

        ConfigurationValidate.doValidate(configuration);
        Engine engine = new Engine();
        //完成配置初始化后该方法将实例化本身并调用其start方法
        engine.start(configuration);
    }

首先是解析我们运行datax制定的运行参数,比如我在idea里给的配置是

代码语言:javascript
复制
-mode
standalone
-jobid
-1
-job
/Users/malu/Documents/code/idea_study/DataX/core/target/datax/job/mysql2mysql.json

那自然的,jobPath的值就是/Users/malu/Documents/code/idea_study/DataX/core/target/datax/job/mysql2mysql.json,jobIdString的值是-1,RUNTIME_MODE的值是standalone

这几个关键的变量值明确之后,下面的流程明确了。

接着看一个比较重要的方法,ConfigParser.parse,这个方法返回的是Configuration类的实例,这个类在datax里非常重要,所有的配置信息都由它来管理,相当于大管家的角色。我后面打算专门写一篇介绍这个类。

代码语言:javascript
复制
/**
     * 指定Job配置路径,ConfigParser会解析Job、Plugin、Core全部信息,并以Configuration返回
     */
    public static Configuration parse(final String jobPath) {
        //首先从任务配置文件解析基本的配置,包括reader、writer的信息,channel的数量等
        Configuration configuration = ConfigParser.parseJobConfig(jobPath);

        //合并datax本身的一些配置,主要是在core.json文件里,比如限速的一些配置等
        configuration.merge(
                ConfigParser.parseCoreConfig(CoreConstant.DATAX_CONF_PATH),
                false);
        // todo config优化,只捕获需要的plugin
        //reader plugin的名字,比如mysql是mysqlreader
        String readerPluginName = configuration.getString(
                CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
        //writer plugin的名字,比如mysql是mysqlwriter
        String writerPluginName = configuration.getString(
                CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);

        String preHandlerName = configuration.getString(
                CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME);

        String postHandlerName = configuration.getString(
                CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME);

        Set<String> pluginList = new HashSet<String>();
        pluginList.add(readerPluginName);
        pluginList.add(writerPluginName);

        if(StringUtils.isNotEmpty(preHandlerName)) {
            pluginList.add(preHandlerName);
        }
        if(StringUtils.isNotEmpty(postHandlerName)) {
            pluginList.add(postHandlerName);
        }
        try {
            configuration.merge(parsePluginConfig(new ArrayList<String>(pluginList)), false);
            ...

注释写得很清楚了。

VMInfo里面放的是电脑本身的一些配置信息,这里不表。

接着是filterJobConfiguration方法,

代码语言:javascript
复制
public static String filterJobConfiguration(final Configuration configuration) {
        //clone一份,因为后面会修改
        Configuration jobConfWithSetting = configuration.getConfiguration("job").clone();

        Configuration jobContent = jobConfWithSetting.getConfiguration("content");

        //过滤敏感信息,比如password
        filterSensitiveConfiguration(jobContent);

        jobConfWithSetting.set("content",jobContent);

        //格式化json字符串显示
        return jobConfWithSetting.beautify();
    }

这里也没啥好说的,都是一些基本操作。然后进入start方法,

代码语言:javascript
复制
/* check job model (job/task) first */
    public void start(Configuration allConf) {

        // 绑定column转换信息
        ColumnCast.bind(allConf);

        /**
         * 初始化PluginLoader,可以获取各种插件配置
         */
        LoadUtil.bind(allConf);

        boolean isJob = !("taskGroup".equalsIgnoreCase(allConf
                .getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL)));
        //JobContainer会在schedule后再行进行设置和调整值
        int channelNumber =0;
        AbstractContainer container;
        ...


        //缺省打开perfTrace
        boolean traceEnable = allConf.getBool(CoreConstant.DATAX_CORE_CONTAINER_TRACE_ENABLE, true);
        boolean perfReportEnable = allConf.getBool(CoreConstant.DATAX_CORE_REPORT_DATAX_PERFLOG, true);

        //standlone模式的datax shell任务不进行汇报
        if(instanceId == -1){
            perfReportEnable = false;
        }

        int priority = 0;
        try {
            priority = Integer.parseInt(System.getenv("SKYNET_PRIORITY"));
        }catch (NumberFormatException e){
            LOG.warn("prioriy set to 0, because NumberFormatException, the value is: "+System.getProperty("PROIORY"));
        }

        Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);
        //初始化PerfTrace
        PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, priority, traceEnable);
        perfTrace.setJobInfo(jobInfoConfig,perfReportEnable,channelNumber);
        /**
         * 有两个实现:JobContainer和TaskGroupContainer
         * 从配置看,基本上都是JobContainer,所以主要分析它
         */
        container.start();

    }

注释也比较清楚了,这里说明一点就是PerfTrace类,它是一个追踪性能的类,也就是datax在执行任务的时候记录一些指标,比如传输了多少数据,耗时多少等。下面是一个示例,它是datax在执行完一个任务后其中一部分打印内容:

代码语言:javascript
复制
2021-11-28 09:11:32.532 [job-0] INFO  StandAloneJobContainerCommunicator - Total 5 records, 39 bytes | Speed 3B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%

container.start方法就进入JobContainer内部了,放在下一篇文章讲吧。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-12-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 犀牛的技术笔记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 写在前面
  • 启动类分析
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档