转载

Spring Cloud Alibaba集成ElasticAPM实战

继上一篇ElasticAPM初体验我们知道了什么是 可观察性 ,并领略了 ElasticAPM 的强大功能,但是仅仅是上篇文章中单机模式的使用时远远不够的。还记得上一篇最后提出的两个问题:

1、本文在单机版的环境中,测试通过,但是在分布式环境中,请求会串联起很多应用,那服务跟踪能否实现?实现的原理是什么?

2、Elastic APM可以自动采集http请求,在PRC分布式环境中,Elastic APM能否正常工作?是否必须采用 public API 来实现?

重点是 分布式RPC ,即在分布式情况下,ElasticAPM能否良好工作?在RPC环境下,ElasticAPM是不是也能正常工作呢?

先说答案: 在默认情况下,ElasticAPM能够支持分布式的Http方式调用,但是不支持RPC协议 。但是很多公司都采用RPC协议作为其内部系统的通信协议,比如我司就采用Spring Cloud Alibaba作为搜索服务的框架,框架内应用的通信是借助RPC框架Dubbo来实现的。所以问题就变成了如何把 ElasticAPM 集成进Spring Cloud Alibaba中。

架构讲解&问题分析

首先,我先大概图示下Spring Cloud Alibaba和ElasticAPM的架构和工作流程。

如架构图所示,搜索系统分为了 网关应用(Gateway)US应用AS应用BS应用 ,用户的请求会先到达网关,网关会把请求,以Http协议转发给US应用,US应用会采用Dubbo协议调用AS应用,AS应用采用Dubbo协议调用BS应用。

Request--- http ---> US --- RPC ----> AS ----- RPC ------> BS

每一个应用启动的时候都已经集成了Apm-agent(如果不知道怎么集成请参考ElasticAPM初体验),如果APM-agent默认支持 Dubbo 就完美了(但是并没有)。所以整个链路追踪,到了US之后,就没有上报之后应用的锚点数据。在查看ElasticAPM官方文档的时候,我注意到了 Public API ,文档中交代了这样一件事情:

The public API of the Elastic APM Java agent lets you customize and manually create spans and transactions, as well as track errors.

没错,你可以自定义 SpanTransaction ,如果不懂什么是 SpanTransaction 请参考ElasticAPM初体验或直接读一遍 官方文档 。既然Agent默认不支持Dubbo,那么我们使用Public API来实现功能。

Spring Cloud Alibaba集成ElasticAPM实战

设计思想

基于Spring Cloud Alibaba的架构,我们可以如下图方式实现。

  • 首先用户的请求一定要经过微服务网关,在网关的过滤器中,首先埋入父级 Transaction
  • 请求经过网关,会被网关转发到第一层应用中,注意这次转发是 http 请求,如果是用SpringMVC实现的话,需要在 Controller 处,上报 子Transaction
  • 请求被第一层应用处理之后,下层的应用全部是 Dubbo 协议的。这时可以采用Dubbo的过滤器机制,对 ConcumerProvider 都进行拦截,通过这种方式做到不侵入业务代码。
  • 最终,请求返回到微服务网关,调用 transaction.end() 上报根 Transaction
  • 所有流程完毕。
Spring Cloud Alibaba集成ElasticAPM实战

核心实现讲解

微服务网关

