前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >XXL-JOB系列二之执行器注册

XXL-JOB系列二之执行器注册

原创
作者头像
用户9511949
修改2024-07-02 14:26:18
1250
修改2024-07-02 14:26:18
举报
文章被收录于专栏:XXL-JOBXXL-JOB

1 JobHanlder注册

XxlJobSpringExecutor

如果是在基于Spring的项目中使用xxl-job,那么是由XxlJobSpringExecutor这个类来进行JobHanlder的初始化,首先这个类实现了SmartInitializingSingleton接口,这个接口的作用是在Spring容器管理的所有单例对象(非懒加载)完成初始化之后执行其回调方法afterSingletonsInstantiated, 在该方法中由initJobHanlderMethodRepository去完成初始化操作

代码语言:java
复制
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
    private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class);
    // start
    @Override
    public void afterSingletonsInstantiated() {
        // 注册JobHandler
        initJobHandlerMethodRepository(applicationContext);
        // refresh GlueFactory
        GlueFactory.refreshInstance(1);
        try {
            // 调用父类的start方法完成初始化
            super.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

下面分析下initJobHanlderMethodRepository方法的逻辑

代码语言:java
复制
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
    if (applicationContext == null) {
        return;
    }
    // 1. 获取Spring容器中所有的单例对象
    String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
    for (String beanDefinitionName : beanDefinitionNames) {
        Object bean = null;
        Lazy onBean = applicationContext.findAnnotationOnBean(beanDefinitionName, Lazy.class);
        if (onBean!=null){
            // 2. 如果实例配置了@Lazy注解,直接跳过懒加载的对象
            logger.debug("xxl-job annotation scan, skip @Lazy Bean:{}", beanDefinitionName);
            continue;
        }else {
            bean = applicationContext.getBean(beanDefinitionName);
        }
        Map<Method, XxlJob> annotatedMethods = null;
        try {
            // 3. 获取该对象所有配置了@XxlJob注解的方法
            annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                    new MethodIntrospector.MetadataLookup<XxlJob>() {
                        @Override
                        public XxlJob inspect(Method method) {
                            return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                        }
                    });
        } catch (Throwable ex) {
            logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
        }
        if (annotatedMethods==null || annotatedMethods.isEmpty()) {
            continue;
        }
        for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
            Method executeMethod = methodXxlJobEntry.getKey();
            XxlJob xxlJob = methodXxlJobEntry.getValue();
            // 4. 注册JobHanlder
            registJobHandler(xxlJob, bean, executeMethod);
        }

    }
}

initJobHanlderMethodRepository方法比较简单,就是从Spring容器中获取所有配置了@XxlJob的方法,然后调用registJobHanlder方法注册JobHanlder,registJobHanlder方法位于父类XxlJobExecutor中,接下来看看其实现逻辑

代码语言:java
复制
protected void registJobHandler(XxlJob xxlJob, Object bean, Method executeMethod){
    if (xxlJob == null) {
        return;
    }
    String name = xxlJob.value();
    Class<?> clazz = bean.getClass();
    String methodName = executeMethod.getName();
    if (name.trim().length() == 0) {
        throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");
    }
    // 对job的名称判重
    if (loadJobHandler(name) != null) {
        throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
    }
    executeMethod.setAccessible(true);
    // 获取init和destroy方法
    Method initMethod = null;
    Method destroyMethod = null;

    if (xxlJob.init().trim().length() > 0) {
        try {
            initMethod = clazz.getDeclaredMethod(xxlJob.init());
            initMethod.setAccessible(true);
        } catch (NoSuchMethodException e) {
            throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] .");
        }
    }
    if (xxlJob.destroy().trim().length() > 0) {
        try {
            destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy());
            destroyMethod.setAccessible(true);
        } catch (NoSuchMethodException e) {
            throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + clazz + "#" + methodName + "] .");
        }
    }
    // 注册jobhandler
    registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
}

以上方法的实现逻辑也比较简单,就是封装了一个MethodJobHandler对象,这个对象的属性包括要具体执行业务逻辑的方法、init方法以及destory方法,然后调用XxlJobExecutor的静态方法registJobHandler注册

代码语言:java
复制
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
......
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
    return jobHandlerRepository.put(name, jobHandler);
}

