dubbo源码——服务提供者的服务暴露过程(三) 完~

上篇博客讲到了Protocol接口的export()方法。

在这篇博客讲解前,希望读者可以先了解一下SPI机制:SPI(Service Provider Interface)服务提供者接口是提供给服务厂商,或者框架扩展者的接口,例如JDK中的java.sql.Driver,dubbo中的com.alibaba.dubbo.rpc.Protocol等等。

一、SPI机制

不同服务厂商只需对接口进行自己的实现,然后在META-INF下services目录中创建以接口全限定名为名字的文件,然后写入自己的实现类的全限定名即可。

Java提供了一种服务发现类:Serviceloader,下面是一个小例子。

对java.sql.Driver接口添加两个实现类AImplDriver,BImplDriver

AImplDriver
BImplDriver

将两个实现类打包为jar,并且在META-INF/services/下添加java.sql.Driver文件,并将jar包添加到依赖中

lib

编写Test.java进行测试

public class Test {

    public static void main(String[] args) throws SQLException {
        ServiceLoader<Driver> loader = ServiceLoader.load(Driver.class);
        Iterator<Driver> iterator = loader.iterator();
        while (iterator.hasNext()) {
            iterator.next().connect(null, null);
        }
    }

}

那么,理想情况下,应该是两个实现类都被加载,那么console应该打印

A Impl connector ......

B Impl connector ......

那么实际情况呢?

跟预料的一样。

其实mysql和oracle等的驱动包等等也是这样做的。

//file: java.sql.Driver
com.mysql.jdbc.Driver
com.mysql.fabric.jdbc.FabricMySQLDriver

我认为提供SPI这种机制,可以实现良好的热插拔,加入减少jar包,无需修改代码,提供动态的服务发现。

二、Dubbo中的SPI机制

dubbo针对SPI机制实现了自己的服务发现类ExtensionLoader用以加载SPI的实现类。它会加载META-INF/dubbo/internalMETA-INF/dubboMETA-INF/services下的以SPI接口名命名的文件中的实现类。

然后根据自定义,或者javasist/jdk生成的适配类来调用这些实现类。这里不对其进行多讲,想了解的可以百度dubbo spi机制和查看ExtensionLoader的实现。

三、protocol.export()

因为我们没有提供默认的适配类,因此生成的Protocol实例为javasist生成的Protocol$Adapter适配类,它最终会调用根据我们url协议对应的Protocol实现类,因为当前我们的invoker的URL协议为registry所以应该调用RegistryPortocol的export()方法。为什么说最终呢?因为它会被构造方法为此SPI的SPI实现类所包装。

Exporter<?> exporter = protocol.export(wrapperInvoker);
  • #1 在本地暴露服务
  • #2 与注册中心建立连接获取Registry实例,一般上用的是项目中用的是zookeeper,这里就是ZookeeperRegistry
  • #3 向注册中心注册服务
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    //export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);  #1

    URL registryUrl = getRegistryUrl(originInvoker);

    //registry provider
    final Registry registry = getRegistry(originInvoker);  #2
    final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);

    //to judge to delay publish whether or not
    boolean register = registedProviderUrl.getParameter("register", true);

    ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);

    if (register) {
        register(registryUrl, registedProviderUrl);  #3
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }

    // Subscribe the override data
    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
}

在此方法中,invoker的协议为dubbo,因为会调用DubboRegistry的export()方法,将暴露后返回的exporter缓存起来。

@SuppressWarnings("unchecked")
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
    String key = getCacheKey(originInvoker);  
    ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    if (exporter == null) {
        synchronized (bounds) {
            exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                bounds.put(key, exporter);
            }
        }
    }
    return exporter;
}
  • #1 将服务名+端口号作为key,netty服务端会根据key来调用对应的exporter,以调用invoker
  • #2 构建exporter对象,并缓存到exporterMap,以便后续进行选择调用。
  • #3 是否为本地存根
  • #4 启动netty服务端,开始监听。
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();  

    // export service.
    String key = serviceKey(url);  #1
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); #2
    exporterMap.put(key, exporter);

    //export an stub service for dispatching event  #3
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }
        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }

    openServer(url);    #4
    optimizeSerialization(url);
    return exporter;
}

