前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >okhttp——任务模型

okhttp——任务模型

作者头像
Oceanlong
发布2019-05-05 11:25:58
5460
发布2019-05-05 11:25:58
举报

简介

okhttp是Android中应用最广的http网络请求框架。结构优雅,性能强大。我们通过阅读它,对网络库的架构进行学习。

基本结构

网络请求是一个典型的生产/消费模型。我们的每一个请求都会加入一个缓冲队列,然后在合适的时候进行消费。

我们以异步请求为例:

基本架构

OKHttpClient提供方法生产Call,Dispatcher负责管理、分发和执行AsyncCall

下面,我们来细致地看一下代码。

详细代码

基本调用

还是以异步任务为例,一次典型的okhttp的调用:

代码语言:javascript
复制
String url = "http://wwww.baidu.com";
OkHttpClient okHttpClient = new OkHttpClient();
final Request request = new Request.Builder()
        .url(url)
        .get()//默认就是GET请求,可以不写
        .build();
Call call = okHttpClient.newCall(request);
call.enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {
        Log.d(TAG, "onFailure: ");
    }

    @Override
    public void onResponse(Call call, Response response) throws IOException {
        Log.d(TAG, "onResponse: " + response.body().string());
    }
});

我们可以看到,这个过程大致分为两部,生成一个新的call,将call加入队列。

Request

Request是okhttp中的http请求封装类,它管理了http协议中的参数,如Header,RequestBody等

代码语言:javascript
复制
class Request internal constructor(
  internal val url: HttpUrl,
  builder: Builder
) {
  internal val method: String = builder.method
  internal val headers: Headers = builder.headers.build()
  internal val body: RequestBody? = builder.body
  internal val tags: Map<Class<*>, Any> = Util.immutableMap(builder.tags)

  ......

}

OKHttpClient

OKHttpClient是okhttp中总管类,管理了okhttp几乎所有抽象模块和各种配置。

代码语言:javascript
复制
open class OkHttpClient internal constructor(
  builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
  private val dispatcher: Dispatcher = builder.dispatcher
  private val proxy: Proxy? = builder.proxy
  private val protocols: List<Protocol> = builder.protocols
  private val connectionSpecs: List<ConnectionSpec> = builder.connectionSpecs
  private val interceptors: List<Interceptor> =
      Util.immutableList(builder.interceptors)
  private val networkInterceptors: List<Interceptor> =
      Util.immutableList(builder.networkInterceptors)
  private val eventListenerFactory: EventListener.Factory = builder.eventListenerFactory
  private val proxySelector: ProxySelector = builder.proxySelector
  private val cookieJar: CookieJar = builder.cookieJar
  private val cache: Cache? = builder.cache
  private val socketFactory: SocketFactory = builder.socketFactory
  private val sslSocketFactory: SSLSocketFactory?
  private val hostnameVerifier: HostnameVerifier = builder.hostnameVerifier
  private val certificatePinner: CertificatePinner
  private val proxyAuthenticator: Authenticator = builder.proxyAuthenticator
  private val authenticator: Authenticator = builder.authenticator
  private val connectionPool: ConnectionPool = builder.connectionPool
  private val dns: Dns = builder.dns
  private val followSslRedirects: Boolean = builder.followSslRedirects
  private val followRedirects: Boolean = builder.followRedirects
  private val retryOnConnectionFailure: Boolean = builder.retryOnConnectionFailure
  private val callTimeout: Int = builder.callTimeout
  private val connectTimeout: Int = builder.connectTimeout
  private val readTimeout: Int = builder.readTimeout
  private val writeTimeout: Int = builder.writeTimeout
  private val pingInterval: Int = builder.pingInterval
  private val internalCache: InternalCache? = builder.internalCache
  private val certificateChainCleaner: CertificateChainCleaner?
  ......

  /** Prepares the [request] to be executed at some point in the future. */
  override fun newCall(request: Request): Call {
    return RealCall.newRealCall(this, request, false /* for web socket */)
  }

}

RealCall

RealCall是okhttp中任务的封装类

代码语言:javascript
复制
  companion object {
    fun newRealCall(
      client: OkHttpClient,
      originalRequest: Request,
      forWebSocket: Boolean
    ): RealCall {
      // Safely publish the Call instance to the EventListener.
      return RealCall(client, originalRequest, forWebSocket).apply {
        transmitter = Transmitter(client, this)
      }
    }
  }

