前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊ribbon的retry

聊聊ribbon的retry

作者头像
code4it
发布2018-09-17 17:09:06
1.2K0
发布2018-09-17 17:09:06
举报

本文主要研究一下ribbon的retry

配置

HttpClientRibbonConfiguration

spring-cloud-netflix-ribbon-2.0.0.RELEASE-sources.jar!/org/springframework/cloud/netflix/ribbon/apache/HttpClientRibbonConfiguration.java

    @Bean
    @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
    @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
    public RetryableRibbonLoadBalancingHttpClient retryableRibbonLoadBalancingHttpClient(
            IClientConfig config, ServerIntrospector serverIntrospector,
            ILoadBalancer loadBalancer, RetryHandler retryHandler,
            LoadBalancedRetryFactory loadBalancedRetryFactory, CloseableHttpClient httpClient) {
        RetryableRibbonLoadBalancingHttpClient client = new RetryableRibbonLoadBalancingHttpClient(
            httpClient, config, serverIntrospector, loadBalancedRetryFactory);
        client.setLoadBalancer(loadBalancer);
        client.setRetryHandler(retryHandler);
        Monitors.registerObject("Client_" + this.name, client);
        return client;
    }

OkHttpRibbonConfiguration

spring-cloud-netflix-ribbon-2.0.0.RELEASE-sources.jar!/org/springframework/cloud/netflix/ribbon/okhttp/OkHttpRibbonConfiguration.java

    @Bean
    @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
    @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
    public RetryableOkHttpLoadBalancingClient retryableOkHttpLoadBalancingClient(
        IClientConfig config,
        ServerIntrospector serverIntrospector,
        ILoadBalancer loadBalancer,
        RetryHandler retryHandler,
        LoadBalancedRetryFactory loadBalancedRetryFactory,
        OkHttpClient delegate) {
        RetryableOkHttpLoadBalancingClient client = new RetryableOkHttpLoadBalancingClient(delegate, config,
                serverIntrospector, loadBalancedRetryFactory);
        client.setLoadBalancer(loadBalancer);
        client.setRetryHandler(retryHandler);
        Monitors.registerObject("Client_" + this.name, client);
        return client;
    }

RetryableRibbonLoadBalancingHttpClient

spring-cloud-netflix-ribbon-2.0.0.RELEASE-sources.jar!/org/springframework/cloud/netflix/ribbon/apache/RetryableRibbonLoadBalancingHttpClient.java

/**
 * An Apache HTTP client which leverages Spring Retry to retry failed requests.
 * @author Ryan Baxter
 * @author Gang Li
 */
public class RetryableRibbonLoadBalancingHttpClient extends RibbonLoadBalancingHttpClient {
    private LoadBalancedRetryFactory loadBalancedRetryFactory;

    public RetryableRibbonLoadBalancingHttpClient(CloseableHttpClient delegate,
                                                  IClientConfig config, ServerIntrospector serverIntrospector,
                                                  LoadBalancedRetryFactory loadBalancedRetryFactory) {
        super(delegate, config, serverIntrospector);
        this.loadBalancedRetryFactory = loadBalancedRetryFactory;
    }

