前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >当Tomcat遇上Netty(续集)

当Tomcat遇上Netty(续集)

作者头像
彤哥
发布2020-05-28 16:43:27
1.9K0
发布2020-05-28 16:43:27
举报

本篇接着上篇,主要讲一下Tomcat与Netty是怎么勾搭上的,过程有点复杂。

Tomcat与Netty是如何衔接起来的?

请看下面这张图:

从下往下看,接收请求的时候走的确实是tomcat,然后通过spring cloud gateway的过滤器链,走到了一个叫作 NettyWriteResponseFilter的过滤器。

再接着往下走,又走到了一个叫作 NettyRoutingFilter的一个过滤器,这个过滤器是干什么的呢?

从名字可以看出,它是用来做路由的,也就是在这里把gateway接收到的请求转发给其它服务。

这里路由的实现其实就是创建一个HttpClient,然后根据配置的路由信息拿到目标地址,然后再向目标服务发送一个Http请求。

我们简单地看一下NettyRoutingFilter里面的实现(跟着我的注释看就好):

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {    // 拿到目标地址,在前面放进去的    URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
    String scheme = requestUrl.getScheme();    if (isAlreadyRouted(exchange)            || (!"http".equals(scheme) && !"https".equals(scheme))) {        return chain.filter(exchange);    }    setAlreadyRouted(exchange);
    // 原始请求    ServerHttpRequest request = exchange.getRequest();
    final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());    final String url = requestUrl.toASCIIString();
    HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);
    // 构造新的请求头    final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();    filtered.forEach(httpHeaders::set);
    boolean preserveHost = exchange            .getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);    Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
    // 获取一个HttpClient    // key1, 此时还在tomcat的线程里    Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange)            // 设置请求头            .headers(headers -> {                headers.add(httpHeaders);                // Will either be set below, or later by Netty                headers.remove(HttpHeaders.HOST);                if (preserveHost) {                    String host = request.getHeaders().getFirst(HttpHeaders.HOST);                    headers.add(HttpHeaders.HOST, host);                }            // 发送请求,本文来源于工纵耗彤哥读源码            }).request(method).uri(url).send((req, nettyOutbound) -> {                if (log.isTraceEnabled()) {                    nettyOutbound                            .withConnection(connection -> log.trace("outbound route: "                                    + connection.channel().id().asShortText()                                    + ", inbound: " + exchange.getLogPrefix()));                }                // 发送请求                // key2,此时已经到netty的线程里了                return nettyOutbound.send(request.getBody().map(this::getByteBuf));            }).responseConnection((res, connection) -> {                // response的处理
                // Defer committing the response until all route filters have run                // Put client response as ServerWebExchange attribute and write                // response later NettyWriteResponseFilter                exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);                exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);
                ServerHttpResponse response = exchange.getResponse();                // put headers and status so filters can modify the response                HttpHeaders headers = new HttpHeaders();
                res.responseHeaders().forEach(                        entry -> headers.add(entry.getKey(), entry.getValue()));
                String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);                if (StringUtils.hasLength(contentTypeValue)) {                    exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR,                            contentTypeValue);                }
                setResponseStatus(res, response);
                // make sure headers filters run after setting status so it is                // available in response                HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(                        getHeadersFilters(), headers, exchange, Type.RESPONSE);
                if (!filteredResponseHeaders                        .containsKey(HttpHeaders.TRANSFER_ENCODING)                        && filteredResponseHeaders                                .containsKey(HttpHeaders.CONTENT_LENGTH)) {                    // It is not valid to have both the transfer-encoding header and                    // the content-length header.                    // Remove the transfer-encoding header in the response if the                    // content-length header is present.                    response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);                }
                exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES,                        filteredResponseHeaders.keySet());
                response.getHeaders().putAll(filteredResponseHeaders);
                return Mono.just(res);            });
    Duration responseTimeout = getResponseTimeout(route);    if (responseTimeout != null) {        responseFlux = responseFlux                .timeout(responseTimeout, Mono.error(new TimeoutException(                        "Response took longer than timeout: " + responseTimeout)))                .onErrorMap(TimeoutException.class,                        th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT,                                th.getMessage(), th));    }
    // 过滤器链    return responseFlux.then(chain.filter(exchange));}

忽略代码本身的复杂性,整体逻辑就是向目标服务发送请求,并处理响应体。

但是,这里真正发请求的时候是在Netty线程里发送的,同时,处理响应体也同样是在Netty线程中进行的。

而在处理响应之前呢,当然是收到目标服务的响应,在接收目标服务的响应的时候就是在Netty的NioEventLoop中进行的,Netty接收到响应后会创建一个ByteBuf来承载响应的内容,最后,经过一系列的调用就回到了NettyWriteResponseFilter的回调里,在NettyWriteResponseFilter里对响应体进行写出操作,让我们看一下这个类里面的基本内容:

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {    // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_CONN_ATTR is not added    // until the NettyRoutingFilter is run    // @formatter:off    return chain.filter(exchange)            .doOnError(throwable -> cleanup(exchange))            .then(Mono.defer(() -> {                // 响应的回调
                Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);
                if (connection == null) {                    return Mono.empty();                }                if (log.isTraceEnabled()) {                    log.trace("NettyWriteResponseFilter start inbound: "                            + connection.channel().id().asShortText() + ", outbound: "                            + exchange.getLogPrefix());                }                // 响应                ServerHttpResponse response = exchange.getResponse();
                // TODO: needed?                final Flux<DataBuffer> body = connection                        .inbound()                        .receive()                        .retain()                        .map(byteBuf -> wrap(byteBuf, response));
                MediaType contentType = null;                try {                    contentType = response.getHeaders().getContentType();                }                catch (Exception e) {                    if (log.isTraceEnabled()) {                        log.trace("invalid media type", e);                    }                }
                // 写出,本文来源于工纵耗彤哥读源码                return (isStreamingMediaType(contentType)                        ? response.writeAndFlushWith(body.map(Flux::just))                        : response.writeWith(body));            })).doOnCancel(() -> cleanup(exchange));    // @formatter:on}

但是,这里的response是TomcatServerHttpResponse,因为接收请求是通过tomcat接收的,所以,这里的响应是tomcat的。

因此,最后是调用了tomcat的write()方法或者writeAndFlush()方法,此时,已经没有Netty什么事了。

在整个过程中,接收目标服务的响应的时候通过Netty分配了ByteBuf而把这个响应返回给调用者的时候却是走的tomcat,导致这个分配的ByteBuf一直没有释放,所以,出现了内存泄漏。

如果全程都使用Netty的情况下,也会经历上面说到的这些步骤,只不过在最后这里的响应会变成ReactorServerHttpResponse,而不是tomcat的响应。

在ReactorServerHttpResponse里面就会调用到Netty的相关方法,并往Netty的线程池里放一个任务,这个任务是reactor.netty.channel.MonoSendMany.SendManyInner.AsyncFlush:

final class AsyncFlush implements Runnable {    @Override    public void run() {        if (pending != 0) {            ctx.flush();        }    }}

在这个任务里面调用ctx.flush(),这个flush()就是把内容真正地发送出去的方法,发送完了之后,也会把相应的ByteBuf给清理了,也就释放了内存。

好了,让我们用一张图来表示一下整个的过程:

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-05-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 彤哥读源码 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 本篇接着上篇,主要讲一下Tomcat与Netty是怎么勾搭上的,过程有点复杂。
  • Tomcat与Netty是如何衔接起来的?
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档