前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >分布式作业系统 Elastic-Job-Cloud 源码分析 —— 作业调度(一)

分布式作业系统 Elastic-Job-Cloud 源码分析 —— 作业调度(一)

作者头像
芋道源码
发布2019-10-29 17:52:55
7610
发布2019-10-29 17:52:55
举报
文章被收录于专栏:芋道源码1024

摘要: 原创出处 http://www.iocoder.cn/Elastic-Job/cloud-job-scheduler-and-executor-first/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文基于 Elastic-Job V2.1.5 版本分享

  • 1. 概述
  • 2. 作业执行类型
  • 3. Producer 发布任务
  • 4. TaskLaunchScheduledService 提交任务
  • 5. TaskExecutor 执行任务
  • 6. SchedulerEngine 处理任务的状态变更

1. 概述

本文主要分享 Elastic-Job-Cloud 调度主流程。对应到 Elastic-Job-Lite 源码解析文章如下:

  • 《Elastic-Job-Lite 源码分析 —— 作业初始化》
  • 《Elastic-Job-Lite 源码分析 —— 作业执行》
  • 《Elastic-Job-Lite 源码分析 —— 作业分片》

如果你阅读过以下文章,有助于对本文的理解:

  • 《基于Mesos的当当作业云Elastic Job Cloud》
  • 《由浅入深 | 如何优雅地写一个Mesos Framework》

? 另外,笔者假设你已经对 《Elastic-Job-Lite 源码分析系列》 有一定的了解。

本文涉及到主体类的类图如下( 打开大图 ):

你行好事会因为得到赞赏而愉悦 同理,开源项目贡献者会因为 Star 而更加有动力 为 Elastic-Job 点赞!传送门

Elastic-Job-Cloud 基于 Mesos 实现分布式作业调度,或者说 Elastic-Job-Cloud 是 Mesos 上的 框架( Framework )。

一个 Mesos 框架由两部分组成:

  • 控制器部分,称为调度器( Scheduler )。
  • 工作单元部分,称为执行器( Executor )。

Elastic-Job-Cloud 由两个项目组成:

  • Elastic-Job-Cloud-Scheduler,实现调度器,实现类为 com.dangdang.ddframe.job.cloud.scheduler.mesos.SchedulerEngine
  • Elastic-Job-Cloud-Executor,实现执行器,实现类为 com.dangdang.ddframe.job.cloud.executor.TaskExecutor

本文略微“啰嗦”,请保持耐心。搭配《用Mesos框架构建分布式应用》一起阅读,理解难度降低 99%。OK,开始我们的 Cloud 之旅。

2. 作业执行类型

在 Elastic-Job-Cloud,作业执行分成两种类型:

  • 常驻作业

常驻作业是作业一旦启动,无论运行与否均占用系统资源; 常驻作业适合初始化时间长、触发间隔短、实时性要求高的作业,要求资源配备充足。

  • 瞬时作业

瞬时作业是在作业启动时占用资源,运行完成后释放资源。 瞬时作业适合初始化时间短、触发间隔长、允许延迟的作业,一般用于资源不太充分,或作业要求的资源多,适合资源错峰使用的场景。

Elastic-Job-Cloud 不同于 Elastic-Job-Lite 去中心化执行调度,转变为 Mesos Framework 的中心节点调度。这里不太理解,没关系,下文看到具体代码就能明白了。

常驻作业、瞬时作业在调度中会略有不同,大体粗略流程如下:

下面,我们针对每个过程一节一节解析。

3. Producer 发布任务

在上文《Elastic-Job-Cloud 源码分析 —— 作业配置》的「3.1.1 操作云作业配置」可以看到添加云作业配置后,Elastic-Job-Cloud-Scheduler 会执行作业调度,实现代码如下:

代码语言:javascript
复制
// ProducerManager.java
/**
* 调度作业.
* 
* @param jobConfig 作业配置
*/
public void schedule(final CloudJobConfiguration jobConfig) {
   // 应用 或 作业 被禁用,不调度
   if (disableAppService.isDisabled(jobConfig.getAppName()) || disableJobService.isDisabled(jobConfig.getJobName())) {
       return;
   }
   if (CloudJobExecutionType.TRANSIENT == jobConfig.getJobExecutionType()) { // 瞬时作业
       transientProducerScheduler.register(jobConfig);
   } else if (CloudJobExecutionType.DAEMON == jobConfig.getJobExecutionType()) { // 常驻作业
       readyService.addDaemon(jobConfig.getJobName());
   }
}
  • 瞬时作业和常驻作业在调度上会有一定的不同。

3.1 常驻作业

常驻作业在调度时,直接添加到待执行作业队列。What?岂不是马上就运行了!No No No,答案在「5. TaskExecutor 执行任务」,这里先打住。

