OkHttp – ConnectInterceptor源码简析

Github: okhttp
分析版本: 930d4d0

Opens a connection to the target server and proceeds to the next interceptor

intercept(chain: Interceptor.Chain)

/**
 * Interceptor that asks the call's transmitter for a new exchange and then hands the request,
 * transmitter and exchange to the next step of the chain.
 */
class ConnectInterceptor(val client: OkHttpClient) : Interceptor {

  /**
   * Creates an exchange for the chain's request and proceeds with it.
   *
   * Extensive health checks are requested for every request whose method is not `"GET"`.
   *
   * @param chain the interceptor chain; it is cast to `RealInterceptorChain`.
   * @return whatever the remainder of the chain returns.
   */
  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val connectChain = chain as RealInterceptorChain
    val outgoingRequest = connectChain.request()
    val callTransmitter = connectChain.transmitter()

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    val isGet = outgoingRequest.method == "GET"
    val freshExchange = callTransmitter.newExchange(chain, !isGet)

    return connectChain.proceed(outgoingRequest, callTransmitter, freshExchange)
  }
}

  • doExtensiveHealthChecks
    仅当请求方法不是 GET 时为 true
  • 通过 transmitter.newExchange()
    来创建 Exchange
  • realChain.proceed()
    告知下一个拦截器开始去执行

Transmitter#newExchange(chain: Interceptor.Chain, doExtensiveHealthChecks: Boolean)

class Transmitter(
  private val client: OkHttpClient,
  private val call: Call
) {
  /**
   * Returns a new exchange to carry a new request and response.
   *
   * Fails with [IllegalStateException] when no more exchanges are allowed or when a previous
   * exchange is still set. Otherwise a codec is obtained from the exchange finder, wrapped in
   * an [Exchange], and recorded on this transmitter with both "done" flags reset.
   */
  internal fun newExchange(chain: Interceptor.Chain, doExtensiveHealthChecks: Boolean): Exchange {
    // Validate the current state while holding the pool lock.
    synchronized(connectionPool) {
      if (noMoreExchanges) {
        throw IllegalStateException("released")
      }
      if (exchange != null) {
        throw IllegalStateException(
            "cannot make a new request because the previous response is still open: " +
                "please call response.close()"
        )
      }
    }

    // Finding the codec happens outside the lock.
    val codec = exchangeFinder!!.find(client, chain, doExtensiveHealthChecks)
    val created = Exchange(this, call, eventListener, exchangeFinder!!, codec)

    // Publish the new exchange and reset its progress flags under the lock.
    synchronized(connectionPool) {
      exchange = created
      exchangeRequestDone = false
      exchangeResponseDone = false
    }
    return created
  }
}

通过 exchangeFinder!!.find()
来创建 ExchangeCodec

ExchangeFinder#find(client: OkHttpClient, chain: Interceptor.Chain, doExtensiveHealthChecks: Boolean)

class ExchangeFinder(
  private val transmitter: Transmitter,
  private val connectionPool: RealConnectionPool,
  private val address: Address,
  private val call: Call,
  private val eventListener: EventListener
) {
  /**
   * Finds a healthy connection and returns a codec created on it.
   *
   * Timeouts are read from [chain]; the ping interval and the retry flag are read from [client].
   * A [RouteException] is rethrown as-is and an [IOException] is wrapped in a [RouteException];
   * in both cases `trackFailure()` is called first.
   */
  fun find(
    client: OkHttpClient,
    chain: Interceptor.Chain,
    doExtensiveHealthChecks: Boolean
  ): ExchangeCodec {
    val connectTimeoutMs = chain.connectTimeoutMillis()
    val readTimeoutMs = chain.readTimeoutMillis()
    val writeTimeoutMs = chain.writeTimeoutMillis()
    val pingIntervalMs = client.pingIntervalMillis()
    val retryOnFailure = client.retryOnConnectionFailure()

    return try {
      // Look for a connection (via findHealthyConnection) and build the codec on top of it.
      findHealthyConnection(
          connectTimeout = connectTimeoutMs,
          readTimeout = readTimeoutMs,
          writeTimeout = writeTimeoutMs,
          pingIntervalMillis = pingIntervalMs,
          connectionRetryEnabled = retryOnFailure,
          doExtensiveHealthChecks = doExtensiveHealthChecks
      ).newCodec(client, chain)
    } catch (routeFailure: RouteException) {
      trackFailure()
      throw routeFailure
    } catch (ioFailure: IOException) {
      trackFailure()
      throw RouteException(ioFailure)
    }
  }
}

通过 findHealthyConnection()
找到一条『健康』的链接,然后通过 RealConnection#newCodec()
来创建 ExchangeCodec

ExchangeFinder#findHealthyConnection(connectTimeout: Int, readTimeout: Int, writeTimeout: Int, pingIntervalMillis: Int, connectionRetryEnabled: Boolean, doExtensiveHealthChecks: Boolean)

class ExchangeFinder(
  private val transmitter: Transmitter,
  private val connectionPool: RealConnectionPool,
  private val address: Address,
  private val call: Call,
  private val eventListener: EventListener
) {

  /**
   * Keeps calling [findConnection] until it yields a connection that is either brand new
   * (its `successCount` is 0) or passes `isHealthy`. Every rejected candidate has
   * `noNewExchanges()` called on it before the next attempt.
   */
  @Throws(IOException::class)
  private fun findHealthyConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean,
    doExtensiveHealthChecks: Boolean
  ): RealConnection {
    while (true) {
      val candidate = findConnection(
          connectTimeout = connectTimeout,
          readTimeout = readTimeout,
          writeTimeout = writeTimeout,
          pingIntervalMillis = pingIntervalMillis,
          connectionRetryEnabled = connectionRetryEnabled
      )

      // If this is a brand new connection, we can skip the extensive health checks.
      // The counter is read while holding the pool lock.
      val isBrandNew = synchronized(connectionPool) { candidate.successCount == 0 }

      // Otherwise do a (potentially slow) check to confirm that the connection is still good.
      if (isBrandNew || candidate.isHealthy(doExtensiveHealthChecks)) {
        return candidate
      }

      // Unhealthy: forbid new exchanges on this connection and start again.
      candidate.noNewExchanges()
    }
  }
}

findHealthyConnection()
是负责死循环去检测获取到的 RealConnection
是否可用,如果是新创建的则跳过检测,当 RealConnection
不可用的话就继续去调用 findConnection 去重新获取一个连接

ExchangeFinder#findConnection(connectTimeout: Int, readTimeout: Int, writeTimeout: Int, pingIntervalMillis: Int, connectionRetryEnabled: Boolean)

// Excerpt: only findConnection is shown here. The state it reads and writes (hasStreamFailure,
// nextRouteToTry, routeSelection, routeSelector, connectingConnection) is not declared in this
// excerpt.
class ExchangeFinder(
  private val transmitter: Transmitter,
  private val connectionPool: RealConnectionPool,
  private val address: Address,
  private val call: Call,
  private val eventListener: EventListener
) {
  
  /**
   * Returns a connection to host a new stream. This prefers the existing connection if it exists,
   * then the pool, finally building a new connection.
   *
   * The timeout, ping-interval and retry arguments are only forwarded to `RealConnection.connect`
   * when a new connection has to be established.
   *
   * @throws IOException with message "Canceled" if the transmitter is canceled when either of
   *     the first two lock sections is entered.
   */
  @Throws(IOException::class)
  private fun findConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
  ): RealConnection {
    var foundPooledConnection = false
    var result: RealConnection? = null
    var selectedRoute: Route? = null
    var releasedConnection: RealConnection?
    val toClose: Socket?
    synchronized(connectionPool) {
      if (transmitter.isCanceled) throw IOException("Canceled") // The call was canceled.
      hasStreamFailure = false // This is a fresh attempt.

      releasedConnection = transmitter.connection
      // If the transmitter's current connection no longer accepts new exchanges, release it.
      // The value returned by the release call is closed after leaving the lock.
      toClose = if (transmitter.connection != null && transmitter.connection!!.noNewExchanges) {
        transmitter.releaseConnectionNoEvents()
      } else {
        null
      }

      // Reuse the connection still held by the transmitter, if any.
      if (transmitter.connection != null) {
        // We had an already-allocated connection and it's good.
        result = transmitter.connection
        releasedConnection = null
      }

      if (result == null) {
        // Attempt to get a connection from the pool. On success the acquired connection is read
        // back from transmitter.connection.
        if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
          foundPooledConnection = true
          result = transmitter.connection
        } else if (nextRouteToTry != null) {
          // Use the route that was queued for the next attempt.
          selectedRoute = nextRouteToTry
          nextRouteToTry = null
        } else if (retryCurrentRoute()) {
          selectedRoute = transmitter.connection!!.route()
        }
      }
    }
    // Close the socket of the released connection (if any), outside the lock.
    toClose?.closeQuietly()

    if (releasedConnection != null) {
      eventListener.connectionReleased(call, releasedConnection!!)
    }
    // A pooled connection was acquired: notify the listener.
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result!!)
    }
    if (result != null) {
      // If we found an already-allocated or pooled connection, we're done.
      return result!!
    }

    // If we need a route selection, make one. This is a blocking operation.
    // The new routes are then used for a second lookup in the pool below.
    var newRouteSelection = false
    if (selectedRoute == null && (routeSelection == null || !routeSelection!!.hasNext())) {
      newRouteSelection = true
      routeSelection = routeSelector.next()
    }

    var routes: List<Route>? = null
    synchronized(connectionPool) {
      if (transmitter.isCanceled) throw IOException("Canceled")

      if (newRouteSelection) {
        // Now that we have a set of IP addresses, make another attempt at getting a connection from
        // the pool. This could match due to connection coalescing.
        routes = routeSelection!!.routes
        if (connectionPool.transmitterAcquirePooledConnection(
                address, transmitter, routes, false)) {
          foundPooledConnection = true
          result = transmitter.connection
        }
      }

      if (!foundPooledConnection) {
        // Nothing usable in the pool: create a new connection.
        if (selectedRoute == null) {
          selectedRoute = routeSelection!!.next()
        }

        // Create a connection and assign it to this allocation immediately. This makes it possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        result = RealConnection(connectionPool, selectedRoute!!)
        connectingConnection = result
      }
    }

    // If we found a pooled connection on the 2nd time around, we're done.
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result!!)
      return result!!
    }

    // Establish the connection.
    // Do TCP + TLS handshakes. This is a blocking operation.
    result!!.connect(
        connectTimeout,
        readTimeout,
        writeTimeout,
        pingIntervalMillis,
        connectionRetryEnabled,
        call,
        eventListener
    )
    // Report the successful connect for this route to the route database.
    // NOTE(review): presumably this clears the route from a failed-route record - confirm in
    // RouteDatabase.
    connectionPool.routeDatabase.connected(result!!.route())

    var socket: Socket? = null
    synchronized(connectionPool) {
      connectingConnection = null
      // Last attempt at connection coalescing, which only occurs if we attempted multiple
      // concurrent connections to the same host. If the pool now has a multiplexed match, the
      // connection just created is dropped in favor of the pooled one.
      if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
        // We lost the race! Close the connection we created and return the pooled connection.
        result!!.noNewExchanges = true
        socket = result!!.socket()
        result = transmitter.connection
      } else {
        // Otherwise add the new connection to the pool and assign it to the transmitter.
        connectionPool.put(result!!)
        transmitter.acquireConnectionNoEvents(result!!)
      }
    }
    // Close the socket of the connection that lost the race (if any), outside the lock.
    socket?.closeQuietly()

    eventListener.connectionAcquired(call, result!!)
    return result!!
  }
}

RealConnectionPool#transmitterAcquirePooledConnection(address: Address, transmitter: Transmitter, routes: List<Route>?, requireMultiplexed: Boolean)

class RealConnectionPool(
  /** The maximum number of idle connections for each address. */
  private val maxIdleConnections: Int,
  keepAliveDuration: Long,
  timeUnit: TimeUnit
) {
  /**
   * Attempts to acquire a recycled connection to `address` for `transmitter`. Returns true if a
   * connection was acquired.
   *
   * The first connection in `connections` that is eligible for `address` and `routes` is handed
   * to the transmitter. When `requireMultiplexed` is true, connections that are not multiplexed
   * are skipped before eligibility is checked.
   *
   * If `routes` is non-null these are the resolved routes (ie. IP addresses) for the connection.
   * This is used to coalesce related domains to the same HTTP/2 connection, such as `square.com`
   * and `square.ca`.
   */
  fun transmitterAcquirePooledConnection(
    address: Address,
    transmitter: Transmitter,
    routes: List<Route>?,
    requireMultiplexed: Boolean
  ): Boolean {
    assert(Thread.holdsLock(this))

    val reusable = connections.firstOrNull { pooled ->
      (!requireMultiplexed || pooled.isMultiplexed) && pooled.isEligible(address, routes)
    } ?: return false

    transmitter.acquireConnectionNoEvents(reusable)
    return true
  }
}

遍历 pool 中的 connections(ArrayDeque),如果某条连接可以复用,则把它分配给 transmitter 并返回 true;遍历完都没有可复用的连接则返回 false

RealConnection#isEligible(address: Address, routes: List<Route>?)

class RealConnection(
  val connectionPool: RealConnectionPool,
  private val route: Route
) : Http2Connection.Listener(), Connection {
  /**
   * Returns true if this connection can carry a stream allocation to `address`. If non-null
   * `routes` are the resolved routes for a connection.
   *
   * The checks run in order: the connection still accepts exchanges, the non-host address
   * fields match, then either the hosts are identical or all connection-coalescing
   * requirements hold.
   */
  internal fun isEligible(address: Address, routes: List<Route>?): Boolean {
    // If this connection is not accepting new exchanges (allocation limit reached or flagged
    // as noNewExchanges), we're done.
    val acceptsNewExchanges = transmitters.size < allocationLimit && !noNewExchanges
    if (!acceptsNewExchanges) return false

    // If the non-host fields of the address don't overlap, we're done.
    if (!this.route.address().equalsNonHost(address)) return false

    // If the host exactly matches, we're done: this connection can carry the address.
    val requestedHost = address.url.host
    if (requestedHost == this.route().address().url.host) {
      return true // This connection is a perfect match.
    }

    // At this point we don't have a hostname match. But we still be able to carry the request if
    // our connection coalescing requirements are met. See also:
    // https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding
    // https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/
    return when {
      // 1. This connection must be HTTP/2.
      http2Connection == null -> false

      // 2. The routes must share an IP address.
      routes == null || !routeMatchesAny(routes) -> false

      // 3. This connection's server certificate's must cover the new host.
      address.hostnameVerifier !== OkHostnameVerifier -> false
      !supportsUrl(address.url) -> false

      // 4. Certificate pinning must match the host.
      else -> try {
        address.certificatePinner!!.check(requestedHost, handshake()!!.peerCertificates)
        true // The caller's address can be carried by this connection.
      } catch (_: SSLPeerUnverifiedException) {
        false
      }
    }
  }
}

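  • 当前连接承载的请求数不能达到上限,且没有被标记为 noNewExchanges,否则不能复用
  • 两个 address 除 host 以外的参数不相同,不能复用
  • 两个 address 的 url 的 host 相同则可以直接复用
  • 否则只有同时满足以下条件才可以合并连接(connection coalescing)
    • 这个连接需要是 HTTP/2
    • 传入的 routes 中要有与当前连接匹配的路由(即共享同一个 IP 地址)
    • 这个连接的服务器证书必须覆盖新的主机
    • 证书锁定(certificate pinning)必须匹配新的主机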

RealConnection#newCodec(client: OkHttpClient, chain: Interceptor.Chain)

class RealConnection(
  val connectionPool: RealConnectionPool,
  private val route: Route
) : Http2Connection.Listener(), Connection {
  /**
   * Builds the codec used to exchange a request and response over this connection.
   *
   * Returns an [Http2ExchangeCodec] when an HTTP/2 connection is present. Otherwise the chain's
   * read and write timeouts are applied to the socket, source and sink, and an
   * [Http1ExchangeCodec] is returned.
   */
  @Throws(SocketException::class)
  internal fun newCodec(client: OkHttpClient, chain: Interceptor.Chain): ExchangeCodec {
    // These are asserted non-null up front, regardless of which codec is chosen.
    val rawSocket = this.socket!!
    val input = this.source!!
    val output = this.sink!!

    val multiplexed = this.http2Connection
    if (multiplexed != null) {
      return Http2ExchangeCodec(client, this, chain, multiplexed)
    }

    // HTTP/1 path: apply the chain's timeouts before handing out the codec.
    rawSocket.soTimeout = chain.readTimeoutMillis()
    input.timeout().timeout(chain.readTimeoutMillis().toLong(), MILLISECONDS)
    output.timeout().timeout(chain.writeTimeoutMillis().toLong(), MILLISECONDS)
    return Http1ExchangeCodec(client, this, input, output)
  }
}

判断当前连接是 HTTP/1.x 还是 HTTP/2(http2Connection 是否为 null),然后根据策略模式返回对应的 ExchangeCodec 实现:Http1ExchangeCodec 或 Http2ExchangeCodec

原文 

http://yydcdut.com/2019/07/11/okhttp-connect-interceptor-analyse/

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。

PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » OkHttp – ConnectInterceptor源码简析

赞 (0)
分享到:更多 ()

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址