作者:付政委
选摘 · 苏辙 | 浮云有意藏山顶,流水无声入稻田
公众号:bugstack虫洞栈 | 沉淀、分享、成长,让自己和他人都能有所收获! 分布式任务DcsSchedule中间件,Github地址: https://github.com/fuzhengwei/schedule-spring-boot-starter 分布式任务DcsSchedule控制台,Github地址: https://github.com/fuzhengwei/itstack-middleware-control 欢迎⭐Star和使用,你用剑?、我用刀?,好的代码都很骚?,望你不吝出招?
微信公众号:bugstack虫洞栈 | 演示分布式中间件使用视频
1@SpringBootApplication
2@EnableScheduling
3public class Application{
4 public static void mian(String[] args){
5 SpringApplication.run(Application.class,args);
6 }
7
8 @Scheduled(cron = "0/3 * * * * *")
9 public void demoTask() {
10 //...
11 }
12}
咔咔,上面这段代码很熟悉吧,他就是SpringBoot的Schedule定时任务,简单易用。在我们开发中如果需要做一些定时或指定时刻循环执行逻辑时候,基本都会使用到Schedule。
但是,如果我们的任务是比较大型的,比如;定时跑批T+1结算、商品秒杀前状态变更、刷新数据预热到缓存等等,这些定时任务都相同的特点;作业量大、实时性强、可用率高。而这时候如果只是单纯使用Schedule就显得不足以控制。
那么,我们产品需求就出来了,分布式DcsSchedule任务;
嗯?有人憋半天了想说可以用Quertz,嗯可以的,但这不是本篇文章的重点。难道你不想看看一个自言开源中间件是怎么诞生的吗,怎么推到中心Maven仓的吗?比如下图;
真香不!
首页监控
微信公众号:bugstack虫洞栈 & 首页监控
任务列表
微信公众号:bugstack虫洞栈 & 任务列表
好了,接下来开始介绍这个中间件如何使用和怎么开发的了!
版本 | 发布日期 | 备注 | |
---|---|---|---|
1 | 1.0.0-RELEASE | 2019-12-07 | 基本功能实现;任务接入、分布式启停 |
2 | 1.0.1-RELEASE | 2019-12-07 | 上传测试版本 |
1<dependency>
2 <groupId>org.itstack.middleware</groupId>
3 <artifactId>schedule-spring-boot-starter</artifactId>
4 <version>1.0.0-RELEASE</version>
5</dependency>
1@SpringBootApplication
2@EnableDcsScheduling
3public class HelloWorldApplication {
4
5 public static void main(String[] args) {
6 SpringApplication.run(HelloWorldApplication.class, args);
7 }
8
9}
1@Component("demoTaskThree")
2public class DemoTaskThree {
3
4 @DcsScheduled(cron = "0 0 9,13 * * *", desc = "03定时任务执行测试:taskMethod01", autoStartup = false)
5 public void taskMethod01() {
6 System.out.println("03定时任务执行测试:taskMethod01");
7 }
8
9 @DcsScheduled(cron = "0 0/30 8-10 * * *", desc = "03定时任务执行测试:taskMethod02", autoStartup = false)
10 public void taskMethod02() {
11 System.out.println("03定时任务执行测试:taskMethod02");
12 }
13
14}
微信公众号:bugstack虫洞栈 & 任务列表
以SpringBoot为基础开发一款中间件我也是第一次,因为接触SpringBoot也刚刚1个月左右。虽然SpringBoot已经出来挺久的了,但由于我们项目开发并不使用SpringBoot的一套东西,所以一直依赖没有接触。直到上个月开始考虑领域驱动设计才接触,嗯!真的不错,那么就开始了夯实技能、学习思想用到项目里。
按照我的产品需求,开发这么一款分布式任务的中间件,我脑袋中的模型已经存在了。另外就是需要开发过程中去探索我需要的知识工具,简单包括;
1schedule-spring-boot-starter
2└── src
3 ├── main
4 │ ├── java
5 │ │ └── org.itstack.middleware.schedule
6 │ │ ├── annotation
7 │ │ │ ├── DcsScheduled.java
8 │ │ │ └── EnableDcsScheduling.java
9 │ │ ├── annotation
10 │ │ │ └── InstructStatus.java
11 │ │ ├── config
12 │ │ │ ├── DcsSchedulingConfiguration.java
13 │ │ │ ├── StarterAutoConfig.java
14 │ │ │ └── StarterServiceProperties.java
15 │ │ ├── domain
16 │ │ │ ├── DataCollect.java
17 │ │ │ ├── DcsScheduleInfo.java
18 │ │ │ ├── DcsServerNode.java
19 │ │ │ ├── ExecOrder.java
20 │ │ │ └── Instruct.java
21 │ │ ├── export
22 │ │ │ └── DcsScheduleResource.java
23 │ │ ├── service
24 │ │ │ ├── HeartbeatService.java
25 │ │ │ └── ZkCuratorServer.java
26 │ │ ├── task
27 │ │ │ ├── TaskScheduler.java
28 │ │ │ ├── ScheduledTask.java
29 │ │ │ ├── SchedulingConfig.java
30 │ │ │ └── SchedulingRunnable.java
31 │ │ ├── util
32 │ │ │ └── StrUtil.java
33 │ │ └── DoJoinPoint.java
34 │ └── resources
35 │ └── META_INF
36 │ └── spring.factories
37 └── test
38 └── java
39 └── org.itstack.demo.test
40 └── ApiTest.java
annotation/EnableDcsScheduling.java & 自定义注解
这个注解一堆的圈A,这些配置都是为了开始启动执行我们的中间件;
1@Target({ElementType.TYPE})
2@Retention(RetentionPolicy.RUNTIME)
3@Import({DcsSchedulingConfiguration.class})
4@ImportAutoConfiguration({SchedulingConfig.class, CronTaskRegister.class, DoJoinPoint.class})
5@ComponentScan("org.itstack.middleware.*")
6public @interface EnableDcsScheduling {
7}
config/DcsSchedulingConfiguration.java & 初始化配置/服务、启动任务、挂在节点
1@Override
2public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
3 Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
4 if (this.nonAnnotatedClasses.contains(targetClass)) return bean;
5 Method[] methods = ReflectionUtils.getAllDeclaredMethods(bean.getClass());
6 if (methods == null) return bean;
7 for (Method method : methods) {
8 DcsScheduled dcsScheduled = AnnotationUtils.findAnnotation(method, DcsScheduled.class);
9 if (null == dcsScheduled || 0 == method.getDeclaredAnnotations().length) continue;
10 List<ExecOrder> execOrderList = Constants.execOrderMap.computeIfAbsent(beanName, k -> new ArrayList<>());
11 ExecOrder execOrder = new ExecOrder();
12 execOrder.setBean(bean);
13 execOrder.setBeanName(beanName);
14 execOrder.setMethodName(method.getName());
15 execOrder.setDesc(dcsScheduled.desc());
16 execOrder.setCron(dcsScheduled.cron());
17 execOrder.setAutoStartup(dcsScheduled.autoStartup());
18 execOrderList.add(execOrder);
19 this.nonAnnotatedClasses.add(targetClass);
20 }
21 return bean;
22}
1private void init_server(ApplicationContext applicationContext) {
2 try {
3 //获取zk连接
4 CuratorFramework client = ZkCuratorServer.getClient(Constants.Global.zkAddress);
5 //节点组装
6 path_root_server = StrUtil.joinStr(path_root, LINE, "server", LINE, schedulerServerId);
7 path_root_server_ip = StrUtil.joinStr(path_root_server, LINE, "ip", LINE, Constants.Global.ip);
8 //创建节点&递归删除本服务IP下的旧内容
9 ZkCuratorServer.deletingChildrenIfNeeded(client, path_root_server_ip);
10 ZkCuratorServer.createNode(client, path_root_server_ip);
11 ZkCuratorServer.setData(client, path_root_server, schedulerServerName);
12 //添加节点&监听
13 ZkCuratorServer.createNodeSimple(client, Constants.Global.path_root_exec);
14 ZkCuratorServer.addTreeCacheListener(applicationContext, client, Constants.Global.path_root_exec);
15 } catch (Exception e) {
16 logger.error("itstack middleware schedule init server error!", e);
17 throw new RuntimeException(e);
18 }
19}
1private void init_task(ApplicationContext applicationContext) {
2 CronTaskRegister cronTaskRegistrar = applicationContext.getBean("itstack-middlware-schedule-cronTaskRegister", CronTaskRegister.class);
3 Set<String> beanNames = Constants.execOrderMap.keySet();
4 for (String beanName : beanNames) {
5 List<ExecOrder> execOrderList = Constants.execOrderMap.get(beanName);
6 for (ExecOrder execOrder : execOrderList) {
7 if (!execOrder.getAutoStartup()) continue;
8 SchedulingRunnable task = new SchedulingRunnable(execOrder.getBean(), execOrder.getBeanName(), execOrder.getMethodName());
9 cronTaskRegistrar.addCronTask(task, execOrder.getCron());
10 }
11 }
12}
1private void init_node() throws Exception {
2 Set<String> beanNames = Constants.execOrderMap.keySet();
3 for (String beanName : beanNames) {
4 List<ExecOrder> execOrderList = Constants.execOrderMap.get(beanName);
5 for (ExecOrder execOrder : execOrderList) {
6 String path_root_server_ip_clazz = StrUtil.joinStr(path_root_server_ip, LINE, "clazz", LINE, execOrder.getBeanName());
7 String path_root_server_ip_clazz_method = StrUtil.joinStr(path_root_server_ip_clazz, LINE, "method", LINE, execOrder.getMethodName());
8 String path_root_server_ip_clazz_method_status = StrUtil.joinStr(path_root_server_ip_clazz, LINE, "method", LINE, execOrder.getMethodName(), "/status");
9 //添加节点
10 ZkCuratorServer.createNodeSimple(client, path_root_server_ip_clazz);
11 ZkCuratorServer.createNodeSimple(client, path_root_server_ip_clazz_method);
12 ZkCuratorServer.createNodeSimple(client, path_root_server_ip_clazz_method_status);
13 //添加节点数据[临时]
14 ZkCuratorServer.appendPersistentData(client, path_root_server_ip_clazz_method + "/value", JSON.toJSONString(execOrder));
15 //添加节点数据[永久]
16 ZkCuratorServer.setData(client, path_root_server_ip_clazz_method_status, execOrder.getAutoStartup() ? "1" : "0");
17 }
18 }
19}
service/ZkCuratorServer.java & zk服务
1public static void addTreeCacheListener(final ApplicationContext applicationContext, final CuratorFramework client, String path) throws Exception {
2 TreeCache treeCache = new TreeCache(client, path);
3 treeCache.start();
4 treeCache.getListenable().addListener((curatorFramework, event) -> {
5 //...
6 switch (event.getType()) {
7 case NODE_ADDED:
8 case NODE_UPDATED:
9 if (Constants.Global.ip.equals(instruct.getIp()) && Constants.Global.schedulerServerId.equals(instruct.getSchedulerServerId())) {
10 //执行命令
11 Integer status = instruct.getStatus();
12 switch (status) {
13 case 0: //停止任务
14 cronTaskRegistrar.removeCronTask(instruct.getBeanName() + "_" + instruct.getMethodName());
15 setData(client, path_root_server_ip_clazz_method_status, "0");
16 logger.info("itstack middleware schedule task stop {} {}", instruct.getBeanName(), instruct.getMethodName());
17 break;
18 case 1: //启动任务
19 cronTaskRegistrar.addCronTask(new SchedulingRunnable(scheduleBean, instruct.getBeanName(), instruct.getMethodName()), instruct.getCron());
20 setData(client, path_root_server_ip_clazz_method_status, "1");
21 logger.info("itstack middleware schedule task start {} {}", instruct.getBeanName(), instruct.getMethodName());
22 break;
23 case 2: //刷新任务
24 cronTaskRegistrar.removeCronTask(instruct.getBeanName() + "_" + instruct.getMethodName());
25 cronTaskRegistrar.addCronTask(new SchedulingRunnable(scheduleBean, instruct.getBeanName(), instruct.getMethodName()), instruct.getCron());
26 setData(client, path_root_server_ip_clazz_method_status, "1");
27 logger.info("itstack middleware schedule task refresh {} {}", instruct.getBeanName(), instruct.getMethodName());
28 break;
29 }
30 }
31 break;
32 case NODE_REMOVED:
33 break;
34 default:
35 break;
36 }
37 });
38}
1public void addCronTask(SchedulingRunnable task, String cronExpression) {
2 if (null != Constants.scheduledTasks.get(task.taskId())) {
3 removeCronTask(task.taskId());
4 }
5 CronTask cronTask = new CronTask(task, cronExpression);
6 Constants.scheduledTasks.put(task.taskId(), scheduleCronTask(cronTask));
7}
8public void removeCronTask(String taskId) {
9 ScheduledTask scheduledTask = Constants.scheduledTasks.remove(taskId);
10 if (scheduledTask == null) return;
11 scheduledTask.cancel();
12}
1@Pointcut("@annotation(org.itstack.middleware.schedule.annotation.DcsScheduled)")
2public void aopPoint() {
3}
4
5@Around("aopPoint()")
6public Object doRouter(ProceedingJoinPoint jp) throws Throwable {
7 long begin = System.currentTimeMillis();
8 Method method = getMethod(jp);
9 try {
10 return jp.proceed();
11 } finally {
12 long end = System.currentTimeMillis();
13 logger.info("\nitstack middleware schedule method:{}.{} take time(m):{}", jp.getTarget().getClass().getSimpleName(), method.getName(), (end - begin));
14 }
15}
开发完成后还是需要将Jar包发布到manven中心仓库的,这个过程较长单独写了博客;《发布Jar包到Maven中央仓库(为开发开源中间件做准备)》