Github: okhttp 分析版本: 930d4d0
Opens a connection to the target server and proceeds to the next interceptor
class ConnectInterceptor(val client: OkHttpClient) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val request = realChain.request()
val transmitter = realChain.transmitter()
// We need the network to satisfy this request. Possibly for validating a conditional GET.
val doExtensiveHealthChecks = request.method != "GET"
val exchange = transmitter.newExchange(chain, doExtensiveHealthChecks)
return realChain.proceed(request, transmitter, exchange)
}
}
doExtensiveHealthChecks
为非 GET 请求 transmitter.newExchange()
来创建 Exchange
realChain.proceed()
告知下一个拦截器开始去执行 class Transmitter(
private val client: OkHttpClient,
private val call: Call
) {
/** Returns a new exchange to carry a new request and response. */
internal fun newExchange(chain: Interceptor.Chain, doExtensiveHealthChecks: Boolean): Exchange {
synchronized(connectionPool) {
check(!noMoreExchanges) { "released" }
check(exchange == null) {
"cannot make a new request because the previous response is still open: " +
"please call response.close()"
}
}
val codec = exchangeFinder!!.find(client, chain, doExtensiveHealthChecks)
val result = Exchange(this, call, eventListener, exchangeFinder!!, codec)
synchronized(connectionPool) {
this.exchange = result
this.exchangeRequestDone = false
this.exchangeResponseDone = false
return result
}
}
}
通过 exchangeFinder!!.find()
来创建 ExchangeCodec
ExchangeFinder#find(client: OkHttpClient, chain: Interceptor.Chain, doExtensiveHealthChecks: Boolean)
class ExchangeFinder(
private val transmitter: Transmitter,
private val connectionPool: RealConnectionPool,
private val address: Address,
private val call: Call,
private val eventListener: EventListener
) {
fun find(
client: OkHttpClient,
chain: Interceptor.Chain,
doExtensiveHealthChecks: Boolean
): ExchangeCodec {
val connectTimeout = chain.connectTimeoutMillis()
val readTimeout = chain.readTimeoutMillis()
val writeTimeout = chain.writeTimeoutMillis()
val pingIntervalMillis = client.pingIntervalMillis()
val connectionRetryEnabled = client.retryOnConnectionFailure()
try {
// 寻找一个链接(在链接池中寻找或者在新创建一个连接)
val resultConnection = findHealthyConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled,
doExtensiveHealthChecks = doExtensiveHealthChecks
)
return resultConnection.newCodec(client, chain)
} catch (e: RouteException) {
trackFailure()
throw e
} catch (e: IOException) {
trackFailure()
throw RouteException(e)
}
}
}
通过 findHealthyConnection()
找到一条『健康』的链接,然后通过 RealConnection#newCodec()
来创建 ExchangeCodec
ExchangeFinder#findHealthyConnection(connectTimeout: Int, readTimeout: Int, writeTimeout: Int, pingIntervalMillis: Int, connectionRetryEnabled: Boolean, doExtensiveHealthChecks: Boolean)
class ExchangeFinder(
private val transmitter: Transmitter,
private val connectionPool: RealConnectionPool,
private val address: Address,
private val call: Call,
private val eventListener: EventListener
) {
/**
* Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
* until a healthy connection is found.
*/
@Throws(IOException::class)
private fun findHealthyConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
doExtensiveHealthChecks: Boolean
): RealConnection {
while (true) { // 循环查找一个链接
val candidate = findConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled
)
// 如果是新链接,跳过 healthy 判断直接返回
// If this is a brand new connection, we can skip the extensive health checks.
synchronized(connectionPool) {
if (candidate.successCount == 0) {
return candidate
}
}
// 这条链接是否可用
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
candidate.noNewExchanges() // 禁止这条链接,将 noNewExchanges 置为 true
continue
}
return candidate
}
}
}
findHealthyConnection()
是负责死循环去检测获取到的 RealConnection
是否可用,如果是新创建的则跳过检测,当 RealConnection
不可用的话就继续去调用 findConnection 去重新获取一个连接
ExchangeFinder#findConnection(connectTimeout: Int, readTimeout: Int, writeTimeout: Int, pingIntervalMillis: Int, connectionRetryEnabled: Boolean)
class ExchangeFinder(
private val transmitter: Transmitter,
private val connectionPool: RealConnectionPool,
private val address: Address,
private val call: Call,
private val eventListener: EventListener
) {
/**
* Returns a connection to host a new stream. This prefers the existing connection if it exists,
* then the pool, finally building a new connection.
*/
@Throws(IOException::class)
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
var foundPooledConnection = false
var result: RealConnection? = null
var selectedRoute: Route? = null
var releasedConnection: RealConnection?
val toClose: Socket?
synchronized(connectionPool) {
if (transmitter.isCanceled) throw IOException("Canceled") // 取消了
hasStreamFailure = false // This is a fresh attempt.
releasedConnection = transmitter.connection
// 若不可用了,则关闭
toClose = if (transmitter.connection != null && transmitter.connection!!.noNewExchanges) {
transmitter.releaseConnectionNoEvents()
} else {
null
}
// 从 transmitter 获取
if (transmitter.connection != null) {
// We had an already-allocated connection and it's good.
result = transmitter.connection
releasedConnection = null
}
if (result == null) {
// 从链接池中取,取到赋值给 transmitter
// Attempt to get a connection from the pool.
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
foundPooledConnection = true
result = transmitter.connection
} else if (nextRouteToTry != null) {
// 路由
selectedRoute = nextRouteToTry
nextRouteToTry = null
} else if (retryCurrentRoute()) {
selectedRoute = transmitter.connection!!.route()
}
}
}
// 释放没用的 connection
toClose?.closeQuietly()
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection!!)
}
// 如果找到复用的,则使用这条链接,回调
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result!!)
}
if (result != null) {
// 找到一条可复用的链接
// If we found an already-allocated or pooled connection, we're done.
return result!!
}
// 切换路由再在链接池里面找,如果有则返回
// If we need a route selection, make one. This is a blocking operation.
var newRouteSelection = false
if (selectedRoute == null && (routeSelection == null || !routeSelection!!.hasNext())) {
newRouteSelection = true
routeSelection = routeSelector.next()
}
var routes: List<Route>? = null
synchronized(connectionPool) {
if (transmitter.isCanceled) throw IOException("Canceled")
if (newRouteSelection) {
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. This could match due to connection coalescing.
routes = routeSelection!!.routes
if (connectionPool.transmitterAcquirePooledConnection(
address, transmitter, routes, false)) {
foundPooledConnection = true
result = transmitter.connection
}
}
if (!foundPooledConnection) {
// 没找到则创建一条
if (selectedRoute == null) {
selectedRoute = routeSelection!!.next()
}
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
result = RealConnection(connectionPool, selectedRoute!!)
connectingConnection = result
}
}
// If we found a pooled connection on the 2nd time around, we're done.
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result!!)
return result!!
}
// 建立链接
// Do TCP + TLS handshakes. This is a blocking operation.
result!!.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
// 将这条路由从错误缓存中清除
connectionPool.routeDatabase.connected(result!!.route())
var socket: Socket? = null
synchronized(connectionPool) {
connectingConnection = null
// 检测一下,若多并发情况下同 address 下导致创建多个,则将当前这个释放掉
// Last attempt at connection coalescing, which only occurs if we attempted multiple
// concurrent connections to the same host.
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
// We lost the race! Close the connection we created and return the pooled connection.
result!!.noNewExchanges = true
socket = result!!.socket()
result = transmitter.connection
} else {
// 将这个请求加入链接池
connectionPool.put(result!!)
transmitter.acquireConnectionNoEvents(result!!)
}
}
// 释放掉 socket
socket?.closeQuietly()
eventListener.connectionAcquired(call, result!!)
return result!!
}
}
RealConnectionPool#transmitterAcquirePooledConnection(address: Address, transmitter: Transmitter, routes: List
class RealConnectionPool(
/** The maximum number of idle connections for each address. */
private val maxIdleConnections: Int,
keepAliveDuration: Long,
timeUnit: TimeUnit
) {
/**
* Attempts to acquire a recycled connection to `address` for `transmitter`. Returns true if a
* connection was acquired.
*
* If `routes` is non-null these are the resolved routes (ie. IP addresses) for the connection.
* This is used to coalesce related domains to the same HTTP/2 connection, such as `square.com`
* and `square.ca`.
*/
fun transmitterAcquirePooledConnection(
address: Address,
transmitter: Transmitter,
routes: List<Route>?,
requireMultiplexed: Boolean
): Boolean {
assert(Thread.holdsLock(this))
for (connection in connections) {
if (requireMultiplexed && !connection.isMultiplexed) continue
if (!connection.isEligible(address, routes)) continue
transmitter.acquireConnectionNoEvents(connection)
return true
}
return false
}
}
遍历 pool 中的 connections(ArrayQueue),如果链接是可以复用的则将这个连接返回
class RealConnection(
val connectionPool: RealConnectionPool,
private val route: Route
) : Http2Connection.Listener(), Connection {
/**
* Returns true if this connection can carry a stream allocation to `address`. If non-null
* `route` is the resolved route for a connection.
*/
internal fun isEligible(address: Address, routes: List<Route>?): Boolean {
// 如果当前这次连接的最大并发数达到上限,返回 false
// If this connection is not accepting new exchanges, we're done.
if (transmitters.size >= allocationLimit || noNewExchanges) return false
// 如果两个 address 的其他参数不相同,返回 false
// If the non-host fields of the address don't overlap, we're done.
if (!this.route.address().equalsNonHost(address)) return false
// 如果两个 address 的 url 的 host 相同,返回 true,
// If the host exactly matches, we're done: this connection can carry the address.
if (address.url.host == this.route().address().url.host) {
return true // This connection is a perfect match.
}
// 如果上面的不符合,在下面的情况下可以合并链接
// At this point we don't have a hostname match. But we still be able to carry the request if
// our connection coalescing requirements are met. See also:
// https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding
// https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/
// 首先这个链接需要时 HTTP/2
// 1. This connection must be HTTP/2.
if (http2Connection == null) return false
// 同一 IP
// 2. The routes must share an IP address.
if (routes == null || !routeMatchesAny(routes)) return false
// 这个连接的服务器证书必须覆盖新的主机
// 3. This connection's server certificate's must cover the new host.
if (address.hostnameVerifier !== OkHostnameVerifier) return false
if (!supportsUrl(address.url)) return false
// 证书将必须匹配主机
// 4. Certificate pinning must match the host.
try {
address.certificatePinner!!.check(address.url.host, handshake()!!.peerCertificates)
} catch (_: SSLPeerUnverifiedException) {
return false
}
return true // The caller's address can be carried by this connection.
}
}
class RealConnection(
val connectionPool: RealConnectionPool,
private val route: Route
) : Http2Connection.Listener(), Connection {
@Throws(SocketException::class)
internal fun newCodec(client: OkHttpClient, chain: Interceptor.Chain): ExchangeCodec {
val socket = this.socket!!
val source = this.source!!
val sink = this.sink!!
val http2Connection = this.http2Connection
return if (http2Connection != null) {
Http2ExchangeCodec(client, this, chain, http2Connection)
} else {
socket.soTimeout = chain.readTimeoutMillis()
source.timeout().timeout(chain.readTimeoutMillis().toLong(), MILLISECONDS)
sink.timeout().timeout(chain.writeTimeoutMillis().toLong(), MILLISECONDS)
Http1ExchangeCodec(client, this, source, sink)
}
}
}
判断是 Http 还是 Http2,然后根据策略模式返回