首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >OkHttp源码深入解读

OkHttp源码深入解读

作者头像
用户2929716
发布2018-08-23 13:34:16
1.6K0
发布2018-08-23 13:34:16
举报
文章被收录于专栏:流媒体流媒体

简介

目前在HTTP协议请求库中,OKHttp应当是非常火的,使用也非常的简单。网上有很多文章写了关于OkHttp的特点以及使用,而本章主要带领大家阅读OkHttp的源码,让大家对OKhttp的工作原理有所了解,当然源码的代码量是非常大的,这里我只是抓住主线和重点部分,至于细节部分,大家随着我抛出的线来跟进基本是没什么问题的。这篇文章要干嘛,引用一句话:

read the fucking source code

目录:
  • OkHttp介绍
  • 粗绘请求流程
  • RealCall方法execute
  • getResponseWithInterceptorChain调用链
  • RetryAndFollowUpInterceptor
  • ConnectInterceptor获取连接
  • CallServerInterceptor网络请求
  • RealConnection
  • StreamAllocation
  • HttpCodec(Http1Codec)
  • 同步/异步请求

OkHttp介绍:

特点:

  • 支持连接同一地址的连接共享同一个socket(前提服务端支持)
  • 支持Socket连接池,减少请求延迟
  • 使用拦截器模式,将流程拆分
  • 透明的GZIP压缩

粗绘请求流程

注意:这里我选择OkHttp源码版本是 3.8.0。为了方便大家能够和文章同步,最好保持版本一致,我看过老版本和新的版本还是有点不同的。

官网给出的示例

OkHttpClient client = new OkHttpClient();

String run(String url) throws IOException {
  Request request = new Request.Builder()
      .url(url)
      .build();

  Response response = client.newCall(request).execute();
  return response.body().string();
}

我们就从这里入口,来一步一步的跟进。

  1. 首先是创建一个OkHttpClient,Http请求工厂,也就是只要需要发Http请求,那都得找他。内部当然后很多的成员变量和方法,这里我们先不做介绍,等用到时再解释。
  2. 我们继续看client.newCall(request)。找到源码
  @Override public Call newCall(Request request) {
    return new RealCall(this, request, false /* for web socket */);
  }

很简单,创建了一个RealCall,这里我就称为一个请求。Request不说大家能理解,里面封装了各种请求的信息。创建过程也很简单,做一些成员变量赋值和初始化。

  RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    final EventListener.Factory eventListenerFactory = client.eventListenerFactory();

    this.client = client;
    this.originalRequest = originalRequest;
    this.forWebSocket = forWebSocket;
    this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);

    // TODO(jwilson): this is unsafe publication and not threadsafe.
    this.eventListener = eventListenerFactory.create(this);
  }

这里注意retryAndFollowUpInterceptor;变量,后面会用到。

  1. 调用了RealCall的execute()方法并返回Response结果。

RealCall方法execute

前面我们知道了大致的请求流程,下面我们重点看

  @Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    try {
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } finally {
      client.dispatcher().finished(this);
    }
  }
  1. 首先我们发现try的前后调用了Dispatcher的方法:
client.dispatcher().executed(this);
client.dispatcher().finished(this);

分别是将Call加入到Dispatcher中的同步队列中,结束后,移除队列。

  1. 调用getResponseWithInterceptorChain获取Response。

接下来我们就重点看getResponseWithInterceptorChain方法

getResponseWithInterceptorChain调用链

okhttp.png

  Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(
        interceptors, null, null, null, 0, originalRequest);
    return chain.proceed(originalRequest);
  }

这里的代码就很关键了,设计的很巧妙。可能有点绕,这里我讲一下几个关键的类。

Chain
  1. 获取Request
  2. 执行proceed
RealInterceptorChain

Chain的实现

  1. 包含了完成请求需要的类,包括StreamAllocation、HttpCodec、RealConnection、Request等。这里必要重要的就是可以实现了Chain的request()来获取Request。
  2. 控制Interceptor的调用,调用Interceptor的拦截方法intercept后,就封装下一个RealInterceptorChain并指定index。声明下一个将要被调用的Interceptor。这部分逻辑主要在proceed方法中。我们看核心代码
    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(
        interceptors, streamAllocation, httpCodec, connection, index + 1, request);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

首先会获取当前index的Interceptor。然后执行对应的intercept

方法。同时出入的参数是新创建的RealInterceptorChain。而新创建的RealInterceptorChain对应的index+1。如果执行新创建的RealInterceptorChain的proceed方法,那么interceptors的第index+1个Interceptor的intercept会被执行。依次循环下去。

总结: RealInterceptorChain就是对请求中个中重要对象的封装,执行Interceptor的intercept

的调用,确定下一个RealInterceptorChain。保证所有的Interceptor依次执行intercept

Interceptor

