前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >分布式任务调度利器—Xxl-job框架详解

分布式任务调度利器—Xxl-job框架详解

作者头像
灰小猿
发布2024-05-25 08:53:01
6100
发布2024-05-25 08:53:01
举报

hello,大家好,我是灰小猿

近期开发中的功能中需要用到定时任务来做数据库的备份和文件的定时删除,所以调研了当前比较主流的几个定时任务框架,经过对比选定了今天要讲的xxl-job,所以这篇文章,我主要和大家分享一下xxl-job的学习总结,记录一下在分布式项目下如何优雅的使用xxl-job实现定时任务。

现有定时任务框架对比

在使用xxl-job之前也对市面上现有的一些定时任务框架做了对比,具体如下:

功能

quartz

Elastic-job

Xxl-job

任务分片

不支持

支持

支持

文档完善

完善

完善

完善

界面管理

难易程度

简单

较复杂

简单

公司

OpenSymphony

当当网

个人(大众点评)

缺点

没有管理界面,不支持任务分片,不适用于分布式场景,需要持久化业务QuartzJobBean到底层数据表中,系统侵入性相当严重。

需要引入Zookeeper,增加系统复杂度,使用成本高。

通过获取数据库锁的方式,保证集群中执行任务的唯一性,但性能不好

为什么选择 Xxl-job?

考虑到我们是分布式项目,且定时任务的业务处理相对来说比较独立,所以在选择定时任务框架时,我们需要考虑一些关键因素,而 Xxl-job 恰好满足这些需求:

  1. 可视化管理界面:Xxl-job 提供直观、易用的可视化管理界面,让我们能够方便地管理和监控定时任务。
  2. 分布式任务调度:对于分布式系统,Xxl-job 提供了强大的分布式任务调度能力,可以轻松地实现任务在集群中的分发和执行。
  3. 任务执行日志:Xxl-job 支持任务执行日志的记录和查看,这有助于及时发现和解决任务执行中的问题。
  4. 支持动态增删任务:Xxl-job允许在项目运行或上线后动态添加和删除任务,无需停止整个服务。
  5. 代码入侵较低:定时任务相对独立,代码入侵比较严重的框架肯定是不会考虑的,而xxl-job完全采用注解方式,使用成本低,且兼容性较好。

综上所述,我们最终确定以xxl-job作为我们的定时任务框架。

接下来和大家介绍一下xxl-job的基本原理和实际使用:

xxl-job简介

XXL-JOB是一个轻量级分布式任务调度平台。特点是平台化,易部署,开发迅速、学习简单、轻量级、易扩展。

它主要是由调度中心和执行器功能完成定时任务的执行。其中调度中心负责统一调度,执行器负责接收调度并执行。并且 现在xxl-job已经开放源代码并接入多家公司的线上产品线,开箱即用。所以相对来说可以参考的资料很多,技术实现也相对比较成熟,xxl-job提供了非常完善的帮助文档,感兴趣的小伙伴也可以去阅读以下:

分布式任务调度平台XXL-JOB

Xxl-job设计思想:

xxl-job的设计思想可以与Nacos类似,xxl-job的调度中心就可以看做是nacos的注册中心,nacos是将一个个服务注册到nacos注册中心,而xxl-job是将一个个定时任务注册到“调度中心”。

  • Xxl-job将调度行为抽象形成“调度中心”公共平台,而平台自身并不承担业务逻辑,“调度中心”负责发起调度请求。
  • 将任务抽象成分散的JobHandler,交由“执行器”统一管理,“执行器”负责接收调度请求并执行对应的JobHandler中业务逻辑。
  • 因此,“调度”和“任务”两部分可以相互解耦,提高系统整体稳定性和扩展性;

Xxl-job的主要架构

调度模块(调度中心)

负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块;

支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover。

调度中心界面:

执行模块(执行器)

负责接收“调度中心”的调度并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效;接收“调度中心”的执行请求、终止请求和日志请求等。

以下是Xxl-job整体的项目架构图:

Xxl-Job架构图

xxl-job工作原理及使用步骤

步骤一:拉取Xxl-job项目到本地

