OkHttp源码深入解读

简介

目前在HTTP协议请求库中,OKHttp应当是非常火的,使用也非常的简单。网上有很多文章写了关于OkHttp的特点以及使用,而本章主要带领大家阅读OkHttp的源码,让大家对OKhttp的工作原理有所了解,当然源码的代码量是非常大的,这里我只是抓住主线和重点部分,至于细节部分,大家随着我抛出的线来跟进基本是没什么问题的。这篇文章要干嘛,引用一句话:

read the fucking source code

目录:
  • OkHttp介绍
  • 粗绘请求流程
  • RealCall方法execute
  • getResponseWithInterceptorChain调用链
  • RetryAndFollowUpInterceptor
  • ConnectInterceptor获取连接
  • CallServerInterceptor网络请求
  • RealConnection
  • StreamAllocation
  • HttpCodec(Http1Codec)
  • 同步/异步请求

OkHttp介绍:

特点:

  • 支持连接同一地址的连接共享同一个socket(前提服务端支持)
  • 支持Socket连接池,减少请求延迟
  • 使用拦截器模式,将流程拆分
  • 透明的GZIP压缩

粗绘请求流程

注意:这里我选择OkHttp源码版本是 3.8.0。为了方便大家能够和文章同步,最好保持版本一致,我看过老版本和新的版本还是有点不同的。

官网给出的示例

OkHttpClient client = new OkHttpClient();

String run(String url) throws IOException {
  Request request = new Request.Builder()
      .url(url)
      .build();

  Response response = client.newCall(request).execute();
  return response.body().string();
}

我们就从这里入口,来一步一步的跟进。

  1. 首先是创建一个OkHttpClient,Http请求工厂,也就是只要需要发Http请求,那都得找他。内部当然后很多的成员变量和方法,这里我们先不做介绍,等用到时再解释。
  2. 我们继续看client.newCall(request)。找到源码
  @Override public Call newCall(Request request) {
    return new RealCall(this, request, false /* for web socket */);
  }

很简单,创建了一个RealCall,这里我就称为一个请求。Request不说大家能理解,里面封装了各种请求的信息。创建过程也很简单,做一些成员变量赋值和初始化。

  RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    final EventListener.Factory eventListenerFactory = client.eventListenerFactory();

    this.client = client;
    this.originalRequest = originalRequest;
    this.forWebSocket = forWebSocket;
    this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);

    // TODO(jwilson): this is unsafe publication and not threadsafe.
    this.eventListener = eventListenerFactory.create(this);
  }

这里注意retryAndFollowUpInterceptor;变量,后面会用到。

  1. 调用了RealCall的execute()方法并返回Response结果。

RealCall方法execute

前面我们知道了大致的请求流程,下面我们重点看

  @Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    try {
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } finally {
      client.dispatcher().finished(this);
    }
  }
  1. 首先我们发现try的前后调用了Dispatcher的方法:
client.dispatcher().executed(this);
client.dispatcher().finished(this);

分别是将Call加入到Dispatcher中的同步队列中,结束后,移除队列。

  1. 调用getResponseWithInterceptorChain获取Response。

接下来我们就重点看getResponseWithInterceptorChain方法

getResponseWithInterceptorChain调用链

okhttp.png

  Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(
        interceptors, null, null, null, 0, originalRequest);
    return chain.proceed(originalRequest);
  }

这里的代码就很关键了,设计的很巧妙。可能有点绕,这里我讲一下几个关键的类。

Chain
  1. 获取Request
  2. 执行proceed
RealInterceptorChain

Chain的实现

  1. 包含了完成请求需要的类,包括StreamAllocation、HttpCodec、RealConnection、Request等。这里必要重要的就是可以实现了Chain的request()来获取Request。
  2. 控制Interceptor的调用,调用Interceptor的拦截方法intercept后,就封装下一个RealInterceptorChain并指定index。声明下一个将要被调用的Interceptor。这部分逻辑主要在proceed方法中。我们看核心代码
    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(
        interceptors, streamAllocation, httpCodec, connection, index + 1, request);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

首先会获取当前index的Interceptor。然后执行对应的intercept

方法。同时出入的参数是新创建的RealInterceptorChain。而新创建的RealInterceptorChain对应的index+1。如果执行新创建的RealInterceptorChain的proceed方法,那么interceptors的第index+1个Interceptor的intercept会被执行。依次循环下去。

总结: RealInterceptorChain就是对请求中个中重要对象的封装,执行Interceptor的intercept

的调用,确定下一个RealInterceptorChain。保证所有的Interceptor依次执行intercept

Interceptor

前面讲到了RealInterceptorChain会执行Interceptor的intercept方法,同时传入下一个RealInterceptorChain。那么intercept方法究竟做了什么事呢,因为Interceptor的实现很多,这里我们挑一个系统的实现类看看,比如:BridgeInterceptor,这个代码虽然长,但逻辑想对简单

  @Override public Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();

    RequestBody body = userRequest.body();
    if (body != null) {
      MediaType contentType = body.contentType();
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString());
      }

      long contentLength = body.contentLength();
      if (contentLength != -1) {
        requestBuilder.header("Content-Length", Long.toString(contentLength));
        requestBuilder.removeHeader("Transfer-Encoding");
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked");
        requestBuilder.removeHeader("Content-Length");
      }
    }

    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", hostHeader(userRequest.url(), false));
    }

    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive");
    }

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true;
      requestBuilder.header("Accept-Encoding", "gzip");
    }

    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
    if (!cookies.isEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies));
    }

    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", Version.userAgent());
    }

    Response networkResponse = chain.proceed(requestBuilder.build());

    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

    Response.Builder responseBuilder = networkResponse.newBuilder()
        .request(userRequest);

    if (transparentGzip
        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
        && HttpHeaders.hasBody(networkResponse)) {
      GzipSource responseBody = new GzipSource(networkResponse.body().source());
      Headers strippedHeaders = networkResponse.headers().newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build();
      responseBuilder.headers(strippedHeaders);
      responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
    }

    return responseBuilder.build();
  }

