专栏首页luozhiyun的技术学习2. 源码分析---SOFARPC客户端服务引用

2. 源码分析---SOFARPC客户端服务引用

我们先上一张客户端服务引用的时序图。

我们首先来看看ComsumerConfig的refer方法吧

    public T refer() {
        if (consumerBootstrap == null) {
            //如果服务消费者启动类为空,怎创建一个
            consumerBootstrap = Bootstraps.from(this);
        }
        return consumerBootstrap.refer();
    }

然后我们再看Bootstraps是怎么创建的

    public static <T> ConsumerBootstrap<T> from(ConsumerConfig<T> consumerConfig) {
        String bootstrap = consumerConfig.getBootstrap();
        ConsumerBootstrap consumerBootstrap;
        //如果有传入启动器,那么就使用启动器的参数
        if (StringUtils.isNotEmpty(bootstrap)) {
            consumerBootstrap = ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class)
                .getExtension(bootstrap,
                    new Class[] { ConsumerConfig.class },
                    new Object[] { consumerConfig });
        } else {
            //没有传入启动器,那么就使用协议的参数
            // default is same with protocol
            bootstrap = consumerConfig.getProtocol();
            ExtensionLoader extensionLoader = ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class);
            ExtensionClass<ConsumerBootstrap> extensionClass = extensionLoader.getExtensionClass(bootstrap);
            //预防性代码,实际不可能为空
            if (extensionClass == null) {
                // if not exist, use default consumer bootstrap
                // 为空的话,则是使用默认的启动器DefaultConsumerBootstrap
                bootstrap = RpcConfigs.getStringValue(RpcOptions.DEFAULT_CONSUMER_BOOTSTRAP);
                consumerConfig.setBootstrap(bootstrap);
                consumerBootstrap = ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class)
                    .getExtension(bootstrap, new Class[] { ConsumerConfig.class }, new Object[] { consumerConfig });
            } else {
                consumerConfig.setBootstrap(bootstrap);
                consumerBootstrap = extensionClass.getExtInstance(
                    new Class[] { ConsumerConfig.class }, new Object[] { consumerConfig });
            }
        }
        return (ConsumerBootstrap<T>) consumerBootstrap;
    }

这里返回的consumerBootstrap和用的启动器和协议有关,如果用的是bolt那么返回的就是BoltConsumerBootstrap实例。

从这里可以看出很多功能都是继承自父类DefaultConsumerBootstrap的。

继续调用consumerBootstrap#refer方法,会直接跳到父类的refer方法中。

    public T refer() {
        if (proxyIns != null) {
            return proxyIns;
        }
        synchronized (this) {
            if (proxyIns != null) {
                return proxyIns;
            }
            String key = consumerConfig.buildKey();
            String appName = consumerConfig.getAppName();
            // 检查参数
            checkParameters();
            // 提前检查接口类
            if (LOGGER.isInfoEnabled(appName)) {
                LOGGER.infoWithApp(appName, "Refer consumer config : {} with bean id {}", key, consumerConfig.getId());
            }

            // 注意同一interface,同一tags,同一protocol情况
            AtomicInteger cnt = REFERRED_KEYS.get(key); // 计数器
            if (cnt == null) { // 没有发布过
                cnt = CommonUtils.putToConcurrentMap(REFERRED_KEYS, key, new AtomicInteger(0));
            }
            int c = cnt.incrementAndGet();
            //同一个服务 的最大引用次数,防止由于代码bug导致重复引用,每次引用都会生成一个代理类对象,-1表示不检查
            int maxProxyCount = consumerConfig.getRepeatedReferLimit();
            if (maxProxyCount > 0) {
                if (c > maxProxyCount) {
                    cnt.decrementAndGet();
                    // 超过最大数量,直接抛出异常
                    throw new SofaRpcRuntimeException("Duplicate consumer config with key " + key
                        + " has been referred more than " + maxProxyCount + " times!"
                        + " Maybe it's wrong config, please check it."
                        + " Ignore this if you did that on purpose!");
                } else if (c > 1) {
                    if (LOGGER.isInfoEnabled(appName)) {
                        LOGGER.infoWithApp(appName, "Duplicate consumer config with key {} has been referred!"
                            + " Maybe it's wrong config, please check it."
                            + " Ignore this if you did that on purpose!", key);
                    }
                }
            }

            try {
                // build cluster
                //默认是FailOverCluster
                cluster = ClusterFactory.getCluster(this);
                // build listeners
                consumerConfig.setConfigListener(buildConfigListener(this));
                consumerConfig.setProviderInfoListener(buildProviderInfoListener(this));
                // init cluster
                cluster.init();
                // 构造Invoker对象(执行链)
                proxyInvoker = buildClientProxyInvoker(this);
                // 创建代理类
                proxyIns = (T) ProxyFactory.buildProxy(consumerConfig.getProxy(), consumerConfig.getProxyClass(),
                    proxyInvoker);

                //动态配置
                final String dynamicAlias = consumerConfig.getParameter(DynamicConfigKeys.DYNAMIC_ALIAS);
                if (StringUtils.isNotBlank(dynamicAlias)) {
                    final DynamicConfigManager dynamicManager = DynamicConfigManagerFactory.getDynamicManager(
                        consumerConfig.getAppName(), dynamicAlias);
                    dynamicManager.initServiceConfiguration(consumerConfig.getInterfaceId());
                }
            } catch (Exception e) {
                if (cluster != null) {
                    cluster.destroy();
                    cluster = null;
                }
                consumerConfig.setConfigListener(null);
                consumerConfig.setProviderInfoListener(null);
                cnt.decrementAndGet(); // 发布失败不计数
                if (e instanceof SofaRpcRuntimeException) {
                    throw (SofaRpcRuntimeException) e;
                } else {
                    throw new SofaRpcRuntimeException("Build consumer proxy error!", e);
                }
            }
            if (consumerConfig.getOnAvailable() != null && cluster != null) {
                cluster.checkStateChange(false); // 状态变化通知监听器
            }
            RpcRuntimeContext.cacheConsumerConfig(this);
            return proxyIns;
        }
    }