首先去gitee拉取xxl-job的项目源码,建议选用较新且稳定的版本,我选择的是V2.2.0版本,

项目地址:https://gitee.com/xuxueli0323/xxl-job

导入项目到IDEA之后,项目结构如下:

步骤二;配置调度中心

首先在doc文件夹下找到源码中带的sql脚本文件,在本地mysql下运行该脚本文件,之后就可以生成一个调度中心的数据库,该数据库主要用于对后面调度中心配置的一些数据进行存储和统计。

sql脚本中的数据表功能说明如下:

xxl-job数据库表介绍

lxxl_job_group:执行器信息表,维护任务执行器信息; lxxl_job_info:调度扩展信息表: 用于保存XXL-JOB调度任务的扩展信息,如任务分组、任务名、机器地址、执行器、执行入参和报警邮件等等; lxxl_job_lock:任务调度锁表,分布式环境下,为了确保同一时间只有一个节点在执行同一个任务,需要使用分布式锁来实现任务的互斥执行 ; lxxl_job_log:调度日志表: 用于保存XXL-JOB任务调度的历史信息,如调度结果、执行结果、调度入参、调度机器和执行器等等; lxxl_job_log_report:调度日志报表:用户存储XXL-JOB任务调度日志的报表,调度中心报表功能页面会用到; lxxl_job_logglue:任务GLUE日志:用于保存GLUE更新历史,用于支持GLUE的版本回溯功能; lxxl_job_registry:执行器注册表,维护在线的执行器和调度中心机器地址信息; lxxl_job_user:系统用户表;

  • lxxl_job_group:执行器信息表,维护任务执行器信息;
  • lxxl_job_info:调度扩展信息表: 用于保存XXL-JOB调度任务的扩展信息,如任务分组、任务名、机器地址、执行器、执行入参和报警邮件等等;
  • lxxl_job_lock:任务调度锁表,分布式环境下,为了确保同一时间只有一个节点在执行同一个任务,需要使用分布式锁来实现任务的互斥执行 ;
  • lxxl_job_log:调度日志表: 用于保存XXL-JOB任务调度的历史信息,如调度结果、执行结果、调度入参、调度机器和执行器等等;
  • lxxl_job_log_report:调度日志报表:用户存储XXL-JOB任务调度日志的报表,调度中心报表功能页面会用到;
  • lxxl_job_logglue:任务GLUE日志:用于保存GLUE更新历史,用于支持GLUE的版本回溯功能;
  • lxxl_job_registry:执行器注册表,维护在线的执行器和调度中心机器地址信息;
  • lxxl_job_user:系统用户表;
配置中心主要配置

数据库生成完毕后,需要去调度中心的配置文件中对调度中心进行配置,

配置中心主要配置如下:

代码语言:javascript
复制
### xxl-job, datasource  调度中心数据库连接,连接本地数据库
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=root_pwd
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

### datasource-pool  数据库连接池配置
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.hikari.minimum-idle=10
spring.datasource.hikari.maximum-pool-size=30
spring.datasource.hikari.auto-commit=true
spring.datasource.hikari.idle-timeout=30000
spring.datasource.hikari.pool-name=HikariCP
spring.datasource.hikari.max-lifetime=900000
spring.datasource.hikari.connection-timeout=10000
spring.datasource.hikari.connection-test-query=SELECT 1

### xxl-job, email  配置邮箱服务器,用于执行或调度异常时发送报警邮件
spring.mail.host=smtp.qq.com
spring.mail.port=25
spring.mail.username=xxx@qq.com
spring.mail.password=xxx
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
spring.mail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory

### xxl-job, access token  认证token,这个token要和执行器的token一致,用于调度中心和执行器互相认证
xxl.job.accessToken=

### xxl-job, i18n (default is zh_CN, and you can choose "zh_CN", "zh_TC" and "en")  国际化配置
xxl.job.i18n=zh_CN

## xxl-job, triggerpool max size  线程池配置
xxl.job.triggerpool.fast.max=200
xxl.job.triggerpool.slow.max=100

### xxl-job, log retention days  调度中心日志文件保存天数,默认是大于等于7天时生效,小于7或为-1时默认是不保存日志
xxl.job.logretentiondays=30

