刨解OkHttp框架

继AsyncTask,又把手术刀指向OkHttp,有时候解析源码会上瘾。因为源码里包含的东西仿佛就是组成计算机世界的砖头,水分,只要有这些东西,就可以保罗万物,无招胜有招。又说多了,开始吧

首先okhttp的依赖是:

compile 'com.squareup.okhttp3:okhttp:3.8.1'

我就是根据这里的源码进行解析的。

再来也很简单,就是最简单的OkHttp的同步和异步网络访问:

OkHttpClient client = new OkHttpClient();

 //同步网络访问
public String Synch(String url) throws IOException {
    Request request = new Request.Builder().url(url).build();
    Response response = client.newCall(request).execute();
    return response.body().string();
}

//异步网络访问
public void Async(String url) throws IOException {
    Request request = new Request.Builder().url(url).build();
    client.newCall(request).enqueue(new Callback() {
        @Override
        public void onFailure(Call call, IOException e) {}
        @Override
        public void onResponse(Call call, Response response) throws IOException {
            Log.e("response",response.body().string());
        }
    });
}

OkHttpClient

我们首先是定义OkHttpClient,通过看源码如下:

public OkHttpClient() {
    this(new Builder());
}

 public static final class Builder {
    Dispatcher dispatcher;
    @Nullable Proxy proxy;
    List<Protocol> protocols;
    List<ConnectionSpec> connectionSpecs;
    final List<Interceptor> interceptors = new ArrayList<>();
    final List<Interceptor> networkInterceptors = new ArrayList<>();
    EventListener.Factory eventListenerFactory;
    ProxySelector proxySelector;
    CookieJar cookieJar;
    @Nullable Cache cache;
    @Nullable InternalCache internalCache;
    SocketFactory socketFactory;
    @Nullable SSLSocketFactory sslSocketFactory;
    @Nullable CertificateChainCleaner certificateChainCleaner;
    HostnameVerifier hostnameVerifier;
    CertificatePinner certificatePinner;
    Authenticator proxyAuthenticator;
    Authenticator authenticator;
    ConnectionPool connectionPool;
    Dns dns;
    boolean followSslRedirects;
    boolean followRedirects;
    boolean retryOnConnectionFailure;
    int connectTimeout;
    int readTimeout;
    int writeTimeout;
    int pingInterval;

    public Builder() {
      dispatcher = new Dispatcher();
      protocols = DEFAULT_PROTOCOLS;
      connectionSpecs = DEFAULT_CONNECTION_SPECS;
      eventListenerFactory = EventListener.factory(EventListener.NONE);
      proxySelector = ProxySelector.getDefault();
      cookieJar = CookieJar.NO_COOKIES;
      socketFactory = SocketFactory.getDefault();
      hostnameVerifier = OkHostnameVerifier.INSTANCE;
      certificatePinner = CertificatePinner.DEFAULT;
      proxyAuthenticator = Authenticator.NONE;
      authenticator = Authenticator.NONE;
      connectionPool = new ConnectionPool();
      dns = Dns.SYSTEM;
      followSslRedirects = true;
      followRedirects = true;
      retryOnConnectionFailure = true;
      connectTimeout = 10_000;
      readTimeout = 10_000;
      writeTimeout = 10_000;
      pingInterval = 0;
    }
     
 }

OkHttpClient(Builder builder) {
    this.dispatcher = builder.dispatcher;
    this.proxy = builder.proxy;
    this.protocols = builder.protocols;
    this.connectionSpecs = builder.connectionSpecs;
    this.interceptors = Util.immutableList(builder.interceptors);
    this.networkInterceptors = Util.immutableList(builder.networkInterceptors);
    this.eventListenerFactory = builder.eventListenerFactory;
    this.proxySelector = builder.proxySelector;
    this.cookieJar = builder.cookieJar;
    this.cache = builder.cache;
    this.internalCache = builder.internalCache;
    this.socketFactory = builder.socketFactory;

    boolean isTLS = false;
    for (ConnectionSpec spec : connectionSpecs) {
      isTLS = isTLS || spec.isTls();
    }

    if (builder.sslSocketFactory != null || !isTLS) {
      this.sslSocketFactory = builder.sslSocketFactory;
      this.certificateChainCleaner = builder.certificateChainCleaner;
    } else {
      X509TrustManager trustManager = systemDefaultTrustManager();
      this.sslSocketFactory = systemDefaultSslSocketFactory(trustManager);
      this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
    }

    this.hostnameVerifier = builder.hostnameVerifier;
    this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
        certificateChainCleaner);
    this.proxyAuthenticator = builder.proxyAuthenticator;
    this.authenticator = builder.authenticator;
    this.connectionPool = builder.connectionPool;
    this.dns = builder.dns;
    this.followSslRedirects = builder.followSslRedirects;
    this.followRedirects = builder.followRedirects;
    this.retryOnConnectionFailure = builder.retryOnConnectionFailure;
    this.connectTimeout = builder.connectTimeout;
    this.readTimeout = builder.readTimeout;
    this.writeTimeout = builder.writeTimeout;
    this.pingInterval = builder.pingInterval;
}

