转载

Dubbo源码分析(七)服务目录

在上一章节的内容中,我们分析了服务引用的具体流程。在大多数情况下,为避免单点故障,我们的应用会部署在多台服务器上。对于我们的Dubbo而言,就会出现多个服务提供者。而且这些服务也并非是一成不变的,那么就有这样一个问题: 有新的服务提供者加入或者禁用、修改已有的服务提供者,那么服务消费者怎么及时感知它们的变化呢?

一、服务目录

或许你还有印象 ,在服务引用的时候,我们曾经有用到它。这个就是服务目录。 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);

那么,什么是服务目录呢?

简单来说, 服务目录中存储了一些和服务提供者有关的信息,通过服务目录,服务消费者可获取到服务提供者的信息,比如 ip、端口、服务协议等。

服务目录在获取注册中心的服务配置信息后,会为每条配置信息生成一个 Invoker 对象,并把这个 Invoker 对象存储起来,这个 Invoker 才是服务目录最终持有的对象。多个服务提供者,就构成了服务目录中的一个Invoker 集合。

假设我们有三个服务提供者,那么服务目录,RegistryDirectory对象中保存的Invoker如下:

Dubbo源码分析(七)服务目录

我们再看下这个类的继承关系:

Dubbo源码分析(七)服务目录

我们看到,RegistryDirectory实现了Directory接口和NotifyListener接口。那么,我们重点关注两个实现。

1、获取Invocation集合

上面我们看到,RegistryDirectory会保存服务提供者Invocation的集合。那么,就得有获取的方法。获取的方法很简单,就是从本地缓存中查询即可。

public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
	public List<Invoker<T>> doList(Invocation invocation) {
		List<Invoker<T>> invokers = null;
		
		//方法名和List<Invoker<T>>的映射
		Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap;
		if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
			//获取请求的方法名,比如insertInfoUser
			String methodName = RpcUtils.getMethodName(invocation);
			//参数列表
			Object[] args = RpcUtils.getArguments(invocation);
			if (args != null && args.length > 0 && args[0] != null
					&& (args[0] instanceof String || args[0].getClass().isEnum())) {
				invokers = localMethodInvokerMap.get(methodName + "." + args[0]);
			}
			if (invokers == null) {
				 // 通过方法名获取 Invoker 列表
				invokers = localMethodInvokerMap.get(methodName);
			}
			if (invokers == null) {
				// 通过星号 * 获取 Invoker 列表
				invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
			}
			if (invokers == null) {
				Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
				if (iterator.hasNext()) {
					invokers = iterator.next();
				}
			}
		}
		return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
	}
}
复制代码

2、动态更新

服务目录并非是一成不变的,如果有的新的服务提供者加入,或者剔除已有的服务提供者,那么服务目录需要及时更新信息。所以它实现了NotifyListener接口,用于刷新Invoker集合。

  • 触发点

既然是通知,首先我们就要弄清楚它是在哪里触发的。以zookeeper为例,在服务引用的时候,它会监听服务提供者数据节点的数据变化。

public void childChanged(String parentPath, List<String> currentChilds) {
	ZookeeperRegistry.this.notify(url, listener, 
			toUrlsWithEmpty(url, parentPath, currentChilds));
}
复制代码

如上代码, toUrlsWithEmpty 方法会将当前最新的子节点数据转换成List对象,然后调用 ZookeeperRegistry.this.notify ,此方法最终将调用到服务目录中的刷新方法。

  • 刷新 Invoker 列表

refreshInvoker 方法是保证 RegistryDirectory 随注册中心变化而变化的关键所在。

private void refreshInvoker(List<URL> invokerUrls) {
	
	//如果invokerUrls中只要一个元素,且协议头为empty,就销毁所有的服务
	if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
			&& "empty".equals(invokerUrls.get(0).getProtocol())) {
		this.forbidden = true; 
		this.methodInvokerMap = null; 
		//销毁所有服务引用
		destroyAllInvokers(); 
	} else {
		this.forbidden = false; 
		//原有的urlInvokerMap
		Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; 
		if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
			// 添加缓存 url 到 invokerUrls 中
			invokerUrls.addAll(this.cachedInvokerUrls);
		} else {
			this.cachedInvokerUrls = new HashSet<URL>();
			// 缓存 invokerUrls
			this.cachedInvokerUrls.addAll(invokerUrls);
		}
		if (invokerUrls.isEmpty()) {
			return;
		}
		//将invokerUrls转换为Invoker
		//这里会根据url中的协议名称调用对应的Protocol实现
		Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
		//将 newUrlInvokerMap 转成方法名到 Invoker 列表的映射
		Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); 
		if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
			logger.error(new IllegalStateException("......");
			return;
		}
		this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : 
													newMethodInvokerMap;
		this.urlInvokerMap = newUrlInvokerMap;
		try {
			//销毁无用的服务引用Invoker
			destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap);
		} catch (Exception e) {
			logger.warn("destroyUnusedInvokers error. ", e);
		}
	}
}
复制代码