里面逻辑我们现在可能还看不懂,我们看中间最核心的一句话,。Response networkResponse = chain.proceed(requestBuilder.build());有没有觉得顿时豁然开朗。realChain是传入的参数,而执行proceed方法,就又回到了前面我们讲RealInterceptorChain的流程。那前后RealInterceptorChain有什么区别呢?那就是index在不断的增加,同时对应的Interceptor也就不同。

那么Interceptor有什么用呢?

我们刚才只关注了中间的chain.proceed(requestBuilder.build());。而在此前后我们可以做很多的逻辑操作了,比如:

对Request进行一些请求头的判断,处理和完善。对Response进行一些处理,如在有gzip的情况下数据的处理等。

总结:Interceptor这里我称之为拦截器。Okhttp将请求的流程,从封装请求头,获取连接,发请求数据,读请求数据等等。拆分成一个个Interceptor。每一个Interceptor有着自己单一的功能,而下层的Interceptor为上层的Interceptor服务,有没有觉得有点像我们的网络TCP/IP的模型,哈哈。其实这种思想让我们的请求变的更加清晰,并且扩展性很好。每一层也就是Interceptor可以有自己的实现。同时我们可以定义自己的Interceptor。 而Interceptor的顺序执行就由RealInterceptorChain完成。

到这里我们就讲了整个请求的大体执行框架和模式。这部分一定要好好的理解,方便后面的学习。

RetryAndFollowUpInterceptor

这个拦截器用来做重连接和重定向的。其中逻辑有以下:

创建StreamAllocation
  @Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();

    streamAllocation = new StreamAllocation(
        client.connectionPool(), createAddress(request.url()), callStackTrace);

    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
      if (canceled) {
        streamAllocation.release();
        throw new IOException("Canceled");
      }

      Response response = null;
      boolean releaseConnection = true;
      try {
        response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
        ...省略剩余代码

看到了吧new StreamAllocation了吧。这里第一个疑惑解决,StreamAllocation的创建地方。这里还要多讲一个地方就是构造参数ConnectionPool。我们看到是从OkHttpClient传了的。而在OkHttpclient创建时候创建了ConnectionPool。

public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory {
  ...省略
    public static final class Builder {
      public Builder() {
      ...省略
connectionPool = new ConnectionPool();
         ...省略

后面用到ConnectionPool大家就别再疑惑了。

创建StreamAllocation在这里,那当然释放也是在这类里:

失败重连接
    while (true) {
      if (canceled) {
        streamAllocation.release();
        throw new IOException("Canceled");
      }

      Response response = null;
      boolean releaseConnection = true;
      try {
        response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
        releaseConnection = false;
      } catch (RouteException e) {
        // The attempt to connect via a route failed. The request will not have been sent.
        if (!recover(e.getLastConnectException(), false, request)) {
          throw e.getLastConnectException();
        }
        releaseConnection = false;
        continue;
      } catch (IOException e) {
        // An attempt to communicate with a server failed. The request may have been sent.
        boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
        if (!recover(e, requestSendStarted, request)) throw e;
        releaseConnection = false;
        continue;
      } finally {
        // We're throwing an unchecked exception. Release any resources.
        if (releaseConnection) {
          streamAllocation.streamFailed(null);
          streamAllocation.release();
        }
      }

我们看到response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);是在一个无限循环中,如果出现异常,并且满足重连接,就会再次调用。

重定向
      Request followUp = followUpRequest(response);

      if (followUp == null) {
        if (!forWebSocket) {
          streamAllocation.release();
        }
        return response;
      }

ConnectInterceptor获取连接

前面我们讲了Interceptor的执行流程。而getResponseWithInterceptorChain方法中添加了一些给定的Interceptor。如:RetryAndFollowUpInterceptor(这个是在创建RealCall时候创建的,前面有提醒大家注意)、BridgeInterceptor(上一节有讲到,主要做一些请求头和响应数据的处理)、CacheInterceptor(看名称知道,处理缓存)、ConnectInterceptor、CallServerInterceptor。按上一节的流程,这些Interceptor会依次被调用。这里我们要重点看最后两个,首先是ConnectInterceptor。

通过名称我们知道主要做连接处理。我们看下源码:

public final class ConnectInterceptor implements Interceptor {
  public final OkHttpClient client;

  public ConnectInterceptor(OkHttpClient client) {
    this.client = client;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }
}

估计大家都喜欢看这种源码,代码量很少,简洁。我们主要看中间三行代码:

获取StreamAllocation
StreamAllocation streamAllocation = realChain.streamAllocation();

而streamAllocation是RealInterceptorChain的成员变量,在构造方法中赋值。这里我们就往前找,往直前的Interceport找,看谁构造RealInterceptorChain传递了StreamAllocation。经过我们的一个个查找,在RetryAndFollowUpInterceptor的intercept方法中找到:

获取HttpCodec
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
获取RealConnection
RealConnection connection = streamAllocation.connection();

HttpCodec和RealConnection这两玩意干啥的,就现在这几行代码我们也看不出来,那就先不管。继续看CallServerInterceptor

CallServerInterceptor网络请求

这个就厉害了。看名称,请求服务。那就是最核心的地方了。撸上代码:

/** This is the last interceptor in the chain. It makes a network call to the server. */
public final class CallServerInterceptor implements Interceptor {
  private final boolean forWebSocket;

  public CallServerInterceptor(boolean forWebSocket) {
    this.forWebSocket = forWebSocket;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    HttpCodec httpCodec = realChain.httpStream();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    RealConnection connection = (RealConnection) realChain.connection();
    Request request = realChain.request();

    long sentRequestMillis = System.currentTimeMillis();
    httpCodec.writeRequestHeaders(request);

    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return what
      // we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        httpCodec.flushRequest();
        responseBuilder = httpCodec.readResponseHeaders(true);
      }

      if (responseBuilder == null) {
        // Write the request body if the "Expect: 100-continue" expectation was met.
        Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
      } else if (!connection.isMultiplexed()) {
        // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection from
        // being reused. Otherwise we're still obligated to transmit the request body to leave the
        // connection in a consistent state.
        streamAllocation.noNewStreams();
      }
    }

    httpCodec.finishRequest();

    if (responseBuilder == null) {
      responseBuilder = httpCodec.readResponseHeaders(false);
    }

    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    int code = response.code();
    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }

    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
  }
}
1. 从RealInterceptorChain获取相关对象
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    HttpCodec httpCodec = realChain.httpStream();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    RealConnection connection = (RealConnection) realChain.connection();
    Request request = realChain.request();

这些变量在上一节中已经介绍,这里只是获取一下。

2. 发送请求头数据
httpCodec.writeRequestHeaders(request);
3. 发送请求体

这里会先判断请求方法以及是否有请求体数据。如果有则发送。

4. 读取响应头
responseBuilder = httpCodec.readResponseHeaders(false);
5. 封装相应内容
    int code = response.code();
    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }
6. 判断Connection头部信息是否是close

如果请求头或响应头的Connection值为close。则标识改Connection为noNewStreams。标识不会有新的流。

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }

至于noNewStreams是用来控制,当前的连接是否能再次被使用,我们后面会讲到。


前面我们讲了CallServerInterceptor的请求流程,但里面有很多的类我们还不清楚是怎么来的,以及干啥用的。接下来我们就讲核心类的用法以及创建和使用流程。

RealConnection

这个类主要负责进行Socket的操作(连接),获取Socket的输入输出流并封装。

建立Socket连接(connect)
    while (true) {
      try {
        if (route.requiresTunnel()) {
          connectTunnel(connectTimeout, readTimeout, writeTimeout);
        } else {
          connectSocket(connectTimeout, readTimeout);
        }
        establishProtocol(connectionSpecSelector);
        break;
      } catch (IOException e) {
        closeQuietly(socket);
        closeQuietly(rawSocket);
        socket = null;
        rawSocket = null;
        source = null;
        sink = null;
        handshake = null;
        protocol = null;
        http2Connection = null;

        if (routeException == null) {
          routeException = new RouteException(e);
        } else {
          routeException.addConnectException(e);
        }

        if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
          throw routeException;
        }
      }
    }

我们先简单的看,直接连接socket的方式,大家可以看到是一个无限循环,知道连接成功,或者指定的相关异常抛出则跳出循环。具体哪些异常可以看connectionSpecSelector.connectionFailed(e)内部的实现。

接下来我们就具体看连接socket的方法connectSocket(connectTimeout, readTimeout)

  private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
    Proxy proxy = route.proxy();
    Address address = route.address();

    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
        ? address.socketFactory().createSocket()
        : new Socket(proxy);

    rawSocket.setSoTimeout(readTimeout);
    try {
      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
      ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
      ce.initCause(e);
      throw ce;
    }

    // The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
    // More details:
    // https://github.com/square/okhttp/issues/3245
    // https://android-review.googlesource.com/#/c/271775/
    try {
      source = Okio.buffer(Okio.source(rawSocket));
      sink = Okio.buffer(Okio.sink(rawSocket));
    } catch (NullPointerException npe) {
      if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
        throw new IOException(npe);
      }
    }
  }

逻辑很清晰,就是建立socket连接,然后封装输入输出流。

  1. 建立socket连接
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);

进行连接,我们查看实现:

  public void connectSocket(Socket socket, InetSocketAddress address,
      int connectTimeout) throws IOException {
    socket.connect(address, connectTimeout);
  }

这就到了我们熟悉的Socket连接了。

  1. 封装输入输出流
      source = Okio.buffer(Okio.source(rawSocket));
      sink = Okio.buffer(Okio.sink(rawSocket));
获取并封装Socket输入输出流

