转载

OkHttp源码解析

Github: okhttp 分析版本: 930d4d0

An HTTP client for Android, Kotlin, and Java.

OkHttp is an HTTP client that’s efficient by default:

  • HTTP/2 support allows all requests to the same host to share a socket.
  • Connection pooling reduces request latency (if HTTP/2 isn’t available).
  • Transparent GZIP shrinks download sizes.
  • Response caching avoids the network completely for repeat requests.

使用

Get a URL

public class GetExample {
  OkHttpClient client = new OkHttpClient();

  String run(String url) throws IOException {
    Request request = new Request.Builder()
        .url(url)
        .build();

    try (Response response = client.newCall(request).execute()) {
      return response.body().string();
    }
  }

  public static void main(String[] args) throws IOException {
    GetExample example = new GetExample();
    String response = example.run("https://raw.github.com/square/okhttp/master/README.md");
    System.out.println(response);
  }
}

Post to a Server

public class PostExample {
  public static final MediaType JSON = MediaType.get("application/json; charset=utf-8");

  OkHttpClient client = new OkHttpClient();

  String post(String url, String json) throws IOException {
    RequestBody body = RequestBody.create(json, JSON);
    Request request = new Request.Builder()
        .url(url)
        .post(body)
        .build();
    try (Response response = client.newCall(request).execute()) {
      return response.body().string();
    }
  }

  String bowlingJson(String player1, String player2) {
    return "{'winCondition':'HIGH_SCORE',"
        + "'name':'Bowling',"
        + "'round':4,"
        + "'lastSaved':1367702411696,"
        + "'dateStarted':1367702378785,"
        + "'players':["
        + "{'name':'" + player1 + "','history':[10,8,6,7,8],'color':-13388315,'total':39},"
        + "{'name':'" + player2 + "','history':[6,10,5,10,10],'color':-48060,'total':41}"
        + "]}";
  }

  public static void main(String[] args) throws IOException {
    PostExample example = new PostExample();
    String json = example.bowlingJson("Jesse", "Jake");
    String response = example.post("http://www.roundsapp.com/post", json);
    System.out.println(response);
  }
}

更多

okhttp

源码

OkHttpClient

OkHttpClient client = new OkHttpClient();

根据名字我们就能看出,OkHttpClient 为 OkHttp 的客户端,在使用的时候首先要做的就是要创建这样一个客户端

open class OkHttpClient internal constructor(
  builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
  constructor() : this(Builder())
}

默认构造方法使用的是默认配置的 Builder:

class Builder constructor() {
  internal var dispatcher: Dispatcher = Dispatcher() // 调度器
  internal var proxy: Proxy? = null // 代理
  internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS // 协议
  internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS // 传输层版本和连接协议
  internal val interceptors: MutableList<Interceptor> = mutableListOf() // 拦截器
  internal val networkInterceptors: MutableList<Interceptor> = mutableListOf() // 网络拦截器
  internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
  internal var proxySelector: ProxySelector = ProxySelector.getDefault() ?: NullProxySelector() // 代理选择器
  internal var cookieJar: CookieJar = CookieJar.NO_COOKIES // cookie
  internal var cache: Cache? = null // cache 缓存
  internal var internalCache: InternalCache? = null // 内部缓存
  internal var socketFactory: SocketFactory = SocketFactory.getDefault() // socket 工厂
  internal var sslSocketFactory: SSLSocketFactory? = null // socket工厂 用于https
  internal var certificateChainCleaner: CertificateChainCleaner? = null // 验证确认响应书,适用HTTPS 请求连接的主机名
  internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier // 主机名字确认
  internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT // 证书链
  internal var proxyAuthenticator: Authenticator = Authenticator.NONE // 代理身份验证
  internal var authenticator: Authenticator = Authenticator.NONE // 身份验证
  internal var connectionPool: ConnectionPool = ConnectionPool() //链接复用池
  internal var dns: Dns = Dns.SYSTEM // DNS
  internal var followSslRedirects: Boolean = true // 重定向
  internal var followRedirects: Boolean = true // 本地重定向
  internal var retryOnConnectionFailure: Boolean = true // 重试连接失败
  internal var callTimeout: Int = 0 // 请求超时
  internal var connectTimeout: Int = 10000 // 连接超时
  internal var readTimeout: Int = 10000 // 读取超时
  internal var writeTimeout: Int = 10000 // 写入超时
  internal var pingInterval: Int = 0 // Web socket and HTTP/2 ping interval
// ...
  fun build(): OkHttpClient = OkHttpClient(this)
}

