前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >你想要的系列:网络请求框架OkHttp3全解系列 - (四)拦截器详解2:连接、请求服务(重点)

你想要的系列:网络请求框架OkHttp3全解系列 - (四)拦截器详解2:连接、请求服务(重点)

作者头像
胡飞洋
发布2020-07-23 16:17:11
1.8K0
发布2020-07-23 16:17:11
举报

在本系列的上一篇文章你想要的系列:网络请求框架OkHttp3全解系列 - (三)拦截器详解1:重试重定向、桥、缓存(重点)中,我们分析了OkHttp拦截器链中的前三个拦截器:RetryAndFollowUpInterceptor、BridgeInterceptor、CacheInterceptor,它们在请求建立连接之前做了一些预处理。

如果请求经过这三个拦截器后,要继续往下传递,说明是需要进行网络请求的(缓存不能直接满足),也就是今天要分析的内容——剩下的两个拦截器:ConnectInterceptor、CallServerInterceptor,分别负责 连接建立请求服务读写

背景 - HTTP协议发展

在讲解拦截器之前,我们有必要了解http协议相关背景知识,因为okhttp的网络连接正是基于此实现的。HTTP协议经历了以下三个版本阶段。

HTTP1.0

HTTP1.0中,一次请求 会建立一个TCP连接,请求完成后主动断开连接。这种方法的好处是简单,各个请求互不干扰。 但每次请求都会经历 3次握手、2次或4次挥手的连接建立和断开过程——极大影响网络效率和系统开销。

http1.0

HTTP1.1

HTTP1.1中,解决了HTTP1.0中连接不能复用的问题,支持持久连接——使用keep-alive机制:一次HTTP请求结束后不会立即断开TCP连接,如果此时有新的HTTP请求,且其请求的Host同上次请求相同,那么会直接复用TCP连接。这样就减少了建立和关闭连接的消耗和延迟。keep-alive机制在HTTP1.1中是默认打开的——即在请求头添加:connection:keep-alive。(keep-alive不会永久保持连接,它有一个保持时间,可在不同的服务器软件(如Apache)中设定这个时间)

http1.1

HTTP2.0

HTTP1.1中,连接的复用是串行的:一个请求建立了TCP连接,请求完成后,下一个相同host的请求继续使用这个连接。 但客户端想 同时 发起多个并行请求,那么必须建立多个TCP连接。将会产生网络延迟、增大网路开销。

并且HTTP1.1不会压缩请求和响应报头,导致了不必要的网络流量;HTTP1.1不支持资源优先级导致底层TCP连接利用率低下。在HTTP2.0中,这些问题都会得到解决,HTTP2.0主要有以下特性

  • 新的二进制格式(Binary Format):http/1.x使用的是明文协议,其协议格式由三部分组成:request line,header,body,其协议解析是基于文本,但是这种方式存在天然缺陷,文本的表现形式有多样性,要做到健壮性考虑的场景必然很多,二进制则不同,只认0和1的组合;基于这种考虑,http/2.0的协议解析决定采用二进制格式,实现方便且健壮
  • 多路复用(MultiPlexing):即连接共享,使用streamId用来区分请求,一个request对应一个stream并分配一个id,这样一个TCP连接上可以有多个stream,每个stream的frame可以随机的混杂在一起,接收方可以根据stream id将frame再归属到各自不同的request里面
  • 优先级和依赖(Priority、Dependency):每个stream都可以设置优先级和依赖,优先级高的stream会被server优先处理和返回给客户端,stream还可以依赖其它的sub streams;优先级和依赖都是可以动态调整的,比如在APP上浏览商品列表,用户快速滑动到底部,但是前面的请求已经发出,如果不把后面的优先级设高,那么当前浏览的图片将会在最后展示出来,显然不利于用户体验
  • header压缩:http2.0使用encoder来减少需要传输的header大小,通讯双方各自cache一份header fields表,既避免了重复header的传输,又减小了需要传输的大小
  • 重置连接:很多APP里都有停止下载图片的需求,对于http1.x来说,是直接断开连接,导致下次再发请求必须重新建立连接;http2.0引入RST_STREAM类型的frame,可以在不断开连接的前提下取消某个request的stream

其中涉及了两个新的概念:

  • 数据流-stream:基于TCP连接之上的逻辑双向字节流,用于承载双向消息,对应一个请求及其响应。客户端每发起一个请求就建立一个数据流,后续该请求及其响应的所有数据都通过该数据流传输。每个数据流都有一个唯一的标识符和可选的优先级信息。
  • 帧-frame:HTTP/2的最小数据切片单位,承载着特定类型的数据,例如 HTTP 标头、消息负载,等等。 来自不同数据流的帧可以交错发送,然后再根据每个帧头的数据流标识符重新组装,从而在宏观上实现了多个请求或响应并行传输的效果。

http2.0

这里的 多路复用机制 就实现了 在同一个TCP连接上 多个请求 并行执行。

无论是HTTP1.1的Keep-Alive机制还是HTTP2.0的多路复用机制,在实现上都需要引入连接池来维护网络连接。下面就开始分析 OkHttp中的连接池实现——连接拦截器ConnectInterceptor

ConnectInterceptor

