刨解OkHttp之访问连接

因为OkHttp能讲的东西太多了,上一篇文章只是讲到了他的设计架构即责任链模式和异步多线程网络访问,这对于OkHttp只是冰山一角,对于一个网络请求框架,最重要的就是网络访问了,为此我们来说一下Okttp网络访问的一些细节。

这个访问分为两个部分,一个部分是与服务器形成连接,另一个部分是与服务器进行交互。与服务器连接的是ConnectInterceptor拦截器,而与服务器交互的是CallServerInterceptor拦截器。我们就来讲一下这两个拦截器吧。

ConnectInterceptor

先看源码:

public final class ConnectInterceptor implements Interceptor {
  public final OkHttpClient client;

  public ConnectInterceptor(OkHttpClient client) {
    this.client = client;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }
}

这里看起来很简单,就是给首先StreamAllocation赋值,然后调用newStream()方法,那streamAllocation是什么东西呢?它是整个连接的中心,协调着几个重要的类。后面都会说。 我们看一下newStream()方法:

public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
    int connectTimeout = client.connectTimeoutMillis();
    int readTimeout = client.readTimeoutMillis();
    int writeTimeout = client.writeTimeoutMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
      HttpCodec resultCodec = resultConnection.newCodec(client, this);

      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }
}

通过看代码我们有追溯findHealthyConnection()方法,源码如下:

private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
      throws IOException {
    while (true) {
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          connectionRetryEnabled);

      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }

      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        noNewStreams();
        continue;
      }

      return candidate;
    }
}
  
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,boolean connectionRetryEnabled) throws IOException {
    Route selectedRoute;
    synchronized (connectionPool) {
      if (released) throw new IllegalStateException("released");
      if (codec != null) throw new IllegalStateException("codec != null");
      if (canceled) throw new IOException("Canceled");

      RealConnection allocatedConnection = this.connection;
      if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
        return allocatedConnection;
      }

      Internal.instance.get(connectionPool, address, this, null);
      if (connection != null) {
        return connection;
      }

      selectedRoute = route;
    }

    if (selectedRoute == null) {
      selectedRoute = routeSelector.next();
    }

    RealConnection result;
    synchronized (connectionPool) {
      if (canceled) throw new IOException("Canceled");

      Internal.instance.get(connectionPool, address, this, selectedRoute);
      if (connection != null) {
        route = selectedRoute;
        return connection;
      }

      route = selectedRoute;
      refusedStreamCount = 0;
      result = new RealConnection(connectionPool, selectedRoute);
      acquire(result);
    }

    result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
    routeDatabase().connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      Internal.instance.put(connectionPool, result);

      if (result.isMultiplexed()) {
        socket = Internal.instance.deduplicate(connectionPool, address, this);
        result = connection;
      }
    }
    closeQuietly(socket);

    return result;
}

在findHealthyConnection()里通过while(true)不断调用findConnection()去获取健康可用的RealConnection。RealConnection是与服务器连接的一个socket的连接,有和这个就可以进行三次握手的tcp连接,所以上面的result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);这一行很关键,就是socket连接服务器的关键代码,具体里面的代码如下:

public void connect(
      int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled) {
   //省略代码

    while (true) {
      try {
        if (route.requiresTunnel()) {
          connectTunnel(connectTimeout, readTimeout, writeTimeout);
        } else {
          connectSocket(connectTimeout, readTimeout);
        }
        establishProtocol(connectionSpecSelector);
        break;
      } catch (IOException e) {
         //省略代码
      }
    }

    if (http2Connection != null) {
      synchronized (connectionPool) {
        allocationLimit = http2Connection.maxConcurrentStreams();
      }
    }
  }

private void connectTunnel(int connectTimeout, int readTimeout, int writeTimeout)
      throws IOException {
    Request tunnelRequest = createTunnelRequest();
    HttpUrl url = tunnelRequest.url();
    int attemptedConnections = 0;
    int maxAttempts = 21;
    while (true) {
      if (++attemptedConnections > maxAttempts) {
        throw new ProtocolException("Too many tunnel connections attempted: " + maxAttempts);
      }

      connectSocket(connectTimeout, readTimeout);
      tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url);

      if (tunnelRequest == null) break; 

      closeQuietly(rawSocket);
      rawSocket = null;
      sink = null;
      source = null;
    }
  }

private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
    Proxy proxy = route.proxy();
    Address address = route.address();

    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
        ? address.socketFactory().createSocket()
        : new Socket(proxy);

    rawSocket.setSoTimeout(readTimeout);
    try {
      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
      ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
      ce.initCause(e);
      throw ce;
    }

    try {
      source = Okio.buffer(Okio.source(rawSocket));
      sink = Okio.buffer(Okio.sink(rawSocket));
    } catch (NullPointerException npe) {
      if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
        throw new IOException(npe);
      }
    }
}    

