dubbo源码——服务提供者的服务暴露过程(三) 完~

上篇博客讲到了Protocol接口的export()方法。

在这篇博客讲解前,希望读者可以先了解一下SPI机制:SPI(Service Provider Interface)服务提供者接口是提供给服务厂商,或者框架扩展者的接口,例如JDK中的java.sql.Driver,dubbo中的com.alibaba.dubbo.rpc.Protocol等等。

一、SPI机制

不同服务厂商只需对接口进行自己的实现,然后在META-INF下services目录中创建以接口全限定名为名字的文件,然后写入自己的实现类的全限定名即可。

Java提供了一种服务发现类:Serviceloader,下面是一个小例子。

对java.sql.Driver接口添加两个实现类AImplDriver,BImplDriver

AImplDriver
BImplDriver

将两个实现类打包为jar,并且在META-INF/services/下添加java.sql.Driver文件,并将jar包添加到依赖中

lib

编写Test.java进行测试

public class Test {

    public static void main(String[] args) throws SQLException {
        ServiceLoader<Driver> loader = ServiceLoader.load(Driver.class);
        Iterator<Driver> iterator = loader.iterator();
        while (iterator.hasNext()) {
            iterator.next().connect(null, null);
        }
    }

}

那么,理想情况下,应该是两个实现类都被加载,那么console应该打印

A Impl connector ......

B Impl connector ......

那么实际情况呢?

跟预料的一样。

其实mysql和oracle等的驱动包等等也是这样做的。

//file: java.sql.Driver
com.mysql.jdbc.Driver
com.mysql.fabric.jdbc.FabricMySQLDriver

我认为提供SPI这种机制,可以实现良好的热插拔,加入减少jar包,无需修改代码,提供动态的服务发现。

二、Dubbo中的SPI机制

dubbo针对SPI机制实现了自己的服务发现类ExtensionLoader用以加载SPI的实现类。它会加载META-INF/dubbo/internalMETA-INF/dubboMETA-INF/services下的以SPI接口名命名的文件中的实现类。

然后根据自定义,或者javasist/jdk生成的适配类来调用这些实现类。这里不对其进行多讲,想了解的可以百度dubbo spi机制和查看ExtensionLoader的实现。

三、protocol.export()

因为我们没有提供默认的适配类,因此生成的Protocol实例为javasist生成的Protocol$Adapter适配类,它最终会调用根据我们url协议对应的Protocol实现类,因为当前我们的invoker的URL协议为registry所以应该调用RegistryPortocol的export()方法。为什么说最终呢?因为它会被构造方法为此SPI的SPI实现类所包装。

Exporter<?> exporter = protocol.export(wrapperInvoker);
  • #1 在本地暴露服务
  • #2 与注册中心建立连接获取Registry实例,一般上用的是项目中用的是zookeeper,这里就是ZookeeperRegistry
  • #3 向注册中心注册服务
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    //export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);  #1

    URL registryUrl = getRegistryUrl(originInvoker);

    //registry provider
    final Registry registry = getRegistry(originInvoker);  #2
    final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);

    //to judge to delay publish whether or not
    boolean register = registedProviderUrl.getParameter("register", true);

    ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);

    if (register) {
        register(registryUrl, registedProviderUrl);  #3
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }

    // Subscribe the override data
    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
}

在此方法中,invoker的协议为dubbo,因为会调用DubboRegistry的export()方法,将暴露后返回的exporter缓存起来。

@SuppressWarnings("unchecked")
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
    String key = getCacheKey(originInvoker);  
    ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    if (exporter == null) {
        synchronized (bounds) {
            exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                bounds.put(key, exporter);
            }
        }
    }
    return exporter;
}
  • #1 将服务名+端口号作为key,netty服务端会根据key来调用对应的exporter,以调用invoker
  • #2 构建exporter对象,并缓存到exporterMap,以便后续进行选择调用。
  • #3 是否为本地存根
  • #4 启动netty服务端,开始监听。
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();  

    // export service.
    String key = serviceKey(url);  #1
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); #2
    exporterMap.put(key, exporter);

    //export an stub service for dispatching event  #3
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }
        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }

    openServer(url);    #4
    optimizeSerialization(url);
    return exporter;
}