连接拦截器ConnectInterceptor代码如下:

代码语言:javascript
复制
//打开到目标服务的连接、处理下一个拦截器
public final class ConnectInterceptor implements Interceptor {
  public final OkHttpClient client;

  public ConnectInterceptor(OkHttpClient client) {
    this.client = client;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    Transmitter transmitter = realChain.transmitter();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);

    return realChain.proceed(request, transmitter, exchange);
  }
}

代码很少,主要是使用transmitter.newExchange获取Exchange实例,并作为参数调用拦截器链的proceed的方法。注意到前面分析过的拦截器调用的proceed方法是一个参数的,而这里是三个参数的。这是因为下一个拦截器(如果没有配置网络拦截器的话,就是CallServerInterceptor,也是最后一个)需要进行真正的网络IO操作,而 Exchange(意为交换)主要作用就是真正的IO操作:写入请求、读取响应(会在下一个拦截器做介绍)。

实际上获取Exchange实例的逻辑处理都封装在Transmitter中了。前面的文章提到过Transmitter,它是“发射器”,是把 请求 从应用端 发射到 网络层,它持有请求的 连接、请求、响应 和 流,一个请求对应一个Transmitter实例,一个数据流。下面就看下它的newExchange方法:

代码语言:javascript
复制
  Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    synchronized (connectionPool) {
      if (noMoreExchanges) {
        throw new IllegalStateException("released");
      }
      if (exchange != null) {
        throw new IllegalStateException("cannot make a new request because the previous response "
            + "is still open: please call response.close()");
      }
    }

    ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
    Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);

    synchronized (connectionPool) {
      this.exchange = result;
      this.exchangeRequestDone = false;
      this.exchangeResponseDone = false;
      return result;
    }
  }

若是第一次请求,前面两个if是没走进去的。接着看到使用exchangeFinder的find方法获取到了ExchangeCodec实例,然后作为参数构建了Exchange实例,并返回。嗯,看起来也很简单的样子。 注意到这个方法里涉及了连接池RealConnectionPool、交换寻找器ExchangeFinder、交换编解码ExchangeCodec、交换管理Exchange这几个类(翻译成这样尽力了?,意会吧)。

  • RealConnectionPool,连接池,负责管理请求的连接,包括新建、复用、关闭,理解上类似线程池。
  • ExchangeCodec,接口类,负责真正的IO操作—写请求、读响应,实现类有Http1ExchangeCodec、Http2ExchangeCodec,分别对应HTTP1.1协议、HTTP2.0协议。
  • Exchange,管理IO操作,可以理解为 数据流,是ExchangeCodec的包装,增加了事件回调;一个请求对应一个Exchange实例。传给下个拦截器CallServerInterceptor使用。
  • ExchangeFinder,(从连接池中)寻找可用TCP连接,然后通过连接得到ExchangeCodec。

ExchangeFinder

ExchangeFinder的作用从名字就可以看出——Exchange寻找者,本质是为请求寻找一个TCP连接。如果已有可用连接就直接使用,没有则打开一个新的连接。 一个网络请求的执行,需要先有一个指向目标服务的TCP连接,然后再进行写请求、读响应的IO操作。ExchangeFinder是怎么寻找的呢?继续往下看~

我们先看exchangeFinder初始化的地方:

代码语言:javascript
复制
  public void prepareToConnect(Request request) {
    ...
    this.exchangeFinder = new ExchangeFinder(this, connectionPool, createAddress(request.url()),
        call, eventListener);
  }

看到这里应该会想起上一篇文章中分析RetryAndFollowUpInterceptor时提到过,prepareToConnect这个方法作用是连接准备,就是创建了ExchangeFinder实例。主要到传入的参数有connectionPool、createAddress方法返回的Address、call、eventListener。connectionPool是连接池,稍后分析,先看下createAddress方法:

代码语言:javascript
复制
  private Address createAddress(HttpUrl url) {
    SSLSocketFactory sslSocketFactory = null;
    HostnameVerifier hostnameVerifier = null;
    CertificatePinner certificatePinner = null;
    if (url.isHttps()) {
      sslSocketFactory = client.sslSocketFactory();
      hostnameVerifier = client.hostnameVerifier();
      certificatePinner = client.certificatePinner();
    }

    return new Address(url.host(), url.port(), client.dns(), client.socketFactory(),
        sslSocketFactory, hostnameVerifier, certificatePinner, client.proxyAuthenticator(),
        client.proxy(), client.protocols(), client.connectionSpecs(), client.proxySelector());
  }

使用url和client配置 创建一个Address实例。Address意思是指向服务的连接的地址,可以理解为请求地址及其配置。Address有一个重要作用:相同Address的HTTP请求 共享 相同的连接。这可以作为前面提到的 HTTP1.1和HTTP2.0 复用连接 的请求的判断。

回头看exchangeFinder的find方法

代码语言:javascript
复制
  public ExchangeCodec find(
      OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    int connectTimeout = chain.connectTimeoutMillis();
    int readTimeout = chain.readTimeoutMillis();
    int writeTimeout = chain.writeTimeoutMillis();
    int pingIntervalMillis = client.pingIntervalMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      //找到一个健康的连接
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
      //利用连接实例化ExchangeCodec对象,如果是HTTP/2返回Http2ExchangeCodec,否则返回Http1ExchangeCodec
      return resultConnection.newCodec(client, chain);
    } catch (RouteException e) {
      trackFailure();
      throw e;
    } catch (IOException e) {
      trackFailure();
      throw new RouteException(e);
    }
  }

