前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spring异步编程

Spring异步编程

作者头像
叔牙
修改2021-04-27 20:14:17
1.8K0
修改2021-04-27 20:14:17
举报

一、背景

在很多场景中,业务操作完成后会完成一些收尾操作,并不希望实时等待其实时返回结果,甚至不关心执行成功与否,比如:

  • 下单完成后给用户发送短信
  • 流程审批完成后发送邮件通知

或者一些查询操作需要调用多个二方或者三方服务组装返回结果,并且这些调用之前没有依赖关系,比如某电商平台退货详情需要展示订单信息、商品信息、用户详细信息等.

这些场景都可以考虑使用异步编程,所谓异步编程,就是不使用业务主线程,利用线程池或者其他套件开启新的线程完成后续操作,针对不关心执行结果的场景直接使用新线程完成后续业务,主线程直接返回调用,对于关心执行结果的场景,调用后返回多线程句柄,等多线程执行完成后由业务主线程统一组装结果并返回.

二、Spring异步编程介绍

spring3.1版本开始提供了开箱即用的异步编程套件,相关实现都放在spring-context模块,不需要引入其他额外的包,在配置类或者应用启动门面类上添加@EnableAsync即可开启异步化能力.

spring异步编程的实现依赖于Aop和动态代理,其具体实现此处不做赘述,简单描述一下spring异步编程用到的几个核心概念:

  • 切入点(Pointcut):用白话来说,spring要对哪些功能做增强处理,要么是表达式,要么是注解,他们所代表的位置就是切入点,就本篇而言就是做异步化的位置.
  • 通知(Advice):对于满足切入点的程序做个性化增强处理的动作,spring异步编程中就是用线程池处理@Async注解的方法.
  • 增强器(Advisor): 切入点和通知一起组成了增强器,也就是知道了在哪切入,也知道怎么切入,还需要一个角色去做这件事.
  • 动态代理: 基于被代理的类,在程序启动时生成代理对象并将增强逻辑添加进去,常用的有jdk动态代理和cglib动态代理.

基于前边几个概念,spring异步即是在应用启动时扫描@Async注解的类或者方法,生成代理类,然后将多线程处理能力嵌入进去.

三、异步编程接入

1.开启异步能力

在应用启动类添加@EnableAsync注解:

@SpringBootApplication
@EnableAsync
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

2.添加异步注解

在需要实现异步化的方法上添加@Async注解:

@Slf4j
@Service
public class TestBuzz {
    @Async
    public void testAsync() {
        log.info("TestBuzz.testAsync thread={}",Thread.currentThread().getName());
    }
}

3.模拟异步调用

@GetMapping("/test_async")
public IResp testAsync() {
    log.info("TestController.testAsync thread={}",Thread.currentThread().getName());
    //异步化调用
    this.testBuzz.testAsync();
    return IResp.getSuccessResult();
}

启动并模拟请求:

curl http://localhost:8088/test_async

就这么简单,我们通过两个注解就完成了异步编程.

四、原理&源码解析

从前两节的介绍中我们知道,spring利用Aop和动态代理在@Async标注的类生成代理并织入了多线程处理能力,那么接下来我们从源码层面分析一下其实现原理.

开启异步化能力时序图:

按照时序图从头到尾分析一下,并重点介绍其中涉及的几个类的实现.

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

  Class<? extends Annotation> annotation() default Annotation.class;

  boolean proxyTargetClass() default false;

  AdviceMode mode() default AdviceMode.PROXY;

  int order() default Ordered.LOWEST_PRECEDENCE;
}

annotation表示使用异步的注解,默认是@Async和EJB 3.1的@javax.ejb.Asynchronou,当然用户也可以提供自定义注解.

proxyTargetClass表示是否基于需要代理的类创建子类,仅在模式设置为AdviceMode.PROXY时适用,默认是false,需要注意的是将此属性设置为true将影响所有需要代理的Spring托管bean,而不仅仅是标记有@Async的bean。例如,其他标有Spring的@Transactional批注的bean将同时升级为子类代理.

mode表示使用哪种通知模式,默认是AdviceMode.PROXY,需要注意代理模式仅允许通过代理拦截调用,同一类中的本地调用无法以这种方式被拦截;在本地调用中,此类方法上的Async注释将被忽略,因为Spring的拦截器甚至不会在这种运行时场景中起作用.如果需要拦截本地调用或者其他更高级的拦截诉求,可以考虑考虑将其切换为AdviceMode.ASPECTJ.

