前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >datax源码解析-JobContainer的初始化阶段解析

datax源码解析-JobContainer的初始化阶段解析

作者头像
用户7634691
发布2021-12-17 21:27:19
7720
发布2021-12-17 21:27:19
举报

datax源码解析-JobContainer的初始化阶段解析

写在前面

此次源码分析的版本是3.0。因为插件是datax重要的组成部分,源码分析过程中会涉及到插件部分的源码,为了保持一致性,插件都已大部分人比较熟悉的mysql为例子说明。

我所使用的任务模版的json文件是:

代码语言:javascript
复制
{
    "job":{
        "content":[
            {
                "reader":{
                    "name":"mysqlreader",
                    "parameter":{
                        "column":[
                            "id",
                            "name",
                            "age"
                        ],
                        "connection":[
                            {
                                "jdbcUrl":[
                                    "jdbc:mysql://127.0.0.1:3306/test"
                                ],
                                "table":[
                                    "t_datax_test"
                                ]
                            }
                        ],
                        "password":"11111111",
                        "username":"root"
                    }
                },
                "writer":{
                    "name":"mysqlwriter",
                    "parameter":{
                        "column":[
                            "id",
                            "name",
                            "age"
                        ],
                        "connection":[
                            {
                                "jdbcUrl":"jdbc:mysql://127.0.0.1:3306/test2",
                                "table":[
                                    "t_datax_test"
                                ]
                            }
                        ],
                        "password":"11111111",
                        "username":"root"
                    }
                }
            }
        ],
        "setting":{
            "speed":{
                "channel":"2"
            }
        }
    }
}

另外就是,我在阅读源码的时候加入了很多注释,大家看我贴出来的代码要注意看注释哦。贴出来的代码有些可能只是包含核心的片段。

JobContainer初始化阶段

接着上篇文章:

datax源码解析-启动类分析

进入JobContainer的start方法,jobContainer主要负责的工作全部在start()里面,包括:

  • preHandle,前置处理
  • init,初始化,主要是调用插件的init方法实现初始化
  • prepare,准备工作,比如清空目标表。内部还是调用各类型插件的方法来实现准备工作
  • split,根据配置的并发参数,对job进行切分,切分为多个task
  • scheduler,把上一步reader和writer split的结果整合到taskGroupContainer,启动任务
  • post,执行完任务后的后置操作
  • invokeHooks,DataX 的 Hook 机制,比如我们可以实现将datax任务的执行结果发送的邮箱,短信通知等

从代码中看,也可以清晰的看到这几个过程:

