转载

Dubbo RPC在Provider端是如何跑起来的

戳蓝字「TopCoder 」关注我们哦!

Dubbo RPC在Provider端是如何跑起来的

dubbo RCP请求到达provider后,首先经过数据接收、解码(NettyWokerThread/NioEventLoop),然后传递到RPC后续流程(DubboServerHandler),即filter、service invoke过程,service invoke过程也就是执行真正服务的逻辑,执行完毕后再经过编码作为响应返回给RPC调用者。

在分析之前,首先来看一下整体的RPC流程:

Dubbo RPC在Provider端是如何跑起来的

处理流程

NettyServerHandler 是dubbo在netty设置的 ChannelHandlerNettyServerHandler 的处理逻辑是将请求message提交给 DubboServerHandler 线程池。

// MultiMessageHandler
public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof MultiMessage) {
        MultiMessage list = (MultiMessage) message;
        for (Object obj : list) {
            handler.received(channel, obj);
        }
    } else {
        handler.received(channel, message);
    }
}

// AllChannelHandler
public void received(Channel channel, Object message) throws RemotingException {
    // 提交给DubboServerHandler线程池
    ExecutorService cexecutor = getExecutorService();
    cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
}

ChannelEventRunnable 事件的处理逻辑是 org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable#run ,事件类型包括连接、断开、发送、异常等:

public enum ChannelState {
    CONNECTED,
    DISCONNECTED,
    SENT,
    RECEIVED,
    CAUGHT
}

RPC执行

下面是调用HelloServiceImpl的一个调用栈信息:

Dubbo RPC在Provider端是如何跑起来的

Filter执行

进入DubboServerHandler线程池的流程首先就是dubbo中各个filter, EchoFilter、ClassLoaderFilter、ContextFilter、TraceFilter、TimeoutFilter、MonitorFilter

// EchoFilter
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
    if (inv.getMethodName().equals(Constants.$ECHO) && inv.getArguments() != null && inv.getArguments().length == 1) {
        // 是否是echo
        return new RpcResult(inv.getArguments()[0]);
    }
    return invoker.invoke(inv);
}

// ClassLoaderFilter
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    ClassLoader ocl = Thread.currentThread().getContextClassLoader();
    Thread.currentThread().setContextClassLoader(invoker.getInterface().getClassLoader());
    try {
        return invoker.invoke(invocation);
    } finally {
        Thread.currentThread().setContextClassLoader(ocl);
    }
}

// GenericFilter
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
    return invoker.invoke(inv);
}

// ContextFilter
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    Map<String, String> attachments = invocation.getAttachments();
    RpcContext.getContext()
            .setInvoker(invoker)
            .setInvocation(invocation)
            .setLocalAddress(invoker.getUrl().getHost(),
                    invoker.getUrl().getPort());

    // merged from dubbox
    // we may already added some attachments into RpcContext before this filter (e.g. in rest protocol)
    if (attachments != null) {
        if (RpcContext.getContext().getAttachments() != null) {
            RpcContext.getContext().getAttachments().putAll(attachments);
        } else {
            RpcContext.getContext().setAttachments(attachments);
        }
    }

    if (invocation instanceof RpcInvocation) {
        ((RpcInvocation) invocation).setInvoker(invoker);
    }
    try {
        return invoker.invoke(invocation);
    } finally {
        // IMPORTANT! For async scenario, we must remove context from current thread, so we always create a new RpcContext for the next invoke for the same thread.
        RpcContext.removeContext();
        RpcContext.removeServerContext();
    }
}

Filter的包装类如下所示,它会将各个filter依次串起来:

new Invoker<T>() {
    @Override
    public Class<T> getInterface() {
        return invoker.getInterface();
    }
    @Override
    public URL getUrl() {
        return invoker.getUrl();
    }
    @Override
    public boolean isAvailable() {
        return invoker.isAvailable();
    }
    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        // 调用各个filter,filter组成了一个filter链,最后一个会调用服务的方法
        return filter.invoke(next, invocation);
    }
    @Override
    public void destroy() {
        invoker.destroy();
    }
    @Override
    public String toString() {
        return invoker.toString();
    }
};

RPC反射调用

RPC反射调用流程:

// AbstractProxyInvoker
public Result invoke(Invocation invocation) throws RpcException {
    try {
        Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
        CompletableFuture<Object> future = wrapWithFuture(value, invocation);
        CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
            AppResponse result = new AppResponse();
            if (t != null) {
                if (t instanceof CompletionException) {
                    result.setException(t.getCause());
                } else {
                    result.setException(t);
                }
            } else {
                result.setValue(obj);
            }
            return result;
        });
        return new AsyncRpcResult(appResponseFuture, invocation);
    } catch (InvocationTargetException e) {
    } catch (Throwable e) {
    }
}

// JavassistProxyFactory
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

// provider服务类
public class DemoServiceImpl implements DemoService {
    @Override
    public String sayHello(String name) {
        System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress());
        return "Hello " + name + ", response from provider: " + RpcContext.getContext().getLocalAddress();
    }
}

通过反射执行,就到达了业务处理逻辑,获取到返回结果之后,会组装 AppResponse 结果返回给客户端。

Filter链初始化

Filter链的初始化是在初始化线程模型之前就已完成的:

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
}

// ChannelHandlers
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
    return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
            .getAdaptiveExtension().dispatch(handler, url)));
}

从上述代码来看,handler已经在初始化NettyServer时构建完成了,继续往上跟,代码如下:

public RemotingServer bind(URL url, ChannelHandler listener) throws RemotingException {
    return new NettyServer(url, listener);
}
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    // DecodeHandler -> HeaderExchangeHandler -> 责任链handler
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
// DubboProtocol
private ProtocolServer createServer(URL url) {
    url = URLBuilder.from(url)
            // send readonly event when server closes, it's enabled by default
            .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
            // enable heartbeat by default
            .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
            .addParameter(CODEC_KEY, DubboCodec.NAME)
            .build();
    String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);

    ExchangeServer server = Exchangers.bind(url, requestHandler);
    return new DubboProtocolServer(server);
}

requestHandler对应的是org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#requestHandler,其实现了ChannelHandler接口(可处理各种事件),针对请求来说,对应代码如下:

public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
    Invocation inv = (Invocation) message;
    // 获取invocation对应的处理链
    Invoker<?> invoker = getInvoker(channel, inv);

    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
    Result result = invoker.invoke(inv);
    return result.thenApply(Function.identity());
}

Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
    boolean isStubServiceInvoke = false;
    int port = channel.getLocalAddress().getPort();
    String path = inv.getAttachments().get(PATH_KEY);

    // 根据服务dubbo key(name+version等)获取对应的处理链
    String serviceKey = serviceKey(port, path, inv.getAttachments().get(VERSION_KEY), inv.getAttachments().get(GROUP_KEY));
    DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
    return exporter.getInvoker();
}

也就是说,处理链的获取是从provider端服务列表-处理链 仓库中获取的,该仓库是在dubbo provider启动流程中初始化的,关于这块不是本文的分析重点,因此不再赘述。

原文  http://mp.weixin.qq.com/s?__biz=MzIwNTI2ODY5OA==&mid=2649939037&idx=1&sn=8782aa6a9e83dd35acb79e258a08fd15
正文到此结束
Loading...