前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >初探Spring Retry

初探Spring Retry

作者头像
程序猿杜小头
发布2022-12-01 21:37:42
9600
发布2022-12-01 21:37:42
举报
文章被收录于专栏:程序猿杜小头程序猿杜小头

初探Spring Retry

Running with Spring Boot v2.4.5, Spring Retry v1.3.1

在与外部系统交互时,由网络抖动亦或是外部系统自身的短暂性问题触发的瞬时性故障是一个绕不过的坑,而重试可能是一个比较有效的避坑方案;但有一点需要特别注意:外部系统的接口是否满足幂等性,比如:尽管调用外部系统的下单接口超时了,但外部系统订单数据可能已经落库了,这个时候再重试一次,外部系统内的订单数据可能就重复了!

Spring Retry为Spring应用提供了重试功能,同时支持声明式重试(Declarative Retry)和编程式重试(Programmatic Retry)两种风格;此外,其不仅对业务代码无侵入性,而且还支持重试配置;但Spring Retry的重试决策机制大多是基于Throwable的,尚不支持基于返回结果来进行重试决策。

1 核心API解读

1.1 RetryContext

RetryContext记录了已重试次数、上一次重试所触发的异常以及context.statecontext.namecontext.exhaustedcontext.closedcontext.recovered等属性。

代码语言:javascript
复制
public interface RetryContext extends AttributeAccessor {
    String NAME = "context.name";
    String STATE_KEY = "context.state";
    String CLOSED = "context.closed";
    String RECOVERED = "context.recovered";
    String EXHAUSTED = "context.exhausted";
    
    boolean isExhaustedOnly();
    RetryContext getParent();
    int getRetryCount();
    Throwable getLastThrowable();
}

1.2 RetryOperations

RetryOperations接口定义了四个execute()方法,无论是声明式重试还是编程式重试,都是通过execute()来进行重试操作的!

代码语言:javascript
复制
public interface RetryOperations {
    <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback) throws E;
    <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback) throws E;
    <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, RetryState retryState) throws E, ExhaustedRetryException;
    <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback, RetryState retryState) throws E;
}

1.3 Classifier

Classifier接口只有一个classify()方法,主要用于将给定类型对象C分类为另一类型的对象T

代码语言:javascript
复制
public interface Classifier<C, T> extends Serializable {
    T classify(C classifiable);
}

SubclassClassifier<C, T>内部维护了一个ConcurrentHashMap<Class<? extends C>, T>,用于存储待分类对象C分类后对象T的映射关系。其分类逻辑如下:

  1. 若待分类对象C为null,则直接返回默认值;
  2. 从HashMap中查找映射关系,若存在映射关系,则直接将HashMap中的value值返回;
  3. 若不存在映射关系,则解析待分类对象C的上级继承链,然后将上级继承链中每一级父类作为key,从HashMap中查找映射关系;
  4. 若不存在映射关系,则将上级继承链中每一级父类所实现的接口作为key,再次从HashMap中查找映射关系;
  5. 最后,若存在映射关系,则将映射关系添加到HashMap中(以待分类对象C为key),然后将HashMap中的value值返回;若不存在映射关系,则将默认值作为分类结果返回。

BinaryExceptionClassifier<Throwable, Boolean>继承自SubclassClassifier,主要用于将异常分类为两种类型:truefalse,这也正符合其二元异常分类器的定位。BinaryExceptionClassifier不仅沿用了SubclassClassifier的分类逻辑,同时还进一步拓展:首先通过getCause()方法一层一层地获取待分类异常的原因,然后以这些Throwable类型的原因作为key,来寻找映射关系。

1.4 RetryPolicy

RetryPolicy接口定义了四个方法,canRetry()方法用来判决是否可以重试;open()方法用来分配RetryContext;registerThrowable()方法用于在每次重试失败后注册异常。

代码语言:javascript
复制
public interface RetryPolicy extends Serializable {
    boolean canRetry(RetryContext context);
    RetryContext open(RetryContext parent);
    void close(RetryContext context);
    void registerThrowable(RetryContext context, Throwable throwable);
}
1.4.1 SimpleRetryPolicy
  • 重试次数为固定值,默认最大重试次数为3;
  • 持有BinaryExceptionClassifier二元异常分类器,用于异常分类,只有被分类为true的异常才有可能进行重试操作。
1.4.2 TimeoutRetryPolicy
  • 重试次数为动态值;
  • 如果目标方法执行时间超过超时阈值,则进行重试操作,默认超时时间1000ms。
