转载

Spring Cloud Netflix Zuul源码分析之请求处理篇-下

微信公众号:如有问题或建议,请在下方留言;

最近更新:2019-01-03

前言

因篇幅原因,上一部分内容请看: Spring Cloud Netflix Zuul源码分析之请求处理篇-上

PreDecorationFilter

该类的作用就是查找对应的路由信息,获取后端微服务的地址,保存到请求上下文,提供给路由过滤器使用。

简化版run方法

 1@Override
 2public Object run() {
 3    RequestContext ctx = RequestContext.getCurrentContext();
 4    final String requestURI = this.urlPathHelper.getPathWithinApplication(ctx.getRequest());
 5    // 根据URI从路由规则里获取对应的路由
 6    Route route = this.routeLocator.getMatchingRoute(requestURI);
 7    if (route != null) {
 8        // 将获取到路由信息放入请求上下文
 9    } else{
10        // 找不到路由,则fallback到DispatcherServlet
11    }
12}
复制代码

查找路由

时序图

Spring Cloud Netflix Zuul源码分析之请求处理篇-下
查找路由时序图

源码

这里特地列出简单URL是如何查找路由的源码,请看:

 1// SimpleRouteLocator
 2protected ZuulRoute getZuulRoute(String adjustedPath) {
 3    if (!matchesIgnoredPatterns(adjustedPath)) {
 4        for (Entry<String, ZuulRoute> entry : getRoutesMap().entrySet()) {
 5            String pattern = entry.getKey();
 6            log.debug("Matching pattern:" + pattern);
 7            // 利用正则表达式进行path匹配,找到对应的路由
 8            if (this.pathMatcher.match(pattern, adjustedPath)) {
 9                return entry.getValue();
10            }
11        }
12    }
13    return null;
14}
复制代码

断点图

经过该过滤器执行后,请求上下文RequestContext内容为:

Spring Cloud Netflix Zuul源码分析之请求处理篇-下
前置过滤器请求上下文

SimpleHostRoutingFilter

该过滤器的作用是调用原生httpClient发送请求到后端微服务,解析响应结果,写入请求上下文,提供给后置过滤器使用。

执行

 1@Override
 2public Object run() {
 3    RequestContext context = RequestContext.getCurrentContext();
 4    HttpServletRequest request = context.getRequest();
 5    // 根据请求构建Zuul请求的Headers
 6    MultiValueMap<String, String> headers = this.helper
 7            .buildZuulRequestHeaders(request);
 8    // 根据请求构建Zuul请求的queryParams
 9    MultiValueMap<String, String> params = this.helper
10            .buildZuulRequestQueryParams(request);
11    String verb = getVerb(request);
12    InputStream requestEntity = getRequestBody(request);
13    if (getContentLength(request) < 0) {
14        context.setChunkedRequestBody();
15    }
16    // 根据请求构建Zuul请求的URI
17    String uri = this.helper.buildZuulRequestURI(request);
18    this.helper.addIgnoredHeaders();
19
20    try {
21        // 调用原生httpClient转发请求
22        CloseableHttpResponse response = forward(this.httpClient, verb, uri, request,
23                headers, params, requestEntity);
24        // 保存响应结果
25        setResponse(response);
26    }
27    catch (Exception ex) {
28        throw new ZuulRuntimeException(handleException(ex));
29    }
30    return null;
31}
复制代码

转发请求

 1private CloseableHttpResponse forward(CloseableHttpClient httpclient, String verb,
 2        String uri, HttpServletRequest request, MultiValueMap<String, String> headers,
 3        MultiValueMap<String, String> params, InputStream requestEntity)
 4        throws Exception {
 5    Map<String, Object> info = this.helper.debug(verb, uri, headers, params,
 6            requestEntity);
 7    // routeHost就是在前置过滤器PreDecorationFilter中添加的
 8    URL host = RequestContext.getCurrentContext().getRouteHost();
 9    // 创建HttpHost,指定请求目标地址
10    HttpHost httpHost = getHttpHost(host);
11    uri = StringUtils.cleanPath(MULTIPLE_SLASH_PATTERN.matcher(host.getPath() + uri).replaceAll("/"));
12    long contentLength = getContentLength(request);
13
14    ContentType contentType = null;
15
16    if (request.getContentType() != null) {
17        contentType = ContentType.parse(request.getContentType());
18    }
19
20    InputStreamEntity entity = new InputStreamEntity(requestEntity, contentLength,
21            contentType);
22
23    // 创建HttpRequest
24    HttpRequest httpRequest = buildHttpRequest(verb, uri, entity, headers, params,
25            request);
26    try {
27        log.debug(httpHost.getHostName() + " " + httpHost.getPort() + " "
28                + httpHost.getSchemeName());
29        // 调用原生httpClient发送请求
30        CloseableHttpResponse zuulResponse = forwardRequest(httpclient, httpHost,
31                httpRequest);
32        this.helper.appendDebug(info, zuulResponse.getStatusLine().getStatusCode(),
33                revertHeaders(zuulResponse.getAllHeaders()));
34        return zuulResponse;
35    }
36    finally {
37        // When HttpClient instance is no longer needed,
38        // shut down the connection manager to ensure
39        // immediate deallocation of all system resources
40        // httpclient.getConnectionManager().shutdown();
41    }
42}
43
44private CloseableHttpResponse forwardRequest(CloseableHttpClient httpclient,
45        HttpHost httpHost, HttpRequest httpRequest) throws IOException {
46    return httpclient.execute(httpHost, httpRequest);
47}
复制代码