这个方法里面除了做校验以外,主要做了如下几件事:

  1. 设置cluster属性,默认是FailOverCluster
  2. 设置监听器
  3. 初始化cluster
    1. 构造路由链表,主要有DirectUrlRouter、RegistryRouter、CustomRouter
    2. 设置loadBalancer属性,默认是RandomLoadBalancer
    3. 设置地址管理器addressHolder
    4. 设置连接管理器connectionHolder
    5. 构造Filter链
    6. 启动重连线程
  4. 设置proxyInvoker属性,如果用的是bolt协议,那么返回的是BoltClientProxyInvoker
  5. 创建代理类

如果暴露的服务的接口如下:

public interface HelloService {
    String sayHello(String string);
}

默认用javagent代理生成代理类:

public class HelloService_proxy_0 extends Proxy implements HelloService {
    public Invoker proxyInvoker = null;
    private Method method_1;

    public HelloService_proxy_0() {
        super(new UselessInvocationHandler());
        this.method_1 = ReflectUtils.getMethod(HelloService.class, "sayHello", new Class[]{String.class, Integer.TYPE});
    }

    public String sayHello(String var1, int var2) {
        Class var3 = HelloService.class;
        Method var4 = this.method_1;
        Class[] var5 = new Class[2];
        Object[] var6 = new Object[]{var1, null};
        var5[0] = String.class;
        var6[1] = new Integer(var2);
        var5[1] = Integer.TYPE;
        SofaRequest var7 = MessageBuilder.buildSofaRequest(var3, var4, var5, var6);
        //这里的invoker应该是BoltClientProxyInvoker
        SofaResponse var8 = this.proxyInvoker.invoke(var7);
        if (var8.isError()) {
            throw new SofaRpcException(199, var8.getErrorMsg());
        } else {
            Object var9 = var8.getAppResponse();
            if (var9 instanceof Throwable) {
                throw (Throwable)var9;
            } else {
                return (String)var9;
            }
        }
    }

    public String toString() {
        return this.proxyInvoker.toString();
    }

    public int hashCode() {
        return this.proxyInvoker.hashCode();
    }

    public boolean equals(Object var1) {
        return this == var1 || this.getClass().isInstance(var1) && this.proxyInvoker.equals(JavassistProxy.parseInvoker(var1));
    }
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 4. 源码分析---SOFARPC服务端暴露

    从示例入手我们设置好ServerConfig和ProviderConfig之后调用ProviderConfig的export方法进行暴露 ProviderCo...

    luozhiyun
  • 6.Sentinel源码分析—Sentinel是如何动态加载配置限流的?

    Sentinel源码解析系列: 1.Sentinel源码分析—FlowRuleManager加载规则做了什么?

    luozhiyun
  • 4.Sentinel源码分析— Sentinel是如何做到降级的?

    在我的第二篇文章里面2. Sentinel源码分析—Sentinel是如何进行流量统计的?里面介绍了整个Sentinel的主流程是怎样的。所以降级的大致流程可以...

    luozhiyun
  • 组合模式

    文件夹下面可以为另一个文件夹也可以为文件, 我们希望统一对待这些文件夹和文件, 这种情形适合使用组合模式。

    JS菌
  • Proe文件版本归1,python脚本

    等等,久而久之,文件版本就会非常多,会非常臃肿,而且保留过多的历史版本没有太大意义。

    py3study
  • Android使用SQLITE3 WAL

    sqlite是支持write ahead logging(WAL)模式的,开启WAL模式可以提高写入数据库的速度,读和写之间不会阻塞,但是写与写之间依然是阻塞的...

    py3study
  • PowerBI:如何处理单位万的需求?

    使用PowerBI时你可能发现,PowerBI的默认单位只有无、千、百万、十亿、万亿

    披头
  • docker storage driver

    使用docker目录创建一个volume,并将该volume挂载到容器的/my_Cvol目录下

    charlieroro
  • 如何分析渠道招聘的有效性

    相对与转换率对渠道的分析,可能很多HR没有做的很精细,只停留在比较宽泛的渠道数据分析里,今天我们来讲讲如何对招聘渠道做数据分析。

    王佩军
  • The Twelve-Factor App

    如今,软件通常会作为一种服务来交付,它们被称为网络应用程序,或软件即服务(SaaS)。12-Factor 为构建如下的 SaaS 应用提供了方法论:

    耕耘实录

扫码关注云+社区

领取腾讯云代金券