fun load() {
//1.创建请求(包含url,method,headers,body)
val request = Request
.Builder()
.url("")
.build()
//2.创建OkHttpClient (包含分发器、拦截器、DNS等)
val okHttpClient = OkHttpClient.Builder().build()
//3.创建Call(用于调用请求)
val newCall = okHttpClient.newCall(request)
//4.通过异步请求数据
newCall.enqueue(object :Callback{
override fun onFailure(call: Call, e: IOException) {}
override fun onResponse(call: Call, response: Response) {}
})
//4.通过同步请求数据
val response = newCall.execute()
}
我们会按照顺序来分析一下请求的流程
前面1,2,3步很多文章已经分析过很多遍了 也比较简单 同学们可以自己看一下 我们就不再赘述 我们直接看第四步进入今天的主要流程
Okhttp请求分为同步方式和异步方式 不过最终都是殊途同归 我们以异步的方式分析一下请求流程
话不多说 先看一眼代码
RealCall.enqueue()->
Dispatcher.enqueue()->
Dispatcher.promoteAndExecute()->
RealCall.executeOn()
override fun enqueue(responseCallback: Callback) {
synchronized(this) {
//检查是否已经开始运行
check(!executed) { "Already Executed" }
executed = true
}
callStart()
//封装AsyncCall对象 并放入队列中等待执行
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
//放入等待队列
readyAsyncCalls.add(call)
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.call.forWebSocket) {
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
//推进执行
promoteAndExecute()
}
//将readyAsyncCalls中合格的请求过渡升级到runningAsyncCalls中
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.//运行最大64
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.每个主机最大5
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}
上面的流程主要是将我们的异步任务放入队列中 并且将可以运行的任务开启运行
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
//将当前Runnable放到线程池中运行
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
//终于到这篇文章的重头戏了
val response = getResponseWithInterceptorChain()
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
client.dispatcher.finished(this)
}
}
}
}
我们知道OkHttp采用了拦截链模式 看一下何为拦截器模式
借用一下大佬的图(懒得画图了😁)
3631399-0626631d246373a4.png
我们可以看到 请求链会以链的形式调用下去 直到到链尾或者return response
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
//加入我们自己的拦截器
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
//如果不是websocket的话 加入网络拦截器
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
//开始拦截链调用
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}
@Throws(IOException::class)
override fun proceed(request: Request): Response {
check(index < interceptors.size)
calls++
if (exchange != null) {
check(exchange.finder.sameHostAndPort(request.url)) {
"network interceptor ${interceptors[index - 1]} must retain the same host and port"
}
check(calls == 1) {
"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
}
}
// Call the next interceptor in the chain.
//循环拿取下一个拦截器
val next = copy(index = index + 1, request = request)
val interceptor = interceptors[index]
@Suppress("USELESS_ELVIS")
//这里会调用拦截器的方法 接下来我们会分析各个拦截器的作用
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
if (exchange != null) {
check(index + 1 >= interceptors.size || next.calls == 1) {
"network interceptor $interceptor must call proceed() exactly once"
}
}
check(response.body != null) { "interceptor $interceptor returned a response with no body" }
return response
}
我们看到拦截器的链路模式 其实就是遍历调用各拦截器 对Request进行作用 直到return response
我们来逐个分析一下各个拦截器的作用 本文只分析处理Request,response的处理会在下文讲解
作用:处理错误 重定向 所以在请求的过程中 没有做太多事情 下文会分析如何处理错误 重定向
这个拦截器在请求的过程中几乎没做什么事情 只做了一件事 就是创建ExchangeFinder
这个对象在后面的ConnectInterceptor用来生成exchange对象
fun enterNetworkInterceptorExchange(request: Request, newExchangeFinder: Boolean) {
check(interceptorScopedExchange == null)
check(exchange == null) {
"cannot make a new request because the previous response is still open: " +
"please call response.close()"
}
if (newExchangeFinder) {
this.exchangeFinder = ExchangeFinder(
connectionPool,
createAddress(request.url),
this,
eventListener
)
}
}
作用:添加必要的请求头信息、gzip处理等
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val userRequest = chain.request()
val requestBuilder = userRequest.newBuilder()
val body = userRequest.body
if (body != null) {
val contentType = body.contentType()
if (contentType != null) {
//添加contentType
requestBuilder.header("Content-Type", contentType.toString())
}
val contentLength = body.contentLength()
if (contentLength != -1L) {
requestBuilder.header("Content-Length", contentLength.toString())
requestBuilder.removeHeader("Transfer-Encoding")
} else {
requestBuilder.header("Transfer-Encoding", "chunked")
requestBuilder.removeHeader("Content-Length")
}
}
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", userRequest.url.toHostHeader())
}
//默认keep-Alive
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive")
}
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
//传输流的压缩方式 默认gzip方式
var transparentGzip = false
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true
requestBuilder.header("Accept-Encoding", "gzip")
}
val cookies = cookieJar.loadForRequest(userRequest.url)
//添加cookie
if (cookies.isNotEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies))
}
//默认UA
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", userAgent)
}
val networkResponse = chain.proceed(requestBuilder.build())
......handle response
}
okhttp默认的缓存机制会加快响应流程 我们看一下缓存策略 首先 我们要解释几个变量
//缓存策略类
class CacheStrategy{
//如果我们需要请求网络 则networkRequest不为null 否则为null
val networkRequest: Request?
//请求的返回或者请求的响应 如果无法使用缓存(一般是过期或者无缓存 则为null)
val cacheResponse: Response?
}
上面两个变量是缓存策略中比较重要的两个变量 我们会根据这两个变量来选择是否命中缓存
先看一下结论
networkRequest\cacheResponse | cacheResponse is null | cacheResponse is not
null
---|---|---
networkRequest is null | ① 返回HTTP_GATEWAY_TIMEOUT 504错误 | ② 直接使用缓存
networkRequest is not null | ③ 进行网络请求 并且缓存新response | ④ 先请求 根据code(304)
判断是否需要重新request
再看一下intercept()
方法 可以对照上面两个变量的解释和表格来观看
override fun intercept(chain: Interceptor.Chain): Response {
//获取缓存 如果我们配置了缓存 那么会去查找是否存在cache
//这里需要注意的一点是 okhttp默认并不会配置缓存 只是规范了一套缓存策略 我们可以自己通过OkHttpClient.Builder 的 cache 方法设置
val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
//这里的策略 会自动判断是否使用缓存 是否存在缓存
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
//这里就是上面我们解释过的两个变量
val networkRequest = strategy.networkRequest
val cacheResponse = strategy.cacheResponse
cache?.trackResponse(strategy)
//LruCache没有hit cache 并且网络缓存不可用
if (cacheCandidate != null && cacheResponse == null) {
// The cache candidate wasn't applicable. Close it.
//关闭cacheCandidate.body
cacheCandidate.body?.closeQuietly()
}
//按照我们上面对缓存的解释 不允许使用网络请求 并且当前没有缓存 对应表格中①
// If we're forbidden from using the network and the cache is insufficient, fail.
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
}
//不允许使用网络 仅直接使用缓存 对应表格②
// If we don't need the network, we're done.
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build()
}
var networkResponse: Response? = null
try {
//使用网络
//对应表格③④
networkResponse = chain.proceed(networkRequest)
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}
......缓存新 response
}
我们看完上面的代码 发现所有的缓存策略都是根据networkRequest
和cacheResponse
两个变量进行控制的
接下来我们看一下缓存策略的生成过程
fun compute(): CacheStrategy {
class Factory(
private val nowMillis: Long,
internal val request: Request,
private val cacheResponse: Response? //这边的cacheResponse和我们上面讲的还不太一样 这边完全是Cache类中缓存的对象 如果我们之前请求过 并且缓存 则不为null
)
//根据request的cache-control 和response 的cache-control判断
val candidate = computeCandidate()
//使用网络请求 但是请求的request的cache-control是onlyIfCached 表示仅使用缓存
//这就是个悖论了 所以直接返回504 对应表格①
// We're forbidden from using the network and the cache is insufficient.
if (candidate.networkRequest != null && request.cacheControl.onlyIfCached) {
return CacheStrategy(null, null)
}
return candidate
}
private fun computeCandidate(): CacheStrategy {
// No cached response.
//这里的cacheResponse是缓存中命中的 所以如果为null 表示之前没有缓存
if (cacheResponse == null) {
return CacheStrategy(request, null)
}
// Drop the cached response if it's missing a required handshake.
//如果缺少tls握手 直接请求网络
if (request.isHttps && cacheResponse.handshake == null) {
return CacheStrategy(request, null)
}
// If this response shouldn't have been stored, it should never be used as a response source.
// This check should be redundant as long as the persistence store is well-behaved and the
// rules are constant.
//根据cacheResponse的code 判断是否允许cache
//判断expires是否过期 并且request和reponse的cache-control都是noStore
//这里就不往下追踪了 感兴趣的同学可以自己阅读一下
if (!isCacheable(cacheResponse, request)) {
return CacheStrategy(request, null)
}
//如果是nocache或者根据If-Modified-Since来判断
//If-Modified-Since会重新请求服务器 然后获取last-modified-since 判断是否修改过
val requestCaching = request.cacheControl
if (requestCaching.noCache || hasConditions(request)) {
return CacheStrategy(request, null)
}
val responseCaching = cacheResponse.cacheControl
val ageMillis = cacheResponseAge()
var freshMillis = computeFreshnessLifetime()
if (requestCaching.maxAgeSeconds != -1) {
freshMillis = minOf(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds.toLong()))
}
var minFreshMillis: Long = 0
if (requestCaching.minFreshSeconds != -1) {
minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds.toLong())
}
var maxStaleMillis: Long = 0
if (!responseCaching.mustRevalidate && requestCaching.maxStaleSeconds != -1) {
maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds.toLong())
}
if (!responseCaching.noCache && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
val builder = cacheResponse.newBuilder()
if (ageMillis + minFreshMillis >= freshMillis) {
builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"")
}
val oneDayMillis = 24 * 60 * 60 * 1000L
if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"")
}
return CacheStrategy(null, builder.build())
}
// Find a condition to add to the request. If the condition is satisfied, the response body
// will not be transmitted.
val conditionName: String
val conditionValue: String?
when {
etag != null -> {
conditionName = "If-None-Match"
conditionValue = etag
}
lastModified != null -> {
conditionName = "If-Modified-Since"
conditionValue = lastModifiedString
}
servedDate != null -> {
conditionName = "If-Modified-Since"
conditionValue = servedDateString
}
//这里会重新request 然后判断modified-time
else -> return CacheStrategy(request, null) // No condition! Make a regular request.
}
val conditionalRequestHeaders = request.headers.newBuilder()
conditionalRequestHeaders.addLenient(conditionName, conditionValue!!)
val conditionalRequest = request.newBuilder()
.headers(conditionalRequestHeaders.build())
.build()
//直接使用缓存
return CacheStrategy(conditionalRequest, cacheResponse)
}
我们可以根据上面的判断,确定缓存策略
大致是根据cache-control或者handshake是否过期来判断是否需要重新request
例如:
noStore,noCache,If-Modified-Since等等 所有的 cache-
control可以参考这篇
如果在CacheInterceptor中hit cache的话 就不会再往下面的拦截器传递 而是直接原路返回 return response
作用:负责与服务器连接 这个拦截器的过程分析其实相当复杂undefined 简单来说流程是从连接池中查找连接 如果不存在 就创建连接 并完成TCP,TLS握手undefined 然后等待下一个CallServerInterceptor进行数据的交互
我们分析一下源码 拦截器里的代码真的很少 不过不要被表象欺骗了😌 我第一次看OkHttp源码时 看到这里直接就跳过了
然后分析了CallServerInterceptor源码之后 发现没有获取连接过程
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
//获取exchange对象 exchange是我们用来和服务端交互的对象封装 看一下initExchange方法
val exchange = realChain.call.initExchange(chain)
val connectedChain = realChain.copy(exchange = exchange)
return connectedChain.proceed(realChain.request)
}
initExchange()
中主要会调用ExchangeFinder#find()
然后根据下面的调用链
ExchangeFinder#find()
->
ExchangeFinder#findHealthyConnection
->
ExchangeFinder#findConnection
然后我们看一下findConnection()
这个方法内部就实现了connection的查找或创建
前方高能 下面代码会又臭又长😉
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
var foundPooledConnection = false
var result: RealConnection? = null
var selectedRoute: Route? = null
var releasedConnection: RealConnection?
val toClose: Socket?
synchronized(connectionPool) {
if (call.isCanceled()) throw IOException("Canceled")
val callConnection = call.connection // changes within this overall method
releasedConnection = callConnection
//如果url不一致或者callConnection为null 就断开链接
toClose = if (callConnection != null && (callConnection.noNewExchanges ||
!sameHostAndPort(callConnection.route().address.url))) {
call.releaseConnectionNoEvents()
} else {
null
}
if (call.connection != null) {
// We had an already-allocated connection and it's good.
//经过判断上面验证 如果不为null 发现当前connection可用 那么就会直接复用connection
result = call.connection
releasedConnection = null
}
if (result == null) {
// The connection hasn't had any problems for this call.
refusedStreamCount = 0
connectionShutdownCount = 0
otherFailureCount = 0
// Attempt to get a connection from the pool.
//第一次试从连接池获取
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
foundPooledConnection = true
result = call.connection
} else if (nextRouteToTry != null) {
selectedRoute = nextRouteToTry
nextRouteToTry = null
}
}
}
toClose?.closeQuietly()
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection!!)
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result!!)
}
if (result != null) {
// If we found an already-allocated or pooled connection, we're done.
return result!!
}
// If we need a route selection, make one. This is a blocking operation.
// 查看是否有新的路由信息
var newRouteSelection = false
if (selectedRoute == null && (routeSelection == null || !routeSelection!!.hasNext())) {
var localRouteSelector = routeSelector
if (localRouteSelector == null) {
localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
this.routeSelector = localRouteSelector
}
newRouteSelection = true
routeSelection = localRouteSelector.next()
}
var routes: List<Route>? = null
synchronized(connectionPool) {
if (call.isCanceled()) throw IOException("Canceled")
if (newRouteSelection) {
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. This could match due to connection coalescing.
routes = routeSelection!!.routes
// 如果有新的路由 继续从连接池中查找试试
if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
foundPooledConnection = true
result = call.connection
}
}
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection!!.next()
}
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
result = RealConnection(connectionPool, selectedRoute!!)
connectingConnection = result
}
}
// If we found a pooled connection on the 2nd time around, we're done.
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result!!)
return result!!
}
// Do TCP + TLS handshakes. This is a blocking operation.
//现在发现可能连接池中没有我们要的connection
//进行TCP和TLS连接
result!!.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
call.client.routeDatabase.connected(result!!.route())
var socket: Socket? = null
synchronized(connectionPool) {
connectingConnection = null
// Last attempt at connection coalescing, which only occurs if we attempted multiple
// concurrent connections to the same host.
//最后一次尝试从连接池中获取 如果能获取到 就使用连接池中 否则使用连接的connection 并且放入连接池中
if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
// We lost the race! Close the connection we created and return the pooled connection.
result!!.noNewExchanges = true
socket = result!!.socket()
result = call.connection
// It's possible for us to obtain a coalesced connection that is immediately unhealthy. In
// that case we will retry the route we just successfully connected with.
nextRouteToTry = selectedRoute
} else {
//放入连接池中
connectionPool.put(result!!)
call.acquireConnectionNoEvents(result!!)
}
}
socket?.closeQuietly()
eventListener.connectionAcquired(call, result!!)
return result!!
}
上面的注释写的也比较多 流程其实也比较清晰了 我们接下来分析一下 如何从连接池中查找以及如何建立连接
//从连接池中获取
fun callAcquirePooledConnection(
address: Address,
call: RealCall,
routes: List<Route>?,
requireMultiplexed: Boolean
): Boolean {
this.assertThreadHoldsLock()
for (connection in connections) {
//判断connection是否支持多路复用
if (requireMultiplexed && !connection.isMultiplexed) continue
//判断connection的host是否匹配
if (!connection.isEligible(address, routes)) continue
call.acquireConnectionNoEvents(connection)
return true
}
return false
}
还有一个TCP和TLS握手流程
fun connect(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
call: Call,
eventListener: EventListener
) {
check(protocol == null) { "already connected" }
var routeException: RouteException? = null
val connectionSpecs = route.address.connectionSpecs
val connectionSpecSelector = ConnectionSpecSelector(connectionSpecs)
......
while (true) {
try {
//注释的意思是 如果是通过HTTP代理HTTPS 那么需要连接Tunnel
//这里我是在是没看懂什么意思 告辞😞 有知道的大佬可以留言告诉我一下
if (route.requiresTunnel()) {
connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener)
if (rawSocket == null) {
// We were unable to connect the tunnel but properly closed down our resources.
break
}
} else {
//先建立socket连接 包括代理的配置
connectSocket(connectTimeout, readTimeout, call, eventListener)
}
//如果是Http2协议还会创建Http2连接 或者 TLS握手
//TLS流程大家可以参考一下我之前写的一篇文章
establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener)
eventListener.connectEnd(call, route.socketAddress, route.proxy, protocol)
break
} catch (e: IOException) {
socket?.closeQuietly()
rawSocket?.closeQuietly()
socket = null
rawSocket = null
source = null
sink = null
handshake = null
protocol = null
http2Connection = null
allocationLimit = 1
eventListener.connectFailed(call, route.socketAddress, route.proxy, null, e)
if (routeException == null) {
routeException = RouteException(e)
} else {
routeException.addConnectException(e)
}
if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
throw routeException
}
}
}
if (route.requiresTunnel() && rawSocket == null) {
throw RouteException(ProtocolException(
"Too many tunnel connections attempted: $MAX_TUNNEL_ATTEMPTS"))
}
idleAtNs = System.nanoTime()
}
TLS握手流程有疑问的同学 可以看一下我之前写的一篇文章呐
现在connection就获取完成了 接下来就是与服务器的交互啦
还有创建了新connection
之后会put
到ConnectionPool
中 其中还有一个clean操作 同学们可以自己看一下代码呐
最后还有一个小点需要说明一下 因为关系到我们下一个拦截器的阅读
在我们最上面的说到 我们会通过ExchangeFinder#find
来生成ExchangeCodec
对象
首先我们解释一下ExchangeCodec
作用okhttp
会使用ExchangeCodec
封装了与服务器的IO操作
ExchangeCodec
的实现类分别对应协议是Http1ExchangeCodec
和Http2ExchangeCodec
看一下find
方法实现
fun find(
client: OkHttpClient,
chain: RealInterceptorChain
): ExchangeCodec {
try {
val resultConnection = findHealthyConnection(
connectTimeout = chain.connectTimeoutMillis,
readTimeout = chain.readTimeoutMillis,
writeTimeout = chain.writeTimeoutMillis,
pingIntervalMillis = client.pingIntervalMillis,
connectionRetryEnabled = client.retryOnConnectionFailure,
doExtensiveHealthChecks = chain.request.method != "GET"
)
//newCodec方法会对应不同的HTTP协议生成ExchangeCodeC对象
return resultConnection.newCodec(client, chain)
} catch (e: RouteException) {
trackFailure(e.lastConnectException)
throw e
} catch (e: IOException) {
trackFailure(e)
throw RouteException(e)
}
}
在阅读CallServerInterceptor之前 我们有必要看一下Http2.0相关知识 因为在CallServerInterceptor中会根据不同的Http协议 使用不同的传输方式 我们看一下Http2.0发展的几个阶段
作用:负责与服务器进行数据交互
在了解和服务器交互的流程之前 我想先介绍一下okio
这是Square公司开发的一款对java输入输出流的封装框架
JAVA输入输出流真的是非常的复杂 子类繁多 而okio主要分为两个接口Sink
和Source
分别对应输出和输入相关
接下来我们看一下实现代码 流程也比较简单 就是发送请求+获取响应
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.exchange!!
val request = realChain.request
val requestBody = request.body
val sentRequestMillis = System.currentTimeMillis()
//将header写入socket
exchange.writeRequestHeaders(request)
var invokeStartEvent = true
var responseBuilder: Response.Builder? = null
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
exchange.flushRequest()
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
exchange.responseHeadersStart()
invokeStartEvent = false
}
if (responseBuilder == null) {
//如果支持复用 传输request body
if (requestBody.isDuplex()) {
// Prepare a duplex body so that the application can send a request body later.
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
requestBody.writeTo(bufferedRequestBody)
} else {
// Write the request body if the "Expect: 100-continue" expectation was met.
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
} else {
exchange.noRequestBody()
if (!exchange.connection.isMultiplexed) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
exchange.noNewExchangesOnConnection()
}
}
} else {
exchange.noRequestBody()
}
//request完成
if (requestBody == null || !requestBody.isDuplex()) {
exchange.finishRequest()
}
//获取reponse header
if (responseBuilder == null) {
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
invokeStartEvent = false
}
}
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
if (code == 100) {
// Server sent a 100-continue even though we did not request one. Try again to read the actual
// response status.
//100表示继续获取
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
}
response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
code = response.code
}
exchange.responseHeadersEnd(response)
response = if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response.newBuilder()
.body(EMPTY_RESPONSE)
.build()
} else {
//获取response body
response.newBuilder()
.body(exchange.openResponseBody(response))
.build()
}
if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
"close".equals(response.header("Connection"), ignoreCase = true)) {
exchange.noNewExchangesOnConnection()
}
if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
throw ProtocolException(
"HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
}
return response
}
上面的代码 流程也比较简单 就是request+response
我们分析一下分析一下写入Request
流程
传输头部在HTTP1.x和HTTP2.0有点区别,HTTP1.x就直接将header通过写入Sink
Buffer,而HTTP2.0会先创建http2Connection.newStream()
对象
@Synchronized @Throws(IOException::class)
fun headers(
outFinished: Boolean,
streamId: Int,
headerBlock: List<Header>
) {
if (closed) throw IOException("closed")
//Hpack压缩算法 将压缩后数据存入hpackBuffer
hpackWriter.writeHeaders(headerBlock)
val byteCount = hpackBuffer.size
val length = minOf(maxFrameSize.toLong(), byteCount)
var flags = if (byteCount == length) FLAG_END_HEADERS else 0
if (outFinished) flags = flags or FLAG_END_STREAM
//HTTP2.0特性 帧传输
frameHeader(
streamId = streamId,
length = length.toInt(),
type = TYPE_HEADERS,
flags = flags
)
sink.write(hpackBuffer, length)
if (byteCount > length) writeContinuationFrames(streamId, byteCount - length)
}
这里有两个可以HTTP2.0特性可以关注一下
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。