前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >分布式调度Elastic-Job攻略

分布式调度Elastic-Job攻略

作者头像
算法之名
发布2019-08-20 16:06:45
8040
发布2019-08-20 16:06:45
举报
文章被收录于专栏:算法之名算法之名

昨天虽然试用了一下唯品会的“土星”,但是我实在没想明白他的Job该怎么用Spring来托管,所以没有使用。今天来说一下当当的Elastic-Job.

安装管理平台

先说一下Elastic-Job的管理平台跟Java的Job开发没有半毛钱关系,他只是把注册进Zookeeper的信息读取出来,进行控制。

先下载Elastic-Job的源代码git clone https://github.com/elasticjob/elastic-job-lite.git,编译之后找到elastic-job-lite-console-3.0.0.M1-SNAPSHOT.tar.gz上传到服务器,解压之后进入/bin目录。

先不要急着执行start.sh,因为这个文件里面有Windows字符,linux无法识别。

所以先执行 sed -i 's/\r$//' start.sh 改掉Windows字符。

然后执行nohup ./start.sh &

在浏览器打开服务器的8899端口

先配置好zookeeper的注册中心地址,命名空间,基本上这个就装好了。

编写Java Job

pom

<dependency>
   <groupId>com.github.kuhn-he</groupId>
   <artifactId>elastic-job-lite-spring-boot-starter</artifactId>
   <version>2.1.5</version>
   <exclusions>
      <exclusion>
         <artifactId>curator-client</artifactId>
         <groupId>org.apache.curator</groupId>
      </exclusion>
      <exclusion>
         <artifactId>curator-framework</artifactId>
         <groupId>org.apache.curator</groupId>
      </exclusion>
      <exclusion>
         <artifactId>curator-recipes</artifactId>
         <groupId>org.apache.curator</groupId>
      </exclusion>
   </exclusions>
</dependency>
<dependency>
   <groupId>org.apache.curator</groupId>
   <artifactId>curator-framework</artifactId>
   <version>2.12.0</version>
</dependency>
<dependency>
   <groupId>org.apache.curator</groupId>
   <artifactId>curator-recipes</artifactId>
   <version>2.12.0</version>
</dependency>
<dependency>
   <groupId>org.apache.curator</groupId>
   <artifactId>curator-client</artifactId>
   <version>2.12.0</version>
</dependency>

资源文件添加

elaticjob:
  zookeeper:
    server-lists: 192.168.5.129:2188
    namespace: elastic-job-lite-springboot

这里要跟你管理平台的保持一致

配置文件

@Configuration
@ConditionalOnExpression("'${elatic.zookeeper.server-lists}'.length() >0")
public class ElasticConfig {
    /**
     * 初始化配置
     * @param serverList
     * @param namespace
     * @return
     */
    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(@Value("${elaticjob.zookeeper.server-lists}") String serverList
            , @Value("${elaticjob.zookeeper.namespace}") String namespace) {

        return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
    }

    /**
     * 设置活动监听,前提是已经设置好了监听,见下一个目录
     * @return
     */
    @Bean
    public ElasticJobListener elasticJobListener() {
        return new ElasticJobListener(100, 100);
    }
}

监听器

@Component
public class ElasticJobListener extends AbstractDistributeOnceElasticJobListener {
    /**
     * 设置间隔时间
     * @param startedTimeoutMilliseconds
     * @param completedTimeoutMilliseconds
     */
    public ElasticJobListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds) {
        super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
    }

    /**
     * 任务开始
     * @param shardingContexts
     */
    @Override
    public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
        System.out.println("任务开始");
    }

    /**
     * 任务结束
     * @param shardingContexts
     */
    @Override
    public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
        System.err.println("任务结束");
    }

}

Job

@Component
@ElasticSimpleJob(cron="0/5 * * * * ?",jobName="jobTask",shardingTotalCount=2,jobParameter="测试参数",shardingItemParameters="0=Chengdu0,1=Chengdu1")
public class StockSimpleJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        switch (shardingContext.getShardingItem()) {
            case 0:
                System.out.println(String.format("------Thread ID: %s, 任务总片数: %s, " +
                                "当前分片项: %s.当前参数: %s," +
                                "当前任务名称: %s.当前任务参数: %s"
                        ,
                        Thread.currentThread().getId(),
                        shardingContext.getShardingTotalCount(),
                        shardingContext.getShardingItem(),
                        shardingContext.getShardingParameter(),
                        shardingContext.getJobName(),
                        shardingContext.getJobParameter()

                        )
                );
                break;
            case 1:
                System.out.println("啦啦啦");
                break;
            default:
                break;
        }
    }
}

