第四十章:基于SpringBoot & Quartz完成定时任务分布式多节点负载持久化

在上一章【第三十九章:基于SpringBoot & Quartz完成定时任务分布式单节点持久化】中我们已经完成了任务的持久化,当我们创建一个任务时任务会被quartz定时任务框架自动持久化到数据库,我们采用的是SpringBoot项目托管的dataSource来完成的数据源提供,当然也可以使用quartz内部配置数据源方式,我们的标题既然是提到了定时任务的分布式多节点,那么怎么才算是多节点呢?当有节点故障或者手动停止运行后是否可以自动漂移任务到可用的分布式节点呢?

本章目标

  1. 完成定时任务分布式多节点配置,当单个节点关闭时其他节点自动接管定时任务。
  2. 创建任务时传递自定义参数,方便任务处理后续业务逻辑。

SpringBoot 企业级核心技术学习专题

专题

专题名称

专题描述

001

Spring Boot 核心技术

讲解SpringBoot一些企业级层面的核心组件

002

Spring Boot 核心技术章节源码

Spring Boot 核心技术简书每一篇文章码云对应源码

003

Spring Cloud 核心技术

对Spring Cloud核心技术全面讲解

004

Spring Cloud 核心技术章节源码

Spring Cloud 核心技术简书每一篇文章对应源码

005

QueryDSL 核心技术

全面讲解QueryDSL核心技术以及基于SpringBoot整合SpringDataJPA

006

SpringDataJPA 核心技术

全面讲解SpringDataJPA核心技术

构建项目

注意:我们本章项目需要结合上一章共同完成,有一点要注意的是任务在持久化到数据库内时会保存任务的全路径,如:com.hengyu.chapter39.timers.GoodStockCheckTimerquartz在运行任务时会根据任务全路径去执行,如果不一致则会提示找不到指定类,我们本章在创建项目时package需要跟上一章完全一致。

我们这里就不去直接创建新项目了,直接复制上一章项目的源码为新的项目命名为Chapter40

配置分布式

在上一章配置文件quartz.properties中我们其实已经为分布式做好了相关配置,下面我们就来看一下分布式相关的配置。 分布式相关配置:

1. org.quartz.scheduler.instanceId : 定时任务的实例编号,如果手动指定需要保证每个节点的唯一性,因为quartz不允许出现两个相同instanceId的节点,我们这里指定为Auto就可以了,我们把生成编号的任务交给quartz

2. org.quartz.jobStore.isClustered: 这个属性才是真正的开启了定时任务的分布式配置,当我们配置为truequartz框架就会调用ClusterManager来初始化分布式节点。

3. org.quartz.jobStore.clusterCheckinInterval:配置了分布式节点的检查时间间隔,单位:毫秒。 下面是quartz.properties配置文件配置信息:

#调度器实例名称
org.quartz.scheduler.instanceName = quartzScheduler

#调度器实例编号自动生成
org.quartz.scheduler.instanceId = AUTO

#持久化方式配置
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX

#持久化方式配置数据驱动,MySQL数据库
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate

#quartz相关数据表前缀名
org.quartz.jobStore.tablePrefix = QRTZ_

#开启分布式部署
org.quartz.jobStore.isClustered = true
#配置是否使用
org.quartz.jobStore.useProperties = false

#分布式节点有效性检查时间间隔,单位:毫秒
org.quartz.jobStore.clusterCheckinInterval = 10000

#线程池实现类
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool

#执行最大并发线程数量
org.quartz.threadPool.threadCount = 10

#线程优先级
org.quartz.threadPool.threadPriority = 5

#配置为守护线程,设置后任务将不会执行
#org.quartz.threadPool.makeThreadsDaemons=true

#配置是否启动自动加载数据库内的定时任务,默认true
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true

当我们启动任务节点时,会根据org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread属性配置进行是否自动加载任务,默认true自动加载数据库内的任务到节点。

测试分布式

上一章项目节点名称:quartz-cluster-node-first 本章项目节点名称:quartz-cluster-node-second

由于我们quartz-cluster-node-first的商品库存检查定时任务是每隔30秒执行一次,所以任务除非手动清除否则是不会被清空的,在运行项目测试之前需要将application.yml配置文件的端口号、项目名称修改下,保证quartz-cluster-node-secondquartz-cluster-node-first端口号不一致,可以同时运行,修改后为:

spring:
    application:
        name: quzrtz-cluster-node-second
server:
  port: 8082

然后修改相应控制台输出,为了能够区分任务执行者是具体的节点。

Chapter40Application启动类修改日志输出:
logger.info("【【【【【【定时任务分布式节点 - quartz-cluster-node-second 已启动】】】】】】");

GoodAddTimer商品添加任务类修改日志输出:
logger.info("分布式节点quartz-cluster-node-second,商品添加完成后执行任务,任务时间:{}",new Date());

GoodStockCheckTimer商品库存检查任务类修改日志输出:
logger.info("分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:{}",new Date());

下面我们启动本章项目,查看控制台输出内容,如下所示:

2017-11-12 10:28:39.969  INFO 11048 --- [           main] c.hengyu.chapter39.Chapter40Application  : 【【【【【【定时任务分布式节点 - quartz-cluster-node-second 已启动】】】】】】
2017-11-12 10:28:41.930  INFO 11048 --- [lerFactoryBean]] o.s.s.quartz.SchedulerFactoryBean        : Starting Quartz Scheduler now, after delay of 2 seconds
2017-11-12 10:28:41.959  INFO 11048 --- [lerFactoryBean]] org.quartz.core.QuartzScheduler          : Scheduler schedulerFactoryBean_$_yuqiyu1510453719308 started.
2017-11-12 10:28:51.963  INFO 11048 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore     : ClusterManager: detected 1 failed or restarted instances.
2017-11-12 10:28:51.963  INFO 11048 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore     : ClusterManager: Scanning for instance "yuqiyu1510450938654"'s failed in-progress jobs.
2017-11-12 10:28:51.967  INFO 11048 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore     : ClusterManager: ......Freed 1 acquired trigger(s).
2017-11-12 10:29:00.024  INFO 11048 --- [ryBean_Worker-1] c.h.c.timers.GoodStockCheckTimer         : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 10:29:00 CST 2017

可以看到项目启动完成后自动分配的instanceIdyuqiyu1510450938654,生成的规则是当前用户的名称+时间戳。然后ClusterManager分布式管理者自动介入进行扫描是否存在匹配的触发器任务,如果存在则会自动执行任务逻辑,而商品库存检查定时任务确实由quartz-cluster-node-second进行输出的。

测试任务自动漂移

下面我们也需要把quartz-cluster-node-first的输出进行修改,如下所示:

Chapter39Application启动类修改日志输出:
logger.info("【【【【【【定时任务分布式节点 - quartz-cluster-node-first 已启动】】】】】】");

GoodAddTimer商品添加任务类修改日志输出:
logger.info("分布式节点quartz-cluster-node-first,商品添加完成后执行任务,任务时间:{}",new Date());

GoodStockCheckTimer商品库存检查任务类修改日志输出:
logger.info("分布式节点quartz-cluster-node-first,执行库存检查定时任务,执行时间:{}",new Date());

接下来我们启动quartz-cluster-node-first,并查看控制台的输出内容:

2017-11-12 10:34:09.750  INFO 192 --- [           main] c.hengyu.chapter39.Chapter39Application  : 【【【【【【定时任务分布式节点 - quartz-cluster-node-first 已启动】】】】】】
2017-11-12 10:34:11.690  INFO 192 --- [lerFactoryBean]] o.s.s.quartz.SchedulerFactoryBean        : Starting Quartz Scheduler now, after delay of 2 seconds
2017-11-12 10:34:11.714  INFO 192 --- [lerFactoryBean]] org.quartz.core.QuartzScheduler          : Scheduler schedulerFactoryBean_$_yuqiyu1510454049066 started.

项目启动完成后,定时节点并没有实例化ClusterManager来完成分布式节点的初始化,因为quartz检测到有其他的节点正在处理任务,这样也是保证了任务执行的唯一性。

关闭quartz-cluster-node-second

我们关闭quartz-cluster-node-second运行的项目,预计的目的可以达到quartz-cluster-node-first会自动接管数据库内的任务,完成任务执行的自动漂移,我们来查看quartz-cluster-node-first的控制台输出内容:

2017-11-12 10:34:09.750  INFO 192 --- [           main] c.hengyu.chapter39.Chapter39Application  : 【【【【【【定时任务分布式节点 - quartz-cluster-node-first 已启动】】】】】】
2017-11-12 10:34:11.690  INFO 192 --- [lerFactoryBean]] o.s.s.quartz.SchedulerFactoryBean        : Starting Quartz Scheduler now, after delay of 2 seconds
2017-11-12 10:34:11.714  INFO 192 --- [lerFactoryBean]] org.quartz.core.QuartzScheduler          : Scheduler schedulerFactoryBean_$_yuqiyu1510454049066 started.
2017-11-12 10:41:11.793  INFO 192 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore     : ClusterManager: detected 1 failed or restarted instances.
2017-11-12 10:41:11.793  INFO 192 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore     : ClusterManager: Scanning for instance "yuqiyu1510453719308"'s failed in-progress jobs.
2017-11-12 10:41:11.797  INFO 192 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore     : ClusterManager: ......Freed 1 acquired trigger(s).
2017-11-12 10:41:11.834  INFO 192 --- [ryBean_Worker-1] c.h.c.timers.GoodStockCheckTimer         : 分布式节点quartz-cluster-node-first,执行库存检查定时任务,执行时间:Sun Nov 12 10:41:11 CST 2017

控制台已经输出了持久的定时任务,输出节点是quartz-cluster-node-first,跟我们预计的一样,节点quartz-cluster-node-first完成了自动接管quartz-cluster-node-second的工作,而这个过程肯定有一段时间间隔,而这个间隔可以修改quartz.properties配置文件内的属性org.quartz.jobStore.clusterCheckinInterval进行调节。

关闭quartz-cluster-node-first

我们同样可以测试启动任务节点quartz-cluster-node-second后,再关闭quartz-cluster-node-first任务节点,查看quartz-cluster-node-second控制台的输出内容:

2017-11-12 10:53:31.010  INFO 3268 --- [           main] c.hengyu.chapter39.Chapter40Application  : 【【【【【【定时任务分布式节点 - quartz-cluster-node-second 已启动】】】】】】
2017-11-12 10:53:32.967  INFO 3268 --- [lerFactoryBean]] o.s.s.quartz.SchedulerFactoryBean        : Starting Quartz Scheduler now, after delay of 2 seconds
2017-11-12 10:53:32.992  INFO 3268 --- [lerFactoryBean]] org.quartz.core.QuartzScheduler          : Scheduler schedulerFactoryBean_$_yuqiyu1510455210493 started.
2017-11-12 10:53:52.999  INFO 3268 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore     : ClusterManager: detected 1 failed or restarted instances.
2017-11-12 10:53:52.999  INFO 3268 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore     : ClusterManager: Scanning for instance "yuqiyu1510454049066"'s failed in-progress jobs.
2017-11-12 10:53:53.003  INFO 3268 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore     : ClusterManager: ......Freed 1 acquired trigger(s).
2017-11-12 10:54:00.020  INFO 3268 --- [ryBean_Worker-1] c.h.c.timers.GoodStockCheckTimer         : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 10:54:00 CST 2017

得到的结果是同样可以完成任务的自动漂移。

如果两个节点同时启动,哪个节点先把节点信息注册到数据库就获得了优先执行权。

传递参数到任务

我们平时在使用任务时,如果是针对性比较强的业务逻辑,肯定需要特定的参数来完成业务逻辑的实现。

下面我们来模拟商品秒杀的场景,当我们添加商品后自动添加一个商品提前五分钟的秒杀提醒,为关注该商品的用户发送提醒消息。 我们在节点quartz-cluster-node-first中添加秒杀提醒任务,如下所示:

package com.hengyu.chapter39.timers;

import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.quartz.QuartzJobBean;

/**
 * 商品秒杀提醒定时器
 * 为关注该秒杀商品的用户进行推送提醒
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/12
 * Time:9:23
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
public class GoodSecKillRemindTimer
extends QuartzJobBean
{
    /**
     * logback
     */
    private Logger logger = LoggerFactory.getLogger(GoodSecKillRemindTimer.class);

    /**
     * 任务指定逻辑
     * @param jobExecutionContext
     * @throws JobExecutionException
     */
    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        //获取任务详情内的数据集合
        JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();
        //获取商品编号
        Long goodId = dataMap.getLong("goodId");
        
        logger.info("分布式节点quartz-cluster-node-first,开始处理秒杀商品:{},关注用户推送消息.",goodId);

        //.../
    }
}