主要就是通过findHealthyConnection方法获取连接RealConnection实例,然后用RealConnection的newCodec方法获取了ExchangeCodec实例,如果是HTTP/2返回Http2ExchangeCodec,否则返回Http1ExchangeCodec,然后返回。

findHealthyConnection方法名透露着 就是去寻找可用TCP连接的,而我们猜测这个方法内部肯定和连接池ConnectionPool有紧密的关系。接着跟进findHealthyConnection方法:

代码语言:javascript
复制
  private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
      boolean doExtensiveHealthChecks) throws IOException {
    while (true) {
      //找连接
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          pingIntervalMillis, connectionRetryEnabled);

      // 是新连接 且不是HTTP2.0 就不用体检
      synchronized (connectionPool) {
        if (candidate.successCount == 0 && !candidate.isMultiplexed()) {
          return candidate;
        }
      }
      // 体检不健康,继续找
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
          //标记不可用
        candidate.noNewExchanges();
        continue;
      }

      return candidate;
    }
  }

循环寻找连接:如果是不健康的连接,标记不可用(标记后会移除,后面讲连接池会讲到),然后继续找。健康是指连接可以承载新的数据流,socket是连接状态。我们跟进findConnection方法,看看到底是怎么 找连接 的:

代码语言:javascript
复制
  //为承载新的数据流 寻找 连接。寻找顺序是 已分配的连接、连接池、新建连接
  private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
    boolean foundPooledConnection = false;
    RealConnection result = null;
    Route selectedRoute = null;
    RealConnection releasedConnection;
    Socket toClose;
    synchronized (connectionPool) {
      //请求已被取消(Call的cancel方法->transmitter的cancel方法),抛异常
      if (transmitter.isCanceled()) throw new IOException("Canceled");
      hasStreamFailure = false; 

      // 尝试使用 已给数据流分配的连接.(例如重定向请求时,可以复用上次请求的连接)
      releasedConnection = transmitter.connection;
      //有已分配的连接,但已经被限制承载新的数据流,就尝试释放掉(如果连接上已没有数据流),并返回待关闭的socket。
      toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
          ? transmitter.releaseConnectionNoEvents()
          : null;

      if (transmitter.connection != null) {
        // 不为空,说明上面没有释放掉,那么此连接可用
        result = transmitter.connection;
        releasedConnection = null;
      }

      if (result == null) {
        // 没有已分配的可用连接,就尝试从连接池获取。(连接池稍后详细讲解)
        if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
          foundPooledConnection = true;
          result = transmitter.connection;
        } else if (nextRouteToTry != null) {
          selectedRoute = nextRouteToTry;//有可尝试的路由
          nextRouteToTry = null;
        } else if (retryCurrentRoute()) {
          selectedRoute = transmitter.connection.route();
        }
      }
    }
    closeQuietly(toClose);//(如果有)关闭待关闭的socket

    if (releasedConnection != null) {
      eventListener.connectionReleased(call, releasedConnection);//(如果有)回调连接释放事件
    }
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);//(如果有)回调(从连接池)获取连接事件
    }
    if (result != null) {
      // 如果有已分配可用连接 或 从连接池获取到连接,结束!  没有 就走下面的新建连接过程。
      return result;
    }

    // 如果需要路由信息,就获取。是阻塞操作
    boolean newRouteSelection = false;
    if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
      newRouteSelection = true;
      routeSelection = routeSelector.next();
    }

    List<Route> routes = null;
    synchronized (connectionPool) {
      if (transmitter.isCanceled()) throw new IOException("Canceled");

      if (newRouteSelection) {
        //现在有了IP地址,再次尝试从连接池获取。可能会因为连接合并而匹配。(这里传入了routes,上面的传的null)
        routes = routeSelection.getAll();
        if (connectionPool.transmitterAcquirePooledConnection(
            address, transmitter, routes, false)) {
          foundPooledConnection = true;
          result = transmitter.connection;
        }
      }
      //第二次连接池也没找到,就新建连接
      if (!foundPooledConnection) {
        if (selectedRoute == null) {
          selectedRoute = routeSelection.next();
        }

        // Create a connection and assign it to this allocation immediately. This makes it possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        result = new RealConnection(connectionPool, selectedRoute);
        connectingConnection = result;
      }
    }

    // 如果第二次从连接池的尝试成功了,结束,因为连接池中的连接是已经和服务器建立连接的
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
      return result;
    }

    // 第二次没成功,就把新建的连接,进行TCP + TLS 握手,与服务端建立连接. 是阻塞操作
    result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
        connectionRetryEnabled, call, eventListener);
    connectionPool.routeDatabase.connected(result.route());//从失败名单中移除

    Socket socket = null;
    synchronized (connectionPool) {
      connectingConnection = null;
      // 最后一次尝试从连接池获取,注意最后一个参数为true,即要求 多路复用(http2.0)
      //意思是,如果本次是http2.0,那么为了保证 多路复用性,(因为上面的握手操作不是线程安全)会再次确认连接池中此时是否已有同样连接
      if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
        // 如果获取到,就关闭我们创建里的连接,返回获取的连接
        result.noNewExchanges = true;
        socket = result.socket();
        result = transmitter.connection;

        // 那么这个刚刚连接成功的路由 就可以 用作下次 尝试的路由
        nextRouteToTry = selectedRoute;
      } else {
        //最后一次尝试也没有的话,就把刚刚新建的连接存入连接池
        connectionPool.put(result);
        transmitter.acquireConnectionNoEvents(result);//把连接赋给transmitter
      }
    }
    closeQuietly(socket);//如果刚刚建立的连接没用到,就关闭

    eventListener.connectionAcquired(call, result);
    return result;
  }

