订阅和发布是注册中心的核心功能之一。当一个已有服务的提供者下线,或者新的提供者加入。订阅对应接口的消费者和服务治理中心能及时收到注册中心的通知,并更新本地配置信息。整个过程自动完成,不需要人工参与。
提供者和消费者都需要把自己注册到注册中心。
提供者的注册为了让消费者感知服务的存在,从而发起远程调用。也让服务治理中心感知有新的服务加入。
消费者的发布,为了让服务治理中心发现。
发布代码:
zkClinet.create(toUrlPath(url)) url.getParamter(Contants.DYNAMIC_KEY,true) 复制代码
zkClinet.delete(toUrlPath(url)) 复制代码
订阅通常有pull和push两种方式,一种是客户端定时轮询注册中心拉取配置,另一种是注册中心主动推送给客户端。各有利弊,Dubbo采用的是第一次启动拉取,后续接收事件重新拉取数据。
在服务暴露时,服务端会订阅configurators用于监听动态配置。在消费端启动时,消费端会订阅providers,routers和configurators这三个目录,分别对应服务提供者、路由和动态配置变更通知。
dubbo定义了两种两种连接zookeeper客户端:Apache Curator 和zkClient。默认是Curator.
Zookeeper采取的是 时间通知和客户端拉取方式。
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
//全量订阅
if ("*".equals(url.getServiceInterface())) {
String root = this.toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = (ConcurrentMap)this.zkListeners.get(url);
if (listeners == null) {
this.zkListeners.putIfAbsent(url, new ConcurrentHashMap());
listeners = (ConcurrentMap)this.zkListeners.get(url);
}
ChildListener zkListener = (ChildListener)listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String> currentChilds) {
Iterator var3 = currentChilds.iterator();
while(var3.hasNext()) {
String child = (String)var3.next();
child = URL.decode(child);
if (!ZookeeperRegistry.this.anyServices.contains(child)) {
ZookeeperRegistry.this.anyServices.add(child);
ZookeeperRegistry.this.subscribe(url.setPath(child).addParameters(new String[]{"interface", child, "check", String.valueOf(false)}), listener);
}
}
}
});
zkListener = (ChildListener)listeners.get(listener);
}
this.zkClient.create(root, false);
List<String> services = this.zkClient.addChildListener(root, zkListener);
if (services != null && services.size() > 0) {
Iterator var7 = services.iterator();
while(var7.hasNext()) {
String service = (String)var7.next();
service = URL.decode(service);
this.anyServices.add(service);
this.subscribe(url.setPath(service).addParameters(new String[]{"interface", service, "check", String.valueOf(false)}), listener);
}
}
} else {
//根据url的类别订阅
List<URL> urls = new ArrayList();
String[] var13 = this.toCategoriesPath(url);
int var14 = var13.length;
for(int var15 = 0; var15 < var14; ++var15) {
String path = var13[var15];
ConcurrentMap<NotifyListener, ChildListener> listeners = (ConcurrentMap)this.zkListeners.get(url);
if (listeners == null) {
this.zkListeners.putIfAbsent(url, new ConcurrentHashMap());
listeners = (ConcurrentMap)this.zkListeners.get(url);
}
ChildListener zkListener = (ChildListener)listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String> currentChilds) {
ZookeeperRegistry.this.notify(url, listener, ZookeeperRegistry.this.toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = (ChildListener)listeners.get(listener);
}
this.zkClient.create(path, false);
List<String> children = this.zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(this.toUrlsWithEmpty(url, path, children));
}
}
this.notify(url, listener, urls);
}
} catch (Throwable var11) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + this.getUrl() + ", cause: " + var11.getMessage(), var11);
}
}
复制代码