看出OkHttpClient主要是进行各种参数的初始化,通过builder对象进行初始化再赋值给OkHttpClient。第二种也可以用建造者模式,初始化OkHttpClient,如下:

OkHttpClient okHttpClient=new OkHttpClient.Builder().build();

注意事项:OkHttpClient强烈建议全局单例使用,因为每一个OkHttpClient都有自己单独的连接池和线程池,复用连接池和线程池能够减少延迟、节省内存。

Request

每一个HTTP请求包含一个URL、一个方法(GET或POST或其他)、一些HTTP头。请求还可能包含一个特定内容类型的数据类的主体部分。而

Request request = new Request.Builder().url(url).build();

和OkHttpClient一样,只是做这些东西的初始化。因为很简单,所以这就不像上面列源码细讲了。

同步网络访问

Request request = new Request.Builder().url(url).build();
Response response = client.newCall(request).execute();
response.body().string();

接着我们拿同步网络进行下去,OkHttpClient调用了newCall();代码如下:

 @Override public Call newCall(Request request) {
    return new RealCall(this, request, false /* for web socket */);
 }

然后调用了RealCall的execute()方法,方法源码如下:

@Override 
public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed= = true;
    }
    captureCallStackTrace();
    try {
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } finally {
      client.dispatcher().finished(this);
    }
}

首先executed=ture,确保每一个call对象只能使用一次原则,然后就调用了captureCallStackTrace(),源码如下:

private void captureCallStackTrace() {
    Object callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()");
    retryAndFollowUpInterceptor.setCallStackTrace(callStackTrace);
}

public Object getStackTraceForCloseable(String closer) {
    if (logger.isLoggable(Level.FINE)) {
      return new Throwable(closer); // These are expensive to allocate.
    }
    return null;
}

public final class RetryAndFollowUpInterceptor implements Interceptor {
  public void setCallStackTrace(Object callStackTrace) {
    this.callStackTrace = callStackTrace;
  }
}

可以看出captureCallStackTrace什么也没有做,只是把一个对象传进retryAndFollowUpInterceptor, 其中他这个的作用就像他名字一样,就是一个堆栈跟踪,捕获了这个请求的StackTrace。

接着 client.dispatcher().executed(this); 对应的源码如下:

public final class Dispatcher { 
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();    
  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }
}

他这也是把请求放在一个双向队列里。也没做什么操作。 然后就是重点了,可以说整个网络请求获取数据都是靠 Response result = getResponseWithInterceptorChain();这句代码,Response装载了所有的访问数据,而getResponseWithInterceptorChain()做了什么? 源码如下:

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    //添加开发者应用层自定义的Interceptor
    interceptors.addAll(client.interceptors());
    //这个Interceptor是处理请求失败的重试,重定向    
    interceptors.add(retryAndFollowUpInterceptor);
    //这个Interceptor工作是添加一些请求的头部或其他信息
    //并对返回的Response做一些友好的处理(有一些信息你可能并不需要)
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    //这个Interceptor的职责是判断缓存是否存在,读取缓存,更新缓存等等
    interceptors.add(new CacheInterceptor(client.internalCache()));
    //这个Interceptor的职责是建立客户端和服务器的连接
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      //添加开发者自定义的网络层拦截器
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));
    //一个包裹这request的chain
    Interceptor.Chain chain = new RealInterceptorChain(
        interceptors, null, null, null, 0, originalRequest);
    //把chain传递到第一个Interceptor手中
    return chain.proceed(originalRequest);
}

看上面的代码,不断的添加各种拦截器,最后就创建RealInterceptorChain然后调用proceed(),先看一下RealInterceptorChain类,

public final class RealInterceptorChain implements Interceptor.Chain {
  private final List<Interceptor> interceptors;
  private final StreamAllocation streamAllocation;
  private final HttpCodec httpCodec;
  private final RealConnection connection;
  private final int index;
  private final Request request;
  private int calls;

  public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
      HttpCodec httpCodec, RealConnection connection, int index, Request request) {
    this.interceptors = interceptors;
    this.connection = connection;
    this.streamAllocation = streamAllocation;
    this.httpCodec = httpCodec;
    this.index = index;
    this.request = request;
  }

  @Override public Connection connection() {
    return connection;
  }

  public StreamAllocation streamAllocation() {
    return streamAllocation;
  }

  public HttpCodec httpStream() {
    return httpCodec;
  }

  @Override public Request request() {
    return request;
  }

  @Override public Response proceed(Request request) throws IOException {
    return proceed(request, streamAllocation, httpCodec, connection);
  }

  public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;

    // If we already have a stream, confirm that the incoming request will use it.
    if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must retain the same host and port");
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    if (this.httpCodec != null && calls > 1) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must call proceed() exactly once");
    }

    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(
        interceptors, streamAllocation, httpCodec, connection, index + 1, request);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
      throw new IllegalStateException("network interceptor " + interceptor
          + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
      throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    return response;
  }
}