1.4.3 ExceptionClassifierRetryPolicy
  • 重试次数取决于委托的重试策略;
  • 持有SubclassClassifier<Throwable, RetryPolicy>异常分类器,用于将异常分类为不同的重试策略,然后根据特定重试策略来判决是否可以重试。
1.4.4 BinaryExceptionClassifierRetryPolicy
  • 重试次数为动态值;
  • 持有BinaryExceptionClassifier<Throwable, Boolean>二元异常分类器,用于将异常分类为两种类型:true或false,true意味着可以进行重试,false意味着无法进行重试。
1.4.5 CompositeRetryPolicy
  • 重试次数为动态值;
  • 组合重试策略持有RetryPolicy[],其有两种模式:乐观模式和悲观模式;若是乐观模式,则只要一个重试策略可以重试,即认为可以重试;若是悲观模式,则只要有一个重试粗略不可以重试,即认为不可以重试。
1.4.6 CircuitBreakerRetryPolicy
  • 一种有熔断器功能的重试策略,该重试策略只能应用于有状态重试场景。

1.5 BackOffPolicy

为了应对瞬时性故障,重试操作往往应该间隔一段时间后执行,BackOffPolicy接口即是对重试间隔的抽象;backOff()方法是BackOffPolicy接口的核心,在Spring Retry 1.1 前其底层依托于Object#wait()方法来模拟重试间隔,在在Spring Retry 1.1 后依托于Thread#sleep()方法来模拟重试间隔。

代码语言:javascript
复制
public interface BackOffPolicy {
    BackOffContext start(RetryContext context);
    void backOff(BackOffContext backOffContext) throws BackOffInterruptedException;
}
1.5.1 NoBackOffPolicy
  • 无间隔重试,即立即执行下次重试。
1.5.2 FixedBackOffPolicy
  • 重试间隔时间为固定值,默认值为1000ms。
1.5.3 UniformRandomBackOffPolicy
  • 重试间隔时间为动态值;
  • minBackOffPeriod默认值为500ms,maxBackOffPeriod默认值为1500ms;下一次重试间隔时间计算公式:
代码语言:javascript
复制
minBackOffPeriod + random.nextInt(maxBackOffPeriod - minBackOffPeriod)
1.5.4 ExponentialBackOffPolicy
  • 重试间隔时间为动态指数值;
  • initialInterval默认值为100ms,maxInterval默认值为30000ms,multiplier默认值为2;下一次重试间隔时间计算公式:
代码语言:javascript
复制
interval * multiplier
1.5.5 ExponentialRandomBackOffPolicy
  • 重试间隔时间为动态随机指数值;
  • initialInterval默认值为100ms,maxInterval默认值为30000ms,multiplier默认值为2;下一次重试间隔时间计算公式:
代码语言:javascript
复制
(interval * multiplier) * (1 + r.nextFloat() * (multiplier - 1))

1.6 RecoveryCallback

为了进一步增强业务方法的健壮性,我们可以通过实现RecoveryCallback回调接口来封装一个兜底逻辑;这样在重试已耗尽且业务方法依然执行失败的时候,就会执行该兜底逻辑。

代码语言:javascript
复制
public interface RecoveryCallback<T> {
    T recover(RetryContext context) throws Exception;
}

1.7 RetryListener

个人感觉重试监听器是一个鸡肋的功能,在实际工作中,基本用不到重试监听器,我们重点关注RetryListener接口中open()close()onError()这三个方法的调用时机即可。

代码语言:javascript
复制
public interface RetryListener {
    // 在整个重试之前执行
    <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback);
    // 在整个重试之后执行
    <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable);
    // 在每次重试失败后执行
    <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable);
}

1.8 RetryState

在Spring Retry中,重试可以分为:无状态重试(Stateless Retry)与有状态重试(Stateful Retry)。在无状态重试中,每次进入RetryTemplate#execute()方法内的while循环前,一定会生成一个全新的RetryContext实例。但在有状态重试中,第一次进入RetryTemplate#execute()方法内的while循环前,首先会生成一个全新的RetryContext实例,然后根据目标方法生成一个key,最后将key与RetryContext实例组成键值对保存在RetryContextCache中;那么在下次进入RetryTemplate#execute()方法内的while循环前,就不再生成RetryContext实例了,而是使用RetryContextCache中的RetryContext实例。换句话说,无状态重试中的RetryContext实例是保存在栈内,而有状态重试中的RetryContext实例是保存在堆内

相信大家和笔者一样,在工作中使用无状态重试,但其实有状态重试也是有用武之地的,比如:事务回滚和熔断器。

在事务回滚场景中,当目标方法(业务方法)抛出特定异常时,重试变得没有意义了,需要立即从execute()方法内的while循环语句内重新抛出该异常,从而进行事务回滚操作,重新抛出异常代码如下所示:

代码语言:javascript
复制
if (shouldRethrow(retryPolicy, context, state)) {
    throw RetryTemplate.<E>wrapIfNecessary(e);
}

在熔断器场景中,CircuitBreakerRetryPolicy一般与SimpleRetryPolicy或MaxAttemptsRetryPolicy组合使用,前者委派后者来进行重试决策,但需要明确一点:目标方法每次只会在execute()方法内的while循环中执行一次(目标方法的每一次执行都是在不同的线程中完成的),之后借助于如下代码立即退出while循环:

代码语言:javascript
复制
if (state != null && context.hasAttribute(GLOBAL_STATE)) {
    break;
}

RetryState接口用于标识无状态重试与有状态重试,具体地,在调用execute()方法时,若RetryState参数为null,则意味着无状态重试,否则为有状态重试。RetryState接口定义了三个方法:

  • getKey(),生成RetryContext实例的全局唯一标识;
  • isForceRefresh(),若值为true,则每次都重新生成RetryContext实例,若值为false,则从RetryContextCache中查询RetryContext实例;
  • rollbackFor(),标识该异常是否需要回滚。
代码语言:javascript
复制
public interface RetryState {
    Object getKey();
    boolean isForceRefresh();
    boolean rollbackFor(Throwable exception);
}

2 声明式重试中若干注解

2.1 @Backoff

属性

描述

数据类型

默认值

value

重试间隔时间,若delay>0,则忽略value值,否则选用value值

long

1000 ms

delay

重试间隔时间,若delay=0时,则选用value值;否则选用delay值

long

0 ms

maxDelay

最大重试间隔时间,若maxDelay<delay,则maxDelay=30000 ms

long

0 ms

multiplier

乘数,若multiplier=0,则忽略;若multiplier>0,则用于生成下一次重试间隔时间

double

0

delayExpression

重试间隔时间表达式,一般用于从配置文件中加载delay值

String

""

maxDelayExpression

最大重试间隔时间表达式,一般用于从配置文件中加载maxDelay值

String

""

multiplierExpression

乘数表达式,一般用于从配置文件中加载multiplier值

String

""

2.2 @Retryable

属性

描述

数据类型

默认值

recover

兜底方法

String

""

interceptor

重试拦截器MethodInterceptor

String

""

value

可重试异常白名单

Class<? extends Throwable>[]

{}

include

可重试异常白名单

Class<? extends Throwable>[]

{}

exclude

可重试异常黑名单

Class<? extends Throwable>[]

{}

stateful

有状态重试或无状态重试

boolean

false

maxAttempts

最大重试次数

int

3

maxAttemptsExpression

最大重试次数表达式,一般用于从配置文件中加载maxAttempts值

String

""

backoff

退避策略

Backoff

@Backoff()

listeners

重试监听器

String[]

{}

2.3 @Recover

@Recover是一个标记接口,并没有定义相关属性。但需要注意:兜底方法的第一个参数是可选的,若存在,则只能是Throwable及其子类,其余参数必须与目标方法中的参数一致,包括:类型和顺序。

2.4 @CircuitBreaker

属性

描述

数据类型

默认值

value

可重试异常白名单

Class<? extends Throwable>[]

{}

include

可重试异常白名单

Class<? extends Throwable>[]

{}

exclude

可重试异常黑名单

Class<? extends Throwable>[]

{}

maxAttempts

最大重试次数

int

3

maxAttemptsExpression

最大重试次数表达式,一般用于从配置文件中加载maxAttempts值

String

""

resetTimeout

熔断器重置超时时间,如果熔断器开启时间大于resetTimeout值,则在下一次调用目标方法时重置该熔断器,从而使得目标方法可以尝试从下游接口获得响应数据

int

20000

resetTimeoutExpression

熔断器重置超时时间表达式

String

""

openTimeout

熔断器开始超时时间,若目标方法在(0,openTimeout)时间范围内重试次数耗尽,那么将自动打开熔断器,从而屏蔽目标方法对下游接口的访问

int

5000

openTimeoutExpression

熔断器开始超时时间表达式

String

""

3 快速入门

GoogleSearchService通过委派谷歌搜索接口来搜索给定内容。GoogleSearchService

代码语言:javascript
复制
public interface GoogleSearchService {
    public void search(String content) throws IOException, InterruptedException, URISyntaxException;
}

GoogleSearchServiceImpl

代码语言:javascript
复制
public class GoogleSearchServiceImpl implements GoogleSearchService {
    private static final Logger LOGGER = LoggerFactory.getLogger(GoogleSearchServiceImpl.class);