order代表AsyncAnnotationBeanPostProcessor的顺序,默认值是最低,以便生成代理的时候最贴近代理目标.

最重要的是该注解导入了AsyncConfigurationSelector类,毫无疑问AsyncConfigurationSelector是开启异步能力配置的入口.

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

  private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
      "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
  @Override
  @Nullable
  public String[] selectImports(AdviceMode adviceMode) {
    switch (adviceMode) {
      case PROXY:
        return new String[] {ProxyAsyncConfiguration.class.getName()};
      case ASPECTJ:
        return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
      default:
        return null;
    }
  }
}

AsyncConfigurationSelector继承自AdviceModeImportSelector,根据代理模式选择不同的配置,默认我们使用AdviceMode.PROXY,直接看ProxyAsyncConfiguration实现.

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
  @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
  @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
    Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
    AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
    bpp.configure(this.executor, this.exceptionHandler);
    Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
    if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
      bpp.setAsyncAnnotationType(customAsyncAnnotation);
    }
    bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
    bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
    return bpp;
  }
}

ProxyAsyncConfiguration继承自AbstractAsyncConfiguration,其将@EnableAsync注解属性解析出来备用,并将异步化配置注入进来.

@Autowired(required = false)
void setConfigurers(Collection<AsyncConfigurer> configurers) {
  if (CollectionUtils.isEmpty(configurers)) {
    return;
  }
  if (configurers.size() > 1) {
    throw new IllegalStateException("Only one AsyncConfigurer may exist");
  }
  AsyncConfigurer configurer = configurers.iterator().next();
  this.executor = configurer::getAsyncExecutor;
  this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
}

这里可以看出,用户可以实现AsyncConfigurer接口来使用自定义线程池和异常处理器,回到AbstractAsyncConfiguration,创建了一个AsyncAnnotationBeanPostProcessor类型的bean并注入容器,并且把角色定义成基础设施,不向外提供服务,看一下AsyncAnnotationBeanPostProcessor继承关系:

从继承关系来看,这个类有很多身份信息并且拥有很多能力,实现了BeanPostProcessor接口我们暂且将其定义成一个后置处理器,实现了AopInfrastructBean接口将不会被Aop处理,继承了ProxyProcessorSuppor又拥有了代理处理相关能力,实现了BeanFactoryAware拥有了bean管理能力,看一下其代码实现:

public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
  public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME =
      AnnotationAsyncExecutionInterceptor.DEFAULT_TASK_EXECUTOR_BEAN_NAME;
  @Nullable
  private Supplier<Executor> executor;
  @Nullable
  private Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;
  @Nullable
  private Class<? extends Annotation> asyncAnnotationType;
  public AsyncAnnotationBeanPostProcessor() {
    setBeforeExistingAdvisors(true);
  }
  public void configure(
      @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
    this.executor = executor;
    this.exceptionHandler = exceptionHandler;
  }
  public void setExecutor(Executor executor) {
    this.executor = SingletonSupplier.of(executor);
  }
  public void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) {
    this.exceptionHandler = SingletonSupplier.of(exceptionHandler);
  }
  public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
    Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
    this.asyncAnnotationType = asyncAnnotationType;
  }
  @Override
  public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);
    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
      advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }
    advisor.setBeanFactory(beanFactory);
    this.advisor = advisor;
  }
}

spring管理的bean初始化过程执行顺序BeanFactoryAware是在后置处理器BeanPostProcessor之前,我们先分析setBeanFactory方法,该方法调用父类实现先把BeanFactory注入进来,然后创建了一个增强器AsyncAnnotationAdvisor(给后置处理器postProcessAfterInitialization方法备用),看一下继承关系:

接着看AsyncAnnotationAdvisor构造器:

