A look at resilience4j's Retry

This article takes a look at how resilience4j's Retry works.

Retry

resilience4j-retry-0.13.0-sources.jar!/io/github/resilience4j/retry/Retry.java

/**
 * A Retry instance is thread-safe can be used to decorate multiple requests.
 * A Retry.
 */
public interface Retry {

    /**
     * Returns the ID of this Retry.
     *
     * @return the ID of this Retry
     */
    String getName();

    /**
     * Creates a retry Context.
     *
     * @return the retry Context
     */
    Retry.Context context();

    /**
     * Returns the RetryConfig of this Retry.
     *
     * @return the RetryConfig of this Retry
     */
    RetryConfig getRetryConfig();

    /**
     * Returns an EventPublisher can be used to register event consumers.
     *
     * @return an EventPublisher
     */
    EventPublisher getEventPublisher();

    /**
     * Creates a Retry with a custom Retry configuration.
     *
     * @param name the ID of the Retry
     * @param retryConfig a custom Retry configuration
     *
     * @return a Retry with a custom Retry configuration.
     */
    static Retry of(String name, RetryConfig retryConfig){
        return new RetryImpl(name, retryConfig);
    }

    /**
     * Creates a Retry with a custom Retry configuration.
     *
     * @param name the ID of the Retry
     * @param retryConfigSupplier a supplier of a custom Retry configuration
     *
     * @return a Retry with a custom Retry configuration.
     */
    static Retry of(String name, Supplier<RetryConfig> retryConfigSupplier){
        return new RetryImpl(name, retryConfigSupplier.get());
    }

    /**
     * Creates a retryable supplier.
     *
     * @param retry the retry context
     * @param supplier the original function
     * @param <T> the type of results supplied by this supplier
     *
     * @return a retryable function
     */
    static <T> Supplier<T> decorateSupplier(Retry retry, Supplier<T> supplier){
        return () -> {
            Retry.Context context = retry.context();
            do try {
                T result = supplier.get();
                context.onSuccess();
                return result;
            } catch (RuntimeException runtimeException) {
                context.onRuntimeError(runtimeException);
            } while (true);
        };
    }

    /**
     * Creates a retryable callable.
     *
     * @param retry the retry context
     * @param supplier the original function
     * @param <T> the type of results supplied by this supplier
     *
     * @return a retryable function
     */
    static <T> Callable<T> decorateCallable(Retry retry, Callable<T> supplier){
        return () -> {
            Retry.Context context = retry.context();
            do try {
                T result = supplier.call();
                context.onSuccess();
                return result;
            } catch (RuntimeException runtimeException) {
                context.onRuntimeError(runtimeException);
            } while (true);
        };
    }

    /**
     * Creates a retryable runnable.
     *
     * @param retry the retry context
     * @param runnable the original runnable
     *
     * @return a retryable runnable
     */
    static Runnable decorateRunnable(Retry retry, Runnable runnable){
        return () -> {
            Retry.Context context = retry.context();
            do try {
                runnable.run();
                context.onSuccess();
                break;
            } catch (RuntimeException runtimeException) {
                context.onRuntimeError(runtimeException);
            } while (true);
        };
    }
    //......
}
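  • This interface defines several static factory methods, all of which end up creating a RetryImpl.
  • It also defines the decorate* methods, which wrap the retry logic around the original function.
  • The retry logic is a loop: the business code runs first; on success, context.onSuccess() is called and the loop exits (via return or break); on failure, the RuntimeException is caught and passed to context.onRuntimeError, which either rethrows it or returns so that the loop runs again.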
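
The loop described above can be reproduced without the library. The following is a minimal sketch, not resilience4j code: RetryLoopSketch, its nested Context interface and the limited(...) helper are hypothetical stand-ins, and the context here never sleeps. It only illustrates that the loop ends either by returning a result or because the context rethrows.

```java
import java.util.function.Supplier;

public class RetryLoopSketch {

    /** Hypothetical stand-in for Retry.Context, reduced to the two calls the loop uses. */
    interface Context {
        void onSuccess();
        void onRuntimeError(RuntimeException e); // rethrows when no further retry is allowed
    }

    /** Same shape as decorateSupplier: loop until success, or until the context rethrows. */
    static <T> Supplier<T> decorate(Supplier<Context> contextFactory, Supplier<T> supplier) {
        return () -> {
            Context context = contextFactory.get(); // fresh context for every invocation
            while (true) {
                try {
                    T result = supplier.get();
                    context.onSuccess();
                    return result;
                } catch (RuntimeException e) {
                    context.onRuntimeError(e); // returning normally means: try again
                }
            }
        };
    }

    /** Assumed context: allows at most maxAttempts calls and does not wait between them. */
    static Context limited(int maxAttempts) {
        return new Context() {
            private int attempts = 0;
            public void onSuccess() { }
            public void onRuntimeError(RuntimeException e) {
                if (++attempts >= maxAttempts) {
                    throw e;
                }
            }
        };
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails on the first two calls, succeeds on the third.
        Supplier<String> flaky = () -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("attempt " + calls[0] + " failed");
            }
            return "ok";
        };
        String result = decorate(() -> limited(3), flaky).get();
        System.out.println(result + " after " + calls[0] + " calls"); // prints "ok after 3 calls"
    }
}
```

Creating the context inside the returned lambda matters: each invocation of the decorated supplier gets its own attempt counter, which matches the retry.context() call at the top of every decorate method in the listing.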

RetryConfig

resilience4j-retry-0.13.0-sources.jar!/io/github/resilience4j/retry/RetryConfig.java

public class RetryConfig {

    public static final int DEFAULT_MAX_ATTEMPTS = 3;
    public static final long DEFAULT_WAIT_DURATION = 500;
    public static final IntervalFunction DEFAULT_INTERVAL_FUNCTION = (numOfAttempts) -> DEFAULT_WAIT_DURATION;
    public static final Predicate<Throwable> DEFAULT_RECORD_FAILURE_PREDICATE = (throwable) -> true;

    private int maxAttempts = DEFAULT_MAX_ATTEMPTS;

    private IntervalFunction intervalFunction = DEFAULT_INTERVAL_FUNCTION;
    // The default exception predicate retries all exceptions.
    private Predicate<Throwable> exceptionPredicate = DEFAULT_RECORD_FAILURE_PREDICATE;

    //......
}
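  • The configuration has three main parameters: maxAttempts (default 3), exceptionPredicate (by default every exception qualifies for a retry) and intervalFunction (by default a constant DEFAULT_WAIT_DURATION of 500).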
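
To make the roles of these parameters concrete, here is a small sketch that models them with plain java.util.function types. It is an illustration under assumptions: RetryConfigSketch is a hypothetical class, Function<Integer, Long> stands in for IntervalFunction, and the DOUBLING and RETRY_ILLEGAL_STATE_ONLY variants are invented examples, not library defaults.

```java
import java.util.function.Function;
import java.util.function.Predicate;

public class RetryConfigSketch {

    // Mirrors the default values quoted in the listing above.
    static final int DEFAULT_MAX_ATTEMPTS = 3;
    static final long DEFAULT_WAIT_DURATION = 500;

    // Stand-in for IntervalFunction: attempt number in, wait interval out.
    static final Function<Integer, Long> CONSTANT = attempt -> DEFAULT_WAIT_DURATION;

    // Hypothetical alternative: double the interval with every further attempt.
    static final Function<Integer, Long> DOUBLING =
            attempt -> DEFAULT_WAIT_DURATION << (attempt - 1);

    // The default predicate retries everything; a narrower one retries only selected types.
    static final Predicate<Throwable> RETRY_ALL = throwable -> true;
    static final Predicate<Throwable> RETRY_ILLEGAL_STATE_ONLY =
            throwable -> throwable instanceof IllegalStateException;

    public static void main(String[] args) {
        // With 3 attempts in total there are at most 2 waits, after attempts 1 and 2.
        for (int attempt = 1; attempt < DEFAULT_MAX_ATTEMPTS; attempt++) {
            System.out.println("attempt " + attempt
                    + ": constant=" + CONSTANT.apply(attempt)
                    + ", doubling=" + DOUBLING.apply(attempt));
        }
        System.out.println(RETRY_ILLEGAL_STATE_ONLY.test(new IllegalStateException()));    // true
        System.out.println(RETRY_ILLEGAL_STATE_ONLY.test(new IllegalArgumentException())); // false
    }
}
```

The interval function receives the number of attempts made so far, so a non-constant strategy only needs to map that number to a wait interval.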

Retry.Context

resilience4j-retry-0.13.0-sources.jar!/io/github/resilience4j/retry/Retry.java

   interface Context {

        /**
         *  Records a successful call.
         */
        void onSuccess();

        /**
         * Handles a checked exception
         *
         * @param exception the exception to handle
         * @throws Throwable the exception
         */
        void onError(Exception exception) throws Throwable;

        /**
         * Handles a runtime exception
         *
         * @param runtimeException the exception to handle
         */
        void onRuntimeError(RuntimeException runtimeException);
    }
  • This interface defines onSuccess, onError (for checked exceptions) and onRuntimeError (for runtime exceptions).

RetryImpl.ContextImpl

resilience4j-retry-0.13.0-sources.jar!/io/github/resilience4j/retry/internal/RetryImpl.java

   public final class ContextImpl implements Retry.Context {

        private final AtomicInteger numOfAttempts = new AtomicInteger(0);
        private final AtomicReference<Exception> lastException = new AtomicReference<>();
        private final AtomicReference<RuntimeException> lastRuntimeException = new AtomicReference<>();

        private ContextImpl() {
        }

        public void onSuccess() {
            int currentNumOfAttempts = numOfAttempts.get();
            if(currentNumOfAttempts > 0){
                succeededAfterRetryCounter.increment();
                Throwable throwable = Option.of(lastException.get()).getOrElse(lastRuntimeException.get());
                publishRetryEvent(() -> new RetryOnSuccessEvent(getName(), currentNumOfAttempts, throwable));
            }else{
                succeededWithoutRetryCounter.increment();
            }
        }

        public void onError(Exception exception) throws Throwable{
            if(exceptionPredicate.test(exception)){
                lastException.set(exception);
                throwOrSleepAfterException();
            }else{
                failedWithoutRetryCounter.increment();
                publishRetryEvent(() -> new RetryOnIgnoredErrorEvent(getName(), exception));
                throw exception;
            }
        }

        public void onRuntimeError(RuntimeException runtimeException){
            if(exceptionPredicate.test(runtimeException)){
                lastRuntimeException.set(runtimeException);
                throwOrSleepAfterRuntimeException();
            }else{
                failedWithoutRetryCounter.increment();
                publishRetryEvent(() -> new RetryOnIgnoredErrorEvent(getName(), runtimeException));
                throw runtimeException;
            }
        }

        private void throwOrSleepAfterException() throws Exception {
            int currentNumOfAttempts = numOfAttempts.incrementAndGet();
            Exception throwable = lastException.get();
            if(currentNumOfAttempts >= maxAttempts){
                failedAfterRetryCounter.increment();
                publishRetryEvent(() -> new RetryOnErrorEvent(getName(), currentNumOfAttempts, throwable));
                throw throwable;
            }else{
                waitIntervalAfterFailure(currentNumOfAttempts, throwable);
            }
        }

        private void throwOrSleepAfterRuntimeException(){
            int currentNumOfAttempts = numOfAttempts.incrementAndGet();
            RuntimeException throwable = lastRuntimeException.get();
            if(currentNumOfAttempts >= maxAttempts){
                failedAfterRetryCounter.increment();
                publishRetryEvent(() -> new RetryOnErrorEvent(getName(), currentNumOfAttempts, throwable));
                throw throwable;
            }else{
                waitIntervalAfterFailure(currentNumOfAttempts, throwable);
            }
        }

        private void waitIntervalAfterFailure(int currentNumOfAttempts, Throwable throwable) {
            // wait interval until the next attempt should start
            long interval = intervalFunction.apply(numOfAttempts.get());
            publishRetryEvent(()-> new RetryOnRetryEvent(getName(), currentNumOfAttempts, throwable, interval));
            Try.run(() -> sleepFunction.accept(interval))
                    .getOrElseThrow(ex -> lastRuntimeException.get());
        }

    }
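  • throwOrSleepAfterRuntimeException increments the attempt counter and compares it with maxAttempts: once the counter reaches maxAttempts it rethrows lastRuntimeException, otherwise it calls waitIntervalAfterFailure. Because the counter is incremented on every failure, including the first one, maxAttempts bounds the total number of calls, not just the number of retries.
  • waitIntervalAfterFailure asks intervalFunction for the interval, publishes a RetryOnRetryEvent and then delays through sleepFunction.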
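
The counting rule is easy to get wrong by one, so here is a stripped-down sketch of it. AttemptCountingSketch is a hypothetical class written for this article; it leaves out the metrics counters and events, and the sleep function is injected as a plain Consumer<Long> (an assumption made so that the example never really sleeps).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

public class AttemptCountingSketch {

    private final AtomicInteger numOfAttempts = new AtomicInteger(0);
    private final int maxAttempts;
    private final Function<Integer, Long> intervalFunction;
    private final Consumer<Long> sleepFunction; // injected so the sketch can record instead of sleep

    AttemptCountingSketch(int maxAttempts,
                          Function<Integer, Long> intervalFunction,
                          Consumer<Long> sleepFunction) {
        this.maxAttempts = maxAttempts;
        this.intervalFunction = intervalFunction;
        this.sleepFunction = sleepFunction;
    }

    /** Same decision as throwOrSleepAfterRuntimeException, minus counters and events. */
    void onRuntimeError(RuntimeException e) {
        int current = numOfAttempts.incrementAndGet();
        if (current >= maxAttempts) {
            throw e; // attempt budget used up: the caller sees the last exception
        }
        sleepFunction.accept(intervalFunction.apply(current)); // wait, then let the loop retry
    }

    public static void main(String[] args) {
        List<Long> sleeps = new ArrayList<>();
        AttemptCountingSketch context = new AttemptCountingSketch(3, attempt -> 500L, sleeps::add);
        RuntimeException failure = new IllegalStateException("boom");

        context.onRuntimeError(failure); // 1st failure: waits
        context.onRuntimeError(failure); // 2nd failure: waits
        try {
            context.onRuntimeError(failure); // 3rd failure: rethrows
        } catch (IllegalStateException rethrown) {
            System.out.println("rethrown after sleeps " + sleeps); // prints "rethrown after sleeps [500, 500]"
        }
    }
}
```

With maxAttempts set to 3 the business code is called at most three times and waits at most twice in between.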

Summary

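  • resilience4j's Retry follows the library's usual style: the retry logic is woven in through decorate methods.
  • The retry logic is a while(true) loop. The business method runs first; on success, Retry.Context.onSuccess is called and the loop exits. On failure, the decorators shown above catch only RuntimeException and pass it to Retry.Context.onRuntimeError.
  • onRuntimeError first checks whether the exception qualifies for a retry. If not, it rethrows the original exception. If so, it calls numOfAttempts.incrementAndGet(); when the limit has been reached it throws the exception, otherwise it sleeps for the configured retry interval and returns, so the loop goes into its next iteration.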

doc

  • Resilience4j is a fault tolerance library designed for Java8 and functional programming

Originally published on the WeChat public account 码匠的流水账 (geek_luandun)

Original publication date: 2018-07-14
