Quartz分布式调度的实现是去中心化的,需要依赖数据库在集群间同步调度状态,基于分布式锁实现一致性调度,而我们当前使用的xxl-job版本(1.9.x)的分布式调度又是基于Quartz实现,因此我们所了解到的xxl-job调度性能差,本质上就是Quartz的调度性能差。
在任务量只有少量,且没有秒级调度任务的情况下,我们是看不到quartz的性能缺陷的,在任务量明显增加情况下,我们就会发现,调度延迟会有明显增加,特别是秒级任务,尽管我们横向扩展节点,秒级任务的调度延迟也不会降低,且整体调度性能没有明显好转,反而更糟糕。
我们将带着这些疑惑,分析Quartz 的分布式调度原理,以及过时恢复、故障恢复的实现原理。
核心流程代码在org.quartz.core.QuartzSchedulerThread类的run方法,即首先是调度线程(只有一个线程)启动,QuartzSchedulerThread实例的run执行,源码如下。
class QuartzSchedulerThread{
@Override
public void run() {
int acquiresFailed = 0;
while (!halted.get()) {
// .....
(1)
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) {
// ....
(2)
try {
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
} //.....
if (triggers != null && !triggers.isEmpty()) {
(3)
now = System.currentTimeMillis();
long triggerTime = triggers.get(0).getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
while(timeUntilTrigger > 2) {
// wait......
}
// .....
if(goAhead) {
(4)
try {
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if(res != null)
bndles = res;
} //....
}
(5)
for (int i = 0; i < bndles.size(); i++) {
// ..........
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
(6)
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
}
continue; // while (!halted)
}
} else { // if(availThreadCount > 0)
// should never happen, if threadPool.blockForAvailableThreads() follows contract
continue; // while (!halted)
}
//....
} // while (!halted)
// drop references to scheduler stuff to aid garbage collection...
qs = null;
qsRsrcs = null;
}
}
流程分析:
在获取job时,会将状态由WAITING更新为ACQUIRED,更新成功才表示当时节点获取到了调度的权限;
假设只部署一个节点,且maxBatchSize最大为1,即不考虑分布式锁、不考虑批量的情况,那么quartz的调度流程可以简化为:
故障恢复与错过恢复的启动入口都在JobStoreSupport#schedulerStarted方法中,由QuartzScheduler#start方法调用,源码如下。
class JobStoreSupport{
public void schedulerStarted()throws SchedulerException{
(1)
if(isClustered()){
//...
clusterManagementThread.initialize();
}else{
(2)
try{
recoverJobs();
}//....
}
(3)
misfireHandler=new MisfireHandler();
// ...
misfireHandler.initialize();
schedulerRunning=true;
//...
}
}
分布式调度下的故障恢复流程:
错过的原因主要有以下几种情况:
错过恢复流程:
解答问题:基于mysql实现分布式锁实现的分布式一致性调度为什么性能差?
原因分析:单个进程只有一个QuartzSchedulerThread线程,但因为maxBatchSize默认为1,因此会导致同一时刻会有多个节点竞争分布式锁。执行一次job,锁竞争次数至少为3次,当节点数增多时,锁竞争将会变得激烈,并且未获取到锁时会导致阻塞等待锁的释放,除非数据库设置等待超时时间很短。
是否可以通过调参优化?
方案一:将maxBatchSize调大,如果总job数为300,部署3个节点,那么maxBatchSize可配置为100。
这样依然存在锁竞争激烈,即任务执行前的状态修改依然需要加锁,并非是每个Job使用独立的分布式锁。
那将maxBatchSize调大为300呢?
这样就只会有一个节点处理job了,增加再多节点也无法提升性能,且由于调度是单线程的,批量job过多,锁是消除了,但数据库的操作次数依然没有减少,耗时就会增加。job多了,单节点调度导致延迟增加。
方案二:基于redis实现分布式锁。
问题的根本依然没有解决,主要原因还是锁的粒度问题。
Quartz除了是依赖数据库外,如果每个应用都独立使用Quartz,而不是xxl-job,那么就有点类似ElasticJob框架,都是去中心化的,但性能比不上ElasticJob,原因是ElasticJob基于Quartz将锁的粒度改造为Job级别了,而不是应用级的,且负责调度的线程也不是只有一个,而是每个Job一个。