前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >一文搞懂Elastic-Job(内附源码解析)

一文搞懂Elastic-Job(内附源码解析)

作者头像
胖虎
发布2019-06-26 17:16:11
3.2K2
发布2019-06-26 17:16:11
举报
文章被收录于专栏:晏霖晏霖

前言

Elastic-Job是当当基于Zookepper,Quartz开发并且开源的Java分布式定时任务,解决Quartz不支持分布式的弊端。它由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。

题外话,当当的Sharding-JDBC已经加入Apache管理了,而这个Elastic-Job没有。

还有一句题外话,私下和好友交流,当初要使用elastic 后来也被换了,具体原因也没说,咱也不知道,咱也不敢问。我贴个图大家应该会明白,这是Elastic-Job 在github的地址:https://github.com/elasticjob/elastic-job-lite

来个图更能说明问题。我什么也没说啊。。。。

采用的公司还是蛮多的,像 36氪、国美、唯品会……链接:http://elasticjob.io/docs/elastic-job-lite/00-overview/company/

网上也是对Elastic-Job众多好评,目前国内使用定时作业中间件也就xxl-job能跟他竞争。

再不更新代码都落灰了……

正文

正文我就不扯没用的了,好多人也是冲着标题写着 ‘源码解析’ 来的吧,好,请看下面就是我们老艿艿总结的源码解析专栏

我也是老艿艿的不锈钢粉丝哦!!!

微信公众号Elastic-Job源码解析地址:https://mp.weixin.qq.com/s/m1VRIzeFfa_6Ly_gEDNK-w

老艿艿微信公众号:芋道源码

一定有你想看的

不是有意打广告,而是老艿艿文章确实不错,也从不搞一些吸粉的商业活动,不做盗版。文章怎么样,看一下你就懂了。

友情提醒:看源码前要会使用并理解简单的原理,这样吸收比较好,上来就看源码直接就从入门到放弃。

Elastic-Job作为分布式作业中间件有一个重要的概念就是-分片。

分片概念: 这里的分片是指将一个任务拆分成多个任务执行,有点类似 java里的Fork-Join 框架思想。

举个例子:我现在要对一些数据进行处理,首先把数据筛选出来,为了快速的执行作业,我们用2台服务器,想让每台服务器应执行作业的50%。 为满足此需求,可将作业分成2片,每台服务器执行1片。

作业遍历数据的逻辑应为:服务器A遍历ID以奇数结尾的数据;服务器B遍历ID以偶数结尾的数据。 如果分成10片,则作业遍历数据的逻辑应为:每片分到的分片项应为ID%10,而服务器A被分配到分片项0,1,2,3,4;服务器B被分配到分片项5,6,7,8,9,直结果就是服务器A遍历ID以0-4结尾的数据,服务器B遍历ID以5-9结尾的数据。

如何使用Elastic-Job

Elastic-Job 提供了3种作业类型

  1. Simple类型作业
  2. DataFlow类型作业
  3. Script类型作业

Elastic-Job 提供了2种配置方式

  1. JavaCode配置
  2. Spring命名空间配置

这里我只用代码讲解 用JavaCode配置方式实现Simple作业类型。

首先提供一下官网详细的配置手册:http://elasticjob.io/docs/elastic-job-lite/02-guide/config-manual/

准备工作

本机安装一个 Zookepper ,不搞集群,下载,安装 ,启动 so easy,我的版本是 3.4.14

引入Maven依赖

代码语言:javascript
复制
代码语言:javascript
复制
<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.5.RELEASE</version>
        <relativePath/>
    </parent>
    <name>springboot-elastic-job</name>
    <description>基于 Spring Boot 2.1.5 使用elastic-job</description>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-core</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>
代码语言:javascript
复制

Simple作业

代码语言:javascript
复制
public class JavaSimpleJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date())
                + " 分片项 : " + shardingContext.getShardingItem()
                + " 总片数 : " + shardingContext.getShardingTotalCount());

        switch (shardingContext.getShardingItem()) {
            case 0:
                System.out.println("do something by sharding item 0");
                break;
            case 1:
                System.out.println("do something by sharding item 1");
                break;
            case 2:
                System.out.println("do something by sharding item 2");
                break;
            // case n: ...
            //动态查询该分片下要执行的用户
            //SELECT * FROM lfp_user WHERE mod(id,#{shardingTotalCount})=#{shardingItem};
        }

    }
}

启动类和配置作业

代码语言:javascript
复制
public class Application {

    public static void main(String[] args) throws UnknownHostException {
        System.out.println("Start...");
        System.out.println(InetAddress.getLocalHost());
        new JobScheduler(createRegistryCenter(), createSimpleJobConfiguration()).init();

    }