我们看一下Okio.source方法:

  /**
   * Returns a source that reads from {@code socket}. Prefer this over {@link
   * #source(InputStream)} because this method honors timeouts. When the socket
   * read times out, the socket is asynchronously closed by a watchdog thread.
   */
  public static Source source(Socket socket) throws IOException {
    if (socket == null) throw new IllegalArgumentException("socket == null");
    AsyncTimeout timeout = timeout(socket);
    Source source = source(socket.getInputStream(), timeout);
    return timeout.source(source);
  }

还得继续扒:看source的重载方法:

  private static Source source(final InputStream in, final Timeout timeout) {
    if (in == null) throw new IllegalArgumentException("in == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");

    return new Source() {
      @Override public long read(Buffer sink, long byteCount) throws IOException {
        if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
        if (byteCount == 0) return 0;
        try {
          timeout.throwIfReached();
          Segment tail = sink.writableSegment(1);
          int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
          int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
          if (bytesRead == -1) return -1;
          tail.limit += bytesRead;
          sink.size += bytesRead;
          return bytesRead;
        } catch (AssertionError e) {
          if (isAndroidGetsocknameError(e)) throw new IOException(e);
          throw e;
        }
      }

      @Override public void close() throws IOException {
        in.close();
      }

      @Override public Timeout timeout() {
        return timeout;
      }

      @Override public String toString() {
        return "source(" + in + ")";
      }
    };
  }

来了,这下清晰了,对输入流做了包装。既然是输入,就只有read方法而么有write方法。而读的逻辑就是讲InputStream的数据存放到Buffer中。

到这里Okio.source(rawSocket)我们清楚了,把输入流封装成Source。read方法将数据读入到Buffer中。我们接下来继续看Okio.buffer(Okio.source(rawSocket))外面这个方法:

  public static BufferedSource buffer(Source source) {
    return new RealBufferedSource(source);
  }

有点粗暴,就是new了个RealBufferedSource。而它又是啥玩意呢,它实现了BufferedSource。这分明就是个装饰模式嘛。在原有Source的基础上,多了一些方法。如:readInt、skip等等。那还有啥用呢,我们看read方法:

  @Override public long read(Buffer sink, long byteCount) throws IOException {
    if (sink == null) throw new IllegalArgumentException("sink == null");
    if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
    if (closed) throw new IllegalStateException("closed");

    if (buffer.size == 0) {
      long read = source.read(buffer, Segment.SIZE);
      if (read == -1) return -1;
    }

    long toRead = Math.min(byteCount, buffer.size);
    return buffer.read(sink, toRead);
  }

这里进行了方法的覆写,数据先读到Buffer里,然后再写到sink里。

至于sink = Okio.buffer(Okio.sink(rawSocket));我就不讲了,模式一样。只是一个负责读,一个负责写。

StreamAllocation

协调Connections、Streams、Calls之间的关系。包括控制RealConnection的创建,释放,状态的管理。

一个RealConnection中可以包含多个StreamAllocation,默认为1个。

findHealthyConnection

这个方法就比较重要了。找到一个健康可用的RealConnection,通过阅读这个类,我们可以把上面说的几个类的关系搞清楚。先源码:

/**
 * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
 * until a healthy connection is found.
 */
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
    int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
    throws IOException {
  while (true) {
    RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
        connectionRetryEnabled);

    // If this is a brand new connection, we can skip the extensive health checks.
    synchronized (connectionPool) {
      if (candidate.successCount == 0) {
        return candidate;
      }
    }

    // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
    // isn't, take it out of the pool and start again.
    if (!candidate.isHealthy(doExtensiveHealthChecks)) {
      noNewStreams();
      continue;
    }

    return candidate;
  }
}

首选进入一个死循环,直到获取一个健康的可用的RealConnection或者有异常抛出。


findConnection》》》》》》》》》》》》》》》》Start

第一步调用到findConnection方法,我们查看该方法