    @Override
    public void search(String content) throws IOException, InterruptedException, URISyntaxException {
        LOGGER.info(">>>>>> searching··· ");

        URL url = new URL("https://www.google.com/complete/search?q=" + content);
        URI uri = new URI(url.getProtocol(), url.getHost(), url.getPath(), url.getQuery(), null);

        HttpRequest request = HttpRequest
                .newBuilder()
                .uri(uri)
                .build();
        HttpClient client = HttpClient
                .newBuilder()
                .version(HttpClient.Version.HTTP_1_1)
                .connectTimeout(Duration.ofSeconds(3))
                .build();
        HttpResponse<Void> response = client.send(request, HttpResponse.BodyHandlers.discarding());
    }
}

需求search()方法抛出HttpConnectTimeoutException异常,则重复执行search()方法,最大重试次数为三次(包括初始执行),重试间隔为500毫秒;如果重试耗尽后,search()方法依然抛出该异常,则打印错误日志信息。

下面分别从两个小节来展示编程式重试和声明式重试是如何实现上述需求的。

3.1 编程式重试

代码语言:javascript
复制
public class ProgrammaticSpringRetryApp {
    private static final Logger LOGGER = LoggerFactory.getLogger(ProgrammaticSpringRetryApp.class);

    public static void main(String[] args) throws Throwable {
        GoogleSearchService googleSearchService = new GoogleSearchServiceImpl();
        // 重试策略:CompositeRetryPolict,由MaxAttemptRetryPolicy和BinaryExceptionClassifierRetryPolicy组成
        // 退避策略:FixedBackOffPolicy,重试间隔500毫秒
        RetryTemplate retryTemplate = RetryTemplate
                .builder()
                .retryOn(HttpConnectTimeoutException.class)
                .maxAttempts(3)
                .fixedBackoff(500)
                .build();

        retryTemplate.execute(
                new RetryCallback<Object, Throwable>() {
                    @Override
                    public Object doWithRetry(RetryContext context) throws Throwable {
                        googleSearchService.search("spring retry");
                        return null;
                    }
                },
                new RecoveryCallback<Object>() {
                    @Override
                    public Object recover(RetryContext context) throws Exception {
                        LOGGER.error("调用谷歌搜索接口失败,执行兜底逻辑");
                        return null;
                    }
                }
        );
    }
}

执行结果

代码语言:javascript
复制
2021:20:25.005 [main] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=0
2021:20:25.015 [main] INFO pers.duxiaotou.service.impl.GoogleSearchServiceImpl - >>>>>> searching··· 
2021:20:30.117 [main] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=1
2021:20:30.117 [main] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=1
2021:20:30.118 [main] INFO pers.duxiaotou.service.impl.GoogleSearchServiceImpl - >>>>>> searching··· 
2021:20:33.641 [main] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=2
2021:20:33.641 [main] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=2
2021:20:33.641 [main] INFO pers.duxiaotou.service.impl.GoogleSearchServiceImpl - >>>>>> searching··· 
2021:20:36.655 [main] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=3
2021:20:36.655 [main] DEBUG org.springframework.retry.support.RetryTemplate - Retry failed last attempt: count=3
2021:20:36.656 [main] ERROR pers.duxiaotou.ProgrammaticSpringRetryApp - 调用谷歌搜索接口失败,执行兜底逻辑

3.2 声明式重试

代码语言:javascript
复制
@Service
@Slf4j
public class GoogleSearchServiceImpl implements GoogleSearchService {
    private static final Logger LOGGER = LoggerFactory.getLogger(GoogleSearchServiceImpl.class);

    @Retryable(
            maxAttempts = 3,
            backoff = @Backoff(value = 500),
            include = {HttpConnectTimeoutException.class},
            stateful = false)
    @Override
    public void search(String content) throws IOException, InterruptedException, URISyntaxException {
        LOGGER.info(">>>>>> searching··· ");

        URL url = new URL("https://www.google.com/complete/search?q=" + content);
        URI uri = new URI(url.getProtocol(), url.getHost(), url.getPath(), url.getQuery(), null);

        HttpRequest request = HttpRequest
                .newBuilder()
                .uri(uri)
                .build();
        HttpClient client = HttpClient
                .newBuilder()
                .version(HttpClient.Version.HTTP_1_1)
                .connectTimeout(Duration.ofSeconds(3))
                .build();
        HttpResponse<Void> response = client.send(request, HttpResponse.BodyHandlers.discarding());
    }