代码看着很长,已经加了注释,方法目的就是 为 承载新的数据流 寻找 连接。寻找顺序是 已分配的连接、连接池、新建连接。梳理如下:

  1. 首先会尝试使用 已给数据流分配的连接。(已分配连接的情况例如重定向时的再次请求,说明上次已经有了连接)
  2. 若没有 已分配的可用连接,就尝试从连接池中 匹配获取。因为此时没有路由信息,所以匹配条件:address一致——host、port、代理等一致,且 匹配的连接可以接受新的数据流。
  3. 若从连接池没有获取到,则取下一个代理的路由信息(多个Route,即多个IP地址),再次尝试从连接池获取,此时可能因为连接合并而匹配到。
  4. 若第二次也没有获取到,就创建RealConnection实例,进行TCP + TLS 握手,与服务端建立连接。
  5. 此时为了确保Http2.0连接的多路复用性,会第三次从连接池匹配。因为新建立的连接的握手过程是非线程安全的,所以此时可能连接池新存入了相同的连接。
  6. 第三次若匹配到,就使用已有连接,释放刚刚新建的连接;若未匹配到,则把新连接存入连接池并返回。

流程图如下:

findConnection方法流程

看到这里,小盆友,你是否有很多问号?

  • 开始若有了已分配的连接,但已经被限制承载新的数据流,是如何释放的呢?
  • 代理路由信息是如何获取的呢?
  • 如何从连接池获取连接的?三次有什么不同?

没关系,慢慢来,我们先看第二个问号,代理路由信息的获取。

RouteSelector

先来看下Route类:

代码语言:javascript
复制
public final class Route {
  final Address address;
  final Proxy proxy;//代理
  final InetSocketAddress inetSocketAddress;//连接目标地址

  public Route(Address address, Proxy proxy, InetSocketAddress inetSocketAddress) {
    ...
    this.address = address;
    this.proxy = proxy;
    this.inetSocketAddress = inetSocketAddress;
  }

Route,通过代理服务器信息 proxy、连接目标地址 InetSocketAddress 来描述一条 连接服务器的具体路由

  • proxy代理:可以为客户端显式配置代理服务器。否则,将使用ProxySelector代理选择器。可能会返回多个代理。
  • IP地址:无论是直连还是代理,打开socket连接都需要IP地址。 DNS服务可能返回多个IP地址尝试。

上面分析的findConnection方法中是使用routeSelection.getAll()获取Route集合routes,而routeSelection是通过routeSelector.next()获取,routeSelector是在ExchangeFinder的构造方法内创建的,也就是说routeSelector在RetryAndFollowUpInterceptor中就创建了,那么我们看下RouteSelector:

代码语言:javascript
复制
  RouteSelector(Address address, RouteDatabase routeDatabase, Call call,
      EventListener eventListener) {
    this.address = address;
    this.routeDatabase = routeDatabase;//连接池中的路由黑名单(连接失败的路由)
    this.call = call;
    this.eventListener = eventListener;

    resetNextProxy(address.url(), address.proxy());
  }
  //收集代理服务器
  private void resetNextProxy(HttpUrl url, Proxy proxy) {
    if (proxy != null) {
      // 若指定了代理,那么就这一个。(就是初始化OkhttpClient时配置的)
      proxies = Collections.singletonList(proxy);
    } else {
      //没配置就使用ProxySelector获取代理(若初始化OkhttpClient时没有配置ProxySelector,会使用系统默认的) 
      List<Proxy> proxiesOrNull = address.proxySelector().select(url.uri());
      proxies = proxiesOrNull != null && !proxiesOrNull.isEmpty()
          ? Util.immutableList(proxiesOrNull)
          : Util.immutableList(Proxy.NO_PROXY);
    }
    nextProxyIndex = 0;
  }

注意到RouteSelector的构造方法中传入了routeDatabase,是连接失败的路由黑名单(后面连接池也会讲到),并使用resetNextProxy方法获取代理服务器列表:若没有指定proxy就是用ProxySelector获取proxy列表(若没有配置ProxySelector会使用系统默认)。接着看next方法:

代码语言:javascript
复制
  //收集代理的路由信息
  public Selection next() throws IOException {
    if (!hasNext()) {//还有下一个代理
      throw new NoSuchElementException();
    }

    List<Route> routes = new ArrayList<>();
    while (hasNextProxy()) {
      Proxy proxy = nextProxy();
      //遍历proxy经DNS后的所有IP地址,组装成Route
      for (int i = 0, size = inetSocketAddresses.size(); i < size; i++) {
        Route route = new Route(address, proxy, inetSocketAddresses.get(i));
        if (routeDatabase.shouldPostpone(route)) {//此路由在黑名单中,存起来最后尝试
          postponedRoutes.add(route);
        } else {
          routes.add(route);
        }
      }

      if (!routes.isEmpty()) {
        break;
      }
    }

    if (routes.isEmpty()) {
      // 若没有拿到路由,就尝试上面存的黑名单的路由
      routes.addAll(postponedRoutes);
      postponedRoutes.clear();
    }
    //routes包装成Selection返回
    return new Selection(routes);
  }

next方法主要就是获取下一个代理Proxy的代理信息,即多个路由。具体是在resetNextInetSocketAddress方法中实现,主要是对代理服务地址进行DNS解析获取多个IP地址,这里就不展开了。

好了,到这里 就解决了第二个问号。其他两个问号涉及 连接池RealConnectionPool、连接RealConnection,下面就来瞅瞅。

ConnectionPool

ConnectionPool,即连接池,用于管理http1.1/http2.0连接重用,以减少网络延迟。相同Address的http请求可以共享一个连接,ConnectionPool就是实现了连接的复用。

代码语言:javascript
复制
public final class ConnectionPool {
  final RealConnectionPool delegate;
  //最大空闲连接数5,最大空闲时间5分钟
  public ConnectionPool() {
    this(5, 5, TimeUnit.MINUTES);
  }