/**
 * Returns a connection to host a new stream. This prefers the existing connection if it exists,
 * then the pool, finally building a new connection.
 */
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
    boolean connectionRetryEnabled) throws IOException {
  Route selectedRoute;
  synchronized (connectionPool) {
    if (released) throw new IllegalStateException("released");
    if (codec != null) throw new IllegalStateException("codec != null");
    if (canceled) throw new IOException("Canceled");

    // Attempt to use an already-allocated connection.
    RealConnection allocatedConnection = this.connection;
    if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
      return allocatedConnection;
    }

    // Attempt to get a connection from the pool.
    Internal.instance.get(connectionPool, address, this, null);
    if (connection != null) {
      return connection;
    }

    selectedRoute = route;
  }

  // If we need a route, make one. This is a blocking operation.
  if (selectedRoute == null) {
    selectedRoute = routeSelector.next();
  }

  RealConnection result;
  synchronized (connectionPool) {
    if (canceled) throw new IOException("Canceled");

    // Now that we have an IP address, make another attempt at getting a connection from the pool.
    // This could match due to connection coalescing.
    Internal.instance.get(connectionPool, address, this, selectedRoute);
    if (connection != null) return connection;

    // Create a connection and assign it to this allocation immediately. This makes it possible
    // for an asynchronous cancel() to interrupt the handshake we're about to do.
    route = selectedRoute;
    refusedStreamCount = 0;
    result = new RealConnection(connectionPool, selectedRoute);
    acquire(result);
  }

  // Do TCP + TLS handshakes. This is a blocking operation.
  result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
  routeDatabase().connected(result.route());

  Socket socket = null;
  synchronized (connectionPool) {
    // Pool the connection.
    Internal.instance.put(connectionPool, result);

    // If another multiplexed connection to the same address was created concurrently, then
    // release this connection and acquire that one.
    if (result.isMultiplexed()) {
      socket = Internal.instance.deduplicate(connectionPool, address, this);
      result = connection;
    }
  }
  closeQuietly(socket);

  return result;
}
  1. 先判断当前的RealConnection allocatedConnection = this.connection;判断当前连接是否已存在,如果存在且没有标记noNewStreams,则直接返回该连接
  2. 到连接池中寻找匹配的连接Internal.instance.get(connectionPool, address, this, null);。这里Internal.instance是一个抽象类中的静态变量,那在哪里实现的呢。我们看到OkHttpClient类。类中第三个static关键字就是instance的实现 static { Internal.instance = new Internal() { @Override public void addLenient(Headers.Builder builder, String line) { builder.addLenient(line); } @Override public RealConnection get(ConnectionPool pool, Address address, StreamAllocation streamAllocation, Route route) { return pool.get(address, streamAllocation, route); } ...省略代码 } 这里我们就知道其实最终调用到了ConnectionPool的get方法。我们查看 /** * Returns a recycled connection to {@code address}, or null if no such connection exists. The * route is null if the address has not yet been routed. */ @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) { assert (Thread.holdsLock(this)); for (RealConnection connection : connections) { if (connection.isEligible(address, route)) { streamAllocation.acquire(connection); return connection; } } return null; } 内部就是循环遍历connections,找到匹配的Connection。至于如何判断,大家查看方法的实现即可。如果找到则直接返回,否则进入下一步
  3. 前面在调用ConnectionPool.get方法时候Route参数为空,这一步就是获取一个Route然后再次查找。如果成功就返回
  4. 经过上面三个步骤后,说明已经没有可用的Connection。那么就得创建一个, result = new RealConnection(connectionPool, selectedRoute); acquire(result);
  5. 创建完后调用acquire,这个是干啥的呢 public void acquire(RealConnection connection) { assert (Thread.holdsLock(connectionPool)); if (this.connection != null) throw new IllegalStateException(); this.connection = connection; connection.allocations.add(new StreamAllocationReference(this, callStackTrace)); } 把当前的StreamAllocation添加到RealConnection。这和我们前面说到的一个RealConnection可能对应多个StreamAllocation。
  6. 开始进行Socket连接 result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled); routeDatabase().connected(result.route());
  7. 将RealConnection添加到connectionPool中 Internal.instance.put(connectionPool, result);

到这里findHealthyConnection执行完毕,结果获取一个可用的RealConnection。

findConnection《《《《《《《《《《《《《《《《《《《《End


继续findHealthyConnection的代码:

      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }

      // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
      // isn't, take it out of the pool and start again.
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        noNewStreams();
        continue;
      }

      return candidate;

这里就是检查RealConnection是否正常可用。也就是做个体检isHealthy,如果发现返回False,那么标记这个RealConnection的noNewStreams为true。此变量标记为true后,代码后面就不要从使用这个RealConnection。何以得知呢?

看到前面第2步,从ConnectionPool调用get方法寻找合适的RealConnection,有一句判断,前面我们没有讲,这里我跟踪一下:

  @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.isEligible(address, route)) {
        streamAllocation.acquire(connection);
        return connection;
      }
    }
    return null;
  }

看到connection.isEligible(address, route)这句话,我们进入:

public boolean isEligible(Address address, @Nullable Route route) {
  // If this connection is not accepting new streams, we're done.
  if (allocations.size() >= allocationLimit || noNewStreams) return false;

  // If the non-host fields of the address don't overlap, we're done.
  if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;
  .../省略代码

看到noNewStreams了吧, 现在知道他的用处了吧。还有allocations.size() >= allocationLimit,控制一个RealConnection可以被多少个StreamAllocation持有,这下都清楚了吧。

HttpCodec(Http1Codec)

Http请求和响应的编解码抽象HttpCodec

这是一个接口,定义了编解码的抽象方法。

public interface HttpCodec {
  /**
   * The timeout to use while discarding a stream of input data. Since this is used for connection
   * reuse, this timeout should be significantly less than the time it takes to establish a new
   * connection.
   */
  int DISCARD_STREAM_TIMEOUT_MILLIS = 100;

  /** Returns an output stream where the request body can be streamed. */
  Sink createRequestBody(Request request, long contentLength);

  /** This should update the HTTP engine's sentRequestMillis field. */
  void writeRequestHeaders(Request request) throws IOException;

  /** Flush the request to the underlying socket. */
  void flushRequest() throws IOException;

  /** Flush the request to the underlying socket and signal no more bytes will be transmitted. */
  void finishRequest() throws IOException;

  /**
   * Parses bytes of a response header from an HTTP transport.
   *
   * @param expectContinue true to return null if this is an intermediate response with a "100"
   *     response code. Otherwise this method never returns null.
   */
  Response.Builder readResponseHeaders(boolean expectContinue) throws IOException;

  /** Returns a stream that reads the response body. */
  ResponseBody openResponseBody(Response response) throws IOException;

  /**
   * Cancel this stream. Resources held by this stream will be cleaned up, though not synchronously.
   * That may happen later by the connection pool thread.
   */
  void cancel();
}

主要就是针对Request和Response的处理。将我们传入的请求Request编码成Http的协议请求,将响应解码成Response。

针对HTTP/1.1的实现Http1Codec

前面讲了HttpCodec的抽象方法。这里就是实现,Http协议也有多个版本,也就对应不同的实现。这里我们就看现在常用的Http/1.1。

而Http1Codec的创建在ConnectInterceptor中。

HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);