    @Override
    public RibbonApacheHttpResponse execute(final RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception {
        final RequestConfig.Builder builder = RequestConfig.custom();
        IClientConfig config = configOverride != null ? configOverride : this.config;
        RibbonProperties ribbon = RibbonProperties.from(config);
        builder.setConnectTimeout(ribbon.connectTimeout(this.connectTimeout));
        builder.setSocketTimeout(ribbon.readTimeout(this.readTimeout));
        builder.setRedirectsEnabled(ribbon.isFollowRedirects(this.followRedirects));

        final RequestConfig requestConfig = builder.build();
        final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryFactory.createRetryPolicy(this.getClientName(), this);

        RetryCallback<RibbonApacheHttpResponse, Exception> retryCallback = context -> {
            //on retries the policy will choose the server and set it in the context
            //extract the server and update the request being made
            RibbonApacheHttpRequest newRequest = request;
            if (context instanceof LoadBalancedRetryContext) {
                ServiceInstance service = ((LoadBalancedRetryContext) context).getServiceInstance();
                validateServiceInstance(service);
                if (service != null) {
                    //Reconstruct the request URI using the host and port set in the retry context
                    newRequest = newRequest.withNewUri(UriComponentsBuilder.newInstance().host(service.getHost())
                            .scheme(service.getUri().getScheme()).userInfo(newRequest.getURI().getUserInfo())
                            .port(service.getPort()).path(newRequest.getURI().getPath())
                            .query(newRequest.getURI().getQuery()).fragment(newRequest.getURI().getFragment())
                            .build().encode().toUri());
                }
            }
            newRequest = getSecureRequest(newRequest, configOverride);
            HttpUriRequest httpUriRequest = newRequest.toRequest(requestConfig);
            final HttpResponse httpResponse = RetryableRibbonLoadBalancingHttpClient.this.delegate.execute(httpUriRequest);
            if (retryPolicy.retryableStatusCode(httpResponse.getStatusLine().getStatusCode())) {
                throw new HttpClientStatusCodeException(RetryableRibbonLoadBalancingHttpClient.this.clientName,
                        httpResponse, HttpClientUtils.createEntity(httpResponse), httpUriRequest.getURI());
            }
            return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI());
        };
        LoadBalancedRecoveryCallback<RibbonApacheHttpResponse, HttpResponse> recoveryCallback = new LoadBalancedRecoveryCallback<RibbonApacheHttpResponse, HttpResponse>() {
            @Override
            protected RibbonApacheHttpResponse createResponse(HttpResponse response, URI uri) {
                return new RibbonApacheHttpResponse(response, uri);
            }
         };
        return this.executeWithRetry(request, retryPolicy, retryCallback, recoveryCallback);
    }

    @Override
    public boolean isClientRetryable(ContextAwareRequest request) {
        return request!= null && isRequestRetryable(request);
    }

    private boolean isRequestRetryable(ContextAwareRequest request) {
        if (request.getContext() == null || request.getContext().getRetryable() == null) {
            return true;
        }
        return request.getContext().getRetryable();
    }

    private RibbonApacheHttpResponse executeWithRetry(RibbonApacheHttpRequest request, LoadBalancedRetryPolicy retryPolicy,
                                                      RetryCallback<RibbonApacheHttpResponse, Exception> callback,
                                                      RecoveryCallback<RibbonApacheHttpResponse> recoveryCallback) throws Exception {
        RetryTemplate retryTemplate = new RetryTemplate();
        boolean retryable = isRequestRetryable(request);
        retryTemplate.setRetryPolicy(retryPolicy == null || !retryable ? new NeverRetryPolicy()
                : new RetryPolicy(request, retryPolicy, this, this.getClientName()));
        BackOffPolicy backOffPolicy = loadBalancedRetryFactory.createBackOffPolicy(this.getClientName());
        retryTemplate.setBackOffPolicy(backOffPolicy == null ? new NoBackOffPolicy() : backOffPolicy);
        RetryListener[] retryListeners = this.loadBalancedRetryFactory.createRetryListeners(this.getClientName());
        if (retryListeners != null && retryListeners.length != 0) {
            retryTemplate.setListeners(retryListeners);
        }
        return retryTemplate.execute(callback, recoveryCallback);
    }

    @Override
    public RequestSpecificRetryHandler getRequestSpecificRetryHandler(RibbonApacheHttpRequest request, IClientConfig requestConfig) {
        return new RequestSpecificRetryHandler(false, false, RetryHandler.DEFAULT, null);
    }

    static class RetryPolicy extends InterceptorRetryPolicy {
        public RetryPolicy(HttpRequest request, LoadBalancedRetryPolicy policy,
                ServiceInstanceChooser serviceInstanceChooser, String serviceName) {
            super(request, policy, serviceInstanceChooser, serviceName);
        }
    }
}
  • 这里重点看executeWithRetry,通过设置RetryTemplate的RetryPolicy及BackOffPolicy,来执行重试策略