okhttp 的最佳表现就是创建一个 OkHttpClient 实例,并将其重用到所有的 http 请求调用上之所以所有请求公用一个 OkHttpClient,因为每个 OkHttpClient 都有自己的的连接池和线程池,这样的话可以重用连接和线程可减少延迟并节省内存

Request

Request request = new Request.Builder()
    .url(url)
    .build();

发送一个 HTTP 请求类要构建一个 Request 对象

class Request internal constructor(
  @get:JvmName("url") val url: HttpUrl, // 请求地址
  @get:JvmName("method") val method: String, // 请求方法[GET/POST/PUT/PATCH/...]
  @get:JvmName("headers") val headers: Headers, // 请求头
  @get:JvmName("body") val body: RequestBody?, // 请求体
  internal val tags: Map<Class<*>, Any> // 请求标签
) {
  // ...
  open class Builder {
    internal var url: HttpUrl? = null
    internal var method: String
    internal var headers: Headers.Builder
    internal var body: RequestBody? = null
    
    //...
    
    constructor() {
      this.method = "GET"
      this.headers = Headers.Builder()
    }
    
    /**
     * Sets the URL target of this request.
     *
     * @throws IllegalArgumentException if [url] is not a valid HTTP or HTTPS URL. Avoid this
     *     exception by calling [HttpUrl.parse]; it returns null for invalid URLs.
     */
    open fun url(url: String): Builder {
      // Silently replace web socket URLs with HTTP URLs.
      val finalUrl: String = when {
        url.startsWith("ws:", ignoreCase = true) -> {
          "http:${url.substring(3)}"
        }
        url.startsWith("wss:", ignoreCase = true) -> {
          "https:${url.substring(4)}"
        }
        else -> url
      }

      return url(finalUrl.toHttpUrl())
    }
    
    // ...
    
    open fun build(): Request {
      return Request(
          checkNotNull(url) { "url == null" },
          method,
          headers.build(),
          body,
          tags.toImmutableMap()
      )
    }
}

Request 也是通过 Builder 形式来创建的

Call

Call call = client.newCall(request);

Call 即调用是一个准备好去执行的请求 Request

interface Call : Cloneable {

  fun request(): Request

  @Throws(IOException::class)
  fun execute(): Response

  fun enqueue(responseCallback: Callback)

  fun cancel()

  fun isExecuted(): Boolean

  fun isCanceled(): Boolean

  fun timeout(): Timeout

  override fun clone(): Call

  interface Factory {
    fun newCall(request: Request): Call
  }
}
request()
execute()
enqueue(responseCallback: Callback)
cancel()
isExecuted()
isCanceled()
timeout()

OkHttpClient 实现了 Call.Factory,使用工厂模式将构建的细节交给具体实现,顶层只需要拿到 Call 对象即可

open class OkHttpClient internal constructor(
  builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
  // ...
  
  /** Prepares the [request] to be executed at some point in the future. */
  override fun newCall(request: Request): Call {
    return RealCall.newRealCall(this, request, false /* for web socket */)
  }
  
  // ...
}

继续看 RealCall 中的 newRealCall 方法:

internal class RealCall private constructor(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {
  /**
   * There is a cycle between the [Call] and [Transmitter] that makes this awkward.
   * This is set after immediately after creating the call instance.
   */
  private lateinit var transmitter: Transmitter
  
  // ...
  
  companion object {
    fun newRealCall(
      client: OkHttpClient,
      originalRequest: Request,
      forWebSocket: Boolean
    ): RealCall {
      // Safely publish the Call instance to the EventListener.
      return RealCall(client, originalRequest, forWebSocket).apply {
        transmitter = Transmitter(client, this)
      }
    }
  }
}

RealCall 为具体产品,实现了 Call 接口;其中 Transmitter 是 OkHttp 的应用层和网络层的一个桥梁类,包含了连接,请求,响应和流

Transmitter

class Transmitter(
  private val client: OkHttpClient,
  private val call: Call
) {
  private val connectionPool: RealConnectionPool = client.connectionPool().delegate
  private val eventListener: EventListener = client.eventListenerFactory().create(call)
  private val timeout = object : AsyncTimeout() {
    override fun timedOut() {
      cancel()
    }
  }.apply {
    timeout(client.callTimeoutMillis().toLong(), MILLISECONDS)
  }
  // ...
}

在创建 Transmitter 对象的时候设置了相关指标的监听器和 ConnectionPool

execute()

internal class RealCall private constructor(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {
  // ...
  
  override fun execute(): Response {
    synchronized(this) {
      check(!executed) { "Already Executed" }
      executed = true
    }
    transmitter.timeoutEnter()
    transmitter.callStart()
    try {
      client.dispatcher().executed(this)
      return getResponseWithInterceptorChain()
    } finally {
      client.dispatcher().finished(this)
    }
  }
  
  // ...
}
  • 同步代码块,内部是做一个判断,判断是否已经执行execute方法,如果执行了抛出异常
  • 超时计时,最终调用的是 AsyncTimeout 类中的 enter() 方法
  • 请求开始的相关操作
    getStackTraceForCloseable()
    
  • 将请求 call 添加到调度器中的同步双端队列中
  • 通过拦截器链获取响应并返回
  • 请求结束时候回收移除同步请求

getResponseWithInterceptorChain()

internal class RealCall private constructor(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {
  // ...
  
  @Throws(IOException::class)
  fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = ArrayList<Interceptor>()
    interceptors.addAll(client.interceptors())
    interceptors.add(RetryAndFollowUpInterceptor(client)) // 失败重试以及重定向
    interceptors.add(BridgeInterceptor(client.cookieJar())) // 用户构造的请求转换为发送到服务器的请求、把服务器返回的响应转换为用户友好的响应
    interceptors.add(CacheInterceptor(client.internalCache())) // 读取缓存直接返回、更新缓存
    interceptors.add(ConnectInterceptor(client)) // 和服务器建立连接
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors())
    }
    interceptors.add(CallServerInterceptor(forWebSocket)) // 向服务器发送请求数据、从服务器读取响应数据

    val chain = RealInterceptorChain(interceptors, transmitter, null, 0,
        originalRequest, this, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis())

    var calledNoMoreExchanges = false
    try {
      val response = chain.proceed(originalRequest)
      if (transmitter.isCanceled) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw transmitter.noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        transmitter.noMoreExchanges(null)
      }
    }
  }
  
  // ...
}
  • 将自定义的拦截器和 okhttp 本身存在的拦截器添加到拦截器的集合
  • 创建一个拦截器链对象 Interceptor.Chain
  • 调用拦截器链对象的 proceed() ,开启链式调用请求,并最终返回响应 response
  • 结束请求,调用 Transmitter 对象的 noMoreExchanges() ,释放请求连接

