OKHttp源码解析(三)--中阶之线程池和消息队列

上篇文章已经说明了OKHttp有两种调用方式,一种是阻塞的同步请求,一种是异步的非阻塞的请求。今天主要是讲解的是异步的请求。其中涉及了Dispatcher这个类,其他的类基本已经在上篇文章介绍过了。所以本片文章的大体思路如下:

  • 1.线程池的理解
  • 2.Dispatcher类详解
  • 3.OKHttp的任务调度
  • 4.OKHttp调度的理解

在讲解线程池和消息队列的时候有必要讲下线程池的基本概念

一、线程池的理解

(一)android中的异步任务

android的异步任务一般都是用Thread+Handler或者AsyncTask来实现,其中笔者当初经历过各种各样坑,特别是内存泄漏,当初笔者可是相当的欲死欲仙啊!所以现在很少有开发者还在用这一套来做异步任务,现在一般都是Rxjava为主,当然还有自己自定义的异步任务框架(比如笔者),像RxJava都帮我们写好了对应场景的线程池,这是为什么?

1、线程池的理解

我对线程池的理解是有两个层次,一种是狭隘的,一种是广义的,那么咱们各自都说下

(1)狭义上的线程池:

线程池是一种多线程处理形式,处理过程中将任务添加到队列中,后面再创建线程去处理这些任务,线程池里面的线程都是后台线程,每个线程都是默认的优先级下运。如果某个线程处于空闲中,将添加一个任务进来,让空闲线程去处理任务。如果所有线程都很繁忙,消息队列会挂起,等待某个线程池空闲后再处理任务。这样可以保证线程数量不能超多最大数量。

(2)广义上的线程池:

多线程技术主要是解决处理器单元内多个线程执行的问题,它可以显著减少处理的单元闲置时间,增加处理器单元的吞吐能力。如果对多线程应用不当,会增加对单个任务的的处理时间。

举例说明: 假如一个服务器完成一项任务的时间为T:

T1 创建线程的时间 T2 在线程中执行任务的时间,包括线程同步所需要的时间 T3 线程销毁的时间

显然 T= T1+T2+T3. 注意:这是一个理想化的情况

可以看出,T1,T3是多线程自身带来的开销(在Java中,通过映射pThread,并进一步通过SystemCall实现native线程),我们渴望减少T1和T3的时间,从而减少T的时间。但是一些线程的使用者并没有注意到这一点,所以在线程中频繁的创建或者销毁线程,这导致T1和T3在T中占有相当比例。这显然突出的线程池的弱点(T1,T3),而不是有点(并发性)。 取自IBM知识库

所以线程池的技术正是如何关注缩短或调整T1,T3时间的技术,从而提高服务器程序的性能。

  • 1、通过对线程进行缓存,减少创建和销毁时间的损失
  • 2、通过控制线程数量的阀值,减少线程过少带来的CPU闲置(比如长时间卡在I/O上了)与线程过多给JVM内存与线程切换时系统调用的压力。

在平时我们可以通过线程工厂来创建线程池来尽量避免上述的问题。

二 Dispatcher 类详解

Dispatcher负责请求的分发

1、线程池executeService

/** Executes calls. Created lazily. */
private ExecutorService executorService;

public synchronized ExecutorService executorService() {
   if (executorService == null) {
     executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
   }
   return executorService;
 }

由上面代码可以得出Dispatcher内部实现了懒加载的无边界限制的线程池。参数解析

  • 1、0:核心线程数量,保持在线程池中的线程数量(即使已经空闲),为0代表线程空闲后不会保留,等待一段时间后停止。
  • 2、Integer.MAX_VALUE:表示线程池可以容纳最大线程数量
  • 3、TimeUnit.SECOND:当线程池中的线程数量大于核心线程时,空闲的线程就会等待60s才会被终止,如果小于,则会立刻停止。
  • 4、new SynchronousQueue<Runnable>():线程等待队列。同步队列,按序排队,先来先服务
  • 5、Util.threadFactory("OkHttp Dispatcher", false):线程工厂,直接创建一个名为OkHttp Dispatcher的非守护线程。

(1)SynchronousQueue每个插入操作必须等待另一个线程的移除操作,同样任何一个移除操作都等待另一个线程的插入操作。因此队列内部其实没有任何一个元素,或者说容量为0,严格说并不是一种容器,由于队列没有容量,因此不能调用peek等操作,因此只有移除元素才有元素,显然这是一种快速传递元素的方式,也就是说在这种情况下元素总是以最快的方式从插入者(生产者)传递给移除者(消费者),这在多任务队列中最快的处理任务方式。对于高频请求场景,无疑是最合适的。