public AsyncAnnotationAdvisor(
    @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

  Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
  asyncAnnotationTypes.add(Async.class);
  try {
    asyncAnnotationTypes.add((Class<? extends Annotation>)
        ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
  }
  catch (ClassNotFoundException ex) {
    // If EJB 3.1 API not present, simply ignore.
  }
  this.advice = buildAdvice(executor, exceptionHandler);
  this.pointcut = buildPointcut(asyncAnnotationTypes);
}

如同我们前边所说,增强器由advice和pointcut组成,这里分别构建了通知和切入点,先看构造通知:

protected Advice buildAdvice(
    @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
  AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
  interceptor.configure(executor, exceptionHandler);
  return interceptor;
}

构建通知用的是AnnotationAsyncExecutionInterceptor,看一下继承关系:

本质上是一个MethodInterceptor,执行拦截操作的时候调用invoke方法:

public Object invoke(final MethodInvocation invocation) throws Throwable {
  Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
  Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
  final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

  AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
  if (executor == null) {
    throw new IllegalStateException(
        "No executor specified and no default executor set on AsyncExecutionInterceptor either");
  }
  Callable<Object> task = () -> {
    try {
      Object result = invocation.proceed();
      if (result instanceof Future) {
        return ((Future<?>) result).get();
      }
    }
    catch (ExecutionException ex) {
      handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
    }
    catch (Throwable ex) {
      handleError(ex, userDeclaredMethod, invocation.getArguments());
    }
    return null;
  };
  return doSubmit(task, executor, invocation.getMethod().getReturnType());
}

该方法先获取AsyncTaskExecutor异步任务执行器,简单理解为线程池,然后在线程池中执行异步逻辑,继续看determineAsyncExecutor获取线程池:

protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
  Executor targetExecutor;
  String qualifier = getExecutorQualifier(method);
  if (StringUtils.hasLength(qualifier)) {
    targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
  }
  else {
    targetExecutor = this.defaultExecutor.get();
  }
  if (targetExecutor == null) {
    return null;
  }
  executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
      (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
  this.executors.put(method, executor);
}
return executor;
}

先从缓存中获取,如果获取到直接返回,否则如果@Async注解有指定线程池就根据名字获取,否则获取默认线程池.

接着看线程池提交异步操作doSubmit:

protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
  if (CompletableFuture.class.isAssignableFrom(returnType)) {
    return CompletableFuture.supplyAsync(() -> {
      try {
        return task.call();
      }
      catch (Throwable ex) {
        throw new CompletionException(ex);
      }
    }, executor);
  }
  else if (ListenableFuture.class.isAssignableFrom(returnType)) {
    return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
  }
  else if (Future.class.isAssignableFrom(returnType)) {
    return executor.submit(task);
  }
  else {
    executor.submit(task);
    return null;
  }
}

可以看出支持异步方法返回结果为CompletableFuture、ListenableFuture和Future的有返回值的操作,其他返回类型或者返回类型为void都当做无返回值异步提交.

回到前边构造切入点操作:

protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
  ComposablePointcut result = null;
  for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
    Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
    Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
    if (result == null) {
      result = new ComposablePointcut(cpc);
    }
    else {
      result.union(cpc);
    }
    result = result.union(mpc);
  }
  return (result != null ? result : Pointcut.TRUE);
}

方法中构造了两个AnnotationMatchingPointcut,一个匹配方法切入点,另一个是匹配类切入点,然后做了union操作构造了一个ComposablePointcut混合切入点,只要满足类或者方法上带有@Async注解都符合切入规则,这个切入点在AsyncAnnotationBeanPostProcessor后置处理器构造代理类会用到.

前边分析了setBeanFactory构造增强器的操作,我们继续分析后置处理器的postProcessAfterInitialization操作,先看代码实现:

public Object postProcessAfterInitialization(Object bean, String beanName) {
  if (this.advisor == null || bean instanceof AopInfrastructureBean) {
    // Ignore AOP infrastructure such as scoped proxies.
    return bean;
  }
  if (bean instanceof Advised) {
    Advised advised = (Advised) bean;
    if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
      // Add our local Advisor to the existing proxy's Advisor chain...
      if (this.beforeExistingAdvisors) {
        advised.addAdvisor(0, this.advisor);
      }
      else {
        advised.addAdvisor(this.advisor);
      }
      return bean;
    }
  }
  if (isEligible(bean, beanName)) {
    ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
    if (!proxyFactory.isProxyTargetClass()) {
      evaluateProxyInterfaces(bean.getClass(), proxyFactory);
    }
    proxyFactory.addAdvisor(this.advisor);
    customizeProxyFactory(proxyFactory);
    return proxyFactory.getProxy(getProxyClassLoader());
  }
  // No proxy needed.
  return bean;
}

如果增强器为null或者目标bean是AopInfrastructureBean基础组件类型直接放过,如果bean是待通知对象切满足该Advisor的通知条件,直接将该增强器添加到待通知对象的增强器列表中,否则如果目标bean满足该增强器的切入条件,利用动态代理生成代理类并将该Advisor添加到其增强器列表返回.