微服务网关需要做这样几件事情:

  • 开启根 Transaction
  • POST 请求body中增加追踪ID, GET 请求 Parameter 中增加追踪ID
  • 在请求返回之后调用 transaction.end() 完成上报
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        HttpMethod httpMethod = exchange.getRequest().getMethod();
				//第一步 开启一个Transaction
        Transaction transaction = ElasticApm.startTransaction();
        transaction.setName("mainSearch");
        transaction.setType(Transaction.TYPE_REQUEST);
				//第二步 创建Span
        Span span = transaction.startSpan("gateway", "filter", "gateway action");
        span.setName("com.mfw.search.gateway.filter.PostBodyCacheFilter#filter");

        LOGGER.info("APM埋点成功transactionId:{}", transaction.getId());
        //第三步 判定Http请求是POST还是GET
        if (HttpMethod.POST.equals(httpMethod)) {
            ServerRequest serverRequest = ServerRequest.create(exchange, messageReaders);
            MediaType mediaType = exchange.getRequest().getHeaders().getContentType();
            //第四步 定义Http body的处理逻辑
            Mono<String> modifiedBody = serverRequest.bodyToMono(String.class).flatMap(body -> {            //判定body类型
                if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) {

                    //重要!获取到了body的数据,传给callback函数,做业务逻辑处理
                    Map<String, String> bodyMap = decodeBody(body);
                    //设置最新的bodyMap进入exchange
                    exchange.getAttributes().put(GatewayConstant.CACHE_POST_BODY, bodyMap);
                    //重点!动态增加body的transaction标记,为下游应用Controller使用
                    span.injectTraceHeaders((name, value) -> {
                        bodyMap.put(name, value);
                        LOGGER.info("APM埋点 key:{}, transactionId:{}", name, value);
                    });
                    //不要忘记span.end()否则会丢失上报
                    span.end();
                    return Mono.just(encodeBody(bodyMap));
                } else if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
                    // origin body map
                    Map<String, String> bodyMap = decodeJsonBody(body);
                    exchange.getAttributes().put(GatewayConstant.CACHE_POST_BODY, bodyMap);
                    //重点!动态增加body的transaction标记,为下游应用Controller使用
                    span.injectTraceHeaders((name, value) -> {
                        bodyMap.put(name, value);
                        LOGGER.info("APM埋点 key:{}, transactionId:{}", name, value);
                    });
                    span.end();
                    return Mono.just(encodeJsonBody(bodyMap));
                }
                return Mono.empty();
            });

            BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class);
            HttpHeaders headers = new HttpHeaders();
            headers.putAll(exchange.getRequest().getHeaders());

            // the new content type will be computed by bodyInserter
            // and then set in the request decorator
            headers.remove(HttpHeaders.CONTENT_LENGTH);

            CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
            return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> {
                ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(exchange.getRequest()) {

                    public HttpHeaders getHeaders() {
                        long contentLength = headers.getContentLength();
                        HttpHeaders httpHeaders = new HttpHeaders();
                        httpHeaders.putAll(super.getHeaders());
                        if (contentLength > 0) {
                            httpHeaders.setContentLength(contentLength);
                        } else {
                            httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
                        }
                        return httpHeaders;
                    }

                    public Flux<DataBuffer> getBody() {
                        return outputMessage.getBody();
                    }
                };
                //第五步 在请求返回之后,上报transaction
                return chain.filter(exchange.mutate().request(decorator).build()).then(Mono.fromRunnable(() -> transaction.end()));
            }));
        } else if (HttpMethod.GET.equals(httpMethod)) {
            span.injectTraceHeaders((name, value) -> {
                exchange.getRequest().getQueryParams().set(name, transaction.getId());
                LOGGER.info("APM埋点 key:{}, transactionId:{}", name, value);
            });
            return chain.filter(exchange).then(Mono.fromRunnable(() -> {
                span.end();
                transaction.end();
                LOGGER.info("APM买点完成,transactionId:{}", transaction.getId());
            }));
        } else {
            //not support other Http Method
            exchange.getResponse().setStatusCode(HttpStatus.UNSUPPORTED_MEDIA_TYPE);
            return exchange.getResponse().setComplete();
        }

    }
复制代码

Controller

Controller层的实现采用了SpringAOP方式实现,这样的好处是对业务代码不侵入,可扩展性高,对想要监控的方法直接配置上 @TransactionWithRemoteParent() 即可。

如下代码是通过 @TransactionWithRemoteParent() 实现对Controller方法的上报。

