前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >浅谈Spring中定时任务@Scheduled源码的解析(一)

浅谈Spring中定时任务@Scheduled源码的解析(一)

原创
作者头像
半月无霜
发布2024-07-24 14:41:41
940
发布2024-07-24 14:41:41
举报
文章被收录于专栏:半月无霜

浅谈Spring中定时任务@Scheduled源码的解析(一)

一、介绍

上一篇文章中,我们介绍了@scheduled注解的使用,添加上就可以使用定时任务了

本篇文章,简单解析一下它的源码,看看是如何工作的,怎么就能跑起定时任务

二、代码解析

1)@EnableScheduling

首当其冲的就是这个注解,在开启定时任务第一步就是这个

代码语言:javascript
复制
package org.springframework.scheduling.annotation;
​
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.Executor;
​
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
​
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {
​
}

注释我没有贴出来,其中有一点很感兴趣,就是线程池。看看昨天的定时任务启动,它的线程名长这样,scheduling

但这边,它可以允许我们手动指定一个线程池,代码如下

代码语言:javascript
复制
package com.banmoon.config;
​
import cn.hutool.core.thread.ThreadUtil;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
​
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
​
/**
 * @author banmoon
 * @date 2024/07/24 11:10:28
 */
@Configuration
public class ScheduingConfig implements SchedulingConfigurer {
​
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.setScheduler(taskExecutor());
    }
​
    @Bean(destroyMethod = "shutdown")
    public Executor taskExecutor() {
        return Executors.newScheduledThreadPool(100, ThreadUtil.newNamedThreadFactory("定时任务", true));
    }
​
}

运行查看结果,线程名字已改

请注意,这个注解上有个@Import(SchedulingConfiguration.class)

那么下一个类,便是SchedulingConfiguration.java

至于@Import 使用它,可以将指定的类进行导入,使得另一个配置类中定义的bean可以被当前配置类中的bean使用

2)SchedulingConfiguration

好的,先看源码

代码语言:java
复制
package org.springframework.scheduling.annotation;
​
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Role;
import org.springframework.scheduling.config.TaskManagementConfigUtils;
​
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {
​
  @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
  @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
    return new ScheduledAnnotationBeanPostProcessor();
  }
​
}

这个类没什么好说 的,创建了ScheduledAnnotationBeanPostProcessor.javabean

3)ScheduledAnnotationBeanPostProcessor

这个类的源码就不全贴了,我们关注主要的几个方法

这个类实现了BeanPostProcessor.java这个接口,想必大家都知道这个是干什么的了吧

代码语言:java
复制
public class ScheduledAnnotationBeanPostProcessor
    implements ScheduledTaskHolder, MergedBeanDefinitionPostProcessor, DestructionAwareBeanPostProcessor,
    Ordered, EmbeddedValueResolverAware, BeanNameAware, BeanFactoryAware, ApplicationContextAware,
    SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean {
​
  @Override
  public Object postProcessAfterInitialization(Object bean, String beanName) {
    if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
        bean instanceof ScheduledExecutorService) {
      // Ignore AOP infrastructure such as scoped proxies.
      return bean;
    }
​
    Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
    if (!this.nonAnnotatedClasses.contains(targetClass) &&
        AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
      Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
          (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
            Set<Scheduled> scheduledAnnotations = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                method, Scheduled.class, Schedules.class);
            return (!scheduledAnnotations.isEmpty() ? scheduledAnnotations : null);
          });
      if (annotatedMethods.isEmpty()) {
        this.nonAnnotatedClasses.add(targetClass);
        if (logger.isTraceEnabled()) {
          logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
        }
      }
      else {
        // Non-empty set of methods
        annotatedMethods.forEach((method, scheduledAnnotations) ->
            scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean)));
        if (logger.isTraceEnabled()) {
          logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
              "': " + annotatedMethods);
        }
      }
    }
    return bean;
  }
​
}

这个方法,就是每一个bean在初始化完成之后,都会进行调用

检查bean方法上面,有无@Scheduled或者@Schedules

并生成一个列表集合annotatedMethods

如果为空,就结束了

要是不为空,就进入到后续的逻辑,看这一行

annotatedMethods.forEach((method, scheduledAnnotations) -> scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean)));

