前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >dubbo源码学习笔记----registry

dubbo源码学习笔记----registry

作者头像
春哥大魔王
发布2018-04-16 10:46:02
5840
发布2018-04-16 10:46:02
举报

注册工厂

代码语言:javascript
复制
public interface RegistryFactory {   
    Registry getRegistry(URL url);
}

核心方法就这一个。

注册抽象类

代码语言:javascript
复制
public abstract class AbstractRegistryFactory implements RegistryFactory {

    // Log output
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRegistryFactory.class);    // The lock for the acquisition process of the registry
    private static final ReentrantLock LOCK = new ReentrantLock();    // Registry Collection Map<RegistryAddress, Registry>
    private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap<String, Registry>();    public Registry getRegistry(URL url) {
        url = url.setPath(RegistryService.class.getName())
                .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
        String key = url.toServiceString();        // Lock the registry access process to ensure a single instance of the registry
        LOCK.lock();        try {
            Registry registry = REGISTRIES.get(key);            if (registry != null) {                return registry;
            }
            registry = createRegistry(url);            if (registry == null) {                throw new IllegalStateException("Can not create registry " + url);
            }
            REGISTRIES.put(key, registry);            return registry;
        } finally {            // Release the lock
            LOCK.unlock();
        }
    }    public static void destroyAll() {        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Close all registries " + getRegistries());
        }        // Lock up the registry shutdown process
        LOCK.lock();        try {            for (Registry registry : getRegistries()) {                try {
                    registry.destroy();
                } catch (Throwable e) {
                    LOGGER.error(e.getMessage(), e);
                }
            }
            REGISTRIES.clear();
        } finally {            // Release the lock
            LOCK.unlock();
        }
    }protected abstract Registry createRegistry(URL url);

可以发现注册集合的操作通过ReentrantLock加锁实现,createRegistry注册实现则是交给对应实现方自己实现。

代码语言:javascript
复制
    public Registry createRegistry(URL url) {
        url = getRegistryURL(url);
        List<URL> urls = new ArrayList<URL>();
        urls.add(url.removeParameter(Constants.BACKUP_KEY));
        String backup = url.getParameter(Constants.BACKUP_KEY);        if (backup != null && backup.length() > 0) {
            String[] addresses = Constants.COMMA_SPLIT_PATTERN.split(backup);            for (String address : addresses) {
                urls.add(url.setAddress(address));
            }
        }
        RegistryDirectory<RegistryService> directory = new RegistryDirectory<RegistryService>(RegistryService.class, url.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()).addParameterAndEncoded(Constants.REFER_KEY, url.toParameterString()));
        Invoker<RegistryService> registryInvoker = cluster.join(directory);
        RegistryService registryService = proxyFactory.getProxy(registryInvoker);
        DubboRegistry registry = new DubboRegistry(registryInvoker, registryService);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        directory.notify(urls);
        directory.subscribe(new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, RegistryService.class.getName(), url.getParameters()));        return registry;
    }

将所有url和backupurl全部放入集合,通过负载均衡算法进行使用。

容错机制

代码语言:javascript
复制
public abstract class FailbackRegistry extends AbstractRegistry {    // Scheduled executor service
    private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));    // Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry
    private final ScheduledFuture<?> retryFuture;    private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();    private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();    private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();    private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();    private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();    private AtomicBoolean destroyed = new AtomicBoolean(false);

可以发现注册中心的容错,是通过定时线程对注册url进行重试,将可用url和不可用url放入不同集合。

注册实现

代码语言:javascript
复制
    <dubbo:application name="${dubbo.application.name}" owner="${dubbo.application.owner}"/>

    <dubbo:protocol name="dubbo" port="${dubbo.protocol.port}" heartbeat="180000"/>

    <dubbo:service id="registryServiceConfig" interface="com.alibaba.dubbo.registry.RegistryService"
                   ref="registryService" registry="N/A" ondisconnect="disconnect" callbacks="1000">
        <dubbo:method name="subscribe">
            <dubbo:argument index="1" callback="true"/>
        </dubbo:method>
        <dubbo:method name="unsubscribe">
            <dubbo:argument index="1" callback="false"/>
        </dubbo:method>
    </dubbo:service>

    <bean id="registryService" class="com.alibaba.dubbo.registry.simple.SimpleRegistryService"/>


    public void register(URL url) {
        String client = RpcContext.getContext().getRemoteAddressString();
        Set<URL> urls = remoteRegistered.get(client);
        if (urls == null) {
            remoteRegistered.putIfAbsent(client, new ConcurrentHashSet<URL>());
            urls = remoteRegistered.get(client);
        }
        urls.add(url);
        super.register(url);
        registered(url);
    }

之前我们说过,在spring容器启动之后,整个Bean信息会被解析成多个Config和Model的Bean放入Context上下文中,所以在注册时,会通过Context获取上下文中注册的Mode进行Bean解析注册。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-01-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 春哥talk 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 注册工厂
  • 注册抽象类
  • 容错机制
  • 注册实现
相关产品与服务
负载均衡
负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档