前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >dubbo集群容错机制代码分析1

dubbo集群容错机制代码分析1

作者头像
技术蓝海
发布2018-04-26 14:34:51
1K0
发布2018-04-26 14:34:51
举报
文章被收录于专栏:wannshan(javaer,RPC)wannshan(javaer,RPC)

dubbo版本2.5.3

我们这里以zookeeper作为注册中心为例说明。

这里说的集群,可以理解为,一个接口服务对应有多个提供者。 在dubbo的调用方(reference)看来,每个提供方(service)对应一个invoker。 关于一个调用方对应多个提供方的场景大概包括三大类: 1,者调者订阅一个注册中心,此注册中心,同一个服务有多个提供者(以不同机器,端口,版本等发布的服务) 2,者调者订阅多个注册中心的服务,每个注册中心都有引用的服务的提供者(一个或者多个)。 3,调用方,通过url配置,提供多个提供者地址,多个地址以分号隔开。 1,2是同一类场景,3是直连场景,这两中场景是互斥,也就是用户配置了reference的url属性,dubbo就不会再订阅注册中心。

下面通过代码分析下,这三种场景的集群容错 客户端订阅可以看ReferenceConfig类的createProxy方法里以下代码

代码语言:javascript
复制
		if (isJvmRefer) {//引用本地服务,只返回一个exporter不会有集群。
			    URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
			    invoker = refprotocol.refer(interfaceClass, url);
			    if (logger.isInfoEnabled()) {
				logger.info("Using injvm service " + interfaceClass.getName());
			    }
		} else {//应用远程服务
		    if (url != null && url.length() > 0) { // 用户指定URL,指定的URL可能是对点对直连地址,也可能是注册中心URL
			String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
			if (us != null && us.length > 0) {//用户自定多个直连服务
			    for (String u : us) {
				URL url = URL.valueOf(u);
				if (url.getPath() == null || url.getPath().length() == 0) {
				    url = url.setPath(interfaceName);
				}
				if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
				    urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
				} else {
				    urls.add(ClusterUtils.mergeUrl(url, map));
				}
			    }
			}
		    } else { // 通过注册中心配置拼装URL
			List<URL> us = loadRegistries(false);
			if (us != null && us.size() > 0) {//用户自定多个注册中心
				for (URL u : us) {
				    URL monitorUrl = loadMonitor(u);
				if (monitorUrl != null) {
				    map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
				}
				    urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
			    }
			}
			if (urls == null || urls.size() == 0) {
			    throw new IllegalStateException("No such any registry to reference " + interfaceName  + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
			}
		    }

		    if (urls.size() == 1) {//调用方订阅一个注册中心,或者自定一个直连服务(直连的这种情况不考虑集群,只有一个提供者)
                        //一个注册中心时,这个refprotocol自适应后是RegistryProtocol
                        //一个直连者时,这个refprotocol自适应后是DubboProtocol(如果是duboo协议)
			invoker = refprotocol.refer(interfaceClass, urls.get(0));
		    } else {//调用方订阅多个注册中心,或者多个直连地址
			List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
			URL registryURL = null;
			for (URL url : urls) {
			    invokers.add(refprotocol.refer(interfaceClass, url));
			    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
				registryURL = url; // 用了最后一个registry url
			    }
			}
			if (registryURL != null) { // 有 注册中心协议的URL
			    // 对有注册中心的Cluster 只用 AvailableCluster 容错策略
			    // 对于订阅多个注册中心的,这里其实有两层的容错机制,只是第一层,被强制设置为AvailableCluster 容错策略
			    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); 
			    invoker = cluster.join(new StaticDirectory(u, invokers));//cluster是通过spi机制注入的自适应adaptive实现,场景2执行逻辑
			}  else { // 不是 注册中心的URL
			    invoker = cluster.join(new StaticDirectory(invokers));//cluster是通过spi机制注入的自适应adaptive实现,场景3执行逻辑
			}
		    }
        }

通过代码我们看到,对于场景1,引用一个注册中心的场景,会执行 invoker = refprotocol.refer(interfaceClass, urls.get(0));代码

通过代码调试,可以发现,refprotocol.refer会调用RegistryProtocol的refer方法最终进入doRefer方法, 会执行如下代码

