前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Quartz分布式调度存在的性能问题分析

Quartz分布式调度存在的性能问题分析

作者头像
吴就业
发布2021-12-20 21:45:12
2.2K0
发布2021-12-20 21:45:12
举报
文章被收录于专栏:Java艺术Java艺术

Quartz分布式调度的实现是去中心化的,需要依赖数据库在集群间同步调度状态,基于分布式锁实现一致性调度,而我们当前使用的xxl-job版本(1.9.x)的分布式调度又是基于Quartz实现,因此我们所了解到的xxl-job调度性能差,本质上就是Quartz的调度性能差。

在任务量只有少量,且没有秒级调度任务的情况下,我们是看不到quartz的性能缺陷的,在任务量明显增加情况下,我们就会发现,调度延迟会有明显增加,特别是秒级任务,尽管我们横向扩展节点,秒级任务的调度延迟也不会降低,且整体调度性能没有明显好转,反而更糟糕。

我们将带着这些疑惑,分析Quartz 的分布式调度原理,以及过时恢复、故障恢复的实现原理。

核心调度流程分析

核心流程代码在org.quartz.core.QuartzSchedulerThread类的run方法,即首先是调度线程(只有一个线程)启动,QuartzSchedulerThread实例的run执行,源码如下。

代码语言:javascript
复制
class QuartzSchedulerThread{
    @Override
    public void run() {
        int acquiresFailed = 0;
        while (!halted.get()) {
            // .....
                    (1)
                int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
                if(availThreadCount > 0) {

                    // ....
                    (2)                
                    try {
                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                       
                    } //.....

                    if (triggers != null && !triggers.isEmpty()) {
                        (3)        
                        now = System.currentTimeMillis();
                        long triggerTime = triggers.get(0).getNextFireTime().getTime();
                        long timeUntilTrigger = triggerTime - now;
                        while(timeUntilTrigger > 2) {
                            // wait......
                        }

                        // .....
                        if(goAhead) {
                            (4)
                            try {
                                List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                                if(res != null)
                                    bndles = res;
                            } //....
                        }
                        (5)
                        for (int i = 0; i < bndles.size(); i++) {
                           // ..........
                            JobRunShell shell = null;
                            try {
                                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                shell.initialize(qs);
                            } catch (SchedulerException se) {
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                continue;
                            }
        
                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                (6)
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            }

                        }

                        continue; // while (!halted)
                    }
                } else { // if(availThreadCount > 0)
                    // should never happen, if threadPool.blockForAvailableThreads() follows contract
                    continue; // while (!halted)
                }

                //....
            
        } // while (!halted)

        // drop references to scheduler stuff to aid garbage collection...
        qs = null;
        qsRsrcs = null;
    }
}

流程分析:

  • 1.检查当前线程池(业务线程池)可用数量是否大于0,大于0才执行调度逻辑;
  • 2、获取分布式锁(LOCK_TRIGGER_ACCESS),然后根据配置的maxBatchSize(默认maxBatchSize为1)获取30秒内需要调度的任务(job下次执行时间在30秒内),根据执行时间排序(在数据库查询时排序),最后释放分布式锁(LOCK_TRIGGER_ACCESS);

在获取job时,会将状态由WAITING更新为ACQUIRED,更新成功才表示当时节点获取到了调度的权限;

  • 3、遍历调度这一批次任务,休眠等待到当前毫秒"至少有一个"job(批量)需要触发执行;
  • 4、获取分布式锁(LOCK_TRIGGER_ACCESS),批量触发调度,for遍历修改job状态,由ACQUIRED改为EXECUTING状态,如果修改成功,则更新cron下一次执行时间以及状态恢复为WAITING,最后释放分布式锁(LOCK_TRIGGER_ACCESS);
  • 5、for遍历执行这一批次job,第(4)步将Job状态由ACQUIRED改为EXECUTING状态成功的Job,获取线程池,将Job放入线程池执行;
  • 6、如果Job放入线程池失败,就获取一次分布式锁(LOCK_TRIGGER_ACCESS),修改Job状态为ERROR,然后释放锁;

