上篇文章已经说明了OKHttp有两种调用方式,一种是阻塞的同步请求,一种是异步的非阻塞的请求。今天主要是讲解的是异步的请求。其中涉及了Dispatcher这个类,其他的类基本已经在上篇文章介绍过了。所以本片文章的大体思路如下:
在讲解线程池和消息队列的时候有必要讲下线程池的基本概念
android的异步任务一般都是用Thread+Handler或者AsyncTask来实现,其中笔者当初经历过各种各样坑,特别是内存泄漏,当初笔者可是相当的欲死欲仙啊!所以现在很少有开发者还在用这一套来做异步任务,现在一般都是Rxjava为主,当然还有自己自定义的异步任务框架(比如笔者),像RxJava都帮我们写好了对应场景的线程池,这是为什么?
我对线程池的理解是有两个层次,一种是狭隘的,一种是广义的,那么咱们各自都说下
线程池是一种多线程处理形式,处理过程中将任务添加到队列中,后面再创建线程去处理这些任务,线程池里面的线程都是后台线程,每个线程都是默认的优先级下运。如果某个线程处于空闲中,将添加一个任务进来,让空闲线程去处理任务。如果所有线程都很繁忙,消息队列会挂起,等待某个线程池空闲后再处理任务。这样可以保证线程数量不能超多最大数量。
多线程技术主要是解决处理器单元内多个线程执行的问题,它可以显著减少处理的单元闲置时间,增加处理器单元的吞吐能力。如果对多线程应用不当,会增加对单个任务的的处理时间。
举例说明: 假如一个服务器完成一项任务的时间为T:
T1 创建线程的时间 T2 在线程中执行任务的时间,包括线程同步所需要的时间 T3 线程销毁的时间
显然 T= T1+T2+T3. 注意:这是一个理想化的情况
可以看出,T1,T3是多线程自身带来的开销(在Java中,通过映射pThread,并进一步通过SystemCall实现native线程),我们渴望减少T1和T3的时间,从而减少T的时间。但是一些线程的使用者并没有注意到这一点,所以在线程中频繁的创建或者销毁线程,这导致T1和T3在T中占有相当比例。这显然突出的线程池的弱点(T1,T3),而不是有点(并发性)。 取自IBM知识库
所以线程池的技术正是如何关注缩短或调整T1,T3时间的技术,从而提高服务器程序的性能。
在平时我们可以通过线程工厂来创建线程池来尽量避免上述的问题。
Dispatcher负责请求的分发
/** Executes calls. Created lazily. */
private ExecutorService executorService;
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
由上面代码可以得出Dispatcher内部实现了懒加载的无边界限制的线程池。参数解析
(1)SynchronousQueue每个插入操作必须等待另一个线程的移除操作,同样任何一个移除操作都等待另一个线程的插入操作。因此队列内部其实没有任何一个元素,或者说容量为0,严格说并不是一种容器,由于队列没有容量,因此不能调用peek等操作,因此只有移除元素才有元素,显然这是一种快速传递元素的方式,也就是说在这种情况下元素总是以最快的方式从插入者(生产者)传递给移除者(消费者),这在多任务队列中最快的处理任务方式。对于高频请求场景,无疑是最合适的。
(2)在OKHttp中,创建了一个阀值是Integer.MAX_VALUE的线程池,它不保留任何最小线程,随时创建更多的线程数,而且如果线程空闲后,只能多活60秒。所以也就说如果收到20个并发请求,线程池会创建20个线程,当完成后的60秒后会自动关闭所有20个线程。他这样设计成不设上限的线程,以保证I/O任务中高阻塞低占用的过程,不会长时间卡在阻塞上。
整个框架主要通过Call来封装每一次的请求。同时Call持有OkHttpClient和一份Request。而每一次的同步或者异步请求都会有Dispatcher的参与。
Dispatcher在执行同步的Call:直接加入到runningSyncCall队列中,实际上并没有执行该Call,而是交给外部执行
/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
将Call加入队列:如果当前正在执行的call的数量大于maxRequest(64),或者该call的Host上的call超过maxRequestsPerHos(5),则加入readyAsyncCall排队等待,否则加入runningAsyncCalls并执行
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
从ready到running,在每个call结束的时候都会调用finished
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
//每次remove完后,执行promoteCalls来轮转。
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
//线程池为空时,执行回调
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
private void promoteCalls() {
//如果当前执行的线程大于maxRequests(64),则不操作
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
通过上面代码,大家可以知道finished先执行calls.remove(call)删除call,然后执行promoteCalls(),在promoteCalls()方法里面:如果当前线程大于maxRequest则不操作,如果小于maxRequest则遍历readyAsyncCalls,取出一个call,并把这个call放入runningAsyncCalls,然后执行execute。在遍历过程中如果runningAsyncCalls超过maxRequest则不再添加,否则一直添加。所以可以这样说:
具体的执行请求则在RealCall里面实现的,同步的在RealCall的execute里面实现的,而异步的则在AsyncCall的execute里面实现的。里面都是调用RealCall的getResponseWithInterceptorChain的方法来实现责任链的调用。
在OKHttp中,它使用Dispatcher作为任务的调度器。
如下图所示
Dispatcher.png
在整个调度流程中涉及的成员如下: 其中
// Dispatcher.java
maxRequests = 64 // 最大并发请求数为64
maxRequestsPerHost = 5 //每个主机最大请求数为5
ExecutorService executorService //消费者池(也就是线程池)
Deque<AsyncCall> readyAsyncCalls: // 异步的缓存,正在准备被消费的(用数组实现,可自动扩容,无大小限制)
Deque<AsyncCall> runningAsyncCalls //正在运行的 异步的任务集合,仅仅是用来引用正在运行的任务以判断并发量,注意它并不是消费者缓存
Deque<RealCall> runningSyncCalls //正在运行的,同步的任务集合。仅仅是用来引用正在运行的同步任务以判断并发量
通过将请求任务分发给多个线程,可以显著的减少I/O等待时间
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} finally {
client.dispatcher().finished(this);
}
}
/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}
这里其实做的是出队操作。至此同步的调度就已经结束了
在讲解异步调度之前不得不提到AsyncCall这个类,AsyncCall,他其实是RealCall的内部类
//RealCall.java
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
@Override protected void execute() {
boolean signalledCallback = false;
try {
//执行耗时任务
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
//retryAndFollowUpInterceptor取消了 执行失败
signalledCallback = true;
//回调,注意这里回调是在线程池中,不是主线程
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
//一切正常走入正常流程
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
//最后执行出队
client.dispatcher().finished(this);
}
}
}
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
在enqueue里面调用了client.dispatcher().enqueue(new AsyncCall(responseCallback));方法
synchronized void enqueue(AsyncCall call) {
//判断是否满足入队的条件(立即执行)
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
//正在运行的异步集合添加call
runningAsyncCalls.add(call);
//执行这个call
executorService().execute(call);
} else {
//不满足入队(立即执行)条件,则添加到等待集合中
readyAsyncCalls.add(call);
}
}
上述代码发现想要入队需要满足下面的条件
(runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost)
如果满足条件,那么就直接把AsyncCall直接加到runningCalls的队列中,并在线程池中执行(线程池会根据当前负载自动创建,销毁,缓存相应的线程)。反之就放入readyAsyncCalls进行缓存等待。 runningAsyncCalls.size() < maxRequests 表示当前正在运行的AsyncCall是否小于maxRequests = 64 runningCallsForHost(call) < maxRequestsPerHos 表示同一个地址访问的AsyncCall是否小于maxRequestsPerHost = 5; 即 当前正在并发的请求不能超过64且同一个地址的访问不能超过5个
第三步:这里分两种情况
runningAsyncCalls.add(call);
executorService().execute(call);
由于AsyncCall继承于NamedRunnable类,而NamedRunnable类又是Runnable类的实现类,所以走到了AsyncCall的execute()方法里面
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
PS:注意这里面第三个参数 同步是false,异步是true,如果是异步则需要进行是否添加继续入队的情景
readyAsyncCalls.add(call);
能进入等待则说明当前要么有64条正在进行的并发,要么同一个地址有5个请求,所以要等待。
当有如下条件被满足或者触发的时候则执行promoteCalls操作
public synchronized void setMaxRequestsPerHost(int maxRequestsPerHost) {
//设置的maxRequestsPerHost不能小于1
if (maxRequestsPerHost < 1) {
throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost);
}
this.maxRequestsPerHost = maxRequestsPerHost;
promoteCalls();
}
public synchronized void setMaxRequests(int maxRequests) {
//设置的maxRequests不能小于1
if (maxRequests < 1) {
throw new IllegalArgumentException("max < 1: " + maxRequests);
}
this.maxRequests = maxRequests;
promoteCalls();
}
if (promoteCalls)
promoteCalls();
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
if (runningAsyncCalls.size() >= maxRequests)
return;
if (readyAsyncCalls.isEmpty())
return; // No ready calls to promote.
如果此时 并发的数量还是大于maxRequests=64则return并继续等待 如果此时,没有等待的任务,则直接return并继续等待
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
进行同一个host是否已经有5请求在了,如果在了,则return返回并继续等待
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
至此 异步任务调度已经结束了
1、采用Dispacher作为调度,与线程池配合实现了高并发,低阻塞的的运行 2、采用Deque作为集合,按照入队的顺序先进先出 3、最精彩的就是在try/catch/finally中调用finished函数,可以主动控制队列的移动。避免了使用锁而wait/notify操作。
OKHTTP_Dispatcher1.png