首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

整合Elastic-Job(支持动态任务)

最近公司的项目需要用到分布式任务调度,在结合多款开源框架后决定使用当当的Elastic-job。不知道大家有没有这样的需求,就是动态任务。之前比较了xxl-job和elastic-job发现,都只是支持注解或者配置以及后台添加现有的任务,不支持动态添加。比如:类似订单半小时后自动取消的场景。

xxl-job理论上来说是可以支持的,但是需要高度整合admin端的程序,然后开放对应的接口才可以给其他服务调用,这样本质直接改源码对后期的升级十分不便,最后放弃了xxl-job。elastic-job在移交Apache后的版本规划中,有提到API的开放,但是目前还没有稳定版,所以只能使用之前的2.1.5的版本来做。在Github搜了很多整合方案,最后决定选择下面的来实现。


<dependency>
    <groupId>com.github.xjzrc.spring.boot</groupId>
    <artifactId>elastic-job-lite-spring-boot-starter</artifactId>
    <version>${lasted.release.version}</version>
</dependency>

因为要做的是动态的,所以这里没有直接使用maven坐标引入,直接将源码全部接入项目来使用,这样比较灵活,因为底层本质上还是用elastic-job的东西。下面引入elastic-job坐标


 <dependency>
 		<groupId>com.dangdang</groupId>
 		<artifactId>elastic-job-lite-spring</artifactId>
 		<version>${elastic-job.version}</version>
 </dependency>
 <dependency>
 		<groupId>com.dangdang</groupId>
 		<artifactId>elastic-job-lite-lifecycle</artifactId>
 		<version>${elastic-job.version}</version>
 <exclusions>
    <exclusion>
       <groupId>org.eclipse.jetty.aggregate</groupId>
       <artifactId>jetty-all-server</artifactId>
    </exclusion>
    <exclusion>
       <groupId>org.apache.curator</groupId>
       <artifactId>curator-framework</artifactId>
    </exclusion>
    <exclusion>
       <groupId>org.apache.curator</groupId>
       <artifactId>curator-recipes</artifactId>
    </exclusion>
 </exclusions>
 </dependency>

在整合完上面的源码后,就可以直接支持配置式的定时任务了,只需要修改服务的config即可生效,但是要做到动态式的添加和删除就必须在实现一个动态的实现。

首先在ElasticJobAutoConfiguration新增一个Bean


/**
     * 动态任务初始化
     * @return
     */
    @Bean(initMethod = "init")
    @ConditionalOnMissingBean
    public DynamicJobInitialization dynamicJobInitialization() {
        return new DynamicJobInitialization(this.regCenter());
    }

然后实现动态的类


/**
 * 动态任务初始化(支持简单、流式任务)
 * @author Zzq
 * @date 2020/9/14 19:22
 */
@Slf4j
public class DynamicJobInitialization extends AbstractJobInitialization {

    private JobStatisticsAPI jobStatisticsAPI;
    private JobSettingsAPI jobSettingsAPI;

    public DynamicJobInitialization(ZookeeperRegistryCenter zookeeperRegistryCenter) {
        this.jobStatisticsAPI = new JobStatisticsAPIImpl(zookeeperRegistryCenter);
        this.jobSettingsAPI = new JobSettingsAPIImpl(zookeeperRegistryCenter);
    }

    public void init() {
        Collection<JobBriefInfo> allJob = jobStatisticsAPI.getAllJobsBriefInfo();
        if (CollUtil.isNotEmpty(allJob)) {
            allJob.forEach(jobInfo -> {
                // 已下线的任务
                if (JobBriefInfo.JobStatus.CRASHED.equals(jobInfo.getStatus())) {
                    try {
                        Date currentDate = new Date();
                        CronExpression cronExpression = new CronExpression(jobInfo.getCron());
                        Date nextValidTimeAfter = cronExpression.getNextValidTimeAfter(currentDate);
                        // 表达式还生效的任务
                        if (ObjectUtil.isNotNull(nextValidTimeAfter)) {
                            this.initJobHandler(jobInfo.getJobName());
                        }
                    } catch (ParseException e) {
                        log.error(e.getMessage(), e);
                    }
                }
            });
        }
    }

    /**
     * 初始化任务操作
     * @param jobName  任务名
     */
    private void initJobHandler(String jobName) {
        try {
            JobSettings jobSetting = jobSettingsAPI.getJobSettings(jobName);
            if (ObjectUtil.isNotNull(jobSetting)) {
                String jobCode = StrUtil.subBefore(jobSetting.getJobName(), StrUtil.UNDERLINE, false);
                JobClassEnum jobClassEnum = JobClassEnum.convert(jobCode);
                if (ObjectUtil.isNotNull(jobClassEnum)) {
                    ElasticJobProperties.JobConfiguration configuration = new ElasticJobProperties.JobConfiguration();
                    configuration.setCron(jobSetting.getCron());
                    configuration.setJobParameter(jobSetting.getJobParameter());
                    configuration.setShardingTotalCount(jobSetting.getShardingTotalCount());
                    configuration.setDescription(jobSetting.getDescription());
                    configuration.setShardingItemParameters(jobSetting.getShardingItemParameters());
                    configuration.setJobClass(jobClassEnum.getClazz().getCanonicalName());
                    super.initJob(jobName, JobType.valueOf(jobSetting.getJobType()), configuration);
                }
            }
        } catch (Exception e) {
            log.error("初始化任务操作失败: {}", e.getMessage(), e);
        }
    }