这段代码是动态代理生成代理类并织入通知逻辑的核心点,我们主要分析isEligible和生成代理的逻辑,先分析是否满足切入逻辑的方法isEligible:

protected boolean isEligible(Class<?> targetClass) {
  Boolean eligible = this.eligibleBeans.get(targetClass);
  if (eligible != null) {
    return eligible;
  }
  if (this.advisor == null) {
    return false;
  }
  eligible = AopUtils.canApply(this.advisor, targetClass);
  this.eligibleBeans.put(targetClass, eligible);
  return eligible;
}

先从缓存中获取改bean是否有被增强的资格,如果已被缓存直接返回缓存结果,否则如果增强器为null,则返回无资格,最后调用AopUtils.canApply检查目标类是否满足Advisor切入的规则,继续看AopUtils.canApply实现:

public static boolean canApply(Advisor advisor, Class<?> targetClass, boolean hasIntroductions) {
  if (advisor instanceof IntroductionAdvisor) {
    return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass);
  }
  else if (advisor instanceof PointcutAdvisor) {
    PointcutAdvisor pca = (PointcutAdvisor) advisor;
    return canApply(pca.getPointcut(), targetClass, hasIntroductions);
  }
  else {
    // It doesn't have a pointcut so we assume it applies.
    return true;
  }
}

根据Advisor的类型检查目标类是否满足切入资格,和明显前边AsyncAnnotationBeanPostProcessor构造的是PointcutAdvisor类型的增强器,继续看canApply实现:

public static boolean canApply(Pointcut pc, Class<?> targetClass, boolean hasIntroductions) {
  Assert.notNull(pc, "Pointcut must not be null");
  if (!pc.getClassFilter().matches(targetClass)) {
    return false;
  }
  MethodMatcher methodMatcher = pc.getMethodMatcher();
  if (methodMatcher == MethodMatcher.TRUE) {
    // No need to iterate the methods if we're matching any method anyway...
    return true;
  }
  IntroductionAwareMethodMatcher introductionAwareMethodMatcher = null;
  if (methodMatcher instanceof IntroductionAwareMethodMatcher) {
    introductionAwareMethodMatcher = (IntroductionAwareMethodMatcher) methodMatcher;
  }
  Set<Class<?>> classes = new LinkedHashSet<>();
  if (!Proxy.isProxyClass(targetClass)) {
    classes.add(ClassUtils.getUserClass(targetClass));
  }
  classes.addAll(ClassUtils.getAllInterfacesForClassAsSet(targetClass));
  for (Class<?> clazz : classes) {
    Method[] methods = ReflectionUtils.getAllDeclaredMethods(clazz);
    for (Method method : methods) {
      if (introductionAwareMethodMatcher != null ?
          introductionAwareMethodMatcher.matches(method, targetClass, hasIntroductions) :
          methodMatcher.matches(method, targetClass)) {
        return true;
      }
    }
  }
  return false;
}

其实简单来说,就是检查目标类上或者方法上是否有@Async注解,如果有就返回满足切入规则,否则返回不符合切入规则.

回到前边后置处理器postProcessAfterInitialization方法,如果目标bean满足切入规则,则使用代理工厂ProxyFactory生成代理对象并返回:

if (isEligible(bean, beanName)) {
  ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
  if (!proxyFactory.isProxyTargetClass()) {
    evaluateProxyInterfaces(bean.getClass(), proxyFactory);
  }
  proxyFactory.addAdvisor(this.advisor);
  customizeProxyFactory(proxyFactory);
  return proxyFactory.getProxy(getProxyClassLoader());
}

先生成代理工厂,然后检查给定bean类上的接口,并将它们应用于ProxyFactory(如果不适用,退化成直接代理目标类),将增强器添加到代理工厂中,最后由代理工厂生成代理对象,接着看生成代理类的实现:

public AopProxy createAopProxy(AdvisedSupport config) throws AopConfigException {
  if (config.isOptimize() || config.isProxyTargetClass() || hasNoUserSuppliedProxyInterfaces(config)) {
    Class<?> targetClass = config.getTargetClass();
    if (targetClass == null) {
      throw new AopConfigException("TargetSource cannot determine target class: " +
          "Either an interface or a target is required for proxy creation.");
    }
    if (targetClass.isInterface() || Proxy.isProxyClass(targetClass)) {
      return new JdkDynamicAopProxy(config);
    }
    return new ObjenesisCglibAopProxy(config);
  }
  else {
    return new JdkDynamicAopProxy(config);
  }
}