在秒杀提醒任务逻辑中,我们通过获取JobDetailJobDataMap集合来获取在创建任务的时候传递的参数集合,我们这里约定了goodId为商品的编号,在创建任务的时候传递到JobDataMap内,这样quartz在执行该任务的时候就会自动将参数传递到任务逻辑中,我们也就可以通过JobDataMap获取到对应的参数值。

设置秒杀提醒任务

我们找到节点项目quartz-cluster-node-first中的GoodInfoService,编写方法buildGoodSecKillRemindTimer设置秒杀提醒任务,如下所示:

/**
     * 构建商品秒杀提醒定时任务
     * 设置五分钟后执行
     * @throws Exception
     */
    public void buildGoodSecKillRemindTimer(Long goodId) throws Exception
    {
        //任务名称
        String name = UUID.randomUUID().toString();
        //任务所属分组
        String group = GoodSecKillRemindTimer.class.getName();
        //秒杀开始时间
        long startTime = System.currentTimeMillis() + 1000 * 5 * 60;
        JobDetail jobDetail = JobBuilder
                .newJob(GoodSecKillRemindTimer.class)
                .withIdentity(name,group)
                .build();

        //设置任务传递商品编号参数
        jobDetail.getJobDataMap().put("goodId",goodId);

        //创建任务触发器
        Trigger trigger = TriggerBuilder.newTrigger().withIdentity(name,group).startAt(new Date(startTime)).build();
        //将触发器与任务绑定到调度器内
        scheduler.scheduleJob(jobDetail,trigger);
    }

我们模拟秒杀提醒时间是添加商品后的5分钟,我们通过调用jobDetail实例的getJobDataMap方法就可以获取该任务数据集合,直接调用put方法就可以进行设置指定key的值,该集合继承了StringKeyDirtyFlagMap并且实现了Serializable序列化,因为需要将数据序列化到数据库的qrtz_job_details表内。 修改保存商品方法,添加调用秒杀提醒任务:

    /**
     * 保存商品基本信息
     * @param good 商品实例
     * @return
     */
    public Long saveGood(GoodInfoEntity good) throws Exception
    {
        goodInfoRepository.save(good);
        //构建创建商品定时任务
        buildCreateGoodTimer();
        //构建商品库存定时任务
        buildGoodStockTimer();
        //构建商品描述提醒定时任务
        buildGoodSecKillRemindTimer(good.getId());
        return good.getId();
    }

添加测试商品

下面我们调用节点quartz-cluster-node-first的测试Chapter39ApplicationTests.addGood方法完成商品的添加,由于我们的quartz-cluster-node-second项目并没有停止,所以我们在quartz-cluster-node-second项目的控制台查看输出内容:

2017-11-12 11:45:00.008  INFO 11652 --- [ryBean_Worker-5] c.h.c.timers.GoodStockCheckTimer         : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:45:00 CST 2017
2017-11-12 11:45:30.013  INFO 11652 --- [ryBean_Worker-6] c.h.c.timers.GoodStockCheckTimer         : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:45:30 CST 2017
2017-11-12 11:45:48.230  INFO 11652 --- [ryBean_Worker-7] c.hengyu.chapter39.timers.GoodAddTimer   : 分布式节点quartz-cluster-node-second,商品添加完成后执行任务,任务时间:Sun Nov 12 11:45:48 CST 2017
2017-11-12 11:46:00.008  INFO 11652 --- [ryBean_Worker-8] c.h.c.timers.GoodStockCheckTimer         : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:46:00 CST 2017
2017-11-12 11:46:30.016  INFO 11652 --- [ryBean_Worker-9] c.h.c.timers.GoodStockCheckTimer         : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:46:30 CST 2017
2017-11-12 11:47:00.011  INFO 11652 --- [yBean_Worker-10] c.h.c.timers.GoodStockCheckTimer         : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:47:00 CST 2017
2017-11-12 11:47:30.028  INFO 11652 --- [ryBean_Worker-1] c.h.c.timers.GoodStockCheckTimer         : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:47:30 CST 2017
2017-11-12 11:48:00.014  INFO 11652 --- [ryBean_Worker-2] c.h.c.timers.GoodStockCheckTimer         : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:48:00 CST 2017
2017-11-12 11:48:30.013  INFO 11652 --- [ryBean_Worker-3] c.h.c.timers.GoodStockCheckTimer         : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:48:30 CST 2017
2017-11-12 11:49:00.010  INFO 11652 --- [ryBean_Worker-4] c.h.c.timers.GoodStockCheckTimer         : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:49:00 CST 2017
2017-11-12 11:49:30.028  INFO 11652 --- [ryBean_Worker-5] c.h.c.timers.GoodStockCheckTimer         : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:49:30 CST 2017
2017-11-12 11:49:48.290  INFO 11652 --- [ryBean_Worker-6] c.h.c.timers.GoodSecKillRemindTimer      : 分布式节点quartz-cluster-node-second,开始处理秒杀商品:15,关注用户推送消息.
2017-11-12 11:50:00.008  INFO 11652 --- [ryBean_Worker-7] c.h.c.timers.GoodStockCheckTimer         : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:50:00 CST 2017