前面讲到了RealInterceptorChain会执行Interceptor的intercept方法,同时传入下一个RealInterceptorChain。那么intercept方法究竟做了什么事呢,因为Interceptor的实现很多,这里我们挑一个系统的实现类看看,比如:BridgeInterceptor,这个代码虽然长,但逻辑想对简单

  @Override public Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();

    RequestBody body = userRequest.body();
    if (body != null) {
      MediaType contentType = body.contentType();
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString());
      }

      long contentLength = body.contentLength();
      if (contentLength != -1) {
        requestBuilder.header("Content-Length", Long.toString(contentLength));
        requestBuilder.removeHeader("Transfer-Encoding");
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked");
        requestBuilder.removeHeader("Content-Length");
      }
    }

    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", hostHeader(userRequest.url(), false));
    }

    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive");
    }

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true;
      requestBuilder.header("Accept-Encoding", "gzip");
    }

    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
    if (!cookies.isEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies));
    }

    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", Version.userAgent());
    }

    Response networkResponse = chain.proceed(requestBuilder.build());

    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

    Response.Builder responseBuilder = networkResponse.newBuilder()
        .request(userRequest);

    if (transparentGzip
        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
        && HttpHeaders.hasBody(networkResponse)) {
      GzipSource responseBody = new GzipSource(networkResponse.body().source());
      Headers strippedHeaders = networkResponse.headers().newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build();
      responseBuilder.headers(strippedHeaders);
      responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
    }

    return responseBuilder.build();
  }

里面逻辑我们现在可能还看不懂,我们看中间最核心的一句话,。Response networkResponse = chain.proceed(requestBuilder.build());有没有觉得顿时豁然开朗。realChain是传入的参数,而执行proceed方法,就又回到了前面我们讲RealInterceptorChain的流程。那前后RealInterceptorChain有什么区别呢?那就是index在不断的增加,同时对应的Interceptor也就不同。

那么Interceptor有什么用呢?

我们刚才只关注了中间的chain.proceed(requestBuilder.build());。而在此前后我们可以做很多的逻辑操作了,比如:

对Request进行一些请求头的判断,处理和完善。对Response进行一些处理,如在有gzip的情况下数据的处理等。

总结:Interceptor这里我称之为拦截器。Okhttp将请求的流程,从封装请求头,获取连接,发请求数据,读请求数据等等。拆分成一个个Interceptor。每一个Interceptor有着自己单一的功能,而下层的Interceptor为上层的Interceptor服务,有没有觉得有点像我们的网络TCP/IP的模型,哈哈。其实这种思想让我们的请求变的更加清晰,并且扩展性很好。每一层也就是Interceptor可以有自己的实现。同时我们可以定义自己的Interceptor。 而Interceptor的顺序执行就由RealInterceptorChain完成。

到这里我们就讲了整个请求的大体执行框架和模式。这部分一定要好好的理解,方便后面的学习。

RetryAndFollowUpInterceptor

这个拦截器用来做重连接和重定向的。其中逻辑有以下:

创建StreamAllocation
  @Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();

    streamAllocation = new StreamAllocation(
        client.connectionPool(), createAddress(request.url()), callStackTrace);

    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
      if (canceled) {
        streamAllocation.release();
        throw new IOException("Canceled");
      }

      Response response = null;
      boolean releaseConnection = true;
      try {
        response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
        ...省略剩余代码

看到了吧new StreamAllocation了吧。这里第一个疑惑解决,StreamAllocation的创建地方。这里还要多讲一个地方就是构造参数ConnectionPool。我们看到是从OkHttpClient传了的。而在OkHttpclient创建时候创建了ConnectionPool。

public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory {
  ...省略
    public static final class Builder {
      public Builder() {
      ...省略
connectionPool = new ConnectionPool();
         ...省略

后面用到ConnectionPool大家就别再疑惑了。

创建StreamAllocation在这里,那当然释放也是在这类里:

失败重连接
    while (true) {
      if (canceled) {
        streamAllocation.release();
        throw new IOException("Canceled");
      }

      Response response = null;
      boolean releaseConnection = true;
      try {
        response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
        releaseConnection = false;
      } catch (RouteException e) {
        // The attempt to connect via a route failed. The request will not have been sent.
        if (!recover(e.getLastConnectException(), false, request)) {
          throw e.getLastConnectException();
        }
        releaseConnection = false;
        continue;
      } catch (IOException e) {
        // An attempt to communicate with a server failed. The request may have been sent.
        boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
        if (!recover(e, requestSendStarted, request)) throw e;
        releaseConnection = false;
        continue;
      } finally {
        // We're throwing an unchecked exception. Release any resources.
        if (releaseConnection) {
          streamAllocation.streamFailed(null);
          streamAllocation.release();
        }
      }

我们看到response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);是在一个无限循环中,如果出现异常,并且满足重连接,就会再次调用。

重定向
      Request followUp = followUpRequest(response);

      if (followUp == null) {
        if (!forWebSocket) {
          streamAllocation.release();
        }
        return response;
      }

ConnectInterceptor获取连接

前面我们讲了Interceptor的执行流程。而getResponseWithInterceptorChain方法中添加了一些给定的Interceptor。如:RetryAndFollowUpInterceptor(这个是在创建RealCall时候创建的,前面有提醒大家注意)、BridgeInterceptor(上一节有讲到,主要做一些请求头和响应数据的处理)、CacheInterceptor(看名称知道,处理缓存)、ConnectInterceptor、CallServerInterceptor。按上一节的流程,这些Interceptor会依次被调用。这里我们要重点看最后两个,首先是ConnectInterceptor。

通过名称我们知道主要做连接处理。我们看下源码:

public final class ConnectInterceptor implements Interceptor {
  public final OkHttpClient client;

  public ConnectInterceptor(OkHttpClient client) {
    this.client = client;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }
}

估计大家都喜欢看这种源码,代码量很少,简洁。我们主要看中间三行代码:

获取StreamAllocation
StreamAllocation streamAllocation = realChain.streamAllocation();

而streamAllocation是RealInterceptorChain的成员变量,在构造方法中赋值。这里我们就往前找,往直前的Interceport找,看谁构造RealInterceptorChain传递了StreamAllocation。经过我们的一个个查找,在RetryAndFollowUpInterceptor的intercept方法中找到:

获取HttpCodec
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
获取RealConnection
RealConnection connection = streamAllocation.connection();

HttpCodec和RealConnection这两玩意干啥的,就现在这几行代码我们也看不出来,那就先不管。继续看CallServerInterceptor

CallServerInterceptor网络请求

这个就厉害了。看名称,请求服务。那就是最核心的地方了。撸上代码:

/** This is the last interceptor in the chain. It makes a network call to the server. */
public final class CallServerInterceptor implements Interceptor {
  private final boolean forWebSocket;

  public CallServerInterceptor(boolean forWebSocket) {
    this.forWebSocket = forWebSocket;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    HttpCodec httpCodec = realChain.httpStream();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    RealConnection connection = (RealConnection) realChain.connection();
    Request request = realChain.request();

    long sentRequestMillis = System.currentTimeMillis();
    httpCodec.writeRequestHeaders(request);

    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return what
      // we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        httpCodec.flushRequest();
        responseBuilder = httpCodec.readResponseHeaders(true);
      }

      if (responseBuilder == null) {
        // Write the request body if the "Expect: 100-continue" expectation was met.
        Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
      } else if (!connection.isMultiplexed()) {
        // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection from
        // being reused. Otherwise we're still obligated to transmit the request body to leave the
        // connection in a consistent state.
        streamAllocation.noNewStreams();
      }
    }

    httpCodec.finishRequest();

    if (responseBuilder == null) {
      responseBuilder = httpCodec.readResponseHeaders(false);
    }

    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    int code = response.code();
    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }

    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
  }
}
1. 从RealInterceptorChain获取相关对象
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    HttpCodec httpCodec = realChain.httpStream();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    RealConnection connection = (RealConnection) realChain.connection();
    Request request = realChain.request();

这些变量在上一节中已经介绍,这里只是获取一下。

2. 发送请求头数据
httpCodec.writeRequestHeaders(request);
3. 发送请求体

这里会先判断请求方法以及是否有请求体数据。如果有则发送。

4. 读取响应头
responseBuilder = httpCodec.readResponseHeaders(false);
5. 封装相应内容
    int code = response.code();
    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }
6. 判断Connection头部信息是否是close

如果请求头或响应头的Connection值为close。则标识改Connection为noNewStreams。标识不会有新的流。

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }

至于noNewStreams是用来控制,当前的连接是否能再次被使用,我们后面会讲到。


前面我们讲了CallServerInterceptor的请求流程,但里面有很多的类我们还不清楚是怎么来的,以及干啥用的。接下来我们就讲核心类的用法以及创建和使用流程。

RealConnection

这个类主要负责进行Socket的操作(连接),获取Socket的输入输出流并封装。

建立Socket连接(connect)
    while (true) {
      try {
        if (route.requiresTunnel()) {
          connectTunnel(connectTimeout, readTimeout, writeTimeout);
        } else {
          connectSocket(connectTimeout, readTimeout);
        }
        establishProtocol(connectionSpecSelector);
        break;
      } catch (IOException e) {
        closeQuietly(socket);
        closeQuietly(rawSocket);
        socket = null;
        rawSocket = null;
        source = null;
        sink = null;
        handshake = null;
        protocol = null;
        http2Connection = null;

        if (routeException == null) {
          routeException = new RouteException(e);
        } else {
          routeException.addConnectException(e);
        }

        if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
          throw routeException;
        }
      }
    }

我们先简单的看,直接连接socket的方式,大家可以看到是一个无限循环,知道连接成功,或者指定的相关异常抛出则跳出循环。具体哪些异常可以看connectionSpecSelector.connectionFailed(e)内部的实现。

接下来我们就具体看连接socket的方法connectSocket(connectTimeout, readTimeout)

  private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
    Proxy proxy = route.proxy();
    Address address = route.address();

    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
        ? address.socketFactory().createSocket()
        : new Socket(proxy);

    rawSocket.setSoTimeout(readTimeout);
    try {
      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
      ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
      ce.initCause(e);
      throw ce;
    }

    // The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
    // More details:
    // https://github.com/square/okhttp/issues/3245
    // https://android-review.googlesource.com/#/c/271775/
    try {
      source = Okio.buffer(Okio.source(rawSocket));
      sink = Okio.buffer(Okio.sink(rawSocket));
    } catch (NullPointerException npe) {
      if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
        throw new IOException(npe);
      }
    }
  }

逻辑很清晰,就是建立socket连接,然后封装输入输出流。

  1. 建立socket连接
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);