至此,调度中心就配置完成了,直接启动admin服务,访问默认地址:

http://127.0.0.1:8080/xxl-job-admin,(该地址执行器将会使用到,作为回调地址)

即可进入调度中心首页,默认登录账号 “admin/123456”, 登录后运行界面如下图所示:

步骤三:配置执行器

代码语言:javascript
复制
# 服务器端口
server.port=8081
# no web
#spring.main.web-environment=false

# log config
logging.config=classpath:logback.xml

### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
### 调度中心地址,用于将定时任务注册到该地址上去
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin

### xxl-job, access token  认证token,这个token要和执行器的token一致,用于调度中心和执行器互相认证
xxl.job.accessToken=

### xxl-job executor appname  执行器名称
xxl.job.executor.appname=xxl-job-executor-sample
### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
### 执行器执行定时任务的地址,这个地址和下面的ip、port选择配置即可
xxl.job.executor.address=
### xxl-job executor server-info  执行器执行定时任务的地址和端口号,这个端口号和上面执行器服务的端口号不能一样
xxl.job.executor.ip=127.0.0.1
xxl.job.executor.port=9999
### xxl-job executor log-path  操作日志保存地址
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### xxl-job executor log-retention-days  执行器端的操作日志保存天数,大于等于3时生效,小于3或为-1时默认不保存
xxl.job.executor.logretentiondays=30
执行器实现原理

执行器实际上是一个内嵌的Server,默认端口9999(配置项:xxl.job.executor.port)。

在项目启动时,执行器会通过“@JobHandler”识别Spring容器中“Bean模式任务”,以注解的value属性为key管理起来。

“执行器”接收到“调度中心”的调度请求时,如果任务类型为“Bean模式”,将会匹配Spring容器中的“Bean模式任务”,然后调用其execute方法,执行任务逻辑。

如果任务类型为“GLUE模式”,将会加载GLUE代码,实例化Java对象,注入依赖的Spring服务(注意:Glue代码中注入的Spring服务,必须存在与该“执行器”项目的Spring容器中),然后调用execute方法,执行任务逻辑。

步骤四:编写任务执行代码

编写任务执行代码有两种方式,一种是通过Bean模式在后台编写任务代码,另一种则是通过GLUE模式直接在调度中心写任务脚本,下面我们分别介绍下在每一种方式下的使用。

任务运行模式
Bean模式(类形式)

Bean模式任务,支持基于类的开发方式,每个任务对应一个Java类。

优点:

  • 不限制项目环境,兼容性好。即使是无框架项目,如main方法直接启动的项目也可以提供支持,

缺点:

  • 每个任务需要占用一个Java类,造成类的浪费;
  • 不支持自动扫描任务并注入到执行器容器,需要手动注入。

这种方式一般适用于非框架项目,具体开发步骤如下:

1、在执行器(也可能是你的项目模块)的properties文件中配置xxl.job的相关配置

2、实现定时任务的类继承"IJobHandler":“com.xxl.job.core.handler.IJobHandler”;

3、在execute()方法中编写对应的任务代码

代码语言:javascript
复制
package com.xuxueli.executor.sample.frameless.jobhandler;

import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobLogger;

import java.util.concurrent.TimeUnit;

/**
 * 任务Handler示例(Bean模式)
 *
 * 开发步骤:
 * 1、继承"IJobHandler":“com.xxl.job.core.handler.IJobHandler”;
 * 2、注册到执行器工厂:在 "JFinalCoreConfig.initXxlJobExecutor" 中手动注册,注解key值对应的是调度中心新建任务的JobHandler属性的值。
 * 3、执行日志:需要通过 "XxlJobLogger.log" 打印执行日志;
 *
 * @author xuxueli 2015-12-19 19:43:36
 */
public class DemoJobHandler extends IJobHandler {

   @Override
   public ReturnT<String> execute(String param) throws Exception {
      XxlJobLogger.log("XXL-JOB, Hello World.");

      for (int i = 0; i < 5; i++) {
         XxlJobLogger.log("beat at:" + i);
         TimeUnit.SECONDS.sleep(2);
      }
      return SUCCESS;
   }

}

