A look at Spring Cloud's AsyncLoadBalancerAutoConfiguration

This article takes a close look at Spring Cloud's AsyncLoadBalancerAutoConfiguration.

AsyncLoadBalancerAutoConfiguration

spring-cloud-commons-2.0.0.RELEASE-sources.jar!/org/springframework/cloud/client/loadbalancer/AsyncLoadBalancerAutoConfiguration.java

/**
 * Auto configuration for Ribbon (client side load balancing).
 *
 * @author Rob Worsnop
 */
@Configuration
@ConditionalOnBean(LoadBalancerClient.class)
@ConditionalOnClass(AsyncRestTemplate.class)
public class AsyncLoadBalancerAutoConfiguration {

    @Configuration
    static class AsyncRestTemplateCustomizerConfig {
        @LoadBalanced
        @Autowired(required = false)
        private List<AsyncRestTemplate> restTemplates = Collections.emptyList();

        @Bean
        public SmartInitializingSingleton loadBalancedAsyncRestTemplateInitializer(
                final List<AsyncRestTemplateCustomizer> customizers) {
            return new SmartInitializingSingleton() {
                @Override
                public void afterSingletonsInstantiated() {
                    for (AsyncRestTemplate restTemplate : AsyncRestTemplateCustomizerConfig.this.restTemplates) {
                        for (AsyncRestTemplateCustomizer customizer : customizers) {
                            customizer.customize(restTemplate);
                        }
                    }
                }
            };
        }
    }

    @Configuration
    static class LoadBalancerInterceptorConfig {
        @Bean
        public AsyncLoadBalancerInterceptor asyncLoadBalancerInterceptor(LoadBalancerClient loadBalancerClient) {
            return new AsyncLoadBalancerInterceptor(loadBalancerClient);
        }

        @Bean
        public AsyncRestTemplateCustomizer asyncRestTemplateCustomizer(
                final AsyncLoadBalancerInterceptor loadBalancerInterceptor) {
            return new AsyncRestTemplateCustomizer() {
                @Override
                public void customize(AsyncRestTemplate restTemplate) {
                    List<AsyncClientHttpRequestInterceptor> list = new ArrayList<>(
                            restTemplate.getInterceptors());
                    list.add(loadBalancerInterceptor);
                    restTemplate.setInterceptors(list);
                }
            };
        }
    }
}
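  • AsyncRestTemplateCustomizerConfig collects every AsyncRestTemplate annotated with @LoadBalanced and, once all singletons have been instantiated, applies each registered AsyncRestTemplateCustomizer to it.
  • LoadBalancerInterceptorConfig registers two beans: the AsyncLoadBalancerInterceptor and an AsyncRestTemplateCustomizer that installs it.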

AsyncRestTemplateCustomizer

spring-cloud-commons-2.0.0.RELEASE-sources.jar!/org/springframework/cloud/client/loadbalancer/AsyncRestTemplateCustomizer.java

public interface AsyncRestTemplateCustomizer {
    void customize(AsyncRestTemplate restTemplate);
}
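  • The auto-configuration implements this interface with an anonymous class whose job is to append the AsyncLoadBalancerInterceptor (an AsyncClientHttpRequestInterceptor) to the template's existing interceptors.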
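
The customizer pattern above can be sketched without Spring on the classpath. This is only an illustration under stated assumptions: FakeTemplate and FakeCustomizer are hypothetical stand-ins for AsyncRestTemplate and AsyncRestTemplateCustomizer, and interceptors are represented by plain names. It shows the two points that matter: the customizer copies the existing interceptor list before appending, and every customizer is applied to every collected template.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class CustomizerSketch {

    /** Hypothetical stand-in for AsyncRestTemplate: it only stores interceptor names. */
    static final class FakeTemplate {
        private List<String> interceptors = new ArrayList<>();

        List<String> getInterceptors() { return interceptors; }

        void setInterceptors(List<String> interceptors) { this.interceptors = interceptors; }
    }

    /** Same shape as AsyncRestTemplateCustomizer, but for the stand-in type. */
    interface FakeCustomizer {
        void customize(FakeTemplate template);
    }

    /** Copies the current interceptors, appends one more, and sets the new list. */
    static FakeCustomizer appending(String interceptorName) {
        return template -> {
            List<String> list = new ArrayList<>(template.getInterceptors());
            list.add(interceptorName);
            template.setInterceptors(list);
        };
    }

    /** Mirrors the nested loop in afterSingletonsInstantiated(). */
    static void applyAll(List<FakeTemplate> templates, List<FakeCustomizer> customizers) {
        for (FakeTemplate template : templates) {
            for (FakeCustomizer customizer : customizers) {
                customizer.customize(template);
            }
        }
    }

    /** Demo helper: one template with the given pre-existing interceptors. */
    static List<String> demo(String... existing) {
        FakeTemplate template = new FakeTemplate();
        template.setInterceptors(new ArrayList<>(Arrays.asList(existing)));
        applyAll(Collections.singletonList(template),
                Collections.singletonList(appending("loadBalancer")));
        return template.getInterceptors();
    }

    public static void main(String[] args) {
        System.out.println(demo("logging")); // [logging, loadBalancer]
    }
}
```

Because the list is copied rather than replaced, interceptors that were already configured on a template survive the customization.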

AsyncLoadBalancerInterceptor

spring-cloud-commons-2.0.0.RELEASE-sources.jar!/org/springframework/cloud/client/loadbalancer/AsyncLoadBalancerInterceptor.java

public class AsyncLoadBalancerInterceptor implements AsyncClientHttpRequestInterceptor {

    private LoadBalancerClient loadBalancer;

    public AsyncLoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
        this.loadBalancer = loadBalancer;
    }

    @Override
    public ListenableFuture<ClientHttpResponse> intercept(final HttpRequest request, final byte[] body,
            final AsyncClientHttpRequestExecution execution) throws IOException {
        final URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        return this.loadBalancer.execute(serviceName,
                new LoadBalancerRequest<ListenableFuture<ClientHttpResponse>>() {
                    @Override
                    public ListenableFuture<ClientHttpResponse> apply(final ServiceInstance instance)
                            throws Exception {
                        HttpRequest serviceRequest = new ServiceRequestWrapper(request,
                                instance, loadBalancer);
                        return execution.executeAsync(serviceRequest, body);
                    }

                });
    }
}
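  • The interceptor takes the host of the request URI as the serviceName and passes it to loadBalancer.execute.
  • The LoadBalancerRequest it builds wraps the original request in a ServiceRequestWrapper bound to the chosen ServiceInstance, then calls execution.executeAsync.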
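
The host-as-service-name step can be tried with nothing but java.net.URI. The sketch below is illustrative only: serviceNameOf and rewrite are hypothetical helpers, and rewrite merely approximates what happens once an instance has been chosen (the real ServiceRequestWrapper delegates URI reconstruction to the LoadBalancerClient). One detail worth knowing is that java.net.URI returns null from getHost() when the host contains an underscore, so such a service name would reach loadBalancer.execute as null in the code shown above.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class ServiceNameSketch {

    /** Same extraction the interceptor performs: the URI host is the service name. */
    static String serviceNameOf(String url) {
        return URI.create(url).getHost();
    }

    /** Hypothetical helper: swaps the logical service name for a concrete host and port. */
    static URI rewrite(URI original, String host, int port) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        }
        catch (URISyntaxException ex) {
            throw new IllegalArgumentException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(serviceNameOf("http://user-service/api/users/1")); // user-service
        System.out.println(serviceNameOf("http://user_service/api/users/1")); // null
        System.out.println(rewrite(
                URI.create("http://user-service/api/users/1?active=true"), "10.0.0.5", 8080));
    }
}
```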

AbstractAsyncClientHttpRequest

spring-web-5.0.7.RELEASE-sources.jar!/org/springframework/http/client/AbstractAsyncClientHttpRequest.java

/**
 * Abstract base for {@link AsyncClientHttpRequest} that makes sure that headers and body
 * are not written multiple times.
 *
 * @author Arjen Poutsma
 * @since 4.0
 * @deprecated as of Spring 5.0, in favor of {@link org.springframework.http.client.reactive.AbstractClientHttpRequest}
 */
@Deprecated
abstract class AbstractAsyncClientHttpRequest implements AsyncClientHttpRequest {

    private final HttpHeaders headers = new HttpHeaders();

    private boolean executed = false;

    @Override
    public final HttpHeaders getHeaders() {
        return (this.executed ? HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers);
    }

    @Override
    public final OutputStream getBody() throws IOException {
        assertNotExecuted();
        return getBodyInternal(this.headers);
    }

    @Override
    public ListenableFuture<ClientHttpResponse> executeAsync() throws IOException {
        assertNotExecuted();
        ListenableFuture<ClientHttpResponse> result = executeInternal(this.headers);
        this.executed = true;
        return result;
    }

    /**
     * Asserts that this request has not been {@linkplain #executeAsync() executed} yet.
     * @throws IllegalStateException if this request has been executed
     */
    protected void assertNotExecuted() {
        Assert.state(!this.executed, "ClientHttpRequest already executed");
    }

    /**
     * Abstract template method that returns the body.
     * @param headers the HTTP headers
     * @return the body output stream
     */
    protected abstract OutputStream getBodyInternal(HttpHeaders headers) throws IOException;

    /**
     * Abstract template method that writes the given headers and content to the HTTP request.
     * @param headers the HTTP headers
     * @return the response object for the executed request
     */
    protected abstract ListenableFuture<ClientHttpResponse> executeInternal(HttpHeaders headers)
            throws IOException;

}
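  • executeAsync delegates to the subclass's executeInternal and then marks the request as executed, so a second call fails in assertNotExecuted.
  • Its main implementations include SimpleStreamingAsyncClientHttpRequest and Netty4ClientHttpRequest.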
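
The execute-once guard combined with a template method can be reduced to a few lines. The following is a minimal sketch, not Spring's code: OneShotRequest is a hypothetical class, and CompletableFuture<String> stands in for ListenableFuture<ClientHttpResponse>.

```java
import java.util.concurrent.CompletableFuture;

final class ExecuteOnceSketch {

    /** Hypothetical reduction of AbstractAsyncClientHttpRequest to its guard logic. */
    abstract static class OneShotRequest {

        private boolean executed = false;

        final CompletableFuture<String> executeAsync() {
            // Same check as assertNotExecuted(): Assert.state throws IllegalStateException.
            if (this.executed) {
                throw new IllegalStateException("ClientHttpRequest already executed");
            }
            CompletableFuture<String> result = executeInternal();
            this.executed = true;
            return result;
        }

        /** Template method: subclasses decide how the request is actually sent. */
        protected abstract CompletableFuture<String> executeInternal();
    }

    /** A trivial subclass that completes immediately with a fixed body. */
    static OneShotRequest fixedResponse(String body) {
        return new OneShotRequest() {
            @Override
            protected CompletableFuture<String> executeInternal() {
                return CompletableFuture.completedFuture(body);
            }
        };
    }

    /** Executes the request twice and reports whether the second call was rejected. */
    static boolean secondCallRejected(OneShotRequest request) {
        request.executeAsync();
        try {
            request.executeAsync();
            return false;
        }
        catch (IllegalStateException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(fixedResponse("ok").executeAsync().join());   // ok
        System.out.println(secondCallRejected(fixedResponse("ok")));     // true
    }
}
```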

SimpleStreamingAsyncClientHttpRequest

spring-web-5.0.7.RELEASE-sources.jar!/org/springframework/http/client/SimpleStreamingAsyncClientHttpRequest.java

/**
 * {@link org.springframework.http.client.ClientHttpRequest} implementation
 * that uses standard Java facilities to execute streaming requests. Created
 * via the {@link org.springframework.http.client.SimpleClientHttpRequestFactory}.
 *
 * @author Arjen Poutsma
 * @since 3.0
 * @see org.springframework.http.client.SimpleClientHttpRequestFactory#createRequest
 * @see org.springframework.http.client.support.AsyncHttpAccessor
 * @see org.springframework.web.client.AsyncRestTemplate
 * @deprecated as of Spring 5.0, with no direct replacement
 */
@Deprecated
final class SimpleStreamingAsyncClientHttpRequest extends AbstractAsyncClientHttpRequest {

    private final HttpURLConnection connection;

    private final int chunkSize;

    @Nullable
    private OutputStream body;

    private final boolean outputStreaming;

    private final AsyncListenableTaskExecutor taskExecutor;

    SimpleStreamingAsyncClientHttpRequest(HttpURLConnection connection, int chunkSize,
            boolean outputStreaming, AsyncListenableTaskExecutor taskExecutor) {

        this.connection = connection;
        this.chunkSize = chunkSize;
        this.outputStreaming = outputStreaming;
        this.taskExecutor = taskExecutor;
    }

    @Override
    public String getMethodValue() {
        return this.connection.getRequestMethod();
    }

    @Override
    public URI getURI() {
        try {
            return this.connection.getURL().toURI();
        }
        catch (URISyntaxException ex) {
            throw new IllegalStateException(
                    "Could not get HttpURLConnection URI: " + ex.getMessage(), ex);
        }
    }

    @Override
    protected OutputStream getBodyInternal(HttpHeaders headers) throws IOException {
        if (this.body == null) {
            if (this.outputStreaming) {
                long contentLength = headers.getContentLength();
                if (contentLength >= 0) {
                    this.connection.setFixedLengthStreamingMode(contentLength);
                }
                else {
                    this.connection.setChunkedStreamingMode(this.chunkSize);
                }
            }
            SimpleBufferingClientHttpRequest.addHeaders(this.connection, headers);
            this.connection.connect();
            this.body = this.connection.getOutputStream();
        }
        return StreamUtils.nonClosing(this.body);
    }

    @Override
    protected ListenableFuture<ClientHttpResponse> executeInternal(final HttpHeaders headers) throws IOException {
        return this.taskExecutor.submitListenable(new Callable<ClientHttpResponse>() {
            @Override
            public ClientHttpResponse call() throws Exception {
                try {
                    if (body != null) {
                        body.close();
                    }
                    else {
                        SimpleBufferingClientHttpRequest.addHeaders(connection, headers);
                        connection.connect();
                        // Immediately trigger the request in a no-output scenario as well
                        connection.getResponseCode();
                    }
                }
                catch (IOException ex) {
                    // ignore
                }
                return new SimpleClientHttpResponse(connection);
            }
        });

    }

}
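  • This implementation is built on the JDK's HttpURLConnection; executeInternal submits the blocking call to the AsyncListenableTaskExecutor and returns the resulting future.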
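
In other words, this variant is asynchronous because the blocking work runs on another thread, not because the I/O itself is non-blocking. A rough sketch of that idea follows, assuming a plain ExecutorService in place of AsyncListenableTaskExecutor and a Callable that simply reports its thread name in place of the HttpURLConnection call; runOnWorker and the thread name "http-worker" are made up for the illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class OffloadSketch {

    /** Submits the blocking call to a worker thread and waits for its result. */
    static String runOnWorker(Callable<String> blockingCall) {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "http-worker"));
        try {
            // submit() returns immediately; the Callable runs on the worker thread.
            Future<String> future = executor.submit(blockingCall);
            return future.get();
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", ex);
        }
        catch (ExecutionException ex) {
            throw new IllegalStateException(ex.getCause());
        }
        finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // The Callable reports which thread executed it.
        System.out.println(runOnWorker(() -> Thread.currentThread().getName())); // http-worker
    }
}
```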

Netty4ClientHttpRequest

spring-web-5.0.7.RELEASE-sources.jar!/org/springframework/http/client/Netty4ClientHttpRequest.java

/**
 * {@link ClientHttpRequest} implementation based on Netty 4.
 *
 * <p>Created via the {@link Netty4ClientHttpRequestFactory}.
 *
 * @author Arjen Poutsma
 * @author Rossen Stoyanchev
 * @author Brian Clozel
 * @since 4.1.2
 * @deprecated as of Spring 5.0, in favor of
 * {@link org.springframework.http.client.reactive.ReactorClientHttpConnector}
 */
@Deprecated
class Netty4ClientHttpRequest extends AbstractAsyncClientHttpRequest implements ClientHttpRequest {

    private final Bootstrap bootstrap;

    private final URI uri;

    private final HttpMethod method;

    private final ByteBufOutputStream body;

    public Netty4ClientHttpRequest(Bootstrap bootstrap, URI uri, HttpMethod method) {
        this.bootstrap = bootstrap;
        this.uri = uri;
        this.method = method;
        this.body = new ByteBufOutputStream(Unpooled.buffer(1024));
    }

    @Override
    public HttpMethod getMethod() {
        return this.method;
    }

    @Override
    public String getMethodValue() {
        return this.method.name();
    }

    @Override
    public URI getURI() {
        return this.uri;
    }

    @Override
    public ClientHttpResponse execute() throws IOException {
        try {
            return executeAsync().get();
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted during request execution", ex);
        }
        catch (ExecutionException ex) {
            if (ex.getCause() instanceof IOException) {
                throw (IOException) ex.getCause();
            }
            else {
                throw new IOException(ex.getMessage(), ex.getCause());
            }
        }
    }

    @Override
    protected OutputStream getBodyInternal(HttpHeaders headers) throws IOException {
        return this.body;
    }

    @Override
    protected ListenableFuture<ClientHttpResponse> executeInternal(final HttpHeaders headers) throws IOException {
        final SettableListenableFuture<ClientHttpResponse> responseFuture = new SettableListenableFuture<>();

        ChannelFutureListener connectionListener = future -> {
            if (future.isSuccess()) {
                Channel channel = future.channel();
                channel.pipeline().addLast(new RequestExecuteHandler(responseFuture));
                FullHttpRequest nettyRequest = createFullHttpRequest(headers);
                channel.writeAndFlush(nettyRequest);
            }
            else {
                responseFuture.setException(future.cause());
            }
        };

        this.bootstrap.connect(this.uri.getHost(), getPort(this.uri)).addListener(connectionListener);
        return responseFuture;
    }

    private FullHttpRequest createFullHttpRequest(HttpHeaders headers) {
        io.netty.handler.codec.http.HttpMethod nettyMethod =
                io.netty.handler.codec.http.HttpMethod.valueOf(this.method.name());

        String authority = this.uri.getRawAuthority();
        String path = this.uri.toString().substring(this.uri.toString().indexOf(authority) + authority.length());
        FullHttpRequest nettyRequest = new DefaultFullHttpRequest(
                HttpVersion.HTTP_1_1, nettyMethod, path, this.body.buffer());

        nettyRequest.headers().set(HttpHeaders.HOST, this.uri.getHost() + ":" + getPort(uri));
        nettyRequest.headers().set(HttpHeaders.CONNECTION, "close");
        headers.forEach((headerName, headerValues) -> nettyRequest.headers().add(headerName, headerValues));
        if (!nettyRequest.headers().contains(HttpHeaders.CONTENT_LENGTH) && this.body.buffer().readableBytes() > 0) {
            nettyRequest.headers().set(HttpHeaders.CONTENT_LENGTH, this.body.buffer().readableBytes());
        }

        return nettyRequest;
    }

    private static int getPort(URI uri) {
        int port = uri.getPort();
        if (port == -1) {
            if ("http".equalsIgnoreCase(uri.getScheme())) {
                port = 80;
            }
            else if ("https".equalsIgnoreCase(uri.getScheme())) {
                port = 443;
            }
        }
        return port;
    }

    /**
     * A SimpleChannelInboundHandler to update the given SettableListenableFuture.
     */
    private static class RequestExecuteHandler extends SimpleChannelInboundHandler<FullHttpResponse> {

        private final SettableListenableFuture<ClientHttpResponse> responseFuture;

        public RequestExecuteHandler(SettableListenableFuture<ClientHttpResponse> responseFuture) {
            this.responseFuture = responseFuture;
        }

        @Override
        protected void channelRead0(ChannelHandlerContext context, FullHttpResponse response) throws Exception {
            this.responseFuture.set(new Netty4ClientHttpResponse(context, response));
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
            this.responseFuture.setException(cause);
        }
    }

}
  • This implementation opens the connection with Netty's bootstrap.connect and writes the request once the connection succeeds.
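
Two small pieces of the listing above are easy to check in isolation: the default-port fallback in getPort and the way createFullHttpRequest derives the request path from the URI. The sketch below copies that logic into a standalone class; the class name and the method names portOf and requestTarget are chosen here for illustration.

```java
import java.net.URI;

final class NettyRequestLineSketch {

    /** Same fallback as getPort(): 80 for http, 443 for https, otherwise the explicit port. */
    static int portOf(URI uri) {
        int port = uri.getPort();
        if (port == -1) {
            if ("http".equalsIgnoreCase(uri.getScheme())) {
                port = 80;
            }
            else if ("https".equalsIgnoreCase(uri.getScheme())) {
                port = 443;
            }
        }
        return port;
    }

    /** Everything after the authority, i.e. path plus query, as in createFullHttpRequest. */
    static String requestTarget(URI uri) {
        String authority = uri.getRawAuthority();
        String text = uri.toString();
        return text.substring(text.indexOf(authority) + authority.length());
    }

    public static void main(String[] args) {
        URI uri = URI.create("http://example.org:8080/a/b?x=1");
        System.out.println(portOf(uri));                                  // 8080
        System.out.println(portOf(URI.create("https://example.org/a"))); // 443
        System.out.println(requestTarget(uri));                           // /a/b?x=1
        System.out.println(uri.getHost() + ":" + portOf(uri));            // Host header value
    }
}
```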

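Summary

The AsyncClientHttpRequest interface and the implementation classes that AsyncLoadBalancerAutoConfiguration relies on are all marked as deprecated; from Spring 5 onward, WebClient is the recommended alternative.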

doc

  • 3.4 Spring WebClient as a Load Balancer Client

Originally published on the WeChat official account 码匠的流水账 (geek_luandun)

Original publication date: 2018-07-17