进行连接,我们查看实现:

  public void connectSocket(Socket socket, InetSocketAddress address,
      int connectTimeout) throws IOException {
    socket.connect(address, connectTimeout);
  }

这就到了我们熟悉的Socket连接了。

  1. 封装输入输出流
      source = Okio.buffer(Okio.source(rawSocket));
      sink = Okio.buffer(Okio.sink(rawSocket));
获取并封装Socket输入输出流

我们看一下Okio.source方法:

  /**
   * Returns a source that reads from {@code socket}. Prefer this over {@link
   * #source(InputStream)} because this method honors timeouts. When the socket
   * read times out, the socket is asynchronously closed by a watchdog thread.
   */
  public static Source source(Socket socket) throws IOException {
    if (socket == null) throw new IllegalArgumentException("socket == null");
    AsyncTimeout timeout = timeout(socket);
    Source source = source(socket.getInputStream(), timeout);
    return timeout.source(source);
  }

还得继续扒:看source的重载方法:

  private static Source source(final InputStream in, final Timeout timeout) {
    if (in == null) throw new IllegalArgumentException("in == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");

    return new Source() {
      @Override public long read(Buffer sink, long byteCount) throws IOException {
        if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
        if (byteCount == 0) return 0;
        try {
          timeout.throwIfReached();
          Segment tail = sink.writableSegment(1);
          int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
          int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
          if (bytesRead == -1) return -1;
          tail.limit += bytesRead;
          sink.size += bytesRead;
          return bytesRead;
        } catch (AssertionError e) {
          if (isAndroidGetsocknameError(e)) throw new IOException(e);
          throw e;
        }
      }

      @Override public void close() throws IOException {
        in.close();
      }

      @Override public Timeout timeout() {
        return timeout;
      }

      @Override public String toString() {
        return "source(" + in + ")";
      }
    };
  }

来了,这下清晰了,对输入流做了包装。既然是输入,就只有read方法而么有write方法。而读的逻辑就是讲InputStream的数据存放到Buffer中。

到这里Okio.source(rawSocket)我们清楚了,把输入流封装成Source。read方法将数据读入到Buffer中。我们接下来继续看Okio.buffer(Okio.source(rawSocket))外面这个方法:

  public static BufferedSource buffer(Source source) {
    return new RealBufferedSource(source);
  }

有点粗暴,就是new了个RealBufferedSource。而它又是啥玩意呢,它实现了BufferedSource。这分明就是个装饰模式嘛。在原有Source的基础上,多了一些方法。如:readInt、skip等等。那还有啥用呢,我们看read方法:

  @Override public long read(Buffer sink, long byteCount) throws IOException {
    if (sink == null) throw new IllegalArgumentException("sink == null");
    if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
    if (closed) throw new IllegalStateException("closed");

    if (buffer.size == 0) {
      long read = source.read(buffer, Segment.SIZE);
      if (read == -1) return -1;
    }

    long toRead = Math.min(byteCount, buffer.size);
    return buffer.read(sink, toRead);
  }

这里进行了方法的覆写,数据先读到Buffer里,然后再写到sink里。

至于sink = Okio.buffer(Okio.sink(rawSocket));我就不讲了,模式一样。只是一个负责读,一个负责写。

StreamAllocation

协调Connections、Streams、Calls之间的关系。包括控制RealConnection的创建,释放,状态的管理。

一个RealConnection中可以包含多个StreamAllocation,默认为1个。

findHealthyConnection

这个方法就比较重要了。找到一个健康可用的RealConnection,通过阅读这个类,我们可以把上面说的几个类的关系搞清楚。先源码:

/**
 * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
 * until a healthy connection is found.
 */
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
    int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
    throws IOException {
  while (true) {
    RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
        connectionRetryEnabled);

    // If this is a brand new connection, we can skip the extensive health checks.
    synchronized (connectionPool) {
      if (candidate.successCount == 0) {
        return candidate;
      }
    }

    // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
    // isn't, take it out of the pool and start again.
    if (!candidate.isHealthy(doExtensiveHealthChecks)) {
      noNewStreams();
      continue;
    }

    return candidate;
  }
}

首选进入一个死循环,直到获取一个健康的可用的RealConnection或者有异常抛出。


findConnection》》》》》》》》》》》》》》》》Start

第一步调用到findConnection方法,我们查看该方法