先创建Aop代理,如果目标类是接口或者目标类是代理类,使用jdk动态代理,否则使用cglib动态代理,两种代理区别这里不展开细讲,简单分析一下其构造代理类的原理,先看JdkDynamicAopProxy:

public Object getProxy(@Nullable ClassLoader classLoader) {
  if (logger.isTraceEnabled()) {
    logger.trace("Creating JDK dynamic proxy: " + this.advised.getTargetSource());
  }
  Class<?>[] proxiedInterfaces = AopProxyUtils.completeProxiedInterfaces(this.advised, true);
  findDefinedEqualsAndHashCodeMethods(proxiedInterfaces);
  return Proxy.newProxyInstance(classLoader, proxiedInterfaces, this);
}

到这里我们看到了熟悉的jdk动态代理实现Proxy.newProxyInstance,寻找需要代理的接口,然后生成接口的动态代理对象,这里需要注意一下,JdkDynamicAopProxy实现了InvocationHandler接口,JDK动态代理会在内存中生成一个类名为Proxy0形式的代理类,调用Proxy0方法,jvm内部调用类Proxy.InvocationHandler.invoke方法,也就是JdkDynamicAopProxy实现InvocationHandler接口的invoke方法:

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  MethodInvocation invocation;
  Object oldProxy = null;
  boolean setProxyContext = false;
  TargetSource targetSource = this.advised.targetSource;
  Object target = null;
  try {
    if (!this.equalsDefined && AopUtils.isEqualsMethod(method)) {
      return equals(args[0]);
    }
    else if (!this.hashCodeDefined && AopUtils.isHashCodeMethod(method)) {
      return hashCode();
    }
    else if (method.getDeclaringClass() == DecoratingProxy.class) {
      return AopProxyUtils.ultimateTargetClass(this.advised);
    }
    else if (!this.advised.opaque && method.getDeclaringClass().isInterface() &&
        method.getDeclaringClass().isAssignableFrom(Advised.class)) {
      return AopUtils.invokeJoinpointUsingReflection(this.advised, method, args);
    }
    Object retVal;
    if (this.advised.exposeProxy) {
      oldProxy = AopContext.setCurrentProxy(proxy);
      setProxyContext = true;
    }
    target = targetSource.getTarget();
    Class<?> targetClass = (target != null ? target.getClass() : null);
    List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
    if (chain.isEmpty()) {
      Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
      retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
    }
    else {
      invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
      retVal = invocation.proceed();
    }
    Class<?> returnType = method.getReturnType();
    if (retVal != null && retVal == target &&
        returnType != Object.class && returnType.isInstance(proxy) &&
        !RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) {
      retVal = proxy;
    }
    else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) {
      throw new AopInvocationException(
          "Null return value from advice does not match primitive return type for: " + method);
    }
    return retVal;
  }
  finally {
    if (target != null && !targetSource.isStatic()) {
      targetSource.releaseTarget(target);
    }
    if (setProxyContext) {
      AopContext.setCurrentProxy(oldProxy);
    }
  }
}

先取出被织入的拦截逻辑,本篇中就是AnnotationAsyncExecutionInterceptor,然后指定方法调用,也就是代理类的调用,本质上就是先调用增强逻辑和最原始被代理类的方法的调用.

然后我们再看一下cglib动态代理实现CglibAopProxy:

public Object getProxy(@Nullable ClassLoader classLoader) {
  try {
    Class<?> rootClass = this.advised.getTargetClass();
    Assert.state(rootClass != null, "Target class must be available for creating a CGLIB proxy");
    Class<?> proxySuperClass = rootClass;
    if (ClassUtils.isCglibProxyClass(rootClass)) {
      proxySuperClass = rootClass.getSuperclass();
      Class<?>[] additionalInterfaces = rootClass.getInterfaces();
      for (Class<?> additionalInterface : additionalInterfaces) {
        this.advised.addInterface(additionalInterface);
      }
    }
    validateClassIfNecessary(proxySuperClass, classLoader);
    Enhancer enhancer = createEnhancer();
    if (classLoader != null) {
      enhancer.setClassLoader(classLoader);
      if (classLoader instanceof SmartClassLoader &&
          ((SmartClassLoader) classLoader).isClassReloadable(proxySuperClass)) {
        enhancer.setUseCache(false);
      }
    }
    enhancer.setSuperclass(proxySuperClass);
    enhancer.setInterfaces(AopProxyUtils.completeProxiedInterfaces(this.advised));
    enhancer.setNamingPolicy(SpringNamingPolicy.INSTANCE);
    enhancer.setStrategy(new ClassLoaderAwareUndeclaredThrowableStrategy(classLoader));
    Callback[] callbacks = getCallbacks(rootClass);
    Class<?>[] types = new Class<?>[callbacks.length];
    for (int x = 0; x < types.length; x++) {
      types[x] = callbacks[x].getClass();
    }
    enhancer.setCallbackFilter(new ProxyCallbackFilter(
        this.advised.getConfigurationOnlyCopy(), this.fixedInterceptorMap, this.fixedInterceptorOffset));
    enhancer.setCallbackTypes(types);
    return createProxyClassAndInstance(enhancer, callbacks);
  }
  catch (CodeGenerationException | IllegalArgumentException ex) {
    throw new AopConfigException("Could not generate CGLIB subclass of " + this.advised.getTargetClass() +
        ": Common causes of this problem include using a final class or a non-visible class",
        ex);
  }
  catch (Throwable ex) {
    // TargetSource.getTarget() failed
    throw new AopConfigException("Unexpected AOP exception", ex);
  }
}

我们也看到了熟悉的cglib动态代理实现Enhancer,CGLB动态代理会在内存生成一个类名为?EnhancerByCGLIB?b3361405形式的代理类,调用xxx?EnhancerByCGLIB?b3361405代理类方法,内部调用MethodInterceptor.intercept(),看一下getCallbacks方法,也即是将被代理类的拦截调用装配成MethodInterceptor的逻辑:

private Callback[] getCallbacks(Class<?> rootClass) throws Exception {
  boolean exposeProxy = this.advised.isExposeProxy();
  boolean isFrozen = this.advised.isFrozen();
  boolean isStatic = this.advised.getTargetSource().isStatic();
  Callback aopInterceptor = new DynamicAdvisedInterceptor(this.advised);
  Callback targetInterceptor;
  if (exposeProxy) {
    targetInterceptor = (isStatic ?
        new StaticUnadvisedExposedInterceptor(this.advised.getTargetSource().getTarget()) :
        new DynamicUnadvisedExposedInterceptor(this.advised.getTargetSource()));
  }
  else {
    targetInterceptor = (isStatic ?
        new StaticUnadvisedInterceptor(this.advised.getTargetSource().getTarget()) :
        new DynamicUnadvisedInterceptor(this.advised.getTargetSource()));
  }
  Callback targetDispatcher = (isStatic ?
      new StaticDispatcher(this.advised.getTargetSource().getTarget()) : new SerializableNoOp());
  Callback[] mainCallbacks = new Callback[] {
      aopInterceptor,  // for normal advice
      targetInterceptor,  // invoke target without considering advice, if optimized
      new SerializableNoOp(),  // no override for methods mapped to this
      targetDispatcher, this.advisedDispatcher,
      new EqualsInterceptor(this.advised),
      new HashCodeInterceptor(this.advised)
  };
  Callback[] callbacks;
  if (isStatic && isFrozen) {
    Method[] methods = rootClass.getMethods();
    Callback[] fixedCallbacks = new Callback[methods.length];
    this.fixedInterceptorMap = new HashMap<>(methods.length);

    // TODO: small memory optimization here (can skip creation for methods with no advice)
    for (int x = 0; x < methods.length; x++) {
      List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(methods[x], rootClass);
      fixedCallbacks[x] = new FixedChainStaticTargetInterceptor(
          chain, this.advised.getTargetSource().getTarget(), this.advised.getTargetClass());
      this.fixedInterceptorMap.put(methods[x].toString(), x);
    }
    callbacks = new Callback[mainCallbacks.length + fixedCallbacks.length];
    System.arraycopy(mainCallbacks, 0, callbacks, 0, mainCallbacks.length);
    System.arraycopy(fixedCallbacks, 0, callbacks, mainCallbacks.length, fixedCallbacks.length);
    this.fixedInterceptorOffset = mainCallbacks.length;
  }
  else {
    callbacks = mainCallbacks;
  }
  return callbacks;
}

