前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >XxlJob注册及发现原理

XxlJob注册及发现原理

原创
作者头像
Kokomo
发布2023-11-27 14:32:36
4801
发布2023-11-27 14:32:36
举报
文章被收录于专栏:灵墨AI探索室灵墨AI探索室

各位程序员伙伴在工作中一定会接触到定时触发任务的场景,而目前业界内使用最多的就是XXL-JOB了,在接下来的几篇文章中,笔者会带各位深入了解一下XXL-JOB底层的原理,话不多说,我们现在开始

简介

XXL-JOB是一款轻量级的分布式任务调度中间件,默认支持6000个定时任务,如果生产环境的任务数量在这个范围内,可以选择使用 XXL-JOB。目前很多大型公司在任务调度的场景使用的都是XXL-JOB。

整体架构

先简单介绍一下xxl的整体架构

XXL-JOB分为执行器和调度中心,执行器在启动时注册到调度中心上,任务相关的配置,包括执行时间,阻塞策略,执行的方法,执行的策略保存在调度中心中,上述配置完毕后,调度中心就会跟据配置在指定的时间根据指定的策略执行任务。此外调度中心也会记录每次任务执行的状态。

注册与销毁

XXL-JOB采用的是Http通信,实际上,调度中心和执行器各启动了一个Netty的服务器,用来通信,调度中心也提供了对应的接口用来处理执行器发送的请求。

下面这个类的记录的是调度中心的相关属性

在执行器启动时,初始netty的过程中,会调用注册方法,通知调度中心,执行器已经启动

代码语言:java
复制
public void start(final String address, final int port, final String appname, final String accessToken) {
        executorBiz = new ExecutorBizImpl();
        thread = new Thread(new Runnable() {
            @Override
            public void run() {
                // param
                EventLoopGroup bossGroup = new NioEventLoopGroup();
                EventLoopGroup workerGroup = new NioEventLoopGroup();
                ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                        0,
                        200,
                        60L,
                        TimeUnit.SECONDS,
                        new LinkedBlockingQueue<Runnable>(2000),
                        new ThreadFactory() {
                            @Override
                            public Thread newThread(Runnable r) {
                                return new Thread(r, "xxl-job, EmbedServer bizThreadPool-" + r.hashCode());
                            }
                        },
                        new RejectedExecutionHandler() {
                            @Override
                            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                                throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                            }
                        });
                try {
                    // start server
                    ServerBootstrap bootstrap = new ServerBootstrap();
                    bootstrap.group(bossGroup, workerGroup)
                            .channel(NioServerSocketChannel.class)
                            .childHandler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                public void initChannel(SocketChannel channel) throws Exception {
                                    channel.pipeline()
                                            .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                            .addLast(new HttpServerCodec())
                                            .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                                            .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                                }
                            })
                            .childOption(ChannelOption.SO_KEEPALIVE, true);

                    // bind
                    ChannelFuture future = bootstrap.bind(port).sync();

                    logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

                    // start registry
                    startRegistry(appname, address);

                    // wait util stop
                    future.channel().closeFuture().sync();

                } catch (InterruptedException e) {
                    logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
                } catch (Exception e) {
                    logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
                } finally {
                    // stop
                    try {
                        workerGroup.shutdownGracefully();
                        bossGroup.shutdownGracefully();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        });
        thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
        thread.start();
    }
代码语言:java
复制
public void startRegistry(final String appname, final String address) {
        // start registry
        ExecutorRegistryThread.getInstance().start(appname, address);
    }

ExecutorRegistryThread 类处理的就是执行器与调度中心的心跳,在首次启动时会立刻通知调度中心,其次,会与调度中心维护一个30s间隔的心跳,使调度中心能够了解各执行器的状态,在停止时也会调用 registryRemove 方法通知调度中心,使自己下线

代码语言:java
复制
 registryThread = new Thread(new Runnable() {
            @Override
            public void run() {

                // registry
                while (!toStop) {
                    try {
                        RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                            try {
                                ReturnT<String> registryResult = adminBiz.registry(registryParam);
                                if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                    registryResult = ReturnT.SUCCESS;
                                    logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                    break;
                                } else {
                                    logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                }
                            } catch (Exception e) {
                                logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                            }

                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }

                    }

                    try {
                        if (!toStop) {
                            TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                        }
                    } catch (InterruptedException e) {
                        if (!toStop) {
                            logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
                        }
                    }
                }

                // registry remove
                try {
                    RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                        try {
                            ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
                            if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                registryResult = ReturnT.SUCCESS;
                                logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                break;
                            } else {
                                logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            }
                        } catch (Exception e) {
                            if (!toStop) {
                                logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
                            }

                        }

                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, executor registry thread destroy.");

            }
        });
        registryThread.setDaemon(true);
        registryThread.setName("xxl-job, executor ExecutorRegistryThread");
        registryThread.start();

调度中心提供了通用的restful接口,处理执行器的请求,当执行器注册时,会将注册器的信息保存在xxl_job_registry 表中,在移除时也会修改对应记录的信息

此外,为防止意外导致执行器不能正常下线下线,调度中心在启动时也维护了一个线程用来管理管理各执行器

在这个线程中会定期处理长时间未与调度中心保持心跳的执行器

总结

当执行器启动时,它通过Netty框架提供的服务暴露调度接口。通过调度中心的注册接口,执行器将自己的名称、IP地址和端口信息传输到调度中心,并每30秒更新注册信息。同样地,在执行器停止时,它会请求调度中心的注销接口进行注销。

调度中心提供注册和注销接口供执行器调用。当调度中心接收到注册或注销请求后,会更新xxl_job_registry表中的执行器注册信息。此外,调度中心还启动一个探活线程,定期检查90秒内未更新注册信息的执行器,并将其删除。

需要注意的是,以上流程仅涉及执行器的注册与发现,任务调度和回调相关的逻辑将在下一篇文章中进行讨论。

我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简介
  • 整体架构
  • 注册与销毁
  • 总结
相关产品与服务
消息队列 TDMQ
消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档