RetryableOkHttpLoadBalancingClient

spring-cloud-netflix-ribbon-2.0.0.RELEASE-sources.jar!/org/springframework/cloud/netflix/ribbon/okhttp/RetryableOkHttpLoadBalancingClient.java

/**
 * An OK HTTP client which leverages Spring Retry to retry failed request.
 * @author Ryan Baxter
 * @author Gang Li
 */
public class RetryableOkHttpLoadBalancingClient extends OkHttpLoadBalancingClient {

    private LoadBalancedRetryFactory loadBalancedRetryFactory;

    public RetryableOkHttpLoadBalancingClient(OkHttpClient delegate, IClientConfig config, ServerIntrospector serverIntrospector,
                                              LoadBalancedRetryFactory loadBalancedRetryPolicyFactory) {
        super(delegate, config, serverIntrospector);
        this.loadBalancedRetryFactory = loadBalancedRetryPolicyFactory;
    }

    @Override
    public boolean isClientRetryable(ContextAwareRequest request) {
        return request!= null && isRequestRetryable(request);
    }

    private boolean isRequestRetryable(ContextAwareRequest request) {
        if (request.getContext() == null || request.getContext().getRetryable() == null) {
            return true;
        }
        return request.getContext().getRetryable();
    }

    private OkHttpRibbonResponse executeWithRetry(OkHttpRibbonRequest request, LoadBalancedRetryPolicy retryPolicy,
                                                  RetryCallback<OkHttpRibbonResponse, Exception> callback,
                                                  RecoveryCallback<OkHttpRibbonResponse> recoveryCallback) throws Exception {
        RetryTemplate retryTemplate = new RetryTemplate();
        BackOffPolicy backOffPolicy = loadBalancedRetryFactory.createBackOffPolicy(this.getClientName());
        retryTemplate.setBackOffPolicy(backOffPolicy == null ? new NoBackOffPolicy() : backOffPolicy);
        RetryListener[] retryListeners = this.loadBalancedRetryFactory.createRetryListeners(this.getClientName());
        if (retryListeners != null && retryListeners.length != 0) {
            retryTemplate.setListeners(retryListeners);
        }
        boolean retryable = isRequestRetryable(request);
        retryTemplate.setRetryPolicy(retryPolicy == null || !retryable ? new NeverRetryPolicy()
                : new RetryPolicy(request, retryPolicy, this, this.getClientName()));
        return retryTemplate.execute(callback, recoveryCallback);
    }

    @Override
    public OkHttpRibbonResponse execute(final OkHttpRibbonRequest ribbonRequest,
                                        final IClientConfig configOverride) throws Exception {
        final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryFactory.createRetryPolicy(this.getClientName(), this);
        RetryCallback<OkHttpRibbonResponse, Exception> retryCallback  = new RetryCallback<OkHttpRibbonResponse, Exception>() {
            @Override
            public OkHttpRibbonResponse doWithRetry(RetryContext context) throws Exception {
                //on retries the policy will choose the server and set it in the context
                //extract the server and update the request being made
                OkHttpRibbonRequest newRequest = ribbonRequest;
                if(context instanceof LoadBalancedRetryContext) {
                    ServiceInstance service = ((LoadBalancedRetryContext)context).getServiceInstance();
                    validateServiceInstance(service);
                    //Reconstruct the request URI using the host and port set in the retry context
                    newRequest = newRequest.withNewUri(new URI(service.getUri().getScheme(),
                            newRequest.getURI().getUserInfo(), service.getHost(), service.getPort(),
                            newRequest.getURI().getPath(), newRequest.getURI().getQuery(),
                            newRequest.getURI().getFragment()));
                }
                if (isSecure(configOverride)) {
                    final URI secureUri = UriComponentsBuilder.fromUri(newRequest.getUri())
                            .scheme("https").build().toUri();
                    newRequest = newRequest.withNewUri(secureUri);
                }
                OkHttpClient httpClient = getOkHttpClient(configOverride, secure);

                final Request request = newRequest.toRequest();
                Response response = httpClient.newCall(request).execute();
                if(retryPolicy.retryableStatusCode(response.code())) {
                    ResponseBody responseBody = response.peekBody(Integer.MAX_VALUE);
                    response.close();
                    throw new OkHttpStatusCodeException(RetryableOkHttpLoadBalancingClient.this.clientName,
                            response, responseBody, newRequest.getURI());
                }
                return new OkHttpRibbonResponse(response, newRequest.getUri());
            }
        };
        return this.executeWithRetry(ribbonRequest, retryPolicy, retryCallback, new LoadBalancedRecoveryCallback<OkHttpRibbonResponse, Response>(){

            @Override
            protected OkHttpRibbonResponse createResponse(Response response, URI uri) {
                return new OkHttpRibbonResponse(response, uri);
            }
        });
    }



