转载

Dubbo 服务端接收请求过程分析

接上个部分 (Dubbo 客户端调用链路过程分析) 讲到了客户端发送请求的过程,这个部分我们分析服务端接收请求并发送响应的过程。

在分析 服务暴露 的过程中,provider启动netty服务端的时候(NettyServer.doOpen),会在在ChannelPipeline链中加入了4个ChannelHandler。

- NettyCodecAdapter.InternalEncoder:编码器
- NettyCodecAdapter.InternalDecoder:解码器
- IdleStateHandler:心跳处理器
- NettyServerHandler:请求处理器
复制代码
  1. 在服务端接收客户端响应时,首先会经过解码器NettyCodecAdapter.InternalDecoder,然后经过NettyServerHandler.channelRead进行请求处理。
  2. 服务端接收请求处理之后,接着响应结果客户端,经过NettyServerHandler.write进行结果响应,然后经过NettyCodecAdapter.InternalEncoder编码器进行编码处理,最终响应给客户端。

接下来们将其拆分为处理请求和响应结果进行分析。

处理客户端请求

解码过程

解码过程不做具体分析

NettyCodecAdapter.InternalDecoder.decode
-->DubboCountCodec.decode
    -->ExchangeCodec.decode
        -->DubboCodec.decodeBody
复制代码

经过解码得到请求对象Request。

读取请求过程

1. NettyServerHandler.channelRead

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            handler.received(channel, msg);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }
复制代码

2. AbstractPeer.received

装饰角色,维护了closed状态

public void received(Channel ch, Object msg) throws RemotingException {
    // 如果通道已经关闭,则直接返回
    if (closed) {
        return;
    }
    handler.received(ch, msg);
}
复制代码

3. MultiMessageHandler.received

对多消息的处理。

@Override
    public void received(Channel channel, Object message) throws RemotingException {
        //如果消息是MultiMessage类型的,做下类型转换
        if (message instanceof MultiMessage) {
            MultiMessage list = (MultiMessage) message;
            遍历发送
            for (Object obj : list) {
                handler.received(channel, obj);
            }
        } else {
            handler.received(channel, message);
        }
    }
复制代码

4. HeartbeatHandler.received

对心跳事件做了处理。 如果不是心跳请求,那么接下去走到AllChannelHandler的received。否则直接回复响应,不再继续往下走。

public void received(Channel channel, Object message) throws RemotingException {
        setReadTimestamp(channel);
        //是否属于心跳的请求
        if (isHeartbeatRequest(message)) {
            Request req = (Request) message;
            if (req.isTwoWay()) {
                Response res = new Response(req.getId(), req.getVersion());
                res.setEvent(Response.HEARTBEAT_EVENT);
                channel.send(res);
            }
            return;
        }
        //是否属于心跳的响应
        if (isHeartbeatResponse(message)) {
            return;
        }
        handler.received(channel, message);
    }
复制代码

5. AllChannelHandler.received

这里io事件的派发策略和客户端接收响应结果逻辑一样,这里不再赘述。

将接收到的消息分发到线程池,线程池名称为:DubboServerHandler-10.204.246.187:20880-thread-。

提交给线程的任务是:ChannelEventRunnable

public void received(Channel channel, Object message) throws RemotingException {
        //获取处理线程
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //提交线程失败,可以确定是线程池满了,需要将该提示响应给客户端
        	if(message instanceof Request && t instanceof RejectedExecutionException){
        		Request request = (Request)message;
        		if(request.isTwoWay()){
        			String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
        			Response response = new Response(request.getId(), request.getVersion());
        			response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
        			response.setErrorMessage(msg);
        			channel.send(response);
        			return;
        		}
        	}
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
复制代码

接下来的逻辑则是利用线程池去针对请求做了异步处理

6. ChannelEventRunnable.run

处理不同状态的请求,仅仅只是一个中转,针对不同状态交给指定的方法处理。 RECEIVED、CONNECTED、DISCONNECTED、SENT、CAUGHT

这里我们关注RECEIVED的处理链路。

public class ChannelEventRunnable implements Runnable {
        @Override
    public void run() {
        if (state == ChannelState.RECEIVED) {
                handler.received(channel, message);
        } else {
            switch (state) {
            case CONNECTED:
                    handler.connected(channel);
                break;
            case DISCONNECTED:
                try {
                   handler.disconnected(channel);
                break;
            case SENT:
                    handler.sent(channel, message);
                break;
            case CAUGHT:
                    handler.caught(channel, exception);
                break;
            default:
            }
        }

    }

}
复制代码

7. DecodeHandler.received

这里的解码主要是针对message对象是Decodeable对象的处理。前面以及解码为Request对象了,因此这里不会执行任何逻辑。

解码之后继续执行下一个handler的receive方法。

public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Decodeable) {
            decode(message);
        }
        if (message instanceof Request) {
            decode(((Request) message).getData());
        }
        if (message instanceof Response) {
            decode(((Response) message).getResult());
        }

        handler.received(channel, message);
    }