private void establishProtocol(ConnectionSpecSelector connectionSpecSelector) throws IOException {
    if (route.address().sslSocketFactory() == null) {
      protocol = Protocol.HTTP_1_1;
      socket = rawSocket;
      return;
    }

    connectTls(connectionSpecSelector);

    if (protocol == Protocol.HTTP_2) {
      socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
      http2Connection = new Http2Connection.Builder(true)
          .socket(socket, route.address().url().host(), source, sink)
          .listener(this)
          .build();
      http2Connection.start();
    }
}

不管是直接调用connectSocket()还是connectTunnel(),最终都会调connectSocket()方法然后通过 Platform.get().connectSocket()(rawSocket, route.socketAddress(), connectTimeout);进行Socket连接。Platform.get().connectSocket()对应代码如下:

 public void connectSocket(Socket socket, InetSocketAddress address,
      int connectTimeout) throws IOException {
    socket.connect(address, connectTimeout);
  }

然后有调用了establishProtocol(),起始最主要的就是初始化Http2Connection对象.接着返回StreamAllocation,下一步就是HttpCodec resultCodec = resultConnection.newCodec(client, this);newCodec()源码如下:

public HttpCodec newCodec(OkHttpClient client, StreamAllocation streamAllocation) throws SocketException {
    if (http2Connection != null) {
      return new Http2Codec(client, streamAllocation, http2Connection);
    } else {
      socket.setSoTimeout(client.readTimeoutMillis());
      source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
      sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
      return new Http1Codec(client, streamAllocation, source, sink);
    }
}

其实没什么,也就是初始化Http1Codec或Http2Codec对象,这两个类都集成接口类HttpCodec,典型的面向接口编程,我们看一下接口类是什么:

public interface HttpCodec {
  int DISCARD_STREAM_TIMEOUT_MILLIS = 100;
  //写入请求体
  Sink createRequestBody(Request request, long contentLength);
  //写入请求头    
  void writeRequestHeaders(Request request) throws IOException;
  //相当于flush,把请求刷入底层socket
  void flushRequest() throws IOException;
  //相当于flush,把请求输入底层socket并不在发出请求
  void finishRequest() throws IOException;
  //读取响应头
  Response.Builder readResponseHeaders(boolean expectContinue) throws IOException;
  //读取响应体
  ResponseBody openResponseBody(Response response) throws IOException;
  //取消请求
  void cancel();
}

有方法知道HttpCodec是网络读写的管理类,而Http1Codec和Http2Codec分别对应Http1和Http2,在后面的CallServerInterceptor就主要用这个类进行操作。最后ConnectInterceptor的RealConnection connection = streamAllocation.connection();只是获取了前面生成的RealConnection,然后通过前一篇介绍的责任链模式传给CallServerInterceptor。

CallServerInterceptor

前面的ConnectInterceptor只是socket连接了服务器,而连接后怎么操作就是CallServerInterceptor了,接着我们看一下其实现方法:

@Override 
public Response intercept(Chain chain) throws IOException {
    //省略代码。。。
    
    //写入请求头
    httpCodec.writeRequestHeaders(request);

    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        httpCodec.flushRequest();
        responseBuilder = httpCodec.readResponseHeaders(true);
      }
      //写入请求体     
      if (responseBuilder == null) {
        Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
      } else if (!connection.isMultiplexed()) {
        streamAllocation.noNewStreams();
      }
    }

    httpCodec.finishRequest();
    //读取响应头
    if (responseBuilder == null) {
      responseBuilder = httpCodec.readResponseHeaders(false);
    }

    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
    //读取响应体
    int code = response.code();
    if (forWebSocket && code == 101) {
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }

    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
}

这里有一个东西需要讲,就是Socket连接了服务器之后,是通过Okio向服务器发送请求的。再次列取writeRequestHeaders()方法,源码如下:

@Override
public void writeRequestHeaders(Request request) throws IOException {
    String requestLine = RequestLine.get(
        request, streamAllocation.connection().route().proxy().type());
    writeRequest(request.headers(), requestLine);
}

public void writeRequest(Headers headers, String requestLine) throws IOException {
    if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
    sink.writeUtf8(requestLine).writeUtf8("\r\n");
    for (int i = 0, size = headers.size(); i < size; i++) {
      sink.writeUtf8(headers.name(i))
          .writeUtf8(": ")
          .writeUtf8(headers.value(i))
          .writeUtf8("\r\n");
    }
    sink.writeUtf8("\r\n");
    state = STATE_OPEN_REQUEST_BODY;
}

而sink就是在ConnectInterceptor已经初始化完成了,就在上面的connectSocket()连接服务器的方法里

  source = Okio.buffer(Okio.source(rawSocket));
  sink = Okio.buffer(Okio.sink(rawSocket));

