Taking Spring Boot as an example, let's walk through how the elastic-job executor starts up. The auto-configuration entry point is ElasticJobLiteAutoConfiguration in elasticjob-lite-spring-boot-starter:
@Configuration(proxyBeanMethods = false)
@AutoConfigureAfter(DataSourceAutoConfiguration.class)
@ConditionalOnProperty(name = "elasticjob.enabled", havingValue = "true", matchIfMissing = true)
@Import({ElasticJobRegistryCenterConfiguration.class, ElasticJobTracingConfiguration.class, ElasticJobSnapshotServiceConfiguration.class})
@EnableConfigurationProperties(ElasticJobProperties.class)
public class ElasticJobLiteAutoConfiguration {
@Configuration(proxyBeanMethods = false)
@Import({ElasticJobBootstrapConfiguration.class, ScheduleJobBootstrapStartupRunner.class})
protected static class ElasticJobConfiguration {
}
}
This configuration class runs only after DataSourceAutoConfiguration has completed, and it wires up five important configuration classes:
ElasticJobRegistryCenterConfiguration: initializes the ZooKeeper client
ElasticJobTracingConfiguration: if a tracing data source is configured, initializes a HikariDataSource connection pool for tracing; otherwise the application's own data source is used
ElasticJobSnapshotServiceConfiguration: initializes the SnapshotService
ElasticJobBootstrapConfiguration: job registration
ScheduleJobBootstrapStartupRunner: job startup
Let's focus on job registration and job startup.
Registration is handled by ElasticJobBootstrapConfiguration, which implements the SmartInitializingSingleton interface. Its callback afterSingletonsInstantiated runs after all (non-lazy) singleton beans managed by the Spring container have been instantiated (xxl-job uses the same mechanism):
public class ElasticJobBootstrapConfiguration implements SmartInitializingSingleton, ApplicationContextAware {
@Setter
private ApplicationContext applicationContext;
@Override
public void afterSingletonsInstantiated() {
log.info("creating Job Bootstrap Beans");
createJobBootstrapBeans();
log.info("Job Bootstrap Beans created.");
}
/**
* Create job bootstrap instances and register them into container.
*/
public void createJobBootstrapBeans() {
ElasticJobProperties elasticJobProperties = applicationContext.getBean(ElasticJobProperties.class);
SingletonBeanRegistry singletonBeanRegistry = ((ConfigurableApplicationContext) applicationContext).getBeanFactory();
CoordinatorRegistryCenter registryCenter = applicationContext.getBean(CoordinatorRegistryCenter.class);
TracingConfiguration<?> tracingConfig = getTracingConfiguration();
constructJobBootstraps(elasticJobProperties, singletonBeanRegistry, registryCenter, tracingConfig);
}
}
afterSingletonsInstantiated calls createJobBootstrapBeans, which ultimately calls constructJobBootstraps:
private void constructJobBootstraps(final ElasticJobProperties elasticJobProperties, final SingletonBeanRegistry singletonBeanRegistry,
final CoordinatorRegistryCenter registryCenter, final TracingConfiguration<?> tracingConfig) {
for (Map.Entry<String, ElasticJobConfigurationProperties> entry : elasticJobProperties.getJobs().entrySet()) {
ElasticJobConfigurationProperties jobConfigurationProperties = entry.getValue();
Preconditions.checkArgument(null != jobConfigurationProperties.getElasticJobClass()
|| !Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType()),
"Please specific [elasticJobClass] or [elasticJobType] under job configuration.");
Preconditions.checkArgument(null == jobConfigurationProperties.getElasticJobClass()
|| Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType()),
"[elasticJobClass] and [elasticJobType] are mutually exclusive.");
if (null != jobConfigurationProperties.getElasticJobClass()) {
registerClassedJob(entry.getKey(), entry.getValue().getJobBootstrapBeanName(), singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties);
} else if (!Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType())) {
registerTypedJob(entry.getKey(), entry.getValue().getJobBootstrapBeanName(), singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties);
}
}
}
constructJobBootstraps fetches all jobs configured by the application and processes them one by one. elastic-job distinguishes two kinds of jobs: class-based jobs and type-based jobs. A configuration example:
elasticjob:
jobs:
simpleJob:
elasticJobClass: org.apache.shardingsphere.elasticjob.lite.example.job.SpringBootSimpleJob
cron: 0/5 * * * * ?
shardingTotalCount: 3
shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
scriptJob:
elasticJobType: SCRIPT
cron: 0/10 * * * * ?
shardingTotalCount: 3
props:
script.command.line: "echo SCRIPT Job: "
Let's focus on class-based jobs here (type-based jobs are handled similarly) and look at registerClassedJob:
private void registerClassedJob(final String jobName, final String jobBootstrapBeanName, final SingletonBeanRegistry singletonBeanRegistry, final CoordinatorRegistryCenter registryCenter,
final TracingConfiguration<?> tracingConfig, final ElasticJobConfigurationProperties jobConfigurationProperties) {
JobConfiguration jobConfig = jobConfigurationProperties.toJobConfiguration(jobName);
jobExtraConfigurations(jobConfig, tracingConfig);
ElasticJob elasticJob = applicationContext.getBean(jobConfigurationProperties.getElasticJobClass());
if (Strings.isNullOrEmpty(jobConfig.getCron())) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(jobBootstrapBeanName), "The property [jobBootstrapBeanName] is required for One-off job.");
singletonBeanRegistry.registerSingleton(jobBootstrapBeanName, new OneOffJobBootstrap(registryCenter, elasticJob, jobConfig));
} else {
String beanName = !Strings.isNullOrEmpty(jobBootstrapBeanName) ? jobBootstrapBeanName : jobConfig.getJobName() + "ScheduleJobBootstrap";
singletonBeanRegistry.registerSingleton(beanName, new ScheduleJobBootstrap(registryCenter, elasticJob, jobConfig));
}
}
As shown, for a scheduled job a ScheduleJobBootstrap instance is registered into the Spring container, while for a one-off job (one without a cron setting) an OneOffJobBootstrap instance is registered.
A one-off job is not triggered at startup: the user has to inject the OneOffJobBootstrap instance at the trigger point and call its execute method manually, and jobBootstrapBeanName must be set in the configuration file, for example:
elasticjob:
jobs:
manualScriptJob:
elasticJobType: SCRIPT
jobBootstrapBeanName: manualScriptJobBean
shardingTotalCount: 9
props:
script.command.line: "echo Manual SCRIPT Job: "
Let's see what happens when an OneOffJobBootstrap instance is created:
public OneOffJobBootstrap(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig) {
Preconditions.checkArgument(Strings.isNullOrEmpty(jobConfig.getCron()), "Cron should be empty.");
jobScheduler = new JobScheduler(regCenter, elasticJob, jobConfig);
instanceService = new InstanceService(regCenter, jobConfig.getJobName());
}
It creates a JobScheduler and an InstanceService. Let's look at JobScheduler:
public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig) {
Preconditions.checkArgument(null != elasticJob, "Elastic job cannot be null.");
this.regCenter = regCenter;
// Resolve the job class name
String jobClassName = JobClassNameProviderFactory.getProvider().getJobClassName(elasticJob);
// Save the JobConfiguration and the jobClassName to ZooKeeper:
// the config is stored at /namespace/jobName/config
// the jobClassName is stored at /namespace/jobName
// if the node already exists, whether to write depends on the overwrite setting
this.jobConfig = setUpJobConfiguration(regCenter, jobClassName, jobConfig);
// Load the custom jobListeners; elastic-job allows custom listeners that run business logic
// before and after job execution, applied through the job configuration
Collection<ElasticJobListener> jobListeners = getElasticJobListeners(this.jobConfig);
// Initialize elastic-job's core facades, covered one by one in later articles
setUpFacade = new SetUpFacade(regCenter, this.jobConfig.getJobName(), jobListeners);
schedulerFacade = new SchedulerFacade(regCenter, this.jobConfig.getJobName());
jobFacade = new LiteJobFacade(regCenter, this.jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null));
validateJobProperties();
// Create the jobExecutor, needed later when building the Quartz job
jobExecutor = new ElasticJobExecutor(elasticJob, this.jobConfig, jobFacade);
setGuaranteeServiceForElasticJobListeners(regCenter, jobListeners);
// Create the job schedule controller
jobScheduleController = createJobScheduleController();
}
The earlier steps just build configuration and helper objects; the final step creates the job's schedule controller. Let's see what it does:
private JobScheduleController createJobScheduleController() {
JobScheduleController result = new JobScheduleController(createScheduler(), createJobDetail(), getJobConfig().getJobName());
JobRegistry.getInstance().registerJob(getJobConfig().getJobName(), result);
registerStartUpInfo();
return result;
}
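JobRegistry, used by registerJob above, is essentially a process-wide singleton that keeps per-job state in maps keyed by job name. A minimal sketch of that pattern (class and field names here are illustrative, not the actual elastic-job fields):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for elastic-job's JobRegistry: a singleton holding a
// map from job name to its schedule controller (the real class keeps several
// such maps, e.g. for registry centers and job instances as well).
final class SimpleJobRegistry {

    private static final SimpleJobRegistry INSTANCE = new SimpleJobRegistry();

    private final Map<String, Object> controllers = new ConcurrentHashMap<>();

    private SimpleJobRegistry() {
    }

    static SimpleJobRegistry getInstance() {
        return INSTANCE;
    }

    // Analogous to JobRegistry.registerJob(jobName, jobScheduleController).
    void registerJob(final String jobName, final Object controller) {
        controllers.put(jobName, controller);
    }

    Object getJobScheduleController(final String jobName) {
        return controllers.get(jobName);
    }
}
```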
When building the JobScheduleController, createScheduler and createJobDetail create the Quartz scheduler and job respectively (elastic-job is built on Quartz), and the JobScheduleController instance is then registered into JobRegistry (essentially a Map). Let's look at createScheduler:
private Scheduler createScheduler() {
Scheduler result;
try {
StdSchedulerFactory factory = new StdSchedulerFactory();
factory.initialize(getQuartzProps());
result = factory.getScheduler();
result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
} catch (final SchedulerException ex) {
throw new JobSystemException(ex);
}
return result;
}
As you can see, it simply creates a Scheduler through the Quartz API and adds a TriggerListener, shown below, which overrides triggerMisfired to run custom logic when a trigger is missed:
public final class JobTriggerListener extends TriggerListenerSupport {
private final ExecutionService executionService;
private final ShardingService shardingService;
@Override
public String getName() {
return "JobTriggerListener";
}
@Override
public void triggerMisfired(final Trigger trigger) {
if (null != trigger.getPreviousFireTime()) {
executionService.setMisfire(shardingService.getLocalShardingItems());
}
}
}
The Scheduler is created with the following properties. Note that each Scheduler gets its own thread pool, with exactly one thread:
private Properties getQuartzProps() {
Properties result = new Properties();
result.put("org.quartz.threadPool.class", SimpleThreadPool.class.getName());
result.put("org.quartz.threadPool.threadCount", "1");
result.put("org.quartz.scheduler.instanceName", getJobConfig().getJobName());
result.put("org.quartz.jobStore.misfireThreshold", "1");
result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName());
result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
return result;
}
That completes the Scheduler. Now let's look at createJobDetail:
private JobDetail createJobDetail() {
JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(getJobConfig().getJobName()).build();
result.getJobDataMap().put(JOB_EXECUTOR_DATA_MAP_KEY, jobExecutor);
return result;
}
createJobDetail is straightforward: it builds a JobDetail from LiteJob and the job name, and sets jobExecutor as its executor. LiteJob implements Quartz's Job interface. elastic-job ultimately wraps every task as a LiteJob and only varies the executor, so at execution time the jobExecutor runs the actual logic:
public final class LiteJob implements Job {
private ElasticJobExecutor jobExecutor;
@Override
public void execute(final JobExecutionContext context) {
jobExecutor.execute();
}
}
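The LiteJob idea, stripped of Quartz, is a plain delegation pattern: one generic "job shell" class whose behavior is injected, the way elastic-job stores its ElasticJobExecutor in the Quartz JobDataMap. A sketch (the names here are made up for illustration):

```java
// One shared shell class for every job; the concrete behavior is injected.
interface JobShellExecutor {
    String execute();
}

final class GenericJobShell {

    private final JobShellExecutor executor;

    GenericJobShell(final JobShellExecutor executor) {
        this.executor = executor;
    }

    // Mirrors LiteJob.execute(JobExecutionContext): pure delegation to the
    // executor that was attached when the job was built.
    String run() {
        return executor.execute();
    }
}
```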
Now the last step of creating the JobScheduleController, the registerStartUpInfo method:
private void registerStartUpInfo() {
// Cache the job's node subtree to speed up queries, implemented with CuratorCache
// and stored in a map as <cachePath, CuratorCache>
JobRegistry.getInstance().registerRegistryCenter(jobConfig.getJobName(), regCenter);
// Create a JobInstance and store it in a map keyed by job name
JobRegistry.getInstance().addJobInstance(jobConfig.getJobName(), new JobInstance());
// Record the shardingTotalCount, also in a map keyed by job name
JobRegistry.getInstance().setCurrentShardingTotalCount(jobConfig.getJobName(), jobConfig.getShardingTotalCount());
// Start all listeners and write the various startup nodes to ZooKeeper
setUpFacade.registerStartUpInfo(!jobConfig.isDisabled());
}
Let's look at the concrete logic of that last step, setUpFacade.registerStartUpInfo:
public void registerStartUpInfo(final boolean enabled) {
// Start all listeners, analyzed one by one below
listenerManager.startAllListeners();
// Run leader election (one leader is elected among all executors of the same job)
leaderService.electLeader();
// Save this node into the servers list in ZooKeeper at /namespace/jobName/servers/{ip}
// with a value of enable/disable
serverService.persistOnline(enabled);
// Save this node into the instances list in ZooKeeper at /namespace/jobName/instances/{ip}-<random suffix>;
// this is an ephemeral node, so if the server dies the path is removed automatically;
// the value is the YAML string of the JobInstance
instanceService.persistOnline();
if (!reconcileService.isRunning()) {
// Periodically check whether resharding is needed
reconcileService.startAsync();
}
// Remove offline servers: compare the instances list with the servers list and delete invalid servers
serverService.removeOfflineServers();
}
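Pulling together the paths mentioned in the comments so far, the znode tree that registration builds under a single job looks roughly like this (a simplified map assembled from the comments in this article, not an exhaustive listing):

```
/namespace/jobName/config            job configuration (YAML)
/namespace/jobName/servers/{ip}      enable/disable flag per server
/namespace/jobName/instances/{id}    ephemeral node per running instance
/namespace/jobName/leader/...        election and sharding coordination nodes
/namespace/jobName/sharding/...      sharding item state
```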
First, listenerManager.startAllListeners; here is which listeners get started:
public void startAllListeners() {
electionListenerManager.start();
shardingListenerManager.start();
failoverListenerManager.start();
monitorExecutionListenerManager.start();
shutdownListenerManager.start();
triggerListenerManager.start();
rescheduleListenerManager.start();
guaranteeListenerManager.start();
jobNodeStorage.addConnectionStateListener(regCenterConnectionStateListener);
}
Now let's go through each listener's logic in turn.
electionListenerManager adds two listeners: LeaderElectionJobListener and LeaderAbdicationJobListener.
public void start() {
addDataListener(new LeaderElectionJobListener());
addDataListener(new LeaderAbdicationJobListener());
}
// LeaderElectionJobListener triggers re-election in two cases:
// 1. Active election: this node's IP becomes enabled and there is currently no leader, so it joins the election
// 2. Passive election: the leader crashed (failure or manual action) and this node's IP is enabled, so it joins the election
class LeaderElectionJobListener implements DataChangedEventListener {
@Override
public void onChange(final DataChangedEvent event) {
if (!JobRegistry.getInstance().isShutdown(jobName) && (isActiveElection(event.getKey(), event.getValue()) || isPassiveElection(event.getKey(), event.getType()))) {
leaderService.electLeader();
}
}
private boolean isActiveElection(final String path, final String data) {
return !leaderService.hasLeader() && isLocalServerEnabled(path, data);
}
private boolean isPassiveElection(final String path, final Type eventType) {
JobInstance jobInstance = JobRegistry.getInstance().getJobInstance(jobName);
return !Objects.isNull(jobInstance) && isLeaderCrashed(path, eventType) && serverService.isAvailableServer(jobInstance.getServerIp());
}
}
// LeaderAbdicationJobListener: if this node is the leader but becomes DISABLED, it removes the leader info;
// LeaderElectionJobListener then observes the deletion of the leader node and triggers a new election
class LeaderAbdicationJobListener implements DataChangedEventListener {
@Override
public void onChange(final DataChangedEvent event) {
if (leaderService.isLeader() && isLocalServerDisabled(event.getKey(), event.getValue())) {
leaderService.removeLeader();
}
}
private boolean isLocalServerDisabled(final String path, final String data) {
return serverNode.isLocalServerPath(path) && ServerStatus.DISABLED.name().equals(data);
}
}
shardingListenerManager adds two listeners: ShardingTotalCountChangedJobListener and ListenServersChangedJobListener.
public void start() {
addDataListener(new ShardingTotalCountChangedJobListener());
addDataListener(new ListenServersChangedJobListener());
}
// If the value of /namespace/jobName/config changes and the new shardingTotalCount differs from the local one,
// create the /namespace/jobName/leader/sharding/necessary node
class ShardingTotalCountChangedJobListener implements DataChangedEventListener {
@Override
public void onChange(final DataChangedEvent event) {
if (configNode.isConfigPath(event.getKey()) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
int newShardingTotalCount = YamlEngine.unmarshal(event.getValue(), JobConfigurationPOJO.class).toJobConfiguration().getShardingTotalCount();
if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
shardingService.setReshardingFlag();
JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
}
}
}
}
// If instance or server information changes, and the job is not statically sharded with sharding info present,
// create the /namespace/jobName/leader/sharding/necessary node
class ListenServersChangedJobListener implements DataChangedEventListener {
@Override
public void onChange(final DataChangedEvent event) {
if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(event.getType(), event.getKey()) || isServerChange(event.getKey())) && !(isStaticSharding() && hasShardingInfo())) {
shardingService.setReshardingFlag();
}
}
private boolean isStaticSharding() {
return configService.load(true).isStaticSharding();
}
private boolean hasShardingInfo() {
return !JobRegistry.getInstance().getRegCenter(jobName).getChildrenKeys(jobNodePath.getShardingNodePath()).isEmpty();
}
private boolean isInstanceChange(final Type eventType, final String path) {
return instanceNode.isInstancePath(path) && Type.UPDATED != eventType;
}
private boolean isServerChange(final String path) {
return serverNode.isServerPath(path);
}
}
failoverListenerManager adds three listeners: JobCrashedJobListener, FailoverSettingsChangedJobListener, and LegacyCrashedRunningItemListener. They handle failover, so let's briefly introduce that feature first.
elastic-job never reshards in the middle of a run; resharding only starts before the next trigger. So what happens if one shard fails while executing? Do we have to wait for the next trigger? No: elastic-job can transfer the failed shard's work to another instance for compensating execution, provided that instance is not currently in a running state (this requires failover=true in the job configuration; it is off by default). Let's see how it is implemented.
public void start() {
addDataListener(new JobCrashedJobListener());
addDataListener(new FailoverSettingsChangedJobListener());
addDataListener(new LegacyCrashedRunningItemListener());
}
// Listener that handles failover when an instance crashes while the job is executing
class JobCrashedJobListener implements DataChangedEventListener {
@Override
public void onChange(final DataChangedEvent event) {
if (!JobRegistry.getInstance().isShutdown(jobName) && isFailoverEnabled() && Type.DELETED == event.getType() && instanceNode.isInstancePath(event.getKey())) {
String jobInstanceId = event.getKey().substring(instanceNode.getInstanceFullPath().length() + 1);
if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
return;
}
// The failoverItems handling here is a little convoluted.
// jobInstanceId is the crashed instance; first check whether any shards had already been failed over TO it.
// If so, that work must be failed over again to other instances.
// For example, with shards 0, 1, 2: shard 0 fails mid-run and is transferred to instance 1,
// but instance 1 then fails while running shard 0's work, so shard 0 must be transferred again.
List<Integer> failoverItems = failoverService.getFailoveringItems(jobInstanceId);
if (!failoverItems.isEmpty()) {
for (int each : failoverItems) {
failoverService.setCrashedFailoverFlagDirectly(each);
failoverService.failoverIfNecessary();
}
} else {
// This branch is simpler: get the crashed instance's shards and transfer their failed work to other instances
for (int each : shardingService.getCrashedShardingItems(jobInstanceId)) {
failoverService.setCrashedFailoverFlag(each);
failoverService.failoverIfNecessary();
}
}
}
}
}
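The crash-and-transfer bookkeeping above can be reduced to a toy model: crashed sharding items go onto a queue, an available instance claims one as a compensating "failover" execution, and if the claiming instance crashes too, its claimed items are re-queued. This sketch uses illustrative names only, not elastic-job APIs:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

final class FailoverBoard {

    private final Deque<Integer> crashedItems = new ArrayDeque<>();

    private final Map<Integer, String> failoveringItems = new HashMap<>();

    // Analogous to setCrashedFailoverFlag(item): flag an item as needing failover.
    void markCrashed(final int item) {
        crashedItems.addLast(item);
    }

    // Analogous to failoverIfNecessary(): one idle instance claims one item,
    // or returns null when nothing is pending.
    Integer claim(final String instanceId) {
        Integer item = crashedItems.pollFirst();
        if (item != null) {
            failoveringItems.put(item, instanceId);
        }
        return item;
    }

    // Analogous to the getFailoveringItems(jobInstanceId) branch: work that was
    // failed over to a now-crashed instance must be transferred again.
    void instanceCrashed(final String instanceId) {
        failoveringItems.entrySet().removeIf(entry -> {
            if (instanceId.equals(entry.getValue())) {
                crashedItems.addLast(entry.getKey());
                return true;
            }
            return false;
        });
    }

    int pendingCount() {
        return crashedItems.size();
    }
}
```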
// This listener is simple: when the failover setting is turned off, the related failover nodes must be removed
class FailoverSettingsChangedJobListener implements DataChangedEventListener {
@Override
public void onChange(final DataChangedEvent event) {
if (configNode.isConfigPath(event.getKey()) && Type.UPDATED == event.getType() && !YamlEngine.unmarshal(event.getValue(), JobConfigurationPOJO.class).toJobConfiguration().isFailover()) {
failoverService.removeFailoverInfo();
}
}
}
// This listener handles historically failed sharding items: when all instances were down and a new
// instance comes online, all previously failed work (normal or failover failures) is transferred to it
class LegacyCrashedRunningItemListener implements DataChangedEventListener {
@Override
public void onChange(final DataChangedEvent event) {
if (!isCurrentInstanceOnline(event) || !isFailoverEnabled()) {
return;
}
Set<JobInstance> availableJobInstances = new HashSet<>(instanceService.getAvailableJobInstances());
if (!isTheOnlyInstance(availableJobInstances)) {
return;
}
Map<Integer, JobInstance> allRunningItems = executionService.getAllRunningItems();
Map<Integer, JobInstance> allFailoveringItems = failoverService.getAllFailoveringItems();
if (allRunningItems.isEmpty() && allFailoveringItems.isEmpty()) {
return;
}
for (Entry<Integer, JobInstance> entry : allFailoveringItems.entrySet()) {
if (!availableJobInstances.contains(entry.getValue())) {
int item = entry.getKey();
failoverService.setCrashedFailoverFlagDirectly(item);
failoverService.clearFailoveringItem(item);
executionService.clearRunningInfo(Collections.singletonList(item));
allRunningItems.remove(item);
}
}
for (Entry<Integer, JobInstance> entry : allRunningItems.entrySet()) {
if (!availableJobInstances.contains(entry.getValue())) {
failoverService.setCrashedFailoverFlag(entry.getKey());
executionService.clearRunningInfo(Collections.singletonList(entry.getKey()));
}
}
failoverService.failoverIfNecessary();
}
}
monitorExecutionListenerManager adds a single MonitorExecutionSettingsChangedJobListener with simple logic:
if the monitorExecution property of the config becomes false, clear the running info of all sharding items,
stored at the node path /namespace/jobName/sharding/item/running
public void start() {
addDataListener(new MonitorExecutionSettingsChangedJobListener());
}
class MonitorExecutionSettingsChangedJobListener implements DataChangedEventListener {
@Override
public void onChange(final DataChangedEvent event) {
if (configNode.isConfigPath(event.getKey()) && Type.UPDATED == event.getType()
&& !YamlEngine.unmarshal(event.getValue(), JobConfigurationPOJO.class).toJobConfiguration().isMonitorExecution()) {
executionService.clearAllRunningInfo();
}
}
}
shutdownListenerManager: when the instance node is deleted, the local instance must be shut down, and this node no longer participates in subsequent job operations.
public void start() {
addDataListener(new InstanceShutdownStatusJobListener());
}
class InstanceShutdownStatusJobListener implements DataChangedEventListener {
@Override
public void onChange(final DataChangedEvent event) {
if (!JobRegistry.getInstance().isShutdown(jobName) && !JobRegistry.getInstance().getJobScheduleController(jobName).isPaused()
&& isRemoveInstance(event.getKey(), event.getType()) && !isReconnectedRegistryCenter()) {
schedulerFacade.shutdownInstance();
}
}
private boolean isRemoveInstance(final String path, final Type eventType) {
return instanceNode.isLocalInstancePath(path) && Type.DELETED == eventType;
}
// We have to check whether the instance node still exists: the network may be unstable, and since the
// instance node is ephemeral, by the time the delete event is handled the node may have reconnected
private boolean isReconnectedRegistryCenter() {
return instanceService.isLocalJobInstanceExisted();
}
}
triggerListenerManager, the job trigger listener:
public void start() {
addDataListener(new JobTriggerStatusJobListener());
}
class JobTriggerStatusJobListener implements DataChangedEventListener {
// If a /namespace/jobName/trigger/{instanceId} node is added, run the trigger logic
@Override
public void onChange(final DataChangedEvent event) {
if (!triggerNode.isLocalTriggerPath(event.getKey()) || Type.ADDED != event.getType()) {
return;
}
triggerService.removeTriggerFlag();
if (!JobRegistry.getInstance().isShutdown(jobName) && !JobRegistry.getInstance().isJobRunning(jobName)) {
// If the job is currently running on this instance, do not trigger; per the author's TODO below,
// triggered runs will later be queued instead (xxl-job already supports this, configurable in its console)
// TODO At present, it cannot be triggered when the job is running, and it will be changed to a stacked trigger in the future.
JobRegistry.getInstance().getJobScheduleController(jobName).triggerJob();
}
}
}
rescheduleListenerManager: when the cron configuration changes, the job's schedule must be reset.
public void start() {
addDataListener(new CronSettingAndJobEventChangedJobListener());
}
class CronSettingAndJobEventChangedJobListener implements DataChangedEventListener {
@Override
public void onChange(final DataChangedEvent event) {
if (configNode.isConfigPath(event.getKey()) && Type.UPDATED == event.getType() && !JobRegistry.getInstance().isShutdown(jobName)) {
JobConfiguration jobConfiguration = YamlEngine.unmarshal(event.getValue(), JobConfigurationPOJO.class).toJobConfiguration();
if (StringUtils.isEmpty(jobConfiguration.getCron())) {
// If cron is empty, rescheduleJob turns the job into a one-off job
JobRegistry.getInstance().getJobScheduleController(jobName).rescheduleJob();
} else {
// If cron is not empty, rescheduleJob calls Quartz's scheduler.rescheduleJob
// to reset the schedule
JobRegistry.getInstance().getJobScheduleController(jobName).rescheduleJob(jobConfiguration.getCron(), jobConfiguration.getTimeZone());
}
}
}
}
guaranteeListenerManager adds StartedNodeRemovedJobListener and CompletedNodeRemovedJobListener.
Let's look at what StartedNodeRemovedJobListener does.
elastic-job provides the custom ElasticJobListener interface, which allows custom logic before and after a single instance executes its task.
But what if you need to run some logic only after all shards of the job have started, for example to check whether the total startup time exceeded a limit?
elastic-job provides AbstractDistributeOnceElasticJobListener for this. Here is its beforeJobExecuted method:
public final void beforeJobExecuted(final ShardingContexts shardingContexts) {
Set<Integer> shardingItems = shardingContexts.getShardingItemParameters().keySet();
if (shardingItems.isEmpty()) {
return;
}
// After this instance's task starts, write its item info to the /namespace/JobName/guarantee/started/{item} nodes
guaranteeService.registerStart(shardingItems);
// Wait until this instance's item info has actually been written to /namespace/JobName/guarantee/started/{item}
while (!guaranteeService.isRegisterStartSuccess(shardingItems)) {
BlockUtils.waitingShortTime();
}
// Check whether ALL items (including other instances') have been written to /namespace/JobName/guarantee/started/{item}
if (guaranteeService.isAllStarted()) {
// At least one instance is guaranteed to reach this step, because the isAllStarted check above
// only runs after the current instance has successfully registered its start (i.e. written its
// item info to the /namespace/JobName/guarantee/started/{item} nodes)
guaranteeService.executeInLeaderForLastStarted(this, shardingContexts);
return;
}
// If isAllStarted is false, this instance waits until the /namespace/JobName/guarantee/started node
// is deleted (by the leader chosen via guaranteeService.executeInLeaderForLastStarted);
// StartedNodeRemovedJobListener watches for that deletion event and notifies
// this instance
long before = timeService.getCurrentMillis();
try {
synchronized (startedWait) {
startedWait.wait(startedTimeoutMilliseconds);
}
} catch (final InterruptedException ex) {
Thread.interrupted();
}
// If the all-started phase times out, raise a timeout; the upper layers decide how to handle it
if (timeService.getCurrentMillis() - before >= startedTimeoutMilliseconds) {
guaranteeService.clearAllStartedInfo();
handleTimeout(startedTimeoutMilliseconds);
}
}
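The wait/notify handshake used by beforeJobExecuted can be reduced to its core: a waiter blocks until either it is notified (the "started" root node was removed) or the timeout elapses. A self-contained sketch of that pattern (illustrative, not the actual elastic-job class):

```java
final class StartBarrier {

    private final Object startedWait = new Object();

    private boolean notified;

    // Called by the listener side when the guarantee/started node is deleted,
    // like AbstractDistributeOnceElasticJobListener.notifyWaitingTaskStart().
    void notifyWaitingTaskStart() {
        synchronized (startedWait) {
            notified = true;
            startedWait.notifyAll();
        }
    }

    // Returns true if woken by a notification, false if the timeout expired.
    // The loop guards against spurious wakeups, which a bare wait() does not.
    boolean await(final long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        synchronized (startedWait) {
            while (!notified) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return false;
                }
                try {
                    startedWait.wait(remaining);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return true;
    }
}
```

Note that the original code waits exactly once and then compares elapsed time against the timeout; the loop above is the more defensive variant of the same idea.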
CompletedNodeRemovedJobListener works similarly, so I won't repeat it.
public void start() {
addDataListener(new StartedNodeRemovedJobListener());
addDataListener(new CompletedNodeRemovedJobListener());
}
class StartedNodeRemovedJobListener implements DataChangedEventListener {
@Override
public void onChange(final DataChangedEvent event) {
if (Type.DELETED == event.getType() && guaranteeNode.isStartedRootNode(event.getKey())) {
for (ElasticJobListener each : elasticJobListeners) {
if (each instanceof AbstractDistributeOnceElasticJobListener) {
((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskStart();
}
}
}
}
}
class CompletedNodeRemovedJobListener implements DataChangedEventListener {
@Override
public void onChange(final DataChangedEvent event) {
if (Type.DELETED == event.getType() && guaranteeNode.isCompletedRootNode(event.getKey())) {
for (ElasticJobListener each : elasticJobListeners) {
if (each instanceof AbstractDistributeOnceElasticJobListener) {
((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskComplete();
}
}
}
}
}
Finally, a regCenterConnectionStateListener is added to handle connection state changes. Here is its onStateChanged method:
public void onStateChanged(final CoordinatorRegistryCenter registryCenter, final State newState) {
if (JobRegistry.getInstance().isShutdown(jobName)) {
return;
}
JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
if (State.UNAVAILABLE == newState) {
// If the connection becomes unavailable, pause this instance's job
jobScheduleController.pauseJob();
} else if (State.RECONNECTED == newState) {
// Once reconnected, bring the instance back online and resume this instance's job
serverService.persistOnline(serverService.isEnableServer(JobRegistry.getInstance().getJobInstance(jobName).getServerIp()));
instanceService.persistOnline();
executionService.clearRunningInfo(shardingService.getLocalShardingItems());
jobScheduleController.resumeJob();
}
}
That covers all the listeners. In fact, once you understand all the listener logic, you understand elastic-job's main flow. Let's continue with the remaining pieces.
A closer look at ReconcileService: it extends AbstractScheduledService, a Guava utility for running periodic tasks. To use it, you only need to implement runOneIteration, which holds the actual business logic:
protected void runOneIteration() {
// reconcileIntervalMinutes defaults to 0, so the periodic check is disabled by default
int reconcileIntervalMinutes = configService.load(true).getReconcileIntervalMinutes();
if (reconcileIntervalMinutes > 0 && System.currentTimeMillis() - lastReconcileTime >= (long) reconcileIntervalMinutes * 60 * 1000) {
lastReconcileTime = System.currentTimeMillis();
// Periodically check whether resharding is needed. The conditions are:
// 1. the /namespace/jobName/leader/sharding/necessary node does not exist
// 2. the current sharding info contains servers that have gone offline
// 3. no resharding when static sharding is enabled and sharding info already exists
if (!shardingService.isNeedSharding() && shardingService.hasShardingInfoInOfflineServers() && !(isStaticSharding() && hasShardingInfo())) {
log.warn("Elastic Job: job status node has inconsistent value,start reconciling...");
// Set the reshard flag, i.e. create the /namespace/jobName/leader/sharding/necessary node
shardingService.setReshardingFlag();
}
}
}
// Run runOneIteration once every minute
protected Scheduler scheduler() {
return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MINUTES);
}
Now the election logic: leaderService.electLeader performs leader election, ultimately calling ZookeeperRegistryCenter.executeInLeader:
public void executeInLeader(final String key, final LeaderExecutionCallback callback) {
try (LeaderLatch latch = new LeaderLatch(client, key)) {
latch.start();
latch.await();
callback.execute();
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
handleException(ex);
}
}
Election is done with Curator's LeaderLatch plus a callback; LeaderLatch is essentially a distributed lock. In short: each node creates an ephemeral sequential node under the /namespace/leader/election/latch path, and the first node to create one acquires the lock and becomes leader. It then runs the callback, which creates the ephemeral node /namespace/leader/election/instance whose value is the JobInstanceId (IP plus a random suffix). After setting the leader info, it releases the lock. The other nodes then acquire the lock in turn and run the callback too, but since the leader info already exists they do not overwrite it, and thus become non-leader nodes.
Here is the callback's logic:
class LeaderElectionExecutionCallback implements LeaderExecutionCallback {
@Override
public void execute() {
if (!hasLeader()) {
jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
}
}
}
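The effect of LeaderElectionExecutionCallback is first-writer-wins: every instance eventually runs the callback under the latch, but only the first one to find "no leader yet" records itself. This can be modeled with an AtomicReference standing in for the ephemeral election/instance znode (illustrative only, not Curator code):

```java
import java.util.concurrent.atomic.AtomicReference;

final class LeaderSlot {

    private final AtomicReference<String> leader = new AtomicReference<>();

    // Analogous to: if (!hasLeader()) { fillEphemeralJobNode(INSTANCE, jobInstanceId); }
    // compareAndSet writes only when no leader has been recorded yet.
    void executeCallback(final String jobInstanceId) {
        leader.compareAndSet(null, jobInstanceId);
    }

    String getLeader() {
        return leader.get();
    }
}
```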
With that, the JobScheduler instance is fully built. If you have read this far, well done.
Once the JobScheduler is built, an InstanceService instance is also created. OneOffJobBootstrap requires the user to call its execute method manually:
public final class OneOffJobBootstrap implements JobBootstrap {
......
private final InstanceService instanceService;
public void execute() {
// Triggering simply writes every instance to a /namespace/jobName/trigger/{instanceId} node;
// JobTriggerStatusJobListener in triggerListenerManager observes the node creation and triggers the job
instanceService.triggerAllInstances();
}
......
}
That completes the analysis of OneOffJobBootstrap's construction.
ScheduleJobBootstrap is similar to OneOffJobBootstrap, except that it does not need an instanceService; instead it adds a schedule method, which starts the corresponding Quartz Scheduler to begin the periodic job:
public ScheduleJobBootstrap(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig) {
jobScheduler = new JobScheduler(regCenter, elasticJob, jobConfig);
}
public void schedule() {
Preconditions.checkArgument(!Strings.isNullOrEmpty(jobScheduler.getJobConfig().getCron()), "Cron can not be empty.");
jobScheduler.getJobScheduleController().scheduleJob(jobScheduler.getJobConfig().getCron(), jobScheduler.getJobConfig().getTimeZone());
}
So where does schedule get called?
Look back at ElasticJobLiteAutoConfiguration: auto-configuration also registers a ScheduleJobBootstrapStartupRunner, which implements the CommandLineRunner interface. The Spring container invokes a CommandLineRunner's run method after all beans have been initialized:
public class ScheduleJobBootstrapStartupRunner implements CommandLineRunner, ApplicationContextAware {
private ApplicationContext applicationContext;
@Override
public void run(final String... args) {
log.info("Starting ElasticJob Bootstrap.");
applicationContext.getBeansOfType(ScheduleJobBootstrap.class).values().forEach(ScheduleJobBootstrap::schedule);
log.info("ElasticJob Bootstrap started.");
}
}
The run method fetches every ScheduleJobBootstrap instance registered in the Spring context and calls its schedule method.
Original-work statement: this article is published on the Tencent Cloud Developer Community with the author's authorization; reproduction without permission is prohibited.
For infringement concerns, contact cloudcommunity@tencent.com for removal.