/**
 * Returns a connection to host a new stream. This prefers the existing connection if it exists,
 * then the pool, finally building a new connection.
 */
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
    boolean connectionRetryEnabled) throws IOException {
  Route selectedRoute;
  synchronized (connectionPool) {
    if (released) throw new IllegalStateException("released");
    if (codec != null) throw new IllegalStateException("codec != null");
    if (canceled) throw new IOException("Canceled");

    // Attempt to use an already-allocated connection.
    RealConnection allocatedConnection = this.connection;
    if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
      return allocatedConnection;
    }

    // Attempt to get a connection from the pool.
    Internal.instance.get(connectionPool, address, this, null);
    if (connection != null) {
      return connection;
    }

    selectedRoute = route;
  }

  // If we need a route, make one. This is a blocking operation.
  if (selectedRoute == null) {
    selectedRoute = routeSelector.next();
  }

  RealConnection result;
  synchronized (connectionPool) {
    if (canceled) throw new IOException("Canceled");

    // Now that we have an IP address, make another attempt at getting a connection from the pool.
    // This could match due to connection coalescing.
    Internal.instance.get(connectionPool, address, this, selectedRoute);
    if (connection != null) return connection;

    // Create a connection and assign it to this allocation immediately. This makes it possible
    // for an asynchronous cancel() to interrupt the handshake we're about to do.
    route = selectedRoute;
    refusedStreamCount = 0;
    result = new RealConnection(connectionPool, selectedRoute);
    acquire(result);
  }

  // Do TCP + TLS handshakes. This is a blocking operation.
  result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
  routeDatabase().connected(result.route());

  Socket socket = null;
  synchronized (connectionPool) {
    // Pool the connection.
    Internal.instance.put(connectionPool, result);

    // If another multiplexed connection to the same address was created concurrently, then
    // release this connection and acquire that one.
    if (result.isMultiplexed()) {
      socket = Internal.instance.deduplicate(connectionPool, address, this);
      result = connection;
    }
  }
  closeQuietly(socket);

  return result;
}
  1. 先判断当前的RealConnection allocatedConnection = this.connection;判断当前连接是否已存在,如果存在且没有标记noNewStreams,则直接返回该连接
  2. 到连接池中寻找匹配的连接Internal.instance.get(connectionPool, address, this, null);。这里Internal.instance是一个抽象类中的静态变量,那在哪里实现的呢。我们看到OkHttpClient类。类中第三个static关键字就是instance的实现 static { Internal.instance = new Internal() { @Override public void addLenient(Headers.Builder builder, String line) { builder.addLenient(line); } @Override public RealConnection get(ConnectionPool pool, Address address, StreamAllocation streamAllocation, Route route) { return pool.get(address, streamAllocation, route); } ...省略代码 } 这里我们就知道其实最终调用到了ConnectionPool的get方法。我们查看 /** * Returns a recycled connection to {@code address}, or null if no such connection exists. The * route is null if the address has not yet been routed. */ @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) { assert (Thread.holdsLock(this)); for (RealConnection connection : connections) { if (connection.isEligible(address, route)) { streamAllocation.acquire(connection); return connection; } } return null; } 内部就是循环遍历connections,找到匹配的Connection。至于如何判断,大家查看方法的实现即可。如果找到则直接返回,否则进入下一步
  3. 前面在调用ConnectionPool.get方法时候Route参数为空,这一步就是获取一个Route然后再次查找。如果成功就返回
  4. 经过上面三个步骤后,说明已经没有可用的Connection。那么就得创建一个, result = new RealConnection(connectionPool, selectedRoute); acquire(result);
  5. 创建完后调用acquire,这个是干啥的呢 public void acquire(RealConnection connection) { assert (Thread.holdsLock(connectionPool)); if (this.connection != null) throw new IllegalStateException(); this.connection = connection; connection.allocations.add(new StreamAllocationReference(this, callStackTrace)); } 把当前的StreamAllocation添加到RealConnection。这和我们前面说到的一个RealConnection可能对应多个StreamAllocation。
  6. 开始进行Socket连接 result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled); routeDatabase().connected(result.route());
  7. 将RealConnection添加到connectionPool中 Internal.instance.put(connectionPool, result);

到这里findHealthyConnection执行完毕,结果获取一个可用的RealConnection。

findConnection《《《《《《《《《《《《《《《《《《《《End


继续findHealthyConnection的代码:

      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }

      // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
      // isn't, take it out of the pool and start again.
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        noNewStreams();
        continue;
      }

      return candidate;

这里就是检查RealConnection是否正常可用。也就是做个体检isHealthy,如果发现返回False,那么标记这个RealConnection的noNewStreams为true。此变量标记为true后,代码后面就不要从使用这个RealConnection。何以得知呢?

看到前面第2步,从ConnectionPool调用get方法寻找合适的RealConnection,有一句判断,前面我们没有讲,这里我跟踪一下:

  @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.isEligible(address, route)) {
        streamAllocation.acquire(connection);
        return connection;
      }
    }
    return null;
  }

看到connection.isEligible(address, route)这句话,我们进入:

public boolean isEligible(Address address, @Nullable Route route) {
  // If this connection is not accepting new streams, we're done.
  if (allocations.size() >= allocationLimit || noNewStreams) return false;

  // If the non-host fields of the address don't overlap, we're done.
  if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;
  .../省略代码

看到noNewStreams了吧, 现在知道他的用处了吧。还有allocations.size() >= allocationLimit,控制一个RealConnection可以被多少个StreamAllocation持有,这下都清楚了吧。

HttpCodec(Http1Codec)

Http请求和响应的编解码抽象HttpCodec

这是一个接口,定义了编解码的抽象方法。

public interface HttpCodec {
  /**
   * The timeout to use while discarding a stream of input data. Since this is used for connection
   * reuse, this timeout should be significantly less than the time it takes to establish a new
   * connection.
   */
  int DISCARD_STREAM_TIMEOUT_MILLIS = 100;

  /** Returns an output stream where the request body can be streamed. */
  Sink createRequestBody(Request request, long contentLength);

  /** This should update the HTTP engine's sentRequestMillis field. */
  void writeRequestHeaders(Request request) throws IOException;

  /** Flush the request to the underlying socket. */
  void flushRequest() throws IOException;

  /** Flush the request to the underlying socket and signal no more bytes will be transmitted. */
  void finishRequest() throws IOException;

  /**
   * Parses bytes of a response header from an HTTP transport.
   *
   * @param expectContinue true to return null if this is an intermediate response with a "100"
   *     response code. Otherwise this method never returns null.
   */
  Response.Builder readResponseHeaders(boolean expectContinue) throws IOException;

  /** Returns a stream that reads the response body. */
  ResponseBody openResponseBody(Response response) throws IOException;

  /**
   * Cancel this stream. Resources held by this stream will be cleaned up, though not synchronously.
   * That may happen later by the connection pool thread.
   */
  void cancel();
}

主要就是针对Request和Response的处理。将我们传入的请求Request编码成Http的协议请求,将响应解码成Response。

针对HTTP/1.1的实现Http1Codec

前面讲了HttpCodec的抽象方法。这里就是实现,Http协议也有多个版本,也就对应不同的实现。这里我们就看现在常用的Http/1.1。

而Http1Codec的创建在ConnectInterceptor中。

HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);

我们继续跟踪到StreamAllocation的newStream方法

HttpCodec resultCodec = resultConnection.newCodec(client, this);

继续进入:

  public HttpCodec newCodec(
      OkHttpClient client, StreamAllocation streamAllocation) throws SocketException {
    if (http2Connection != null) {
      return new Http2Codec(client, streamAllocation, http2Connection);
    } else {
      socket.setSoTimeout(client.readTimeoutMillis());
      source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
      sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
      return new Http1Codec(client, streamAllocation, source, sink);
    }
  }

两个参数我们已经很熟悉了。重点看到了new Http1Codec(client, streamAllocation, source, sink),在这里创建了Http1Codec。而传递的参数source、sink前面我们已经介绍了。在连接Socket后进行输入输出的封装。

Http1Codec核心方法实现

我们在介绍CallServerInterceptor的intercept方法时候,只是粗略的讲了下流程。这里我们将一下和Http1Codec相关的方法。

  • 发送请求头 httpCodec.writeRequestHeaders(request); 看到实现 /** * Prepares the HTTP headers and sends them to the server. * * <p>For streaming requests with a body, headers must be prepared <strong>before</strong> the * output stream has been written to. Otherwise the body would need to be buffered! * * <p>For non-streaming requests with a body, headers must be prepared <strong>after</strong> the * output stream has been written to and closed. This ensures that the {@code Content-Length} * header field receives the proper value. */ @Override public void writeRequestHeaders(Request request) throws IOException { String requestLine = RequestLine.get( request, streamAllocation.connection().route().proxy().type()); writeRequest(request.headers(), requestLine); } requestLine就是HTTP的起始行,内部大家可以自己查看。然后看到writeRequest

方法:

  /** Returns bytes of a request header for sending on an HTTP transport. */
  public void writeRequest(Headers headers, String requestLine) throws IOException {
    if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
    sink.writeUtf8(requestLine).writeUtf8("\r\n");
    for (int i = 0, size = headers.size(); i < size; i++) {
      sink.writeUtf8(headers.name(i))
          .writeUtf8(": ")
          .writeUtf8(headers.value(i))
          .writeUtf8("\r\n");
    }
    sink.writeUtf8("\r\n");
    state = STATE_OPEN_REQUEST_BODY;
  }

逻辑就很简单了,遍历Headers,将请求头写入到sink中。

  • 发送请求体
      if (responseBuilder == null) {
        // Write the request body if the "Expect: 100-continue" expectation was met.
        Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
      } else if (!connection.isMultiplexed()) {
        // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection from
        // being reused. Otherwise we're still obligated to transmit the request body to leave the
        // connection in a consistent state.
        streamAllocation.noNewStreams();
      }

这里会先判断,如果后请求体的话就发送。看到createRequestBody方法。

  @Override public Sink createRequestBody(Request request, long contentLength) {
    if ("chunked".equalsIgnoreCase(request.header("Transfer-Encoding"))) {
      // Stream a request body of unknown length.
      return newChunkedSink();
    }

    if (contentLength != -1) {
      // Stream a request body of a known length.
      return newFixedLengthSink(contentLength);
    }

    throw new IllegalStateException(
        "Cannot stream a request body without chunked encoding or a known content length!");
  }

确定发送的请求数据大小是否确定,然后返回对应的Sink实现。接下来看到request.body().writeTo(bufferedRequestBody);

的实现。我们看到request.body()返回是RequestBody,一个抽象类,定义请求体的方法。看到自带的实现有FormBody、MultipartBody。我们挑一个看FormBody。看到writeTo方法:

  @Override public void writeTo(BufferedSink sink) throws IOException {
    writeOrCountBytes(sink, false);
  }