Interceptor 使用的是责任链模式

enqueue()

internal class RealCall private constructor(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {
  // ...
  
  override fun enqueue(responseCallback: Callback) {
    synchronized(this) {
      check(!executed) { "Already Executed" }
      executed = true
    }
    transmitter.callStart()
    client.dispatcher().enqueue(AsyncCall(responseCallback))
  }
  
  // ...
}

最终调用的是 Dispatcher 中的 enqueue()

Dispatcher

executed(call: RealCall)

class Dispatcher constructor() {
  // ...
  
  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private val runningSyncCalls = ArrayDeque<RealCall>()
  
  // ...
  
  /** Used by `Call#execute` to signal it is in-flight. */
  @Synchronized internal fun executed(call: RealCall) {
    runningSyncCalls.add(call)
  }
  
  // ...
}

直接将将 call 添加到正在执行的请求队列中去,runningSyncCalls 为正在请求的同步队列

enqueue(call: AsyncCall)

class Dispatcher constructor() {
  // ...
  
  /** Ready async calls in the order they'll be run. */
  private val readyAsyncCalls = ArrayDeque<AsyncCall>()
  
  // ...
  
  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.get().forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host())
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
  }
  
  // ...
}

封装到一个 AsyncCall 中传递进来,添加到正在等待的异步队列 readyAsyncCalls 中去,接着继续调用 promoteAndExecute() 方法执行相关操作

promoteAndExecute()

class Dispatcher constructor() {
  // ...
  