在此篇幅异步编程场景,调用代理类方法会直接调用到DynamicAdvisedInterceptor的intercept:

public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
  Object oldProxy = null;
  boolean setProxyContext = false;
  Object target = null;
  TargetSource targetSource = this.advised.getTargetSource();
  try {
    if (this.advised.exposeProxy) {
      // Make invocation available if necessary.
      oldProxy = AopContext.setCurrentProxy(proxy);
      setProxyContext = true;
    }
    target = targetSource.getTarget();
    Class<?> targetClass = (target != null ? target.getClass() : null);
    List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
    Object retVal;
    if (chain.isEmpty() && Modifier.isPublic(method.getModifiers())) {
      Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
      retVal = methodProxy.invoke(target, argsToUse);
    }
    else {
      // We need to create a method invocation...
      retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed();
    }
    retVal = processReturnType(proxy, target, method, retVal);
    return retVal;
  }
  finally {
    if (target != null && !targetSource.isStatic()) {
      targetSource.releaseTarget(target);
    }
    if (setProxyContext) {
      // Restore old proxy.
      AopContext.setCurrentProxy(oldProxy);
    }
  }
}

先获取代理类对应方法的拦截器链,如果没有拦截器链且方法是public类型,直接调用代理方法返回,否则将方法连同拦截器链构造成CglibMethodInvocation并执行.

在JdkDynamicAopProxy和CglibAopProxy生成的代理类执行的过程都会调用到前边所说的AnnotationAsyncExecutionInterceptor类的invoke方法,也即是异步执行的逻辑.

jdk动态代理异步执行时序图:

Cglib代理异步执行时序图:

五、总结

从本篇第三节异步编程使用方式来看,spring异步编程接入特别简单,但是从第四节的原理和源码解析来看,其实现也挺复杂的,这就是spring的强大之处,把困难留给自己,把便利留给使用者,把一些复杂的实现对用户做到透明化.

从spring异步编程的源码来看,其使用了很多技术和功能点:

  • 导入配置:AsyncConfigurationSelector
  • 后置处理器:AsyncAnnotationBeanPostProcessor
  • Aop编程:AsyncAnnotationAdvisor
  • 线程池:AsyncTaskExecutor
  • 拦截器: AnnotationAsyncExecutionInterceptor
  • 切入点: ComposablePointcut/AnnotationMatchingPointcut
  • 工厂模式: BeanFactory和ProxyFactory
  • 动态代理: JdkDynamicAopProxy和CglibAopProxy
  • 代理类调用委托处理: jdk动态代理委托给JdkDynamicAopProxy.invoke,cglib动态代理类委托给DynamicAdvisedInterceptor.intercept

由于篇幅问题,中间还有很多细节没覆盖到,比如说获取线程池的逻辑设计也比较巧妙,感兴趣的也可以深入研究一下:

protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
  if (beanFactory != null) {
    try {
      return beanFactory.getBean(TaskExecutor.class);
    }
    catch (NoUniqueBeanDefinitionException ex) {
      logger.debug("Could not find unique TaskExecutor bean", ex);
      try {
        return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
      }
      catch (NoSuchBeanDefinitionException ex2) {
      }
    }
    catch (NoSuchBeanDefinitionException ex) {
      logger.debug("Could not find default TaskExecutor bean", ex);
      try {
        return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
      }
      catch (NoSuchBeanDefinitionException ex2) {
      }
    }
  }
  return null;
}

spring异步的使用主要记住两个点,2个注解和一个返回值,在启动类或者配置使用@EnableAsync开启异步,在需要异步调用的方法上添加@Async注解,异步支持的返回类型有CompletableFuture、ListenableFuture和Future和void.

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-04-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 PersistentCoder 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、背景
  • 二、Spring异步编程介绍
  • 三、异步编程接入
    • 1.开启异步能力
      • 2.添加异步注解
        • 3.模拟异步调用
        • 四、原理&源码解析
        • 五、总结
        相关产品与服务
        云顾问
        云顾问(Tencent Cloud Smart Advisor)是一款提供可视化云架构IDE和多个ITOM领域垂直应用的云上治理平台,以“一个平台,多个应用”为产品理念,依托腾讯云海量运维专家经验,助您打造卓越架构,实现便捷、灵活的一站式云上治理。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档