4、编写FrameLessXxlJobConfig配置文件,用于注册定时任务类

5、注册到执行器工厂:在 "JFinalCoreConfig.initXxlJobExecutor" 中手动注册,注解key值对应的是调度中心新建任务的JobHandler属性的值。3、执行日志:需要通过 "XxlJobLogger.log" 打印执行日志;

代码语言:javascript
复制
package com.xuxueli.executor.sample.frameless.config;

import com.xuxueli.executor.sample.frameless.jobhandler.CommandJobHandler;
import com.xuxueli.executor.sample.frameless.jobhandler.DemoJobHandler;
import com.xuxueli.executor.sample.frameless.jobhandler.HttpJobHandler;
import com.xuxueli.executor.sample.frameless.jobhandler.ShardingJobHandler;
import com.xxl.job.core.executor.XxlJobExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Properties;

/**
 * @author xuxueli 2018-10-31 19:05:43
 */
public class FrameLessXxlJobConfig {
    private static Logger logger = LoggerFactory.getLogger(FrameLessXxlJobConfig.class);


    private static FrameLessXxlJobConfig instance = new FrameLessXxlJobConfig();
    public static FrameLessXxlJobConfig getInstance() {
        return instance;
    }


    private XxlJobExecutor xxlJobExecutor = null;

    /**
     * init
     */
    public void initXxlJobExecutor() {

        // registry jobhandler
        XxlJobExecutor.registJobHandler("demoJobHandler", new DemoJobHandler());
        XxlJobExecutor.registJobHandler("shardingJobHandler", new ShardingJobHandler());
        XxlJobExecutor.registJobHandler("httpJobHandler", new HttpJobHandler());
        XxlJobExecutor.registJobHandler("commandJobHandler", new CommandJobHandler());

        // load executor prop
        Properties xxlJobProp = loadProperties("xxl-job-executor.properties");


        // init executor
        xxlJobExecutor = new XxlJobExecutor();
        xxlJobExecutor.setAdminAddresses(xxlJobProp.getProperty("xxl.job.admin.addresses"));
        xxlJobExecutor.setAccessToken(xxlJobProp.getProperty("xxl.job.accessToken"));
        xxlJobExecutor.setAppname(xxlJobProp.getProperty("xxl.job.executor.appname"));
        xxlJobExecutor.setAddress(xxlJobProp.getProperty("xxl.job.executor.address"));
        xxlJobExecutor.setIp(xxlJobProp.getProperty("xxl.job.executor.ip"));
        xxlJobExecutor.setPort(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.port")));
        xxlJobExecutor.setLogPath(xxlJobProp.getProperty("xxl.job.executor.logpath"));
        xxlJobExecutor.setLogRetentionDays(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.logretentiondays")));

        // start executor
        try {
            xxlJobExecutor.start();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * destory
     */
    public void destoryXxlJobExecutor() {
        if (xxlJobExecutor != null) {
            xxlJobExecutor.destroy();
        }
    }


    public static Properties loadProperties(String propertyFileName) {
        InputStreamReader in = null;
        try {
            ClassLoader loder = Thread.currentThread().getContextClassLoader();

            in = new InputStreamReader(loder.getResourceAsStream(propertyFileName), "UTF-8");;
            if (in != null) {
                Properties prop = new Properties();
                prop.load(in);
                return prop;
            }
        } catch (IOException e) {
            logger.error("load {} error!", propertyFileName);
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    logger.error("close {} error!", propertyFileName);
                }
            }
        }
        return null;
    }

}
Bean模式(方法形式)

Bean模式任务,支持基于方法的开发方式,每个任务对应一个方法。

优点:

  • 每个任务只需要开发一个方法,并添加”@XxlJob”注解即可,更加方便、快速。支持自动扫描任务并注入到执行器容器。

缺点:

  • 要求Spring容器环境;

在Bean模式下采用方法形式编写定时任务的方式对spring项目更加的友好,而且使用也更加简单方便,所以一般推荐使用这种方式来编写定时任务。

实现步骤大体上和类形式类似,但是不同点就是在于采用方法形式不需要每一个任务都建立一个新的类。具体的开发步骤如下:

1、在执行器(也可能是你的项目模块)的properties或yaml文件中配置xxl.job的相关配置,具体配置在上面执行器配置那里已经说过了

2、在Spring Bean实例中,开发Job方法,方式格式要求为 "public ReturnT<String> execute(String param)" 3、为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。

代码语言:javascript
复制
package com.xxl.job.executor.service.jobhandler;

import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import com.xxl.job.core.util.ShardingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

/**
 * XxlJob开发示例(Bean模式)
 *
 * 开发步骤:
 * 1、在Spring Bean实例中,开发Job方法,方式格式要求为 "public ReturnT<String> execute(String param)"
 * 2、为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。
 * 3、执行日志:需要通过 "XxlJobLogger.log" 打印执行日志;
 *
 * @author xuxueli 2019-12-11 21:52:51
 */
@Component
public class SampleXxlJob {
    private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);


    /**
     * 1、简单任务示例(Bean模式)
     */
    @XxlJob("demoJobHandler")
    public ReturnT<String> demoJobHandler(String param) throws Exception {
        XxlJobLogger.log("XXL-JOB, Hello World.");

        for (int i = 0; i < 5; i++) {
            XxlJobLogger.log("beat at:" + i);
            TimeUnit.SECONDS.sleep(2);
        }
        return ReturnT.SUCCESS;
    }


    /**
     * 2、分片广播任务
     */
    @XxlJob("shardingJobHandler")
    public ReturnT<String> shardingJobHandler(String param) throws Exception {

        // 分片参数
        ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
        XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());

        // 业务逻辑
        for (int i = 0; i < shardingVO.getTotal(); i++) {
            if (i == shardingVO.getIndex()) {
                XxlJobLogger.log("第 {} 片, 命中分片开始处理", i);
            } else {
                XxlJobLogger.log("第 {} 片, 忽略", i);
            }
        }

        return ReturnT.SUCCESS;
    }