    private static CoordinatorRegistryCenter createRegistryCenter() {
        //ZookeeperConfiguration构造方法两个参数,serverLists(连接Zookeeper服务器的列表,包括IP地址和端口号,,多个地址用逗号分隔)和namespace(命名空间)
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(
                new ZookeeperConfiguration("127.0.0.1:2181", "new-elastic-job-demo"));
        regCenter.init();
        return regCenter;
    }

    private static LiteJobConfiguration createSimpleJobConfiguration() {
        //创建简单作业配置构建器,三个参数为:jobName(作业名称),cron(作业启动时间的cron表达式),shardingTotalCount(作业分片总数)
        // 定义作业核心配置
        JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("SimpleJobDemo", "0/15 * * * * ?", 2).build();
        // 定义SIMPLE类型配置
        SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, JavaSimpleJob.class.getCanonicalName());
        // 定义Lite作业根配置
        JobRootConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();

        return (LiteJobConfiguration) simpleJobRootConfig;

    }

    //配置DataflowJob
    private static void setUpDataflowJob(final CoordinatorRegistryCenter registryCenter) {
        JobCoreConfiguration coreConfiguration = JobCoreConfiguration.newBuilder("JavaDataflowJob", "0/10 * * * * ?", 2).build();
        //数据流作业配置,第三个参数为streamingProcess(是否为流式处理)
        DataflowJobConfiguration dataflowJobConfiguration = new DataflowJobConfiguration(coreConfiguration, JavaDataflowJob.class.getCanonicalName(), true);
        new JobScheduler(registryCenter, LiteJobConfiguration.newBuilder(dataflowJobConfiguration).overwrite(true).build()).init();


    }
}

启动测试

我截取了一段控制台日志

github 地址:https://github.com/362460453/springboot-elastic-job

现在搞什么配置如果非必须情况下最好不要用xml形式,好古老,真的,如果有想学Spring命名空间配置 请看下面的链接:

https://www.cnblogs.com/yushangzuiyue/p/9655847.html

现实场景怎么用?

我们假设一个场景是处理一批数据,这批数据,一台机器的话分多个片执行相当于多线程执行,实际情况我们都是多台服务器执行的,使用2台机器,那么就把压力平均分摊到两台服务器上去了,而且也能更快执行完成。 如果,以前只要两台机器在1个小时就能跑完的,现在5个小时也跑不完,怎么办呢?加机器?加机器肯定是必须的,但是我们发现代码里写死了,分2片,我们总不能去改成3、4、5,万一以后还有更多呢,所以我们可以对sql进一步优化。我们把分片数和当前分片项传到sql,这样sql可以动态去查询对应分片后的用户了

代码语言:javascript
复制
public class TestJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        int shardingTotalCount = shardingContext.getShardingTotalCount();
        int shardingItem = shardingContext.getShardingItem();
        //1 查询改分片下要执行的用户,带参数shardingTotalCount和shardingItem 
        //筛选出来的数据
        //2 处理筛选的数据
        
    }
}

sql改写
//动态查询该分片下要执行的用户
SELECT * FROM lfp_user WHERE mod(id,#{shardingTotalCount})=#{shardingItem};
代码语言:javascript
复制

当shardingTotalCount 有5片,并且5台机器 机器0:查询用户id能整除5的,如:5、10、15、20…… 机器1:查询用户id除以5余1的,如:1、6、11、16…… 机器2:查询用户id除以5余2的,如:2、7、12、17…… 机器3:查询用户id除以5于余3的,如:3、8、13、18…… 机器4:查询用户id除以5余4的,如:4、9、14、19……

注意事项

这里要说一个我碰到的问题,LiteJobConfiguration里有一个属性是 boolean overwrite; 默认为false,如果为false的话,第一次启动的时候,会在zookeeper中保存了一份作业信息(调度时间、参数等),后面即使修改了作业信息,无论重新启动服务或者zookeeper,还是会使用第一次启动时候的作业信息(根据作业名字)。 因此需要设为true,这样每次启动,作业信息都会覆盖zookeeper中的保存的配置信息,这样可以保证修改了配置信息可以马上使用。

代码启动中,实现类的execute方法中不能使用spring注入的对象:elastic-job是封装的quartz框架,这个特性也存留下来,execute方法中只能用static对象。

如有其他问题请查看:

https://blog.csdn.net/name_z/article/details/81274029

https://blog.csdn.net/tanga842428/article/details/52398982

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-06-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 晏霖 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 正文
    • 如何使用Elastic-Job
      • 准备工作
        • 引入Maven依赖
          • Simple作业
            • 启动类和配置作业
              • 启动测试
                • 现实场景怎么用?
                  • 注意事项
                  相关产品与服务
                  消息队列 TDMQ
                  消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档