所以,我们就都看看这个processScheduled(scheduled, method, bean)方法里面做了什么

代码语言:java
复制
    protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
        try {
            Runnable runnable = createRunnable(bean, method);
            boolean processedSchedule = false;
            String errorMessage =
                    "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
​
            Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
​
            // Determine initial delay
            long initialDelay = convertToMillis(scheduled.initialDelay(), scheduled.timeUnit());
            String initialDelayString = scheduled.initialDelayString();
            if (StringUtils.hasText(initialDelayString)) {
                Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
                if (this.embeddedValueResolver != null) {
                    initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
                }
                if (StringUtils.hasLength(initialDelayString)) {
                    try {
                        initialDelay = convertToMillis(initialDelayString, scheduled.timeUnit());
                    }
                    catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
                    }
                }
            }
​
            // Check cron expression
            String cron = scheduled.cron();
            if (StringUtils.hasText(cron)) {
                String zone = scheduled.zone();
                if (this.embeddedValueResolver != null) {
                    cron = this.embeddedValueResolver.resolveStringValue(cron);
                    zone = this.embeddedValueResolver.resolveStringValue(zone);
                }
                if (StringUtils.hasLength(cron)) {
                    Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
                    processedSchedule = true;
                    if (!Scheduled.CRON_DISABLED.equals(cron)) {
                        TimeZone timeZone;
                        if (StringUtils.hasText(zone)) {
                            timeZone = StringUtils.parseTimeZoneString(zone);
                        }
                        else {
                            timeZone = TimeZone.getDefault();
                        }
                        tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
                    }
                }
            }
​
            // At this point we don't need to differentiate between initial delay set or not anymore
            if (initialDelay < 0) {
                initialDelay = 0;
            }
​
            // Check fixed delay
            long fixedDelay = convertToMillis(scheduled.fixedDelay(), scheduled.timeUnit());
            if (fixedDelay >= 0) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
            }
​
            String fixedDelayString = scheduled.fixedDelayString();
            if (StringUtils.hasText(fixedDelayString)) {
                if (this.embeddedValueResolver != null) {
                    fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
                }
                if (StringUtils.hasLength(fixedDelayString)) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    try {
                        fixedDelay = convertToMillis(fixedDelayString, scheduled.timeUnit());
                    }
                    catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
                    }
                    tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
                }
            }
​
            // Check fixed rate
            long fixedRate = convertToMillis(scheduled.fixedRate(), scheduled.timeUnit());
            if (fixedRate >= 0) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
            }
            String fixedRateString = scheduled.fixedRateString();
            if (StringUtils.hasText(fixedRateString)) {
                if (this.embeddedValueResolver != null) {
                    fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
                }
                if (StringUtils.hasLength(fixedRateString)) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    try {
                        fixedRate = convertToMillis(fixedRateString, scheduled.timeUnit());
                    }
                    catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
                    }
                    tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
                }
            }
​
            // Check whether we had any attribute set
            Assert.isTrue(processedSchedule, errorMessage);
​
            // Finally register the scheduled tasks
            synchronized (this.scheduledTasks) {
                Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
                regTasks.addAll(tasks);
            }
        }
        catch (IllegalArgumentException ex) {
            throw new IllegalStateException(
                    "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
        }
    }

首先是转换了下initialDelay

然后检查了cron表达式,fixedDelayfixedRate

无论使用上面哪种方式指定,都会创建一个ScheduledTask加入到tasks的列表中,如下

  • cron表达式:tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
  • fixedDelaytasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
  • fixedRatetasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));

当然啦,如果 tasks为空,则代表上面的三种属性你都没有指定,那么就会报错了

  • Assert.isTrue(processedSchedule, errorMessage);

最后,将收集到的tasks放入scheduledTasksMap容器中

三、待续

只讲了Spring是如何解析注解,生成任务的

还差如何对这些任务进行执行,下篇再见

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 浅谈Spring中定时任务@Scheduled源码的解析(一)
    • 一、介绍
      • 二、代码解析
        • 1)@EnableScheduling
        • 2)SchedulingConfiguration
        • 3)ScheduledAnnotationBeanPostProcessor
      • 三、待续
      相关产品与服务
      容器服务
      腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档