    @Override
    public RequestSpecificRetryHandler getRequestSpecificRetryHandler(OkHttpRibbonRequest request, IClientConfig requestConfig) {
        return new RequestSpecificRetryHandler(false, false, RetryHandler.DEFAULT, null);
    }

    static class RetryPolicy extends InterceptorRetryPolicy {
        public RetryPolicy(HttpRequest request, LoadBalancedRetryPolicy policy, ServiceInstanceChooser serviceInstanceChooser, String serviceName) {
            super(request, policy, serviceInstanceChooser, serviceName);
        }
    }
}
  • 这里的executeWithRetry,也是通过设置RetryTemplate的RetryPolicy及BackOffPolicy,来执行重试策略

RetryTemplate

spring-retry-1.2.2.RELEASE-sources.jar!/org/springframework/retry/support/RetryTemplate.java

public class RetryTemplate implements RetryOperations {

    /**
     * Retry context attribute name that indicates the context should be considered global
     * state (never closed). TODO: convert this to a flag in the RetryState.
     */
    private static final String GLOBAL_STATE = "state.global";

    protected final Log logger = LogFactory.getLog(getClass());

    private volatile BackOffPolicy backOffPolicy = new NoBackOffPolicy();

    private volatile RetryPolicy retryPolicy = new SimpleRetryPolicy(3);

    private volatile RetryListener[] listeners = new RetryListener[0];

    private RetryContextCache retryContextCache = new MapRetryContextCache();

    private boolean throwLastExceptionOnExhausted;

    //......