  public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.delegate = new RealConnectionPool(maxIdleConnections, keepAliveDuration, timeUnit);
  }
  //返回空闲连接数
  public int idleConnectionCount() {
    return delegate.idleConnectionCount();
  }
  //返回池子中的连接数
  public int connectionCount() {
    return delegate.connectionCount();
  }
  //关闭并移除所以空闲连接
  public void evictAll() {
    delegate.evictAll();
  }
}

ConnectionPool看起来比较好理解,默认配置是最大空闲连接数5,最大空闲时间5分钟(即一个连接空闲时间超过5分钟就移除),我们也可以在初始化okhttpClient时进行不同的配置。需要注意的是ConnectionPool是用于应用层,实际管理者是RealConnectionPool。RealConnectionPool是okhttp内部真实管理连接的地方。

连接池对连接的管理无非是 存、取、删,上面的两个问号分别对应 删、取,跟进RealConnectionPool我们一个个看:

代码语言:javascript
复制
  private final Deque<RealConnection> connections = new ArrayDeque<>();

  private final Runnable cleanupRunnable = () -> {
    //循环清理
    while (true) {
      //清理
      long waitNanos = cleanup(System.nanoTime());
      if (waitNanos == -1) return;
      if (waitNanos > 0) {
        long waitMillis = waitNanos / 1000000L;
        waitNanos -= (waitMillis * 1000000L);
        synchronized (RealConnectionPool.this) {
          try {
            //下一次清理之前的等待
            RealConnectionPool.this.wait(waitMillis, (int) waitNanos);
          } catch (InterruptedException ignored) {
          }
        }
      }
    }
  };
  //存
  void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }

connections是用于存连接的队列Deque。看到在add之前 使用线程池executor执行了cleanupRunnable,意思是清理连接,为啥要清理呢?上面提到过 连接池有 最大空闲连接数、最大空闲时间的限制,所以不满足时是要进行清理的。并且注意到清理是一个循环,并且下一次清理前要等待waitNanos时间,啥意思呢?我们看下cleanup方法:

代码语言:javascript
复制
  long cleanup(long now) {
    int inUseConnectionCount = 0;//正在使用的连接数
    int idleConnectionCount = 0;//空闲连接数
    RealConnection longestIdleConnection = null;//空闲时间最长的连接
    long longestIdleDurationNs = Long.MIN_VALUE;//最长的空闲时间

    //遍历连接:找到待清理的连接, 找到下一次要清理的时间(还未到最大空闲时间)
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();

        //若连接正在使用,continue,正在使用连接数+1
        if (pruneAndGetAllocationCount(connection, now) > 0) {
          inUseConnectionCount++;
          continue;
        }
        //空闲连接数+1
        idleConnectionCount++;

        // 赋值最长的空闲时间和对应连接
        long idleDurationNs = now - connection.idleAtNanos;
        if (idleDurationNs > longestIdleDurationNs) {
          longestIdleDurationNs = idleDurationNs;
          longestIdleConnection = connection;
        }
      }
      //若最长的空闲时间大于5分钟 或 空闲数 大于5,就移除并关闭这个连接
      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {
        connections.remove(longestIdleConnection);
      } else if (idleConnectionCount > 0) {
        // else,就返回 还剩多久到达5分钟,然后wait这个时间再来清理
        return keepAliveDurationNs - longestIdleDurationNs;
      } else if (inUseConnectionCount > 0) {
        //连接没有空闲的,就5分钟后再尝试清理.
        return keepAliveDurationNs;
      } else {
        // 没有连接,不清理
        cleanupRunning = false;
        return -1;
      }
    }
    //关闭移除的连接
    closeQuietly(longestIdleConnection.socket());

    //关闭移除后 立刻 进行下一次的 尝试清理
    return 0;
  }

