前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >分布式调度中间件xxl-job(三):执行器Executor—任务注册

分布式调度中间件xxl-job(三):执行器Executor—任务注册

作者头像
闲宇非鱼
发布2022-02-08 11:27:39
2K0
发布2022-02-08 11:27:39
举报

人生苦短,不如养狗

  在任务调度中,我们经常能看到执行器和任务一起的身影,两者的关系相当的紧密,在xxl-job中也是如此。

  可以看到,执行器是以机器为单位,一个应用中可能包含多个执行器(单机模式下,只会有一个执行器)。而每个执行器中会注册多个任务,其中属于同一个应用的执行器中注册的任务执行程序应当是相同的。

  下面我们就来具体看一下在xxl-job中任务是如何注册的。

一、从示例代码开始

  话不多说,我们直接来看一下示例代码是如何进行任务编写的。首先,我们来看下示例模块的目录:

  在 xxl-job-executor-samples 模块中提供了诸多框架的使用示例,这里我们选择使用SpringBoot版本的示例代码进行演示。可以看到在 jobhandler 目录下有一个 SampleXxlJob.class ,在这个类中有一个 demoJobHandler(String param) 方法,这个方法也就是上一章节中我们在调度中心注册的任务。我们来看一下这里的代码:

代码语言:javascript
复制
@XxlJob("demoJobHandler")
public ReturnT<String> demoJobHandler(String param) throws Exception {
    XxlJobLogger.log("XXL-JOB, Hello World.");

    for (int i = 0; i < 5; i++) {
        XxlJobLogger.log("beat at:" + i);
        TimeUnit.SECONDS.sleep(2);
    }
    return ReturnT.SUCCESS;
}

  方法体内的逻辑十分简单,这里就不多加赘述了。具体来看下方法的注释、返回参数和参数列表。   可以看到,方法上使用了 @XxlJob("demoJobHandler") 注解来进行任务标识,标明了该方法是一个xxl-job任务,并且任务名为 demoJobHandler 。该方法的返回值是xxl-job提供的一个封装类 ReturnT<T> ,请求参数只有一个,类型是String。   这里需要提的一点是在之前的版本中是通过继承 IJobHandler 和在类上加注解的方式进行任务标识,在最新版中则抛弃了原有的做法,将任务的粒度细化到了方法级别。   前者的好处是任务编写的范式已经规定好,只需要重写对应抽象类中的方法并加上注解,但每一次编写新的任务执行程序都需要创建新的类来重新实现接口。后者的好处是在于细化了任务的粒度,将注解细化到了方法级别,不需要再重复地继承方法,很好地实现了类的复用。但这种方式同样存在问题,虽然只需要加上注解就可以进行方法标识,但是在进行方法编写时仍然还是需要按照 IJobHandler 中规定好的编写范式来进行方法的编写,至于这个编写范式在这种方式就需要去翻阅文档,无形中还是增加了一些步骤。   总体来看,这两种方式各有其优势,选择何种方式来实现只能说是看场景和编写者的喜好了。

二、JobHandler注册

  既然我们选择的是SpringBoot版本的示例,那么任务注册基本也逃不出Spring加载Bean的基本套路。   上面我们提到了,任务需要注册到执行器中,也就是 XxlJobSpringExecutor 中。这也就意味 XxlJobSpringExecutor 在Spring容器中进行Bean初始化时需要用到任务注册信息,这就需要在Bean加载过程中将获取到 Application 中的信息。这种情况,一般会选择实现 ApplicationContextAware 接口来进行来从应用上下文中获取Bean加载所需的容器内的服务。   下面我们来看下具体的任务注册代码:

代码语言:javascript
复制
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean

  首先看下 XxlJobSpringExecutor 实现的几个接口,除了 ApplicationContextAware ,它还实现了 SmartInitializingSingletonDisposableBean 两个接口。前者说明 XxlJobSpringExecutor 在Bean初始化完成后进行回调时要搞些事情,后者则说明在Bean销毁时要搞事情。同时 XxlJobSpringExecutor 还继承了 XxlJobExecutor ,关于 XxlJobExecutor 的具体细节闲鱼会在下一章进行分析。