代码语言:javascript
复制
// ReadyService.java
/**
* 将常驻作业放入待执行队列.
*
* @param jobName 作业名称
*/
public void addDaemon(final String jobName) {
   if (regCenter.getNumChildren(ReadyNode.ROOT) > env.getFrameworkConfiguration().getJobStateQueueSize()) {
       log.warn("Cannot add daemon job, caused by read state queue size is larger than {}.", env.getFrameworkConfiguration().getJobStateQueueSize());
       return;
   }
   Optional<CloudJobConfiguration> cloudJobConfig = configService.load(jobName);
   if (!cloudJobConfig.isPresent() || CloudJobExecutionType.DAEMON != cloudJobConfig.get().getJobExecutionType() || runningService.isJobRunning(jobName)) {
       return;
   }
   // 添加到待执行队列
   regCenter.persist(ReadyNode.getReadyJobNodePath(jobName), "1");
}

// ReadyNode.java
final class ReadyNode {

    static final String ROOT = StateNode.ROOT + "/ready";

    private static final String READY_JOB = ROOT + "/%s"; // %s = ${JOB_NAME}
}
  • ReadyService,待执行作业队列服务,提供对待执行作业队列的各种操作方法。
  • 待执行作业队列存储在注册中心( Zookeeper )的持久数据节点 /${NAMESPACE}/state/ready/${JOB_NAME},存储值为待执行次数。例如此处,待执行次数为 1。使用 zkClient 查看如下: [zk: localhost:2181(CONNECTED) 4] ls /elastic-job-cloud/state/ready [test_job_simple] [zk: localhost:2181(CONNECTED) 5] get /elastic-job-cloud/state/ready/test_job_simple 1
  • 在运维平台,我们可以看到待执行作业队列:
  • 从官方的 RoadMap 来看,待执行作业队列未来会使用 Redis 存储以提高性能。 FROM http://elasticjob.io/docs/elastic-job-cloud/03-design/roadmap/ Redis Based Queue Improvement

3.2 瞬时作业

瞬时作业在调度时,使用发布瞬时作业任务的调度器( TransientProducerScheduler )调度作业。当瞬时作业到达作业执行时间,添加到待执行作业队列。

3.2.1 TransientProducerScheduler

TransientProducerScheduler,发布瞬时作业任务的调度器,基于 Quartz 实现对瞬时作业的调度。初始化代码如下:

代码语言:javascript
复制
// TransientProducerScheduler.java
void start() {
   scheduler = getScheduler();
   try {
       scheduler.start();
   } catch (final SchedulerException ex) {
       throw new JobSystemException(ex);
   }
}

private Scheduler getScheduler() {
   StdSchedulerFactory factory = new StdSchedulerFactory();
   try {
       factory.initialize(getQuartzProperties());
       return factory.getScheduler();
   } catch (final SchedulerException ex) {
       throw new JobSystemException(ex);
   }
}

private Properties getQuartzProperties() {
   Properties result = new Properties();
   result.put("org.quartz.threadPool.class", SimpleThreadPool.class.getName());
   result.put("org.quartz.threadPool.threadCount", Integer.toString(Runtime.getRuntime().availableProcessors() * 2)); // 线程池数量
   result.put("org.quartz.scheduler.instanceName", "ELASTIC_JOB_CLOUD_TRANSIENT_PRODUCER");
   result.put("org.quartz.plugin.shutdownhook.class", ShutdownHookPlugin.class.getName());
   result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
   return result;
}

3.2.2 注册瞬时作业

调用 TransientProducerScheduler#register(...) 方法,注册瞬时作业。实现代码如下:

代码语言:javascript
复制
// TransientProducerScheduler.java
private final TransientProducerRepository repository;

synchronized void register(final CloudJobConfiguration jobConfig) {
   String cron = jobConfig.getTypeConfig().getCoreConfig().getCron();
   // 添加 cron 作业集合
   JobKey jobKey = buildJobKey(cron);
   repository.put(jobKey, jobConfig.getJobName());
   // 调度 作业
   try {
       if (!scheduler.checkExists(jobKey)) {
           scheduler.scheduleJob(buildJobDetail(jobKey), buildTrigger(jobKey.getName()));
       }
   } catch (final SchedulerException ex) {
       throw new JobSystemException(ex);
   }
}
  • 调用 #buildJobKey(…) 方法,创建 Quartz JobKey。你会发现很有意思的使用的是 cron 参数作为主键。Why?在看下 !scheduler.checkExists(jobKey) 处,相同 JobKey( cron ) 的作业不重复注册到 Quartz Scheduler。Why?此处是一个优化,相同 cron 使用同一个 Quartz Job,Elastic-Job-Cloud-Scheduler 可能会注册大量的瞬时作业,如果一个瞬时作业创建一个 Quartz Job 太过浪费,特别是 cron每分钟、每5分钟、每小时、每天已经覆盖了大量的瞬时作业的情况。因此,相同 cron 使用同一个 Quartz Job。