(2)在OKHttp中,创建了一个阀值是Integer.MAX_VALUE的线程池,它不保留任何最小线程,随时创建更多的线程数,而且如果线程空闲后,只能多活60秒。所以也就说如果收到20个并发请求,线程池会创建20个线程,当完成后的60秒后会自动关闭所有20个线程。他这样设计成不设上限的线程,以保证I/O任务中高阻塞低占用的过程,不会长时间卡在阻塞上。

2、发起请求

整个框架主要通过Call来封装每一次的请求。同时Call持有OkHttpClient和一份Request。而每一次的同步或者异步请求都会有Dispatcher的参与。

(1)、同步

Dispatcher在执行同步的Call:直接加入到runningSyncCall队列中,实际上并没有执行该Call,而是交给外部执行

  /** Used by {@code Call#execute} to signal it is in-flight. */
  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }
(2)、异步

将Call加入队列:如果当前正在执行的call的数量大于maxRequest(64),或者该call的Host上的call超过maxRequestsPerHos(5),则加入readyAsyncCall排队等待,否则加入runningAsyncCalls并执行

  synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

3、结束请求

从ready到running,在每个call结束的时候都会调用finished

 private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      //每次remove完后,执行promoteCalls来轮转。
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }
    //线程池为空时,执行回调
    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

  private void promoteCalls() {
     //如果当前执行的线程大于maxRequests(64),则不操作
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }

通过上面代码,大家可以知道finished先执行calls.remove(call)删除call,然后执行promoteCalls(),在promoteCalls()方法里面:如果当前线程大于maxRequest则不操作,如果小于maxRequest则遍历readyAsyncCalls,取出一个call,并把这个call放入runningAsyncCalls,然后执行execute。在遍历过程中如果runningAsyncCalls超过maxRequest则不再添加,否则一直添加。所以可以这样说:

promoteCalls()负责ready的Call到running的Call的转化

具体的执行请求则在RealCall里面实现的,同步的在RealCall的execute里面实现的,而异步的则在AsyncCall的execute里面实现的。里面都是调用RealCall的getResponseWithInterceptorChain的方法来实现责任链的调用。

三、OKHttp的任务调度

在说调度任务之前先说下

1、Dispatcher任务调度

在OKHttp中,它使用Dispatcher作为任务的调度器。

如下图所示

Dispatcher.png

在整个调度流程中涉及的成员如下: 其中

  • Dispatcher 对象是分发者,也是生产者(默认在主线程中)
  • AsyncCall 对象其实是一个任务即Runnable(内部做了包装异步接口)
// Dispatcher.java 
maxRequests = 64   // 最大并发请求数为64
maxRequestsPerHost = 5 //每个主机最大请求数为5
ExecutorService executorService  //消费者池(也就是线程池)
Deque<AsyncCall> readyAsyncCalls: // 异步的缓存,正在准备被消费的(用数组实现,可自动扩容,无大小限制)
Deque<AsyncCall> runningAsyncCalls //正在运行的 异步的任务集合,仅仅是用来引用正在运行的任务以判断并发量,注意它并不是消费者缓存
Deque<RealCall> runningSyncCalls  //正在运行的,同步的任务集合。仅仅是用来引用正在运行的同步任务以判断并发量

通过将请求任务分发给多个线程,可以显著的减少I/O等待时间

2、OKHttp调度的具体流程分析

(1)同步调度分析
第一步是:是调用了RealCall的execute()方法里面调用executed(this);
  @Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    try {
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } finally {
      client.dispatcher().finished(this);
    }
  }
第二步:在Dispatcher里面的executed执行入队操作
 /** Used by {@code Call#execute} to signal it is in-flight. */
  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }
第三步:执行getResponseWithInterceptorChain();进入拦截器链流程,然后进行请求,获取Response,并返回Response result 。
第四步:执行client.dispatcher().finished(this)操作
  void finished(RealCall call) {
    finished(runningSyncCalls, call, false);
  }

这里其实做的是出队操作。至此同步的调度就已经结束了

(2)异步调度分析
AsyncCall类简介

在讲解异步调度之前不得不提到AsyncCall这个类,AsyncCall,他其实是RealCall的内部类

 //RealCall.java