完成以上工作后,netty客户端就可以根据key,在服务端找到对应的exporter,然后根据客户端传入的接口名方法名,参数等信息,调用invoker中ref对应的方法,然后服务端将结果返回给客户端就ok了!

  • #1 获取url中ip:port for example : 127.0.0.1:20880
  • #2 判断是客户端还是服务端
  • #3 没有服务端的话,就创建一个.
  • #4 如果已经存在服务端,则需要根据url中的心跳,和心跳超时时间等参数reset服务端。
private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {    #2
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                serverMap.put(key, createServer(url));  #3
            } else {
                // server supports reset, use together with override
                server.reset(url);   #4
            }
        }
    }

server = Exchangers.bind(url, requestHandler); 的作用是启动netty服务端,它的ChannelHandler逻辑是在requestHandler中

private ExchangeServer createServer(URL url) {
    // send readonly event when server closes, it's enabled by default
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    // enable heartbeat by default
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);

    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    ExchangeServer server;
    try {
        server = Exchangers.bind(url, requestHandler); ####
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }
    return server;
}

reply()是对入站数据进行处理的方法,也就是客户端传给我们的要调用的类,方法的信息。

message已被解码为Invocation接口类型,invocation封装了获取方法名,参数类型,参数...等方法。

  • #1 从exporterMap中根据接口名和端口作为key获取exporter对象中的invoker以用来调用。
  • #2 如果是回调服务,则会检验此方法是否存在
  • #3 进行实际方法的调用,将返回结果用RpcResult封装,编码后返回给客户端。就完成了远程调用的操作
@Override
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);   #1
                // need to consider backward-compatibility if it's a callback
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {  #2
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                                + " not found in callback service interface ,invoke will be ignored."
                                + " please update the api interface. url is:"
                                + invoker.getUrl()) + " ,invocation is :" + inv);
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);  #3
            }
            throw new RemotingException(channel, "Unsupported request: "
                    + (message == null ? null : (message.getClass().getName() + ": " + message))
                    + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java架构沉思录

Spring Boot是如何实现自动配置的

Spring Boot 是 Spring 旗下众多的子项目之一,其理念是约定优于配置,它通过实现了自动配置(大多数用户平时习惯设置的配置作为默认配置)的功能来为...

953
来自专栏微信公众号:Java团长

Spring Boot 自动配置的“魔法”是如何实现的?

Spring Boot是Spring旗下众多的子项目之一,其理念是约定优于配置,它通过实现了自动配置(大多数用户平时习惯设置的配置作为默认配置)的功能来为用户快...

922
来自专栏菩提树下的杨过

基于spring boot 2.x的websocket示例

spring boot 2/spring 5自带了websocket,下面是最基本的示例(包括java服务端、java客户端以及js客户端)

772
来自专栏史上最简单的Spring Cloud教程

深入理解Feign之源码解析

什么是Feign Feign是受到Retrofit,JAXRS-2.0和WebSocket的影响,它是一个jav的到http客户端绑定的开源项目。 Feign的...

7017
来自专栏开发技术

flying-saucer + iText + Freemarker实现pdf的导出, 支持中文、css以及图片

      项目中有个需求,需要将合同内容导出成pdf。上网查阅到了 iText , iText 是一个生成PDF文档的开源Java库,能够动态的从XML或者数...

841
来自专栏Java成神之路

Java企业微信开发_04_消息推送之发送消息(主动)

(1)流程不同:发送消息是第三方服务器主动通知微信服务器向用户发消息。而被动回复消息是 用户发送消息之后,微信服务器将消息传递给 第三方服务器,第三方服务器接收...

1315
来自专栏java工会

Spring Boot 自动配置的 “魔法” 是如何实现的?

Spring Boot是Spring旗下众多的子项目之一,其理念是约定优于配置,它通过实现了自动配置(大多数用户平时习惯设置的配置作为默认配置)的功能来为用户快...

620
来自专栏大大的微笑

java使用mina和websocket通信

这里以mina整合springMVC为例: //springMVC的配置: <!-- mina --> <bean class="org.spring...

91510
来自专栏阿杜的世界

Spring Boot with Redis

Spring Boot是为了简化Spring开发而生,从Spring 3.x开始,Spring社区的发展方向就是弱化xml配置文件而加大注解的戏份。最近召开的S...

762
来自专栏老码农专栏

在Actframework中使用依赖注入

933

扫码关注云+社区