我们继续跟踪到StreamAllocation的newStream方法

HttpCodec resultCodec = resultConnection.newCodec(client, this);

继续进入:

  public HttpCodec newCodec(
      OkHttpClient client, StreamAllocation streamAllocation) throws SocketException {
    if (http2Connection != null) {
      return new Http2Codec(client, streamAllocation, http2Connection);
    } else {
      socket.setSoTimeout(client.readTimeoutMillis());
      source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
      sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
      return new Http1Codec(client, streamAllocation, source, sink);
    }
  }

两个参数我们已经很熟悉了。重点看到了new Http1Codec(client, streamAllocation, source, sink),在这里创建了Http1Codec。而传递的参数source、sink前面我们已经介绍了。在连接Socket后进行输入输出的封装。

Http1Codec核心方法实现

我们在介绍CallServerInterceptor的intercept方法时候,只是粗略的讲了下流程。这里我们将一下和Http1Codec相关的方法。

  • 发送请求头 httpCodec.writeRequestHeaders(request); 看到实现 /** * Prepares the HTTP headers and sends them to the server. * * <p>For streaming requests with a body, headers must be prepared <strong>before</strong> the * output stream has been written to. Otherwise the body would need to be buffered! * * <p>For non-streaming requests with a body, headers must be prepared <strong>after</strong> the * output stream has been written to and closed. This ensures that the {@code Content-Length} * header field receives the proper value. */ @Override public void writeRequestHeaders(Request request) throws IOException { String requestLine = RequestLine.get( request, streamAllocation.connection().route().proxy().type()); writeRequest(request.headers(), requestLine); } requestLine就是HTTP的起始行,内部大家可以自己查看。然后看到writeRequest

方法:

  /** Returns bytes of a request header for sending on an HTTP transport. */
  public void writeRequest(Headers headers, String requestLine) throws IOException {
    if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
    sink.writeUtf8(requestLine).writeUtf8("\r\n");
    for (int i = 0, size = headers.size(); i < size; i++) {
      sink.writeUtf8(headers.name(i))
          .writeUtf8(": ")
          .writeUtf8(headers.value(i))
          .writeUtf8("\r\n");
    }
    sink.writeUtf8("\r\n");
    state = STATE_OPEN_REQUEST_BODY;
  }

逻辑就很简单了,遍历Headers,将请求头写入到sink中。

  • 发送请求体
      if (responseBuilder == null) {
        // Write the request body if the "Expect: 100-continue" expectation was met.
        Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
      } else if (!connection.isMultiplexed()) {
        // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection from
        // being reused. Otherwise we're still obligated to transmit the request body to leave the
        // connection in a consistent state.
        streamAllocation.noNewStreams();
      }

这里会先判断,如果后请求体的话就发送。看到createRequestBody方法。

  @Override public Sink createRequestBody(Request request, long contentLength) {
    if ("chunked".equalsIgnoreCase(request.header("Transfer-Encoding"))) {
      // Stream a request body of unknown length.
      return newChunkedSink();
    }

    if (contentLength != -1) {
      // Stream a request body of a known length.
      return newFixedLengthSink(contentLength);
    }

    throw new IllegalStateException(
        "Cannot stream a request body without chunked encoding or a known content length!");
  }

确定发送的请求数据大小是否确定,然后返回对应的Sink实现。接下来看到request.body().writeTo(bufferedRequestBody);

的实现。我们看到request.body()返回是RequestBody,一个抽象类,定义请求体的方法。看到自带的实现有FormBody、MultipartBody。我们挑一个看FormBody。看到writeTo方法:

  @Override public void writeTo(BufferedSink sink) throws IOException {
    writeOrCountBytes(sink, false);
  }

继续看:

  /**
   * Either writes this request to {@code sink} or measures its content length. We have one method
   * do double-duty to make sure the counting and content are consistent, particularly when it comes
   * to awkward operations like measuring the encoded length of header strings, or the
   * length-in-digits of an encoded integer.
   */
  private long writeOrCountBytes(@Nullable BufferedSink sink, boolean countBytes) {
    long byteCount = 0L;

    Buffer buffer;
    if (countBytes) {
      buffer = new Buffer();
    } else {
      buffer = sink.buffer();
    }

    for (int i = 0, size = encodedNames.size(); i < size; i++) {
      if (i > 0) buffer.writeByte('&');
      buffer.writeUtf8(encodedNames.get(i));
      buffer.writeByte('=');
      buffer.writeUtf8(encodedValues.get(i));
    }

    if (countBytes) {
      byteCount = buffer.size();
      buffer.clear();
    }

    return byteCount;
  }

这里就清晰了,将请求体(Form表单)遍历的写出。

  • 读取响应头
    if (responseBuilder == null) {
      responseBuilder = httpCodec.readResponseHeaders(false);
    }

