介绍了Dubbo通信流程,跟着源码调试过来的,如果有问题还请各位大佬指出
服务暴露将做哪些事情?
服务引用将做哪些事情?
客户端请求
ConsumerProxyService -> Invoker【DubboInvoker】 -> Exchanger【HeaderExchangeClient】 -> Transporter【NettyClient】 -> 编码 -> SEND-TO-SERVER (创建了DefaultFuture,Request带唯一标识) 复制代码
服务端响应
解码 -> Transporter【NettyServer】-> 系列Handlers -> 线程池 -> Exporter#getInvoker -> Invoker#invoke -> ProviderProxyService -> callback 复制代码
门面类,提供各种便捷方法,先通过SPI获取 Exchanger
,然后调用 Exchanger
的相关方法创建 ExchangeServer
、 ExchangeClient
SPI接口,默认实现类 HeaderExchanger
,提供了两个快捷方法创建 ExchangeServer
、 ExchangeClient
@SPI(HeaderExchanger.NAME)
public interface Exchanger {
@Adaptive({Constants.EXCHANGER_KEY})
ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;
@Adaptive({Constants.EXCHANGER_KEY})
ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
}
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
复制代码
Server端使用,默认实现类 HeaderExchangeServer
,内部调用 Transporter
开启Server服务
public interface ExchangeServer extends Server {
Collection<ExchangeChannel> getExchangeChannels();
ExchangeChannel getExchangeChannel(InetSocketAddress remoteAddress);
}
复制代码
Client端使用,默认实现类 HeaderExchangeClient
,核心 request
方法,内部调用 Transporter
发送请求
public interface ExchangeClient extends Client, ExchangeChannel {
}
复制代码
默认实现类 HeaderExchangeChannel
,作为 HeaderExchangeClient
的一个属性
门面类,提供各种便捷方法,先通过SPI获取 Transporter
,然后调用 Transporter
的相关方法创建 Server
、 Client
SPI接口,默认实现类 NettyTransporter
,提供了两个快捷方法创建 Server
、 Client
@SPI("netty")
public interface Transporter {
@Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
Server bind(URL url, ChannelHandler handler) throws RemotingException;
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
Client connect(URL url, ChannelHandler handler) throws RemotingException;
}
public class NettyTransporter implements Transporter {
public static final String NAME = "netty";
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
@Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
}
复制代码
Server端使用,默认实现类 NettyServer
,用于开启Server服务,核心方法 doOpen
public class NettyServer extends AbstractServer implements Server {
}
复制代码
Client端使用,默认实现类 NettyClient
,核心 request
方法用于发送请求, doOpen
用于与服务端建立连接
public class NettyClient extends AbstractClient {
}
复制代码
DubboProtocol#export => DubboProtocol#openServer => DubboProtocol#createServer => Exchangers#bind => NettyServer#doOpen 复制代码
最终,在 NettyServer#doOpen
中通过Netty开启了一个Server端
DubboProtocol#createServer
=> Exchangers#bind(url, requestHandler)
=> HeaderExchanger#bind(url, requestHandler)
=> return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))))
// Transporters#bind 语句可以拆解为
Transporters#bind
=> NettyTransporter#bind(url, handler)
=> return new NettyServer(url, handler)
=> NettyServer#doOpen【NettyServer构造函数中调用了doOpen方法】
复制代码
即 NettyServer
中的 hander
属性,最终指向的是 new DecodeHandler(new HeaderExchangeHandler(handler))
。最终Server端返回 HeaderExchangeServer
,然后在 NettyServer
的构造函数中,对 handle
其实还做了一些封装
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
public class ChannelHandlers {
private static ChannelHandlers INSTANCE = new ChannelHandlers();
protected ChannelHandlers() {}
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected static ChannelHandlers getInstance() {
return INSTANCE;
}
static void setTestingChannelHandlers(ChannelHandlers instance) {
INSTANCE = instance;
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
}
复制代码
所以,最终 NettyServer
中的 hander
属性指向 MultiMessageHandler -> HeartbeatHandler -> AllDispatcher -> DecodeHandler
调用链太长了,而且隐藏的非常深,重点省略了一些,在应用启动时为Reference对象生成Invoker时创建的
RegistryProtocol#doRefer => RegistryDirectory#subscribe => RegistryDirectory#toInvokers => ProtocolFilterWrapper#refer => AbstractProtocol#refer => DubboProtocol#protocolBindingRefer => DubboProtocol#getClients => DubboProtocol#getSharedClient => DubboProtocol#buildReferenceCountExchangeClientList => DubboProtocol#buildReferenceCountExchangeClient => DubboProtocol#initClient => Exchangers#connect => HeaderExchanger#connect => Transporters#connect => NettyTransporter#connect => NettyClient#<init> => NettyClient#doOpen 复制代码
最终,在 NettyClient#doOpen
中通过Netty与Server建立连接
Exchangers#connect
=> HeaderExchanger#connect(url, handler)
=> return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true)
// Transporters#connect 语句可以拆解为
Transporters#connect
=> NettyTransporter#connect(url, handler)
=> return new NettyClient(url, handler)
=> NettyClient#doOpen【NettyClient构造函数中调用了doOpen方法】
复制代码
即 NettyClient
中的 hander
属性,最终指向的是 new DecodeHandler(new HeaderExchangeHandler(handler))
。最终Client端返回 HeaderExchangeClient
,其中的 client
属性也对 NettyClient
做了包装处理
不过在 DubboProtocol#buildReferenceCountExchangeClient
方法中对 HeaderExchangeClient
包装了一层,最终Invoker中的Client类型是 ReferenceCountExchangeClient
private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
ExchangeClient exchangeClient = initClient(url);
return new ReferenceCountExchangeClient(exchangeClient);
}
复制代码
ReferenceCountExchangeClient
与 HeaderExchangeClient
没什么区别,只不过包装了一层,然后还有一个比较重要的属性 referenceCount
,用于记录客户端的个数?
调用方代理类 -> InvokerInvocationHandler#invoke -> MockClusterInvoker#invoke -> AbstractClusterInvoker#invoke【获取LoadBalance】 -> FailoverClusterInvoker#doInvoke【处理重试次数】 -> ProtocolFilterWrapper#invoke【处理Filter链路】 -> AbstractInvoker#invoke【设置Attachments参数】 -> DubboInvoker#doInvoke【Exchange交接层】 -> ReferenceCountExchangeClient#request -> HeaderExchangeClient#request -> HeaderExchangeChannel#request【return CompletableFuture】 -> AbstractPeer#send -> AbstractClient#send -> NettyChannel#send -> Channel#writeAndFlush【发消息给服务端】 复制代码
从 DubboInvoker#doInvoke
开始与Exchange层交互,核心代码如下
protected Result doInvoke(final Invocation invocation) throws Throwable {
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
// return = false,即oneWay ,可以减少不必要的Future对象创建
if (isOneway) {
// send=true,即客户端发送之后再返回,否则直接返回
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
asyncRpcResult.subscribeTo(responseFuture);
RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
return asyncRpcResult;
}
}
复制代码
ReferenceCountExchangeClient#request => HeaderExchangeClient#request => HeaderExchangeChannel#request 复制代码
// HeaderExchangeChannel.java
public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
复制代码
在这个方法中,有以下几个需要注意的点:
Request
构造函数内部,会为 Request
生成一个递增唯一的ID,用于标识该请求 channel#send
调用过程中,涉及到 NettyChannel#getOrAddChannel
方法的调用, NettyChannel
中有一个 ConcurrentMap<Channel, NettyChannel> CHANNEL_MAP
缓存,用于维护 io.netty.channel.Channel
和 NettyChannel
的关系 channel#send
调用过程中,最终会调用到 NettyChannel#send
方法,该方法真正的将消息发给Server端 DefaultFuture
是一个 CompletableFuture
// NettyChannel.java
public void send(Object message, boolean sent) throws RemotingException {
boolean success = true;
int timeout = 0;
try {
// 将消息发给Server
ChannelFuture future = channel.writeAndFlush(message);
if (sent) {
// 如果配置了 send=true 参数,客户端需要等待消息发出之后再返回
timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
success = future.await(timeout);
}
Throwable cause = future.cause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}
if (!success) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}
复制代码
从上面消息发送的流程中,好像没有看到对消息的编码工作,那是因为在Netty客户端初始化的时候,已经设置了编解码器
// NettyClient.java
protected void doOpen() throws Throwable {
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
bootstrap = new Bootstrap();
bootstrap.group(nioEventLoopGroup)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.channel(NioSocketChannel.class);
if (getConnectTimeout() < 3000) {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
} else {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
}
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
.addLast("handler", nettyClientHandler);
String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
if(socksProxyHost != null) {
int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
ch.pipeline().addFirst(socks5ProxyHandler);
}
}
});
}
复制代码
先经过编码器,即 InternalEncoder#encode
方法, InternalEncoder
实现了 MessageToByteEncoder
接口,该方法内部调用了 Codec2
的相关方法,而 Codec2
是一个SPI接口,默认实现 DubboCodec
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
protected static Codec2 getChannelCodec(URL url) {
String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
} else {
return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class).getExtension(codecName));
}
}
复制代码
上面提到了 NettyServer
中的 hander
属性指向 MultiMessageHandler -> HeartbeatHandler -> AllDispatcher -> DecodeHandler -> HeaderExchangeHandler
而 NettyServer
开启Server端的代码如下
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
复制代码
InternalDecoder#decode
方法, InternalDecoder
实现了 ByteToMessageDecoder
接口,该方法内部调用了 Codec2
的相关方法,而 Codec2
是一个SPI接口,默认实现 DubboCodec
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
protected static Codec2 getChannelCodec(URL url) {
String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
} else {
return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class).getExtension(codecName));
}
}
复制代码
MultiMessageHandler
用于处理数组消息,如果是消息是 MultiMessage
类型, MultiMessage
实现了 Iterable
数组,则遍历调用handle的received方法;否则直接调用下一个handle的received方法 AllChannelHandler
收到消息,将 channel handler message
封装成state为 ChannelState.RECEIVED
类型的 ChannelEventRunnable
对象,然后交给线程池执行 ChannelEventRunnable#run
方法中判断state为 ChannelState.RECEIVED
类型,直接执行下一个handler的received方法,即 DecodeHandler
,这个过程是由线程池执行 DecodeHandler#received
方法中,如果消息是 Decodeable
类型,对整个消息进行解码;如果消息是 Request
类型,对 Request.getData()
进行解码;如果消息是 Response
类型,对 Response.getResult()
进行解码 HeaderExchangeHandler#received
-> HeaderExchangeHandler#handleRequest
-> requestHandler#reply
, requestHandler
是 DubboProtocol
中的一个属性, ExchangeHandlerAdapter
类型 HeaderExchangeHandler#handleRequest
中会创建一个 Response
对象,它的ID属性值,就是 Request
对象的ID值,这样请求和响应就关联起来了 requestHandler#reply
方法中,从 exporterMap
缓存中获取对应的 DubboExporter
对象,然后从 DubboExporter
获取 Invoker
,最后执行 Invoker#invoke
方法,然后返回一个 CompletableFuture
对象 HeaderExchangeHandler#handleRequest
方法中接收返回的 CompletableFuture
对象,对它添加回调处理,在回调中将返回结果封装到 Response
对象中,然后通过channel将 Response
发出 // ChannelEventRunnable.java
public void run() {
if (state == ChannelState.RECEIVED) {
try {
// RECEIVED 类型,直接执行下一个handle的received方法,即 DecodeHandler
handler.received(channel, message);
} catch (Exception e) {}
} else {
switch (state) {
case CONNECTED:
try {
handler.connected(channel);
} catch (Exception e) {}
break;
case DISCONNECTED:
try {
handler.disconnected(channel);
} catch (Exception e) {}
break;
case SENT:
try {
handler.sent(channel, message);
} catch (Exception e) {}
break;
case CAUGHT:
try {
handler.caught(channel, exception);
} catch (Exception e) {}
break;
default:
logger.warn("unknown state: " + state + ", message is " + message);
}
}
}
复制代码
InternalDecoder#decode
=> NettyServerHandler#channelRead
=> AbstractPeer#received
=> MultiMessageHandler#received
=> HeartbeatHandler#received
=> AllChannelHandler#received
------------------ 异步执行,放到线程池 ----------------------
=> ChannelEventRunnable#run
=> DecodeHandler#received
=> DecodeHandler#decode
=> DecodeableRpcInvocation#decode
=> HeaderExchangeHandler#received
=> HeaderExchangeHandler#handleRequest
=> DubboProtocol.requestHandler#reply
------------------ 异步执行 -----------------------
----------------扩展点-------------------
=> ProtocolFilterWrapper.invoke
=> EchoFilter.invoke
=> ClassLoaderFilter.invoke
=> GenericFilter.invoke
=> TraceFilter.invoke
=> MonitorFilter.invoke
=> TimeoutFilter.invoke
=> ExceptionFilter.invoke
=> InvokerWrapper.invoke
-----------------扩展点-------------------
=> AbstractProxyInvoker#invoke
=> JavassistProxyFactory.AbstractProxyInvoker#doInvoke
=> 代理类#invokeMethod
=> 真正的service方法
//把接收处理的结果,数据发回consumer future#whenComplete
=> channel.send(response)
=> HeaderExchangeChannel
=> NettyChannel.send
=> NioSocketChannel#writeAndFlush(message)
复制代码
HeaderExchangeChannel#send => NettyChannel#send => NioSocketChannel#writeAndFlush(message) 复制代码
在客户端启动的时候,入参handler和服务端的handler是同一个
// DubboProtocol#initClient
Exchangers.connect(url, requestHandler);
// HeaderExchanger#connect
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
Transporters#connect =>
NettyTransporter#connect
return NettyClient
复制代码
在 NettyClient
构造函数中,对handler做了包装
ChannelHandlers.wrap(handler, url)
public class ChannelHandlers {
private static ChannelHandlers INSTANCE = new ChannelHandlers();
protected ChannelHandlers() {
}
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected static ChannelHandlers getInstance() {
return INSTANCE;
}
static void setTestingChannelHandlers(ChannelHandlers instance) {
INSTANCE = instance;
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
}
复制代码
所以,最终 NettyClient
中的handler属性指向 MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler -> DecodeHandler -> HeaderExchangeHandler -> requestHandler
,和服务端处理流程一样一样
MultiMessageHandler
、 HeartbeatHandler
处理,到达 AllDispatcher
AllChannelHandler
中将消息封装成 new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)
类型,交由线程池执行 DecodeHandler
到达 HeaderExchangeHandler
HeaderExchangeHandler#received -> HeaderExchangeHandler#handleResponse -> DefaultFuture#received
, DefaultFuture
中维护了一个 请求ID和DefaultFuture的映射关系
,Request和Response通过请求ID可以一一对应 public static void received(Channel channel, Response response, boolean timeout) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
Timeout t = future.timeoutCheckTask;
if (!timeout) {
t.cancel();
}
future.doReceived(response);
} else {
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
if (res.getStatus() == Response.OK) {
this.complete(res.getResult());
} else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
} else {
this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
}
}
复制代码
response.Id
获取 DefaultFuture
CompletableFuture#complete
方法可以让 执行了 CompletableFuture#get
的用户线程得到响应,获取结果返回。至此整个调用过程完成
可是我们在代码中很多时候都是同步调用,很少自己去调用 CompletableFuture#get
方法,这一部分逻辑又是怎么处理的。在 DubboInvoker#doInvoke
方法中,返回的是一个 AsyncRpcResult
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
// return = false,即oneWay ,可以减少不必要的Future对象创建
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {c
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
// 订阅 responseFuture ,当 responseFuture 完成的之后,执行 asyncRpcResult 的complete方法, 这样用户线程就可以响应了
asyncRpcResult.subscribeTo(responseFuture);
RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
return asyncRpcResult;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
复制代码
在 AsyncToSyncInvoker#invoke
方法中,会判断是同步调用还是异步调用,如果是同步调用,将调用 AsyncRpcResult#get
方法阻塞用户线程,以达到同步效果
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult = invoker.invoke(invocation);
try {
if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
// 如果是同步调用,调用 asyncResult#get 阻塞用户线程
asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return! method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof TimeoutException) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} else if (t instanceof RemotingException) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
} catch (Throwable e) {
throw new RpcException(e.getMessage(), e);
}
return asyncResult;
}
复制代码