public interface RegistryFactory {
Registry getRegistry(URL url);
}
核心方法就这一个。
public abstract class AbstractRegistryFactory implements RegistryFactory {
// Log output
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRegistryFactory.class); // The lock for the acquisition process of the registry
private static final ReentrantLock LOCK = new ReentrantLock(); // Registry Collection Map<RegistryAddress, Registry>
private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap<String, Registry>(); public Registry getRegistry(URL url) {
url = url.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
String key = url.toServiceString(); // Lock the registry access process to ensure a single instance of the registry
LOCK.lock(); try {
Registry registry = REGISTRIES.get(key); if (registry != null) { return registry;
}
registry = createRegistry(url); if (registry == null) { throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry); return registry;
} finally { // Release the lock
LOCK.unlock();
}
} public static void destroyAll() { if (LOGGER.isInfoEnabled()) {
LOGGER.info("Close all registries " + getRegistries());
} // Lock up the registry shutdown process
LOCK.lock(); try { for (Registry registry : getRegistries()) { try {
registry.destroy();
} catch (Throwable e) {
LOGGER.error(e.getMessage(), e);
}
}
REGISTRIES.clear();
} finally { // Release the lock
LOCK.unlock();
}
}protected abstract Registry createRegistry(URL url);
可以发现注册集合的操作通过ReentrantLock加锁实现,createRegistry注册实现则是交给对应实现方自己实现。
public Registry createRegistry(URL url) {
url = getRegistryURL(url);
List<URL> urls = new ArrayList<URL>();
urls.add(url.removeParameter(Constants.BACKUP_KEY));
String backup = url.getParameter(Constants.BACKUP_KEY); if (backup != null && backup.length() > 0) {
String[] addresses = Constants.COMMA_SPLIT_PATTERN.split(backup); for (String address : addresses) {
urls.add(url.setAddress(address));
}
}
RegistryDirectory<RegistryService> directory = new RegistryDirectory<RegistryService>(RegistryService.class, url.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()).addParameterAndEncoded(Constants.REFER_KEY, url.toParameterString()));
Invoker<RegistryService> registryInvoker = cluster.join(directory);
RegistryService registryService = proxyFactory.getProxy(registryInvoker);
DubboRegistry registry = new DubboRegistry(registryInvoker, registryService);
directory.setRegistry(registry);
directory.setProtocol(protocol);
directory.notify(urls);
directory.subscribe(new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, RegistryService.class.getName(), url.getParameters())); return registry;
}
将所有url和backupurl全部放入集合,通过负载均衡算法进行使用。
public abstract class FailbackRegistry extends AbstractRegistry { // Scheduled executor service
private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true)); // Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry
private final ScheduledFuture<?> retryFuture; private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>(); private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>(); private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>(); private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>(); private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>(); private AtomicBoolean destroyed = new AtomicBoolean(false);
可以发现注册中心的容错,是通过定时线程对注册url进行重试,将可用url和不可用url放入不同集合。
<dubbo:application name="${dubbo.application.name}" owner="${dubbo.application.owner}"/>
<dubbo:protocol name="dubbo" port="${dubbo.protocol.port}" heartbeat="180000"/>
<dubbo:service id="registryServiceConfig" interface="com.alibaba.dubbo.registry.RegistryService"
ref="registryService" registry="N/A" ondisconnect="disconnect" callbacks="1000">
<dubbo:method name="subscribe">
<dubbo:argument index="1" callback="true"/>
</dubbo:method>
<dubbo:method name="unsubscribe">
<dubbo:argument index="1" callback="false"/>
</dubbo:method>
</dubbo:service>
<bean id="registryService" class="com.alibaba.dubbo.registry.simple.SimpleRegistryService"/>
public void register(URL url) {
String client = RpcContext.getContext().getRemoteAddressString();
Set<URL> urls = remoteRegistered.get(client);
if (urls == null) {
remoteRegistered.putIfAbsent(client, new ConcurrentHashSet<URL>());
urls = remoteRegistered.get(client);
}
urls.add(url);
super.register(url);
registered(url);
}
之前我们说过,在spring容器启动之后,整个Bean信息会被解析成多个Config和Model的Bean放入Context上下文中,所以在注册时,会通过Context获取上下文中注册的Mode进行Bean解析注册。