保存响应

 1private void setResponse(HttpResponse response) throws IOException {
 2    RequestContext.getCurrentContext().set("zuulResponse", response);
 3    this.helper.setResponse(response.getStatusLine().getStatusCode(),
 4            response.getEntity() == null ? null : response.getEntity().getContent(),
 5            revertHeaders(response.getAllHeaders()));
 6}
 7// ProxyRequestHelper
 8public void setResponse(int status, InputStream entity,
 9        MultiValueMap<String, String> headers) throws IOException {
10    RequestContext context = RequestContext.getCurrentContext();
11    context.setResponseStatusCode(status);
12    if (entity != null) {
13        context.setResponseDataStream(entity);
14    }
15
16    boolean isOriginResponseGzipped = false;
17    for (Entry<String, List<String>> header : headers.entrySet()) {
18        String name = header.getKey();
19        for (String value : header.getValue()) {
20            context.addOriginResponseHeader(name, value);
21
22            if (name.equalsIgnoreCase(HttpHeaders.CONTENT_ENCODING)
23                    && HTTPRequestUtils.getInstance().isGzipped(value)) {
24                isOriginResponseGzipped = true;
25            }
26            if (name.equalsIgnoreCase(HttpHeaders.CONTENT_LENGTH)) {
27                context.setOriginContentLength(value);
28            }
29            if (isIncludedHeader(name)) {
30                context.addZuulResponseHeader(name, value);
31            }
32        }
33    }
34    context.setResponseGZipped(isOriginResponseGzipped);
35}
复制代码

断点图

Spring Cloud Netflix Zuul源码分析之请求处理篇-下
路由过滤器请求上下文

SendResponseFilter

该过滤器的作用是将请求上下文里的响应信息写入到响应内容,返回给请求客户端。

执行

 1@Override
 2public Object run() {
 3    try {
 4        addResponseHeaders();
 5        writeResponse();
 6    }
 7    catch (Exception ex) {
 8        ReflectionUtils.rethrowRuntimeException(ex);
 9    }
10    return null;
11}
复制代码

添加响应Header

 1private void addResponseHeaders() {
 2    RequestContext context = RequestContext.getCurrentContext();
 3    HttpServletResponse servletResponse = context.getResponse();
 4    if (this.zuulProperties.isIncludeDebugHeader()) {
 5        @SuppressWarnings("unchecked")
 6        List<String> rd = (List<String>) context.get(ROUTING_DEBUG_KEY);
 7        if (rd != null) {
 8            StringBuilder debugHeader = new StringBuilder();
 9            for (String it : rd) {
10                debugHeader.append("[[[" + it + "]]]");
11            }
12            servletResponse.addHeader(X_ZUUL_DEBUG_HEADER, debugHeader.toString());
13        }
14    }
15    List<Pair<String, String>> zuulResponseHeaders = context.getZuulResponseHeaders();
16    if (zuulResponseHeaders != null) {
17        for (Pair<String, String> it : zuulResponseHeaders) {
18            servletResponse.addHeader(it.first(), it.second());
19        }
20    }
21    if (includeContentLengthHeader(context)) {
22        Long contentLength = context.getOriginContentLength();
23        if(useServlet31) {
24            servletResponse.setContentLengthLong(contentLength);
25        } else {
26            //Try and set some kind of content length if we can safely convert the Long to an int
27            if (isLongSafe(contentLength)) {
28                servletResponse.setContentLength(contentLength.intValue());
29            }
30        }
31    }
32}
复制代码