启动项目可以看到

2018-11-07 19:29:08.900 INFO [schedule-center,,,] 21244 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8011 (http) with context path '/api-sc' 2018-11-07 19:29:08.901 INFO [schedule-center,,,] 21244 --- [ main] .s.c.n.e.s.EurekaAutoServiceRegistration : Updating port to 8011 2018-11-07 19:29:08.903 INFO [schedule-center,,,] 21244 --- [ main] c.c.schedule.ScheduleCenterApplication : Started ScheduleCenterApplication in 16.618 seconds (JVM running for 17.331) 啦啦啦 ------Thread ID: 96, 任务总片数: 2, 当前分片项: 0.当前参数: Chengdu0,当前任务名称: com.cloud.schedule.jobs.StockSimpleJob.当前任务参数: ------Thread ID: 100, 任务总片数: 2, 当前分片项: 0.当前参数: Chengdu0,当前任务名称: com.cloud.schedule.jobs.StockSimpleJob.当前任务参数: 啦啦啦 ------Thread ID: 102, 任务总片数: 2, 当前分片项: 0.当前参数: Chengdu0,当前任务名称: com.cloud.schedule.jobs.StockSimpleJob.当前任务参数: 啦啦啦

查看平台的作业

我们可以在这里面修改他的配置

经启动多个实例,我们可以看到分片会被多个实例均摊,相同的分片只会在一个进程内执行,多个Job也是一样,不会重复执行。退出一个进程,单个进程就会执行全部分片,实现了高可用。

动态添加Job

//@Component
//@ElasticSimpleJob(cron="0/5 * * * * ?",jobName="testTask",shardingTotalCount=2,jobParameter="测试参数",shardingItemParameters="0=A,1=B")
public class TestJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println("TestJob");
    }
}

我们只写了一个Job,不进行配置

我们可以在Restful中进行动态添加

@RestController
public class TestController {

    @Autowired
    private ZookeeperRegistryCenter zookeeperRegistryCenter;


    /**
     * 动态添加任务逻辑
     */
    @GetMapping("/test")
    public void test(@RequestParam("cron") String cron) {
        int shardingTotalCount = 2;
        String jobName = UUID.randomUUID().toString() + "-test123";

        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration
                .newBuilder(jobName, cron, shardingTotalCount)
                .shardingItemParameters("0=A,1=B")
                .build();

        SimpleJobConfiguration simpleJobConfiguration =
                new SimpleJobConfiguration(jobCoreConfiguration, TestJob.class.getCanonicalName());
        JobScheduler jobScheduler = new JobScheduler(zookeeperRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).build());



        try {
            jobScheduler.init();
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("定时任务创建失败");
        }
    }
}

通过@RequestParam("cron") String cron参数,我们可以动态给该Job添加可变的时间配置。

在zookeeper中我们可以看到他的注册信息

WatchedEvent state:SyncConnected type:None path:null [zk: localhost:2188(CONNECTED) 0] ls / [zookeeper, elastic-job-lite-springboot, spark, hadoop-ha, dubbo, zoo, pushservers, guanjian] [zk: localhost:2188(CONNECTED) 1] ls /elastic-job-lite-springboot [com.guanjian.job.TestJob, e5d89fde-a665-46e2-8cac-7560a48812c9-test123, com.cloud.schedule.jobs.MyJob, com.cloud.schedule.jobs.StockSimpleJob, com.guanjian.job.StockSimpleJob] [zk: localhost:2188(CONNECTED) 2] ls /elastic-job-lite-springboot/com.

com.guanjian.job.TestJob com.cloud.schedule.jobs.MyJob com.cloud.schedule.jobs.StockSimpleJob com.guanjian.job.StockSimpleJob [zk: localhost:2188(CONNECTED) 2] ls /elastic-job-lite-springboot/com.cloud.schedule.jobs.

com.cloud.schedule.jobs.MyJob com.cloud.schedule.jobs.StockSimpleJob [zk: localhost:2188(CONNECTED) 2] ls /elastic-job-lite-springboot/com.cloud.schedule.jobs.StockSimpleJob [leader, servers, config, instances, sharding] [zk: localhost:2188(CONNECTED) 3]

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
微服务引擎 TSE
微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档