复制代码

8. HeaderExchangeHandler.received

处理Request请求,并异步返回Response,这里也是provider返回响应的入口

该方法分3个过程来看:

1、分请求类型进行处理:根据messgage的类型做不同的处理,例如正常的Request请求、Resopnse、telnet命令请求。

2、处理Request:继续向后执行(交给DubboProtocol的reply),得到异步调用结果。

3、返回Response:将Request的调用id封装到Response,然后利用异步回调将结果封装到 Response 对象中,同时利用channel.send(res)方法将该结果发送给客户端(关于发送响应结果的过程会在下个部分进行分析);

@Override
    public void received(Channel channel, Object message) throws RemotingException {
        //设置时间戳
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        //获取通道
        final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                // 处理事件
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    // 双向通信
                    if (request.isTwoWay()) {
                        handleRequest(exchangeChannel, request);
                    } else {
                    //如果是单向通信,仅向后调用指定服务即可,无需返回调用结果 handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }
    
复制代码
//处理请求
     void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
        //封装resopnse(把requests的id封装到resopnse了)
        Response res = new Response(req.getId(), req.getVersion());
        //如果请求被破坏了, 响应异常
        if (req.isBroken()) {
            ...
            ...
            res.setErrorMessage("Fail to decode request due to: " + msg);
            res.setStatus(Response.BAD_REQUEST);
            channel.send(res);
            return;
        }
        // 正常,取得请求数据,也就是 RpcInvocation 对象
        Object msg = req.getData();
        try {
            //继续向下调用 返回一个future
            CompletionStage<Object> future = handler.reply(channel, msg);
            future.whenComplete((appResult, t) -> {
                try {
                    if (t == null) {
                        //设置调用结果状态为成功 res.setStatus(Response.OK);
                        res.setResult(appResult);
                    } else {
                        //如果服务调用有异常,则设置结果状态码为服务错误 res.setStatus(Response.SERVICE_ERROR);
                        res.setErrorMessage(StringUtils.toString(t));
                    }
                    // 发送该响应
                    channel.send(res);
                } catch (RemotingException e) {
                    logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
                } finally {
                    // HeaderExchangeChannel.removeChannelIfDisconnected(channel);
                }
            });
        } catch (Throwable e) {
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
            channel.send(res);
        }
    }
复制代码

9. DubboProtocol.requestHandler.reply

1、获取invoker; 关于获取invoker主要过程是:

  • 取得serivce对应的key
  • 根据key从exporterMap中获取对应的Export对象
  • 通过export获取invoker

2、进入invoker调用链;

3、返回执行结果CompletableFuture;

public class DubboProtocol extends AbstractProtocol {

      private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
        @Override
        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

            Invocation inv = (Invocation) message;
            //获取invoker
            Invoker<?> invoker = getInvoker(channel, inv);
            // need to consider backward-compatibility if it's a callback
            if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                String methodsStr = invoker.getUrl().getParameters().get("methods");
                boolean hasMethod = false;
                if (methodsStr == null || !methodsStr.contains(",")) {
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    String[] methods = methodsStr.split(",");
                    for (String method : methods) {
                        if (inv.getMethodName().equals(method)) {
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod) {
                    logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                            + " not found in callback service interface ,invoke will be ignored."
                            + " please update the api interface. url is:"
                            + invoker.getUrl()) + " ,invocation is :" + inv);
                    return null;
                }
            }
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            Result result = invoker.invoke(inv);
            return result.completionFuture().thenApply(Function.identity());
        }
        }
}
复制代码
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
        ...
        ...
        String serviceKey = serviceKey(port, path, inv.getAttachments().get(VERSION_KEY), inv.getAttachments().get(GROUP_KEY));
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

        if (exporter == null) {
            throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " +
                    ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
        }

        return exporter.getInvoker();
    }
复制代码

服务端invoker调用链过程

这里大概梳理下调用链路:

ProtocolFilterWrapper.CallbackRegistrationInvoker.invoke
 ->EchoFilter.invoke
  ->ClassLoaderFilter.invoke
   ->GenericFilter.invoke
    ->ContextFilter.invoke
     ->TraceFilter.invoke
      ->TimeoutFilter.invoke
       ->MonitorFilter.invoke
        ->ExceptionFilter.invoke
         ->InvokerWrapper.invoke
          ->DelegateProviderMetaDataInvoker.invoke
           ->AbstractProxyInvoker.invoke
                                    
复制代码

关于经过的Filter这里不做说明,这里重点看下AbstractProxyInvoker.invoke方法:

public Result invoke(Invocation invocation) throws RpcException {
        try {
            //实际就是获取到代理类,然后调用对应的服务方法
            Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
            //把返回结果用CompletableFuture包裹
            CompletableFuture<Object> future = wrapWithFuture(value, invocation);
            封装AsyncRpcResult
            AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
            future.whenComplete((obj, t) -> {
                AppResponse result = new AppResponse();
                if (t != null) {
                    if (t instanceof CompletionException) {
                        result.setException(t.getCause());
                    } else {
                        result.setException(t);
                    }
                } else {
                    result.setValue(obj);
                }
                asyncRpcResult.complete(result);
            });
            return asyncRpcResult;
        } catch (InvocationTargetException e) {
            if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
                logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
            }
            return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
        } catch (Throwable e) {
            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
复制代码

主要做了三件事情:

1. 获取代理类调用具体方法得到方法的返回;

2. 封装响应结果到CompletableFuture对象;

如果服务接口返回的就是CompletableFuture对象,则直接返回( Provider端异步执行 ),否则把服务接口同步返回的结果封装到CompletableFuture返回出去。

private CompletableFuture<Object> wrapWithFuture (Object value, Invocation invocation) {
        if (RpcContext.getContext().isAsyncStarted()) {
            return ((AsyncContextImpl)(RpcContext.getContext().getAsyncContext())).getInternalFuture();
        } else if (value instanceof CompletableFuture) {
            return (CompletableFuture<Object>) value;
        }
        return CompletableFuture.completedFuture(value);
    }
复制代码

3. 将上一步得到CompletableFuture对象,通过异步通知将结果封装成AsyncRpcResult返回出去。

到这里服务端处理客户请求过程分析完成,接下来看如何将上一步得到的Response响应给客户端。

这里我用一个时序图表示从HeaderExchangeHandler到服务接口最终执行的调用链路:

Dubbo 服务端接收请求过程分析

响应结果给客户端

在上一部分HeaderExchangeHandler.received的接收过程中,我们知道在得到结果后会将其发送给客户端,因此我们从HeaderExchangeChannel.send方法开始分析

Dubbo 服务端接收请求过程分析

1. HeaderExchangeChannel.send

@Override
    public void send(Object message, boolean sent) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!");
        }
        if (message instanceof Request
                || message instanceof Response
                || message instanceof String) {
            channel.send(message, sent);
        } else {
            Request request = new Request();
            request.setVersion(Version.getProtocolVersion());
            request.setTwoWay(false);
            request.setData(message);
            channel.send(request, sent);
        }
    }
复制代码

2. NettyChannel.send

这里通过调用netty的api向channel中异步写入结果。

public void send(Object message, boolean sent) throws RemotingException {
        // whether the channel is closed
        super.send(message, sent);

        boolean success = true;
        int timeout = 0;
        try {
            ChannelFuture future = channel.writeAndFlush(message);
            if (sent) {
                // wait timeout ms
                timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            Throwable cause = future.cause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
        }
        if (!success) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                    + "in timeout(" + timeout + "ms) limit");
        }
    }
复制代码

接着则会经过NettyServerHandler的处理器以及编码器。NettyServerHandler.write方法将结果发送给客户端,后面的链路比较简单这里不做分析。

服务端的异步总结

1、首先服务端通过Netty接收到请求之后经过解码后派发给业务线程池(DubboServerHandler-ip:port-thread-)。这里是IO线程到业务线程的一次异步。

2、(==如果服务接口返回的是CompletableFuture==)则会异步将CompletableFuture的结果封装到AsyncRpcResult。

3、(==如果服务接口返回的是CompletableFuture==)AsyncRpcResult再异步执行channel.send();

4、channel.send()发送结果也是一次异步。

欲知更多,欢迎访问: silence.work/

原文  https://juejin.im/post/5f009e545188252e644ce7e6
正文到此结束
Loading...