注:文章中使用的 Dubbo 源码版本为2.5.4
在 Dubbo剖析:二 服务引用 中讲到,服务引用方根据引用接口 DemoService ,使用dubbo的代理工厂类 JavassistProxyFactory.getProxy() 创建出该接口的动态代理对象。
当用户想调用 DemoService 的相关方法时,实际是调用了代理对象的相关方法,从 InvokerInvocationHandler.invoke() 进入 Consumer 请求发送流程。
上图从上往下展示了服务引用方发送一个RPC请求的关键步骤,经历了“代理层”、“集群层”、“过滤监听扩展点”、“调用协议层”、“信息交换层”、“网络传输层”。
紫色实线条表示各层关键类的方法调用,蓝色虚线表示关键类的初始化过程。
!接收请求流程图
上图从上往下表示了服务提供方接收到一个网络请求时的处理步骤,经历了一个Handler处理器链,链中的每个Handler负责实现自己的处理功能。
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof MultiMessage) {
MultiMessage list = (MultiMessage) message;
for (Object obj : list) {
handler.received(channel, obj);
}
} else {
handler.received(channel, message);
}
}
public class FixedThreadPool implements ThreadPool {
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
注意点:
a)线程池默认业务线程数为200
b)队列默认采用SynchronousQueue
if (message instanceof Request) {
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
//case a: 请求响应模型的请求处理
if (request.isTwoWay()) {
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
}
//case b: 单向消息接收的处理
else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
//case c: 请求响应模型的响应处理
handleResponse(channel, (Response) message);
}
a)请求响应模型的Request消息:调用ExchangeHandlerAdapter.reply()获取执行结果Result -->
将本地执行结果Result封装成RPC响应Response --> 通过channel.send()发送RPC响应;
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
Object msg = req.getData();
try {
// 调用```ExchangeHandlerAdapter.reply()```获取执行结果```Result```
Object result = handler.reply(channel, msg);
res.setStatus(Response.OK);
res.setResult(result);
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
}
//将本地执行结果```Result```封装成RPC响应```Response```
return res;
}
b)单向请求消息的处理:调用ExchangeHandlerAdapter.received()处理请求消息,如果该消息是Invocation则执行reply()逻辑但不主动发送RPC响应Response;
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message);
} else {
super.received(channel, message);
}
}
c)请求响应模型的Response消息:调用DefaultFuture.received()处理响应消息。
...注:请求响应模型(Request,Response,DufaultFuture)相关后续专门分析,此处不展开...
ExchangeHandlerAdapter 由 DubboProtocol 创建,并实现了 reply() 方法; reply() 方法,实际通过RPC调用参数 Invocation从DubboProtocol.exporterMap 中获取到对应的本地实现 DubboExporter --> 进而获取到对应的本地执行 AbstractProxyInvoker --> 最终通过 AbstractProxyInvoker.invoke() 方法,以反射的方式执行真正实现类的对应方法,完成RPC请求。 整体流程与 “ Provider 接收请求” 一样,唯一的区别是在 交换层请求响应处理器( HeaderExchangeHandler )步骤中会执行 “分支c:请求响应模型的Response消息” ,将 Response 交由 DefaultFuture 处理。