前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >datax源码解析-datax的hook机制解析

datax源码解析-datax的hook机制解析

作者头像
用户7634691
发布2021-12-17 21:27:40
1.4K0
发布2021-12-17 21:27:40
举报

JobContainer的start方法,最后一步调用的是invokeHooks,这个方法就是datax的自定义hook被调用的地方。datax的hook提供了一种机制,可以让开发者再任务执行完成后做一些定制化的事情,比如给任务的负责人发送一条短信提醒之类的。

我们顺着invokeHooks方法来分析下,

代码语言:javascript
复制
private void invokeHooks() {
        Communication comm = super.getContainerCommunicator().collect();
        //配置也要传过去,因为你实现的hook可能需要一些参数,比如accessKey等,配置的格式是可以自己定义的
        HookInvoker invoker = new HookInvoker(CoreConstant.DATAX_HOME + "/hook", configuration, comm.getCounter());
        invoker.invokeAll();
    }

HookInvoker是datax实现hook机制的一个管理类,我们看到它接受三个参数,一个是目录,这里存放的是datax主目录+/hook。第二个参数是配置,因为你自己实现的hook可能需要一些外部配置的参数,比如你要实现发送短信功能,可能会把一些api密钥之类的东西放在配置里传进去。第三个参数是用来统计的。

接着看下invokeAll方法,

代码语言:javascript
复制
public void invokeAll() {
        if (!baseDir.exists() || baseDir.isFile()) {
            LOG.info("No hook invoked, because base dir not exists or is a file: " + baseDir.getAbsolutePath());
            return;
        }

        //过滤子目录保存在subDirs
        String[] subDirs = baseDir.list(new FilenameFilter() {
            @Override
            public boolean accept(File dir, String name) {
                return new File(dir, name).isDirectory();
            }
        });

        //没有子目录,不符合hook目录要求
        if (subDirs == null) {
            throw DataXException.asDataXException(FrameworkErrorCode.HOOK_LOAD_ERROR, "获取HOOK子目录返回null");
        }

        /**
         * 子目录的格式示例:
         * META-INF/services/com.example.CodecSet
         */
        for (String subDir : subDirs) {
            doInvoke(new File(baseDir, subDir).getAbsolutePath());
        }

    }

首先是做一些基本的检查,确保目录结构是合法的。接着扫描给定目录的所有一级子目录,每个子目录当作一个Hook的目录。对于每个子目录,必须符合ServiceLoader的标准目录格式

hook的目录结构看起来类似这个样子:

简单来讲,ServiceLoader实现了一种机制,可以动态加载指定目录的实现类并且实例化,它是java SPI机制的重要组成部分。

Java SPI 实际上是“基于接口的编程+策略模式+配置文件”组合实现的动态加载机制,提供了通过interface寻找implement的方法。类似于IOC的思想,将装配的控制权移到程序之外,从而实现解耦。如下图所示:

继续看doInvoke方法,

代码语言:javascript
复制
private void doInvoke(String path) {
        ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            JarLoader jarLoader = new JarLoader(new String[]{path});
            Thread.currentThread().setContextClassLoader(jarLoader);
            //这里通过 ServiceLoader 机制加载Hook的实现类
            Iterator<Hook> hookIt = ServiceLoader.load(Hook.class).iterator();
            if (!hookIt.hasNext()) {
                LOG.warn("No hook defined under path: " + path);
            } else {
                Hook hook = hookIt.next();
                LOG.info("Invoke hook [{}], path: {}", hook.getName(), path);
                hook.invoke(conf, msg);//调用实际的hook实现
            }
        } catch (Exception e) {
            LOG.error("Exception when invoke hook", e);
            throw DataXException.asDataXException(
                    CommonErrorCode.HOOK_INTERNAL_ERROR, "Exception when invoke hook", e);
        } finally {
            Thread.currentThread().setContextClassLoader(oldClassLoader);
        }
    }

这里首先是把类加载器设置成jar包的加载器,这是继承的应用加载器,然后使用ServiceLoader加载用户自己是实现的Hook接口的实现类并进行实例化。实例化后调用invoke方法执行自定义的逻辑。Hook接口的定义如下:

代码语言:javascript
复制
/**
 * Created by xiafei.qiuxf on 14/12/17.
 * DataX 的 Hook 机制,这里定义了开放的接口
 * https://xie.infoq.cn/article/68102f356019f52560f4b8c70
 */
public interface Hook {

    /**
     * 返回名字
     *
     * @return
     */
    public String getName();

    /**
     * TODO 文档
     *
     * @param jobConf
     * @param msg
     */
    public void invoke(Configuration jobConf, Map<String, Number> msg);

}

我们自己的实现类示例:

代码语言:javascript
复制
public class SMSReport implements Hook {


    @Override
    public String getName() {
        return "SMSReportHook";
    }

    @Override
    public void invoke(Configuration configuration, Map<String, Number> map) {
        //调用第三方的短信api发送短信

        ....
    }

实现类写完后我们打成jar包,在 DataX 根目录下,新建hook文件夹,然后在其下创建sms文件夹,将jar包放在这里就可以了。

我们来总结下:

datax提供了一种Hook机制,可以在执行完核心逻辑后触发一个开发者自己定义的逻辑。实现的原理是利用了java SPI机制,datax定义了一个Hook接口,开发者实现这个接口。通过interface寻找implement的方法。

参考:

  • https://xie.infoq.cn/article/68102f356019f52560f4b8c70
  • https://docs.oracle.com/javase/6/docs/api/java/util/ServiceLoader.html
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-12-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 犀牛的技术笔记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
短信
腾讯云短信(Short Message Service,SMS)可为广大企业级用户提供稳定可靠,安全合规的短信触达服务。用户可快速接入,调用 API / SDK 或者通过控制台即可发送,支持发送验证码、通知类短信和营销短信。国内验证短信秒级触达,99%到达率;国际/港澳台短信覆盖全球200+国家/地区,全球多服务站点,稳定可靠。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档