完成以上工作后,netty客户端就可以根据key,在服务端找到对应的exporter,然后根据客户端传入的接口名方法名,参数等信息,调用invoker中ref对应的方法,然后服务端将结果返回给客户端就ok了!

  • #1 获取url中ip:port for example : 127.0.0.1:20880
  • #2 判断是客户端还是服务端
  • #3 没有服务端的话,就创建一个.
  • #4 如果已经存在服务端,则需要根据url中的心跳,和心跳超时时间等参数reset服务端。
private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {    #2
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                serverMap.put(key, createServer(url));  #3
            } else {
                // server supports reset, use together with override
                server.reset(url);   #4
            }
        }
    }

server = Exchangers.bind(url, requestHandler); 的作用是启动netty服务端,它的ChannelHandler逻辑是在requestHandler中

private ExchangeServer createServer(URL url) {
    // send readonly event when server closes, it's enabled by default
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    // enable heartbeat by default
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);

    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    ExchangeServer server;
    try {
        server = Exchangers.bind(url, requestHandler); ####
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }
    return server;
}

reply()是对入站数据进行处理的方法,也就是客户端传给我们的要调用的类,方法的信息。

message已被解码为Invocation接口类型,invocation封装了获取方法名,参数类型,参数...等方法。

  • #1 从exporterMap中根据接口名和端口作为key获取exporter对象中的invoker以用来调用。
  • #2 如果是回调服务,则会检验此方法是否存在
  • #3 进行实际方法的调用,将返回结果用RpcResult封装,编码后返回给客户端。就完成了远程调用的操作
@Override
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);   #1
                // need to consider backward-compatibility if it's a callback
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {  #2
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                                + " not found in callback service interface ,invoke will be ignored."
                                + " please update the api interface. url is:"
                                + invoker.getUrl()) + " ,invocation is :" + inv);
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);  #3
            }
            throw new RemotingException(channel, "Unsupported request: "
                    + (message == null ? null : (message.getClass().getName() + ": " + message))
                    + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏编码小白

tomcat请求处理分析(六)servlet的处理过程

1.1.1.1  servlet的解析过程 servlet的解析分为两步实现,第一个是匹配到对应的Wrapper,第二个是加载对应的servlet并进行数据,这...

6587
来自专栏岑玉海

Carbondata源码系列(一)文件生成过程

在滴滴的两年一直在加班,人也变懒了,就很少再写博客了,最近在进行Carbondata和hive集成方面的工作,于是乎需要对Carbondata进行深入的研究。 ...

5366
来自专栏跟着阿笨一起玩NET

以读取博客园随笔备份为例 将xml 序列化成json,再序列化成对象

资源下载:http://files.cnblogs.com/codealone/ConsoleApplication2.zip

691
来自专栏Java成神之路

Java企业微信开发_05_消息推送之被动回复消息

微信加解密包 下载地址:http://qydev.weixin.qq.com/java.zip      ,此包中封装好了AES加解密方法,直接调用方法即可。

2402
来自专栏后端之路

Dubbo优雅服务降级之Stub和回声服务

上篇Dubbo优雅服务降级之mock描述了关于mock的细节。此篇就详述一下关于Stub的实现。 在dubbo的官方文档中写道 Mock是Stub的一个子集,...

2939
来自专栏大内老A

Enterprise Library深入解析与灵活应用(6):自己动手创建迷你版AOP框架

基于Enterprise Library PIAB的AOP框架已经在公司项目开发中得到广泛的使用,但是最近同事维护一个老的项目,使用到了Enterprise L...

1998
来自专栏屈定‘s Blog

造轮子--Excel报表工具

由于公司内部之前对于excel封装操作并不是很方便,而且对于特殊的需求不是很容易满足,这个月的任务是迁移部分业务小报表顺便重构下,因此这里造个轮子,便于导入和导...

1473
来自专栏Java学习网

常见的 Java 错误及避免方法之第五集(每集10个错误后续持续发布)

当输入期间意外终止文件或流时,将抛出“EOFException”。 以下是抛出EOFException异常的一个示例,来自JavaBeat应用程序:

1193
来自专栏Android 开发学习

JsBridge 源码分析

1403
来自专栏后端之路

为什么说dubbo的声明式缓存不好用!!!

那么通常提供缓存的目的是什么呢? 关于两级缓存的说明 通常为了更快的速度(以及一定的稳定性) 那么dubbo中的实现是通过filter机制基本上来缓存了我们需要...

1.5K5

扫码关注云+社区