前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Elastic-Job系列二之调度全流程

Elastic-Job系列二之调度全流程

原创
作者头像
用户9511949
修改2024-07-09 15:16:48
920
修改2024-07-09 15:16:48

1 ElasticJobExecutor

elastic-job真正任务的执行时通过ElasticJobExecutor来执行,在新建JobScheduler实例时新建该实例,其内部构造函数如下

其中elasticJob属性为用户执行业务逻辑的实例,其他属性的作用在后续的分析中会一一提到

代码语言:javascript
复制
private ElasticJobExecutor(final ElasticJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final JobItemExecutor jobItemExecutor) {
    this.elasticJob = elasticJob;
    this.jobFacade = jobFacade;
    this.jobItemExecutor = jobItemExecutor;
    executorContext = new ExecutorContext(jobFacade.loadJobConfiguration(true));
    itemErrorMessages = new ConcurrentHashMap<>(jobConfig.getShardingTotalCount(), 1);
}

接下来看看调度执行的入口,ElasticJobExecutor的无参execute方法,先整体看看执行的步骤

代码语言:javascript
复制
public void execute() {
    JobConfiguration jobConfig = jobFacade.loadJobConfiguration(true);
    // 1 查看是否需要重新加载任务处理线程池和任务出错处理方式
    executorContext.reloadIfNecessary(jobConfig);
    JobErrorHandler jobErrorHandler = executorContext.get(JobErrorHandler.class);
    try {
        // 2 校验Job执行的环境条件
        jobFacade.checkJobExecutionEnvironment();
    } catch (final JobExecutionEnvironmentException cause) {
        jobErrorHandler.handleException(jobConfig.getJobName(), cause);
    }
    // 3 获取分片信息
    ShardingContexts shardingContexts = jobFacade.getShardingContexts();
    // 4 发布任务staging信息
    jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobConfig.getJobName()));
    // 5 如果当前需要执行的分片正在running,那么设置所有的分片misfire,然后直接返回
    if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
        jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobConfig.getJobName(),
                shardingContexts.getShardingItemParameters().keySet()));
        return;
    }
    try {
        // 6 执行job之前的自定义Listener的逻辑
        jobFacade.beforeJobExecuted(shardingContexts);
        // CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        // CHECKSTYLE:ON
        jobErrorHandler.handleException(jobConfig.getJobName(), cause);
    }
    // 7 执行job
    execute(jobConfig, shardingContexts, ExecutionSource.NORMAL_TRIGGER);
    // 8 如果需要执行misfire的任务,则在此处触发执行
    while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
        jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
        execute(jobConfig, shardingContexts, ExecutionSource.MISFIRE);
    }
    // 9 任务执行完成之后看是否需要failover(查看/namespace/jobName/leader/failover/items下的子节点
    // 是否为空,如果不为空,当前节点参与failover的leader选举,成功则将执行失败的分片需要执行的任务
    // 在当前节点重新执行)
    jobFacade.failoverIfNecessary();
    try {
        // 10 job执行之后的自定义Listener的逻辑
        jobFacade.afterJobExecuted(shardingContexts);
        // CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        // CHECKSTYLE:ON
        jobErrorHandler.handleException(jobConfig.getJobName(), cause);
    }
}

执行步骤还是比较清晰的,下面依次看看各个步骤具体干了什么

第一步:executorContext.reloadIfNecessary(jobConfig),查看是否需要重新加载context中可以需要重新加载的项

代码语言:javascript
复制
public void reloadIfNecessary(final JobConfiguration jobConfiguration) {
    reloadableItems.values().forEach(each -> each.reloadIfNecessary(jobConfiguration));
}

reloadableItems中的类需要实现Reloadable接口,elastic-job利用SPI机制加载实现了Reloadable接口的类

代码语言:javascript
复制
public ExecutorContext(final JobConfiguration jobConfig) {
    ServiceLoader.load(Reloadable.class).forEach(each -> {
        ElasticJobServiceLoader.newTypedServiceInstance(Reloadable.class, each.getType(), new Properties())
                .ifPresent(reloadable -> reloadableItems.put(reloadable.getType(), reloadable));
    });
    initReloadable(jobConfig);
}

有两个类实现了Reloadable接口

ExecutorServiceReloadable:获取任务执行的线程池,自带两种线程数设置方式,单线程和根据CPU的核数设置,默认是根据CPU核数设置,所以其reloadIfNecessary方法主要就是根据config中配置的线程池获取方式加载对应的线程池

代码语言:javascript
复制
public synchronized void reloadIfNecessary(final JobConfiguration jobConfig) {
    String newJobExecutorServiceHandlerType = Strings.isNullOrEmpty(jobConfig.getJobExecutorServiceHandlerType())
            ? JobExecutorServiceHandlerFactory.DEFAULT_HANDLER
            : jobConfig.getJobExecutorServiceHandlerType();
    if (newJobExecutorServiceHandlerType.equals(jobExecutorServiceHandlerType)) {
        return;
    }
    log.debug("JobExecutorServiceHandler reload occurred in the job '{}'. Change from '{}' to '{}'.", jobConfig.getJobName(), jobExecutorServiceHandlerType, newJobExecutorServiceHandlerType);
    reload(newJobExecutorServiceHandlerType, jobConfig.getJobName());
}