    /**
     * 3、命令行任务
     */
    @XxlJob("commandJobHandler")
    public ReturnT<String> commandJobHandler(String param) throws Exception {
        String command = param;
        int exitValue = -1;

        BufferedReader bufferedReader = null;
        try {
            // command process
            Process process = Runtime.getRuntime().exec(command);
            BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream());
            bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream));

            // command log
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                XxlJobLogger.log(line);
            }

            // command exit
            process.waitFor();
            exitValue = process.exitValue();
        } catch (Exception e) {
            XxlJobLogger.log(e);
        } finally {
            if (bufferedReader != null) {
                bufferedReader.close();
            }
        }

        if (exitValue == 0) {
            return IJobHandler.SUCCESS;
        } else {
            return new ReturnT<String>(IJobHandler.FAIL.getCode(), "command exit value("+exitValue+") is failed");
        }
    }


    /**
     * 4、跨平台Http任务
     *  参数示例:
     *      "url: http://www.baidu.com\n" +
     *      "method: get\n" +
     *      "data: content\n";
     */
    @XxlJob("httpJobHandler")
    public ReturnT<String> httpJobHandler(String param) throws Exception {

        // param parse
        if (param==null || param.trim().length()==0) {
            XxlJobLogger.log("param["+ param +"] invalid.");
            return ReturnT.FAIL;
        }
        String[] httpParams = param.split("\n");
        String url = null;
        String method = null;
        String data = null;
        for (String httpParam: httpParams) {
            if (httpParam.startsWith("url:")) {
                url = httpParam.substring(httpParam.indexOf("url:") + 4).trim();
            }
            if (httpParam.startsWith("method:")) {
                method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase();
            }
            if (httpParam.startsWith("data:")) {
                data = httpParam.substring(httpParam.indexOf("data:") + 5).trim();
            }
        }

        // param valid
        if (url==null || url.trim().length()==0) {
            XxlJobLogger.log("url["+ url +"] invalid.");
            return ReturnT.FAIL;
        }
        if (method==null || !Arrays.asList("GET", "POST").contains(method)) {
            XxlJobLogger.log("method["+ method +"] invalid.");
            return ReturnT.FAIL;
        }

        // request
        HttpURLConnection connection = null;
        BufferedReader bufferedReader = null;
        try {
            // connection
            URL realUrl = new URL(url);
            connection = (HttpURLConnection) realUrl.openConnection();

            // connection setting
            connection.setRequestMethod(method);
            connection.setDoOutput(true);
            connection.setDoInput(true);
            connection.setUseCaches(false);
            connection.setReadTimeout(5 * 1000);
            connection.setConnectTimeout(3 * 1000);
            connection.setRequestProperty("connection", "Keep-Alive");
            connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
            connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");

            // do connection
            connection.connect();

            // data
            if (data!=null && data.trim().length()>0) {
                DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
                dataOutputStream.write(data.getBytes("UTF-8"));
                dataOutputStream.flush();
                dataOutputStream.close();
            }

            // valid StatusCode
            int statusCode = connection.getResponseCode();
            if (statusCode != 200) {
                throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid.");
            }

            // result
            bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
            StringBuilder result = new StringBuilder();
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                result.append(line);
            }
            String responseMsg = result.toString();

            XxlJobLogger.log(responseMsg);
            return ReturnT.SUCCESS;
        } catch (Exception e) {
            XxlJobLogger.log(e);
            return ReturnT.FAIL;
        } finally {
            try {
                if (bufferedReader != null) {
                    bufferedReader.close();
                }
                if (connection != null) {
                    connection.disconnect();
                }
            } catch (Exception e2) {
                XxlJobLogger.log(e2);
            }
        }

    }

    /**
     * 5、生命周期任务示例:任务初始化与销毁时,支持自定义相关逻辑;
     */
    @XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")
    public ReturnT<String> demoJobHandler2(String param) throws Exception {
        XxlJobLogger.log("XXL-JOB, Hello World.");
        return ReturnT.SUCCESS;
    }
    public void init(){
        logger.info("init");
    }
    public void destroy(){
        logger.info("destory");
    }


}

