本文主要研究下quartz的QuartzSchedulerThread的调度以及quartz的性能问题。
spring-context-support-4.3.7.RELEASE-sources.jar!/org/springframework/scheduling/quartz/SchedulerFactoryBean.java
@Override
public void afterPropertiesSet() throws Exception {
if (this.dataSource == null && this.nonTransactionalDataSource != null) {
this.dataSource = this.nonTransactionalDataSource;
}
if (this.applicationContext != null && this.resourceLoader == null) {
this.resourceLoader = this.applicationContext;
}
// Create SchedulerFactory instance...
SchedulerFactory schedulerFactory = BeanUtils.instantiateClass(this.schedulerFactoryClass);
initSchedulerFactory(schedulerFactory);
if (this.resourceLoader != null) {
// Make given ResourceLoader available for SchedulerFactory configuration.
configTimeResourceLoaderHolder.set(this.resourceLoader);
}
if (this.taskExecutor != null) {
// Make given TaskExecutor available for SchedulerFactory configuration.
configTimeTaskExecutorHolder.set(this.taskExecutor);
}
if (this.dataSource != null) {
// Make given DataSource available for SchedulerFactory configuration.
configTimeDataSourceHolder.set(this.dataSource);
}
if (this.nonTransactionalDataSource != null) {
// Make given non-transactional DataSource available for SchedulerFactory configuration.
configTimeNonTransactionalDataSourceHolder.set(this.nonTransactionalDataSource);
}
// Get Scheduler instance from SchedulerFactory.
try {
this.scheduler = createScheduler(schedulerFactory, this.schedulerName);
populateSchedulerContext();
if (!this.jobFactorySet && !(this.scheduler instanceof RemoteScheduler)) {
// Use AdaptableJobFactory as default for a local Scheduler, unless when
// explicitly given a null value through the "jobFactory" bean property.
this.jobFactory = new AdaptableJobFactory();
}
if (this.jobFactory != null) {
if (this.jobFactory instanceof SchedulerContextAware) {
((SchedulerContextAware) this.jobFactory).setSchedulerContext(this.scheduler.getContext());
}
this.scheduler.setJobFactory(this.jobFactory);
}
}
finally {
if (this.resourceLoader != null) {
configTimeResourceLoaderHolder.remove();
}
if (this.taskExecutor != null) {
configTimeTaskExecutorHolder.remove();
}
if (this.dataSource != null) {
configTimeDataSourceHolder.remove();
}
if (this.nonTransactionalDataSource != null) {
configTimeNonTransactionalDataSourceHolder.remove();
}
}
registerListeners();
registerJobsAndTriggers();
}
这里在afterPropertiesSet的时候,createScheduler(schedulerFactory, this.schedulerName);
protected Scheduler createScheduler(SchedulerFactory schedulerFactory, String schedulerName)
throws SchedulerException {
// Override thread context ClassLoader to work around naive Quartz ClassLoadHelper loading.
Thread currentThread = Thread.currentThread();
ClassLoader threadContextClassLoader = currentThread.getContextClassLoader();
boolean overrideClassLoader = (this.resourceLoader != null &&
!this.resourceLoader.getClassLoader().equals(threadContextClassLoader));
if (overrideClassLoader) {
currentThread.setContextClassLoader(this.resourceLoader.getClassLoader());
}
try {
SchedulerRepository repository = SchedulerRepository.getInstance();
synchronized (repository) {
Scheduler existingScheduler = (schedulerName != null ? repository.lookup(schedulerName) : null);
Scheduler newScheduler = schedulerFactory.getScheduler();
if (newScheduler == existingScheduler) {
throw new IllegalStateException("Active Scheduler of name '" + schedulerName + "' already registered " +
"in Quartz SchedulerRepository. Cannot create a new Spring-managed Scheduler of the same name!");
}
if (!this.exposeSchedulerInRepository) {
// Need to remove it in this case, since Quartz shares the Scheduler instance by default!
SchedulerRepository.getInstance().remove(newScheduler.getSchedulerName());
}
return newScheduler;
}
}
finally {
if (overrideClassLoader) {
// Reset original thread context ClassLoader.
currentThread.setContextClassLoader(threadContextClassLoader);
}
}
}
这里调用schedulerFactory.getScheduler()来创建
quartz-2.3.0-sources.jar!/org/quartz/impl/StdSchedulerFactory.java
public Scheduler getScheduler() throws SchedulerException {
if (cfg == null) {
initialize();
}
SchedulerRepository schedRep = SchedulerRepository.getInstance();
Scheduler sched = schedRep.lookup(getSchedulerName());
if (sched != null) {
if (sched.isShutdown()) {
schedRep.remove(getSchedulerName());
} else {
return sched;
}
}
sched = instantiate();
return sched;
}
这里调用instantiate来初始化。里头初始化了一个QuartzScheduler
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
quartz-2.3.0-sources.jar!/org/quartz/core/QuartzScheduler.java
public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
throws SchedulerException {
this.resources = resources;
if (resources.getJobStore() instanceof JobListener) {
addInternalJobListener((JobListener)resources.getJobStore());
}
this.schedThread = new QuartzSchedulerThread(this, resources);
ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
schedThreadExecutor.execute(this.schedThread);
if (idleWaitTime > 0) {
this.schedThread.setIdleWaitTime(idleWaitTime);
}
jobMgr = new ExecutingJobsManager();
addInternalJobListener(jobMgr);
errLogger = new ErrorLogger();
addInternalSchedulerListener(errLogger);
signaler = new SchedulerSignalerImpl(this, this.schedThread);
getLog().info("Quartz Scheduler v." + getVersion() + " created.");
}
这个又初始化了QuartzSchedulerThread
这个是调度的核心类 quartz-2.3.0-sources.jar!/org/quartz/core/QuartzSchedulerThread.java 这里有几个关键的信息
就是qsRsrcs.getThreadPool().blockForAvailableThreads(),如果线程池满了的话,则会阻塞,因而会影响调度的准确性。
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize())
这里进行等待
long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
long timeUntilContinue = waitTime - now;
synchronized(sigLock) {
try {
if(!halted.get()) {
// QTZ-336 A job might have been completed in the mean time and we might have
// missed the scheduled changed signal by not waiting for the notify() yet
// Check that before waiting for too long in case this very job needs to be
// scheduled very soon
if (!isScheduleChanged()) {
sigLock.wait(timeUntilContinue);
}
}
} catch (InterruptedException ignore) {
}
}
而getRandomizedIdleWaitTime如下
private static long DEFAULT_IDLE_WAIT_TIME = 30L * 1000L;
private long idleWaitTime = DEFAULT_IDLE_WAIT_TIME;
private long getRandomizedIdleWaitTime() {
return idleWaitTime - random.nextInt(idleWaitVariablness);
}
idleWaitTime默认是30秒,通过org.quartz.scheduler.idleWaitTime这个来配置
// wait a bit, if reading from job store is consistently
// failing (e.g. DB is down or restarting)..
if (acquiresFailed > 1) {
try {
long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);
Thread.sleep(delay);
} catch (Exception ignore) {
}
}
这里优化了对数据库挂的时候,避免频繁轮询的问题
private static final long MIN_DELAY = 20;
private static final long MAX_DELAY = 600000;
private static long computeDelayForRepeatedErrors(JobStore jobStore, int acquiresFailed) {
long delay;
try {
delay = jobStore.getAcquireRetryDelay(acquiresFailed);
} catch (Exception ignored) {
// we're trying to be useful in case of error states, not cause
// additional errors..
delay = 100;
}
// sanity check per getAcquireRetryDelay specification
if (delay < MIN_DELAY)
delay = MIN_DELAY;
if (delay > MAX_DELAY)
delay = MAX_DELAY;
return delay;
}
比如
org.quartz.jobStore.dataSource=myDataSourceName
org.quartz.dataSource.myDataSourceName.connectionProvider.class=org.quartz.utils.HikariCpPoolingConnectionProvider
org.quartz.dataSource.myDataSourceName.driver=org.postgresql.Driver
org.quartz.dataSource.myDataSourceName.URL=jdbc:postgresql://192.168.99.100:5432/postgres
org.quartz.dataSource.myDataSourceName.user:postgres
org.quartz.dataSource.myDataSourceName.password:postgres
org.quartz.dataSource.myDataSourceName.maxConnection:30
org.quartz.dataSource.myDataSourceName.validationQuery: select 1
这样设置,貌似报错
Caused by: org.quartz.SchedulerException: ConnectionProvider class 'org.quartz.utils.HikariCpPoolingConnectionProvider' could not be instantiated.
at org.quartz.impl.StdSchedulerFactory.instantiate(StdSchedulerFactory.java:936) ~[quartz-2.3.0.jar:na]
at org.quartz.impl.StdSchedulerFactory.getScheduler(StdSchedulerFactory.java:1559) ~[quartz-2.3.0.jar:na]
quartz-2.3.0-sources.jar!/org/quartz/impl/StdSchedulerFactory.java
ConnectionProvider cp = null;
try {
cp = (ConnectionProvider) loadHelper.loadClass(cpClass).newInstance();
} catch (Exception e) {
initException = new SchedulerException("ConnectionProvider class '" + cpClass
+ "' could not be instantiated.", e);
throw initException;
}
不过我是在spring中直接配置连接池的,详见springboot集成quartz2.3.0,因而就不管了
采用jdbc store的话,定时轮询数据库,比较消耗时间,一种解决方案就是尽量批量查询,这里可以设置两个参数如下
org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow=1 ## 时间窗口,默认0
org.quartz.scheduler.batchTriggerAcquisitionMaxCount=5 ##这里默认是1,可以设置跟thread pool的core size相等
quartz-2.3.0-sources.jar!/org/quartz/core/QuartzSchedulerResources.java
batchTriggerAcquisitionMaxCount对应的就是QuartzSchedulerResources中的maxBatchSize batchTriggerAcquisitionFireAheadTimeWindow对应的就是QuartzSchedulerResources中的batchTimeWindow
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
这里实际取availThreadCount与maxBatchSize的最小值作为maxCount 最后作用的是selectTriggerToAcquire quartz-2.3.0-sources.jar!/org/quartz/impl/jdbcjobstore/JobStoreSupport.java
protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)
throws JobPersistenceException {
if (timeWindow < 0) {
throw new IllegalArgumentException();
}
List<OperableTrigger> acquiredTriggers = new ArrayList<OperableTrigger>();
Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
final int MAX_DO_LOOP_RETRY = 3;
int currentLoopCount = 0;
do {
currentLoopCount ++;
try {
List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);
// No trigger is ready to fire yet.
if (keys == null || keys.size() == 0)
return acquiredTriggers;
long batchEnd = noLaterThan;
// ...
// if we didn't end up with any trigger to fire from that first
// batch, try again for another batch. We allow with a max retry count.
if(acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) {
continue;
}
// We are done with the while loop.
break;
} catch (Exception e) {
throw new JobPersistenceException(
"Couldn't acquire next trigger: " + e.getMessage(), e);
}
} while (true);
// Return the acquired trigger list
return acquiredTriggers;
}
quartz-2.3.0-sources.jar!/org/quartz/impl/jdbcjobstore/StdJDBCDelegate.java
public List<TriggerKey> selectTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan, int maxCount)
throws SQLException {
PreparedStatement ps = null;
ResultSet rs = null;
List<TriggerKey> nextTriggers = new LinkedList<TriggerKey>();
try {
ps = conn.prepareStatement(rtp(SELECT_NEXT_TRIGGER_TO_ACQUIRE));
// Set max rows to retrieve
if (maxCount < 1)
maxCount = 1; // we want at least one trigger back.
ps.setMaxRows(maxCount);
// Try to give jdbc driver a hint to hopefully not pull over more than the few rows we actually need.
// Note: in some jdbc drivers, such as MySQL, you must set maxRows before fetchSize, or you get exception!
ps.setFetchSize(maxCount);
ps.setString(1, STATE_WAITING);
ps.setBigDecimal(2, new BigDecimal(String.valueOf(noLaterThan)));
ps.setBigDecimal(3, new BigDecimal(String.valueOf(noEarlierThan)));
rs = ps.executeQuery();
while (rs.next() && nextTriggers.size() <= maxCount) {
nextTriggers.add(triggerKey(
rs.getString(COL_TRIGGER_NAME),
rs.getString(COL_TRIGGER_GROUP)));
}
return nextTriggers;
} finally {
closeResultSet(rs);
closeStatement(ps);
}
}
这里的maxCount对应ps.setMaxRows(maxCount) noLaterThan = noLaterThan + timeWindow ; noEarlierThan = getMisfireTime() 分别作用在NEXT_FIRE_TIME的范围上 NEXT_FIRE_TIME <= noLaterThan && NEXT_FIRE_TIME >= noEarlierThan 查询sql实例
SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY FROM QRTZ_TRIGGERS WHERE SCHED_NAME = 'schedulerFactoryBean' AND TRIGGER_STATE = ? AND NEXT_FIRE_TIME <= ? AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= ?)) ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC
由于是定时轮询trigger,那么在采用jdbc store的话,则这里的轮询可能是个不小的开销,对数据库有潜在的压力,另外在数据库挂掉的时候,可能也有问题,虽然做了延迟等待。另外尽量配置连接池,还有如果需要的话,调整batch参数。