前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Elastic-Job2.1.5源码-图解错过作业重新触发执行功能

Elastic-Job2.1.5源码-图解错过作业重新触发执行功能

作者头像
宋小生
发布2022-12-14 18:41:14
4610
发布2022-12-14 18:41:14
举报
文章被收录于专栏:新技术新技术

大家好,本文给大家介绍一下Elastic-Job 中错过作业重新触发的概念,配置与原理

错过作业重新触发执行功能

文 | 宋小生

7.5 错过重触发功能

7.5.1 错过执行作业概念

错过作业重触发是什么意思呢:

要弄清楚作业的misfire,首先需要了解几个重要的概念:

触发器超时

举个例子说明这个概念。比如调度引擎中有5个工作线程,然后在某天的下午2点 有6个任务需要执行,那么由于调度引擎中只有5个工作线程,所以在2点的时候会有5个任务会按照之前设定的时间正常执行,有1个任务因为没有线程资源而被延迟执行,这个就叫触发器超时。下面这些情况会造成触发器超时:

  • 系统因为某些原因被重启。在系统关闭到重新启动之间的一段时间里,可能有些任务会被 misfire。
  • Trigger 被暂停(suspend)的一段时间里,有些任务可能会被 misfire。
  • 线程池中所有线程都被占用,导致任务无法被触发执行,造成 misfire。
  • 有状态任务在下次触发时间到达时,上次执行还没有结束。

接下来我们可以看下几种执行的场景:

正常执行

举例来说,现在有个作业每个小时执行一次,在12:00,13:00,14:00:

执行时长都在1个小时之内,则每个时间点都可以正常执行,示例图如下:

图 7.5.1 正常执行作业

错过执行作业

12:00的执行时长过长(可能是处理业务数据过大,也可能其他原因),执行了1个多小时,当未开启错过作业重新触发功能则在13:00的时候作业是无法执行将被错过,执行示例图如下:

图 7.5.2 错过执行作业

错过作业执行重新触发

12:00的执行时长过长(可能是处理业务数据过大,也可能其他原因),执行了1个多小时,当开启错过作业重新触发功能后在12:00执行之后为13:00错过的执行补偿一次执行,执行示例图如下:

图 7.5.3 错过执行重新触发作业

7.5.2 错过执行作业配置

在Quartz内部具有个属性为作业的misfire的阈值,单位是秒,

org.quartz.jobStore.misfireThreshold

回顾下我们初始化StdSchedulerFactory的时候将这个参数设置为了1S也就是说存在作业触发时间超过了这个时间则被认为是错过作业执行,有个参数org.quartz.threadPool.threadCount配置了工作线程数也就是同时并发执行作业数,如下代码:

代码语言:javascript
复制
private Properties getBaseQuartzProperties() {
        Properties result = new Properties();
        result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
        //作业线程数设置为1无法并行执行作业,容易错过执行
        result.put("org.quartz.threadPool.threadCount", "1");
        result.put("org.quartz.scheduler.instanceName", liteJobConfig.getJobName());
        //错过作业重新触发时间配置
        result.put("org.quartz.jobStore.misfireThreshold", "1");
        result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName());
        result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
        return result;
    }

7.5.3 错过执行作业监听器写入

如果作业错过执行Quartz定时线程则会调用我们JobTriggerListener类中重写的triggerMisfired方法,回到triggerMisfired方法对于错过执行的作业我们会写入对应节点 sharding/当前实例对应分片项/misfire来标记哪个分片被错过执行。

作业错过执行监听器代码如下:

代码语言:javascript
复制
@RequiredArgsConstructor
public final class JobTriggerListener extends TriggerListenerSupport {
    
    private final ExecutionService executionService;
    
    private final ShardingService shardingService;
    
    @Override
    public String getName() {
        return "JobTriggerListener";
    }
    /**
    * 超过阈值,错过作业重新触发
    */
    @Override
    public void triggerMisfired(final Trigger trigger) {
        if (null != trigger.getPreviousFireTime()) {
            executionService.setMisfire(shardingService.getLocalShardingItems());
        }
    }
}

写入错过执行节点sharding/当前实例对应分片项/misfire,代码如下:

代码语言:javascript
复制
 public void setMisfire(final Collection<Integer> items) {
        for (int each : items) {
            jobNodeStorage.createJobNodeIfNeeded(ShardingNode.getMisfireNode(each));
        }
    }

7.5.4 作业运行时幂等导致的错过执行

接下来看AbstractElasticJobExecutor中是如何对幂等作业进行错过补偿的,这里如果幂等配置开启则会进行幂等控制,幂等配置开启后作业执行的时候发现存在了running节点则说明上次对应作业分片仍旧未执行结束,幂等需要保证上次作业分片执行结束时候本次才开始执行,不能在同一个分片上并行执行,这里判断了上次作业分片下如果存在running节点也就是还有作业分片在执行,那本次执行所对应作业分片则无法执行,这种情况是Elastic-Job对作业分片的幂等的处理,如果出现这样本次无法执行的作业分片也是要为当前分片设置错过作业重触发标记,设置完毕之后则返回。

代码语言:javascript
复制
if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
    if (shardingContexts.isAllowSendJobEvent()) {
        jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName, 
                shardingContexts.getShardingItemParameters().keySet()));
    }
    return;
}
代码语言:javascript
复制
ExecutionService类型中错过执行节点写入逻辑:
代码语言:javascript
复制
 public boolean misfireIfHasRunningItems(final Collection<Integer> items) {
       //当前作业正在运行
        if (!hasRunningItems(items)) {
            return false;
        }
        //写入错过执行标记
        setMisfire(items);
        return true;
    }

7.5.5 错过作业重新补偿执行

在后面作业正常执行业务执行完毕之后,将会执行错过执行作业:

代码语言:javascript
复制
  //如果当前分片项中存在错过执行作业
  while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
          //删除错过执行节点
            jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
            //执行作业
            execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
  }

错过作业重触发,这个就是如果我们作业在执行的时候执行时间过长导致到了下次执行的时候,第一次作业执行还未结束,在这种单线程情况下,导致下次作业不能触发,容易错过执行,这里在作业第一次正常执行完毕之后查看是否有错过执行节点存在,如果存在错过执行节点则先清理错过执行节点然后立即补偿触发一次。

具体什么节点满足补偿执行呢:

  • 作业开启了错过作业重触发配置,
  • 当前分片项存在sharding/%s/misfire节点

- END -

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-11-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 中间件源码 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档