    @Recover
    public void recover(Throwable throwable, String content) {
        LOGGER.error("调用谷歌搜索接口失败,执行兜底逻辑");
    }
}
代码语言:javascript
复制
@SpringBootApplication
@EnableRetry
public class DuXiaoTouSpringAopApp {
    public static void main(String[] args) throws Throwable {
        ConfigurableApplicationContext applicationContext = SpringApplication.run(DuXiaoTouSpringAopApp.class, args);
        GoogleSearchService googleSearchService = applicationContext.getBean(GoogleSearchService.class);
        googleSearchService.search("spring retry");
    }
}

执行结果

代码语言:javascript
复制
2021-06-14 21:39:01.321 DEBUG 21696 --- [restartedMain] o.s.retry.support.RetryTemplate          : Retry: count=0
2021-06-14 21:39:01.321  INFO 21696 --- [restartedMain] p.d.s.impl.GoogleSearchServiceImpl       : >>>>>> searching··· 
2021-06-14 21:39:04.907 DEBUG 21696 --- [restartedMain] o.s.retry.support.RetryTemplate          : Checking for rethrow: count=1
2021-06-14 21:39:04.907 DEBUG 21696 --- [restartedMain] o.s.retry.support.RetryTemplate          : Retry: count=1
2021-06-14 21:39:04.908  INFO 21696 --- [restartedMain] p.d.s.impl.GoogleSearchServiceImpl       : >>>>>> searching··· 
2021-06-14 21:39:08.436 DEBUG 21696 --- [restartedMain] o.s.retry.support.RetryTemplate          : Checking for rethrow: count=2
2021-06-14 21:39:08.436 DEBUG 21696 --- [restartedMain] o.s.retry.support.RetryTemplate          : Retry: count=2
2021-06-14 21:39:08.437  INFO 21696 --- [restartedMain] p.d.s.impl.GoogleSearchServiceImpl       : >>>>>> searching··· 
2021-06-14 21:39:11.449 DEBUG 21696 --- [restartedMain] o.s.retry.support.RetryTemplate          : Checking for rethrow: count=3
2021-06-14 21:39:11.449 DEBUG 21696 --- [restartedMain] o.s.retry.support.RetryTemplate          : Retry failed last attempt: count=3
2021-06-14 21:39:11.450 ERROR 21696 --- [restartedMain] p.d.s.impl.GoogleSearchServiceImpl       : 调用谷歌搜索接口失败,执行兜底逻辑

4 @Retryable实现原理剖析

@Retryable注解添加到目标对象GoogleSearchServiceImpl中search()方法后,当search()方法抛出HttpConnectTimeoutException异常时,Spring Retry会自动为调用方重复执行search()方法。那Spring Retry究竟是如何为调用方提供自动重试能力的呢?众所周知,获取重试能力的关键在于@EnableRetry注解,该注解可以开启Spring Retry开关。那@EnableRetry注解可能就是解开谜团的突破口了。

代码语言:javascript
复制
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@EnableAspectJAutoProxy(proxyTargetClass = false)
@Import(RetryConfiguration.class)
@Documented
public @interface EnableRetry {
 boolean proxyTargetClass() default false;
}

在@Retryable源码中,我们发现了@EnableAspectJAutoProxy注解的身影,这说明Spring Retry是基于Spring AOP为目标对象生成代理对象从而拓展出重试能力的!

关于Spring AOP的基础知识请参考《Spring AOP,从入门到进阶》。BeanPostProcessor接口是Spring中常用的IoC容器拓展点;有了BeanPostProcessor,任何人都可以在Bean初始化前后对其进行个性化改造,甚至使用代理对象将Bean替换;在Spring AOP中,扮演BeanPostProcessor角色的是AbstractAutoProxyCreator抽象类,其主要用于创建代理对象。

代码语言:javascript
复制
public abstract class AbstractAutoProxyCreator implements BeanPostProcessor {
    @Override
    public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {
        if (bean != null) {
            return wrapIfNecessary(bean, beanName);
        }
        return bean;
    }
    
    protected Object wrapIfNecessary(Object bean, String beanName) {
        Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
        if (specificInterceptors != DO_NOT_PROXY) {
            Object proxy = createProxy(
                    bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
            return proxy;
        }
        return bean;
    }
}

从上述AbstractAutoProxyCreator源码中,我们发现:只有当某一Bean获取到PointcutAdvisor时(specificInterceptors数组非空),才会为该Bean生成代理对象。显然,Spring Retry肯定定义了一个PointcutAdvisor,要不然谁会闲的无聊帮你定义哦!

RetryConfiguration就是Spring Retry自己定义的PointcutAdvisor,它主要负责构建AdvicePointcut,RetryConfiguration源码如下:

代码语言:javascript
复制
@Component
public class RetryConfiguration extends AbstractPointcutAdvisor implements IntroductionAdvisor, BeanFactoryAware, InitializingBean {
    private Advice advice;

