前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >开源框架分布式任务调度xxl-job

开源框架分布式任务调度xxl-job

作者头像
疯狂的KK
发布2021-01-18 12:22:50
7330
发布2021-01-18 12:22:50
举报
文章被收录于专栏:Java项目实战Java项目实战

官网

代码语言:javascript
复制
https://www.xuxueli.com/xxl-job/

是什么

XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用

特点

弹性扩容缩容:一旦有新执行器机器上线或者下线,下次调度时将会重新分配任务;

任务失败告警;默认提供邮件方式失败告警,同时预留扩展接口,可方便的扩展短信、钉钉等告警方式

后台界面可视化

去哪下

代码语言:javascript
复制
https://gitee.com/xuxueli0323/xxl-job.git

中央仓库地址

代码语言:javascript
复制
  1. <!-- http://repo1.maven.org/maven2/com/xuxueli/xxl-job-core/ -->
  2. <dependency>
  3. <groupId>com.xuxueli</groupId>
  4. <artifactId>xxl-job-core</artifactId>
  5. <version>${最新稳定版本}</version>
  6. </dependency>

怎么玩

1.0初始化“调度数据库”

请下载项目源码并解压,获取 “调度数据库初始化SQL脚本” 并执行即可。

“调度数据库初始化SQL脚本” 位置为:

代码语言:javascript
复制
/xxl-job/doc/db/tables_xxl_job.sql

调度中心支持集群部署,集群情况下各节点务必连接同一个mysql实例;

如果mysql做主从,调度中心集群节点务必强制走主库;

官网demo部署非常简单,执行完sql后按着官网给出配置更改。然后用Maven下载组件jar包启动,支持docker部署

后台页面

架构设计

将调度行为抽象形成“调度中心”公共平台,而平台自身并不承担业务逻辑,“调度中心”负责发起调度请求。

将任务抽象成分散的JobHandler,交由“执行器”统一管理,“执行器”负责接收调度请求并执行对应的JobHandler中业务逻辑。

因此,“调度”和“任务”两部分可以相互解耦,提高系统整体稳定性和扩展性;

架构图

xxl-job vs quartz

Quartz作为开源作业调度中的佼佼者,是作业调度的首选。但是集群环境中Quartz采用API的方式对任务进行管理,从而可以避免上述问题,但是同样存在以下问题:

  • 问题一:调用API的的方式操作任务,不人性化;
  • 问题二:需要持久化业务QuartzJobBean到底层数据表中,系统侵入性相当严重。
  • 问题三:调度逻辑和QuartzJobBean耦合在同一个项目中,这将导致一个问题,在调度任务数量逐渐增多,同时调度任务逻辑逐渐加重的情况下,此时调度系统的性能将大大受限于业务;
  • 问题四:quartz底层以“抢占式”获取DB锁并由抢占成功节点负责运行任务,会导致节点负载悬殊非常大;而XXL-JOB通过执行器实现“协同分配式”运行任务,充分发挥集群优势,负载各节点均衡。

XXL-JOB弥补了quartz的上述不足之处

详细功能请参考官网

源码分析

代码语言:javascript
复制
package com.xxl.job.admin.core.trigger;


public static void trigger(int jobId,
TriggerTypeEnum triggerType,
int failRetryCount,
String executorShardingParam,
String executorParam,
String addressList) {

                            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);    

                               }

看下processTrigger

代码语言:javascript
复制
// 4、trigger remote executor
        ReturnT<String> triggerResult = null;
if (address != null) {
            triggerResult = runExecutor(triggerParam, address);
        } else {
            triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
        }

远程执行方法triggerResult = runExecutor(triggerParam, address);

初始化参数,获取执行策略分片广播

执行参数

代码语言:javascript
复制
/**
     * run executor
     * @param triggerParam
     * @param address
     * @return
     */
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
        ReturnT<String> runResult = null;
try {
            ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
            runResult = executorBiz.run(triggerParam);
        } catch (Exception e) {
            logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
            runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
        }

        StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
        runResultSB.append("<br>address:").append(address);
        runResultSB.append("<br>code:").append(runResult.getCode());
        runResultSB.append("<br>msg:").append(runResult.getMsg());

        runResult.setMsg(runResultSB.toString());
return runResult;
    }

getExecutorBiz方法

以前版本的同样方法

代码语言:javascript
复制
public static ExecutorBiz getExecutorBiz(String address) throws Exception {
//        // valid
//        if (address==null || address.trim().length()==0) {
//            return null;
//        }
//
//        // load-cache
//        address = address.trim();
//        ExecutorBiz executorBiz = executorBizRepository.get(address);
//        if (executorBiz != null) {
//            return executorBiz;
//        }
//
//        // set-cache
//        executorBiz = (ExecutorBiz) new XxlRpcReferenceBean(
//                NetEnum.NETTY_HTTP,
//                Serializer.SerializeEnum.HESSIAN.getSerializer(),
//                CallType.SYNC,
//                LoadBalance.ROUND,
//                ExecutorBiz.class,
//                null,
//                5000,
//                address,
//                XxlJobAdminConfig.getAdminConfig().getAccessToken(),
//                null,
//                null).getObject();
//
//        executorBizRepository.put(address, executorBiz);
//        return executorBiz;
//    }

实例化bean时使用序列化协议

代码语言:javascript
复制
Serializer.SerializeEnum.HESSIAN.getSerializer()

且使用NetEnum.NETTY_HTTP,NETTY客户端

CallType.SYNC 采用同步的请求方式, LoadBalance.ROUND 负载使用轮询的方式

返回来我们接着看runExecutor执行run方法时

代码语言:javascript
复制
/**
     * run executor
     * @param triggerParam
     * @param address
     * @return
     */
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
        ReturnT<String> runResult = null;
try {
            ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
            runResult = executorBiz.run(triggerParam);
        } catch (Exception e) {
            logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
            runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
        }

        StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
        runResultSB.append("<br>address:").append(address);
        runResultSB.append("<br>code:").append(runResult.getCode());
        runResultSB.append("<br>msg:").append(runResult.getMsg());

        runResult.setMsg(runResultSB.toString());
return runResult;
    }

不知道你们干没干过代码申诉的活,这让我想起了申诉时被周末加班支配的恐惧

这里封装了Netty的调用方法,把老版本的通过连接池获取 NettyHttpConnectClient 连接,调用其send方法发送请求进行了封装,不过老版本的代码只是注释了,还能看见,所以网上很多资料发现看不到代码,全局搜一下就可以了。

总结

xxl-job 底层就是通过封装quartz+netty http封装的rpc框架来完成每一次分布式定时任务的执行

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-01-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 赵KK日常技术记录 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.0初始化“调度数据库”
  • 架构设计
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档