首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >源码分析Dubbo服务提供者启动流程-下篇

源码分析Dubbo服务提供者启动流程-下篇

作者头像
丁威
发布2019-06-10 17:33:18
7470
发布2019-06-10 17:33:18
举报
文章被收录于专栏:中间件兴趣圈中间件兴趣圈

微信公众号:[中间件兴趣圈] 作者简介:《RocketMQ技术内幕》作者;

本文继续上文Dubbo服务提供者启动流程,在上篇文章中详细梳理了基于dubbo spring文件的配置方式,Dubbo是如何加载配置文件,服务提供者dubbo:service标签服务暴露全流程,本节重点关注RegistryProtocol#export中调用doLocalExport方法,根据服务暴露协议建立网络通讯服务器,在特定端口建立监听,监听来自消息消费端服务的请求。

RegistryProtocol#doLocalExport:

 1private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
 2        String key = getCacheKey(originInvoker);
 3        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
 4        if (exporter == null) {
 5            synchronized (bounds) {
 6                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
 7                if (exporter == null) {
 8                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));   // @1
 9                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);    // @2
10                    bounds.put(key, exporter);
11                }
12            }
13        }
14        return exporter;
15    }

代码@1:如果服务提供者以dubbo协议暴露服务,getProviderUrl(originInvoker)返回的URL将以dubbo://开头。 代码@2:根据Dubbo内置的SPI机制,将调用DubboProtocol#export方法。

源码分析DubboProtocol#export
 1public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
 2        URL url = invoker.getUrl();     // @1
 3        // export service.
 4        String key = serviceKey(url);      // @2
 5        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
 6        exporterMap.put(key, exporter);
 7
 8        //export an stub service for dispatching event
 9        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);    //@3  start
10        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);                                                  
11        if (isStubSupportEvent && !isCallbackservice) {                                                                                                                        
12            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
13            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
14                if (logger.isWarnEnabled()) {
15                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
16                            "], has set stubproxy support event ,but no stub methods founded."));
17                }
18            } else {
19                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);                                                                      
20            }
21        }   // @3 end
22
23        openServer(url);   // @4
24        optimizeSerialization(url);  // @5
25        return exporter;                
26    }

代码@1:获取服务提供者URL,以协议名称,这里是dubbo://开头。 代码@2:从服务提供者URL中获取服务名,key: interface:port,例如:com.alibaba.dubbo.demo.DemoService:20880。 代码@3:是否将转发事件导出成stub。 代码@4:根据url打开服务,下面将详细分析其实现。 代码@5:根据url优化器序列化方式。

源码分析DubboProtocol#openServer
 1private void openServer(URL url) {
 2        // find server.
 3        String key = url.getAddress();    // @1
 4        //client can export a service which's only for server to invoke
 5        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
 6        if (isServer) {
 7            ExchangeServer server = serverMap.get(key);           // @2
 8            if (server == null) {
 9                serverMap.put(key, createServer(url));                    //@3
10            } else {
11                // server supports reset, use together with override
12                server.reset(url);                                                       //@4
13            }
14        }
15    }

代码@1:根据url获取网络地址:ip:port,例如:192.168.56.1:20880,服务提供者IP与暴露服务端口号。 代码@2:根据key从服务器缓存中获取,如果存在,则执行代码@4,如果不存在,则执行代码@3。 代码@3:根据URL创建一服务器,Dubbo服务提供者服务器实现类为ExchangeServer。 代码@4:如果服务器已经存在,用当前URL重置服务器,这个不难理解,因为一个Dubbo服务中,会存在多个dubbo:service标签,这些标签都会在服务台提供者的同一个IP地址、端口号上暴露服务。

源码分析DubboProtocol#createServer
 1private ExchangeServer createServer(URL url) {
 2        // send readonly event when server closes, it's enabled by default
 3        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());    // @1
 4        // enable heartbeat by default
 5        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));     // @2
 6        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);  // @3
 7
 8        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))    // @4
 9            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