internal class RealCall private constructor(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers.  */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {
  /**
   * There is a cycle between the [Call] and [Transmitter] that makes this awkward.
   * This is set after immediately after creating the call instance.
   */
  private lateinit var transmitter: Transmitter

  // Guarded by this.
  var executed: Boolean = false

  @Synchronized override fun isExecuted(): Boolean = executed

  override fun isCanceled(): Boolean = transmitter.isCanceled

  override fun request(): Request = originalRequest

  override fun execute(): Response {
    synchronized(this) {
      check(!executed) { "Already Executed" }
      executed = true
    }
    transmitter.timeoutEnter()
    transmitter.callStart()
    try {
      client.dispatcher().executed(this)
      return getResponseWithInterceptorChain()
    } finally {
      client.dispatcher().finished(this)
    }
  }

  override fun enqueue(responseCallback: Callback) {
    synchronized(this) {
      check(!executed) { "Already Executed" }
      executed = true
    }
    transmitter.callStart()
    client.dispatcher().enqueue(AsyncCall(responseCallback))
  }

  override fun cancel() {
    transmitter.cancel()
  }
...
}

在异步任务的例子中,我们通常使用enqueue将我们的请求加入请求队列。 client.dispatcher().enqueue(AsyncCall(responseCallback)) 这句话中,我们可以看到,我们新建了AsyncCall加入到OKHttpClient的dispatcher的队列中。

我们先看AsyncCall封装了什么

代码语言:javascript
复制
  internal inner class AsyncCall(
    private val responseCallback: Callback
  ) : NamedRunnable("OkHttp %s", redactedUrl()) {
    @Volatile private var callsPerHost = AtomicInteger(0)

    fun callsPerHost(): AtomicInteger = callsPerHost

    fun reuseCallsPerHostFrom(other: AsyncCall) {
      this.callsPerHost = other.callsPerHost
    }

    fun host(): String = originalRequest.url().host()

    fun request(): Request = originalRequest

    fun get(): RealCall = this@RealCall

    /**
     * Attempt to enqueue this async call on `executorService`. This will attempt to clean up
     * if the executor has been shut down by reporting the call as failed.
     */
    fun executeOn(executorService: ExecutorService) {
      assert(!Thread.holdsLock(client.dispatcher()))
      var success = false
      try {
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        transmitter.noMoreExchanges(ioException)
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        if (!success) {
          client.dispatcher().finished(this) // This call is no longer running!
        }
      }
    }

    override fun execute() {
      var signalledCallback = false
      transmitter.timeoutEnter()
      try {
        val response = getResponseWithInterceptorChain()
        signalledCallback = true
        responseCallback.onResponse(this@RealCall, response)
      } catch (e: IOException) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e)
        } else {
          responseCallback.onFailure(this@RealCall, e)
        }
      } finally {
        client.dispatcher().finished(this)
      }
    }
  }

我们可以看到,AsyncCall主要提供了executeOnexecute两个方法。

executeOn

executeOnAsyncCall自身的方法。它的主要内容是对异常的处理。如果一切正常,就用传入的ExecutorService执行当前AsyncCall

execute

execute是实现的Runnable的run方法。

代码语言:javascript
复制
public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

我们可以看到,这个方法,只是为了给线程起名。 当AsyncCall在线程池中执行时,execute方法就会被调用。execute方法的主要逻辑是: getResponseWithInterceptorChain,它的实现,我们再不展开,属于okhttp网络能力了。

铺垫了上面这么多,我们终于可以看看okhttp的任务队列如何设计的了。

Dispatcher

代码语言:javascript
复制
  private var idleCallback: Runnable? = null

  /** Executes calls. Created lazily.  */
  private var executorService: ExecutorService? = null

  /** Ready async calls in the order they'll be run.  */
  private val readyAsyncCalls = ArrayDeque<AsyncCall>()

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet.  */
  private val runningAsyncCalls = ArrayDeque<AsyncCall>()

  /** Running synchronous calls. Includes canceled calls that haven't finished yet.  */
  private val runningSyncCalls = ArrayDeque<RealCall>()

  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.get().forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host())
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
  }

这一段主要是AsyncCall的callsPerHost复用逻辑,注释说明比较清晰。如果有相同的AsyncCall存在于runningAsyncCalls中,则会增加callsPerHost。接下来,看一下promoteAndExecute的实现:

代码语言:javascript
复制
  private fun promoteAndExecute(): Boolean {
    assert(!Thread.holdsLock(this))

    val executableCalls = ArrayList<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.

        i.remove()
        asyncCall.callsPerHost().incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService())
    }

    return isRunning
  }

promoteAndExecute方法,是将readyAsyncCalls队列中的任务,在最大任务数没有超标的情况下,移入runningAsyncCalls队列中。并对涉及转移的方法,调用executeOn方法。executeOn被调用后,就会执行到,RealCall中AsyncCall中的execute方法了。

小结

以上就是okhttp在请求和任务上的基本结构,还没有涉及到具体的网络请求。核心类是RealCallDispatcher,通过ArrayDequeAsyncCall完成任务的调度。

如有问题,欢迎指正。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019.04.18 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简介
  • 基本结构
  • 详细代码
    • 基本调用
      • Request
        • OKHttpClient
          • RealCall
            • executeOn
            • execute
          • Dispatcher
          • 小结
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档