JobErrorHandlerReloadable:获取任务出错处理类,elastic-job支持的错误处理方式有6中,钉钉通知、邮箱通知、忽略、记录日志,微信通知、直接排除异常,reloadIfNecessary方法如下

代码语言:javascript
复制
public synchronized void reloadIfNecessary(final JobConfiguration jobConfig) {
    String newJobErrorHandlerType = Strings.isNullOrEmpty(jobConfig.getJobErrorHandlerType()) ? JobErrorHandlerFactory.DEFAULT_HANDLER : jobConfig.getJobErrorHandlerType();
    if (newJobErrorHandlerType.equals(jobErrorHandlerType) && props.equals(jobConfig.getProps())) {
        return;
    }
    log.debug("JobErrorHandler reload occurred in the job '{}'. Change from '{}' to '{}'.", jobConfig.getJobName(), jobErrorHandlerType, newJobErrorHandlerType);
    reload(newJobErrorHandlerType, jobConfig.getProps());
}

第二步:jobFacade.checkJobExecutionEnvironment(),主要就是判断下当前server和注册中心服务器的时间差有没有超过限制

代码语言:javascript
复制
public void checkJobExecutionEnvironment() throws JobExecutionEnvironmentException {
    configService.checkMaxTimeDiffSecondsTolerable();
}
public void checkMaxTimeDiffSecondsTolerable() throws JobExecutionEnvironmentException {
        int maxTimeDiffSeconds = load(true).getMaxTimeDiffSeconds();
        if (0 > maxTimeDiffSeconds) {
            return;
        }
        long timeDiff = Math.abs(timeService.getCurrentMillis() - jobNodeStorage.getRegistryCenterTime());
        if (timeDiff > maxTimeDiffSeconds * 1000L) {
            throw new JobExecutionEnvironmentException(
                    "Time different between job server and register center exceed '%s' seconds, max time different is '%s' seconds.", timeDiff / 1000, maxTimeDiffSeconds);
        }
    }

第三步:jobFacade.getShardingContexts(),获取分片信息

代码语言:javascript
复制
public ShardingContexts getShardingContexts() {
    boolean isFailover = configService.load(true).isFailover();
    if (isFailover) {
        List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
        // 如果开启了失效转移且转移到当前节点的item不为空,那么直接调用
        // executionContextService.getJobShardingContext方法获取当前节点需要处理的item
        if (!failoverShardingItems.isEmpty()) {
            return executionContextService.getJobShardingContext(failoverShardingItems);
        }
    }
    // 如果没有开启失效转移或者转移到当前节点的item为空,在执行之前先reshard(如果需要)
    shardingService.shardingIfNecessary();
    // 从Zookeeper中获取当前节点需要执行的item信息
    List<Integer> shardingItems = shardingService.getLocalShardingItems();
    if (isFailover) {
        // 如果开启了失效转移,那么需要移除已经转移到其他节点的item
        // 比如当前实例本来需要执行0和1,但是0已经转移到了其他的节点,则需要把0删除掉
        shardingItems.removeAll(failoverService.getLocalTakeOffItems());
    }
    shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
    return executionContextService.getJobShardingContext(shardingItems);
}

重点看看reshard的流程