代码语言:javascript
复制
  private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
        if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 
                Constants.PROVIDERS_CATEGORY 
                + "," + Constants.CONFIGURATORS_CATEGORY 
                + "," + Constants.ROUTERS_CATEGORY));
        return cluster.join(directory);//cluster是通过spi机制注入的自适应adaptive实现。此时的directory是RegistryDirectory类型
    }

三种场景实际上,都执行了dubbo SPI机制生成的adaptive的Cluster实现代码 通过dubbo打印日志,可以看到adaptive的Cluster实现代码如下

代码语言:javascript
复制
package com.alibaba.dubbo.rpc.cluster;

import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Cluster$Adpative implements com.alibaba.dubbo.rpc.cluster.Cluster {
    public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
        if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("cluster", "failover");//可以看到,通过url里的cluster键值获取容错机制,url中没有指定cluster键值,dubbo默认是用failover集群容错策略
        if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
        com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
        return extension.join(arg0);
    }
}

到此,我们可以看到对于多注册中心的,第一层容错机制被强制设置为available, 然后第二层,就和单个注册中心多服务提供者集群容错机制一样了,即默认为failover容错机制。这里看下这两种容错机制的代码实现 1,failover容错机制 通过spi机制我们找到Cluster failover扩展FailoverCluster类是这样实现的

代码语言:javascript
复制
public class FailoverCluster implements Cluster {

    public final static String NAME = "failover";

    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<T>(directory);
    }

}

接着看FailoverClusterInvoker类,先看它的父类AbstractClusterInvoker,这个类实现了Invoker接口:

代码语言:javascript
复制
public Result invoke(final Invocation invocation) throws RpcException {

        checkWhetherDestroyed();

        LoadBalance loadbalance;
        //这里是获取负载均衡策略
        List<Invoker<T>> invokers = list(invocation);
        if (invokers != null && invokers.size() > 0) {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        } else {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        return doInvoke(invocation, invokers, loadbalance);//回调子类的doInvoke方法
}

然后再回到子类看doInvoke方法:

代码语言:javascript
复制
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);

    public FailoverClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    	List<Invoker<T>> copyinvokers = invokers;
    	checkInvokers(copyinvokers, invocation);
	//获取重试次数  +1是因为第一次调用不算重试次数
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
        	//重试时,进行重新选择,避免重试时invoker列表已发生变化.
        	//注意:如果列表发生了变化,那么invoked判断会失效,因为invoker示例已经改变
        	if (i > 0) {
        		checkWhetherDestroyed();
        		copyinvokers = list(invocation);
        		//重新检查一下
        		checkInvokers(copyinvokers, invocation);
        	}
		//这里是通过负载均衡策略获取下一个服务提供者
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List)invoked);
            try {
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + invocation.getMethodName()
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers 
                            + " (" + providers.size() + "/" + copyinvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.如果是业务异常,则不再重试,直接抛出异常
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());//记录调用失败信息
            }
        }
        throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
                + invocation.getMethodName() + " in the service " + getInterface().getName() 
                + ". Tried " + len + " times of the providers " + providers 
                + " (" + providers.size() + "/" + copyinvokers.size() 
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
    }

}

通过代码可以看到, failvoer集群容错机制,总的逻辑是,以方法重复次数为限制,每次调用如果失败, 就利用负责均衡策略获取下一个提供者(invoker),直到调用成功,或者最后方法超限,抛出异常, 其中中间如果有业务异常,则不再重试,直接抛出异常。

2,available集群容错机制,我们找到AvailableCluster类,它只有一个方法

代码语言:javascript
复制
 public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        //它没通过扩展AbstractClusterInvoker抽象类,而是直接实现它,它没用负载均衡策略,而是简单选择一个可达的服务
        return new AbstractClusterInvoker<T>(directory) {
            public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
                for (Invoker<T> invoker : invokers) {
                    if (invoker.isAvailable()) {//获取第一个,可达的服务提供方,
                        return invoker.invoke(invocation);
                    }
                }
                throw new RpcException("No provider available in " + invokers);
            }
        };
        
    }

通过代码可以看到, available集群容错机制,则是简单的调用第一个可到达的服务。都不可达是,抛出异常

最后 dubbo本身还有其他集群容错的扩展实现

dubbo集群容错策略的代码分析2

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
微服务引擎 TSE
微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档