final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }

    String host() {
      return originalRequest.url().host();
    }

    Request request() {
      return originalRequest;
    }

    RealCall get() {
      return RealCall.this;
    }

    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        //执行耗时任务
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          //retryAndFollowUpInterceptor取消了 执行失败
          signalledCallback = true;
         //回调,注意这里回调是在线程池中,不是主线程
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          //一切正常走入正常流程
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
       //最后执行出队
        client.dispatcher().finished(this);
      }
    }
  }
第一步 是调用了RealCall的enqueue()方法
  @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

在enqueue里面调用了client.dispatcher().enqueue(new AsyncCall(responseCallback));方法

第二步:在Dispatcher里面的enqueue执行入队操作
  synchronized void enqueue(AsyncCall call) {
    //判断是否满足入队的条件(立即执行)
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      //正在运行的异步集合添加call
      runningAsyncCalls.add(call);
      //执行这个call
      executorService().execute(call);
    } else {
      //不满足入队(立即执行)条件,则添加到等待集合中
      readyAsyncCalls.add(call);
    }
  }

上述代码发现想要入队需要满足下面的条件

(runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost)

如果满足条件,那么就直接把AsyncCall直接加到runningCalls的队列中,并在线程池中执行(线程池会根据当前负载自动创建,销毁,缓存相应的线程)。反之就放入readyAsyncCalls进行缓存等待。 runningAsyncCalls.size() < maxRequests 表示当前正在运行的AsyncCall是否小于maxRequests = 64 runningCallsForHost(call) < maxRequestsPerHos 表示同一个地址访问的AsyncCall是否小于maxRequestsPerHost = 5; 即 当前正在并发的请求不能超过64且同一个地址的访问不能超过5个

第三步:这里分两种情况

情况1 第三步 可以直接入队
runningAsyncCalls.add(call);
第四步:线程池executorService执行execute()方法
executorService().execute(call);

由于AsyncCall继承于NamedRunnable类,而NamedRunnable类又是Runnable类的实现类,所以走到了AsyncCall的execute()方法里面

第五步:执行AsyncCall的execute()方法
     @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
第六步:执行getResponseWithInterceptorChain();进入拦截器链流程,然后进行请求,获取Response。
第七步:如果是正常的获取到Response,则执行responseCallback.onResponse()
第八步:执行client.dispatcher().finished(this)操作 进行出队操作
  void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
  }

PS:注意这里面第三个参数 同步是false,异步是true,如果是异步则需要进行是否添加继续入队的情景

情况2 第三步 不能直接入队,需要等待
readyAsyncCalls.add(call);
第四步 触发条件

能进入等待则说明当前要么有64条正在进行的并发,要么同一个地址有5个请求,所以要等待。

当有如下条件被满足或者触发的时候则执行promoteCalls操作

  • 1 Dispatcher的setMaxRequestsPerHost()方法被调用时
   public synchronized void setMaxRequestsPerHost(int maxRequestsPerHost) {
    //设置的maxRequestsPerHost不能小于1
    if (maxRequestsPerHost < 1) {
      throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost);
    }
    this.maxRequestsPerHost = maxRequestsPerHost;
    promoteCalls();
  }
  • 2 Dispatcher的setMaxRequests()被调用时
public synchronized void setMaxRequests(int maxRequests) {
     //设置的maxRequests不能小于1
    if (maxRequests < 1) {
      throw new IllegalArgumentException("max < 1: " + maxRequests);
    }
    this.maxRequests = maxRequests;
    promoteCalls();
  }
  • 3当有一条请求结束了,执行了finish()的出队操作,这时候会触发promoteCalls()进行调整
 if (promoteCalls) 
    promoteCalls();
第五步 执行Dispatcher的promoteCalls()方法
private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }
第六步 先判断是否满足 初步入队条件
 if (runningAsyncCalls.size() >= maxRequests) 
    return;
 if (readyAsyncCalls.isEmpty()) 
    return; // No ready calls to promote.

如果此时 并发的数量还是大于maxRequests=64则return并继续等待 如果此时,没有等待的任务,则直接return并继续等待

第七步 满足初步的入队条件,进行遍历,然后进行第二轮入队判断
  for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }

进行同一个host是否已经有5请求在了,如果在了,则return返回并继续等待

第八步 此时已经全部满足条件,则从等待队列面移除这个call,然后添加到正在运行的队列中
        i.remove();
        runningAsyncCalls.add(call);
第九步 线程池executorService执行execute()方法
executorService().execute(call);
第十步:执行AsyncCall的execute()方法
     @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