就是将MethodJobHandler保存到一个Map结构中,至此执行器的注册操作完成。

XxlJobSimpleExecutor

如果是在非Spring项目中使用xxl-job,则需要手动创建XxlJobSimpleExecutor,并且需要手动设置xxlJobBeanList,然后再手动调用start方法执行具体注册的逻辑,注册逻辑和XxlJobSpringExecutor一致。

代码语言:javascript
复制
public class XxlJobSimpleExecutor extends XxlJobExecutor {
    private static final Logger logger = LoggerFactory.getLogger(XxlJobSimpleExecutor.class);

    private List<Object> xxlJobBeanList = new ArrayList<>();
    public List<Object> getXxlJobBeanList() {
        return xxlJobBeanList;
    }
    public void setXxlJobBeanList(List<Object> xxlJobBeanList) {
        this.xxlJobBeanList = xxlJobBeanList;
    }

    @Override
    public void start() {
        // 注册JobHandler
        initJobHandlerMethodRepository(xxlJobBeanList);
        // super start
        try {
            // 调用父类的start方法完成初始化操作
            super.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

2 JobHanlder初始化

XxlJobSpringExecutor和XxlJobSimpleExecutor完成注册操作之后,调用父类XxlJobExecutor的start方法完成初始化操作,逻辑如下

代码语言:javascript
复制
public void start() throws Exception {
    // 1,初始化日志目录,默认为/data/applogs/xxl-job/jobhandler
    XxlJobFileAppender.initLogPath(logPath);
    // 2,初始化调度器Client
    initAdminBizList(adminAddresses, accessToken);
    // 3,日志文件清理线程启动
    JobLogFileCleanThread.getInstance().start(logRetentionDays);
    // 4,回调线程启动
    TriggerCallbackThread.getInstance().start();
    // 5,初始化接受调度器请求的HttpServer
    initEmbedServer(address, ip, port, appname, accessToken);
}

1,2步比较简单,看下第3步日志清理线程的逻辑,具体清理的逻辑就不仔细看了,就是删除过期的日志文件,主要看看代码结构,通过一个线程和一个Flag标记来循环处理日志文件,这样的结构在xxl-job中的应用很多,第4步回调线程也是如此。

代码语言:javascript
复制
private Thread localThread;
private volatile boolean toStop = false;
public void start(final long logRetentionDays){
    ......
    
    localThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (!toStop) {
                ......
            }
            logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destroy.");
        }
    });
    localThread.setDaemon(true);
    localThread.setName("xxl-job, executor JobLogFileCleanThread");
    localThread.start();
}

下面来看看initEmbedServer方法,非核心逻辑省略,新建了一个EmbedServer对象,并且启动

代码语言:javascript
复制
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
    ......
    // start
    embedServer = new EmbedServer();
    embedServer.start(address, port, appname, accessToken);
}

看看embedServer的start方法

