A Look at Quartz Scheduling and Performance

code4it
Published 2018-09-17 15:17:52
Included in the column: 码匠的流水账

This post looks at how Quartz's QuartzSchedulerThread performs scheduling, and at Quartz's performance issues.

SchedulerFactoryBean

spring-context-support-4.3.7.RELEASE-sources.jar!/org/springframework/scheduling/quartz/SchedulerFactoryBean.java

    @Override
    public void afterPropertiesSet() throws Exception {
        if (this.dataSource == null && this.nonTransactionalDataSource != null) {
            this.dataSource = this.nonTransactionalDataSource;
        }

        if (this.applicationContext != null && this.resourceLoader == null) {
            this.resourceLoader = this.applicationContext;
        }

        // Create SchedulerFactory instance...
        SchedulerFactory schedulerFactory = BeanUtils.instantiateClass(this.schedulerFactoryClass);
        initSchedulerFactory(schedulerFactory);

        if (this.resourceLoader != null) {
            // Make given ResourceLoader available for SchedulerFactory configuration.
            configTimeResourceLoaderHolder.set(this.resourceLoader);
        }
        if (this.taskExecutor != null) {
            // Make given TaskExecutor available for SchedulerFactory configuration.
            configTimeTaskExecutorHolder.set(this.taskExecutor);
        }
        if (this.dataSource != null) {
            // Make given DataSource available for SchedulerFactory configuration.
            configTimeDataSourceHolder.set(this.dataSource);
        }
        if (this.nonTransactionalDataSource != null) {
            // Make given non-transactional DataSource available for SchedulerFactory configuration.
            configTimeNonTransactionalDataSourceHolder.set(this.nonTransactionalDataSource);
        }

        // Get Scheduler instance from SchedulerFactory.
        try {
            this.scheduler = createScheduler(schedulerFactory, this.schedulerName);
            populateSchedulerContext();

            if (!this.jobFactorySet && !(this.scheduler instanceof RemoteScheduler)) {
                // Use AdaptableJobFactory as default for a local Scheduler, unless when
                // explicitly given a null value through the "jobFactory" bean property.
                this.jobFactory = new AdaptableJobFactory();
            }
            if (this.jobFactory != null) {
                if (this.jobFactory instanceof SchedulerContextAware) {
                    ((SchedulerContextAware) this.jobFactory).setSchedulerContext(this.scheduler.getContext());
                }
                this.scheduler.setJobFactory(this.jobFactory);
            }
        }

        finally {
            if (this.resourceLoader != null) {
                configTimeResourceLoaderHolder.remove();
            }
            if (this.taskExecutor != null) {
                configTimeTaskExecutorHolder.remove();
            }
            if (this.dataSource != null) {
                configTimeDataSourceHolder.remove();
            }
            if (this.nonTransactionalDataSource != null) {
                configTimeNonTransactionalDataSourceHolder.remove();
            }
        }

        registerListeners();
        registerJobsAndTriggers();
    }

Here, afterPropertiesSet calls createScheduler(schedulerFactory, this.schedulerName) to create the scheduler.

SchedulerFactoryBean.createScheduler

protected Scheduler createScheduler(SchedulerFactory schedulerFactory, String schedulerName)
            throws SchedulerException {

        // Override thread context ClassLoader to work around naive Quartz ClassLoadHelper loading.
        Thread currentThread = Thread.currentThread();
        ClassLoader threadContextClassLoader = currentThread.getContextClassLoader();
        boolean overrideClassLoader = (this.resourceLoader != null &&
                !this.resourceLoader.getClassLoader().equals(threadContextClassLoader));
        if (overrideClassLoader) {
            currentThread.setContextClassLoader(this.resourceLoader.getClassLoader());
        }
        try {
            SchedulerRepository repository = SchedulerRepository.getInstance();
            synchronized (repository) {
                Scheduler existingScheduler = (schedulerName != null ? repository.lookup(schedulerName) : null);
                Scheduler newScheduler = schedulerFactory.getScheduler();
                if (newScheduler == existingScheduler) {
                    throw new IllegalStateException("Active Scheduler of name '" + schedulerName + "' already registered " +
                            "in Quartz SchedulerRepository. Cannot create a new Spring-managed Scheduler of the same name!");
                }
                if (!this.exposeSchedulerInRepository) {
                    // Need to remove it in this case, since Quartz shares the Scheduler instance by default!
                    SchedulerRepository.getInstance().remove(newScheduler.getSchedulerName());
                }
                return newScheduler;
            }
        }
        finally {
            if (overrideClassLoader) {
                // Reset original thread context ClassLoader.
                currentThread.setContextClassLoader(threadContextClassLoader);
            }
        }
    }

createScheduler in turn calls schedulerFactory.getScheduler() to create the scheduler.
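The thread context ClassLoader handling in createScheduler follows a save, override, restore pattern. A minimal standalone sketch of that pattern, assuming a hypothetical helper named withContextClassLoader that is not part of Spring or Quartz:

```java
import java.util.function.Supplier;

final class ContextClassLoaderSwap {
    /**
     * Runs the action with the given loader installed as the thread context
     * ClassLoader, then restores the original loader even if the action throws.
     * (Hypothetical helper for illustration only.)
     */
    static <T> T withContextClassLoader(ClassLoader loader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader original = current.getContextClassLoader();
        // Only swap when a loader is given and it differs from the current one.
        boolean override = (loader != null && !loader.equals(original));
        if (override) {
            current.setContextClassLoader(loader);
        }
        try {
            return action.get();
        } finally {
            if (override) {
                // Reset the original thread context ClassLoader.
                current.setContextClassLoader(original);
            }
        }
    }
}
```

The finally block is what keeps the override from leaking into unrelated code that later runs on the same thread.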

quartz-2.3.0-sources.jar!/org/quartz/impl/StdSchedulerFactory.java

public Scheduler getScheduler() throws SchedulerException {
        if (cfg == null) {
            initialize();
        }

        SchedulerRepository schedRep = SchedulerRepository.getInstance();

        Scheduler sched = schedRep.lookup(getSchedulerName());

        if (sched != null) {
            if (sched.isShutdown()) {
                schedRep.remove(getSchedulerName());
            } else {
                return sched;
            }
        }

        sched = instantiate();

        return sched;
    }

getScheduler() calls instantiate() to do the initialization, which in turn creates a QuartzScheduler:

qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
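The lookup-or-create logic of getScheduler() can be modeled without Quartz. The sketch below is an illustration under assumptions: FakeScheduler and SchedulerRegistrySketch are hypothetical names, and registering the new instance inside getScheduler is a simplification made for this sketch rather than a statement about where Quartz does it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

final class SchedulerRegistrySketch {
    /** Hypothetical stand-in for a scheduler; it only tracks its shutdown state. */
    static final class FakeScheduler {
        private boolean shutdown;
        void shutdown() { shutdown = true; }
        boolean isShutdown() { return shutdown; }
    }

    private final Map<String, FakeScheduler> byName = new HashMap<>();

    /** Returns the live scheduler registered under the name, or creates a new one. */
    synchronized FakeScheduler getScheduler(String name, Supplier<FakeScheduler> factory) {
        FakeScheduler existing = byName.get(name);
        if (existing != null) {
            if (existing.isShutdown()) {
                // A shut-down instance is dropped so that a fresh one can replace it.
                byName.remove(name);
            } else {
                return existing;
            }
        }
        FakeScheduler created = factory.get();
        byName.put(name, created); // simplification: registration happens here
        return created;
    }
}
```

Calling getScheduler twice with the same name returns the same instance until that instance is shut down, which is why the Spring code compares the new scheduler against the one already registered.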

QuartzScheduler

quartz-2.3.0-sources.jar!/org/quartz/core/QuartzScheduler.java

public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
        throws SchedulerException {
        this.resources = resources;
        if (resources.getJobStore() instanceof JobListener) {
            addInternalJobListener((JobListener)resources.getJobStore());
        }

        this.schedThread = new QuartzSchedulerThread(this, resources);
        ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
        schedThreadExecutor.execute(this.schedThread);
        if (idleWaitTime > 0) {
            this.schedThread.setIdleWaitTime(idleWaitTime);
        }

        jobMgr = new ExecutingJobsManager();
        addInternalJobListener(jobMgr);
        errLogger = new ErrorLogger();
        addInternalSchedulerListener(errLogger);

        signaler = new SchedulerSignalerImpl(this, this.schedThread);

        getLog().info("Quartz Scheduler v." + getVersion() + " created.");
    }

The constructor in turn creates a QuartzSchedulerThread.

QuartzSchedulerThread

This is the core scheduling class.

quartz-2.3.0-sources.jar!/org/quartz/core/QuartzSchedulerThread.java

A few key points stand out:

blockForAvailableThreads

This is qsRsrcs.getThreadPool().blockForAvailableThreads(). If the thread pool is full, the call blocks, which in turn affects scheduling accuracy.
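To see why a full pool delays scheduling, here is a minimal sketch of a pool that blocks the caller until a worker slot is free. BlockingWorkerSlots is a hypothetical class written for illustration; it is not Quartz's thread pool implementation.

```java
final class BlockingWorkerSlots {
    private final Object lock = new Object();
    private int available;

    BlockingWorkerSlots(int size) {
        this.available = size;
    }

    /**
     * Blocks until at least one slot is free and returns the number of free slots.
     * If the calling thread is interrupted, waiting stops and the current count
     * (possibly 0) is returned.
     */
    int blockForAvailableThreads() {
        synchronized (lock) {
            while (available < 1) {
                try {
                    lock.wait(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            return available;
        }
    }

    /** Claims one slot for a job that is about to run. */
    void acquire() {
        synchronized (lock) {
            if (available < 1) {
                throw new IllegalStateException("no free slot");
            }
            available--;
        }
    }

    /** Returns a slot when a job finishes and wakes any blocked caller. */
    void release() {
        synchronized (lock) {
            available++;
            lock.notifyAll();
        }
    }
}
```

While the scheduling loop sits inside blockForAvailableThreads, no trigger can be acquired, so a trigger that becomes due during that time fires late.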

acquireNextTriggers

triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());

timeUntilContinue

This is where the thread waits:

                long now = System.currentTimeMillis();
                long waitTime = now + getRandomizedIdleWaitTime();
                long timeUntilContinue = waitTime - now;
                synchronized(sigLock) {
                    try {
                      if(!halted.get()) {
                        // QTZ-336 A job might have been completed in the mean time and we might have
                        // missed the scheduled changed signal by not waiting for the notify() yet
                        // Check that before waiting for too long in case this very job needs to be
                        // scheduled very soon
                        if (!isScheduleChanged()) {
                          sigLock.wait(timeUntilContinue);
                        }
                      }
                    } catch (InterruptedException ignore) {
                    }
                }

getRandomizedIdleWaitTime is defined as follows:

    private static long DEFAULT_IDLE_WAIT_TIME = 30L * 1000L;
    private long idleWaitTime = DEFAULT_IDLE_WAIT_TIME;
    private long getRandomizedIdleWaitTime() {
        return idleWaitTime - random.nextInt(idleWaitVariablness);
    }

idleWaitTime defaults to 30 seconds and can be configured through org.quartz.scheduler.idleWaitTime.
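The idle wait combines a randomized timeout with an early wake-up when the schedule changes. The following sketch models that idea under assumptions: IdleWaitSketch is a hypothetical class, the boolean flag stands in for Quartz's isScheduleChanged() check, the variableness value is passed in by the caller, and the loop around wait() is an addition here to tolerate spurious wake-ups.

```java
import java.util.Random;

final class IdleWaitSketch {
    private final Object sigLock = new Object();
    private final Random random = new Random();
    private final long idleWaitTime;
    private final int idleWaitVariableness;
    private boolean scheduleChanged;

    IdleWaitSketch(long idleWaitTime, int idleWaitVariableness) {
        if (idleWaitVariableness < 1 || idleWaitVariableness > idleWaitTime) {
            throw new IllegalArgumentException("variableness must be in [1, idleWaitTime]");
        }
        this.idleWaitTime = idleWaitTime;
        this.idleWaitVariableness = idleWaitVariableness;
    }

    /** Idle time minus a random amount, so several nodes do not poll in lockstep. */
    long randomizedIdleWaitTime() {
        return idleWaitTime - random.nextInt(idleWaitVariableness);
    }

    /** Called by other threads to wake the waiting loop early. */
    void signalSchedulingChange() {
        synchronized (sigLock) {
            scheduleChanged = true;
            sigLock.notifyAll();
        }
    }

    /** Waits up to the randomized idle time; returns true if a schedule change ended the wait. */
    boolean idleWait() {
        long deadline = System.currentTimeMillis() + randomizedIdleWaitTime();
        synchronized (sigLock) {
            try {
                long remaining = deadline - System.currentTimeMillis();
                // Checking the flag first covers a signal that arrived before waiting began.
                while (!scheduleChanged && remaining > 0) {
                    sigLock.wait(remaining);
                    remaining = deadline - System.currentTimeMillis();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            boolean changed = scheduleChanged;
            scheduleChanged = false;
            return changed;
        }
    }
}
```

Checking the flag before waiting corresponds to the QTZ-336 comment in the Quartz code: a change signalled just before the wait starts must not be missed.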

computeDelayForRepeatedErrors

                // wait a bit, if reading from job store is consistently
                // failing (e.g. DB is down or restarting)..
                if (acquiresFailed > 1) {
                    try {
                        long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);
                        Thread.sleep(delay);
                    } catch (Exception ignore) {
                    }
                }

This is an optimization for the case where the database is down: it avoids polling too frequently.

    private static final long MIN_DELAY = 20;
    private static final long MAX_DELAY = 600000;

    private static long computeDelayForRepeatedErrors(JobStore jobStore, int acquiresFailed) {
        long delay;
        try {
            delay = jobStore.getAcquireRetryDelay(acquiresFailed);
        } catch (Exception ignored) {
            // we're trying to be useful in case of error states, not cause
            // additional errors..
            delay = 100;
        }

        // sanity check per getAcquireRetryDelay specification
        if (delay < MIN_DELAY)
            delay = MIN_DELAY;
        if (delay > MAX_DELAY)
            delay = MAX_DELAY;

        return delay;
    }
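The clamping behaviour above can be tried in isolation. In this sketch the job store is replaced by a caller-supplied function, and RetryDelaySketch and computeDelay are hypothetical names; only the MIN_DELAY, MAX_DELAY, and fallback values are taken from the code above.

```java
import java.util.function.IntToLongFunction;

final class RetryDelaySketch {
    static final long MIN_DELAY = 20;
    static final long MAX_DELAY = 600_000;

    /**
     * Asks the supplied policy for a delay in milliseconds and clamps the answer
     * to [MIN_DELAY, MAX_DELAY]. If the policy itself fails, 100 ms is used.
     */
    static long computeDelay(IntToLongFunction retryDelayPolicy, int acquiresFailed) {
        long delay;
        try {
            delay = retryDelayPolicy.applyAsLong(acquiresFailed);
        } catch (RuntimeException ignored) {
            // Stay useful in error states instead of causing additional errors.
            delay = 100;
        }
        return Math.max(MIN_DELAY, Math.min(MAX_DELAY, delay));
    }
}
```

For example, a policy of `failures -> 1000L * failures` yields 3000 ms after three failures, while a policy that returns 5 is raised to the 20 ms floor.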

connectionProvider.class

For example:

org.quartz.jobStore.dataSource=myDataSourceName
org.quartz.dataSource.myDataSourceName.connectionProvider.class=org.quartz.utils.HikariCpPoolingConnectionProvider
org.quartz.dataSource.myDataSourceName.driver=org.postgresql.Driver
org.quartz.dataSource.myDataSourceName.URL=jdbc:postgresql://192.168.99.100:5432/postgres
org.quartz.dataSource.myDataSourceName.user:postgres
org.quartz.dataSource.myDataSourceName.password:postgres
org.quartz.dataSource.myDataSourceName.maxConnection:30
org.quartz.dataSource.myDataSourceName.validationQuery: select 1

With this configuration, startup appears to fail with the following error:

Caused by: org.quartz.SchedulerException: ConnectionProvider class 'org.quartz.utils.HikariCpPoolingConnectionProvider' could not be instantiated.
    at org.quartz.impl.StdSchedulerFactory.instantiate(StdSchedulerFactory.java:936) ~[quartz-2.3.0.jar:na]
    at org.quartz.impl.StdSchedulerFactory.getScheduler(StdSchedulerFactory.java:1559) ~[quartz-2.3.0.jar:na]

quartz-2.3.0-sources.jar!/org/quartz/impl/StdSchedulerFactory.java

                ConnectionProvider cp = null;
                try {
                    cp = (ConnectionProvider) loadHelper.loadClass(cpClass).newInstance();
                } catch (Exception e) {
                    initException = new SchedulerException("ConnectionProvider class '" + cpClass
                            + "' could not be instantiated.", e);
                    throw initException;
                }

However, I configure the connection pool directly in Spring (see "springboot集成quartz2.3.0"), so I did not pursue this further.

Performance issues

Batch parameters

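With the JDBC job store, Quartz polls the database periodically, which is relatively time-consuming. One mitigation is to fetch triggers in batches as far as possible, using the following two parameters: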

# time window, default 0
org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow=1
# default is 1; can be set equal to the thread pool's core size
org.quartz.scheduler.batchTriggerAcquisitionMaxCount=5
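One caveat when annotating such settings: java.util.Properties treats `#` as a comment marker only at the start of a line, so text written after a value becomes part of the value. The small check below illustrates this; PropertiesCommentCheck and the key a.count are hypothetical names used only for the demonstration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class PropertiesCommentCheck {
    /** Parses properties text with the standard java.util.Properties loader. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // The trailing "## note" is kept as part of the value.
        System.out.println(parse("a.count=5 ## note\n").getProperty("a.count"));
        // A comment on its own line is ignored, leaving the plain value.
        System.out.println(parse("# note\na.count=5\n").getProperty("a.count"));
    }
}
```

A numeric setting loaded with trailing text can no longer be parsed as a number, so comments are safer on their own lines.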

quartz-2.3.0-sources.jar!/org/quartz/core/QuartzSchedulerResources.java

batchTriggerAcquisitionMaxCount corresponds to maxBatchSize in QuartzSchedulerResources, and batchTriggerAcquisitionFireAheadTimeWindow corresponds to batchTimeWindow in QuartzSchedulerResources.

triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());

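The effective maxCount is therefore the smaller of availThreadCount and maxBatchSize. These values ultimately take effect in selectTriggerToAcquire, which is reached through acquireNextTrigger:

quartz-2.3.0-sources.jar!/org/quartz/impl/jdbcjobstore/JobStoreSupport.java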

protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)
        throws JobPersistenceException {
        if (timeWindow < 0) {
          throw new IllegalArgumentException();
        }

        List<OperableTrigger> acquiredTriggers = new ArrayList<OperableTrigger>();
        Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
        final int MAX_DO_LOOP_RETRY = 3;
        int currentLoopCount = 0;
        do {
            currentLoopCount ++;
            try {
                List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);

                // No trigger is ready to fire yet.
                if (keys == null || keys.size() == 0)
                    return acquiredTriggers;

                long batchEnd = noLaterThan;

                // ...

                // if we didn't end up with any trigger to fire from that first
                // batch, try again for another batch. We allow with a max retry count.
                if(acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) {
                    continue;
                }

                // We are done with the while loop.
                break;
            } catch (Exception e) {
                throw new JobPersistenceException(
                          "Couldn't acquire next trigger: " + e.getMessage(), e);
            }
        } while (true);

        // Return the acquired trigger list
        return acquiredTriggers;
    }

SQL example

quartz-2.3.0-sources.jar!/org/quartz/impl/jdbcjobstore/StdJDBCDelegate.java

public List<TriggerKey> selectTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan, int maxCount)
        throws SQLException {
        PreparedStatement ps = null;
        ResultSet rs = null;
        List<TriggerKey> nextTriggers = new LinkedList<TriggerKey>();
        try {
            ps = conn.prepareStatement(rtp(SELECT_NEXT_TRIGGER_TO_ACQUIRE));

            // Set max rows to retrieve
            if (maxCount < 1)
                maxCount = 1; // we want at least one trigger back.
            ps.setMaxRows(maxCount);

            // Try to give jdbc driver a hint to hopefully not pull over more than the few rows we actually need.
            // Note: in some jdbc drivers, such as MySQL, you must set maxRows before fetchSize, or you get exception!
            ps.setFetchSize(maxCount);

            ps.setString(1, STATE_WAITING);
            ps.setBigDecimal(2, new BigDecimal(String.valueOf(noLaterThan)));
            ps.setBigDecimal(3, new BigDecimal(String.valueOf(noEarlierThan)));
            rs = ps.executeQuery();

            while (rs.next() && nextTriggers.size() <= maxCount) {
                nextTriggers.add(triggerKey(
                        rs.getString(COL_TRIGGER_NAME),
                        rs.getString(COL_TRIGGER_GROUP)));
            }

            return nextTriggers;
        } finally {
            closeResultSet(rs);
            closeStatement(ps);
        }      
    }

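Here maxCount is applied through ps.setMaxRows(maxCount). The other two arguments bound the range of NEXT_FIRE_TIME:

  • noLaterThan = noLaterThan + timeWindow
  • noEarlierThan = getMisfireTime()

The query requires NEXT_FIRE_TIME <= noLaterThan, and NEXT_FIRE_TIME >= noEarlierThan for triggers whose MISFIRE_INSTR is not -1. An example of the resulting SQL: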

SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY FROM QRTZ_TRIGGERS WHERE SCHED_NAME = 'schedulerFactoryBean' AND TRIGGER_STATE = ? AND NEXT_FIRE_TIME <= ? AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= ?)) ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC
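The filtering and ordering expressed by this query can be mimicked in memory, which makes the effect of maxCount and the two time bounds easier to see. This is only an illustrative sketch: TriggerSelectionSketch and Row are hypothetical, and every row is assumed to already be in the WAITING state.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class TriggerSelectionSketch {
    /** Hypothetical in-memory row; the fields mirror the columns used by the query. */
    static final class Row {
        final String name;
        final long nextFireTime;
        final int priority;
        final int misfireInstr;

        Row(String name, long nextFireTime, int priority, int misfireInstr) {
            this.name = name;
            this.nextFireTime = nextFireTime;
            this.priority = priority;
            this.misfireInstr = misfireInstr;
        }
    }

    /** Returns up to maxCount trigger names, earliest fire time first, then highest priority. */
    static List<String> select(List<Row> waiting, long noLaterThan, long noEarlierThan, int maxCount) {
        List<Row> matches = new ArrayList<>();
        for (Row r : waiting) {
            boolean dueInWindow = r.nextFireTime <= noLaterThan;
            // MISFIRE_INSTR = -1 bypasses the lower bound, as in the WHERE clause.
            boolean misfireOk = r.misfireInstr == -1 || r.nextFireTime >= noEarlierThan;
            if (dueInWindow && misfireOk) {
                matches.add(r);
            }
        }
        // ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC
        matches.sort(Comparator.comparingLong((Row r) -> r.nextFireTime)
                .thenComparing(Comparator.comparingInt((Row r) -> r.priority).reversed()));
        int limit = Math.max(1, maxCount); // at least one row is requested
        List<String> names = new ArrayList<>();
        for (Row r : matches) {
            if (names.size() >= limit) {
                break;
            }
            names.add(r.name);
        }
        return names;
    }
}
```

A larger maxCount lets one poll return several due triggers, which is the point of raising batchTriggerAcquisitionMaxCount.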

Summary

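Because triggers are polled periodically, polling with the JDBC job store can be a significant cost and puts potential pressure on the database. There may also be problems when the database goes down, even though Quartz adds a delay between retries. It is also advisable to configure a connection pool and, if needed, to tune the batch parameters.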

Originally published: 2017-11-02