思路还是很清晰的:

  • 有空闲连接的话,如果最长的空闲时间大于5分钟 或 空闲数 大于5,就移除关闭这个最长空闲连接;如果 空闲数 不大于5 且 最长的空闲时间不大于5分钟,就返回到5分钟的剩余时间,然后等待这个时间再来清理。
  • 没有空闲连接就等5分钟后再尝试清理。
  • 没有连接不清理。

其中判断连接正在使用的方法pruneAndGetAllocationCount我们来看下:

代码语言:javascript
复制
  private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    //连接上的数据流,弱引用列表
    List<Reference<Transmitter>> references = connection.transmitters;
    for (int i = 0; i < references.size(); ) {
      Reference<Transmitter> reference = references.get(i);
      if (reference.get() != null) {
        i++;
        continue;
      }

      // 到这里,transmitter是泄漏的,要移除,且此连接不能再承载新的数据流(泄漏的原因就是下面的message)
      TransmitterReference transmitterRef = (TransmitterReference) reference;
      String message = "A connection to " + connection.route().address().url()
          + " was leaked. Did you forget to close a response body?";
      Platform.get().logCloseableLeak(message, transmitterRef.callStackTrace);
      references.remove(i);
      connection.noNewExchanges = true;

      //连接因为泄漏没有数据流了,那么可以立即移除了。所以设置 开始空闲时间 是5分钟前(厉害厉害!)
      if (references.isEmpty()) {
        connection.idleAtNanos = now - keepAliveDurationNs;
        return 0;
      }
    }
    //返回连接上的数据流数量,大于0说明正在使用。
    return references.size();
  }

逻辑注释已经标明了,很好理解。其中connection.transmitters,表示在此连接上的数据流,transmitters size大于1即表示多个请求复用此连接。

另外,在findConnection中,使用connectionPool.put(result)存连接后,又调用transmitter.acquireConnectionNoEvents方法,瞅下:

代码语言:javascript
复制
  void acquireConnectionNoEvents(RealConnection connection) {
    assert (Thread.holdsLock(connectionPool));
    if (this.connection != null) throw new IllegalStateException();
    this.connection = connection;
    connection.transmitters.add(new TransmitterReference(this, callStackTrace));
  }

先把连接赋给transmitter,表示数据流transmitter依附在这个connection上;然后connection.transmitters add 这个transmitter的弱引用,connection.transmitters表示这个连接承载的所有数据流,即承载的所有请求。

好了,存 讲完了,主要就是把连接存入队列,同时开始循环尝试清理过期连接。

代码语言:javascript
复制
  //为transmitter 从连接池 获取 对应address的连接。若果routes不为空,可能会因为 连接合并(复用) 而获取到HTTP/2连接。
  boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter,
      @Nullable List<Route> routes, boolean requireMultiplexed) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (requireMultiplexed && !connection.isMultiplexed()) continue;
      if (!connection.isEligible(address, routes)) continue;
      transmitter.acquireConnectionNoEvents(connection);
      return true;
    }
    return false;
  }

存的方法名是put,但你发现 取 的方法名却不是get,transmitterAcquirePooledConnection意思是 为transmitter 从连接池 获取连接,实际上transmitter就代表一个数据流,也就是一个http请求。注意到,在遍历中 经过判断后也是transmitter的acquireConnectionNoEvents方法,即把匹配到的connection赋给transmitter。所以方法名还是很生动的。

继续看是如何匹配的:如果requireMultiplexed为false,即不是多路复用(不是http/2),那么就要看Connection的isEligible方法了,isEligible方法返回true,就代表匹配成功:

代码语言:javascript
复制
  //用于判断 连接 是否 可以承载指向address的数据流
  boolean isEligible(Address address, @Nullable List<Route> routes) {
    //连接不再接受新的数据流,false
    if (transmitters.size() >= allocationLimit || noNewExchanges) return false;

    //匹配address中非host的部分
    if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;

    //匹配address的host,到这里也匹配的话,就return true
    if (address.url().host().equals(this.route().address().url().host())) {
      return true; // This connection is a perfect match.
    }

    //到这里hostname是没匹配的,但是还是有机会返回true:连接合并
    // 1. 连接须是 HTTP/2.
    if (http2Connection == null) return false;

    // 2. IP 地址匹配
    if (routes == null || !routeMatchesAny(routes)) return false;

    // 3. 证书匹配
    if (address.hostnameVerifier() != OkHostnameVerifier.INSTANCE) return false;
    if (!supportsUrl(address.url())) return false;

    // 4. 证书 pinning 匹配.
    try {
      address.certificatePinner().check(address.url().host(), handshake().peerCertificates());
    } catch (SSLPeerUnverifiedException e) {
      return false;
    }

    return true; // The caller's address can be carried by this connection.
  }

  private boolean routeMatchesAny(List<Route> candidates) {
    for (int i = 0, size = candidates.size(); i < size; i++) {
      Route candidate = candidates.get(i);
      if (candidate.proxy().type() == Proxy.Type.DIRECT
          && route.proxy().type() == Proxy.Type.DIRECT
          && route.socketAddress().equals(candidate.socketAddress())) {
        return true;
      }
    }
    return false;
  }