第十一步:执行getResponseWithInterceptorChain();进入拦截器链流程,然后进行请求,获取Response。
第十二步:如果是正常的获取到Response,则执行responseCallback.onResponse()
第十三步:执行client.dispatcher().finished(this)操作 进行出队操作
  void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
  }

至此 异步任务调度已经结束了

总结:

  • 1、异步流程总结 所以简单的描述下异步调度为:如果当前还能可以执行异步任务,则入队,并立即执行,否则加入readyAsyncCalls队列,当一个请求执行完毕后,会调用promoteCalls(),来把readyAsyncCalls队列中的Async移出来并加入到runningAsyncCalls,并开始执行。然后在当前线程中去执行Call的getResponseWithInterceptorChain()方法,直接获取当前的返回数据Response
  • 2、对比同步和异步任务,我们会发现:同步请求和异步请求原理都是一样的,都是在getResponseWithInterceptorChain()函数通过Interceptor链条来实现网络请求逻辑,而异步任务则通过ExecutorService来实现的。PS:在Dispatcher中添加一个封装了Callback的Call的匿名内部类AsyncCall来执行当前 的Call。这个AsyncCall是Call的匿名内部类。AsyncCall的execute方法仍然会回调到Call的 getResponseWithInterceptorChain方法来完成请求,同时将返回数据或者状态通过Callback来完成。

四、OKHttp调度的"优雅'之处:

1、采用Dispacher作为调度,与线程池配合实现了高并发,低阻塞的的运行 2、采用Deque作为集合,按照入队的顺序先进先出 3、最精彩的就是在try/catch/finally中调用finished函数,可以主动控制队列的移动。避免了使用锁而wait/notify操作。

OKHTTP_Dispatcher1.png

图片链接地址为:http://i1.piimg.com/1949/df333390bf92e381.png

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏大内老A

[WCF REST] 通过ASP.NET Output Caching实现声明式缓存

ASP.NET的输出缓存(Output Caching)机制允许我们针对整个Web页面或者页面的某个部分(主要针对用户控件)最终呈现的HTML进行缓存。对于后续...

2057
来自专栏坚毅的PHP

jersey处理支付宝异步回调通知的问题:java.lang.IllegalArgumentException: Error parsing media type 'application/x-www

tcpflow以流为单位分析请求内容,非常适合服务器端接口类服务查问题 这次遇到的问题跟支付宝支付后的回调post结果有关 淘宝的代码例子: publi...

6105
来自专栏Golang语言社区

package http

要管理代理、TLS配置、keep-alive、压缩和其他设置,创建一个Transport:

2304
来自专栏Java3y

Servlet第五篇【介绍会话技术、Cookie的API、详解、应用】

什么是会话技术 基本概念: 指用户开一个浏览器,访问一个网站,只要不关闭该浏览器,不管该用户点击多少个超链接,访问多少资源,直到用户关闭浏览器,整个这个过程我们...

3305
来自专栏移动端开发

iOS 从实际出发理解多线程

前言 ----       多线程很多开发者多多少少相信也都有了解,以前有些东西理解的不是很透,慢慢的积累之后,这方面的东西也需要自己好好的总结一下。多线程从我...

2187
来自专栏Hellovass 的博客

社交化分享组件踩坑

问题是这样的,项目里的社交化分享是基于 UMShare 封装成的一个 ShareLib module,为了让这个 module 对调用者说更透明,我将 WXEn...

3345
来自专栏Linux驱动

第1阶段——uboot分析之硬件初始化start.S(4)

分析uboot第一个执行函数_start(cpu/arm920t/start.S)  打开cpu/arm920t/start.S 1 .globl _start...

2728
来自专栏大内老A

WCF技术剖析之三十二:一步步创建一个完整的分布式事务应用

在完成了对于WCF事务编程(《上篇》、《中篇》、《下篇》)的介绍后,本篇文章将提供一个完整的分布式事务的WCF服务应用,通过本例,读者不仅仅会了解到如何编程实现...

2087
来自专栏Janti

基础巩固——长连接 、短连接、心跳机制与断线重连

本文将从长连接和短连接的概念切入,再到长连接与短连接的区别,以及应用场景,引出心跳机制和断线重连,给出代码实现。

4231
来自专栏lzj_learn_note

Volley源码分析学习

2)根据SDK版本来创建HttpStack的实现,如果是2.3以上的,则使用基于HttpUrlConnection实现的HurlStack,反之,则利用Http...

1176

扫码关注云+社区

领取腾讯云代金券