假设只部署一个节点,且maxBatchSize最大为1,即不考虑分布式锁、不考虑批量的情况,那么quartz的调度流程可以简化为:

  • 1、查询数据库,获取需要调度的job,并更新job的状态,由WAITING更新为ACQUIRED;
  • 2、更新job状态,由ACQUIRED改为EXECUTING状态,如果修改成功,则更新cron下一次执行时间以及状态恢复为WAITING;
  • 3、如果第(2)步成功,则获取线程池,将Job放入线程池,线程池分配线程调用Job的execute方法执行Job;
  • 4、继续循环步骤1;

故障恢复流程分析

故障恢复与错过恢复的启动入口都在JobStoreSupport#schedulerStarted方法中,由QuartzScheduler#start方法调用,源码如下。

代码语言:javascript
复制
class JobStoreSupport{
     public void schedulerStarted()throws SchedulerException{
        (1)
        if(isClustered()){
         //...
         clusterManagementThread.initialize();
        }else{
             (2)
             try{
                 recoverJobs();
             }//....
        }
        (3)
        misfireHandler=new MisfireHandler();
        // ...
        misfireHandler.initialize();
        schedulerRunning=true;
        //...
     }
 }
  • (1)判断是否是集群模式,如果是则启动集群管理线程;
  • (2)非集群模式当前线程执行recoverJobs方法恢复;
  • (3)启动过时恢复处理线程;

分布式调度下的故障恢复流程:

  • 定时每clusterCheckinInterval毫秒检测掉线的节点;
  • 如果有节点掉线,加分布式锁(LOCK_TRIGGER_ACCESS),获取掉线的节点的调度记录;
  • 如果调度状态为ACQUIRED,则将Job状态设置为WAITING;
  • 释放分布式锁(LOCK_TRIGGER_ACCESS);

错过恢复流程分析

错过的原因主要有以下几种情况:

  • 1、暂停调度后恢复调度;
  • 2、线程池被用满之后可能会出现;
  • 3、单节点重启;

错过恢复流程:

  • 定时每misfireThreshold毫秒 (超时阈值) ,获取分布式锁(LOCK_TRIGGER_ACCESS);
  • 查询WAITING状态且下次调度时间小于(当前时间 - misfireThreshold)的,截取maxMisfiresToHandleAtATime(数量)条记录;
  • 根据策略:MISFIRE_INSTRUCTION_DO_NOTHING或MISFIRE_INSTRUCTION_FIRE_ONCE_NOW, 更新这些misfires记录的下次调度时间,如果是MISFIRE_INSTRUCTION_FIRE_ONCE_NOW,则下次执行时间为当前时间,即立即执行一次,这样QuartzSchedulerThread线程就能马上获取到这个job触发调度;
  • 释放分布式锁(LOCK_TRIGGER_ACCESS);

影响性能的原因

解答问题:基于mysql实现分布式锁实现的分布式一致性调度为什么性能差?

原因分析:单个进程只有一个QuartzSchedulerThread线程,但因为maxBatchSize默认为1,因此会导致同一时刻会有多个节点竞争分布式锁。执行一次job,锁竞争次数至少为3次,当节点数增多时,锁竞争将会变得激烈,并且未获取到锁时会导致阻塞等待锁的释放,除非数据库设置等待超时时间很短。

是否可以通过调参优化?

方案一:将maxBatchSize调大,如果总job数为300,部署3个节点,那么maxBatchSize可配置为100。

这样依然存在锁竞争激烈,即任务执行前的状态修改依然需要加锁,并非是每个Job使用独立的分布式锁。

那将maxBatchSize调大为300呢?

这样就只会有一个节点处理job了,增加再多节点也无法提升性能,且由于调度是单线程的,批量job过多,锁是消除了,但数据库的操作次数依然没有减少,耗时就会增加。job多了,单节点调度导致延迟增加。

方案二:基于redis实现分布式锁。

问题的根本依然没有解决,主要原因还是锁的粒度问题。

Quartz除了是依赖数据库外,如果每个应用都独立使用Quartz,而不是xxl-job,那么就有点类似ElasticJob框架,都是去中心化的,但性能比不上ElasticJob,原因是ElasticJob基于Quartz将锁的粒度改造为Job级别了,而不是应用级的,且负责调度的线程也不是只有一个,而是每个Job一个。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-12-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Java艺术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 核心调度流程分析
  • 故障恢复流程分析
  • 错过恢复流程分析
  • 影响性能的原因
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档