继续看:

  /**
   * Either writes this request to {@code sink} or measures its content length. We have one method
   * do double-duty to make sure the counting and content are consistent, particularly when it comes
   * to awkward operations like measuring the encoded length of header strings, or the
   * length-in-digits of an encoded integer.
   */
  private long writeOrCountBytes(@Nullable BufferedSink sink, boolean countBytes) {
    long byteCount = 0L;

    Buffer buffer;
    if (countBytes) {
      buffer = new Buffer();
    } else {
      buffer = sink.buffer();
    }

    for (int i = 0, size = encodedNames.size(); i < size; i++) {
      if (i > 0) buffer.writeByte('&');
      buffer.writeUtf8(encodedNames.get(i));
      buffer.writeByte('=');
      buffer.writeUtf8(encodedValues.get(i));
    }

    if (countBytes) {
      byteCount = buffer.size();
      buffer.clear();
    }

    return byteCount;
  }

这里就清晰了,将请求体(Form表单)遍历的写出。

  • 读取响应头
    if (responseBuilder == null) {
      responseBuilder = httpCodec.readResponseHeaders(false);
    }

进入到实现:

  @Override public Response.Builder readResponseHeaders(boolean expectContinue) throws IOException {
    if (state != STATE_OPEN_REQUEST_BODY && state != STATE_READ_RESPONSE_HEADERS) {
      throw new IllegalStateException("state: " + state);
    }

    try {
      StatusLine statusLine = StatusLine.parse(source.readUtf8LineStrict());

      Response.Builder responseBuilder = new Response.Builder()
          .protocol(statusLine.protocol)
          .code(statusLine.code)
          .message(statusLine.message)
          .headers(readHeaders());

      if (expectContinue && statusLine.code == HTTP_CONTINUE) {
        return null;
      }

      state = STATE_OPEN_RESPONSE_BODY;
      return responseBuilder;
    } catch (EOFException e) {
      // Provide more context if the server ends the stream before sending a response.
      IOException exception = new IOException("unexpected end of stream on " + streamAllocation);
      exception.initCause(e);
      throw exception;
    }
  }

首先读取起始行statusLine。我们进入到实现source.readUtf8LineStrict()看怎么读取的

long newline = indexOf((byte) '\n', 0, scanLength);   
if (scanLength < Long.MAX_VALUE
        && request(scanLength) && buffer.getByte(scanLength - 1) == '\r'
        && request(scanLength + 1) && buffer.getByte(scanLength) == '\n') {
      return buffer.readUtf8Line(scanLength); // The line was 'limit' UTF-8 bytes followed by \r\n.
    }

这里先时候去到\n的位置。然后看到,起始行的结束是否是\r\n。最后读取并返回。StatusLine.parse就是解析得到Http的Method、Code、Message。下面读取响应头:

      Response.Builder responseBuilder = new Response.Builder()
          .protocol(statusLine.protocol)
          .code(statusLine.code)
          .message(statusLine.message)
          .headers(readHeaders());

先将起始行封装到responseBuilder中,然后readHeaders()读取响应头。

  /** Reads headers or trailers. */
  public Headers readHeaders() throws IOException {
    Headers.Builder headers = new Headers.Builder();
    // parse the result headers until the first blank line
    for (String line; (line = source.readUtf8LineStrict()).length() != 0; ) {
      Internal.instance.addLenient(headers, line);
    }
    return headers.build();
  }

这里就是一行行读取响应头,然后添加到Headers中。细节大家跟踪到方法内部查看即可。

  • 封装响应体
    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }

先判断响应头,101啥意思呢:服务器将遵从客户的请求转换到另外一种协议(HTTP 1.1新)。所以就不用管响应体了。我们重点看到httpCodec.openResponseBody(response)

  @Override public ResponseBody openResponseBody(Response response) throws IOException {
    Source source = getTransferStream(response);
    return new RealResponseBody(response.headers(), Okio.buffer(source));
  }

两个步骤:

  • 第一步getTransferStream(response)
  private Source getTransferStream(Response response) throws IOException {
    if (!HttpHeaders.hasBody(response)) {
      return newFixedLengthSource(0);
    }

    if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) {
      return newChunkedSource(response.request().url());
    }

    long contentLength = HttpHeaders.contentLength(response);
    if (contentLength != -1) {
      return newFixedLengthSource(contentLength);
    }

    // Wrap the input stream from the connection (rather than just returning
    // "socketIn" directly here), so that we can control its use after the
    // reference escapes.
    return newUnknownLengthSource();
  }

主要判断响应头Transfer-Encoding来确定响应体的数据大小是否确定,如果是chunked则是分块传输,则没有Content-Length。否则可以确定响应体大小。然后返回不同的Source实现。

  • 第二步 new RealResponseBody(response.headers(), Okio.buffer(source)
public final class RealResponseBody extends ResponseBody {
  private final Headers headers;
  private final BufferedSource source;

  public RealResponseBody(Headers headers, BufferedSource source) {
    this.headers = headers;
    this.source = source;
  }

  @Override public MediaType contentType() {
    String contentType = headers.get("Content-Type");
    return contentType != null ? MediaType.parse(contentType) : null;
  }

  @Override public long contentLength() {
    return HttpHeaders.contentLength(headers);
  }

  @Override public BufferedSource source() {
    return source;
  }
}

这个就是做了一个封装,没什么别的逻辑。


到此为止整个Http的发送和响应就介绍完毕了。

同步/异步请求

先来一张流程图:

Call.png

文章的开始我们发送http请求直接使用的同步请求

Response response = client.newCall(request).execute();

这样比较粗暴,我们还需要开启线程。如此OkHttp当然也就提供了异步调用方法。

client.newCall(request).enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {
        
    }

    @Override
    public void onResponse(Call call, Response response) throws IOException {

    }
});