其中sink是向服务器写数据,而source是获取服务器数据,rawSocket就是我们与服务器保持连接socker,okio我只会点到为止,不然又要开新的一篇讲解了。createRequestBody()的原理和writeRequestHeaders()是一样的。 接着就是接收数据了,读取响应头readResponseHeaders()和上面原理一样的,值得讲的是读取响应体 httpCodec.openResponseBody(response),里面的源码如下:

@Override 
public ResponseBody openResponseBody(Response response) throws IOException {
    Source source = getTransferStream(response);
    return new RealResponseBody(response.headers(), Okio.buffer(source));
}

private Source getTransferStream(Response response) throws IOException {
    if (!HttpHeaders.hasBody(response)) {
      return newFixedLengthSource(0);
    }

    if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) {
      return newChunkedSource(response.request().url());
    }

    long contentLength = HttpHeaders.contentLength(response);
    if (contentLength != -1) {
      return newFixedLengthSource(contentLength);
    }

    return newUnknownLengthSource();
}

其实没有做任何的读取操作,只是ResponseBody封装了headers()获取响应头和Source对象,而Source就可以获取响应体,只是没有马上获取而是封装好传递给上一个拦截器。最后在哪里获取响应体呢, 回到上一篇刚开始最简单的访问网络demo

 Request request = new Request.Builder().url(url).build();
 Response response = client.newCall(request).execute();
 return response.body().string();

我们看一下body().toString()这个方法

public @Nullable ResponseBody body() {
    return body;
}

public final String string() throws IOException {
    BufferedSource source = source();
    try {
      Charset charset = Util.bomAwareCharset(source, charset());
      return source.readString(charset);
    } finally {
      Util.closeQuietly(source);
    }
}   

source就是Okio的读对象对应上面的source = Okio.buffer(Okio.source(rawSocket));,bomAwareCharset是获取字符类型,默认utf-8,调用source.readString(charset);就可以获取他的请求体了,也就是请求内容字符串。closeQuietly()最终这个读对象,整个访问流程也基本结束了。

内容有点多,自身感觉讲解的也仅讲了最主要的部分,很多东西还可以扩展却因为篇幅没说,请见谅。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏yukong的小专栏

【ssm个人博客项目实战05】easy ui datagrid实现数据的分页显示1、数据格式准备工作2、业务层实现3、控制层实现4、前端视图处理

前面一节 我们已经实现博客类别的dao层的实现,其中特别讲解了博客类别的分页的实现,那么现在我们实现了后台的分页,那么前台分页怎么显示呢,这时候我们用到了eas...

2342
来自专栏Java技术分享圈

基于jsp+servlet的javaweb实现最基本的用户注册登陆注销功能

本案例的技术选型主要是jsp+servlet+JavaBean,采用三层架构的分层思想与MVC设计模式结合进行规范开发。

3272
来自专栏Android开发实战

谷歌官方Android应用架构库——LiveData

LiveData 是一个数据持有者类,它持有一个值并允许观察该值。不同于普通的可观察者,LiveData 遵守应用程序组件的生命周期,以便 Observer 可...

1113
来自专栏Android 研究

Android系统启动——5 zyogte进程(Java篇)

上一篇文章,我们知道在AndroidRuntime.cpp的start()函数里面是调用的Zygoteinit类的main()函数,那我们就继续研究

2272
来自专栏Dawnzhang的开发者手册

Tomcat处理一个http请求的过程

假设来自客户的请求为: http://localhost:8080/wsota/wsota_index.jsp

2062
来自专栏Flutter知识集

Flutter 实践 MVVM

在做Android或iOS开发时,经常会了解到MVC,MVP和MVVM。MVVM在移动端一度被非常推崇,虽然也有不少反对的声音,不过MVVM确实是不错的设计架构...

2.4K3
来自专栏Java帮帮-微信公众号-技术文章全总结

Java多线程详解5【面试+工作】

Java多线程详解【面试+工作】 Java线程:新特征-信号量 Java的信号量实际上是一个功能完毕的计数器,对控制一定资源的消费与回收有着很重要的意义,信号量...

41610
来自专栏求索之路

四大组件以及Application和Context的全面理解

1.概述 ? Context抽象结构 2.用处 1.Context的实现类有很多,但是ContextImpl(后称CI)是唯一做具体工作的,其他实现都是对CI做...

4395
来自专栏Spark学习技巧

Spark源码系列之spark2.2的StructuredStreaming使用及源码介绍

一,概述 Structured Streaming是一个可扩展和容错的流处理引擎,并且是构建于sparksql引擎之上。你可以用处理静态数据的方式去处理你的流计...

1.1K7
来自专栏菩提树下的杨过

Silverlight Telerik控件学习:数据录入、数据验证

相信很多人都听说过这句名言:garbage in ,garbage out ! 数据录入不规范(或错误)就象一颗定时炸弹,迟早会给系统带来麻烦,所以在数据录入时...

2836

扫码关注云+社区

领取腾讯云代金券