转载

再学Android:OkHttp源码探究(六)ConnectInterceptor

不知不觉已经到了要分析ok内置拦截器第四个的文章了,本篇的主角就是ConnectInterceptor,看名字也就知道它是负责与服务器建立连接的拦截器也。

源码分析

话不多说,上代码:

@Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val request = realChain.request()
    val transmitter = realChain.transmitter()

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    val doExtensiveHealthChecks = request.method != "GET"
    val exchange = transmitter.newExchange(chain, doExtensiveHealthChecks)

    return realChain.proceed(request, transmitter, exchange)
  }
复制代码

可以看到这个拦截器里面的代码是异常的精简,纵观全部发现关键步骤是在于调用transmitter.newExchange()方法。前面的文章中简单分析过transmitter类还有印象吗?这里再简单回顾下

Transmitter

Transmitter是应用层和网络层之间的桥梁,对外暴露了网络连接过程中的connection、request、response、streams。

我们接着看newExchange方法做了什么事情

/** Returns a new exchange to carry a new request and response. */
  internal fun newExchange(chain: Interceptor.Chain, doExtensiveHealthChecks: Boolean): Exchange {
    synchronized(connectionPool) {
      check(!noMoreExchanges) { "released" }
      check(exchange == null) {
        "cannot make a new request because the previous response is still open: " +
            "please call response.close()"
      }
    }

    val codec = exchangeFinder!!.find(client, chain, doExtensiveHealthChecks)
    val result = Exchange(this, call, eventListener, exchangeFinder!!, codec)

    synchronized(connectionPool) {
      this.exchange = result
      this.exchangeRequestDone = false
      this.exchangeResponseDone = false
      return result
    }
  }
复制代码

首先是通过exchangeFinder调动find方法拿到codec,那这几个又是什么东西呢?

ExchangeFinder

不知大家对介绍重试拦截器的过程是否还有印象?在重试拦截器中有通过transmitter调用prepareToConnect()。exchangeFinder对象同时也是那个时候初始化的,当时的介绍是为了后续的网络连接做准备。我们今天就来看看其在ConnectionInterceptor过程中是怎么做的网络连接。

还是简单翻译一下ExchangeFinder的类注释:

尝试找到请求队列中的已经存在的连接,使用下列策略:

1.如果当前请求已经连接上,那么就直接使用。 2.如果连接池中有可以服用的连接,那么根据RealConnection.isEligible来判断是否使用。 3.如果当前没有存在的连接,那么就尝试从新建立一个新的连接。

可以看到ok在建立连接的时候是遵循尽量服用连接池的做法,因为这样可以大大降低在网络连接过程解析DNS耗时、以及握手耗时。

那么find()方法是做了什么事情呢?还是先看一下代码:

fun find(client: OkHttpClient,chain: Interceptor.Chain,doExtensiveHealthChecks: Boolean
  ): ExchangeCodec {
    //参数设置代码省略。。。。。。。
    try {
    //找到目前可用的连接
      val resultConnection = findHealthyConnection(
          connectTimeout = connectTimeout,
          readTimeout = readTimeout,
          writeTimeout = writeTimeout,
          pingIntervalMillis = pingIntervalMillis,
          connectionRetryEnabled = connectionRetryEnabled,
          doExtensiveHealthChecks = doExtensiveHealthChecks
      )
      return resultConnection.newCodec(client, chain)
    } catch (e: RouteException) {
      trackFailure()
      throw e
    } catch (e: IOException) {
      trackFailure()
      throw RouteException(e)
    }
  }
复制代码

抛却一大推设置参数的代码可以发现主要是findHealthyConnection()来找到一个可用的connection。那它究竟是怎么找的呢?(为了分析请求流程这里可能会有大量代码)。进入到findHealthyConnection()方法中我们发现在while循环中,主要是通过findConnection()来寻找可用connection。并且方法注释如下:

/**
   * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
   * until a healthy connection is found.
   */
复制代码

findConnection()

/**
   * Returns a connection to host a new stream. This prefers the existing connection if it exists,
   * then the pool, finally building a new connection.
   */
复制代码

这是每次请求过程中真正找到connection的方法,至于是使用已经存在的还是新建立的我们通过代码来分析:(方法代码有点长,为了流程连贯,将在代码中使用注释分析)