    /**
     * 保存/更新任务
     * @param job
     * @param jobClass
     */
    public void addOrUpdateJob(Job job, Class<? extends ElasticJob> jobClass) {
        ElasticJobProperties.JobConfiguration configuration = new ElasticJobProperties.JobConfiguration();
        configuration.setCron(job.getCron());
        configuration.setJobParameter(job.getJobParameter());
        configuration.setShardingTotalCount(job.getShardingTotalCount());
        configuration.setShardingItemParameters(job.getShardingItemParameters());
        configuration.setJobClass(jobClass.getCanonicalName());
        super.initJob(job.getJobName(), JobType.valueOf(job.getJobType()), configuration);
    }

    @Override
    public JobTypeConfiguration getJobTypeConfiguration(String jobName, JobType jobType, JobCoreConfiguration jobCoreConfiguration) {
        String jobCode = StrUtil.subBefore(jobName, StrUtil.UNDERLINE, false);
        JobClassEnum jobClassEnum = JobClassEnum.convert(jobCode);
        if (ObjectUtil.isNotNull(jobClassEnum)) {
            if (JobType.SIMPLE.equals(jobType)) {
                return new SimpleJobConfiguration(jobCoreConfiguration, jobClassEnum.getClazz().getCanonicalName());
            } else if (JobType.DATAFLOW.equals(jobType)) {
                return new DataflowJobConfiguration(jobCoreConfiguration, jobClassEnum.getClazz().getCanonicalName(), false);
            }
        }
        return null;
    }
}

为什么是这样的实现?我发现每次重新发布服务后,现在的未执行的任务都会变成“已下线”,这可能跟Zookeeper有关,需要重新初始化才行,对于注解和配置式的,会自动初始化,但是动态添加的不会自动初始化。所以必须自己初始化,之前有个思路是自己建张表来维护定时,每次启动时进行初始化,但是这样太麻烦,后来实现使用elastic-job现有的API来实现,即启动时,遍历Zookeeper已有的节点,然后判断Cron表达式是否过期,如果还没有过期,则重新初始化任务,初始化时配置设置了会覆盖原来的配置,所以不会有影响。然后外层可以通过MQ来新增任务,在通过服务调用去指定对应的定时逻辑即可。

(不知道大家有没有更好的实现方案,可以初始化动态任务的)

而配置式的,可以直接在配置文件指定并实现即可


spring:
  elasticjob:
    #注册中心配置
    zookeeper:
      server-lists: 127.0.0.1:6181
      namespace: elastic-job-spring-boot-stater-demo
    #简单作业配置
    simples:
      #spring简单作业示例配置
      spring-simple-job:
        #配置简单作业,必须实现com.dangdang.ddframe.job.api.simple.SimpleJob
        job-class: com.zen.spring.boot.demo.elasticjob.job.SpringSimpleJob
        cron: 0/2 * * * * ?
        sharding-total-count: 3
        sharding-item-parameters: 0=Beijing,1=Shanghai,2=Guangzhou
        #配置监听器
        listener:
          #配置每台作业节点均执行的监听,必须实现com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener
          listener-class: com.zen.spring.boot.demo.elasticjob.listener.MyElasticJobListener
    #流式作业配置
    dataflows:
      #spring简单作业示例配置
      spring-dataflow-job:
        #配置简单作业,必须实现com.dangdang.ddframe.job.api.dataflow.DataflowJob<T>
        job-class: com.zen.spring.boot.demo.elasticjob.job.SpringDataflowJob
        cron: 0/2 * * * * ?
        sharding-total-count: 3
        sharding-item-parameters: 0=Beijing,1=Shanghai,2=Guangzhou
        streaming-process: true
        #配置监听器
        listener:
          #配置分布式场景中仅单一节点执行的监听,必须实现com.dangdang.ddframe.job.lite.api.listener.AbstractDistributeOnceElasticJobListener
          distributed-listener-class: com.zen.spring.boot.demo.elasticjob.listener.MyDistributeElasticJobListener
          started-timeout-milliseconds: 5000
          completed-timeout-milliseconds: 10000
    #脚本作业配置
    scripts:
      #脚本作业示例配置
      script-job:
        cron: 0/2 * * * * ?
        sharding-total-count: 3
        sharding-item-parameters: 0=Beijing,1=Shanghai,2=Guangzhou
        script-command-line: youPath/spring-boot-starter-demo/elastic-job-spring-boot-starter-demo/src/main/resources/script/demo.bat

以上整合基本可以满足现在的使用,比较期待移交Apache后的3的版本,这样可以有更多API的支持,而不用自己造轮子。

  • 发表于:
  • 本文为 InfoQ 中文站特供稿件
  • 首发地址https://www.infoq.cn/article/0b0cd01fe77d4da058bd7bc87
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券