写出响应

 1private void writeResponse() throws Exception {
 2    RequestContext context = RequestContext.getCurrentContext();
 3    // there is no body to send
 4    if (context.getResponseBody() == null
 5            && context.getResponseDataStream() == null) {
 6        return;
 7    }
 8    HttpServletResponse servletResponse = context.getResponse();
 9    if (servletResponse.getCharacterEncoding() == null) { // only set if not set
10        servletResponse.setCharacterEncoding("UTF-8");
11    }
12
13    OutputStream outStream = servletResponse.getOutputStream();
14    InputStream is = null;
15    try {
16        if (context.getResponseBody() != null) {
17            String body = context.getResponseBody();
18            is = new ByteArrayInputStream(
19                            body.getBytes(servletResponse.getCharacterEncoding()));
20        }
21        else {
22            is = context.getResponseDataStream();
23            if (is!=null && context.getResponseGZipped()) {
24                // if origin response is gzipped, and client has not requested gzip,
25                // decompress stream before sending to client
26                // else, stream gzip directly to client
27                if (isGzipRequested(context)) {
28                    servletResponse.setHeader(ZuulHeaders.CONTENT_ENCODING, "gzip");
29                }
30                else {
31                    is = handleGzipStream(is);
32                }
33            }
34        }
35
36        if (is!=null) {
37            writeResponse(is, outStream);
38        }
39    }
40    finally {
41        if (is != null) {
42            try {
43                is.close();
44            }
45            catch (Exception ex) {
46                log.warn("Error while closing upstream input stream", ex);
47            }
48        }
49
50        try {
51            Object zuulResponse = context.get("zuulResponse");
52            if (zuulResponse instanceof Closeable) {
53                ((Closeable) zuulResponse).close();
54            }
55            outStream.flush();
56            // The container will close the stream for us
57        }
58        catch (IOException ex) {
59            log.warn("Error while sending response to client: " + ex.getMessage());
60        }
61    }
62}
63
64private void writeResponse(InputStream zin, OutputStream out) throws Exception {
65    byte[] bytes = buffers.get();
66    int bytesRead = -1;
67    while ((bytesRead = zin.read(bytes)) != -1) {
68        out.write(bytes, 0, bytesRead);
69    }
70}
复制代码

小结

通过上述分析,简单URL请求,Zuul做的哪些事情,就一目了然了:

  • 根据请求URI正则匹配路由配置规则,找到后端微服务的具体地址
  • 调用原生httpClient发送请求到后端微服务
  • 解析后端微服务的响应结果,返回给请求方

不知道大家有没有注意到,从始至终有一个类一直贯穿整个处理的过程。谁?Filter?不,是RequestContext。ZuulServlet处理过程就是靠它在前置、路由、后置各过滤器间实现信息传递,由此可见它的重要性。接下来,我们就走进RequestContext,揭开这位“信使”的神秘“面纱”。

RequestContext

既然RequestContext用来传递信息,那么它的正确性就必须得保证。在多线程情况下,如何做到这一点呢?请往下看:

 1public class RequestContext extends ConcurrentHashMap<String, Object> {
 2
 3    private static final Logger LOG = LoggerFactory.getLogger(RequestContext.class);
 4
 5    protected static Class<? extends RequestContext> contextClass = RequestContext.class;
 6
 7    private static RequestContext testContext = null;
 8
 9    protected static final ThreadLocal<? extends RequestContext> threadLocal = new ThreadLocal<RequestContext>() {
10        // 覆盖ThreadLocal里initialValue方法,设置ThreadLocal的值
11        @Override
12        protected RequestContext initialValue() {
13            try {
14                return contextClass.newInstance();
15            } catch (Throwable e) {
16                throw new RuntimeException(e);
17            }
18        }
19    };
20}
复制代码

是不是明白了一些,对,就是这两大并发神器:ConcurrentHashMap和ThreadLocal。前面提到的几个过滤器里,使用RequestContext时,都是如下写法:

1RequestContext context = RequestContext.getCurrentContext();
2context.set***();
3context.get***();
复制代码

getCurrentContext()就是入口:

1public static RequestContext getCurrentContext() {
2    if (testContext != null) return testContext;
3
4    RequestContext context = threadLocal.get();
5    return context;
6}
复制代码

从ThreadLocal里获取RequestContext:

 1public T get() {
 2    Thread t = Thread.currentThread();
 3    ThreadLocalMap map = getMap(t);
 4    if (map != null) {
 5        ThreadLocalMap.Entry e = map.getEntry(this);
 6        if (e != null) {
 7            @SuppressWarnings("unchecked")
 8            T result = (T)e.value;
 9            return result;
10        }
11    }
12    return setInitialValue();
13}
14
15private T setInitialValue() {
16    // 覆盖该方法,设置ThreadLocal的值
17    T value = initialValue();
18    Thread t = Thread.currentThread();
19    ThreadLocalMap map = getMap(t);
20    if (map != null)
21        map.set(this, value);
22    else
23        createMap(t, value);
24    return value;
25}
复制代码

通过覆盖ThreadLocal的initialValue方法,首次调用时设置值,后续调用判断当前线程已经绑定了值,则直接返回。这样就保证了不同的请求都有自己的RequestContext。因为RequestContext继承自ConcurrentHashMap,是一个线程安全的map,从而保证了并发下的正确性。

补充:这里只是简单讲解了ThreadLocal和ConcurrentHashMap,不在本文中详细展开,后续会写文章去做深入分析。

总结

到这里,我们就讲完了一个简单URL请求在Zuul中整个处理过程。写作过程中,笔者一直在思考,如何行文能让大家更好的理解。虽然修改了很多次,但是还是觉得不够完美,只能一边写一边总结一边改进。希望大家多多留言,给出意见和建议,那笔者真是感激不尽!!!最后,感谢大家的支持,祝新年快乐,祁琛,2019年1月3日。

原文  https://juejin.im/post/5c2db1f5f265da61682b7e5d
正文到此结束
Loading...