转载

dubbo之Zookeeper注册中心

目前dubbo支持多种注册中心:Zookeeper、Redis、Simple、Multicast、Etcd3。

本编文章是分析使用Zookeeper作为注册中心,dubbo如何整合Zookeeper进行服务注册和订阅服务。

首先dubbo将服务注册到Zookeeper后,目录结构如下所示:(注册接口名:com.bob.dubbo.service.CityDubboService)

dubbo之Zookeeper注册中心

在consumer和provider服务启动的时候,去把自身URL格式化成字符串,然后注册到zookeeper相应节点下,作为临时节点,断开连接后,节点删除;consumer启动时,不仅会订阅服务,同时也会将自己的URL注册到zookeeper中;

ZookeeperRegistry

ZookeeperRegistry:dubbo与zookeeper交互主要的类,已下结合源码进行分析,先来看doSubcribe()方法:

@Override
    public void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            // 处理所有service层发起的订阅,例如监控中心的订阅
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                    listeners = zkListeners.get(url);
                }
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, (parentPath, currentChilds) -> {
                        for (String child : currentChilds) {
                            child = URL.decode(child);
                            if (!anyServices.contains(child)) {
                                anyServices.add(child);
                                subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
                                        Constants.CHECK_KEY, String.valueOf(false)), listener);
                            }
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                zkClient.create(root, false);
                List<String> services = zkClient.addChildListener(root, zkListener);
                if (services != null && !services.isEmpty()) {
                    for (String service : services) {
                        service = URL.decode(service);
                        anyServices.add(service);
                        subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                    }
                }
                // 处理指定service层发起的订阅,例如服务消费者的订阅
            } else {
                List<URL> urls = new ArrayList<>();
                // 循环分类数组 , router, configurator, provider
                for (String path : toCategoriesPath(url)) {
                    // 获得 url 对应的监听器集合
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {// 不存在,进行创建
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                        listeners = zkListeners.get(url);
                    }
                    // 获得 ChildListener 对象
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {//  不存在子目录的监听器,进行创建 ChildListener 对象
                        // 订阅父级目录, 当有子节点发生变化时,触发此回调函数,回调listener中的notify()方法
                        listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
                        zkListener = listeners.get(listener);
                    }
                    // 向 Zookeeper ,PATH 节点,发起订阅,返回此节点下的所有子元素 path : /根节点/接口全名/providers, 比如 : /dubbo/com.bob.service.CityService/providers
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                // 首次全量数据获取完成时,调用 `#notify(...)` 方法,回调 NotifyListener, 在这一步从连接Provider,实例化Invoker
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
原文  https://segmentfault.com/a/1190000017916181
正文到此结束
Loading...