4、执行日志:需要通过 "XxlJobLogger.log" 打印执行日志;

5、编写XxlJobConfig配置文件,用于读取配置文件中配置的XXL-JOB配置

代码语言:javascript
复制
package com.xxl.job.executor.core.config;

import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * xxl-job config
 *
 * @author xuxueli 2017-04-28
 */
@Configuration
public class XxlJobConfig {
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;


    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }

    /**
     * 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
     *
     *      1、引入依赖:
     *          <dependency>
     *             <groupId>org.springframework.cloud</groupId>
     *             <artifactId>spring-cloud-commons</artifactId>
     *             <version>${version}</version>
     *         </dependency>
     *
     *      2、配置文件,或者容器启动变量
     *          spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
     *
     *      3、获取IP
     *          String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
     */


}

参考上述方法就可以在采用Bean模式在代码中编写定时任务了,

那么采用这种方式实现的定时任务,他具体的工作原理是怎样的呢?

实现原理

每个Bean模式任务都是一个Spring的Bean类实例,它被维护在“执行器”项目的Spring容器中。任务类需要加“@JobHandler(value=”名称”)”注解,因为“执行器”会根据该注解识别Spring容器中的任务。 任务类需要继承统一接口“IJobHandler”,任务逻辑在execute方法中开发,因为“执行器”在接收到调度中心的调度请求时,将会调用“IJobHandler”的execute方法,执行任务逻辑。 上面说了,除了在项目中编写定时任务的代码外,还有一种直接在调度中心编写定时任务脚本的方式,这种方式的好处就是即使你的项目已经上线,仍然可以添加或删除定时任务,无需暂停项目。

GLUE模式(Java)

GLUE模式任务,任务以源码方式维护在调度中心,支持通过Web IDE在线更新,实时编译和生效,因此不需要指定JobHandler。

开发步骤:

1、调度中心新建调度任务

2、进入GLUE IDE,直接在调度中心开发任务代码(支持代码版本回溯)

3、开启执行即可启动任务

GLUE模式(Shell、Python、PHP、NodeJS、Powershell)

GLUE模式任务的其他语言脚本,执行步骤和Java脚本一样,在这里就不做赘述了。

开发步骤:

  1. 调度中心新建调度任务
  2. 直接在调度中心开发对应脚本
  3. 开启执行