10
11        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);       // @5
12        ExchangeServer server;
13        try {
14            server = Exchangers.bind(url, requestHandler);    // @6
15        } catch (RemotingException e) {
16            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
17        }
18        str = url.getParameter(Constants.CLIENT_KEY);     //@7
19        if (str != null && str.length() > 0) {
20            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
21            if (!supportedTypes.contains(str)) {
22                throw new RpcException("Unsupported client type: " + str);
23            }
24        }
25        return server;
26    }

代码@1:为服务提供者url增加channel.readonly.sent属性,默认为true,表示在发送请求时,是否等待将字节写入socket后再返回,默认为true。 代码@2:为服务提供者url增加heartbeat属性,表示心跳间隔时间,默认为60*1000,表示60s。 代码@3:为服务提供者url增加server属性,可选值为netty,mina等等,默认为netty。 代码@4:根据SPI机制,判断server属性是否支持。 代码@5:为服务提供者url增加codec属性,默认值为dubbo,协议编码方式。 代码@6:根据服务提供者URI,服务提供者命令请求处理器requestHandler构建ExchangeServer实例。requestHandler的实现具体在以后详细分析Dubbo服务调用时再详细分析。 代码@7:验证客户端类型是否可用。

源码分析Exchangers.bind

根据URL、ExchangeHandler构建服务器

 1public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
 2        if (url == null) {
 3            throw new IllegalArgumentException("url == null");
 4        }
 5        if (handler == null) {
 6            throw new IllegalArgumentException("handler == null");
 7        }
 8        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
 9        return getExchanger(url).bind(url, handler);
10    }

上述代码不难看出,首先根据url获取Exchanger实例,然后调用bind方法构建ExchangeServer,Exchanger接口如下

  • ExchangeServer bind(URL url, ExchangeHandler handler) : 服务提供者调用。
  • ExchangeClient connect(URL url, ExchangeHandler handler):服务消费者调用。

dubbo提供的实现类为:HeaderExchanger,其bind方法如下:

HeaderExchanger#bind

1public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
2        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
3}

从这里可以看出,端口的绑定由Transporters的bind方法实现。

源码分析Transporters.bind方法
 1public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
 2        if (url == null) {
 3            throw new IllegalArgumentException("url == null");
 4        }
 5        if (handlers == null || handlers.length == 0) {
 6            throw new IllegalArgumentException("handlers == null");
 7        }
 8        ChannelHandler handler;
 9        if (handlers.length == 1) {
10            handler = handlers[0];
11        } else {
12            handler = new ChannelHandlerDispatcher(handlers);
13        }
14        return getTransporter().bind(url, handler);
15    }
16
17public static Transporter getTransporter() {
18        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
19}

从这里得知,Dubbo网络传输的接口有Transporter接口实现,其继承类图所示:

本文以netty版本来查看一下Transporter实现。

NettyTransporter源码如下:

 1public class NettyTransporter implements Transporter {
 2
 3    public static final String NAME = "netty";
 4
 5    @Override
 6    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
 7        return new NettyServer(url, listener);
 8    }
 9
10    @Override
11    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
12        return new NettyClient(url, listener);
13    }
14}

NettyServer建立网络连接的实现方法为:

 1protected void doOpen() throws Throwable {
 2        NettyHelper.setNettyLoggerFactory();
 3        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
 4        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
 5        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
 6        bootstrap = new ServerBootstrap(channelFactory);
 7
 8        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);      // @1
 9        channels = nettyHandler.getChannels();
10        // https://issues.jboss.org/browse/NETTY-365
11        // https://issues.jboss.org/browse/NETTY-379
12        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
13        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
14            @Override
15            public ChannelPipeline getPipeline() {
16                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
17                ChannelPipeline pipeline = Channels.pipeline();
18                /*int idleTimeout = getIdleTimeout();
19                if (idleTimeout > 10000) {
20                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
21                }*/
22                pipeline.addLast("decoder", adapter.getDecoder());
23                pipeline.addLast("encoder", adapter.getEncoder());
24                pipeline.addLast("handler", nettyHandler);     // @2
25                return pipeline;
26            }
27        });
28        // bind
29        channel = bootstrap.bind(getBindAddress());
30    }

