Github: okhttp 分析版本: 930d4d0
This is the last interceptor in the chain. It makes a network call to the server
ConnectInterceptor 拦截器负责与服务器建立 Socket 连接,并创建一个 Exchange,它封装了通向服务器的输入流和输出流。而接下来的 CallServerInterceptor 拦截器则使用这个 Exchange 与服务器进行数据的读写操作。
/**
 * Interceptor that writes the request through the chain's exchange and reads the response back.
 *
 * @param forWebSocket when true, a 101 response is given the `EMPTY_RESPONSE` placeholder body
 * instead of a body opened from the exchange.
 */
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {
/**
 * Sends the request headers (and the body, when the method permits one and one is present),
 * then reads the response headers and attaches a response body.
 *
 * @param chain must be a `RealInterceptorChain`; it is cast unconditionally.
 * @return the response with its body attached.
 * @throws IOException on I/O failure; this includes the `ProtocolException` raised for a
 * 204/205 response that reports a positive content length.
 */
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.exchange()
val request = realChain.request()
val requestBody = request.body
// Captured before anything is written; later stored on the response as the "sent" timestamp.
val sentRequestMillis = System.currentTimeMillis()
// Write the request headers.
exchange.writeRequestHeaders(request)
var responseHeadersStarted = false
var responseBuilder: Response.Builder? = null
// Write the request body (only when the method permits one and a body is present).
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
exchange.flushRequest()
responseHeadersStarted = true
exchange.responseHeadersStart()
responseBuilder = exchange.readResponseHeaders(true)
}
// A null builder here means either no "Expect: 100-continue" was sent, or
// readResponseHeaders(true) returned null; in both cases the body is transmitted.
if (responseBuilder == null) {
if (requestBody.isDuplex()) {
// Prepare a duplex body so that the application can send a request body later.
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
requestBody.writeTo(bufferedRequestBody)
} else {
// Write the request body if the "Expect: 100-continue" expectation was met.
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
} else {
exchange.noRequestBody()
if (!exchange.connection()!!.isMultiplexed) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
exchange.noNewExchangesOnConnection()
}
}
} else {
exchange.noRequestBody()
}
// Finish the request; skipped for duplex bodies.
if (requestBody == null || !requestBody.isDuplex()) {
exchange.finishRequest()
}
if (!responseHeadersStarted) {
exchange.responseHeadersStart()
}
// Read the response headers unless they were already read on the 100-continue path.
if (responseBuilder == null) {
responseBuilder = exchange.readResponseHeaders(false)!!
}
// Build the preliminary response (no body attached yet).
var response = responseBuilder
.request(request)
.handshake(exchange.connection()!!.handshake())
.sentRequestAtMillis(sentRequestMillis) // time captured before the request was written
.receivedResponseAtMillis(System.currentTimeMillis()) // time the response headers were read
.build()
var code = response.code()
if (code == 100) {
// server sent a 100-continue even though we did not request one.
// try again to read the actual response
response = exchange.readResponseHeaders(false)!!
.request(request)
.handshake(exchange.connection()!!.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
code = response.code()
}
exchange.responseHeadersEnd(response)
// Attach the response body.
response = if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response.newBuilder()
.body(EMPTY_RESPONSE)
.build()
} else {
response.newBuilder()
.body(exchange.openResponseBody(response))
.build()
}
// Either side sending "Connection: close" stops new exchanges on this connection.
if ("close".equals(response.request().header("Connection"), ignoreCase = true) ||
"close".equals(response.header("Connection"), ignoreCase = true)) {
exchange.noNewExchangesOnConnection()
}
// Elvis binds tighter than '>', so this reads as (contentLength ?: -1L) > 0L.
if ((code == 204 || code == 205) && response.body()?.contentLength() ?: -1L > 0L) {
throw ProtocolException(
"HTTP $code had non-zero Content-Length: ${response.body()?.contentLength()}")
}
return response
}
}
/** Carries one request through an [ExchangeCodec] while reporting progress to an [EventListener]. */
class Exchange(
    internal val transmitter: Transmitter,
    internal val call: Call,
    internal val eventListener: EventListener,
    private val finder: ExchangeFinder,
    private val codec: ExchangeCodec
) {
    /**
     * Writes the headers of [request] through the codec, bracketed by the listener's
     * `requestHeadersStart` and `requestHeadersEnd` events.
     *
     * @throws IOException rethrown after being reported to `requestFailed` and `trackFailure`.
     */
    @Throws(IOException::class)
    fun writeRequestHeaders(request: Request) {
        try {
            eventListener.requestHeadersStart(call)
            codec.writeRequestHeaders(request)
            eventListener.requestHeadersEnd(call, request)
        } catch (failure: IOException) {
            // Report first, then let the caller see the same exception instance.
            eventListener.requestFailed(call, failure)
            trackFailure(failure)
            throw failure
        }
    }
}
这里调用了 ExchangeCodec 的 writeRequestHeaders()。这里使用了策略模式:根据是 HTTP/1.x 还是 HTTP/2 请求,分别交给对应的 ExchangeCodec 实现来处理,下面以 Http1ExchangeCodec 为例
/** [ExchangeCodec] implementation that writes HTTP/1 request text to a buffered sink. */
class Http1ExchangeCodec(
    /** The client that configures this stream. May be null for HTTPS proxy tunnels. */
    private val client: OkHttpClient?,
    /** The connection that carries this stream. */
    private val realConnection: RealConnection?,
    private val source: BufferedSource,
    private val sink: BufferedSink
) : ExchangeCodec {
    /**
     * Prepares the HTTP headers and sends them to the server.
     *
     * For streaming requests with a body, headers must be prepared **before** the output stream has
     * been written to. Otherwise the body would need to be buffered!
     *
     * For non-streaming requests with a body, headers must be prepared **after** the output stream
     * has been written to and closed. This ensures that the `Content-Length` header field receives
     * the proper value.
     */
    override fun writeRequestHeaders(request: Request) {
        // The request line is built from the request and the proxy type of the connection's route.
        val requestLine = RequestLine.get(
            request, realConnection!!.route().proxy().type())
        writeRequest(request.headers, requestLine)
    }

    /**
     * Writes [requestLine] followed by every entry of [headers] to the sink, then the blank line
     * that ends the header section.
     *
     * Each line is terminated with CRLF (`\r\n`), as the HTTP/1 message format requires.
     * Requires the codec to be in `STATE_IDLE`; leaves it in `STATE_OPEN_REQUEST_BODY`.
     *
     * @throws IllegalStateException if the codec is not idle.
     */
    fun writeRequest(headers: Headers, requestLine: String) {
        check(state == STATE_IDLE) { "state: $state" }
        sink.writeUtf8(requestLine).writeUtf8("\r\n")
        for (i in 0 until headers.size) {
            sink.writeUtf8(headers.name(i))
                .writeUtf8(": ")
                .writeUtf8(headers.value(i))
                .writeUtf8("\r\n")
        }
        // Empty line: end of the header section.
        sink.writeUtf8("\r\n")
        state = STATE_OPEN_REQUEST_BODY
    }
}
遍历 Header,然后通过前一个拦截器建立的连接得到的 Sink 来进行写操作,同时将 state 变量赋值为 STATE_OPEN_REQUEST_BODY,运用了状态模式的思想
/** Carries one request through an [ExchangeCodec] while reporting progress to an [EventListener]. */
class Exchange(
    internal val transmitter: Transmitter,
    internal val call: Call,
    internal val eventListener: EventListener,
    private val finder: ExchangeFinder,
    private val codec: ExchangeCodec
) {
    /**
     * Creates the sink the request body is written to.
     *
     * Records [duplex] on this exchange, reads the body's declared content length, fires
     * `requestBodyStart`, and wraps the codec's sink in a `RequestBodySink` carrying that length.
     *
     * @param request the request being sent; its body must be non-null.
     * @param duplex value stored in `isDuplex`.
     */
    @Throws(IOException::class)
    fun createRequestBody(request: Request, duplex: Boolean): Sink {
        isDuplex = duplex
        val declaredLength = request.body!!.contentLength()
        eventListener.requestBodyStart(call)
        return RequestBodySink(codec.createRequestBody(request, declaredLength), declaredLength)
    }
}
通过 createRequestBody 创建一个 Sink 对象,本质上还是使用在 ConnectInterceptor 中创建的 ExchangeCodec 内部封装的 Sink 对象进行写操作
/** [ExchangeCodec] implementation that writes HTTP/1 request bodies to a buffered sink. */
class Http1ExchangeCodec(
    /** The client that configures this stream. May be null for HTTPS proxy tunnels. */
    private val client: OkHttpClient?,
    /** The connection that carries this stream. */
    private val realConnection: RealConnection?,
    private val source: BufferedSource,
    private val sink: BufferedSink
) : ExchangeCodec {
    /**
     * Returns the sink to stream the body of [request] into.
     *
     * @param contentLength the body's length, or -1L if unknown.
     * @throws ProtocolException if the body is duplex.
     * @throws IllegalStateException if the request is neither chunked nor of known length.
     */
    override fun createRequestBody(request: Request, contentLength: Long): Sink {
        return when {
            request.body != null && request.body!!.isDuplex() -> throw ProtocolException(
                "Duplex connections are not supported for HTTP/1")
            request.isChunked() -> newChunkedSink() // Stream a request body of unknown length.
            contentLength != -1L -> newKnownLengthSink() // Stream a request body of a known length.
            else -> // Neither chunked nor of known length: the body cannot be framed.
                throw IllegalStateException(
                    "Cannot stream a request body without chunked encoding or a known content length!")
        }
    }

    /** Moves the codec from `STATE_OPEN_REQUEST_BODY` to `STATE_WRITING_REQUEST_BODY`. */
    private fun newChunkedSink(): Sink {
        check(state == STATE_OPEN_REQUEST_BODY) { "state: $state" }
        state = STATE_WRITING_REQUEST_BODY
        return ChunkedSink()
    }

    /** Moves the codec from `STATE_OPEN_REQUEST_BODY` to `STATE_WRITING_REQUEST_BODY`. */
    private fun newKnownLengthSink(): Sink {
        check(state == STATE_OPEN_REQUEST_BODY) { "state: $state" }
        state = STATE_WRITING_REQUEST_BODY
        return KnownLengthSink()
    }

    /** An HTTP request body. Bytes are forwarded to the connection sink unmodified. */
    private inner class KnownLengthSink : Sink {
        private val timeout = ForwardingTimeout(sink.timeout())
        private var closed: Boolean = false

        override fun timeout(): Timeout = timeout

        override fun write(source: Buffer, byteCount: Long) {
            check(!closed) { "closed" }
            checkOffsetAndCount(source.size, 0, byteCount)
            sink.write(source, byteCount)
        }

        override fun flush() {
            if (closed) return // Don't throw; this stream might have been closed on the caller's behalf.
            sink.flush()
        }

        /** Closes this body only; the connection sink is left open. */
        override fun close() {
            if (closed) return
            closed = true
            detachTimeout(timeout)
            state = STATE_READ_RESPONSE_HEADERS
        }
    }

    /**
     * An HTTP body with alternating chunk sizes and chunk bodies. It is the caller's responsibility
     * to buffer chunks; typically by using a buffered sink with this sink.
     */
    private inner class ChunkedSink : Sink {
        private val timeout = ForwardingTimeout(sink.timeout())
        private var closed: Boolean = false

        override fun timeout(): Timeout = timeout

        /** Writes one chunk: hexadecimal size, CRLF, the bytes, CRLF. Empty writes are skipped. */
        override fun write(source: Buffer, byteCount: Long) {
            check(!closed) { "closed" }
            // A zero-length chunk would be read as the terminating chunk, so never emit one here.
            if (byteCount == 0L) return
            sink.writeHexadecimalUnsignedLong(byteCount)
            sink.writeUtf8("\r\n")
            sink.write(source, byteCount)
            sink.writeUtf8("\r\n")
        }

        @Synchronized
        override fun flush() {
            if (closed) return // Don't throw; this stream might have been closed on the caller's behalf.
            sink.flush()
        }

        /** Writes the terminating zero-length chunk; the connection sink is left open. */
        @Synchronized
        override fun close() {
            if (closed) return
            closed = true
            sink.writeUtf8("0\r\n\r\n")
            detachTimeout(timeout)
            state = STATE_READ_RESPONSE_HEADERS
        }
    }
}
根据请求头 Transfer-Encoding 是否为 chunked,来创建不同的 Sink 实现类:如果是 chunked 方式,那么就创建 ChunkedSink;如果不是 chunked,就表示内容的大小是固定的,此时要求 content-length 已知(不为 -1),并创建 KnownLengthSink(长度的校验由外层的 RequestBodySink 完成);最后又在外边包装了一层 RequestBodySink
/** Carries one request through an [ExchangeCodec] while reporting progress to an [EventListener]. */
class Exchange(
    internal val transmitter: Transmitter,
    internal val call: Call,
    internal val eventListener: EventListener,
    private val finder: ExchangeFinder,
    private val codec: ExchangeCodec
) {
    /**
     * A request body that fires events when it completes.
     *
     * Also enforces [contentLength]: writing more bytes than declared, or closing after writing
     * fewer, raises a [ProtocolException].
     */
    private inner class RequestBodySink internal constructor(
        delegate: Sink,
        /** The exact number of bytes to be written, or -1L if that is unknown. */
        private val contentLength: Long
    ) : ForwardingSink(delegate) {
        private var completed = false
        private var bytesReceived = 0L
        private var closed = false

        /** Forwards the write to the delegate and counts the bytes once it succeeds. */
        @Throws(IOException::class)
        override fun write(source: Buffer, byteCount: Long) {
            check(!closed) { "closed" }
            val lengthIsKnown = contentLength != -1L
            if (lengthIsKnown && bytesReceived + byteCount > contentLength) {
                throw ProtocolException(
                    "expected $contentLength bytes but received ${bytesReceived + byteCount}")
            }
            try {
                super.write(source, byteCount)
                bytesReceived += byteCount
            } catch (failure: IOException) {
                throw complete(failure)
            }
        }

        @Throws(IOException::class)
        override fun flush() {
            try {
                super.flush()
            } catch (failure: IOException) {
                throw complete(failure)
            }
        }

        /** Closes the delegate and reports completion; later calls are no-ops. */
        @Throws(IOException::class)
        override fun close() {
            if (closed) return
            closed = true
            val cameUpShort = contentLength != -1L && bytesReceived != contentLength
            if (cameUpShort) throw ProtocolException("unexpected end of stream")
            try {
                super.close()
                complete(null)
            } catch (failure: IOException) {
                throw complete(failure)
            }
        }

        /** Reports completion through `bodyComplete` at most once; afterwards returns [e] as-is. */
        private fun <E : IOException?> complete(e: E): E {
            if (!completed) {
                completed = true
                return bodyComplete(bytesReceived, false, true, e)
            }
            return e
        }
    }
}
最终调用的 write(source: Buffer, byteCount: Long) 会转发给传入的 sink(即 delegate)的 write(source: Buffer, byteCount: Long)
/** A [RequestBody] made of already-encoded name/value pairs joined as `name=value&name=value`. */
class FormBody internal constructor(
    encodedNames: List<String>,
    encodedValues: List<String>
) : RequestBody() {
    /** Writes the encoded pairs into [sink]'s buffer. */
    @Throws(IOException::class)
    override fun writeTo(sink: BufferedSink) {
        writeOrCountBytes(sink, false)
    }

    /**
     * Either writes this request to `sink` or measures its content length. We have one method
     * do double-duty to make sure the counting and content are consistent, particularly when it comes
     * to awkward operations like measuring the encoded length of header strings, or the
     * length-in-digits of an encoded integer.
     *
     * @param sink destination; must be non-null when [countBytes] is false.
     * @param countBytes true to measure into a scratch buffer instead of writing to [sink].
     * @return the number of bytes when counting, otherwise 0.
     */
    private fun writeOrCountBytes(sink: BufferedSink?, countBytes: Boolean): Long {
        val target: Buffer = when {
            countBytes -> Buffer()
            else -> sink!!.buffer
        }
        for (index in encodedNames.indices) {
            // Pairs are separated by '&'; the first pair has no leading separator.
            if (index > 0) target.writeByte('&'.toInt())
            target.writeUtf8(encodedNames[index])
            target.writeByte('='.toInt())
            target.writeUtf8(encodedValues[index])
        }
        if (!countBytes) return 0L
        val measured = target.size
        target.clear()
        return measured
    }
}
具体是如何写入数据的,要根据传入到 Request 中的 RequestBody 的实现类来定:如果是表单类型的,则是由 FormBody 负责具体的写操作;如果是文件类型的,则是由 MultipartBody 负责具体的写操作
/** Carries one request through an [ExchangeCodec] while reporting progress to an [EventListener]. */
class Exchange(
    internal val transmitter: Transmitter,
    internal val call: Call,
    internal val eventListener: EventListener,
    private val finder: ExchangeFinder,
    private val codec: ExchangeCodec
) {
    /**
     * Asks the codec to finish the request.
     *
     * @throws IOException rethrown after being reported to `requestFailed` and `trackFailure`.
     */
    @Throws(IOException::class)
    fun finishRequest() {
        try {
            codec.finishRequest()
        } catch (failure: IOException) {
            eventListener.requestFailed(call, failure)
            trackFailure(failure)
            throw failure
        }
    }
}
/** [ExchangeCodec] implementation for HTTP/1 built on a buffered source and sink. */
class Http1ExchangeCodec(
    /** The client that configures this stream. May be null for HTTPS proxy tunnels. */
    private val client: OkHttpClient?,
    /** The connection that carries this stream. */
    private val realConnection: RealConnection?,
    private val source: BufferedSource,
    private val sink: BufferedSink
) : ExchangeCodec {
    /** Finishes the request by flushing the buffered sink. */
    override fun finishRequest() = sink.flush()
}
sink.flush() 将缓冲区中的数据写入到底层的 sink 中,其实就是发送给 server,相当于一个刷新缓冲区的功能
/** Carries one request through an [ExchangeCodec] while reporting progress to an [EventListener]. */
class Exchange(
    internal val transmitter: Transmitter,
    internal val call: Call,
    internal val eventListener: EventListener,
    private val finder: ExchangeFinder,
    private val codec: ExchangeCodec
) {
    /**
     * Reads response headers through the codec and, when a builder comes back, binds it to this
     * exchange via `initExchange`.
     *
     * @param expectContinue forwarded unchanged to the codec.
     * @return the codec's builder, or null when the codec returns null.
     * @throws IOException rethrown after being reported to `responseFailed` and `trackFailure`.
     */
    @Throws(IOException::class)
    fun readResponseHeaders(expectContinue: Boolean): Response.Builder? {
        return try {
            codec.readResponseHeaders(expectContinue)?.also { builder ->
                builder.initExchange(this)
            }
        } catch (failure: IOException) {
            eventListener.responseFailed(call, failure)
            trackFailure(failure)
            throw failure
        }
    }
}
/** [ExchangeCodec] implementation for HTTP/1 built on a buffered source and sink. */
class Http1ExchangeCodec(
    /** The client that configures this stream. May be null for HTTPS proxy tunnels. */
    private val client: OkHttpClient?,
    /** The connection that carries this stream. */
    private val realConnection: RealConnection?,
    private val source: BufferedSource,
    private val sink: BufferedSink
) : ExchangeCodec {
    /**
     * Parses the status line and headers of the next response.
     *
     * @param expectContinue when true and the status is `HTTP_CONTINUE`, returns null and leaves
     * the codec state untouched.
     * @return a builder holding protocol, code, message and headers, or null as described above.
     * @throws IllegalStateException if the codec is in neither `STATE_OPEN_REQUEST_BODY` nor
     * `STATE_READ_RESPONSE_HEADERS`.
     * @throws IOException wrapping an [EOFException] raised while reading.
     */
    override fun readResponseHeaders(expectContinue: Boolean): Response.Builder? {
        check(state == STATE_OPEN_REQUEST_BODY || state == STATE_READ_RESPONSE_HEADERS) {
            "state: $state"
        }
        try {
            val statusLine = StatusLine.parse(readHeaderLine())
            val builder = Response.Builder()
                .protocol(statusLine.protocol) // protocol, i.e. the HTTP version from the status line
                .code(statusLine.code) // response status code
                .message(statusLine.message) // response reason phrase
                .headers(readHeaders()) // response headers
            val isContinue = statusLine.code == HTTP_CONTINUE
            if (expectContinue && isContinue) return null
            // After an unrequested "continue" another header block is read next; otherwise the
            // body may be opened.
            state = if (isContinue) STATE_READ_RESPONSE_HEADERS else STATE_OPEN_RESPONSE_BODY
            return builder
        } catch (e: EOFException) {
            // Provide more context if the server ends the stream before sending a response.
            val address = realConnection?.route()?.address()?.url?.redact() ?: "unknown"
            throw IOException("unexpected end of stream on $address", e)
        }
    }
}
当客户端将请求数据发送给服务端之后,服务端做了处理之后会将结果返回给客户端,这时客户端需要根据这些返回的数据构造出一个 Response 对象,然后返回给调用者
/** Carries one request through an [ExchangeCodec] while reporting progress to an [EventListener]. */
class Exchange(
    internal val transmitter: Transmitter,
    internal val call: Call,
    internal val eventListener: EventListener,
    private val finder: ExchangeFinder,
    private val codec: ExchangeCodec
) {
    /**
     * Opens the body of [response].
     *
     * Fires `responseBodyStart`, takes the content type from the response header and the length
     * from the codec, and wraps the codec's source in a [ResponseBodySource].
     *
     * @throws IOException rethrown after being reported to `responseFailed` and `trackFailure`.
     */
    @Throws(IOException::class)
    fun openResponseBody(response: Response): ResponseBody {
        return try {
            eventListener.responseBodyStart(call)
            val mediaType = response.header("Content-Type")
            val length = codec.reportedContentLength(response)
            val tracked = ResponseBodySource(codec.openResponseBodySource(response), length)
            RealResponseBody(mediaType, length, tracked.buffer())
        } catch (failure: IOException) {
            eventListener.responseFailed(call, failure)
            trackFailure(failure)
            throw failure
        }
    }

    /** A response body that fires events when it completes. */
    internal inner class ResponseBodySource(
        delegate: Source,
        private val contentLength: Long
    ) : ForwardingSource(delegate) {
        private var bytesReceived = 0L
        private var completed = false
        private var closed = false

        init {
            // An empty body is complete as soon as it is created.
            if (contentLength == 0L) {
                complete(null)
            }
        }

        /**
         * Reads from the delegate, counts the bytes, and reports completion at end of stream or
         * once [contentLength] bytes have arrived.
         *
         * @throws ProtocolException if more than [contentLength] bytes are received.
         */
        @Throws(IOException::class)
        override fun read(sink: Buffer, byteCount: Long): Long {
            check(!closed) { "closed" }
            try {
                val count = delegate.read(sink, byteCount)
                if (count == -1L) {
                    complete(null)
                    return -1L
                }
                val total = bytesReceived + count
                val lengthIsKnown = contentLength != -1L
                if (lengthIsKnown && total > contentLength) {
                    throw ProtocolException("expected $contentLength bytes but received $total")
                }
                bytesReceived = total
                if (total == contentLength) {
                    complete(null)
                }
                return count
            } catch (failure: IOException) {
                throw complete(failure)
            }
        }

        /** Closes the delegate and reports completion; later calls are no-ops. */
        @Throws(IOException::class)
        override fun close() {
            if (closed) return
            closed = true
            try {
                super.close()
                complete(null)
            } catch (failure: IOException) {
                throw complete(failure)
            }
        }

        /** Reports completion through `bodyComplete` at most once; afterwards returns [e] as-is. */
        fun <E : IOException?> complete(e: E): E {
            if (!completed) {
                completed = true
                return bodyComplete(bytesReceived, true, false, e)
            }
            return e
        }
    }
}
返回一个 ResponseBody 对象,该对象封装了连接服务端的输入流对应的 Source 对象
/** [ExchangeCodec] implementation that reads HTTP/1 response bodies from a buffered source. */
class Http1ExchangeCodec(
    /** The client that configures this stream. May be null for HTTPS proxy tunnels. */
    private val client: OkHttpClient?,
    /** The connection that carries this stream. */
    private val realConnection: RealConnection?,
    private val source: BufferedSource,
    private val sink: BufferedSink
) : ExchangeCodec {
    /**
     * Picks the body source for [response]: a zero-length source when no body is promised, a
     * chunked source for chunked responses, a fixed-length source when the headers give a content
     * length, and otherwise a source that reads until the stream ends.
     */
    override fun openResponseBodySource(response: Response): Source {
        return when {
            !response.promisesBody() -> newFixedLengthSource(0)
            response.isChunked() -> newChunkedSource(response.request().url)
            else -> {
                val contentLength = response.headersContentLength()
                if (contentLength != -1L) {
                    newFixedLengthSource(contentLength)
                } else {
                    newUnknownLengthSource()
                }
            }
        }
    }

    /** Moves the codec from `STATE_OPEN_RESPONSE_BODY` to `STATE_READING_RESPONSE_BODY`. */
    private fun newFixedLengthSource(length: Long): Source {
        check(state == STATE_OPEN_RESPONSE_BODY) { "state: $state" }
        state = STATE_READING_RESPONSE_BODY
        return FixedLengthSource(length)
    }

    /** Moves the codec from `STATE_OPEN_RESPONSE_BODY` to `STATE_READING_RESPONSE_BODY`. */
    private fun newChunkedSource(url: HttpUrl): Source {
        check(state == STATE_OPEN_RESPONSE_BODY) { "state: $state" }
        state = STATE_READING_RESPONSE_BODY
        return ChunkedSource(url)
    }

    /**
     * Moves the codec from `STATE_OPEN_RESPONSE_BODY` to `STATE_READING_RESPONSE_BODY` and marks
     * the connection as accepting no new exchanges.
     */
    private fun newUnknownLengthSource(): Source {
        check(state == STATE_OPEN_RESPONSE_BODY) { "state: $state" }
        state = STATE_READING_RESPONSE_BODY
        realConnection!!.noNewExchanges()
        return UnknownLengthSource()
    }

    /** An HTTP body with a fixed length specified in advance. */
    private inner class FixedLengthSource internal constructor(private var bytesRemaining: Long) :
        AbstractSource() {

        init {
            // Nothing to read: the body is complete immediately.
            if (bytesRemaining == 0L) {
                responseBodyComplete()
            }
        }

        override fun read(sink: Buffer, byteCount: Long): Long {
            require(byteCount >= 0L) { "byteCount < 0: $byteCount" }
            check(!closed) { "closed" }
            if (bytesRemaining == 0L) return -1

            // Never read past the declared length.
            val read = super.read(sink, minOf(bytesRemaining, byteCount))
            if (read == -1L) {
                realConnection!!.noNewExchanges() // The server didn't supply the promised content length.
                val e = ProtocolException("unexpected end of stream")
                responseBodyComplete()
                throw e
            }

            bytesRemaining -= read
            if (bytesRemaining == 0L) {
                responseBodyComplete()
            }
            return read
        }

        override fun close() {
            if (closed) return

            if (bytesRemaining != 0L &&
                !discard(ExchangeCodec.DISCARD_STREAM_TIMEOUT_MILLIS, MILLISECONDS)) {
                realConnection!!.noNewExchanges() // Unread bytes remain on the stream.
                responseBodyComplete()
            }

            closed = true
        }
    }

    /** An HTTP body with alternating chunk sizes and chunk bodies. */
    private inner class ChunkedSource internal constructor(private val url: HttpUrl) :
        AbstractSource() {
        private var bytesRemainingInChunk = NO_CHUNK_YET
        private var hasMoreChunks = true

        override fun read(sink: Buffer, byteCount: Long): Long {
            require(byteCount >= 0L) { "byteCount < 0: $byteCount" }
            check(!closed) { "closed" }
            if (!hasMoreChunks) return -1

            // At a chunk boundary (or before the first chunk): read the next chunk header.
            if (bytesRemainingInChunk == 0L || bytesRemainingInChunk == NO_CHUNK_YET) {
                readChunkSize()
                if (!hasMoreChunks) return -1
            }

            val read = super.read(sink, minOf(byteCount, bytesRemainingInChunk))
            if (read == -1L) {
                realConnection!!.noNewExchanges() // The server didn't supply the promised chunk length.
                val e = ProtocolException("unexpected end of stream")
                responseBodyComplete()
                throw e
            }
            bytesRemainingInChunk -= read
            return read
        }

        /**
         * Reads the next chunk-size line. On the zero-size chunk it reads the trailers, hands them
         * to the client's cookie jar, and completes the body.
         *
         * @throws ProtocolException if the size is negative or unparseable, or if the rest of the
         * line is non-empty and does not start with ";".
         */
        private fun readChunkSize() {
            // Read the suffix of the previous chunk.
            if (bytesRemainingInChunk != NO_CHUNK_YET) {
                source.readUtf8LineStrict()
            }
            try {
                bytesRemainingInChunk = source.readHexadecimalUnsignedLong()
                val extensions = source.readUtf8LineStrict().trim()
                if (bytesRemainingInChunk < 0L ||
                    (extensions.isNotEmpty() && !extensions.startsWith(";"))) {
                    throw ProtocolException("expected chunk size and optional extensions" +
                        " but was \"$bytesRemainingInChunk$extensions\"")
                }
            } catch (e: NumberFormatException) {
                throw ProtocolException(e.message)
            }

            if (bytesRemainingInChunk == 0L) {
                hasMoreChunks = false
                trailers = readHeaders()
                client!!.cookieJar().receiveHeaders(url, trailers!!)
                responseBodyComplete()
            }
        }

        override fun close() {
            if (closed) return
            if (hasMoreChunks &&
                !discard(ExchangeCodec.DISCARD_STREAM_TIMEOUT_MILLIS, MILLISECONDS)) {
                realConnection!!.noNewExchanges() // Unread bytes remain on the stream.
                responseBodyComplete()
            }
            closed = true
        }
    }

    /** An HTTP message body terminated by the end of the underlying stream. */
    private inner class UnknownLengthSource : AbstractSource() {
        private var inputExhausted: Boolean = false

        override fun read(sink: Buffer, byteCount: Long): Long {
            require(byteCount >= 0L) { "byteCount < 0: $byteCount" }
            check(!closed) { "closed" }
            if (inputExhausted) return -1

            val read = super.read(sink, byteCount)
            if (read == -1L) {
                // End of stream is the only end-of-body signal for this source.
                inputExhausted = true
                responseBodyComplete()
                return -1
            }
            return read
        }

        override fun close() {
            if (closed) return
            if (!inputExhausted) {
                responseBodyComplete()
            }
            closed = true
        }
    }
}
根据响应的不同情况(无响应体、chunked、固定长度、未知长度)创建不同的 Source 对象