取的过程就是 遍历连接池,进行地址等一系列匹配,到这里第三个问号也解决了。

代码语言:javascript
复制
  //移除关闭空闲连接
  public void evictAll() {
    List<RealConnection> evictedConnections = new ArrayList<>();
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();
        if (connection.transmitters.isEmpty()) {
          connection.noNewExchanges = true;
          evictedConnections.add(connection);
          i.remove();
        }
      }
    }

    for (RealConnection connection : evictedConnections) {
      closeQuietly(connection.socket());
    }
  }

遍历连接池,如果连接上的数据流是空,那么就从连接池移除并且关闭。

我们回过头看下Transmitter的releaseConnectionNoEvents方法,也就第一个问号,如果连接不再接受新的数据流,就会调用这个方法:

代码语言:javascript
复制
  //从连接上移除transmitter.
  @Nullable Socket releaseConnectionNoEvents() {
    assert (Thread.holdsLock(connectionPool));

    int index = -1;
    //遍历 此数据流依附的连接 上的所有数据流,找到index
    for (int i = 0, size = this.connection.transmitters.size(); i < size; i++) {
      Reference<Transmitter> reference = this.connection.transmitters.get(i);
      if (reference.get() == this) {
        index = i;
        break;
      }
    }

    if (index == -1) throw new IllegalStateException();
    //transmitters移除此数据流
    RealConnection released = this.connection;
    released.transmitters.remove(index);
    this.connection = null;
    //如果连接上没有有数据流了,就置为空闲(等待清理),并返回待关闭的socket
    if (released.transmitters.isEmpty()) {
      released.idleAtNanos = System.nanoTime();
      if (connectionPool.connectionBecameIdle(released)) {
        return released.socket();
      }
    }

    return null;
  }

主要就是尝试释放连接,连接上没有数据流就关闭socket等待被清理。

好了,到这里连接池的管理就分析完了。

从连接的查找 到 连接池的管理,就是ConnectInterceptor的内容了。

CallServerInterceptor

哎呀,终于到最后一个拦截器了!

请求服务拦截器,也就是真正地去进行网络IO读写了——写入http请求的header和body数据、读取响应的header和body。

上面ConnectInterceptor主要介绍了如何 寻找连接 以及 连接池如何管理连接。在获取到连接后,调用了RealConnection的newCodec方法ExchangeCodec实例,然后使用ExchangeCodec实例创建了Exchange实例传入CallServerInterceptor了。上面提到过ExchangeCodec负责请求和响应的IO读写,我们先来看看ExchangeCodec创建过程——RealConnection的newCodec方法:

代码语言:javascript
复制
  ExchangeCodec newCodec(OkHttpClient client, Interceptor.Chain chain) throws SocketException {
    if (http2Connection != null) {
      return new Http2ExchangeCodec(client, this, chain, http2Connection);
    } else {
      socket.setSoTimeout(chain.readTimeoutMillis());
      source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
      sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
      return new Http1ExchangeCodec(client, this, source, sink);
    }
  }

http2Connection不为空就创建Http2ExchangeCodec,否则是Http1ExchangeCodec。而http2Connection的创建是连接进行TCP、TLS握手的时候,即在RealConnection的connect方法中,具体就是connect方法中调用的establishProtocol方法:

代码语言:javascript
复制
  private void establishProtocol(ConnectionSpecSelector connectionSpecSelector,
      int pingIntervalMillis, Call call, EventListener eventListener) throws IOException {
    //针对http请求,如果配置的协议包含Protocol.H2_PRIOR_KNOWLEDGE,则开启Http2连接
    if (route.address().sslSocketFactory() == null) {
      if (route.address().protocols().contains(Protocol.H2_PRIOR_KNOWLEDGE)) {
        socket = rawSocket;
        protocol = Protocol.H2_PRIOR_KNOWLEDGE;
        startHttp2(pingIntervalMillis);
        return;
      }

      socket = rawSocket;
      protocol = Protocol.HTTP_1_1;
      return;
    }
    //针对https请求,会在TLS握手后,根据平台获取协议(),如果协议是Protocol.HTTP_2,则开启Http2连接
    eventListener.secureConnectStart(call);
    connectTls(connectionSpecSelector);
    eventListener.secureConnectEnd(call, handshake);

    if (protocol == Protocol.HTTP_2) {
      startHttp2(pingIntervalMillis);
    }
  }

  private void startHttp2(int pingIntervalMillis) throws IOException {
    socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
    http2Connection = new Http2Connection.Builder(true)
        .socket(socket, route.address().url().host(), source, sink)
        .listener(this)
        .pingIntervalMillis(pingIntervalMillis)
        .build();
    http2Connection.start();
  }

好了,到这里不再深入了,继续了解可以参考HTTP 2.0与OkHttp。那么到这里,ExchangeCodec已经创建了,然后又包装成Exchange,最后传入了CallServerInterceptor。

下面就来看看这最后一个拦截器:

代码语言:javascript
复制
public final class CallServerInterceptor implements Interceptor {
  private final boolean forWebSocket;