@Throws(IOException::class)
  private fun findConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
  ): RealConnection {
    var foundPooledConnection = false
    var result: RealConnection? = null
    var selectedRoute: Route? = null
    var releasedConnection: RealConnection?
    val toClose: Socket?
    //使用代码块锁 保证线程同步 
    synchronized(connectionPool) {
    //前文分析过这里只会是用户主动取消,如果取消抛出异常
      if (transmitter.isCanceled) throw IOException("Canceled")
      hasStreamFailure = false // This is a fresh attempt.

      releasedConnection = transmitter.connection
      //这里判断如果数据传输已经完成就返回需要关闭的sokcet,反之为null
      toClose = if (transmitter.connection != null && transmitter.connection!!.noNewExchanges) {
        transmitter.releaseConnectionNoEvents()
      } else {
        null
      }
        //已经有一个分配好的连接 并且处于可用状态 
      if (transmitter.connection != null) {
        // We had an already-allocated connection and it's good.
        result = transmitter.connection
        releasedConnection = null
      }
    //按照之前的策略,如果当前没有已经连接好的连接,会尝试从连接池中找到一个可用的。
      if (result == null) {
        // Attempt to get a connection from the pool.
        if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
          foundPooledConnection = true
          result = transmitter.connection
        } else if (nextRouteToTry != null) {
          selectedRoute = nextRouteToTry
          nextRouteToTry = null
        } else if (retryCurrentRoute()) {
          selectedRoute = transmitter.connection!!.route()
        }
      }
    }
    //关闭之前socket 
    toClose?.closeQuietly()

    if (releasedConnection != null) {
      eventListener.connectionReleased(call, releasedConnection!!)
    }
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result!!)
    }
    if (result != null) {
      // If we found an already-allocated or pooled connection, we're done.
      return result!!
    }
    
    // If we need a route selection, make one. This is a blocking operation.
    var newRouteSelection = false
    if (selectedRoute == null && (routeSelection == null || !routeSelection!!.hasNext())) {
      newRouteSelection = true
      routeSelection = routeSelector.next()
    }

    var routes: List<Route>? = null
    synchronized(connectionPool) {
      if (transmitter.isCanceled) throw IOException("Canceled")

      if (newRouteSelection) {
        // Now that we have a set of IP addresses, make another attempt at getting a connection from
        // the pool. This could match due to connection coalescing.
        routes = routeSelection!!.routes
        //再次尝试从连接池中找到可用连接
        if (connectionPool.transmitterAcquirePooledConnection(
                address, transmitter, routes, false)) {
          foundPooledConnection = true
          result = transmitter.connection
        }
      } 
        
      if (!foundPooledConnection) {
        if (selectedRoute == null) {
          selectedRoute = routeSelection!!.next()
        }
        //再次尝试从连接池中找到可用连接失败,创建一个connection 
        // Create a connection and assign it to this allocation immediately. This makes it possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        result = RealConnection(connectionPool, selectedRoute!!)
        connectingConnection = result
      }
    }

    // If we found a pooled connection on the 2nd time around, we're done.
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result!!)
      return result!!
    }
    //刚刚创建出来的connection 调用connect方法进行网络连接 
    // Do TCP + TLS handshakes. This is a blocking operation.
    result!!.connect(
        connectTimeout,
        readTimeout,
        writeTimeout,
        pingIntervalMillis,
        connectionRetryEnabled,
        call,
        eventListener
    )
    //将当前域名从失败的黑名单中移除掉
    connectionPool.routeDatabase.connected(result!!.route())

    var socket: Socket? = null
    synchronized(connectionPool) {
      connectingConnection = null
      //在连接过程中最后一次尝试从连接池中找到已经有的连接,防止同时有两个相对host的请求发出,这样就能复用已有的连接
      // Last attempt at connection coalescing, which only occurs if we attempted multiple
      // concurrent connections to the same host.
      if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
        // We lost the race! Close the connection we created and return the pooled connection.
        result!!.noNewExchanges = true
        socket = result!!.socket()
        result = transmitter.connection

        // It's possible for us to obtain a coalesced connection that is immediately unhealthy. In
        // that case we will retry the route we just successfully connected with.
        nextRouteToTry = selectedRoute
      } else {
      //将这次连接放入连接池中,为了后续可能直接复用
        connectionPool.put(result!!)
        transmitter.acquireConnectionNoEvents(result!!)
      }
    }
    socket?.closeQuietly()

    eventListener.connectionAcquired(call, result!!)
    return result!!
  }

复制代码

经过分析代码可以发现,ok在进行连接建立的时候不是直接去建立的connection,而是尽可能的寻找已经存在的连接去进行复用。如果实在没有可利用的再去创建一条新的连接并且将其放入连接池给后续请求去复用。

经过上面苦苦一番寻找,我们终于拿到了一条建立好的可使用的连接RealConnection。然后直接调用newCodec()方法返回一个ExchangeCodec对象。我们再来看一下是干嘛的:

@Throws(SocketException::class)
  internal fun newCodec(client: OkHttpClient, chain: Interceptor.Chain): ExchangeCodec {
    val socket = this.socket!!
    val source = this.source!!
    val sink = this.sink!!
    val http2Connection = this.http2Connection

    return if (http2Connection != null) {
      Http2ExchangeCodec(client, this, chain, http2Connection)
    } else {
      socket.soTimeout = chain.readTimeoutMillis()
      source.timeout().timeout(chain.readTimeoutMillis().toLong(), MILLISECONDS)
      sink.timeout().timeout(chain.writeTimeoutMillis().toLong(), MILLISECONDS)
      Http1ExchangeCodec(client, this, source, sink)
    }
  }
复制代码

可以看到关键方法就是根据当前请求协议是HTTP1还是HTTP2来返回一个codec,我们跟进到codec类去看一下注释,大概翻译如下:

一个用来发送http/1.1消息的socket连接,并且严格遵守下列的生命周期:

- 发送请求的headers[writeRequest]

- 打开一个水槽(直译)去写入请求体

- 开始写入请求体然后关闭这个水槽

- 读取响应头 - 申请一部分资源开始读取响应体 - 读取完成响应体后关闭资源

经过上面寻找可用连接的操作之后基本上到了newExchange的最后一步去构建出来一个Exchange并且返回。

Exchange

类注释很简单:传输一个http的请求和响应,是真正上处理IO操作的。事件的管理是被ExchangeCodec管控。

总结

到这里,关于ConnectInterceptor的源码探究基本上就告一段落了。我们大概来总结下:

  • 从chain对象中获取到Transmitter对象
  • 调用transmitter的newExchange()方法
  • 根据调度策略找到可用的连接并返回
  • 调用proceed调用下一个拦截器

上面基本上就是ConnectionInterceptor的调用流程,接下来我们会继续分析ok内置的最后一个拦截器。

原文  https://juejin.im/post/5dd9f81f518825733372d99a
正文到此结束
Loading...