专栏首页芋道源码1024分布式作业系统 Elastic-Job-Cloud 源码分析 —— 本地运行模式

分布式作业系统 Elastic-Job-Cloud 源码分析 —— 本地运行模式

摘要: 原创出处 http://www.iocoder.cn/Elastic-Job/cloud-local-executor/

本文基于 Elastic-Job V2.1.5 版本分享

  • 1. 概述
  • 2. 配置
  • 3. 运行
  • 666. 彩蛋

1. 概述

本文主要分享 Elastic-Job-Cloud 本地运行模式,对应《官方文档 —— 本地运行模式》。

有什么用呢?引用官方解答:

在开发 Elastic-Job-Cloud 作业时,开发人员可以脱离 Mesos 环境,在本地运行和调试作业。可以利用本地运行模式充分的调试业务功能以及单元测试,完成之后再部署至 Mesos 集群。 本地运行作业无需安装 Mesos 环境。

? 是不是很赞 + 1024?!

本文涉及到主体类的类图如下( 打开大图 ):

2. 配置

LocalCloudJobConfiguration,本地云作业配置,在《Elastic-Job-Cloud 源码分析 —— 作业配置》「3.2 本地云作业配置」有详细解析。

创建本地云作业配置示例代码如下(来自官方):

LocalCloudJobConfiguration config = new LocalCloudJobConfiguration(
    new SimpleJobConfiguration(
    // 配置作业类型和作业基本信息
    JobCoreConfiguration.newBuilder("FooJob", "*/2 * * * * ?", 3) 
        .shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou")
        .jobParameter("dbName=dangdang").build(), "com.dangdang.foo.FooJob"),
        // 配置当前运行的作业是第几个分片 
        1,  
        // 配置Spring相关参数。如果不配置,代表不使用 Spring 配置。
        "testSimpleJob" , "applicationContext.xml"); 

3. 运行

LocalTaskExecutor,本地作业执行器。

创建本地作业执行器示例代码如下(来自官方):

new LocalTaskExecutor(localJobConfig).execute();

可以看到,调用 LocalTaskExecutor#execute() 方法,执行作业逻辑,实现代码如下:

// LocalTaskExecutor.java

public void execute() {
   AbstractElasticJobExecutor jobExecutor;
   CloudJobFacade jobFacade = new CloudJobFacade(getShardingContexts(), getJobConfigurationContext(), new JobEventBus());
   // 创建执行器
   switch (localCloudJobConfiguration.getTypeConfig().getJobType()) {
       case SIMPLE:
           jobExecutor = new SimpleJobExecutor(getJobInstance(SimpleJob.class), jobFacade);
           break;
       case DATAFLOW:
           jobExecutor = new DataflowJobExecutor(getJobInstance(DataflowJob.class), jobFacade);
           break;
       case SCRIPT:
           jobExecutor = new ScriptJobExecutor(jobFacade);
           break;
       default:
           throw new UnsupportedOperationException(localCloudJobConfiguration.getTypeConfig().getJobType().name());
   }
   // 执行作业
   jobExecutor.execute();
}
  • 调用 #getShardingContexts() 方法,创建分片上下文集合( ShardingContexts ),实现代码如下:
private ShardingContexts getShardingContexts() {
   JobCoreConfiguration coreConfig = localCloudJobConfiguration.getTypeConfig().getCoreConfig();
   Map<Integer, String> shardingItemMap = new HashMap<>(1, 1);
   shardingItemMap.put(localCloudJobConfiguration.getShardingItem(),
           new ShardingItemParameters(coreConfig.getShardingItemParameters()).getMap().get(localCloudJobConfiguration.getShardingItem()));
   return new ShardingContexts(
           // taskId ?
           Joiner.on("@-@").join(localCloudJobConfiguration.getJobName(), localCloudJobConfiguration.getShardingItem(), "READY", "foo_slave_id", "foo_uuid"),
           localCloudJobConfiguration.getJobName(), coreConfig.getShardingTotalCount(), coreConfig.getJobParameter(), shardingItemMap);
}
  • 调用 #getJobConfigurationContext() 方法,创建内部的作业配置上下文( JobConfigurationContext ),实现代码如下:
private <T extends ElasticJob> T getJobInstance(final Class<T> clazz) {
   Object result;
   if (Strings.isNullOrEmpty(localCloudJobConfiguration.getApplicationContext())) { // 直接创建 ElasticJob
       String jobClass = localCloudJobConfiguration.getTypeConfig().getJobClass();
       try {
           result = Class.forName(jobClass).newInstance();
       } catch (final ReflectiveOperationException ex) {
           throw new JobSystemException("Elastic-Job: Class '%s' initialize failure, the error message is '%s'.", jobClass, ex.getMessage());
       }
   } else { // Spring 环境获得 ElasticJob
       result = new ClassPathXmlApplicationContext(localCloudJobConfiguration.getApplicationContext()).getBean(localCloudJobConfiguration.getBeanName());
   }
   return clazz.cast(result);
}
  • 调用 #getJobInstance(...) 方法, 获得分布式作业( ElasticJob )实现实例,实现代码如下:
private JobConfigurationContext getJobConfigurationContext() {
   Map<String, String> jobConfigurationMap = new HashMap<>();
   jobConfigurationMap.put("jobClass", localCloudJobConfiguration.getTypeConfig().getJobClass());
   jobConfigurationMap.put("jobType", localCloudJobConfiguration.getTypeConfig().getJobType().name());
   jobConfigurationMap.put("jobName", localCloudJobConfiguration.getJobName());
   jobConfigurationMap.put("beanName", localCloudJobConfiguration.getBeanName());
   jobConfigurationMap.put("applicationContext", localCloudJobConfiguration.getApplicationContext());
   if (JobType.DATAFLOW == localCloudJobConfiguration.getTypeConfig().getJobType()) { // 数据流作业
       jobConfigurationMap.put("streamingProcess", Boolean.toString(((DataflowJobConfiguration) localCloudJobConfiguration.getTypeConfig()).isStreamingProcess()));
   } else if (JobType.SCRIPT == localCloudJobConfiguration.getTypeConfig().getJobType()) { // 脚本作业
       jobConfigurationMap.put("scriptCommandLine", ((ScriptJobConfiguration) localCloudJobConfiguration.getTypeConfig()).getScriptCommandLine());
   }
   return new JobConfigurationContext(jobConfigurationMap);
}
  • 调用 AbstractElasticJobExecutor#execute() 方法,执行作业逻辑。 Elastic-Job-Lite 和 Elastic-Job-Cloud 作业执行基本一致,在《Elastic-Job-Lite 源码分析 —— 作业执行》有详细解析。

本文分享自微信公众号 - 芋道源码(javayuanma),作者:老艿艿

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-01-21

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 如何 1 分钟定位工作中 80% 的 Linux 高频问题?

    回想下你用的 Google 搜索,淘宝购物,用 QQ、微信聊天的时候,其实这些软件和服务的背后,都是成千上万的 Linux 服务器在支撑。

    芋道源码
  • 芋道 Spring Boot MongoDB 入门

    MongoDB 中的许多概念在 MySQL 中具有相近的类比。本表概述了每个系统中的一些常见概念。

    芋道源码
  • 芋道 Spring Boot JdbcTemplate 入门

    虽然说,我们现在项目的 DAL 数据访问层,大多使用 MyBatis 或者 JPA ,但是可能极少部分情况下也会使用 JDBC 。而使用的 JDBC 一般来说,...

    芋道源码
  • (八十五)c#Winform自定义控件-引用区块

    GitHub:https://github.com/kwwwvagaa/NetWinformControl

    冰封一夏
  • 【董天一】什么是IPFS?(二)

            整个IPFS系统是一个分布式的文件存储系统, 那么在下载相关数据的时候, 将从多个节点同时下载, 相比于HTTP从中心服务器的下载速度要快很多,...

    圆方圆学院
  • shell(二)

    The death of a dream is the day that you stop believing in the work it takes to ...

    小闫同学啊
  • 聊聊storm的direct grouping

    direct grouping是一种特殊的grouping,它是由上游的producer直接指定下游哪个task去接收它发射出来的tuple。direct gr...

    codecraft
  • 聊聊storm的direct grouping

    direct grouping是一种特殊的grouping,它是由上游的producer直接指定下游哪个task去接收它发射出来的tuple。direct gr...

    codecraft
  • 大会活动|报名-2019世界人工智能大会-腾讯论坛:8月,在上海等你!

    购物逛街、预订外卖、缴纳水电费、挂号看病,社保登记……这些寻常琐事如今都可以在指尖上办理。科技总是充满力量,不断提升我们的生活品质。当人们还在感叹移动互联网大潮...

    优图实验室
  • [UWP]抄抄《CSS 故障艺术》的动画

    上个月看到CSS 故障艺术这篇文章,最近想转换心情于是开始抄它的动画了(顺便为博客园的UWP板块吊命)。CSS的mix-blend-mode好像很好用,这次用U...

    dino.c

扫码关注云+社区

领取腾讯云代金券