  public CallServerInterceptor(boolean forWebSocket) {
    this.forWebSocket = forWebSocket;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Exchange exchange = realChain.exchange();//上个拦截器传入的exchange
    Request request = realChain.request();

    long sentRequestMillis = System.currentTimeMillis();
    //写请求头
    exchange.writeRequestHeaders(request);

    boolean responseHeadersStarted = false;
    Response.Builder responseBuilder = null;
    //含body的请求
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // 若请求头包含 "Expect: 100-continue" , 就会等服务端返回含有 "HTTP/1.1 100 Continue"的响应,然后再发送请求body. 
      //如果没有收到这个响应(例如收到的响应是4xx),那就不发送body了。
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        exchange.flushRequest();
        responseHeadersStarted = true;
        exchange.responseHeadersStart();
        responseBuilder = exchange.readResponseHeaders(true);
      }
      //responseBuilder为null说明服务端返回了100,也就是可以继续发送body了
      if (responseBuilder == null) {
        if (request.body().isDuplex()) {//默认是false不会进入
          // Prepare a duplex body so that the application can send a request body later.
          exchange.flushRequest();
          BufferedSink bufferedRequestBody = Okio.buffer(
              exchange.createRequestBody(request, true));
          request.body().writeTo(bufferedRequestBody);
        } else {
          // 满足了 "Expect: 100-continue" ,写请求body
          BufferedSink bufferedRequestBody = Okio.buffer(
              exchange.createRequestBody(request, false));
          request.body().writeTo(bufferedRequestBody);
          bufferedRequestBody.close();
        }
      } else {
       //没有满足 "Expect: 100-continue" ,请求发送结束
        exchange.noRequestBody();
        if (!exchange.connection().isMultiplexed()) {
          // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
          // from being reused. Otherwise we're still obligated to transmit the request body to
          // leave the connection in a consistent state.
          exchange.noNewExchangesOnConnection();
        }
      }
    } else {
     //没有body,请求发送结束
      exchange.noRequestBody();
    }

    //请求发送结束
    if (request.body() == null || !request.body().isDuplex()) {
      exchange.finishRequest();
    }
    //回调 读响应头开始事件(如果上面没有)
    if (!responseHeadersStarted) {
      exchange.responseHeadersStart();
    }
    //读响应头(如果上面没有)
    if (responseBuilder == null) {
      responseBuilder = exchange.readResponseHeaders(false);
    }
    //构建response
    Response response = responseBuilder
        .request(request)
        .handshake(exchange.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    int code = response.code();
    if (code == 100) {
      //这里服务端又返回了个100,就再尝试获取真正的响应()
      response = exchange.readResponseHeaders(false)
          .request(request)
          .handshake(exchange.connection().handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();

      code = response.code();
    }
    //回调读响应头结束
    exchange.responseHeadersEnd(response);
    //这里就是获取响应body了
    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(exchange.openResponseBody(response))
          .build();
    }
    //请求头中Connection是close,表示请求完成后要关闭连接
    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      exchange.noNewExchangesOnConnection();
    }
    //204(无内容)、205(充值内容),body应该是空
    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
  }
}

你会发现,整个内容就是前面说的一句话:写入http请求的header和body、读取响应的header和body。这里就不再解释了。

这里我们可以看到,无论写请求还是读响应,都是使用Exchange对应的方法。上面也提到过Exchange理解上是对ExchangeCodec的包装,这写方法内部除了事件回调和一些参数获取外,核心工作都由 ExchangeCodec 对象完成,而 ExchangeCodec实际上利用的是 Okio,而 Okio 实际上还是用的 Socket。

ExchangeCodec的实现类 有对应Http1.1的Http1ExchangeCodec 和 对应Http2.0的Http2ExchangeCodec。其中Http2ExchangeCodec是使用Http2.0中 数据帧 的概念完成请求响应的读写。关于Http1ExchangeCodec、Http2ExchangeCodec具体实现原理涉及okio这不再展开。

最后一点,CallServerInterceptor的intercept方法中没有调用连接器链Chain的proceed方法,因为这是最后一个拦截器啦!

好了,到这里最后一个拦截器也分析完啦!

总结

本篇分析了ConnectInterceptor、CallServerInterceptor两个拦截器的作用和原理。ConnectInterceptor负责连接的获取,其中涉及到连接池的概念;CallServerInterceptor是真正的网络IO读写。ConnectInterceptor涉及的内容较多,它是Okhttp的核心。 结合上一篇,我们已经分析完了Okhttp内部所有的拦截器,最后给出Okhttp的整体架构图:

okhttp

到这里,Okhttp源码解析部分就真的结束了,可真是一个漫长的过程! 从使用方式到工作流程,再到具体拦截器,掌握了这四篇文章的内容,应该说得上对Okhttp是比较熟悉了。这里还计划会出第五篇终章,来介绍一些Okhttp的常见问题和高级使用方式,敬请期待!

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-06-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 胡飞洋 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景 - HTTP协议发展
    • HTTP1.0
      • HTTP1.1
        • HTTP2.0
        • ConnectInterceptor
          • ExchangeFinder
            • RouteSelector
              • ConnectionPool
              • CallServerInterceptor
              • 总结
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档