    private Pointcut pointcut;

    @Override
    public void afterPropertiesSet() throws Exception {
        Set<Class<? extends Annotation>> retryableAnnotationTypes = new LinkedHashSet<Class<? extends Annotation>>(1);
        retryableAnnotationTypes.add(Retryable.class);
        this.pointcut = buildPointcut(retryableAnnotationTypes);
        this.advice = buildAdvice();
    }

    protected Advice buildAdvice() {
        return new AnnotationAwareRetryOperationsInterceptor();
    }
    
    protected Pointcut buildPointcut(Set<Class<? extends Annotation>> retryAnnotationTypes) {
        ComposablePointcut result = null;
        for (Class<? extends Annotation> retryAnnotationType : retryAnnotationTypes) {
            Pointcut filter = new RetryConfiguration.AnnotationClassOrMethodPointcut(retryAnnotationType);
            if (result == null) {
                result = new ComposablePointcut(filter);
            }
            else {
                result.union(filter);
            }
        }
        return result;
    }
}

至此,我们搞明白了一点,即只要目标对象含有@Retryable注解,那么就一定可以获取到RetryConfiguration这一PointcutAdvisor,继而为目标对象生成代理对象!

接下来,我们需要搞清楚RetryConfiguration在构建Advice时所使用的AnnotationAwareRetryOperationsInterceptor有何意义?AnnotationAwareRetryOperationsInterceptor继承自MethodInterceptor,无疑invoke()是核心逻辑。

代码语言:javascript
复制
public class AnnotationAwareRetryOperationsInterceptor implements MethodInterceptor {
    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        // invocation.getThis()获取到的为目标对象,如:GoogleSearchServiceImpl
        // invocation.getMethod()获取到的为目标方法,如:search()
        // 若是无状态重试,则委派RetryOperationsInterceptor进行处理
        // 若是有状态重试,则委派StatefulRetryOperationsInterceptor进行处理
        MethodInterceptor delegate = getDelegate(invocation.getThis(), invocation.getMethod());
        if (delegate != null) {
            return delegate.invoke(invocation);
        }
        else {
            return invocation.proceed();
        }
    }

    /**
     * 根据目标对象和目标方法获取对应的重试拦截器
     */
    private MethodInterceptor getDelegate(Object target, Method method) {
        MethodInterceptor interceptor = NULL_INTERCEPTOR;
        Retryable retryable = findAnnotationOnTarget(target, method, Retryable.class);
        if (retryable != null) {
            if (StringUtils.hasText(retryable.interceptor())) {
                interceptor = this.beanFactory.getBean(retryable.interceptor(), MethodInterceptor.class);
            }
            else if (retryable.stateful()) {
                interceptor = getStatefulInterceptor(target, method, retryable);
            }
            else {
                interceptor = getStatelessInterceptor(target, method, retryable);
            }
        }
        return interceptor;
    }

    /**
     * 无状态重试拦截器:RetryOperationsInterceptor
     */
    private MethodInterceptor getStatelessInterceptor(Object target, Method method, Retryable retryable) {
        RetryTemplate template = createTemplate(retryable.listeners());
        template.setRetryPolicy(getRetryPolicy(retryable));
        template.setBackOffPolicy(getBackoffPolicy(retryable.backoff()));
        return RetryInterceptorBuilder.stateless().retryOperations(template).label(retryable.label())
                .recoverer(getRecoverer(target, method)).build();
    }

    /**
     * 有状态重试拦截器:StatefulRetryOperationsInterceptor
     */
    private MethodInterceptor getStatefulInterceptor(Object target, Method method, Retryable retryable) {
        RetryTemplate template = createTemplate(retryable.listeners());
        template.setRetryContextCache(this.retryContextCache);

        CircuitBreaker circuit = findAnnotationOnTarget(target, method, CircuitBreaker.class);
        if (circuit != null) {
            RetryPolicy policy = getRetryPolicy(circuit);
            CircuitBreakerRetryPolicy breaker = new CircuitBreakerRetryPolicy(policy);
            breaker.setOpenTimeout(getOpenTimeout(circuit));
            breaker.setResetTimeout(getResetTimeout(circuit));
            template.setRetryPolicy(breaker);
            template.setBackOffPolicy(new NoBackOffPolicy());
            String label = circuit.label();
            if (!StringUtils.hasText(label)) {
                label = method.toGenericString();
            }
            return RetryInterceptorBuilder.circuitBreaker().keyGenerator(new FixedKeyGenerator("circuit"))
                    .retryOperations(template).recoverer(getRecoverer(target, method)).label(label).build();
        }
        RetryPolicy policy = getRetryPolicy(retryable);
        template.setRetryPolicy(policy);
        template.setBackOffPolicy(getBackoffPolicy(retryable.backoff()));
        String label = retryable.label();
        return RetryInterceptorBuilder.stateful().keyGenerator(this.methodArgumentsKeyGenerator)
                .newMethodArgumentsIdentifier(this.newMethodArgumentsIdentifier).retryOperations(template).label(label)
                .recoverer(getRecoverer(target, method)).build();
    }
}