代码语言:javascript
复制
调用 TransientProducerRepository#put(...) 以 Quartz JobKey 为主键聚合作业。
final class TransientProducerRepository {/**
 * cron 作业集合
 * key:作业Key
 */
private final ConcurrentHashMap&lt;JobKey, List&lt;String&gt;&gt; cronTasks = new ConcurrentHashMap&lt;&gt;(256, 1);

synchronized void put(final JobKey jobKey, final String jobName) {
    remove(jobName);
    List&lt;String&gt; taskList = cronTasks.get(jobKey);
 if (null == taskList) {
        taskList = new CopyOnWriteArrayList&lt;&gt;();
        taskList.add(jobName);
        cronTasks.put(jobKey, taskList);
 return;
    }
 if (!taskList.contains(jobName)) {
        taskList.add(jobName);
    }
}

}
代码语言:javascript
复制
调用 #buildJobDetail(...) 创建 Quartz Job 信息。实现代码如下:
private JobDetail buildJobDetail(final JobKey jobKey) {
    JobDetail result = JobBuilder.newJob(ProducerJob.class) // ProducerJob.java
            .withIdentity(jobKey).build();
    result.getJobDataMap().put("repository", repository);
    result.getJobDataMap().put("readyService", readyService);
 return result;
}

JobBuilder#newJob(…) 的参数是 ProducerJob,下文会讲解到。
调用 #buildTrigger(...) 创建 Quartz Trigger。实现代码如下:
private Trigger buildTrigger(final String cron) {
 return TriggerBuilder.newTrigger()
           .withIdentity(cron)
           .withSchedule(CronScheduleBuilder.cronSchedule(cron) // cron
           .withMisfireHandlingInstructionDoNothing())
           .build();
}

3.2.3 ProducerJob

ProducerJob,当 Quartz Job 到达 cron 执行时间( 即作业执行时间),将相应的瞬时作业添加到待执行作业队列。实现代码如下:

代码语言:javascript
复制
public static final class ProducerJob implements Job {

   private TransientProducerRepository repository;

   private ReadyService readyService;

   @Override
   public void execute(final JobExecutionContext context) throws JobExecutionException {
       List<String> jobNames = repository.get(context.getJobDetail().getKey());
       for (String each : jobNames) {
           readyService.addTransient(each);
       }
   }
}
代码语言:javascript
复制
调用 TransientProducerRepository#get(...) 方法,获得该 Job 对应的作业集合。实现代码如下:
final class TransientProducerRepository {/**
 * cron 作业集合
 * key:作业Key
 */
private final ConcurrentHashMap&lt;JobKey, List&lt;String&gt;&gt; cronTasks = new ConcurrentHashMap&lt;&gt;(256, 1);

List&lt;String&gt; get(final JobKey jobKey) {
    List&lt;String&gt; result = cronTasks.get(jobKey);
 return null == result ? Collections.&lt;String&gt;emptyList() : result;
}

}

调用 ReadyService#addTransient(...) 方法,添加瞬时作业到待执行作业队列。实现代码如下:
/**
* 将瞬时作业放入待执行队列.
* 
* @param jobName 作业名称
*/
public void addTransient(final String jobName) {
 //
 if (regCenter.getNumChildren(ReadyNode.ROOT) > env.getFrameworkConfiguration().getJobStateQueueSize()) {
       log.warn("Cannot add transient job, caused by read state queue size is larger than {}.", env.getFrameworkConfiguration().getJobStateQueueSize());
 return;
   }
 //
   Optional<CloudJobConfiguration> cloudJobConfig = configService.load(jobName);
 if (!cloudJobConfig.isPresent() || CloudJobExecutionType.TRANSIENT != cloudJobConfig.get().getJobExecutionType()) {
 return;
   }
 // 
   String readyJobNode = ReadyNode.getReadyJobNodePath(jobName);
   String times = regCenter.getDirectly(readyJobNode);
 if (cloudJobConfig.get().getTypeConfig().getCoreConfig().isMisfire()) {
       regCenter.persist(readyJobNode, Integer.toString(null == times ? 1 : Integer.parseInt(times) + 1));
   } else {
       regCenter.persist(ReadyNode.getReadyJobNodePath(jobName), "1");
   }
}

3.3 小结

无论是常驻作业还是瞬时作业,都会加入到待执行作业队列。目前我们看到瞬时作业的每次调度是 TransientProducerScheduler 负责。那么常驻作业的每次调度呢?「5. TaskExecutor 执行任务」会看到它的调度,这是 Elastic-Job-Cloud 设计巧妙有趣的地方。

艿艿:因为本文实在有点太长了,微信有文章长度限制,麻烦胖友访问 http://www.iocoder.cn/Elastic-Job/cloud-job-scheduler-and-executor-first/ 进行继续阅读。 啊啊啊啊,我当初是怎么写完的。。。。。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-01-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 芋道源码 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 概述
  • 2. 作业执行类型
  • 3. Producer 发布任务
    • 3.1 常驻作业
      • 3.2 瞬时作业
        • 3.2.1 TransientProducerScheduler
        • 3.2.2 注册瞬时作业
        • 3.2.3 ProducerJob
      • 3.3 小结
      相关产品与服务
      云数据库 Redis
      腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档