进入到实现:

  @Override public Response.Builder readResponseHeaders(boolean expectContinue) throws IOException {
    if (state != STATE_OPEN_REQUEST_BODY && state != STATE_READ_RESPONSE_HEADERS) {
      throw new IllegalStateException("state: " + state);
    }

    try {
      StatusLine statusLine = StatusLine.parse(source.readUtf8LineStrict());

      Response.Builder responseBuilder = new Response.Builder()
          .protocol(statusLine.protocol)
          .code(statusLine.code)
          .message(statusLine.message)
          .headers(readHeaders());

      if (expectContinue && statusLine.code == HTTP_CONTINUE) {
        return null;
      }

      state = STATE_OPEN_RESPONSE_BODY;
      return responseBuilder;
    } catch (EOFException e) {
      // Provide more context if the server ends the stream before sending a response.
      IOException exception = new IOException("unexpected end of stream on " + streamAllocation);
      exception.initCause(e);
      throw exception;
    }
  }

首先读取起始行statusLine。我们进入到实现source.readUtf8LineStrict()看怎么读取的

long newline = indexOf((byte) '\n', 0, scanLength);   
if (scanLength < Long.MAX_VALUE
        && request(scanLength) && buffer.getByte(scanLength - 1) == '\r'
        && request(scanLength + 1) && buffer.getByte(scanLength) == '\n') {
      return buffer.readUtf8Line(scanLength); // The line was 'limit' UTF-8 bytes followed by \r\n.
    }

这里先时候去到\n的位置。然后看到,起始行的结束是否是\r\n。最后读取并返回。StatusLine.parse就是解析得到Http的Method、Code、Message。下面读取响应头:

      Response.Builder responseBuilder = new Response.Builder()
          .protocol(statusLine.protocol)
          .code(statusLine.code)
          .message(statusLine.message)
          .headers(readHeaders());

先将起始行封装到responseBuilder中,然后readHeaders()读取响应头。

  /** Reads headers or trailers. */
  public Headers readHeaders() throws IOException {
    Headers.Builder headers = new Headers.Builder();
    // parse the result headers until the first blank line
    for (String line; (line = source.readUtf8LineStrict()).length() != 0; ) {
      Internal.instance.addLenient(headers, line);
    }
    return headers.build();
  }

这里就是一行行读取响应头,然后添加到Headers中。细节大家跟踪到方法内部查看即可。

  • 封装响应体
    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }

先判断响应头,101啥意思呢:服务器将遵从客户的请求转换到另外一种协议(HTTP 1.1新)。所以就不用管响应体了。我们重点看到httpCodec.openResponseBody(response)

  @Override public ResponseBody openResponseBody(Response response) throws IOException {
    Source source = getTransferStream(response);
    return new RealResponseBody(response.headers(), Okio.buffer(source));
  }

两个步骤:

  • 第一步getTransferStream(response)
  private Source getTransferStream(Response response) throws IOException {
    if (!HttpHeaders.hasBody(response)) {
      return newFixedLengthSource(0);
    }

    if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) {
      return newChunkedSource(response.request().url());
    }

    long contentLength = HttpHeaders.contentLength(response);
    if (contentLength != -1) {
      return newFixedLengthSource(contentLength);
    }

    // Wrap the input stream from the connection (rather than just returning
    // "socketIn" directly here), so that we can control its use after the
    // reference escapes.
    return newUnknownLengthSource();
  }

主要判断响应头Transfer-Encoding来确定响应体的数据大小是否确定,如果是chunked则是分块传输,则没有Content-Length。否则可以确定响应体大小。然后返回不同的Source实现。

  • 第二步 new RealResponseBody(response.headers(), Okio.buffer(source)
public final class RealResponseBody extends ResponseBody {
  private final Headers headers;
  private final BufferedSource source;

  public RealResponseBody(Headers headers, BufferedSource source) {
    this.headers = headers;
    this.source = source;
  }

  @Override public MediaType contentType() {
    String contentType = headers.get("Content-Type");
    return contentType != null ? MediaType.parse(contentType) : null;
  }

  @Override public long contentLength() {
    return HttpHeaders.contentLength(headers);
  }

  @Override public BufferedSource source() {
    return source;
  }
}

这个就是做了一个封装,没什么别的逻辑。


到此为止整个Http的发送和响应就介绍完毕了。

同步/异步请求

先来一张流程图:

Call.png

文章的开始我们发送http请求直接使用的同步请求

Response response = client.newCall(request).execute();

这样比较粗暴,我们还需要开启线程。如此OkHttp当然也就提供了异步调用方法。

client.newCall(request).enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {
        
    }

    @Override
    public void onResponse(Call call, Response response) throws IOException {

    }
});

调用也是非常的方便的。整个请求OkHttp会帮我们开启线程,并完成Http请求。接下来我们就分析这块的流程。

newCall方法就不介绍了,我们看到enqueue方法。

  @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }
  • 先判断是否已经executed。防止多次调用
  • 将任务加入到队里中
client.dispatcher().enqueue(new AsyncCall(responseCallback));

我们进入到enqueue方法。

  synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }
  • 先判断当前正在请求的数量是否大于最大请求数,同一个主机的请求是否超过限制
  • 如果超过限制,则将AsyncCall任务放到readyAsyncCalls(准备任务)队列中。
  • 如果没有超过限制,加入到runningAsyncCalls(运行)队列中,并直接调度执行。