调用也是非常的方便的。整个请求OkHttp会帮我们开启线程,并完成Http请求。接下来我们就分析这块的流程。

newCall方法就不介绍了,我们看到enqueue方法。

  @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }
  • 先判断是否已经executed。防止多次调用
  • 将任务加入到队里中
client.dispatcher().enqueue(new AsyncCall(responseCallback));

我们进入到enqueue方法。

  synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }
  • 先判断当前正在请求的数量是否大于最大请求数,同一个主机的请求是否超过限制
  • 如果超过限制,则将AsyncCall任务放到readyAsyncCalls(准备任务)队列中。
  • 如果没有超过限制,加入到runningAsyncCalls(运行)队列中,并直接调度执行。

这里我们先看几个变量runningAsyncCalls、readyAsyncCalls:

  /** Ready async calls in the order they'll be run. */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

这个写的很清楚,异步就绪队列、异步运行队列,同步运行队列。runningSyncCalls这个在前面同步调用的时候有涉及。剩下两个变量就在这里体现。整体的思路就是,异步调用先判断请求数量是否超限,如果没有直接交给线程池执行;超限就先放到准备队列中。

我们在看到executorService().execute(call);进入到executorService()方法:

  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

这里很明显了一个单例的实现,方法上加上了synchronized关键字。ThreadPoolExecutor是创建一个线程池。

这里有朋友要问了,当请求数量超限制我们只看到了把任务放到准备队列中,那啥时候被调用呢?这里大家先别着急,后面会讲到。

到此我们已经把client.dispatcher().enqueue(new AsyncCall(responseCallback));

enqueue这个方法干了什么事讲清楚了。这里还设计一个类AsyncCall。我们看看:

  final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }

    String host() {
      return originalRequest.url().host();
    }

    Request request() {
      return originalRequest;
    }

    RealCall get() {
      return RealCall.this;
    }

    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }

光看这里看不出啥门道,我们得看看父类NamedRunnable

public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

嘿!就是个Runnable,run里就做了两个事

  • 设置线程名称
  • 进行execute()调用

然后我们回到AsyncCall看到execute的实现:

Response response = getResponseWithInterceptorChain();

有没有觉得很眼熟,这个不就是我们前面讲同步调用的时候,通过这个方法完成的请求。

请求成功进行CallBack回调

responseCallback.onResponse(RealCall.this, response)

失败或者发生异常也回调

responseCallback.onFailure(RealCall.this, e);

上面我们就把异步调用的发起和回调讲清楚了,前面我们还有个问题就是准备队列的任务啥时候被执行。

准备就绪队列任务的调度

我们还是看到AsyncCall的execute方法,真正的执行调用时在这个方法中,我们看到最后的finally块

finally {
        client.dispatcher().finished(this);
      }

跟踪到Dispatcher的finish方法

  /** Used by {@code AsyncCall#run} to signal completion. */
  void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
  }

看到重载的方法:

  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

这里先把call从队列中移除。然后判断promoteCalls,这里我们知道是true,所以重点看到if (promoteCalls) promoteCalls();这段代码:

  private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }
  • 还是先判断正在请求的数量是否超限制
  • 遍历readyAsyncCalls,找到符合条件的请求(同一个主机的请求数量是否超限制)。如果找到就从readyAsyncCalls中移除,然后加入到runningAsyncCalls。然后通过线程池获进行调度执行。

前面我们将到同步调用的时候,RealCall的execute()方法的开始有client.dispatcher().executed(this);

方法的结束finally调用了client.dispatcher().finished(this);。然后调用了Dispatcher的finished(runningSyncCalls, call, false);方法。这里和异步调用的区别就是最后一个参数为false。

到这里整个同步调用和异步调用我们就串联起来了。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017.11.03 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简介
    • 目录:
    • OkHttp介绍:
    • 粗绘请求流程
    • RealCall方法execute
    • getResponseWithInterceptorChain调用链
      • Chain
        • RealInterceptorChain
          • Interceptor
          • RetryAndFollowUpInterceptor
            • 创建StreamAllocation
              • 失败重连接
                • 重定向
                • ConnectInterceptor获取连接
                  • 获取StreamAllocation
                    • 获取HttpCodec
                      • 获取RealConnection
                      • CallServerInterceptor网络请求
                        • 1. 从RealInterceptorChain获取相关对象
                          • 2. 发送请求头数据
                            • 3. 发送请求体
                              • 4. 读取响应头
                                • 5. 封装相应内容
                                  • 6. 判断Connection头部信息是否是close
                                  • RealConnection
                                    • 建立Socket连接(connect)
                                      • 获取并封装Socket输入输出流
                                      • StreamAllocation
                                        • findHealthyConnection
                                        • HttpCodec(Http1Codec)
                                          • Http请求和响应的编解码抽象HttpCodec
                                            • 针对HTTP/1.1的实现Http1Codec
                                              • Http1Codec核心方法实现
                                              • 同步/异步请求
                                                • 准备就绪队列任务的调度
                                                领券
                                                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档