前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >dubbo源码——服务提供者的服务暴露过程(三) 完~

dubbo源码——服务提供者的服务暴露过程(三) 完~

原创
作者头像
冰枫
发布2018-06-15 21:58:18
1.7K0
发布2018-06-15 21:58:18
举报
文章被收录于专栏:冰枫

上篇博客讲到了Protocol接口的export()方法。

在这篇博客讲解前,希望读者可以先了解一下SPI机制:SPI(Service Provider Interface)服务提供者接口是提供给服务厂商,或者框架扩展者的接口,例如JDK中的java.sql.Driver,dubbo中的com.alibaba.dubbo.rpc.Protocol等等。

一、SPI机制

不同服务厂商只需对接口进行自己的实现,然后在META-INF下services目录中创建以接口全限定名为名字的文件,然后写入自己的实现类的全限定名即可。

Java提供了一种服务发现类:Serviceloader,下面是一个小例子。

对java.sql.Driver接口添加两个实现类AImplDriver,BImplDriver

AImplDriver
AImplDriver
BImplDriver
BImplDriver

将两个实现类打包为jar,并且在META-INF/services/下添加java.sql.Driver文件,并将jar包添加到依赖中

lib
lib

编写Test.java进行测试

代码语言:javascript
复制
public class Test {

    public static void main(String[] args) throws SQLException {
        ServiceLoader<Driver> loader = ServiceLoader.load(Driver.class);
        Iterator<Driver> iterator = loader.iterator();
        while (iterator.hasNext()) {
            iterator.next().connect(null, null);
        }
    }

}

那么,理想情况下,应该是两个实现类都被加载,那么console应该打印

A Impl connector ......

B Impl connector ......

那么实际情况呢?

跟预料的一样。

其实mysql和oracle等的驱动包等等也是这样做的。

代码语言:javascript
复制
//file: java.sql.Driver
com.mysql.jdbc.Driver
com.mysql.fabric.jdbc.FabricMySQLDriver

我认为提供SPI这种机制,可以实现良好的热插拔,加入减少jar包,无需修改代码,提供动态的服务发现。

二、Dubbo中的SPI机制

dubbo针对SPI机制实现了自己的服务发现类ExtensionLoader用以加载SPI的实现类。它会加载META-INF/dubbo/internalMETA-INF/dubboMETA-INF/services下的以SPI接口名命名的文件中的实现类。

然后根据自定义,或者javasist/jdk生成的适配类来调用这些实现类。这里不对其进行多讲,想了解的可以百度dubbo spi机制和查看ExtensionLoader的实现。

三、protocol.export()

因为我们没有提供默认的适配类,因此生成的Protocol实例为javasist生成的Protocol$Adapter适配类,它最终会调用根据我们url协议对应的Protocol实现类,因为当前我们的invoker的URL协议为registry所以应该调用RegistryPortocol的export()方法。为什么说最终呢?因为它会被构造方法为此SPI的SPI实现类所包装。

代码语言:java
复制
Exporter<?> exporter = protocol.export(wrapperInvoker);
  • #1 在本地暴露服务
  • #2 与注册中心建立连接获取Registry实例,一般上用的是项目中用的是zookeeper,这里就是ZookeeperRegistry
  • #3 向注册中心注册服务
代码语言:javascript
复制
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    //export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);  #1

    URL registryUrl = getRegistryUrl(originInvoker);

    //registry provider
    final Registry registry = getRegistry(originInvoker);  #2
    final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);

    //to judge to delay publish whether or not
    boolean register = registedProviderUrl.getParameter("register", true);

    ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);

    if (register) {
        register(registryUrl, registedProviderUrl);  #3
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }

    // Subscribe the override data
    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
}

在此方法中,invoker的协议为dubbo,因为会调用DubboRegistry的export()方法,将暴露后返回的exporter缓存起来。

代码语言:javascript
复制
@SuppressWarnings("unchecked")
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
    String key = getCacheKey(originInvoker);  
    ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    if (exporter == null) {
        synchronized (bounds) {
            exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                bounds.put(key, exporter);
            }
        }
    }
    return exporter;
}
  • #1 将服务名+端口号作为key,netty服务端会根据key来调用对应的exporter,以调用invoker
  • #2 构建exporter对象,并缓存到exporterMap,以便后续进行选择调用。
  • #3 是否为本地存根
  • #4 启动netty服务端,开始监听。
代码语言:javascript
复制
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();  

    // export service.
    String key = serviceKey(url);  #1
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); #2
    exporterMap.put(key, exporter);

    //export an stub service for dispatching event  #3
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }
        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }

    openServer(url);    #4
    optimizeSerialization(url);
    return exporter;
}

完成以上工作后,netty客户端就可以根据key,在服务端找到对应的exporter,然后根据客户端传入的接口名方法名,参数等信息,调用invoker中ref对应的方法,然后服务端将结果返回给客户端就ok了!

  • #1 获取url中ip:port for example : 127.0.0.1:20880
  • #2 判断是客户端还是服务端
  • #3 没有服务端的话,就创建一个.
  • #4 如果已经存在服务端,则需要根据url中的心跳,和心跳超时时间等参数reset服务端。
代码语言:javascript
复制
private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {    #2
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                serverMap.put(key, createServer(url));  #3
            } else {
                // server supports reset, use together with override
                server.reset(url);   #4
            }
        }
    }

server = Exchangers.bind(url, requestHandler); 的作用是启动netty服务端,它的ChannelHandler逻辑是在requestHandler中

代码语言:javascript
复制
private ExchangeServer createServer(URL url) {
    // send readonly event when server closes, it's enabled by default
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    // enable heartbeat by default
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);

    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    ExchangeServer server;
    try {
        server = Exchangers.bind(url, requestHandler); ####
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }
    return server;
}

reply()是对入站数据进行处理的方法,也就是客户端传给我们的要调用的类,方法的信息。

message已被解码为Invocation接口类型,invocation封装了获取方法名,参数类型,参数...等方法。

  • #1 从exporterMap中根据接口名和端口作为key获取exporter对象中的invoker以用来调用。
  • #2 如果是回调服务,则会检验此方法是否存在
  • #3 进行实际方法的调用,将返回结果用RpcResult封装,编码后返回给客户端。就完成了远程调用的操作
代码语言:javascript
复制
@Override
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);   #1
                // need to consider backward-compatibility if it's a callback
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {  #2
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                                + " not found in callback service interface ,invoke will be ignored."
                                + " please update the api interface. url is:"
                                + invoker.getUrl()) + " ,invocation is :" + inv);
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);  #3
            }
            throw new RemotingException(channel, "Unsupported request: "
                    + (message == null ? null : (message.getClass().getName() + ": " + message))
                    + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、SPI机制
  • 二、Dubbo中的SPI机制
  • 三、protocol.export()
相关产品与服务
微服务引擎 TSE
微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档