这里我们先看几个变量runningAsyncCalls、readyAsyncCalls:

  /** Ready async calls in the order they'll be run. */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

这个写的很清楚,异步就绪队列、异步运行队列,同步运行队列。runningSyncCalls这个在前面同步调用的时候有涉及。剩下两个变量就在这里体现。整体的思路就是,异步调用先判断请求数量是否超限,如果没有直接交给线程池执行;超限就先放到准备队列中。

我们在看到executorService().execute(call);进入到executorService()方法:

  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

这里很明显了一个单例的实现,方法上加上了synchronized关键字。ThreadPoolExecutor是创建一个线程池。

这里有朋友要问了,当请求数量超限制我们只看到了把任务放到准备队列中,那啥时候被调用呢?这里大家先别着急,后面会讲到。

到此我们已经把client.dispatcher().enqueue(new AsyncCall(responseCallback));

enqueue这个方法干了什么事讲清楚了。这里还设计一个类AsyncCall。我们看看:

  final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }

    String host() {
      return originalRequest.url().host();
    }

    Request request() {
      return originalRequest;
    }

    RealCall get() {
      return RealCall.this;
    }

    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }

光看这里看不出啥门道,我们得看看父类NamedRunnable

public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

嘿!就是个Runnable,run里就做了两个事

  • 设置线程名称
  • 进行execute()调用

然后我们回到AsyncCall看到execute的实现:

Response response = getResponseWithInterceptorChain();

有没有觉得很眼熟,这个不就是我们前面讲同步调用的时候,通过这个方法完成的请求。

请求成功进行CallBack回调

responseCallback.onResponse(RealCall.this, response)

失败或者发生异常也回调

responseCallback.onFailure(RealCall.this, e);

上面我们就把异步调用的发起和回调讲清楚了,前面我们还有个问题就是准备队列的任务啥时候被执行。

准备就绪队列任务的调度

我们还是看到AsyncCall的execute方法,真正的执行调用时在这个方法中,我们看到最后的finally块

finally {
        client.dispatcher().finished(this);
      }

跟踪到Dispatcher的finish方法

  /** Used by {@code AsyncCall#run} to signal completion. */
  void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
  }

看到重载的方法:

  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

这里先把call从队列中移除。然后判断promoteCalls,这里我们知道是true,所以重点看到if (promoteCalls) promoteCalls();这段代码:

  private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }
  • 还是先判断正在请求的数量是否超限制
  • 遍历readyAsyncCalls,找到符合条件的请求(同一个主机的请求数量是否超限制)。如果找到就从readyAsyncCalls中移除,然后加入到runningAsyncCalls。然后通过线程池获进行调度执行。

前面我们将到同步调用的时候,RealCall的execute()方法的开始有client.dispatcher().executed(this);

方法的结束finally调用了client.dispatcher().finished(this);。然后调用了Dispatcher的finished(runningSyncCalls, call, false);方法。这里和异步调用的区别就是最后一个参数为false。

到这里整个同步调用和异步调用我们就串联起来了。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏智能大石头

关于自定义控件设计时如何把属性写入aspx中的研究(上)

如何通过继承GridView来修改在设计时绑定数据源时自动生成的ASP.Net代码? 具体情况是这样的,ObjectDataSource绑定到实体类,Grid...

2408
来自专栏草根专栏

从头编写 asp.net core 2.0 web api 基础框架 (4) EF配置

Github源码地址:https://github.com/solenovex/Building-asp.net-core-2-web-api-starter-...

7727
来自专栏Java帮帮-微信公众号-技术文章全总结

数据库连接池、dbutil_知识点全掌握

数据库连接池、dbutil ? 数据库连接池 1 数据库连接池的概念 用池来管理Connection,这可以重复使用Connection。有了池,所以我们就不用...

3005
来自专栏java 成神之路

RocketMQ 底层通信机制 源码分析

RocketMQ 底层通讯是使用Netty来实现的。 下面我们通过源码分析下RocketMQ是怎么利用Netty进行通讯的。

1682
来自专栏Android先生

Android开发者怎么能不会写后台接口呢?

然后在src下创建三个包,一个放Servlet,一个放mysql的工具类,一个放对象;

823
来自专栏分布式系统进阶

Influxdb的Meta data分析

Influxdb定义了一个Service:Precreator Serivec(services/precreator/service.go),实现比较简单,周...

1302
来自专栏张善友的专栏

Contact Manager Web API 示例[1]CRUD 操作

联系人管理器web API是一个Asp.net web api示例程序,演示了通过ASP.NET Web API 公开联系信息,并允许您添加和删除联系人,示例地...

2269
来自专栏Linux驱动

46.Linux-分析rc红外遥控平台驱动框架,修改内核的NEC解码函数BUG(1)

1.2然后在drivers\media\rc\keymaps里存了各种不同的键映射文件

2502
来自专栏大内老A

ASP.NET MVC涉及到的5个同步与异步,你是否傻傻分不清楚?[上篇]

Action方法的执行具有两种基本的形式,即同步执行和异步执行,而在ASP.NETMVC的整个体系中涉及到很多同步/异步的执行方式,虽然在前面相应的文章中已经对...

2316
来自专栏coolblog.xyz技术专栏

Spring AOP 源码分析 - 拦截器链的执行过程

2643

扫码关注云+社区

领取腾讯云代金券