那么这种采用在调度中心直接编写脚本的方式来执行定时任务的方法,具体的实现原理是什么呢?

实现原理

脚本任务的源码托管在调度中心,脚本逻辑在执行器运行。当触发脚本任务时,执行器会加载脚本源码在执行器机器上生成一份脚本文件,然后通过Java代码调用该脚本;并且实时将脚本输出日志写到任务日志文件中,从而在调度中心可以实时监控脚本运行情况;

步骤五:注册任务到调度中心

注册任务到调度中心的操作主要是针对于采用Bean模式编写执行任务的情况,因为这种情况下在没有注册任务到调度中心的时候,调度中心是不知道要将哪些任务作为调度任务的,

GLUE模式之所以不需要这一步骤是因为GLUE模式在新建任务的时候就是一个注册任务到调度中心的过程了。

以Bean模式下注册任务为例:

1、首先在界面右侧点击新建,新建任务。

2、在弹出的新建任务窗口中,选择BEAN模式,这时对应右侧的JobHandler输入框是使能的,此时在这里输入你要注册的任务的唯一ID,也就是你在后台代码中标识这个任务时使用的名称。

3、填写完其他必填项之后,任务就注册成功了。

下面就是使用调度中心去执行任务,

步骤六:调度中心执行任务

无论是BEAN模式还是GLUE模式下新建(注册)的任务,在新建完成后,任务都是不会立即执行的,所以需要我们在想要执行任务的时候手动的去启动任务,

启动任务的操作如下:

查看任务调度日志

任务执行之后,想要查看任务执行成功或者失败,可以去“调度日志”模块查看,点击调度备注列的查看,也可以查看具体的失败原因。

以上就是整个调度任务的执行流程,对上面的流程做一下总结:

一次完整的任务调度通讯流程
  1. 首先,调度中心将任务信息发送给执行器。这里的任务信息包括任务ID、任务参数等。执行器收到任务后会将该任务信息存储在自己的任务队列中,等待执行。
  2. 执行器不断地从任务队列中取出任务,并执行。执行过程中,执行器会将任务的执行状态和执行结果发送给调度中心。任务状态包括“正在执行”、“执行成功”、“执行失败”等。
  3. 调度中心接收到执行器发送的任务状态和执行结果后,会更新自己的任务状态。如果任务执行成功,则调度中心会将任务标记为“已完成”,否则会将任务标记为“执行失败”。
  4. 在任务执行过程中,执行器会定时向调度中心发送心跳包,以确保执行器与调度中心之间的连接正常。

Xxl-job使用场景

下面列出了在实际项目中可能会使用到Xxl-job来作为分布式任务框架执行定时任务的场景,都是为了让业务之外的操作变得更加的简单高效。具体如下:

  1. 日志处理:当系统产生大量日志文件时,通过XXL-JOB创建定时任务,定期将日志文件进行压缩、归档或上传到云存储等操作
  2. 脚本执行:即使服务已经上线,仍然支持多种格式脚本执行。
  3. 定期数据备份
  4. 定期删除旧文件
  5. 定时发送邮件等

以上就是关于分布式调度任务XXL-JOB使用的全部教程及原理分享,有问题的小伙伴可以留言或私信我一起学习。

我是灰小猿,我们下期见!

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-02-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 现有定时任务框架对比
  • 为什么选择 Xxl-job?
  • xxl-job简介
    • Xxl-job设计思想:
      • Xxl-job的主要架构
        • 调度模块(调度中心)
        • 执行模块(执行器)
        • Xxl-Job架构图
    • xxl-job工作原理及使用步骤
      • 步骤一:拉取Xxl-job项目到本地
        • 步骤二;配置调度中心
          • xxl-job数据库表介绍
          • 配置中心主要配置
        • 步骤三:配置执行器
          • 执行器实现原理
        • 步骤四:编写任务执行代码
          • 任务运行模式
        • 步骤五:注册任务到调度中心
          • 步骤六:调度中心执行任务
            • 查看任务调度日志
            • 一次完整的任务调度通讯流程
        • Xxl-job使用场景
        相关产品与服务
        容器服务
        腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档