Github: okhttp 分析版本: 930d4d0
An HTTP client for Android, Kotlin, and Java.
OkHttp is an HTTP client that’s efficient by default:
public class GetExample {
OkHttpClient client = new OkHttpClient();
String run(String url) throws IOException {
Request request = new Request.Builder()
.url(url)
.build();
try (Response response = client.newCall(request).execute()) {
return response.body().string();
}
}
public static void main(String[] args) throws IOException {
GetExample example = new GetExample();
String response = example.run("https://raw.github.com/square/okhttp/master/README.md");
System.out.println(response);
}
}
public class PostExample {
public static final MediaType JSON = MediaType.get("application/json; charset=utf-8");
OkHttpClient client = new OkHttpClient();
String post(String url, String json) throws IOException {
RequestBody body = RequestBody.create(json, JSON);
Request request = new Request.Builder()
.url(url)
.post(body)
.build();
try (Response response = client.newCall(request).execute()) {
return response.body().string();
}
}
String bowlingJson(String player1, String player2) {
return "{'winCondition':'HIGH_SCORE',"
+ "'name':'Bowling',"
+ "'round':4,"
+ "'lastSaved':1367702411696,"
+ "'dateStarted':1367702378785,"
+ "'players':["
+ "{'name':'" + player1 + "','history':[10,8,6,7,8],'color':-13388315,'total':39},"
+ "{'name':'" + player2 + "','history':[6,10,5,10,10],'color':-48060,'total':41}"
+ "]}";
}
public static void main(String[] args) throws IOException {
PostExample example = new PostExample();
String json = example.bowlingJson("Jesse", "Jake");
String response = example.post("http://www.roundsapp.com/post", json);
System.out.println(response);
}
}
okhttp
OkHttpClient client = new OkHttpClient();
根据名字我们就能看出,OkHttpClient 为 OkHttp 的客户端,在使用的时候首先要做的就是要创建这样一个客户端
open class OkHttpClient internal constructor(
builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
constructor() : this(Builder())
}
默认构造方法使用的是默认配置的 Builder:
class Builder constructor() {
internal var dispatcher: Dispatcher = Dispatcher() // 调度器
internal var proxy: Proxy? = null // 代理
internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS // 协议
internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS // 传输层版本和连接协议
internal val interceptors: MutableList<Interceptor> = mutableListOf() // 拦截器
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf() // 网络拦截器
internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
internal var proxySelector: ProxySelector = ProxySelector.getDefault() ?: NullProxySelector() // 代理选择器
internal var cookieJar: CookieJar = CookieJar.NO_COOKIES // cookie
internal var cache: Cache? = null // cache 缓存
internal var internalCache: InternalCache? = null // 内部缓存
internal var socketFactory: SocketFactory = SocketFactory.getDefault() // socket 工厂
internal var sslSocketFactory: SSLSocketFactory? = null // socket工厂 用于https
internal var certificateChainCleaner: CertificateChainCleaner? = null // 验证确认响应书,适用HTTPS 请求连接的主机名
internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier // 主机名字确认
internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT // 证书链
internal var proxyAuthenticator: Authenticator = Authenticator.NONE // 代理身份验证
internal var authenticator: Authenticator = Authenticator.NONE // 身份验证
internal var connectionPool: ConnectionPool = ConnectionPool() //链接复用池
internal var dns: Dns = Dns.SYSTEM // DNS
internal var followSslRedirects: Boolean = true // 重定向
internal var followRedirects: Boolean = true // 本地重定向
internal var retryOnConnectionFailure: Boolean = true // 重试连接失败
internal var callTimeout: Int = 0 // 请求超时
internal var connectTimeout: Int = 10000 // 连接超时
internal var readTimeout: Int = 10000 // 读取超时
internal var writeTimeout: Int = 10000 // 写入超时
internal var pingInterval: Int = 0 // Web socket and HTTP/2 ping interval
// ...
fun build(): OkHttpClient = OkHttpClient(this)
}
okhttp 的最佳表现就是创建一个 OkHttpClient 实例,并将其重用到所有的 http 请求调用上之所以所有请求公用一个 OkHttpClient,因为每个 OkHttpClient 都有自己的的连接池和线程池,这样的话可以重用连接和线程可减少延迟并节省内存
Request request = new Request.Builder()
.url(url)
.build();
发送一个 HTTP 请求类要构建一个 Request 对象
class Request internal constructor(
@get:JvmName("url") val url: HttpUrl, // 请求地址
@get:JvmName("method") val method: String, // 请求方法[GET/POST/PUT/PATCH/...]
@get:JvmName("headers") val headers: Headers, // 请求头
@get:JvmName("body") val body: RequestBody?, // 请求体
internal val tags: Map<Class<*>, Any> // 请求标签
) {
// ...
open class Builder {
internal var url: HttpUrl? = null
internal var method: String
internal var headers: Headers.Builder
internal var body: RequestBody? = null
//...
constructor() {
this.method = "GET"
this.headers = Headers.Builder()
}
/**
* Sets the URL target of this request.
*
* @throws IllegalArgumentException if [url] is not a valid HTTP or HTTPS URL. Avoid this
* exception by calling [HttpUrl.parse]; it returns null for invalid URLs.
*/
open fun url(url: String): Builder {
// Silently replace web socket URLs with HTTP URLs.
val finalUrl: String = when {
url.startsWith("ws:", ignoreCase = true) -> {
"http:${url.substring(3)}"
}
url.startsWith("wss:", ignoreCase = true) -> {
"https:${url.substring(4)}"
}
else -> url
}
return url(finalUrl.toHttpUrl())
}
// ...
open fun build(): Request {
return Request(
checkNotNull(url) { "url == null" },
method,
headers.build(),
body,
tags.toImmutableMap()
)
}
}
Request 也是通过 Builder 形式来创建的
Call call = client.newCall(request);
Call 即调用是一个准备好去执行的请求 Request
interface Call : Cloneable {
fun request(): Request
@Throws(IOException::class)
fun execute(): Response
fun enqueue(responseCallback: Callback)
fun cancel()
fun isExecuted(): Boolean
fun isCanceled(): Boolean
fun timeout(): Timeout
override fun clone(): Call
interface Factory {
fun newCall(request: Request): Call
}
}
request() execute() enqueue(responseCallback: Callback) cancel() isExecuted() isCanceled() timeout()
OkHttpClient 实现了 Call.Factory,使用工厂模式将构建的细节交给具体实现,顶层只需要拿到 Call 对象即可
open class OkHttpClient internal constructor(
builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
// ...
/** Prepares the [request] to be executed at some point in the future. */
override fun newCall(request: Request): Call {
return RealCall.newRealCall(this, request, false /* for web socket */)
}
// ...
}
继续看 RealCall 中的 newRealCall 方法:
internal class RealCall private constructor(
val client: OkHttpClient,
/** The application's original request unadulterated by redirects or auth headers. */
val originalRequest: Request,
val forWebSocket: Boolean
) : Call {
/**
* There is a cycle between the [Call] and [Transmitter] that makes this awkward.
* This is set after immediately after creating the call instance.
*/
private lateinit var transmitter: Transmitter
// ...
companion object {
fun newRealCall(
client: OkHttpClient,
originalRequest: Request,
forWebSocket: Boolean
): RealCall {
// Safely publish the Call instance to the EventListener.
return RealCall(client, originalRequest, forWebSocket).apply {
transmitter = Transmitter(client, this)
}
}
}
}
RealCall 为具体产品,实现了 Call 接口;其中 Transmitter 是 OkHttp 的应用层和网络层的一个桥梁类,包含了连接,请求,响应和流
class Transmitter(
private val client: OkHttpClient,
private val call: Call
) {
private val connectionPool: RealConnectionPool = client.connectionPool().delegate
private val eventListener: EventListener = client.eventListenerFactory().create(call)
private val timeout = object : AsyncTimeout() {
override fun timedOut() {
cancel()
}
}.apply {
timeout(client.callTimeoutMillis().toLong(), MILLISECONDS)
}
// ...
}
在创建 Transmitter 对象的时候设置了相关指标的监听器和 ConnectionPool
internal class RealCall private constructor(
val client: OkHttpClient,
/** The application's original request unadulterated by redirects or auth headers. */
val originalRequest: Request,
val forWebSocket: Boolean
) : Call {
// ...
override fun execute(): Response {
synchronized(this) {
check(!executed) { "Already Executed" }
executed = true
}
transmitter.timeoutEnter()
transmitter.callStart()
try {
client.dispatcher().executed(this)
return getResponseWithInterceptorChain()
} finally {
client.dispatcher().finished(this)
}
}
// ...
}
AsyncTimeout 类中的 enter() 方法 getStackTraceForCloseable()
internal class RealCall private constructor(
val client: OkHttpClient,
/** The application's original request unadulterated by redirects or auth headers. */
val originalRequest: Request,
val forWebSocket: Boolean
) : Call {
// ...
@Throws(IOException::class)
fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = ArrayList<Interceptor>()
interceptors.addAll(client.interceptors())
interceptors.add(RetryAndFollowUpInterceptor(client)) // 失败重试以及重定向
interceptors.add(BridgeInterceptor(client.cookieJar())) // 用户构造的请求转换为发送到服务器的请求、把服务器返回的响应转换为用户友好的响应
interceptors.add(CacheInterceptor(client.internalCache())) // 读取缓存直接返回、更新缓存
interceptors.add(ConnectInterceptor(client)) // 和服务器建立连接
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors())
}
interceptors.add(CallServerInterceptor(forWebSocket)) // 向服务器发送请求数据、从服务器读取响应数据
val chain = RealInterceptorChain(interceptors, transmitter, null, 0,
originalRequest, this, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis())
var calledNoMoreExchanges = false
try {
val response = chain.proceed(originalRequest)
if (transmitter.isCanceled) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw transmitter.noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
transmitter.noMoreExchanges(null)
}
}
}
// ...
}
proceed() ,开启链式调用请求,并最终返回响应 response Transmitter 对象的 noMoreExchanges() ,释放请求连接 Interceptor 使用的是责任链模式
internal class RealCall private constructor(
val client: OkHttpClient,
/** The application's original request unadulterated by redirects or auth headers. */
val originalRequest: Request,
val forWebSocket: Boolean
) : Call {
// ...
override fun enqueue(responseCallback: Callback) {
synchronized(this) {
check(!executed) { "Already Executed" }
executed = true
}
transmitter.callStart()
client.dispatcher().enqueue(AsyncCall(responseCallback))
}
// ...
}
最终调用的是 Dispatcher 中的 enqueue()
class Dispatcher constructor() {
// ...
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private val runningSyncCalls = ArrayDeque<RealCall>()
// ...
/** Used by `Call#execute` to signal it is in-flight. */
@Synchronized internal fun executed(call: RealCall) {
runningSyncCalls.add(call)
}
// ...
}
直接将将 call 添加到正在执行的请求队列中去,runningSyncCalls 为正在请求的同步队列
class Dispatcher constructor() {
// ...
/** Ready async calls in the order they'll be run. */
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
// ...
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
readyAsyncCalls.add(call)
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.get().forWebSocket) {
val existingCall = findExistingCallWithHost(call.host())
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
promoteAndExecute()
}
// ...
}
封装到一个 AsyncCall 中传递进来,添加到正在等待的异步队列 readyAsyncCalls 中去,接着继续调用 promoteAndExecute() 方法执行相关操作
class Dispatcher constructor() {
// ...
/**
* Promotes eligible calls from [readyAsyncCalls] to [runningAsyncCalls] and runs them on the
* executor service. Must not be called with synchronization because executing calls can call
* into user code.
*
* @return true if the dispatcher is currently running calls.
*/
private fun promoteAndExecute(): Boolean {
assert(!Thread.holdsLock(this))
val executableCalls = ArrayList<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.
i.remove()
asyncCall.callsPerHost().incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService())
}
return isRunning
}
@Synchronized fun executorService(): ExecutorService {
if (executorService == null) {
executorService = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("OkHttp Dispatcher", false))
}
return executorService!!
}
// ...
}
maxRequests maxRequestsPerHost
internal class RealCall private constructor(
val client: OkHttpClient,
/** The application's original request unadulterated by redirects or auth headers. */
val originalRequest: Request,
val forWebSocket: Boolean
) : Call {
internal inner class AsyncCall(
private val responseCallback: Callback
) : Runnable {
/**
* Attempt to enqueue this async call on [executorService]. This will attempt to clean up
* if the executor has been shut down by reporting the call as failed.
*/
fun executeOn(executorService: ExecutorService) {
assert(!Thread.holdsLock(client.dispatcher()))
var success = false
try {
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
transmitter.noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
client.dispatcher().finished(this) // This call is no longer running!
}
}
}
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
transmitter.timeoutEnter()
try {
val response = getResponseWithInterceptorChain()
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} finally {
client.dispatcher().finished(this)
}
}
}
}
}
又回到了 getResponseWithInterceptorChain() 中
所有的 interceptor 都整合到了 RealInterceptorChain 中,执行拦截器链方法 proceed()
class RealInterceptorChain(
private val interceptors: List<Interceptor>,
private val transmitter: Transmitter,
private val exchange: Exchange?,
private val index: Int,
private val request: Request,
private val call: Call,
private val connectTimeout: Int,
private val readTimeout: Int,
private val writeTimeout: Int
) : Interceptor.Chain {
// ...
override fun proceed(request: Request): Response {
return proceed(request, transmitter, exchange)
}
@Throws(IOException::class)
fun proceed(request: Request, transmitter: Transmitter, exchange: Exchange?): Response {
if (index >= interceptors.size) throw AssertionError()
calls++
// If we already have a stream, confirm that the incoming request will use it.
check(this.exchange == null || this.exchange.connection()!!.supportsUrl(request.url)) {
"network interceptor ${interceptors[index - 1]} must retain the same host and port"
}
// If we already have a stream, confirm that this is the only call to chain.proceed().
check(this.exchange == null || calls <= 1) {
"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
}
// Call the next interceptor in the chain.
val next = RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout)
val interceptor = interceptors[index]
@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
// Confirm that the next interceptor made its required call to chain.proceed().
check(exchange == null || index + 1 >= interceptors.size || next.calls == 1) {
"network interceptor $interceptor must call proceed() exactly once"
}
check(response.body() != null) { "interceptor $interceptor returned a response with no body" }
return response
}
}
在初始化的时候,会将所有拦截器组成的集合传递过来,同时将请求 Request 和 Call 也会传递过来, index 参数,最开始传入的是 0, exchange 参数,如果是应用拦截器,connection 必须是 null;如果是网络拦截器,connection 必须不为 null