代码语言:javascript
复制
public void start() {
        LOG.info("DataX jobContainer starts job.");

        boolean hasException = false;
        boolean isDryRun = false;
        try {
            this.startTimeStamp = System.currentTimeMillis();
            isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
            if(isDryRun) {
                LOG.info("jobContainer starts to do preCheck ...");
                this.preCheck();
            } else {
                //线程安全考虑
                userConf = configuration.clone();
                LOG.debug("jobContainer starts to do preHandle ...");
                ////初始化preHandler插件并执行插件的preHandler
                this.preHandle();

                LOG.debug("jobContainer starts to do init ...");
                //初始化reader和writer
                this.init();
                LOG.info("jobContainer starts to do prepare ...");
                //准备工作,比如清空目标表。内部还是调用各类型插件的方法来实现准备工作。
                this.prepare();
                LOG.info("jobContainer starts to do split ...");
                //拆分task,实际的拆分工作还是调用插件的实现
                this.totalStage = this.split();
                LOG.info("jobContainer starts to do schedule ...");
                //把上一步reader和writer split的结果整合到taskGroupContainer,启动任务
                this.schedule();
                LOG.debug("jobContainer starts to do post ...");
                //执行任务后的操作
                this.post();

                LOG.debug("jobContainer starts to do postHandle ...");
                //不知道是干啥的
                this.postHandle();
                LOG.info("DataX jobId [{}] completed successfully.", this.jobId);

                //DataX 的 Hook 机制,比如我们可以实现将datax任务的执行结果发送的邮箱,短信通知等
                this.invokeHooks();
                ...

本篇文章只关注前面三个部分,也就是preHandle,init,prepare三个阶段,我认为这三个阶段都属于任务开始前的初始化阶段。

preHandler

preHandler目前官方也没有实现,com.alibaba.datax.common.plugin.AbstractPlugin#preHandler方法目前是空的,所以这里我们也先略过。

init

继续看init方法,

代码语言:javascript
复制
private void init() {
        //从配置中获取jobid
        this.jobId = this.configuration.getLong(
                CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, -1);

        if (this.jobId < 0) {
            LOG.info("Set jobId = 0");
            this.jobId = 0;
            this.configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID,
                    this.jobId);
        }

        Thread.currentThread().setName("job-" + this.jobId);

        //DataX所有的状态及统计信息交互类,job、taskGroup、task等的消息汇报
        JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(
                this.getContainerCommunicator());
        //必须先Reader ,后Writer
        this.jobReader = this.initJobReader(jobPluginCollector);
        //writer的初始化做的事情会多一些,比如会检查写入表的字段和指定的字段个数是否一致等
        this.jobWriter = this.initJobWriter(jobPluginCollector);
    }

可以看到,init方法分别调用的是reader和writer的init方法进行初始化。先来看下initJobReader方法,

代码语言:javascript
复制
private Reader.Job initJobReader(
            JobPluginCollector jobPluginCollector) {
        this.readerPluginName = this.configuration.getString(
                CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
        classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
                PluginType.READER, this.readerPluginName));

        //loadJobPlugin需要用到jarLoader
        Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin(
                PluginType.READER, this.readerPluginName);

        // 设置reader的jobConfig
        jobReader.setPluginJobConf(this.configuration.getConfiguration(
                CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));

        // 设置reader的readerConfig
        jobReader.setPeerPluginJobConf(this.configuration.getConfiguration(
                CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));

        jobReader.setJobPluginCollector(jobPluginCollector);
        //调用插件自己内部的init方法进行个性初始化,以mysql的初始化为例
        //mysql reader会检查username,password等是否存在
        jobReader.init();

        classLoaderSwapper.restoreCurrentThreadClassLoader();
        return jobReader;
    }

首先看这个方法返回的是Reader.Job这样的一个内部类,这个类是AbstractJobPlugin的一个实现。所以返回的其实是一个reader插件的实例。

接着看到是com.alibaba.datax.core.util.container.LoadUtil#getJarLoader方法,它根据类型和名称从缓存中获取,如果没有则去创建,创建的流程首先获取插件的路径.比如:D:\DataX\target\datax\datax\plugin\reader\mysqlreader,然后根据JarLoader里面的getURLs(paths)获取插件路径下所有的jar包。创建单独的JarLoader,把创建的JarLoader缓存起来。

然后它返回一个是一个自定义的类加载器JarLoader,根据java类加载器的原理我们知道,JarLoader是Application ClassLoader的子类。DataX通过Thread.currentThread().setContextClassLoader在每次对插件调用前后的进行classLoader的切换实现jar隔离的加载机制。

接下里的loadJobPlugin就会用到这个类加载器去实例化插件的实现类。

插件加载这部分的设计还是值得学习的,即实现了jar的隔离加载,也实现了热加载功能。

最后就是调用插件本身的init方法,以mysql为例,这里主要是检查 username/password 配置是否存在等。

writer的初始化流程基本是一样的,这里不展开了。

prepare

prepare也是调用插件的prepare方法进行准备阶段的工作,

代码语言:javascript
复制
private void prepare() {
        this.prepareJobReader();
        this.prepareJobWriter();
    }

mysql reader的prepare没有实现,意味着不需要prepare,我们直接来看下writer的prepare方法。

代码语言:javascript
复制
// 一般来说,是需要推迟到 task 中进行pre 的执行(单表情况例外)
        public void prepare(Configuration originalConfig) {
            int tableNumber = originalConfig.getInt(Constant.TABLE_NUMBER_MARK);
            if (tableNumber == 1) {
                String username = originalConfig.getString(Key.USERNAME);
                String password = originalConfig.getString(Key.PASSWORD);

                List<Object> conns = originalConfig.getList(Constant.CONN_MARK,
                        Object.class);
                Configuration connConf = Configuration.from(conns.get(0)
                        .toString());

                // 这里的 jdbcUrl 已经 append 了合适后缀参数
                String jdbcUrl = connConf.getString(Key.JDBC_URL);
                originalConfig.set(Key.JDBC_URL, jdbcUrl);

                //表名
                String table = connConf.getList(Key.TABLE, String.class).get(0);
                originalConfig.set(Key.TABLE, table);

                //如果有需要提前执行的sql,比如清空表等
                List<String> preSqls = originalConfig.getList(Key.PRE_SQL,
                        String.class);
                /**
                 * sql转换,比如把@table换成实际的table name
                 *
                 */
                List<String> renderedPreSqls = WriterUtil.renderPreOrPostSqls(
                        preSqls, table);

                originalConfig.remove(Constant.CONN_MARK);
                if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) {
                    // 说明有 preSql 配置,则此处删除掉
                    originalConfig.remove(Key.PRE_SQL);

                    Connection conn = DBUtil.getConnection(dataBaseType,
                            jdbcUrl, username, password);
                    LOG.info("Begin to execute preSqls:[{}]. context info:{}.",
                            StringUtils.join(renderedPreSqls, ";"), jdbcUrl);

                    WriterUtil.executeSqls(conn, renderedPreSqls, jdbcUrl, dataBaseType);
                    DBUtil.closeDBResources(null, null, conn);
                }
            }

其实prepare的核心思想就是,看下任务的配置文件有没有需要提前执行的sql,比如清空表之类的,有的话就先执行了。

参考:

  • https://www.cnblogs.com/xmzpc/p/15193622.html
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-12-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 犀牛的技术笔记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 写在前面
  • JobContainer初始化阶段
    • preHandler
      • init
        • prepare
        相关产品与服务
        云数据库 SQL Server
        腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档