    /**
     * Execute the callback once if the policy dictates that we can, otherwise execute the
     * recovery callback.
     * @param recoveryCallback the {@link RecoveryCallback}
     * @param retryCallback the {@link RetryCallback}
     * @param state the {@link RetryState}
     * @param <T> the type of the return value
     * @param <E> the exception type to throw
     * @see RetryOperations#execute(RetryCallback, RecoveryCallback, RetryState)
     * @throws ExhaustedRetryException if the retry has been exhausted.
     * @throws E an exception if the retry operation fails
     * @return T the retried value
     */
    protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
            RecoveryCallback<T> recoveryCallback, RetryState state)
            throws E, ExhaustedRetryException {

        RetryPolicy retryPolicy = this.retryPolicy;
        BackOffPolicy backOffPolicy = this.backOffPolicy;

        // Allow the retry policy to initialise itself...
        RetryContext context = open(retryPolicy, state);
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("RetryContext retrieved: " + context);
        }

        // Make sure the context is available globally for clients who need
        // it...
        RetrySynchronizationManager.register(context);

        Throwable lastException = null;

        boolean exhausted = false;
        try {

            // Give clients a chance to enhance the context...
            boolean running = doOpenInterceptors(retryCallback, context);

            if (!running) {
                throw new TerminatedRetryException(
                        "Retry terminated abnormally by interceptor before first attempt");
            }

            // Get or Start the backoff context...
            BackOffContext backOffContext = null;
            Object resource = context.getAttribute("backOffContext");

            if (resource instanceof BackOffContext) {
                backOffContext = (BackOffContext) resource;
            }

            if (backOffContext == null) {
                backOffContext = backOffPolicy.start(context);
                if (backOffContext != null) {
                    context.setAttribute("backOffContext", backOffContext);
                }
            }

            /*
             * We allow the whole loop to be skipped if the policy or context already
             * forbid the first try. This is used in the case of external retry to allow a
             * recovery in handleRetryExhausted without the callback processing (which
             * would throw an exception).
             */
            while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {

                try {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Retry: count=" + context.getRetryCount());
                    }
                    // Reset the last exception, so if we are successful
                    // the close interceptors will not think we failed...
                    lastException = null;
                    return retryCallback.doWithRetry(context);
                }
                catch (Throwable e) {

                    lastException = e;

                    try {
                        registerThrowable(retryPolicy, state, context, e);
                    }
                    catch (Exception ex) {
                        throw new TerminatedRetryException("Could not register throwable",
                                ex);
                    }
                    finally {
                        doOnErrorInterceptors(retryCallback, context, e);
                    }

                    if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
                        try {
                            backOffPolicy.backOff(backOffContext);
                        }
                        catch (BackOffInterruptedException ex) {
                            lastException = e;
                            // back off was prevented by another thread - fail the retry
                            if (this.logger.isDebugEnabled()) {
                                this.logger
                                        .debug("Abort retry because interrupted: count="
                                                + context.getRetryCount());
                            }
                            throw ex;
                        }
                    }

                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug(
                                "Checking for rethrow: count=" + context.getRetryCount());
                    }

                    if (shouldRethrow(retryPolicy, context, state)) {
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Rethrow in retry for policy: count="
                                    + context.getRetryCount());
                        }
                        throw RetryTemplate.<E>wrapIfNecessary(e);
                    }

                }

                /*
                 * A stateful attempt that can retry may rethrow the exception before now,
                 * but if we get this far in a stateful retry there's a reason for it,
                 * like a circuit breaker or a rollback classifier.
                 */
                if (state != null && context.hasAttribute(GLOBAL_STATE)) {
                    break;
                }
            }

            if (state == null && this.logger.isDebugEnabled()) {
                this.logger.debug(
                        "Retry failed last attempt: count=" + context.getRetryCount());
            }

            exhausted = true;
            return handleRetryExhausted(recoveryCallback, context, state);

        }
        catch (Throwable e) {
            throw RetryTemplate.<E>wrapIfNecessary(e);
        }
        finally {
            close(retryPolicy, context, state, lastException == null || exhausted);
            doCloseInterceptors(retryCallback, context, lastException);
            RetrySynchronizationManager.clear();
        }

    }

    //......
}
  • 整个retry的逻辑就是通过while控制重试次数,循环内没有返回,到循环外会抛出ExhaustedRetryException
  • BackOffPolicy的backOff方法通过sleeper.sleep执行重试间隔
  • BackOffPolicy有一个子接口SleepingBackOffPolicy,还有一个实现该接口的抽象类StatelessBackOffPolicy
  • SleepingBackOffPolicy的实现类有FixedBackOffPolicy、UniformRandomBackOffPolicy、ExponentialBackOffPolicy、ExponentialRandomBackOffPolicy这三个
  • StatelessBackOffPolicy有FixedBackOffPolicy、NoBackOffPolicy、UniformRandomBackOffPolicy三个子类

小结

ribbon的retry,有两个实现,一个是RetryableRibbonLoadBalancingHttpClient,另外一个是RetryableOkHttpLoadBalancingClient,他们使用的是spring-retry组件的RetryTemplate来实现,该template主要由RetryPolicy以及BackOffPolicy来控制重试。重试大的逻辑就是用while来控制重试次数,然后通过BackOffPolicy来控制重试时间间隔。

doc

  • 9. Retry
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-07-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 配置
    • HttpClientRibbonConfiguration
      • OkHttpRibbonConfiguration
      • RetryableRibbonLoadBalancingHttpClient
      • RetryableOkHttpLoadBalancingClient
      • RetryTemplate
      • 小结
      • doc
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档