代码语言:javascript
复制
public void shardingIfNecessary() {
    // 获取可用的instance
    List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
    // 如果不需要sharding(判断/namespace/jobName/leader/sharding/necessary节点是否存在)
    // 或者无可用实例,直接返回
    if (!isNeedSharding() || availableJobInstances.isEmpty()) {
        return;
    }
    // 判断当前节点是否是leader,只有leader才能reshard
    // 这里如果有leader,直接判断即可,如果没有,则当前节点需要参与选举,选举完成之后再判断
    if (!leaderService.isLeaderUntilBlock()) {
        // 如果不是leader节点则需要等到sharding完成之后直接返回
        blockUntilShardingCompleted();
        return;
    }
    // sharding之前等待当前正在运行的item结束
    waitingOtherShardingItemCompleted();
    JobConfiguration jobConfig = configService.load(false);
    int shardingTotalCount = jobConfig.getShardingTotalCount();
    log.debug("Job '{}' sharding begin.", jobName);
    // 标记正在sahrding
    jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
    // 重置shardingInfo,设置好最新的分片信息,但是不设置分片的处理实例
    resetShardingInfo(shardingTotalCount);
    // 获取job的sharding策略,elastic默认支持三种策略,平均分片、奇偶分片、轮询分片,默认平均分片
    JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(jobConfig.getJobShardingStrategyType());
    // 设置分片实例
    jobNodeStorage.executeInTransaction(getShardingResultTransactionOperations(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
    log.debug("Job '{}' sharding complete.", jobName);
}

第四步:jobFacade.postJobStatusTraceEvent,通过jobTracingEventBus发布了一条信息

代码语言:javascript
复制
public void postJobStatusTraceEvent(final String taskId, final State state, final String message) {
    TaskContext taskContext = TaskContext.from(taskId);
    jobTracingEventBus.post(new JobStatusTraceEvent(taskContext.getMetaInfo().getJobName(), taskContext.getId(),
            taskContext.getSlaveId(), Source.LITE_EXECUTOR, taskContext.getType().name(), taskContext.getMetaInfo().getShardingItems().toString(), state, message));
    if (!Strings.isNullOrEmpty(message)) {
        log.trace(message);
    }
}

第五步:jobFacade.misfireIfRunning,这个方法比较简单,上文代码注释中仪解释其作用,此处省略

第六步:jobFacade.beforeJobExecuted,这个也没有什么说的,就是获取自定义Listener列表,依次执行即可

第七步:execute执行job,包括第八步misifre的执行,每次执行完成之后,如果之前存在未执行的任务且当前不需要重新shard且开启了misfire的执行,则需要重新执行

代码语言:javascript
复制
private void execute(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
    // item为空,直接返回
    if (shardingContexts.getShardingItemParameters().isEmpty()) {
        jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobConfig.getJobName()));
        return;
    }
    // 如果开启了monitorExecution,为item设置running节点(/namespace/jobName/sharding/{item}/running)
    // 节点的类型根据是否开启failover有关
    // 如果开启了failover,则创建的是持久化节点(当实例宕机时,running节点不能丢,因为有Listener要监听
    // 实例宕机事件,从而设置failOver的item)
    // 如果failover没开,则创建的是临时节点,实例宕机,running信息自动删除
    jobFacade.registerJobBegin(shardingContexts);
    String taskId = shardingContexts.getTaskId();
    // 发布task running信息
    jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
    try {
        // 执行任务,具体逻辑看下面的代码
        process(jobConfig, shardingContexts, executionSource);
    } finally {
        // 执行完成之后删除running信息和failover信息(如果有)
        jobFacade.registerJobCompleted(shardingContexts);
        // 根据是否有错误信息,发布不同的信息
        if (itemErrorMessages.isEmpty()) {
            jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
        } else {
            jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
            itemErrorMessages.clear();
        }
    }
}
代码语言:javascript
复制
// 如果是一个item,直接执行即可,如果是多个item,则需要将所有任务执行完成之后再返回
private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
    Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
    if (1 == items.size()) {
        int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
        JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, item);
        // 这里的process方法就是调用JobItemExecutor的process方法处理
        // 比如如果是SimpleJob,则调用用户定义的Job实例的Executor方法
        // 如果是其他类型的Job,都有对应的JobItemExecutor处理
        process(jobConfig, shardingContexts, item, jobExecutionEvent);
        return;
    }
    CountDownLatch latch = new CountDownLatch(items.size());
    for (int each : items) {
        JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, each);
        ExecutorService executorService = executorContext.get(ExecutorService.class);
        if (executorService.isShutdown()) {
            return;
        }
        executorService.submit(() -> {
            try {
                process(jobConfig, shardingContexts, each, jobExecutionEvent);
            } finally {
                latch.countDown();
            }
        });
    }
    try {
        latch.await();
    } catch (final InterruptedException ex) {
        Thread.currentThread().interrupt();
    }
}

第九步:jobFacade.failoverIfNecessary(),任务执行完成之后看是否需要failover,

查看/namespace/jobName/leader/failover/items下的子节点是否为空,如果不为空,当前节点参与failover的leader选举,成功则将执行失败的分片需要执行的任务在当前节点重新执行,调用failoverService.failoverIfNecessary方法

代码语言:javascript
复制
public void failoverIfNecessary() {
    if (needFailover()) {
        jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
    }
}

看下选举成功之后的回调函数FailoverLeaderExecutionCallback

代码语言:javascript
复制
class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {
    
    @Override
    public void execute() {
        if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {
            return;
        }
        // 获取需要failover的子节点,/namespace/jobName/leader/failover/items下的子节点
        // 这里为什么只取一个任务?因为肯定会有多个分片失效的场景,只取一个任务防止当前实例压力过大?
        int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
        log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);
        // 设置crashedItem 的failover信息(即crashedItem分片的任务转移到当前实例上来)
        jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
        jobNodeStorage.fillJobNode(FailoverNode.getExecutingFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
        jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
        // TODO Instead of using triggerJob, use executor for unified scheduling
        JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
        if (null != jobScheduleController) {
            // 触发任务
            jobScheduleController.triggerJob();
        }
    }
}

第十步:jobFacade.afterJobExecuted(shardingContexts),执行任务执行完成之后的自定义Listener逻辑

至此elastic-job的整个执行流程结束

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 ElasticJobExecutor
相关产品与服务
微服务引擎 TSE
微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档