@PostMapping(value = "/search", consumes = "application/json", produces = "application/json")
@TransactionWithRemoteParent()
public String searchForm(@RequestBody String req) {
    String result = asService.helloAs(req);
    return result;
}
复制代码

AOP实现

@Aspect
public class ApmAspect {

    private static final Logger LOGGER = LoggerFactory.getLogger(ApmAspect.class);

    @PostConstruct
    private void init() {
        LOGGER.info("ApmAspect加载完毕");
    }


    @Pointcut(value = "@annotation(transactionWithRemoteParent)", argNames = "transactionWithRemoteParent")
    public void pointcut(TransactionWithRemoteParent transactionWithRemoteParent) {

    }

    @Around(value = "pointcut(transactionWithRemoteParent)", argNames = "joinPoint,transactionWithRemoteParent")
    public Object around(ProceedingJoinPoint joinPoint, TransactionWithRemoteParent transactionWithRemoteParent) throws Throwable {
        Transaction transaction = null;
        try {
            MethodSignature signature = (MethodSignature) joinPoint.getSignature();
            transaction = ElasticApm.startTransactionWithRemoteParent(key -> {
                String httpRequest = (String) joinPoint.getArgs()[0];
                JSONObject json = JSON.parseObject(httpRequest);
                String traceId = json.getString(key);
                LOGGER.info("切面添加了子Transaction,key={},value={}", key, traceId);
                RpcContext.getContext().setAttachment(key, traceId);
                return traceId;
            });
            transaction.setName(StringUtils.isNotBlank(transactionWithRemoteParent.name())
                    ? transactionWithRemoteParent.name() : signature.getName());
            transaction.setType(Transaction.TYPE_REQUEST);
            return joinPoint.proceed();
        } catch (Throwable throwable) {
            if (transaction != null) {
                transaction.captureException(throwable);
            }
            throw throwable;
        } finally {
            if (transaction != null) {
                LOGGER.info("切面执行完毕,上报Transaction:{}", transaction.getId());
                transaction.end();
            }
        }
    }
}
复制代码

Dubbo过滤器

如下代码是DubboConsumer过滤器,专门用于处理APM。DubboProvider的实现类似。

@Activate(group = "consumer")
public class DubboConsumerApmFilter implements Filter {

    private static final Logger LOGGER = LoggerFactory.getLogger(DubboConsumerApmFilter.class);

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        Transaction transaction = ElasticApm.startTransactionWithRemoteParent(key -> {
            String traceId = invocation.getAttachment(key);
            LOGGER.info("key={},value={}", key, traceId);
            return traceId;
        });
        try (final Scope scope = transaction.activate()) {
            String name = "consumer:" + invocation.getInvoker().getInterface().getName() + "#" + invocation.getMethodName();
            transaction.setName(name);
            transaction.setType(Transaction.TYPE_REQUEST);

            Result result = invoker.invoke(invocation);

            return result;
        } catch (Exception e) {
            transaction.captureException(e);
            throw e;
        } finally {
            transaction.end();
        }
    }
}

@Activate(group = "provider")
public class DubboProviderApmFilter implements Filter {
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        // use startTransactionWithRemoteParent to create transaction with parent, which id from prc context
        Transaction transaction = ElasticApm.startTransactionWithRemoteParent(key -> invocation.getAttachment(key));

        try (final Scope scope = transaction.activate()) {
            String name = "provider:" + invocation.getInvoker().getInterface().getName() + "#" + invocation.getMethodName();
            transaction.setName(name);
            transaction.setType(Transaction.TYPE_REQUEST);
            return invoker.invoke(invocation);
        } catch (Exception e) {
            transaction.captureException(e);
            throw e;
        } finally {
            transaction.end();
        }
    }
}
复制代码
原文  https://juejin.im/post/5e1c0883f265da3e4736b2aa
正文到此结束
Loading...