AnnotationAwareRetryOperationsInterceptor的invoke()主要逻辑有:

  1. 首先,根据目标对象和目标方法获取Retryable注解接口;
  2. 然后,优先根据Retryable注解中interceptor属性获取MethodInterceptor重试拦截器;若无重试拦截器,则进一步根据stateful属性获取RetryOperationsInterceptor或StatefulRetryOperationsInterceptor;
  3. 最后,AnnotationAwareRetryOperationsInterceptor委派RetryOperationsInterceptor或StatefulRetryOperationsInterceptor进行处理。

RetryOperationsInterceptorStatefulRetryOperationsInterceptor同样继承自MethodInterceptor接口,继续跟进invoke()方法。RetryOperationsInterceptor

代码语言:javascript
复制
public class RetryOperationsInterceptor implements MethodInterceptor {
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        String name;
        if (StringUtils.hasText(label)) {
            name = label;
        } else {
            name = invocation.getMethod().toGenericString();
        }
        final String label = name;
        // 封装RetryCallback回调实例
        // RetryCallback中doWithRetry()方法内最终将执行到目标对象的目标方法
        RetryCallback<Object, Throwable> retryCallback = new MethodInvocationRetryCallback<Object, Throwable>(
                invocation, label) {
            public Object doWithRetry(RetryContext context) throws Exception {
                context.setAttribute(RetryContext.NAME, label);
                return ((ProxyMethodInvocation) invocation).invocableClone().proceed();
            }
        };
        if (recoverer != null) {
            RetryOperationsInterceptor.ItemRecovererCallback recoveryCallback = new RetryOperationsInterceptor.ItemRecovererCallback(invocation.getArguments(), recoverer);
            return this.retryOperations.execute(retryCallback, recoveryCallback);
        }
        return this.retryOperations.execute(retryCallback);
    }
}

StatefulRetryOperationsInterceptor

代码语言:javascript
复制
public class StatefulRetryOperationsInterceptor implements MethodInterceptor {
    @Override
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        Object[] args = invocation.getArguments();
        Object defaultKey = Arrays.asList(args);
        if (args.length == 1) {
            defaultKey = args[0];
        }

        Object key = createKey(invocation, defaultKey);
        RetryState retryState = new DefaultRetryState(key,
                this.newMethodArgumentsIdentifier != null && this.newMethodArgumentsIdentifier.isNew(args),
                this.rollbackClassifier);

        RetryCallback<Object, Throwable> retryCallback = new StatefulMethodInvocationRetryCallback<Object, Throwable>(
                invocation, label) {
            public Object doWithRetry(RetryContext context) throws Exception {
                context.setAttribute(RetryContext.NAME, label);
                return this.invocation.proceed();
            }
        };
        if (recoverer != null) {
            ItemRecovererCallback recoveryCallback = new ItemRecovererCallback(args, this.recoverer, retryState);
            return this.retryOperations.execute(retryCallback, recoveryCallback, retryState);
        }
        return this.retryOperations.execute(retryCallback, null, retryState);
    }
}

从上面RetryOperationsInterceptor和StatefulRetryOperationsInterceptor的源码来看,二者invoke()实现逻辑基本一致,最终都是手动调用RetryTemplate#execute()方法。

最后,RetryTemplate中的若干execute()重载方法的核心逻辑均落在doExecute()方法,解读如下:

代码语言:javascript
复制
public class RetryTemplate implements RetryOperations {
    private static final String GLOBAL_STATE = "state.global";
    