代码语言:javascript
复制
public void start(final String address, final int port, final String appname, final String accessToken) {
    // 1 新建一个executor实例
    executorBiz = new ExecutorBizImpl();
    // 2 启动一个HttpServer
    thread = new Thread(new Runnable() {
        @Override
        public void run() {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            // 2.1 业务处理线程池,采用一个单独的线程池处理业务逻辑,和Netty的I/O线程隔离
            // 核心线程数设置为0(当有任务要执行时,会创建一个临时线程执行),当没有任务需要调度时,不占用资源
            ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                    0,
                    200,
                    60L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(2000),
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "xxl-job, EmbedServer bizThreadPool-" + r.hashCode());
                        }
                    },
                    new RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                            throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                        }
                    });
            try {
                // 2.2 启动HttpServer
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel channel) throws Exception {
                                channel.pipeline()
                                        .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                        .addLast(new HttpServerCodec())
                                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                                        .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                            }
                        })
                        .childOption(ChannelOption.SO_KEEPALIVE, true);

                // 2.3 同步等待HttpServer启动成功
                ChannelFuture future = bootstrap.bind(port).sync();

                logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

                // 2.4 将执行器注册到调度中心
                startRegistry(appname, address);

                // 2.5 等待直到连接关闭
                future.channel().closeFuture().sync();

            } catch (InterruptedException e) {
                logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
            } catch (Exception e) {
                logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
            } finally {
                // stop
                try {
                    workerGroup.shutdownGracefully();
                    bossGroup.shutdownGracefully();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
    thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
    thread.start();
}

start方法的逻辑比较清晰,首先创建了一个ExecutorBizImpl对象,该对象是ExecutorBiz接口的其中一个实现类,另外一个实现类是ExecutorBizClient,ExecutorBizClient是调度中心用来向执行器发送请求,ExecutorBizImpl是执行器用来执行具体请求对应的业务逻辑。

然后是启动了一个NettyHttpServer,启动成功之后调用startRegistry方法将执行注册到调度中心,下面来看看startRegistry方法的逻辑

代码语言:javascript
复制
public void startRegistry(final String appname, final String address) {
    // start registry
    ExecutorRegistryThread.getInstance().start(appname, address);
}

可以看出也是创建了一个单独的线程来执行具体的逻辑,看下ExecutorRegistryThread的start方法的代码,非核心逻辑省略

代码语言:javascript
复制
private Thread registryThread;
private volatile boolean toStop = false;
public void start(final String appname, final String address){
    
    ......

    registryThread = new Thread(new Runnable() {
        @Override
        public void run() {
            // 1. 将执行器注册到调度中心
            while (!toStop) {
                try {
                    RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                        try {
                            ReturnT<String> registryResult = adminBiz.registry(registryParam);
                            if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                registryResult = ReturnT.SUCCESS;
                                logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                break;
                            } else {
                                logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            }
                        } catch (Exception e) {
                            logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                        }
                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
                try {
                    if (!toStop) {
                        TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                    }
                } catch (InterruptedException e) {
                    if (!toStop) {
                        logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
                    }
                }
            }
            // 2. 删除执行器
            try {
                RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                    try {
                        ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
                        if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                            registryResult = ReturnT.SUCCESS;
                            logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            break;
                        } else {
                            logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
                        }

                    }
                }
            } catch (Exception e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, executor registry thread destroy.");
        }
    });
    registryThread.setDaemon(true);
    registryThread.setName("xxl-job, executor ExecutorRegistryThread");
    registryThread.start();
}

逻辑也比较简单,每隔30s(默认)向调度中心注册一遍,本质上来说就是一种续约的机制,告诉调度中心本执行器还活着,最后如果注册线程停了,再调用调度中心的接口删除执行器。

调度中心提供callback,registry,registryRemove接口给执行器调用,代码如下

代码语言:javascript
复制
public class JobApiController {

    @RequestMapping("/{uri}")
    @ResponseBody
    @PermissionLimit(limit=false)
    public ReturnT<String> api(HttpServletRequest request, @PathVariable("uri") String uri, @RequestBody(required = false) String data) {
        ......
        // services mapping
        if ("callback".equals(uri)) {
            List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
            return adminBiz.callback(callbackParamList);
        } else if ("registry".equals(uri)) {
            RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
            return adminBiz.registry(registryParam);
        } else if ("registryRemove".equals(uri)) {
            RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
            return adminBiz.registryRemove(registryParam);
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
        }
    }
}

注册就调用adminBiz.registry方法将执行器的group,appname,adress信息保存在xxl_job_registry表中,并且设置更新时间为当前时间,freshGroupRegistryInfo暂时没有处理逻辑

代码语言:javascript
复制
public ReturnT<String> registry(RegistryParam registryParam) {
    return JobRegistryHelper.getInstance().registry(registryParam);
}
代码语言:javascript
复制
public ReturnT<String> registry(RegistryParam registryParam) {
    ......
    // async execute
    registryOrRemoveThreadPool.execute(new Runnable() {
       @Override
       public void run() {
          // 保存或者更新执行器
          int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
          if (ret < 1) {
             XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
             // fresh
             freshGroupRegistryInfo(registryParam);
          }
       }
    });
    return ReturnT.SUCCESS;
}

至此执行器的注册和初始化就完成了。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
作者已关闭评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 JobHanlder注册
    • XxlJobSpringExecutor
      • XxlJobSimpleExecutor
      • 2 JobHanlder初始化
      相关产品与服务
      容器服务
      腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档