本文主要讲解ZookeeperRegistry是如何实现注册和订阅功能。
上图是Dubbo注册中心的整体类图,基于接口的实现方式可以方便我们扩展注册中心的实现方式,下面简单介绍一下各个类的作用:
下面我们看一下ZookeeperRegistry的具体实现。
@Override
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
注册的逻辑很简单,就是简单在ZooKeeper中创建一个节点,默认创建的节点是一个临时节点(随会话的关闭自动删除)。
创建的节点路径格式为:{root}/{interface}/{category}/{protocol}://{username}:{password}@{host}:{port}/{path}?parameterKey=parameterValue&...
整个节点路径格式分为两大部分,一部分是父目录节点{root}/{interface}/{category},另一部分就是节点的名称{protocol}://{username}:{password}@{host}:{port}/{path}?parameterKey=parameterValue&...
# 第二部分示例
dubbo://127.0.0.1:20880/cn.sh.dubbo.user.service.UserService?anyhost=true&application=user-manager&application.version=1.0&bean.name=cn.sh.dubbo.user.service.UserService&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=cn.sh.dubbo.user.service.UserService&methods=login,register&pid=3551&qos.accept.foreign.ip=false&qos.port=22223&qosEnable=true®ister=true&release=2.7.2&side=provider×tamp=1563954893720
# 解释其中几个参数的来源
# bean.name:<dubbo:service>标签配置的id属性的值
下面讲解一下每个{}中的数据来源和默认值:
@Override
public void doUnregister(URL url) {
try {
zkClient.delete(toUrlPath(url));
} catch (Throwable e) {
throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
取消注册就是将注册时创建的节点进行删除。
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
List<URL> urls = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
zkListener = listeners.get(listener);
}
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
notify(url, listener, urls);
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
订阅支持全局订阅和订阅具体某个接口,上述订阅的方法只展示了订阅某个接口的逻辑。简单描述一下其逻辑:
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
// 省略了url、listener、urls的判断
// keep every provider's category.
Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
listener.notify(categoryList);
// We will update our cache file after each notification.
// When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
saveProperties(url);
}
}
上述方法有三个参数,第一个参数为要URL(包含订阅的类型、订阅的服务等关键信息),第二个NotifyListener在订阅服务这一模块的具体实现类是RegistryDirectory,第三个是订阅的路径的子节点路径列表。整体逻辑如下:
@Override
public synchronized void notify(List<URL> urls) {
Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(url -> {
if (UrlUtils.isConfigurator(url)) {
return CONFIGURATORS_CATEGORY;
} else if (UrlUtils.isRoute(url)) {
return ROUTERS_CATEGORY;
} else if (UrlUtils.isProvider(url)) {
return PROVIDERS_CATEGORY;
}
return "";
}));
List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
toRouters(routerURLs).ifPresent(this::addRouters);
// providers
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
refreshOverrideAndInvoker(providerURLs);
}
从上面的源码中我们可以大体看出,根据URL解析每个类别(configurators、routers、providers),分别进行通知。
@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners != null) {
ChildListener zkListener = listeners.get(listener);
if (zkListener != null) {
if (ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
zkClient.removeChildListener(root, zkListener);
} else {
for (String path : toCategoriesPath(url)) {
zkClient.removeChildListener(path, zkListener);
}
}
}
}
}
取消订阅的逻辑就比较简单了,将相应的ChildListener进行移除就好了。
上述内容只是本人的一些个人见解,最后附上一张Dubbo的架构图(来源:官网),上述内容只是简略的讲述了红框中的部分内容。