    protected <T, E extends Throwable> T doExecute(
            RetryCallback<T, E> retryCallback,
            RecoveryCallback<T> recoveryCallback, RetryState state) throws E, ExhaustedRetryException {
        // 重试策略
        RetryPolicy retryPolicy = this.retryPolicy;
        // 退避策略
        BackOffPolicy backOffPolicy = this.backOffPolicy;

        // 对于无状态重试,直接重新生成RetryContext实例
        // 对于有状态重试,则从RetryContextCache获取已有RetryContext实例
        RetryContext context = open(retryPolicy, state);

        // 上一次执行目标方法时抛出异常
        Throwable lastException = null;
        // 重试是否已耗尽
        boolean exhausted = false;
        try {
            // 在执行目标方法前,执行重试监听器的open()方法
            boolean running = doOpenInterceptors(retryCallback, context);
            if (!running) {
                throw new TerminatedRetryException("Retry terminated abnormally by interceptor before first attempt");
            }

            // 生成BackOffContext实例,其内维护的退避策略相关信息,比如重试间隔值
            BackOffContext backOffContext = null;
            Object resource = context.getAttribute("backOffContext");
            if (resource instanceof BackOffContext) {
                backOffContext = (BackOffContext) resource;
            }
            if (backOffContext == null) {
                backOffContext = backOffPolicy.start(context);
                if (backOffContext != null) {
                    context.setAttribute("backOffContext", backOffContext);
                }
            }
            // while循环
            // 委托具体RetryPolicy来进行重试判决
            // context.isExhaustedOnly()一般总是false
            while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
                try {
                    lastException = null;
                    // 执行目标方法,即业务方法,同步回调
                    return retryCallback.doWithRetry(context);
                }
                catch (Throwable e) {
                    // 更新lastException
                    lastException = e;

                    // 注册异常
                    // 首先,委托RetryPolicy的registerThrowable()方法注册异常,如:记录重试次数
                    // 然后,若是有状态重试,则注册当前RetryContext,即将其添加到RetryContextCache
                    registerThrowable(retryPolicy, state, context, e);

                    // 执行重试监听器的OnError()方法
                    doOnErrorInterceptors(retryCallback, context, e);

                    // 当前执行目标方法失败,会执行退避策略,但前提是可以继续重试
                    if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
                        // 执行退避策略,主要依赖Thread.sleep()方法来模拟重试间隔
                        backOffPolicy.backOff(backOffContext);
                    }
                    // 是否应该立即重新抛出异常
                    // 对于无状态异常,是不会重新抛出异常的
                    // 对于有状态异常,则委托RetryState进行判断是否需要重新抛出异常:
                    // state != null && state.rollbackFor(context.getLastThrowable())
                    if (shouldRethrow(retryPolicy, context, state)) {
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Rethrow in retry for policy: count=" + context.getRetryCount());
                        }
                        // 抛出异常
                        throw RetryTemplate.<E>wrapIfNecessary(e);
                    }
                }
                // 能执行到这里,说明目标方法执行一定是执行失败了的,但并不需要重新抛出异常,即shouldRethrow()为false,
                // shouldRethrow()为false,有两种情况:无状态重试和有状态重试;
                // 这部分代码主要用于退出while循环,而当前只有在熔断器重试策略中,RetryContext中才会有GLOBAL_STATE哦,
                // 这也说明在熔断器模式下,调用方每次调用目标方法,最终只会执行一次,会在此退出while循环啊。
                if (state != null && context.hasAttribute(GLOBAL_STATE)) {
                    break;
                }
            }
            // 执行至此,说明重试已耗尽了
            exhausted = true;
            // 重试耗尽,若有兜底方法,则执行兜底逻辑;否则抛出异常
            return handleRetryExhausted(recoveryCallback, context, state);
        }
        catch (Throwable e) {
            throw RetryTemplate.<E>wrapIfNecessary(e);
        }
        finally {
            close(retryPolicy, context, state, lastException == null || exhausted);
            // 执行重试监听器的close()方法
            doCloseInterceptors(retryCallback, context, lastException);
        }
    }
}

Spring Retry声明式重试执行流程

5 参考文档

  1. https://github.com/spring-projects/spring-retry
  2. https://developer.aliyun.com/article/92899
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-06-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序猿杜小头 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 初探Spring Retry
    • 1 核心API解读
      • 1.1 RetryContext
      • 1.2 RetryOperations
      • 1.3 Classifier
      • 1.4 RetryPolicy
      • 1.5 BackOffPolicy
      • 1.6 RecoveryCallback
      • 1.7 RetryListener
      • 1.8 RetryState
    • 2 声明式重试中若干注解
      • 2.1 @Backoff
      • 2.2 @Retryable
      • 2.3 @Recover
      • 2.4 @CircuitBreaker
    • 3 快速入门
      • 3.1 编程式重试
      • 3.2 声明式重试
    • 4 @Retryable实现原理剖析
      • 5 参考文档
      相关产品与服务
      容器服务
      腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档