从proceed()看创建RealInterceptorChain对象时候httpCodec直接赋予了null,所以略过判断,然后调用了这个interceptor.intercept(next);方法,就是执行前面添加的拦截器的intercept()方法; 而每一个拦截器的intercept()又会调用下一个拦截器的proceed(),下一个拦截器的proceed()又调用这个拦截器的intercept(),由此类推一个一个往下调。最后一个返回结果,在一层一层向上返回.

如下面的关键代码,每个拦截器都有对应的代码一步步的调下一个拦截器。

public interface Interceptor {
  Response intercept(Chain chain) throws IOException;

  interface Chain {
    Request request();

    Response proceed(Request request) throws IOException;

    @Nullable Connection connection();
  }
}
RealInterceptorChain implements Interceptor.Chain{
    public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
        RealInterceptorChain next = new RealInterceptorChain(
            interceptors, streamAllocation, httpCodec, connection, index + 1, request);
        Interceptor interceptor = interceptors.get(index);
        Response response = interceptor.intercept(next);
      }
}
public final class RetryAndFollowUpInterceptor implements Interceptor {
    @Override 
    public Response intercept(Chain chain) throws IOException {
        Response response = ((RealInterceptorChain) chain).proceed(request,streamAllocation,null,null);
    }
}
public final class BridgeInterceptor implements Interceptor {
    @Override 
    public Response intercept(Chain chain) throws IOException {
        Request.Builder requestBuilder = userRequest.newBuilder();
        Response networkResponse = chain.proceed(requestBuilder.build());
    }
}
public final class CacheInterceptor implements Interceptor {
    @Override 
    public Response intercept(Chain chain) throws IOException {
    Response networkResponse = null;
    networkResponse = chain.proceed(networkRequest);
    }
}

用别人的一张图就是这样:

流程图.PNG

各个拦截器形成拦截器链,OkHttp的这种拦截器链采用的是责任链模式,这样的好处是将请求的发送和处理分开,并且可以动态添加中间的处理方实现对请求的处理、短路等操作。

最后的client.dispatcher().finished(this);源码如下:

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
}

private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
}

最后通过calls.remove(call),移走了对了,因为readyAsyncCalls,readyAsyncCalls都为空,所以promoteCalls()不会触发。 起始上面最核心的就是拦截器的责任链模式。是值得我们学习的。

异步网络访问

因为很多和同步是一样的,所以就讲关键代码:

client.dispatcher().enqueue(new AsyncCall(responseCallback));

dispatcher()方法的源码是

 public Dispatcher dispatcher() {
    return dispatcher;
  }

没什么讲的,接着就是说重要的enqueue(),

private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
private int maxRequestsPerHost = 5;
private int maxRequests = 64;

synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
}

private int runningCallsForHost(AsyncCall call) {
    int result = 0;
    for (AsyncCall c : runningAsyncCalls) {
      if (c.host().equals(call.host())) result++;
    }
    return result;
}

但异步请求数量小于65并且请求访问的域名小于5,就会添加到runningAsyncCalls队列中,然后executorService线程池去运行,否则就添加到readyAsyncCalls等待队列中,executorService具体是什么线程池呢,看如下源码:

private ExecutorService executorService;
  
public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
}

这是一个阀值为Integer.MAX_VALUE,不保留任何核心线程,用多少创多少,最多只能存活60秒,他这样设计成不设上限的线程,以保证I/O任务中高阻塞低占用的过程,不会长时间卡在阻塞上。 接着我们传进executorService里的AsyncCall,源码如下:

final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }

    String host() {
      return originalRequest.url().host();
    }

    Request request() {
      return originalRequest;
    }

    RealCall get() {
      return RealCall.this;
    }

    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }
  
public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

AsyncCall继承了Runnable,就像我之前解析AsyncTask一样,先线程池先执行NamedRunnable的run()方法,中途再执行AsyncCall的execute()方法,而整个网络访问还是像我们同步访问那样, Response response = getResponseWithInterceptorChain();通过责任获取访问然后再接口回调,获取服务器返回的数据。 最后又是执行client.dispatcher().finished(this);先执行calls.remove(call)删除call,当异步的缓存队列readyAsyncCalls有缓存请求时且满足条件时,就会执行promoteCalls()方法里的代码,就是在readyAsyncCalls取出一个call,并把这个call放入runningAsyncCalls,然后执行execute.

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

扫码关注云+社区

领取腾讯云代金券