Tomcat Source Code Analysis Series (16): Http11Processor

Preface

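The previous article covered the ConnectionHandler#process method. Its key step is to obtain an org.apache.coyote.Processor object and call that object's process method, passing along the same arguments that ConnectionHandler#process itself received, namely a NioSocketWrapper object and a SocketEvent object. In Tomcat, the Processor implementations that handle HTTP requests are Http11Processor and StreamProcessor. Both extend AbstractProcessor, whose parent class is AbstractProcessorLight, and AbstractProcessorLight directly implements Processor. StreamProcessor handles HTTP/2; this article uses Http11Processor as the example. The overall processing logic of StreamProcessor and Http11Processor is broadly the same.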
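To make the inheritance chain easier to picture, here is a minimal sketch of its shape. All types below are simplified stand-ins with a Sketch suffix, not the real Tomcat classes: the real process method takes a SocketWrapperBase and a SocketEvent and returns a SocketState, whereas this sketch uses plain strings, and the two-way choice inside process is a deliberate simplification.

```java
// Hypothetical stand-in for org.apache.coyote.Processor.
interface ProcessorSketch {
    String process(String event);
}

// Stand-in for AbstractProcessorLight: process() is a template method that
// delegates to dispatch(...) or service(...).
abstract class AbstractProcessorLightSketch implements ProcessorSketch {
    @Override
    public String process(String event) {
        return isAsync() ? dispatch(event) : service(event);
    }

    protected abstract boolean isAsync();

    protected abstract String dispatch(String event);

    protected abstract String service(String event);
}

// Stand-in for AbstractProcessor: holds shared state and implements dispatch once.
abstract class AbstractProcessorSketch extends AbstractProcessorLightSketch {
    private boolean async;

    void setAsync(boolean async) {
        this.async = async;
    }

    @Override
    protected boolean isAsync() {
        return async;
    }

    @Override
    protected final String dispatch(String event) {
        return "dispatch:" + event;
    }
}

// Stand-ins for the two concrete processors: only service(...) differs.
class Http11ProcessorSketch extends AbstractProcessorSketch {
    @Override
    protected String service(String event) {
        return "http/1.1 service:" + event;
    }
}

class StreamProcessorSketch extends AbstractProcessorSketch {
    @Override
    protected String service(String event) {
        return "http/2 service:" + event;
    }
}
```

The point of the layering is that the loop in process is written once, while each protocol only supplies its own service logic.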

1. The Http11Processor constructor

public Http11Processor(AbstractHttp11Protocol<?> protocol, Adapter adapter) {
    super(adapter);
    this.protocol = protocol;

    httpParser = new HttpParser(protocol.getRelaxedPathChars(),
            protocol.getRelaxedQueryChars());

    inputBuffer = new Http11InputBuffer(request, protocol.getMaxHttpHeaderSize(),
            protocol.getRejectIllegalHeaderName(), httpParser);
    request.setInputBuffer(inputBuffer);

    outputBuffer = new Http11OutputBuffer(response, protocol.getMaxHttpHeaderSize());
    response.setOutputBuffer(outputBuffer);

    // Create and add the identity filters.
    inputBuffer.addFilter(new IdentityInputFilter(protocol.getMaxSwallowSize()));
    outputBuffer.addFilter(new IdentityOutputFilter());

    // Create and add the chunked filters.
    inputBuffer.addFilter(new ChunkedInputFilter(protocol.getMaxTrailerSize(),
            protocol.getAllowedTrailerHeadersInternal(), protocol.getMaxExtensionSize(),
            protocol.getMaxSwallowSize()));
    outputBuffer.addFilter(new ChunkedOutputFilter());

    // Create and add the void filters.
    inputBuffer.addFilter(new VoidInputFilter());
    outputBuffer.addFilter(new VoidOutputFilter());

    // Create and add buffered input filter
    inputBuffer.addFilter(new BufferedInputFilter());

    // Create and add the gzip filters.
    //inputBuffer.addFilter(new GzipInputFilter());
    outputBuffer.addFilter(new GzipOutputFilter());

    pluggableFilterIndex = inputBuffer.getFilters().length;
}

public AbstractProcessor(Adapter adapter) {
    this(adapter, new Request(), new Response());
}

protected AbstractProcessor(Adapter adapter, Request coyoteRequest, Response coyoteResponse) {
    this.adapter = adapter;
    asyncStateMachine = new AsyncStateMachine(this);
    request = coyoteRequest;
    response = coyoteResponse;
    response.setHook(this);
    request.setResponse(response);
    request.setHook(this);
    userDataHelper = new UserDataHelper(getLog());
}

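The Http11Processor constructor, together with the AbstractProcessor constructor it calls, initializes request (org.apache.coyote.Request), response (org.apache.coyote.Response), httpParser (HttpParser), inputBuffer (Http11InputBuffer), and outputBuffer (Http11OutputBuffer), as well as a number of InputFilter and OutputFilter instances. All of these are needed to handle the HTTP protocol.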
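The last line of the constructor stores inputBuffer.getFilters().length in pluggableFilterIndex. A plausible reading is that it records how many built-in input filters exist, so that filters registered later can be found from that index onward. The sketch below illustrates only that bookkeeping idea. FilterLibrarySketch and its string-based filters are hypothetical; the real Http11InputBuffer stores InputFilter objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a buffer's filter library.
class FilterLibrarySketch {
    private final List<String> filters = new ArrayList<>();

    void addFilter(String name) {
        filters.add(name);
    }

    String[] getFilters() {
        return filters.toArray(new String[0]);
    }

    // Registers the four input filters that the constructor above adds, in order.
    static FilterLibrarySketch withBuiltIns() {
        FilterLibrarySketch buffer = new FilterLibrarySketch();
        buffer.addFilter("identity");
        buffer.addFilter("chunked");
        buffer.addFilter("void");
        buffer.addFilter("buffered");
        return buffer;
    }
}
```

With this model, the array length taken right after the built-ins are registered is 4, and any filter added afterwards sits at index 4 or higher.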

2. AbstractProcessorLight#process

The Processor#process method is implemented in AbstractProcessorLight.

@Override
public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
        throws IOException {

    SocketState state = SocketState.CLOSED;
    Iterator<DispatchType> dispatches = null;
    do {
        if (dispatches != null) {
            DispatchType nextDispatch = dispatches.next();
            state = dispatch(nextDispatch.getSocketStatus());
        } else if (status == SocketEvent.DISCONNECT) {
            // Do nothing here, just wait for it to get recycled
        } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
            state = dispatch(status);
            if (state == SocketState.OPEN) {
                // There may be pipe-lined data to read. If the data isn't
                // processed now, execution will exit this loop and call
                // release() which will recycle the processor (and input
                // buffer) deleting any pipe-lined data. To avoid this,
                // process it now.
                state = service(socketWrapper);
            }
        } else if (status == SocketEvent.OPEN_WRITE) {
            // Extra write event likely after async, ignore
            state = SocketState.LONG;
        } else if (status == SocketEvent.OPEN_READ){
            state = service(socketWrapper);
        } else {
            // Default to closing the socket if the SocketEvent passed in
            // is not consistent with the current state of the Processor
            state = SocketState.CLOSED;
        }

        if (getLog().isDebugEnabled()) {
            getLog().debug("Socket: [" + socketWrapper +
                    "], Status in: [" + status +
                    "], State out: [" + state + "]");
        }

        if (state != SocketState.CLOSED && isAsync()) {
            state = asyncPostProcess();
            if (getLog().isDebugEnabled()) {
                getLog().debug("Socket: [" + socketWrapper +
                        "], State after async post processing: [" + state + "]");
            }
        }

        if (dispatches == null || !dispatches.hasNext()) {
            // Only returns non-null iterator if there are
            // dispatches to process.
            dispatches = getIteratorAndClearDispatches();
        }
    } while (state == SocketState.ASYNC_END ||
            dispatches != null && state != SocketState.CLOSED);

    return state;
}

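The process method runs a do-while loop and handles each case according to the current conditions. The important work is done by calling either the dispatch method or the service method.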
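The if/else chain inside the loop can be condensed into a small decision function. This is a sketch of the first loop iteration only, assuming there are no pending dispatches and the state is not ASYNC_END. LoopEvent and ProcessBranchSketch are illustrative names, and the returned strings merely label the branch that would run.

```java
// Hypothetical stand-in for the SocketEvent values used in the branch.
enum LoopEvent { OPEN_READ, OPEN_WRITE, DISCONNECT, ERROR }

class ProcessBranchSketch {
    // Mirrors the order of the checks in AbstractProcessorLight#process.
    static String choose(LoopEvent status, boolean async, boolean upgrade) {
        if (status == LoopEvent.DISCONNECT) {
            return "nothing";          // wait for the processor to be recycled
        }
        if (async || upgrade) {
            return "dispatch";         // request has left standard HTTP mode
        }
        if (status == LoopEvent.OPEN_WRITE) {
            return "ignore";           // extra write event, state becomes LONG
        }
        if (status == LoopEvent.OPEN_READ) {
            return "service";          // standard HTTP request
        }
        return "close";                // event inconsistent with processor state
    }
}
```

Note that the order matters: a DISCONNECT event is checked before the async flag, so an async request that is disconnected does not reach dispatch.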

2.1 AbstractProcessor#dispatch

/**
 * Process an in-progress request that is not longer in standard HTTP mode.
 * Uses currently include Servlet 3.0 Async and HTTP upgrade connections.
 * Further uses may be added in the future. These will typically start as
 * HTTP requests.
 * @param status The event to process
 * @return the socket state
 */
protected abstract SocketState dispatch(SocketEvent status);

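As the comment shows, dispatch handles an in-progress request that is no longer in standard HTTP mode. It is currently used for Servlet 3.0 async processing and for HTTP upgrade connections. The method is declared abstract in AbstractProcessorLight and implemented in AbstractProcessor: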

@Override
public final SocketState dispatch(SocketEvent status) {

    if (status == SocketEvent.OPEN_WRITE && response.getWriteListener() != null) {
        asyncStateMachine.asyncOperation();
        try {
            if (flushBufferedWrite()) {
                return SocketState.LONG;
            }
        } catch (IOException ioe) {
            if (getLog().isDebugEnabled()) {
                getLog().debug("Unable to write async data.", ioe);
            }
            status = SocketEvent.ERROR;
            request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, ioe);
        }
    } else if (status == SocketEvent.OPEN_READ && request.getReadListener() != null) {
        dispatchNonBlockingRead();
    } else if (status == SocketEvent.ERROR) {
        // An I/O error occurred on a non-container thread. This includes:
        // - read/write timeouts fired by the Poller (NIO & APR)
        // - completion handler failures in NIO2

        if (request.getAttribute(RequestDispatcher.ERROR_EXCEPTION) == null) {
            // Because the error did not occur on a container thread the
            // request's error attribute has not been set. If an exception
            // is available from the socketWrapper, use it to set the
            // request's error attribute here so it is visible to the error
            // handling.
            request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, socketWrapper.getError());
        }

        if (request.getReadListener() != null || response.getWriteListener() != null) {
            // The error occurred during non-blocking I/O. Set the correct
            // state else the error handling will trigger an ISE.
            asyncStateMachine.asyncOperation();
        }
    }

    RequestInfo rp = request.getRequestProcessor();
    try {
        rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
        if (!getAdapter().asyncDispatch(request, response, status)) {
            setErrorState(ErrorState.CLOSE_NOW, null);
        }
    } catch (InterruptedIOException e) {
        setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        setErrorState(ErrorState.CLOSE_NOW, t);
        getLog().error(sm.getString("http11processor.request.process"), t);
    }

    rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);

    if (getErrorState().isError()) {
        request.updateCounters();
        return SocketState.CLOSED;
    } else if (isAsync()) {
        return SocketState.LONG;
    } else {
        request.updateCounters();
        return dispatchEndRequest();
    }
}

The dispatch method first branches on the value of the SocketEvent argument. In the OPEN_WRITE branch, flushBufferedWrite() writes the data held in Http11OutputBuffer back to the client:

@Override
protected boolean flushBufferedWrite() throws IOException {
    if (outputBuffer.hasDataToWrite()) {
        if (outputBuffer.flushBuffer(false)) {
            // The buffer wasn't fully flushed so re-register the
            // socket for write. Note this does not go via the
            // Response since the write registration state at
            // that level should remain unchanged. Once the buffer
            // has been emptied then the code below will call
            // Adaptor.asyncDispatch() which will enable the
            // Response to respond to this event.
            outputBuffer.registerWriteInterest();
            return true;
        }
    }
    return false;
}

Http11OutputBuffer#flushBuffer

protected boolean flushBuffer(boolean block) throws IOException  {
    return socketWrapper.flush(block);
}
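The return-value convention here is easy to misread: true means data is still left in the buffer, so the caller should wait for the next write event. The sketch below models that behaviour with a counter instead of a real socket. PartialFlushSketch, its fields, and the per-flush capacity are assumptions made for illustration only.

```java
// Hypothetical model of a non-blocking flush that may only partially drain the buffer.
class PartialFlushSketch {
    private int pending;                 // bytes still buffered
    private final int perFlush;          // bytes the "socket" accepts per flush
    boolean writeInterestRegistered;     // set when we ask to be called back

    PartialFlushSketch(int pending, int perFlush) {
        this.pending = pending;
        this.perFlush = perFlush;
    }

    // Returns true if data is left over after this flush attempt.
    boolean flushBuffer() {
        pending -= Math.min(pending, perFlush);
        return pending > 0;
    }

    // Same shape as flushBufferedWrite() above: true means "not done yet".
    boolean flushBufferedWrite() {
        if (pending > 0 && flushBuffer()) {
            writeInterestRegistered = true;
            return true;
        }
        return false;
    }
}
```

With 10 buffered bytes and a capacity of 4 per flush, the first two calls return true and the third returns false, at which point dispatch would carry on to the adapter.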

asyncStateMachine is an object of type AsyncStateMachine. Its asyncOperation method is:

synchronized void asyncOperation() {
    if (state==AsyncState.STARTED) {
        state = AsyncState.READ_WRITE_OP;
    } else {
        throw new IllegalStateException(
                sm.getString("asyncStateMachine.invalidAsyncState",
                        "asyncOperation()", state));
    }
}

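asyncStateMachine.asyncOperation() changes the state field of AsyncStateMachine from AsyncState.STARTED to AsyncState.READ_WRITE_OP, and throws an IllegalStateException if the current state is anything other than STARTED. The READ_WRITE_OP state indicates that the request is performing an asynchronous read or write.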
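The transition can be tried out in isolation with a cut-down state machine. This sketch keeps only three states and adds a hypothetical start() method to reach STARTED. The real AsyncStateMachine has many more states and transitions.

```java
// Hypothetical, reduced model of the STARTED -> READ_WRITE_OP transition.
class AsyncStateSketch {
    enum State { DISPATCHED, STARTED, READ_WRITE_OP }

    private State state = State.DISPATCHED;

    // Illustrative shortcut; the real state machine reaches STARTED via other states.
    synchronized void start() {
        state = State.STARTED;
    }

    // Only legal from STARTED, exactly like the method shown above.
    synchronized void asyncOperation() {
        if (state == State.STARTED) {
            state = State.READ_WRITE_OP;
        } else {
            throw new IllegalStateException(
                    "asyncOperation() is not allowed in state " + state);
        }
    }

    synchronized State getState() {
        return state;
    }
}
```

Calling asyncOperation() before start(), or twice in a row, throws an IllegalStateException, which is why dispatch only calls it when a read or write listener is registered.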

After the if-else block comes the key part of the dispatch method.

It first calls request.getRequestProcessor() to obtain a RequestInfo object.

private final RequestInfo reqProcessorMX=new RequestInfo(this);

public RequestInfo getRequestProcessor() {
    return reqProcessorMX;
}

This request is the org.apache.coyote.Request object initialized in the AbstractProcessor constructor.

Next, inside a try-catch block, it calls getAdapter().asyncDispatch(request, response, status).

getAdapter() is a method of AbstractProcessor that returns the adapter field.

protected final Adapter adapter;

/**
 * Get the associated adapter.
 *
 * @return the associated adapter
 */
public Adapter getAdapter() {
    return adapter;
}

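The adapter field of AbstractProcessor is assigned when the Http11Processor object is created, and the value passed in is a CoyoteAdapter object.

That object is created in Connector#initInternal and assigned to the adapter property of AbstractProtocol.

So getAdapter().asyncDispatch(…) invokes CoyoteAdapter#asyncDispatch.

CoyoteAdapter is an important link in the request-processing chain. A later article covers it, so it is skipped here.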

3. Http11Processor#service

/**
 * Service a 'standard' HTTP request. This method is called for both new
 * requests and for requests that have partially read the HTTP request line
 * or HTTP headers. Once the headers have been fully read this method is not
 * called again until there is a new HTTP request to process. Note that the
 * request type may change during processing which may result in one or more
 * calls to {@link #dispatch(SocketEvent)}. Requests may be pipe-lined.
 *
 * @param socketWrapper The connection to process
 *
 * @return The state the caller should put the socket in when this method
 *         returns
 *
 * @throws IOException If an I/O error occurs during the processing of the
 *         request
 */
protected abstract SocketState service(SocketWrapperBase<?> socketWrapper) throws IOException;

As the comment shows, service is the counterpart of dispatch: it handles standard HTTP requests. The service method is implemented in Http11Processor.

@Override
public SocketState service(SocketWrapperBase<?> socketWrapper)
    throws IOException {
    RequestInfo rp = request.getRequestProcessor();
    rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);

    // Setting up the I/O
    setSocketWrapper(socketWrapper);
    inputBuffer.init(socketWrapper);
    outputBuffer.init(socketWrapper);

    // Flags
    keepAlive = true;
    openSocket = false;
    readComplete = true;
    boolean keptAlive = false;
    SendfileState sendfileState = SendfileState.DONE;

    while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null &&
            sendfileState == SendfileState.DONE && !protocol.isPaused()) {

        // Parsing the request header
        try {
            if (!inputBuffer.parseRequestLine(keptAlive, protocol.getConnectionTimeout(),
                    protocol.getKeepAliveTimeout())) {
                if (inputBuffer.getParsingRequestLinePhase() == -1) {
                    return SocketState.UPGRADING;
                } else if (handleIncompleteRequestLineRead()) {
                    break;
                }
            }

            if (protocol.isPaused()) {
                // 503 - Service unavailable
                response.setStatus(503);
                setErrorState(ErrorState.CLOSE_CLEAN, null);
            } else {
                keptAlive = true;
                // Set this every time in case limit has been changed via JMX
                request.getMimeHeaders().setLimit(protocol.getMaxHeaderCount());
                if (!inputBuffer.parseHeaders()) {
                    // We've read part of the request, don't recycle it
                    // instead associate it with the socket
                    openSocket = true;
                    readComplete = false;
                    break;
                }
                if (!protocol.getDisableUploadTimeout()) {
                    socketWrapper.setReadTimeout(protocol.getConnectionUploadTimeout());
                }
            }
        } catch (IOException e) {
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("http11processor.header.parse"), e);
            }
            setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
            break;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            UserDataHelper.Mode logMode = userDataHelper.getNextMode();
            if (logMode != null) {
                String message = sm.getString("http11processor.header.parse");
                switch (logMode) {
                    case INFO_THEN_DEBUG:
                        message += sm.getString("http11processor.fallToDebug");
                        //$FALL-THROUGH$
                    case INFO:
                        log.info(message, t);
                        break;
                    case DEBUG:
                        log.debug(message, t);
                }
            }
            // 400 - Bad Request
            response.setStatus(400);
            setErrorState(ErrorState.CLOSE_CLEAN, t);
        }

        // Has an upgrade been requested?
        Enumeration<String> connectionValues = request.getMimeHeaders().values("Connection");
        boolean foundUpgrade = false;
        while (connectionValues.hasMoreElements() && !foundUpgrade) {
            foundUpgrade = connectionValues.nextElement().toLowerCase(
                    Locale.ENGLISH).contains("upgrade");
        }

        if (foundUpgrade) {
            // Check the protocol
            String requestedProtocol = request.getHeader("Upgrade");

            UpgradeProtocol upgradeProtocol = protocol.getUpgradeProtocol(requestedProtocol);
            if (upgradeProtocol != null) {
                if (upgradeProtocol.accept(request)) {
                    response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
                    response.setHeader("Connection", "Upgrade");
                    response.setHeader("Upgrade", requestedProtocol);
                    action(ActionCode.CLOSE,  null);
                    getAdapter().log(request, response, 0);

                    InternalHttpUpgradeHandler upgradeHandler =
                            upgradeProtocol.getInternalUpgradeHandler(
                                    socketWrapper, getAdapter(), cloneRequest(request));
                    UpgradeToken upgradeToken = new UpgradeToken(upgradeHandler, null, null);
                    action(ActionCode.UPGRADE, upgradeToken);
                    return SocketState.UPGRADING;
                }
            }
        }

        if (getErrorState().isIoAllowed()) {
            // Setting up filters, and parse some request headers
            rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
            try {
                prepareRequest();
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("http11processor.request.prepare"), t);
                }
                // 500 - Internal Server Error
                response.setStatus(500);
                setErrorState(ErrorState.CLOSE_CLEAN, t);
            }
        }

        int maxKeepAliveRequests = protocol.getMaxKeepAliveRequests();
        if (maxKeepAliveRequests == 1) {
            keepAlive = false;
        } else if (maxKeepAliveRequests > 0 &&
                socketWrapper.decrementKeepAlive() <= 0) {
            keepAlive = false;
        }

        // Process the request in the adapter
        if (getErrorState().isIoAllowed()) {
            try {
                rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
                getAdapter().service(request, response);
                // Handle when the response was committed before a serious
                // error occurred.  Throwing a ServletException should both
                // set the status to 500 and set the errorException.
                // If we fail here, then the response is likely already
                // committed, so we can't try and set headers.
                if(keepAlive && !getErrorState().isError() && !isAsync() &&
                        statusDropsConnection(response.getStatus())) {
                    setErrorState(ErrorState.CLOSE_CLEAN, null);
                }
            } catch (InterruptedIOException e) {
                setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
            } catch (HeadersTooLargeException e) {
                log.error(sm.getString("http11processor.request.process"), e);
                // The response should not have been committed but check it
                // anyway to be safe
                if (response.isCommitted()) {
                    setErrorState(ErrorState.CLOSE_NOW, e);
                } else {
                    response.reset();
                    response.setStatus(500);
                    setErrorState(ErrorState.CLOSE_CLEAN, e);
                    response.setHeader("Connection", "close"); // TODO: Remove
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("http11processor.request.process"), t);
                // 500 - Internal Server Error
                response.setStatus(500);
                setErrorState(ErrorState.CLOSE_CLEAN, t);
                getAdapter().log(request, response, 0);
            }
        }

        // Finish the handling of the request
        rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
        if (!isAsync()) {
            // If this is an async request then the request ends when it has
            // been completed. The AsyncContext is responsible for calling
            // endRequest() in that case.
            endRequest();
        }
        rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);

        // If there was an error, make sure the request is counted as
        // and error, and update the statistics counter
        if (getErrorState().isError()) {
            response.setStatus(500);
        }

        if (!isAsync() || getErrorState().isError()) {
            request.updateCounters();
            if (getErrorState().isIoAllowed()) {
                inputBuffer.nextRequest();
                outputBuffer.nextRequest();
            }
        }

        if (!protocol.getDisableUploadTimeout()) {
            int connectionTimeout = protocol.getConnectionTimeout();
            if(connectionTimeout > 0) {
                socketWrapper.setReadTimeout(connectionTimeout);
            } else {
                socketWrapper.setReadTimeout(0);
            }
        }

        rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);

        sendfileState = processSendfile(socketWrapper);
    }

    rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);

    if (getErrorState().isError() || protocol.isPaused()) {
        return SocketState.CLOSED;
    } else if (isAsync()) {
        return SocketState.LONG;
    } else if (isUpgrade()) {
        return SocketState.UPGRADING;
    } else {
        if (sendfileState == SendfileState.PENDING) {
            return SocketState.SENDFILE;
        } else {
            if (openSocket) {
                if (readComplete) {
                    return SocketState.OPEN;
                } else {
                    return SocketState.LONG;
                }
            } else {
                return SocketState.CLOSED;
            }
        }
    }
}

The logic of the service method is fairly involved. It starts with some setup, such as inputBuffer.init(socketWrapper) and outputBuffer.init(socketWrapper), and then enters a while loop.

Inside the while loop, a try-catch block executes

inputBuffer.parseRequestLine(…)

inputBuffer.parseHeaders()

and sets a number of flags and properties.

Http11InputBuffer#parseRequestLine parses the request line, and Http11InputBuffer#parseHeaders parses the request headers.
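As a reminder of what a request line contains, the sketch below splits one into its three parts. It is not how Http11InputBuffer works: the real parser reads bytes incrementally from the socket and can resume after a partial read. RequestLineSketch is a hypothetical helper that assumes the whole line is already available as a string.

```java
// Hypothetical helper: splits "METHOD SP request-target SP HTTP-version".
class RequestLineSketch {
    final String method;
    final String target;
    final String protocol;

    private RequestLineSketch(String method, String target, String protocol) {
        this.method = method;
        this.target = target;
        this.protocol = protocol;
    }

    static RequestLineSketch parse(String line) {
        String[] parts = line.split(" ");
        if (parts.length != 3) {
            throw new IllegalArgumentException("Malformed request line: " + line);
        }
        return new RequestLineSketch(parts[0], parts[1], parts[2]);
    }
}
```

For example, parsing "GET /index.html HTTP/1.1" yields the method GET, the target /index.html, and the protocol HTTP/1.1.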

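Next, it looks at the Connection header of the request to see whether any value contains upgrade. If so, and the protocol named in the Upgrade header is supported and accepts the request, it enters the HTTP upgrade steps.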
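The Connection header check can be reproduced outside Tomcat. The sketch below follows the same loop as the service method, but takes the header values as a plain list, whereas the real code reads them from request.getMimeHeaders(). UpgradeDetectionSketch is an illustrative name.

```java
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.Locale;

class UpgradeDetectionSketch {
    // Returns true as soon as one Connection header value contains "upgrade",
    // compared case-insensitively.
    static boolean foundUpgrade(List<String> connectionHeaderValues) {
        Enumeration<String> values = Collections.enumeration(connectionHeaderValues);
        boolean found = false;
        while (values.hasMoreElements() && !found) {
            found = values.nextElement().toLowerCase(Locale.ENGLISH).contains("upgrade");
        }
        return found;
    }
}
```

Because the check uses contains, a combined value such as "keep-alive, Upgrade" is detected as well as a bare "Upgrade".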

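It then calls prepareRequest() to do preliminary processing of the request: based on certain request headers, it adds the appropriate InputFilter instances to the Http11InputBuffer. For example, it parses the host, transfer-encoding, and content-length headers.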
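The idea of choosing a body filter from the headers can be sketched roughly as follows. This is a strong simplification and not the actual prepareRequest logic, which also deals with lists of transfer encodings, protocol versions, and invalid header combinations. BodyFilterChoiceSketch and the string return values are hypothetical; the filter names refer to the identity, chunked, and void filters registered in the constructor.

```java
class BodyFilterChoiceSketch {
    // transferEncoding may be null when the header is absent;
    // contentLength is -1 when no Content-Length header was sent.
    static String chooseInputFilter(String transferEncoding, long contentLength) {
        if (transferEncoding != null
                && "chunked".equalsIgnoreCase(transferEncoding.trim())) {
            return "chunked";   // body is delimited by chunk sizes
        }
        if (contentLength >= 0) {
            return "identity";  // body has a known, fixed length
        }
        return "void";          // no body is expected
    }
}
```

In this sketch a chunked Transfer-Encoding takes precedence over Content-Length, which matches the message-length rules of HTTP/1.1.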

Finally, it hands the request to the Adapter, that is:

getAdapter().service(request, response);

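getAdapter() returns the CoyoteAdapter mentioned above. CoyoteAdapter#service will be analyzed separately in a later article, so it is not described further here.

After this call come some wrap-up steps. The service method is long and its logic is complex, so this article leaves out many of the less important details.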
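One wrap-up step worth spelling out is how service picks its return value once the while loop ends. The sketch below restates the final if/else chain from the listing as a pure function. ServiceResultSketch and its boolean parameters are illustrative: in the real method these values come from getErrorState(), protocol.isPaused(), isAsync(), isUpgrade(), sendfileState, and the openSocket and readComplete flags.

```java
class ServiceResultSketch {
    enum State { CLOSED, LONG, UPGRADING, SENDFILE, OPEN }

    static State result(boolean errorOrPaused, boolean async, boolean upgrade,
                        boolean sendfilePending, boolean openSocket, boolean readComplete) {
        if (errorOrPaused) {
            return State.CLOSED;
        }
        if (async) {
            return State.LONG;       // async request still in progress
        }
        if (upgrade) {
            return State.UPGRADING;
        }
        if (sendfilePending) {
            return State.SENDFILE;
        }
        if (!openSocket) {
            return State.CLOSED;
        }
        // Socket stays open: OPEN if the request was read completely,
        // LONG if only part of the request has arrived so far.
        return readComplete ? State.OPEN : State.LONG;
    }
}
```

The checks are ordered by priority, so an error or a paused protocol always closes the socket regardless of the other flags.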

Summary

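This article described how the process method of Http11Processor handles a request: it uses the dispatch and service methods to handle different kinds of HTTP requests. Within both dispatch and service, the key step is the call into the corresponding Adapter method.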

Original article

https://segmentfault.com/a/1190000022207628