  /**
   * Promotes eligible calls from [readyAsyncCalls] to [runningAsyncCalls] and runs them on the
   * executor service. Must not be called with synchronization because executing calls can call
   * into user code.
   *
   * @return true if the dispatcher is currently running calls.
   */
  private fun promoteAndExecute(): Boolean {
    assert(!Thread.holdsLock(this))

    val executableCalls = ArrayList<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.

        i.remove()
        asyncCall.callsPerHost().incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService())
    }

    return isRunning
  }
  
  @Synchronized fun executorService(): ExecutorService {
    if (executorService == null) {
      executorService = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
          SynchronousQueue(), threadFactory("OkHttp Dispatcher", false))
    }
    return executorService!!
  }
  
  // ...
}
maxRequests
maxRequestsPerHost

AsyncCall

internal class RealCall private constructor(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {
  internal inner class AsyncCall(
    private val responseCallback: Callback
  ) : Runnable {
    /**
     * Attempt to enqueue this async call on [executorService]. This will attempt to clean up
     * if the executor has been shut down by reporting the call as failed.
     */
    fun executeOn(executorService: ExecutorService) {
      assert(!Thread.holdsLock(client.dispatcher()))
      var success = false
      try {
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        transmitter.noMoreExchanges(ioException)
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        if (!success) {
          client.dispatcher().finished(this) // This call is no longer running!
        }
      }
    }
    
        override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        transmitter.timeoutEnter()
        try {
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e)
          } else {
            responseCallback.onFailure(this@RealCall, e)
          }
        } finally {
          client.dispatcher().finished(this)
        }
      }
    }
  }
}

又回到了 getResponseWithInterceptorChain()

RealInterceptorChain

所有的 interceptor 都整合到了 RealInterceptorChain 中,执行拦截器链方法 proceed()

class RealInterceptorChain(
  private val interceptors: List<Interceptor>,
  private val transmitter: Transmitter,
  private val exchange: Exchange?,
  private val index: Int,
  private val request: Request,
  private val call: Call,
  private val connectTimeout: Int,
  private val readTimeout: Int,
  private val writeTimeout: Int
) : Interceptor.Chain {
  
  // ...
  
  override fun proceed(request: Request): Response {
    return proceed(request, transmitter, exchange)
  }

  @Throws(IOException::class)
  fun proceed(request: Request, transmitter: Transmitter, exchange: Exchange?): Response {
    if (index >= interceptors.size) throw AssertionError()

    calls++

    // If we already have a stream, confirm that the incoming request will use it.
    check(this.exchange == null || this.exchange.connection()!!.supportsUrl(request.url)) {
      "network interceptor ${interceptors[index - 1]} must retain the same host and port"
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    check(this.exchange == null || calls <= 1) {
      "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
    }

    // Call the next interceptor in the chain.
    val next = RealInterceptorChain(interceptors, transmitter, exchange,
        index + 1, request, call, connectTimeout, readTimeout, writeTimeout)
    val interceptor = interceptors[index]

    @Suppress("USELESS_ELVIS")
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")

    // Confirm that the next interceptor made its required call to chain.proceed().
    check(exchange == null || index + 1 >= interceptors.size || next.calls == 1) {
      "network interceptor $interceptor must call proceed() exactly once"
    }

    check(response.body() != null) { "interceptor $interceptor returned a response with no body" }

    return response
  }
}

在初始化的时候,会将所有拦截器组成的集合传递过来,同时将请求 RequestCall 也会传递过来, index 参数,最开始传入的是 0, exchange 参数,如果是应用拦截器,connection 必须是 null;如果是网络拦截器,connection 必须不为 null

  • 首先判断 index 是否大于总的拦截器个数,大于抛出 AssertionError()
  • 对 calls 进行 +1 操作
  • 判断是否是一个网络拦截器,并且判断其 host 和 port 是否一致
  • 判断是否是一个网络拦截器,如果是判断是否该网络拦截器的 proceed 方法调用次数是否超过一次
  • 继续创建一个新的拦截器链对象,此时传入的 index 会进行 index+1 操作,表示开始真正的调用相关拦截器操作
  • 调用拦截器的 intercept 方法,将新的拦截器链对象塞进去
  • 返回 response 后进行校验

OkHttp源码解析

原文  http://yydcdut.com/2019/07/07/okhttp-analyse/
正文到此结束
Loading...