秒杀任务在添加完成商品后的五分钟开始执行的,而我们也正常的输出了传递过去的goodId商品编号的参数,而秒杀提醒任务执行一次后也被自动释放了。

总结

本章主要是结合上一章完成了分布式任务的讲解,完成了测试持久化的定时任务自动漂移,以及如何向定时任务传递参数。当然在实际的开发过程中,任务创建是需要进行封装的,目的也是为了减少一些冗余代码以及方面后期统一维护定时任务。

本章源码已经上传到码云: SpringBoot配套源码地址:https://gitee.com/hengboy/spring-boot-chapter SpringCloud配套源码地址:https://gitee.com/hengboy/spring-cloud-chapter SpringBoot相关系列文章请访问:目录:SpringBoot学习目录 QueryDSL相关系列文章请访问:QueryDSL通用查询框架学习目录 SpringDataJPA相关系列文章请访问:目录:SpringDataJPA学习目录 SpringBoot相关文章请访问:目录:SpringBoot学习目录,感谢阅读!

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Hadoop实操

如何使用CDSW在CDH集群通过sparklyr提交R的Spark作业

继上一章介绍如何使用R连接Hive与Impala后,Fayson接下来讲讲如何在CDH集群中提交R的Spark作业,Spark自带了R语言的支持,在此就不做介绍...

3516
来自专栏Kubernetes

原 荐 基于Kubernetes的ESaaS

概述 ESaaS(ElasticSearch as a Service)是ElasticSearch on Kubernetes的产品实现,是利用Docker和...

2938
来自专栏惨绿少年

分布式文件系统---GlusterFS

1.1 分布式文件系统 1.1.1 什么是分布式文件系统   相对于本机端的文件系统而言,分布式文件系统(英语:Distributed file system,...

3487
来自专栏Java架构

Dubbo架构原理

Remoting:远程通讯,提供对多种NIO框架抽象封装,包括“同步转异步”和“请求-响应”模式的信息交换方式。

1123
来自专栏BeJavaGod

阿里巴巴Druid数据源,史上最强的数据源,没有之一

目前常用的数据源主要有c3p0、dbcp、proxool、druid,先来说说他们 Spring 推荐使用dbcp; Hibernate 推荐使用c3p0和pr...

3499
来自专栏喵了个咪的博客空间

[喵咪Redis]Redis安装与介绍

[喵咪Redis]Redis安装与介绍 ? 前言 哈喽大家好啊,这次要来和大家一起来了解学习Redis的一系列技术,最终目的是搭建一个高可用redis集群自动负...

3657
来自专栏编程坑太多

跟我学习springmvc+dubbo-简介(一)

964
来自专栏MongoDB中文社区

MongoDB 认证鉴权那点事

为了体验Mongodb 的权限管理,我们找一台已经安装好的Mongodb,可以参见这里搭建一个单节点的Mongodb。

1202
来自专栏程序猿DD

为Spring Cloud Ribbon配置请求重试【Camden.SR2+】

当我们使用Spring Cloud Ribbon实现客户端负载均衡的时候,通常都会利用@LoadBalanced来让RestTemplate具备客户端负载功能,...

1979
来自专栏实用工具入门教程

如何部署 Redis 集群

Redis 是我们目前大规模使用的缓存中间件,由于它强大高效而又便捷的功能,得到了广泛的使用。单节点的Redis已经就达到了很高的性能,为了提高可用性我们可以使...

1144

扫码关注云+社区