熟悉本方法需要具备Netty的知识,有关源码:阅读Netty系列文章,这里不对每一行代码进行解读,对于与网络相关的参数,将在后续文章中详细讲解,本方法@1、@2引起了我的注意,首先创建NettyServer必须传入一个服务提供者URL,但从DubboProtocol#createServer中可以看出,Server是基于网络套接字(ip:port)缓存的,一个JVM应用中,必然会存在多个dubbo:server标签,就会有多个URL,这里为什么可以这样做呢?从DubboProtocol#createServer中可以看出,在解析第二个dubbo:service标签时并不会调用createServer,而是会调用Server#reset方法,是不是这个方法有什么魔法,在reset方法时能将URL也注册到Server上,那接下来分析NettyServer#reset方法是如何实现的。

源码分析DdubboProtocol#reset

reset方法最终将用Server的reset方法,同样还是以netty版本的NettyServer为例,查看reset方法的实现原理。NettyServer#reset->父类(AbstractServer)

AbstractServer#reset

 1public void reset(URL url) {
 2        if (url == null) {
 3            return;
 4        }
 5        try {                                                                                                       // @1 start
 6            if (url.hasParameter(Constants.ACCEPTS_KEY)) {
 7                int a = url.getParameter(Constants.ACCEPTS_KEY, 0);
 8                if (a > 0) {
 9                    this.accepts = a;
10                }
11            }
12        } catch (Throwable t) {
13            logger.error(t.getMessage(), t);
14        }
15        try {
16            if (url.hasParameter(Constants.IDLE_TIMEOUT_KEY)) {
17                int t = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 0);
18                if (t > 0) {
19                    this.idleTimeout = t;
20                }
21            }
22        } catch (Throwable t) {
23            logger.error(t.getMessage(), t);
24        }
25        try {
26            if (url.hasParameter(Constants.THREADS_KEY)
27                    && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {
28                ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
29                int threads = url.getParameter(Constants.THREADS_KEY, 0);
30                int max = threadPoolExecutor.getMaximumPoolSize();
31                int core = threadPoolExecutor.getCorePoolSize();
32                if (threads > 0 && (threads != max || threads != core)) {
33                    if (threads < core) {
34                        threadPoolExecutor.setCorePoolSize(threads);
35                        if (core == max) {
36                            threadPoolExecutor.setMaximumPoolSize(threads);
37                        }
38                    } else {
39                        threadPoolExecutor.setMaximumPoolSize(threads);
40                        if (core == max) {
41                            threadPoolExecutor.setCorePoolSize(threads);
42                        }
43                    }
44                }
45            }
46        } catch (Throwable t) {
47            logger.error(t.getMessage(), t);
48        }              // @1 end
49        super.setUrl(getUrl().addParameters(url.getParameters()));    // @2
50    }

代码@1:首先是调整线程池的相关线程数量,这个好理解。、 代码@2:然后设置调用setUrl覆盖原先NettyServer的private volatile URL url的属性,那为什么不会影响原先注册的dubbo:server呢? 原来NettyHandler上加了注解:@Sharable,由该注解去实现线程安全。

Dubbo服务提供者启动流程将分析到这里了,本文并未对网络细节进行详细分析,旨在梳理出启动流程,有关Dubbo服务网络实现原理将在后续章节中详细分析,敬请期待。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-02-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 中间件兴趣圈 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 源码分析DubboProtocol#export
  • 源码分析DubboProtocol#openServer
  • 源码分析DubboProtocol#createServer
  • 源码分析Exchangers.bind
  • 源码分析Transporters.bind方法
  • 源码分析DdubboProtocol#reset
相关产品与服务
消息队列 TDMQ
消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档