代码语言:javascript
复制
@Override
public void afterSingletonsInstantiated() {

    // 从方法级别进行任务处理程序仓库初始化,这里进行了优化,将任务的粒度由原先的类转变为了方法级别
    initJobHandlerMethodRepository(applicationContext);

    // 刷新GlueFactory,此处获取SpringGlueFactory
    GlueFactory.refreshInstance(1);

    // super start
    try {
        super.start();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

  可以看到在 afterSingletonsInstantiated 方法做了这样几件事情:

  • 初始化任务执行程序仓库,即进行任务注册
  • 刷新GlueFactory,获取SpringGlueFactory(用于动态脚本任务)
  • 启动执行器,此处调用父类start()方法

  我们具体来看下 initJobHandlerMethodRepository(applicationContext); 方法:

代码语言:javascript
复制
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
    if (applicationContext == null) {
        return;
    }
    // 进行任务处理程序初始化
    // 获取bean名称列表
    String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
    for (String beanDefinitionName : beanDefinitionNames) {
        // 从上下文中根据bean元数据名称获取bean对象
        Object bean = applicationContext.getBean(beanDefinitionName);
        // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
        // 获取被@XxlJob注解的方法
        Map<Method, XxlJob> annotatedMethods = null;
        try {
            // 根据bean类元信息获取被@XxlJob注解的方法
            annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                    new MethodIntrospector.MetadataLookup<XxlJob>() {
                        @Override
                        public XxlJob inspect(Method method) {
                            return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                        }
                    });
        } catch (Throwable ex) {
            logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
        }
        if (annotatedMethods == null || annotatedMethods.isEmpty()) {
            continue;
        }

        for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
            // 被注解的方法,即具体的任务
            Method method = methodXxlJobEntry.getKey();
            // XxlJob注解类,获取设置的注解信息
            XxlJob xxlJob = methodXxlJobEntry.getValue();
            if (xxlJob == null) {
                continue;
            }
            // 任务名称
            String name = xxlJob.value();
            if (name.trim().length() == 0) {
                throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
            }
            // 检查当前任务名是否已经被使用,注意这里是通过任务名称来进行任务判重的
            if (loadJobHandler(name) != null) {
                throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
            }

            // 进行方法参数类型数量和参数类型判断,这里要求方法的参数列表只能有一个参数,并且参数类型为String
            if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
                throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
                        "The correct method format like \" public ReturnT<String> execute(String param) \" .");
            }
            // 判断方法返回参数类型是否为ReturnT,如果不为ReturnT则为非法
            if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
                throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
                        "The correct method format like \" public ReturnT<String> execute(String param) \" .");
            }
            method.setAccessible(true);

            // 设置初始化方法和销毁方法
            Method initMethod = null;
            Method destroyMethod = null;

            if (xxlJob.init().trim().length() > 0) {
                try {
                    initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
                    initMethod.setAccessible(true);
                } catch (NoSuchMethodException e) {
                    throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
                }
            }
            if (xxlJob.destroy().trim().length() > 0) {
                try {
                    destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
                    destroyMethod.setAccessible(true);
                } catch (NoSuchMethodException e) {
                    throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
                }
            }

            // 进行任务处理程序注册
            registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));
        }
    }

}

  方法的入参 ApplicationContext 就是通过实现 ApplicationContextAware 接口获取的应用上下文。具体注册过程如下:

  1. ApplicationContext中获取所有Bean元数据名称,通过Bean元数据名称获取所有Bean;
  2. 遍历获取到的Bean,找到有XxlJob注解的类,获取类中被注解的所有方法;
  3. 获取被注解方法的相应信息,根据注解中的任务名称,调用loadJobHandler(name)方法检查该任务是否已经注册;
  4. 进行方法编写范式检查,主要检查方法名称、入参类型以及返回值类型是否符合要求;
  5. 设置被注解方法的初始化方法和销毁方法;
  6. 最后,将上述被注解方法注册到任务处理程序仓库中;

  至此,我们编写的任务就已经注册到了执行器Executor中了。

三、总结

  总体来看,将任务注册到执行器中的过程就是Bean加载过程中加载其他服务的过程,整体的代码和流程还是相对简单流畅的。下一章我们会开始学习执行器Executor注册和执行任务的过程。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-05-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Brucebat的伪技术鱼塘 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、从示例代码开始
  • 二、JobHandler注册
  • 三、总结
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档