Elastic-Job Series, Part 1: Executor Registration and Startup

Original article by 用户9511949, last modified 2024-07-07 11:32:57

1 ElasticJobLiteAutoConfiguration

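Taking Spring Boot as an example, this article walks through the startup flow of an elastic-job executor. The entry point is the auto-configuration class ElasticJobLiteAutoConfiguration in elasticjob-lite-spring-boot-starter: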

@Configuration(proxyBeanMethods = false)
@AutoConfigureAfter(DataSourceAutoConfiguration.class)
@ConditionalOnProperty(name = "elasticjob.enabled", havingValue = "true", matchIfMissing = true)
@Import({ElasticJobRegistryCenterConfiguration.class, ElasticJobTracingConfiguration.class, ElasticJobSnapshotServiceConfiguration.class})
@EnableConfigurationProperties(ElasticJobProperties.class)
public class ElasticJobLiteAutoConfiguration {
    
    @Configuration(proxyBeanMethods = false)
    @Import({ElasticJobBootstrapConfiguration.class, ScheduleJobBootstrapStartupRunner.class})
    protected static class ElasticJobConfiguration {
    }
}

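This auto-configuration is applied after DataSourceAutoConfiguration and pulls in five key configuration classes:

ElasticJobRegistryCenterConfiguration: initializes the ZooKeeper client

ElasticJobTracingConfiguration: if a dedicated tracing database is configured, initializes a HikariDataSource connection pool for tracing; otherwise the application's own data source is used

ElasticJobSnapshotServiceConfiguration: initializes the SnapshotService

ElasticJobBootstrapConfiguration: registers jobs

ScheduleJobBootstrapStartupRunner: starts jobs

The rest of this article focuses on job registration and startup.

Registration is done by ElasticJobBootstrapConfiguration. It implements the SmartInitializingSingleton interface, whose callback afterSingletonsInstantiated is invoked once all non-lazy singletons managed by the Spring container have been instantiated (xxl-job uses the same mechanism).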

public class ElasticJobBootstrapConfiguration implements SmartInitializingSingleton, ApplicationContextAware {
    
    @Setter
    private ApplicationContext applicationContext;
    
    @Override
    public void afterSingletonsInstantiated() {
        log.info("creating Job Bootstrap Beans");
        createJobBootstrapBeans();
        log.info("Job Bootstrap Beans created.");
    } 
    /**
     * Create job bootstrap instances and register them into container.
     */
    public void createJobBootstrapBeans() {
        ElasticJobProperties elasticJobProperties = applicationContext.getBean(ElasticJobProperties.class);
        SingletonBeanRegistry singletonBeanRegistry = ((ConfigurableApplicationContext) applicationContext).getBeanFactory();
        CoordinatorRegistryCenter registryCenter = applicationContext.getBean(CoordinatorRegistryCenter.class);
        TracingConfiguration<?> tracingConfig = getTracingConfiguration();
        constructJobBootstraps(elasticJobProperties, singletonBeanRegistry, registryCenter, tracingConfig);
    }
}

afterSingletonsInstantiated calls createJobBootstrapBeans, which in turn calls constructJobBootstraps:

private void constructJobBootstraps(final ElasticJobProperties elasticJobProperties, final SingletonBeanRegistry singletonBeanRegistry,
                                    final CoordinatorRegistryCenter registryCenter, final TracingConfiguration<?> tracingConfig) {
    for (Map.Entry<String, ElasticJobConfigurationProperties> entry : elasticJobProperties.getJobs().entrySet()) {
        ElasticJobConfigurationProperties jobConfigurationProperties = entry.getValue();
        Preconditions.checkArgument(null != jobConfigurationProperties.getElasticJobClass()
                || !Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType()),
                "Please specific [elasticJobClass] or [elasticJobType] under job configuration.");
        Preconditions.checkArgument(null == jobConfigurationProperties.getElasticJobClass()
                || Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType()),
                "[elasticJobClass] and [elasticJobType] are mutually exclusive.");
        if (null != jobConfigurationProperties.getElasticJobClass()) {
            registerClassedJob(entry.getKey(), entry.getValue().getJobBootstrapBeanName(), singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties);
        } else if (!Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType())) {
            registerTypedJob(entry.getKey(), entry.getValue().getJobBootstrapBeanName(), singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties);
        }
    }
}
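
Taken together, the two Preconditions checks above require exactly one of elasticJobClass and elasticJobType per job. A minimal stdlib-only sketch of that rule follows. The class JobSourceCheck and its resolve method are hypothetical names used only for illustration, plain strings stand in for the real property types, and IllegalArgumentException replaces Guava's Preconditions.

```java
/** Hypothetical, stdlib-only model of the "exactly one of class/type" rule. */
final class JobSourceCheck {

    /** Returns "CLASS" or "TYPE"; throws if neither or both are configured. */
    static String resolve(final String elasticJobClass, final String elasticJobType) {
        boolean hasClass = null != elasticJobClass && !elasticJobClass.isEmpty();
        boolean hasType = null != elasticJobType && !elasticJobType.isEmpty();
        if (!hasClass && !hasType) {
            throw new IllegalArgumentException("Specify [elasticJobClass] or [elasticJobType] under job configuration.");
        }
        if (hasClass && hasType) {
            throw new IllegalArgumentException("[elasticJobClass] and [elasticJobType] are mutually exclusive.");
        }
        return hasClass ? "CLASS" : "TYPE";
    }

    public static void main(final String[] args) {
        System.out.println(resolve("com.example.MyJob", null)); // class-based job
        System.out.println(resolve(null, "SCRIPT"));            // type-based job
    }
}
```

Configuring both keys, or neither, fails fast at startup instead of silently picking one.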

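constructJobBootstraps iterates over every job configured for the application. elastic-job has two kinds of jobs: class-based jobs (configured with elasticJobClass) and type-based jobs (configured with elasticJobType). A sample configuration: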

elasticjob:
  jobs:
    simpleJob:
      elasticJobClass: org.apache.shardingsphere.elasticjob.lite.example.job.SpringBootSimpleJob
      cron: 0/5 * * * * ?
      shardingTotalCount: 3
      shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
    scriptJob:
      elasticJobType: SCRIPT
      cron: 0/10 * * * * ?
      shardingTotalCount: 3
      props:
        script.command.line: "echo SCRIPT Job: "

We look at class-based jobs here; type-based jobs are handled similarly. The registerClassedJob method:

private void registerClassedJob(final String jobName, final String jobBootstrapBeanName, final SingletonBeanRegistry singletonBeanRegistry, final CoordinatorRegistryCenter registryCenter,
                                final TracingConfiguration<?> tracingConfig, final ElasticJobConfigurationProperties jobConfigurationProperties) {
    JobConfiguration jobConfig = jobConfigurationProperties.toJobConfiguration(jobName);
    jobExtraConfigurations(jobConfig, tracingConfig);
    ElasticJob elasticJob = applicationContext.getBean(jobConfigurationProperties.getElasticJobClass());
    if (Strings.isNullOrEmpty(jobConfig.getCron())) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(jobBootstrapBeanName), "The property [jobBootstrapBeanName] is required for One-off job.");
        singletonBeanRegistry.registerSingleton(jobBootstrapBeanName, new OneOffJobBootstrap(registryCenter, elasticJob, jobConfig));
    } else {
        String beanName = !Strings.isNullOrEmpty(jobBootstrapBeanName) ? jobBootstrapBeanName : jobConfig.getJobName() + "ScheduleJobBootstrap";
        singletonBeanRegistry.registerSingleton(beanName, new ScheduleJobBootstrap(registryCenter, elasticJob, jobConfig));
    }
}

So a job with a cron expression is registered in the Spring container as a ScheduleJobBootstrap instance, while a one-off job (one with no cron configured) is registered as a OneOffJobBootstrap instance.
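
The branch in registerClassedJob can be condensed into a small decision function. This is only a sketch, not elastic-job code: BootstrapNaming and describe are made-up names, and the bootstrap objects are reduced to a string of the form kind:beanName.

```java
/** Hypothetical model of how the bootstrap kind and bean name are chosen. */
final class BootstrapNaming {

    /** Returns "<bootstrap kind>:<bean name>" for the given job settings. */
    static String describe(final String jobName, final String cron, final String jobBootstrapBeanName) {
        boolean hasBeanName = null != jobBootstrapBeanName && !jobBootstrapBeanName.isEmpty();
        if (null == cron || cron.isEmpty()) {
            // One-off jobs must name their bean, otherwise nobody could inject and trigger them
            if (!hasBeanName) {
                throw new IllegalArgumentException("The property [jobBootstrapBeanName] is required for One-off job.");
            }
            return "OneOffJobBootstrap:" + jobBootstrapBeanName;
        }
        // Scheduled jobs fall back to <jobName>ScheduleJobBootstrap when no bean name is given
        return "ScheduleJobBootstrap:" + (hasBeanName ? jobBootstrapBeanName : jobName + "ScheduleJobBootstrap");
    }

    public static void main(final String[] args) {
        System.out.println(describe("simpleJob", "0/5 * * * * ?", null));
        System.out.println(describe("manualScriptJob", null, "manualScriptJobBean"));
    }
}
```

With the sample configuration shown earlier, simpleJob has a cron and no jobBootstrapBeanName, so its bootstrap would be registered under the name simpleJobScheduleJobBootstrap.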

2 OneOffJobBootstrap

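A one-off job is not triggered at startup. The user must inject the OneOffJobBootstrap instance wherever the job should be triggered and call its execute method manually, and jobBootstrapBeanName must be set in the configuration file. A sample configuration: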

elasticjob:
  jobs:
    manualScriptJob:
      elasticJobType: SCRIPT
      jobBootstrapBeanName: manualScriptJobBean
      shardingTotalCount: 9
      props:
        script.command.line: "echo Manual SCRIPT Job: "

Here is what happens when a OneOffJobBootstrap is constructed:

public OneOffJobBootstrap(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig) {
    Preconditions.checkArgument(Strings.isNullOrEmpty(jobConfig.getCron()), "Cron should be empty.");
    jobScheduler = new JobScheduler(regCenter, elasticJob, jobConfig);
    instanceService = new InstanceService(regCenter, jobConfig.getJobName());
}

It creates a JobScheduler and an InstanceService. First, the JobScheduler:

public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig) {
    Preconditions.checkArgument(null != elasticJob, "Elastic job cannot be null.");
    this.regCenter = regCenter;
    // Resolve the job class name
    String jobClassName = JobClassNameProviderFactory.getProvider().getJobClassName(elasticJob);
    // Persist the job configuration and the job class name to ZooKeeper:
    // the configuration is stored at /namespace/jobName/config
    // and the job class name at /namespace/jobName.
    // If the node already exists, the overwrite setting decides whether it is rewritten
    this.jobConfig = setUpJobConfiguration(regCenter, jobClassName, jobConfig);
    // Load the custom job listeners. elastic-job lets you plug in listeners that run custom logic
    // before and after a job executes; they are referenced from the job configuration
    Collection<ElasticJobListener> jobListeners = getElasticJobListeners(this.jobConfig);
    // Initialize elastic-job's core facades, covered in later articles of this series
    setUpFacade = new SetUpFacade(regCenter, this.jobConfig.getJobName(), jobListeners);
    schedulerFacade = new SchedulerFacade(regCenter, this.jobConfig.getJobName());
    jobFacade = new LiteJobFacade(regCenter, this.jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null));
    validateJobProperties();
    // Create the jobExecutor, which is needed when the Quartz job is built
    jobExecutor = new ElasticJobExecutor(elasticJob, this.jobConfig, jobFacade);
    setGuaranteeServiceForElasticJobListeners(regCenter, jobListeners);
    // Create the job schedule controller
    jobScheduleController = createJobScheduleController();
}

The earlier steps just create configuration and helper objects; the last step creates the job schedule controller. Here is what that involves:

private JobScheduleController createJobScheduleController() {
    JobScheduleController result = new JobScheduleController(createScheduler(), createJobDetail(), getJobConfig().getJobName());
    JobRegistry.getInstance().registerJob(getJobConfig().getJobName(), result);
    registerStartUpInfo();
    return result;
}

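While constructing the JobScheduleController, createScheduler and createJobDetail create the Quartz scheduler and the Quartz job respectively (elastic-job is built on Quartz). The JobScheduleController instance is then registered in JobRegistry, which is essentially a map. First, createScheduler: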

private Scheduler createScheduler() {
    Scheduler result;
    try {
        StdSchedulerFactory factory = new StdSchedulerFactory();
        factory.initialize(getQuartzProps());
        result = factory.getScheduler();
        result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
    } catch (final SchedulerException ex) {
        throw new JobSystemException(ex);
    }
    return result;
}

It creates a Scheduler through the Quartz API and adds a TriggerListener, shown below, which overrides triggerMisfired to run custom logic when a trigger is missed:

public final class JobTriggerListener extends TriggerListenerSupport {
    
    private final ExecutionService executionService;
    private final ShardingService shardingService;
    @Override
    public String getName() {
        return "JobTriggerListener";
    }
    @Override
    public void triggerMisfired(final Trigger trigger) {
        if (null != trigger.getPreviousFireTime()) {
            executionService.setMisfire(shardingService.getLocalShardingItems());
        }
    }
}

The Scheduler is created with the following properties. Each Scheduler gets its own thread pool, and that pool has a single thread:

private Properties getQuartzProps() {
    Properties result = new Properties();
    result.put("org.quartz.threadPool.class", SimpleThreadPool.class.getName());
    result.put("org.quartz.threadPool.threadCount", "1");
    result.put("org.quartz.scheduler.instanceName", getJobConfig().getJobName());
    result.put("org.quartz.jobStore.misfireThreshold", "1");
    result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName());
    result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
    return result;
}

That completes the Scheduler. Next, createJobDetail:

private JobDetail createJobDetail() {
    JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(getJobConfig().getJobName()).build();
    result.getJobDataMap().put(JOB_EXECUTOR_DATA_MAP_KEY, jobExecutor);
    return result;
}

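createJobDetail is simple: it builds a JobDetail from LiteJob and the job name, and stores jobExecutor in its job data map. LiteJob implements Quartz's Job interface. elastic-job wraps every job as a LiteJob and only varies the executor; at execution time LiteJob calls jobExecutor to run the actual logic: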

public final class LiteJob implements Job {
    
    private ElasticJobExecutor jobExecutor;
    
    @Override
    public void execute(final JobExecutionContext context) {
        jobExecutor.execute();
    }
}

The last step of creating the JobScheduleController is registerStartUpInfo:

private void registerStartUpInfo() {
    // Cache the job node and its subtree to speed up reads, implemented with CuratorCache
    // and stored in a map as <cachePath, CuratorCache>
    JobRegistry.getInstance().registerRegistryCenter(jobConfig.getJobName(), regCenter);
    // Create a JobInstance and store it in a map keyed by job name
    JobRegistry.getInstance().addJobInstance(jobConfig.getJobName(), new JobInstance());
    // Record shardingTotalCount, also in a map keyed by job name
    JobRegistry.getInstance().setCurrentShardingTotalCount(jobConfig.getJobName(), jobConfig.getShardingTotalCount());
    // Start all listeners and write the startup node information to ZooKeeper
    setUpFacade.registerStartUpInfo(!jobConfig.isDisabled());
}

Here is the logic of that last call, setUpFacade.registerStartUpInfo:

public void registerStartUpInfo(final boolean enabled) {
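    // Start all listeners; each one is analyzed below
    listenerManager.startAllListeners();
    // Elect a leader among all executors of the same job
    leaderService.electLeader();
    // Register this server in the servers list in ZooKeeper at /namespace/jobName/servers/{ip};
    // the value is the server status (enabled or disabled)
    serverService.persistOnline(enabled);
    // Register this instance in the instances list at /namespace/jobName/instances/{ip}-<random suffix>.
    // The node is ephemeral, so it is deleted automatically if the server goes down;
    // the value is the JobInstance serialized as a YAML string
    instanceService.persistOnline();
    if (!reconcileService.isRunning()) {
        // Periodically check whether resharding is needed
        reconcileService.startAsync();
    }
    // Remove offline servers: compare the instances list with the servers list and delete stale servers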
    serverService.removeOfflineServers();
}
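
The final step, removeOfflineServers, boils down to a set difference between registered servers and servers that still have a live instance. A rough sketch under assumptions: OfflineServerSketch is a hypothetical class, and instances are modeled as an in-memory map from instance ID to server IP rather than being read from ZooKeeper.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical in-memory model of "servers without any live instance". */
final class OfflineServerSketch {

    /** Returns the server IPs that appear in the servers list but own no instance. */
    static Set<String> offlineServers(final Set<String> serverIps, final Map<String, String> instanceIdToServerIp) {
        Set<String> liveIps = new HashSet<>(instanceIdToServerIp.values());
        Set<String> result = new TreeSet<>(serverIps);
        result.removeAll(liveIps);
        return result;
    }

    public static void main(final String[] args) {
        Set<String> servers = Set.of("10.0.0.1", "10.0.0.2", "10.0.0.3");
        Map<String, String> instances = Map.of("instance-a", "10.0.0.1", "instance-c", "10.0.0.3");
        System.out.println(offlineServers(servers, instances)); // [10.0.0.2]
    }
}
```

Because instance nodes are ephemeral and server nodes are not, a server entry can outlive its process; this cleanup is what keeps the servers list from accumulating dead entries.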

Start with listenerManager.startAllListeners. These are the listeners it starts:

public void startAllListeners() {
    electionListenerManager.start();
    shardingListenerManager.start();
    failoverListenerManager.start();
    monitorExecutionListenerManager.start();
    shutdownListenerManager.start();
    triggerListenerManager.start();
    rescheduleListenerManager.start();
    guaranteeListenerManager.start();
    jobNodeStorage.addConnectionStateListener(regCenterConnectionStateListener);
}

Now each listener manager in turn.

electionListenerManager adds two listeners, LeaderElectionJobListener and LeaderAbdicationJobListener:

public void start() {
    addDataListener(new LeaderElectionJobListener());
    addDataListener(new LeaderAbdicationJobListener());
}
// LeaderElectionJobListener triggers a re-election in two cases:
// 1. Active election: the local server has become enabled and there is currently no leader
// 2. Passive election: the leader has crashed (failure or manual removal) and the local server is available
class LeaderElectionJobListener implements DataChangedEventListener {    
        @Override
        public void onChange(final DataChangedEvent event) {
            if (!JobRegistry.getInstance().isShutdown(jobName) && (isActiveElection(event.getKey(), event.getValue()) || isPassiveElection(event.getKey(), event.getType()))) {
                leaderService.electLeader();
            }
        }        
        private boolean isActiveElection(final String path, final String data) {
            return !leaderService.hasLeader() && isLocalServerEnabled(path, data);
        }        
        private boolean isPassiveElection(final String path, final Type eventType) {
            JobInstance jobInstance = JobRegistry.getInstance().getJobInstance(jobName);
            return !Objects.isNull(jobInstance) && isLeaderCrashed(path, eventType) && serverService.isAvailableServer(jobInstance.getServerIp());
        }
}
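// LeaderAbdicationJobListener: if the local instance is the leader but its server becomes DISABLED, it removes the leader info.
// LeaderElectionJobListener then sees the leader node being deleted and triggers a new election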
class LeaderAbdicationJobListener implements DataChangedEventListener {        
        @Override
        public void onChange(final DataChangedEvent event) {
            if (leaderService.isLeader() && isLocalServerDisabled(event.getKey(), event.getValue())) {
                leaderService.removeLeader();
            }
        }        
        private boolean isLocalServerDisabled(final String path, final String data) {
            return serverNode.isLocalServerPath(path) && ServerStatus.DISABLED.name().equals(data);
        }
}

shardingListenerManager adds two listeners, ShardingTotalCountChangedJobListener and ListenServersChangedJobListener:

public void start() {
    addDataListener(new ShardingTotalCountChangedJobListener());
    addDataListener(new ListenServersChangedJobListener());
}
// When the value of /namespace/jobName/config changes and the new shardingTotalCount differs from the local one,
// create the /namespace/jobName/leader/sharding/necessary node
class ShardingTotalCountChangedJobListener implements DataChangedEventListener {
    @Override
    public void onChange(final DataChangedEvent event) {
        if (configNode.isConfigPath(event.getKey()) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
            int newShardingTotalCount = YamlEngine.unmarshal(event.getValue(), JobConfigurationPOJO.class).toJobConfiguration().getShardingTotalCount();
            if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
                shardingService.setReshardingFlag();
                JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
            }
        }
    }
}
// When instances or servers change, create the /namespace/jobName/leader/sharding/necessary node,
// unless static sharding is enabled and sharding info already exists
class ListenServersChangedJobListener implements DataChangedEventListener {
    @Override
    public void onChange(final DataChangedEvent event) {
        if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(event.getType(), event.getKey()) || isServerChange(event.getKey())) && !(isStaticSharding() && hasShardingInfo())) {
            shardingService.setReshardingFlag();
        }
    }
    private boolean isStaticSharding() {
        return configService.load(true).isStaticSharding();
    } 
    private boolean hasShardingInfo() {
        return !JobRegistry.getInstance().getRegCenter(jobName).getChildrenKeys(jobNodePath.getShardingNodePath()).isEmpty();
    }
    private boolean isInstanceChange(final Type eventType, final String path) {
        return instanceNode.isInstancePath(path) && Type.UPDATED != eventType;
    } 
    private boolean isServerChange(final String path) {
        return serverNode.isServerPath(path);
    }
}

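failoverListenerManager adds three listeners, JobCrashedJobListener, FailoverSettingsChangedJobListener and LegacyCrashedRunningItemListener, which together handle failover. A brief introduction to the feature first.

elastic-job does not reshard while a job run is in progress; resharding only starts before the next scheduled run. So what if the instance running one of the sharding items fails mid-run? Does the item have to wait for the next run? No. elastic-job can hand the failed item to another instance for compensating execution, provided that instance is not currently running (this requires failover=true in the job configuration; it is off by default). Here is how it is implemented: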

public void start() {
    addDataListener(new JobCrashedJobListener());
    addDataListener(new FailoverSettingsChangedJobListener());
    addDataListener(new LegacyCrashedRunningItemListener());
}
// Listener that handles failover when an instance goes down while a job is running
class JobCrashedJobListener implements DataChangedEventListener {
        
        @Override
        public void onChange(final DataChangedEvent event) {
            if (!JobRegistry.getInstance().isShutdown(jobName) && isFailoverEnabled() && Type.DELETED == event.getType() && instanceNode.isInstancePath(event.getKey())) {
                String jobInstanceId = event.getKey().substring(instanceNode.getInstanceFullPath().length() + 1);
                if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
                    return;
                }
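                // The failoverItems handling is slightly convoluted.
                // jobInstanceId is the instance that has just gone down. First check whether any items had already been failed over to it.
                // If so, those items must be failed over again to another instance.
                // Example: with items 0, 1 and 2, the instance running item 0 fails and item 0 is taken over by the instance running item 1;
                // if that instance then fails while running item 0, item 0 has to be failed over once more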
                List<Integer> failoverItems = failoverService.getFailoveringItems(jobInstanceId);
                if (!failoverItems.isEmpty()) {
                    for (int each : failoverItems) {
                        failoverService.setCrashedFailoverFlagDirectly(each);
                        failoverService.failoverIfNecessary();
                    }
                } else {
                    // The simple case: take the crashed instance's sharding items and fail them over to other instances
                    for (int each : shardingService.getCrashedShardingItems(jobInstanceId)) {
                        failoverService.setCrashedFailoverFlag(each);
                        failoverService.failoverIfNecessary();
                    }
                }
            }
        }
    }
// A simple listener: when the configuration is updated and failover is now disabled, remove the failover node info
class FailoverSettingsChangedJobListener implements DataChangedEventListener {
        @Override
        public void onChange(final DataChangedEvent event) {
            if (configNode.isConfigPath(event.getKey()) && Type.UPDATED == event.getType() && !YamlEngine.unmarshal(event.getValue(), JobConfigurationPOJO.class).toJobConfiguration().isFailover()) {
                failoverService.removeFailoverInfo();
            }
        }
    }
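// Handles items left over from earlier crashes: after every instance has gone down, when an instance comes back online
// and is the only one, all previously failed items (failed while running normally or while failing over) are failed over to it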
class LegacyCrashedRunningItemListener implements DataChangedEventListener {
        
        @Override
        public void onChange(final DataChangedEvent event) {
            if (!isCurrentInstanceOnline(event) || !isFailoverEnabled()) {
                return;
            }
            Set<JobInstance> availableJobInstances = new HashSet<>(instanceService.getAvailableJobInstances());
            if (!isTheOnlyInstance(availableJobInstances)) {
                return;
            }
            Map<Integer, JobInstance> allRunningItems = executionService.getAllRunningItems();
            Map<Integer, JobInstance> allFailoveringItems = failoverService.getAllFailoveringItems();
            if (allRunningItems.isEmpty() && allFailoveringItems.isEmpty()) {
                return;
            }
            for (Entry<Integer, JobInstance> entry : allFailoveringItems.entrySet()) {
                if (!availableJobInstances.contains(entry.getValue())) {
                    int item = entry.getKey();
                    failoverService.setCrashedFailoverFlagDirectly(item);
                    failoverService.clearFailoveringItem(item);
                    executionService.clearRunningInfo(Collections.singletonList(item));
                    allRunningItems.remove(item);
                }
            }
            for (Entry<Integer, JobInstance> entry : allRunningItems.entrySet()) {
                if (!availableJobInstances.contains(entry.getValue())) {
                    failoverService.setCrashedFailoverFlag(entry.getKey());
                    executionService.clearRunningInfo(Collections.singletonList(entry.getKey()));
                }
            }
            failoverService.failoverIfNecessary();
        }
    }
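
The listeners above can be hard to follow because the state is spread across ZooKeeper nodes. The sketch below is a deliberately simplified single-JVM model of the same idea, not elastic-job's implementation: FailoverSketch and its methods are hypothetical, ownership is a plain map, and all of a crashed instance's items go to the first idle survivor, whereas in elastic-job the surviving instances compete for the items.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical in-memory model of failover; the real state lives in ZooKeeper nodes. */
final class FailoverSketch {

    /** Sharding item -> instance currently responsible for it (original owner or failover target). */
    private final Map<Integer, String> owners = new TreeMap<>();

    /** Instances that are busy running the job and therefore cannot take over extra items. */
    private final Set<String> running = new HashSet<>();

    void assign(final int item, final String instanceId) {
        owners.put(item, instanceId);
    }

    void markRunning(final String instanceId) {
        running.add(instanceId);
    }

    String ownerOf(final int item) {
        return owners.get(item);
    }

    /** Moves the crashed instance's items to the first idle survivor and returns the moved items. */
    List<Integer> onInstanceCrashed(final String crashedInstanceId, final Collection<String> survivors) {
        List<Integer> moved = new ArrayList<>();
        Optional<String> idle = survivors.stream().filter(each -> !running.contains(each)).findFirst();
        if (!idle.isPresent()) {
            return moved; // every survivor is busy, so nothing can be taken over right now
        }
        for (Map.Entry<Integer, String> entry : owners.entrySet()) {
            if (entry.getValue().equals(crashedInstanceId)) {
                entry.setValue(idle.get());
                moved.add(entry.getKey());
            }
        }
        return moved;
    }
}
```

Because the map always records the current owner, an item that was already failed over to an instance moves again when that instance crashes, which is the double-failover case described in the JobCrashedJobListener comments.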

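monitorExecutionListenerManager adds a single MonitorExecutionSettingsChangedJobListener. Its logic is simple:

if the monitorExecution property in the job configuration changes to false, the running info of all sharding items is cleared.

The corresponding node path is /namespace/jobName/sharding/{item}/running.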

public void start() {
    addDataListener(new MonitorExecutionSettingsChangedJobListener());
}

class MonitorExecutionSettingsChangedJobListener implements DataChangedEventListener {
    @Override
    public void onChange(final DataChangedEvent event) {
        if (configNode.isConfigPath(event.getKey()) && Type.UPDATED == event.getType()
                && !YamlEngine.unmarshal(event.getValue(), JobConfigurationPOJO.class).toJobConfiguration().isMonitorExecution()) {
            executionService.clearAllRunningInfo();
        }
    }
}

shutdownListenerManager: when the local instance node is deleted, the local instance is shut down and takes no part in any subsequent job operations.

public void start() {
    addDataListener(new InstanceShutdownStatusJobListener());
}

class InstanceShutdownStatusJobListener implements DataChangedEventListener {
    @Override
    public void onChange(final DataChangedEvent event) {
        if (!JobRegistry.getInstance().isShutdown(jobName) && !JobRegistry.getInstance().getJobScheduleController(jobName).isPaused()
                && isRemoveInstance(event.getKey(), event.getType()) && !isReconnectedRegistryCenter()) {
            schedulerFacade.shutdownInstance();
        }
    }
    
    private boolean isRemoveInstance(final String path, final Type eventType) {
        return instanceNode.isLocalInstancePath(path) && Type.DELETED == eventType;
    }
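    // Check whether the instance node exists again: the instance node is ephemeral, so on an unstable network
    // the instance may already have reconnected by the time the delete event is handled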
    private boolean isReconnectedRegistryCenter() {
        return instanceService.isLocalJobInstanceExisted();
    }
}

triggerListenerManager is the job trigger listener:

public void start() {
    addDataListener(new JobTriggerStatusJobListener());
}

class JobTriggerStatusJobListener implements DataChangedEventListener {
    
    // When a /namespace/jobName/trigger/{instanceId} node is added, run the trigger logic
    @Override
    public void onChange(final DataChangedEvent event) {
        if (!triggerNode.isLocalTriggerPath(event.getKey()) || Type.ADDED != event.getType()) {
            return;
        }
        triggerService.removeTriggerFlag();
        if (!JobRegistry.getInstance().isShutdown(jobName) && !JobRegistry.getInstance().isJobRunning(jobName)) {
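            // If the current instance is already running the job, the trigger is ignored. As the TODO below says, triggers are meant to be queued in the future.
            // xxl-job already supports this and lets you configure it dynamically in its admin console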
            // TODO At present, it cannot be triggered when the job is running, and it will be changed to a stacked trigger in the future.
            JobRegistry.getInstance().getJobScheduleController(jobName).triggerJob();
        }
    }
}

rescheduleListenerManager: when the cron configuration changes, the job's schedule has to be reset:

public void start() {
    addDataListener(new CronSettingAndJobEventChangedJobListener());
}

class CronSettingAndJobEventChangedJobListener implements DataChangedEventListener {
    
    @Override
    public void onChange(final DataChangedEvent event) {
        if (configNode.isConfigPath(event.getKey()) && Type.UPDATED == event.getType() && !JobRegistry.getInstance().isShutdown(jobName)) {
            JobConfiguration jobConfiguration = YamlEngine.unmarshal(event.getValue(), JobConfigurationPOJO.class).toJobConfiguration();
            if (StringUtils.isEmpty(jobConfiguration.getCron())) {
                // If cron is empty, rescheduleJob turns the job into a one-off job
                JobRegistry.getInstance().getJobScheduleController(jobName).rescheduleJob();
            } else {
                // If cron is not empty, rescheduleJob calls Quartz's scheduler.rescheduleJob to
                // apply the new schedule
                JobRegistry.getInstance().getJobScheduleController(jobName).rescheduleJob(jobConfiguration.getCron(), jobConfiguration.getTimeZone());
            }
        }
    }
}

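guaranteeListenerManager adds StartedNodeRemovedJobListener and CompletedNodeRemovedJobListener.

First, what StartedNodeRemovedJobListener is for.

elastic-job provides the ElasticJobListener interface, which lets you run custom logic before and after a job executes on a single instance.

But what if some logic must run once all sharding items of the job have started, for example to check whether starting all items took too long?

elastic-job provides the AbstractDistributeOnceElasticJobListener class for this. Its beforeJobExecuted method is shown below: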

public final void beforeJobExecuted(final ShardingContexts shardingContexts) {
    Set<Integer> shardingItems = shardingContexts.getShardingItemParameters().keySet();
    if (shardingItems.isEmpty()) {
        return;
    }
    // Once the local job starts, write its items to /namespace/jobName/guarantee/started/{item}
    guaranteeService.registerStart(shardingItems);
    // Wait until the local items have been written to /namespace/jobName/guarantee/started/{item}
    while (!guaranteeService.isRegisterStartSuccess(shardingItems)) {
        BlockUtils.waitingShortTime();
    }
    // Check whether every item (including those of other instances) has been written to /namespace/jobName/guarantee/started/{item}
    if (guaranteeService.isAllStarted()) {
        // At least one instance is guaranteed to reach this point, because isAllStarted is only
        // evaluated after the local instance has registered its own items under
        // /namespace/jobName/guarantee/started/{item}
        guaranteeService.executeInLeaderForLastStarted(this, shardingContexts);
        return;
    }
    // If isAllStarted is false, the current instance waits until the /namespace/jobName/guarantee/started node is deleted
    // (by the leader chosen in guaranteeService.executeInLeaderForLastStarted).
    // StartedNodeRemovedJobListener watches for that deletion and notifies
    // the waiting instance
    long before = timeService.getCurrentMillis();
    try {
        synchronized (startedWait) {
            startedWait.wait(startedTimeoutMilliseconds);
        }
    } catch (final InterruptedException ex) {
        Thread.interrupted();
    }
    // If waiting for all items to start timed out, raise an exception and let the caller decide how to handle it
    if (timeService.getCurrentMillis() - before >= startedTimeoutMilliseconds) {
        guaranteeService.clearAllStartedInfo();
        handleTimeout(startedTimeoutMilliseconds);
    }
}
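
To make the register, check, wait and notify sequence easier to see, here is a single-JVM sketch of the same barrier. It is an illustration under assumptions, not the library's code: AllStartedBarrier is a hypothetical class, a HashSet stands in for the guarantee/started/{item} nodes, and the ZooKeeper delete event is replaced by notifyAll. The guarded flag and deadline loop are choices made for this sketch.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical single-JVM model of the "run once when every item has started" barrier. */
final class AllStartedBarrier {

    private final int shardingTotalCount;
    private final Set<Integer> started = new HashSet<>(); // stands in for guarantee/started/{item}
    private final Object startedWait = new Object();
    private boolean released;                             // set once the last starter has run the action

    AllStartedBarrier(final int shardingTotalCount) {
        this.shardingTotalCount = shardingTotalCount;
    }

    /** Returns true if all items started within the timeout, false on timeout or interruption. */
    boolean beforeJobExecuted(final Set<Integer> localItems, final long timeoutMillis, final Runnable onceAction) {
        synchronized (startedWait) {
            started.addAll(localItems);
            if (started.size() == shardingTotalCount) {
                onceAction.run();          // the caller that completes the set runs the action
                released = true;
                startedWait.notifyAll();   // wake every instance that is still waiting
                return true;
            }
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (!released) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return false;
                }
                try {
                    startedWait.wait(remaining);
                } catch (final InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }
    }

    /** Runs two "instances" on two threads and returns how many times the once-only action ran. */
    static int simulateTwoInstances() {
        AllStartedBarrier barrier = new AllStartedBarrier(2);
        AtomicInteger actionRuns = new AtomicInteger();
        Thread other = new Thread(() -> barrier.beforeJobExecuted(Set.of(0), 5_000L, actionRuns::incrementAndGet));
        other.start();
        barrier.beforeJobExecuted(Set.of(1), 5_000L, actionRuns::incrementAndGet);
        try {
            other.join();
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return actionRuns.get();
    }
}
```

Whichever thread registers last runs the action exactly once; the other simply waits to be released or times out.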

CompletedNodeRemovedJobListener works the same way, so it is not explained separately:

public void start() {
    addDataListener(new StartedNodeRemovedJobListener());
    addDataListener(new CompletedNodeRemovedJobListener());
}
class StartedNodeRemovedJobListener implements DataChangedEventListener {
    @Override
    public void onChange(final DataChangedEvent event) {
        if (Type.DELETED == event.getType() && guaranteeNode.isStartedRootNode(event.getKey())) {
            for (ElasticJobListener each : elasticJobListeners) {
                if (each instanceof AbstractDistributeOnceElasticJobListener) {
                    ((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskStart();
                }
            }
        }
    }
}

class CompletedNodeRemovedJobListener implements DataChangedEventListener {
    @Override
    public void onChange(final DataChangedEvent event) {
        if (Type.DELETED == event.getType() && guaranteeNode.isCompletedRootNode(event.getKey())) {
            for (ElasticJobListener each : elasticJobListeners) {
                if (each instanceof AbstractDistributeOnceElasticJobListener) {
                    ((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskComplete();
                }
            }
        }
    }
}

Finally, a regCenterConnectionStateListener is added to handle connection state changes. Its onStateChanged method:

public void onStateChanged(final CoordinatorRegistryCenter registryCenter, final State newState) {
        if (JobRegistry.getInstance().isShutdown(jobName)) {
            return;
        }
        JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
        if (State.UNAVAILABLE == newState) {
            // Connection unavailable: pause the job on this instance
            jobScheduleController.pauseJob();
        } else if (State.RECONNECTED == newState) {
            // Connection restored: bring the server and instance back online and resume the job on this instance
            serverService.persistOnline(serverService.isEnableServer(JobRegistry.getInstance().getJobInstance(jobName).getServerIp()));
            instanceService.persistOnline();
            executionService.clearRunningInfo(shardingService.getLocalShardingItems());
            jobScheduleController.resumeJob();
        }
    }

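That covers all the listeners. Once the listener logic is understood, the main flow of elastic-job is understood as well. On to the remaining logic.

ReconcileService deserves a closer look. It extends AbstractScheduledService, a Guava utility for running periodic tasks. To use it you implement runOneIteration, which contains the actual work, and scheduler, which defines how often it runs: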

protected void runOneIteration() {
    // reconcileIntervalMinutes defaults to 0, so by default no periodic check is performed
    int reconcileIntervalMinutes = configService.load(true).getReconcileIntervalMinutes();
    if (reconcileIntervalMinutes > 0 && System.currentTimeMillis() - lastReconcileTime >= (long) reconcileIntervalMinutes * 60 * 1000) {
        lastReconcileTime = System.currentTimeMillis();
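        // Periodically check whether resharding is needed. The conditions are:
        // 1. the /namespace/jobName/leader/sharding/necessary node does not exist
        // 2. the current sharding info references servers that are offline
        // 3. it is not the case that static sharding is on and sharding info already exists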
        if (!shardingService.isNeedSharding() && shardingService.hasShardingInfoInOfflineServers() && !(isStaticSharding() && hasShardingInfo())) {
            log.warn("Elastic Job: job status node has inconsistent value,start reconciling...");
            // Set the resharding flag, i.e. create the /namespace/jobName/leader/sharding/necessary node
            shardingService.setReshardingFlag();
        }
    }
}
// Run runOneIteration once a minute
protected Scheduler scheduler() {
        return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MINUTES);
}
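
Note the two layers of timing here: the scheduler wakes up every minute, but the work only happens when the configured interval has elapsed. The gate can be isolated as a small pure function. This is a sketch with hypothetical names (ReconcileGate, isDue), and the start time passed to the constructor is an assumption made so the example is deterministic.

```java
/** Hypothetical extraction of the "is a reconcile due?" check from runOneIteration. */
final class ReconcileGate {

    private long lastReconcileTime;

    ReconcileGate(final long startMillis) {
        this.lastReconcileTime = startMillis;
    }

    /** Returns true and records the time when the interval has elapsed; a non-positive interval disables the check. */
    boolean isDue(final int reconcileIntervalMinutes, final long nowMillis) {
        if (reconcileIntervalMinutes > 0 && nowMillis - lastReconcileTime >= (long) reconcileIntervalMinutes * 60 * 1000) {
            lastReconcileTime = nowMillis;
            return true;
        }
        return false;
    }

    public static void main(final String[] args) {
        ReconcileGate gate = new ReconcileGate(0L);
        long minute = 60_000L;
        // Simulate ten one-minute ticks with a five-minute reconcile interval
        for (int tick = 1; tick <= 10; tick++) {
            System.out.println("minute " + tick + ": due=" + gate.isDue(5, tick * minute));
        }
    }
}
```

With a five-minute interval the gate opens on the fifth and tenth ticks only; with an interval of 0 it never opens, however often the scheduler fires.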

Now the election logic. leaderService.electLeader performs leader election and eventually calls ZookeeperRegistryCenter.executeInLeader:

public void executeInLeader(final String key, final LeaderExecutionCallback callback) {
    try (LeaderLatch latch = new LeaderLatch(client, key)) {
        latch.start();
        latch.await();
        callback.execute();
        // CHECKSTYLE:OFF
    } catch (final Exception ex) {
        // CHECKSTYLE:ON
        handleException(ex);
    }
}

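Election is done with Curator's LeaderLatch, with a callback passed in. LeaderLatch is essentially a distributed lock. In short, each participant creates an ephemeral sequential node under /namespace/jobName/leader/election/latch. The first one to create its node acquires the lock and becomes leader, then runs the callback, which creates the ephemeral node /namespace/jobName/leader/election/instance with the JobInstanceId (IP plus a random suffix) as its value. Once the leader info has been written, the lock is released. The other nodes then acquire the lock one by one and run the callback, but because leader info already exists they do not write it, and so they end up as non-leader nodes.

The callback logic: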

class LeaderElectionExecutionCallback implements LeaderExecutionCallback {
    @Override
    public void execute() {
        if (!hasLeader()) {
            jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
        }
    }
}
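
The "take the lock, then write the leader only if none exists" pattern is easy to reproduce in a single JVM. In the sketch below a ReentrantLock stands in for the LeaderLatch and a field stands in for the leader instance node. LeaderElectionSketch and the instance IDs are hypothetical; this models the idea and is not how Curator or elastic-job implement it.

```java
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical single-JVM model of lock-guarded, first-writer-wins leader election. */
final class LeaderElectionSketch {

    private final ReentrantLock latch = new ReentrantLock(); // stands in for the election latch
    private String leaderInstanceId;                         // stands in for the leader instance node; guarded by latch

    /** Every instance calls this; only the first one to hold the lock records itself as leader. */
    void electLeader(final String instanceId) {
        latch.lock();
        try {
            if (null == leaderInstanceId) {
                leaderInstanceId = instanceId;
            }
        } finally {
            latch.unlock();
        }
    }

    String getLeader() {
        latch.lock();
        try {
            return leaderInstanceId;
        } finally {
            latch.unlock();
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        LeaderElectionSketch election = new LeaderElectionSketch();
        Thread[] instances = new Thread[3];
        for (int i = 0; i < instances.length; i++) {
            String id = "instance-" + i;
            instances[i] = new Thread(() -> election.electLeader(id));
            instances[i].start();
        }
        for (Thread each : instances) {
            each.join();
        }
        // Prints exactly one of the three IDs; which one depends on thread scheduling
        System.out.println("leader = " + election.getLeader());
    }
}
```

The lock only serializes the callbacks; it is the hasLeader check inside the callback that makes later participants back off.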

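That completes construction of the JobScheduler instance. Well done if you have read this far.

After that, an InstanceService is created. A OneOffJobBootstrap has to be run by calling its execute method manually: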

public final class OneOffJobBootstrap implements JobBootstrap {
    ......
    private final InstanceService instanceService;
    public void execute() {
        // Triggering writes a /namespace/jobName/trigger/{instanceId} node for every instance.
        // JobTriggerStatusJobListener in triggerListenerManager sees the node being created and triggers the job
        instanceService.triggerAllInstances();
    }
    
    ......
}
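
Putting execute together with JobTriggerStatusJobListener from earlier, the round trip is: write one trigger flag per instance, each instance consumes its own flag, and the job fires only if that instance is idle. A minimal in-memory sketch of that round trip follows; TriggerFlowSketch and its method names are hypothetical, and sets and lists stand in for ZooKeeper nodes and events.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical in-memory model of the trigger-flag round trip. */
final class TriggerFlowSketch {

    private final Set<String> triggerFlags = new HashSet<>();     // stands in for trigger/{instanceId} nodes
    private final Set<String> runningInstances = new HashSet<>(); // instances currently executing the job
    private final List<String> fired = new ArrayList<>();         // instances whose job was actually triggered

    void markRunning(final String instanceId) {
        runningInstances.add(instanceId);
    }

    /** Models execute(): one flag per instance, each delivered to that instance's listener. */
    void triggerAllInstances(final Collection<String> instanceIds) {
        for (String each : instanceIds) {
            triggerFlags.add(each);
            onTriggerNodeAdded(each);
        }
    }

    /** Models the listener: the flag is always consumed, but the job fires only when the instance is idle. */
    private void onTriggerNodeAdded(final String instanceId) {
        triggerFlags.remove(instanceId);
        if (!runningInstances.contains(instanceId)) {
            fired.add(instanceId);
        }
    }

    List<String> firedInstances() {
        return fired;
    }

    Set<String> pendingFlags() {
        return triggerFlags;
    }
}
```

In this model a trigger that arrives while an instance is busy is consumed and dropped rather than queued, which mirrors the TODO in the listener code.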

That completes the analysis of how OneOffJobBootstrap is built.

3 ScheduleJobBootstrap

ScheduleJobBootstrap is similar to OneOffJobBootstrap, but it does not need an instanceService. Instead it adds a schedule method, which starts the periodic job through the start method of the underlying Quartz Scheduler:

public ScheduleJobBootstrap(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig) {
    jobScheduler = new JobScheduler(regCenter, elasticJob, jobConfig);
}

public void schedule() {
    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobScheduler.getJobConfig().getCron()), "Cron can not be empty.");
    jobScheduler.getJobScheduleController().scheduleJob(jobScheduler.getJobConfig().getCron(), jobScheduler.getJobConfig().getTimeZone());
}

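So where is the schedule method called?

Looking back at ElasticJobLiteAutoConfiguration, auto-configuration also imports ScheduleJobBootstrapStartupRunner. This class implements the CommandLineRunner interface; once all beans have been initialized, Spring Boot invokes the run method of every CommandLineRunner: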

public class ScheduleJobBootstrapStartupRunner implements CommandLineRunner, ApplicationContextAware {

    private ApplicationContext applicationContext;
    
    @Override
    public void run(final String... args) {
        log.info("Starting ElasticJob Bootstrap.");
        applicationContext.getBeansOfType(ScheduleJobBootstrap.class).values().forEach(ScheduleJobBootstrap::schedule);
        log.info("ElasticJob Bootstrap started.");
    }
}

The run method looks up every ScheduleJobBootstrap registered in the Spring application context and calls its schedule method.

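Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

If there is any infringement, please contact cloudcommunity@tencent.com for removal.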