如上代码,我们分为这样几个步骤来理解。

  • 从注册中心中,获取当前最新的 invokerUrls
  • 调用 toInvokers(invokerUrls) 方法。此方法根据URL中的协议名称,调用对应的Protocol实现,来引用服务。比如 DubboProtocol.refer ,它返回已经构建好的Invoker对象集合。
  • 调用 toMethodInvokers(newUrlInvokerMap) 方法,根据最新的InvokerMap,重置方法名和Invoker对象的映射关系。
  • 更新 methodInvokerMap 缓存。

我们刚刚看到,上面获取Invocation集合的时候,就是从 methodInvokerMap 对象中获取数据。那么在这里改变这个对象的属性,就相当于更新了服务目录的Invoker信息。

  • 销毁

在更新完服务目录Invoker之后,还要销毁无用的服务引用Invoker。

private void destroyUnusedInvokers(Map<String, Invoker<T>> oldUrlInvokerMap, 
					Map<String, Invoker<T>> newUrlInvokerMap) {
					
	//如果新的Invoker为空,则销毁全部
	if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
		destroyAllInvokers();
		return;
	}
	
	List<String> deleted = null;
	if (oldUrlInvokerMap != null) {
		Collection<Invoker<T>> newInvokers = newUrlInvokerMap.values();
		//遍历旧的Invoker
		for (Map.Entry<String, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
			//判断新的newInvokers是否包含旧的Invoker对象
			if (!newInvokers.contains(entry.getValue())) {
				if (deleted == null) {
					deleted = new ArrayList<String>();
				}
				// 若不包含,则将旧的 Invoker 对应的 url 存入 deleted 列表中
				deleted.add(entry.getKey());
			}
		}
	}
	if (deleted != null) {
		for (String url : deleted) {
			if (url != null) {
				// 从 oldUrlInvokerMap 中移除 url 对应的 Invoker
				Invoker<T> invoker = oldUrlInvokerMap.remove(url);
				if (invoker != null) {
					try {
						//调用销毁方法
						invoker.destroy();
						if (logger.isDebugEnabled()) {
							logger.debug("destory invoker[" + invoker.getUrl() + "] success. ");
						}
					} 
				}
			}
		}
	}
}
复制代码

以上代码看起来比较长,其实也很简单。就是通过对比两个InvokerMap,如果新的InvokerMap中不包含旧的InvokerMap中的节点,那么这个Invoker就是要被销毁的。

我们在上一章节分析服务引用的时候,我们提到了可以对服务引用进行监听。那么这里,就是触发服务销毁监听方法的地方。

public class ListenerInvokerWrapper<T> implements Invoker<T> {

	public void destroy() {
		try {
			invoker.destroy();
		} finally {	
			//自定义监听器
			if (listeners != null && !listeners.isEmpty()) {
				for (InvokerListener listener : listeners) {
					if (listener != null) {
						try {
							//调用监听器方法
							listener.destroyed(invoker);
						} catch (Throwable t) {
							logger.error(t.getMessage(), t);
						}
					}
				}
			}
		}
	}
}
复制代码

服务目录是集群容错和负载均衡机制的基础部分,为什么这样说呢?

当有多个服务提供者的时候:

我们怎么选取其中的一个服务去调用,这是负载均衡机制 当调用服务失败后,我们怎么处理当前的请求?抛出异常亦或是重试?这是集群容错机制

有了服务目录,我们才能获取所有的服务提供者列表,并感知注册中心的数据变化,及时更新目录中的Invoker对象信息。

原文  https://juejin.im/post/5c98ca53f265da60f771d016
正文到此结束
Loading...