前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >dubbo学习(四)provider服务发布

dubbo学习(四)provider服务发布

作者头像
虞大大
发布2020-10-09 15:28:42
1.1K0
发布2020-10-09 15:28:42
举报
文章被收录于专栏:码云大作战码云大作战

一、服务端provider发布

根据dubbo启动日志,provider的发布动作为以下几个步骤:

(1)暴露本地服务

Export dubbo service com.ywl.dubbo.TestApi to local registry, dubbo version: 2.0.0, current host: 127.0.0.1。

(2)暴露远程服务

Export dubbo service com.ywl.dubbo.TestApi to url dubbo://192.168.24.69:20880/com.ywl.dubbo.TestApi...后面省略。

(3)启动netty

Start NettyClient yuwenlei.local/192.168.24.69 connect to the server /192.168.1.100:20041, dubbo version: 2.0.0, current host: 192.168.24.69。

(4)打开zk

Opening socket connection to server dailyzk.webuy.ai/192.168.49.11:2181。

(5)注册provider服务到zk

Register dubbo service com.ywl.dubbo.TestApi url dubbo://192.168.24.69:20880/com.ywl.dubbo.TestApi? ...中间省略。

to registry registry://dailyzk.webuy.ai:7005/org.apache.dubbo.registry.RegistryService? ...后面省略。

(6)监听zk(订阅与通知)

Subscribe: provider://192.168.24.69:20880/com.ywl.dubbo.TestApi?...后面省略。

Notify urls for subscribe url provider://192.168.24.69:20880/com.ywl.dubbo.TestApi?...后面省略。

· 服务发布的目的

解析dubbo-provider.xml中的接口。将服务提供者向注册中心注册服务,以便服务消费者从注册中心查询并调用服务。

代码语言:javascript
复制
<dubbo:service interface="com.ywl.dubbo.TestApi" ref="testApi" retries="0"
               cluster="failfast" timeout="3000"/>

二、provider发布原理探索

上一篇文章说明了dubbo.xml文件中的自定义元素都是通过schema来进行解析。解析service元素后会形成一个ServiceBean。而SerivceBean实现了ApplicationListener<ContextRefreshedEven>接口,该接口的目的为上下文刷新监听(即当TestApi的bean被初始化或刷新时,该事件被激活,执行实现类方法),dubbo也是在该实现方法中暴露服务。

· doExport

org.apache.dubbo.config.spring.ServiceBean#onApplicationEvent—>

org.apache.dubbo.config.ServiceConfig#export—>

org.apache.dubbo.config.ServiceConfig#doExportUrls

暴露服务的代码如下:

代码语言:javascript
复制
private void doExportUrls() {    //加载注册中心配置    List<URL> registryURLs = loadRegistries(true);
    //遍历dubbo协议,默认采用的是dubbo协议 即tcp协议    for (ProtocolConfig protocolConfig : protocols) {
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

上述代码在暴露服务时,首先加载注册中心配置,然后根据dubbo协议进行遍历来暴露服务。

代码语言:javascript
复制
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//...//获取服务作用域String scope = url.getParameter(Constants.SCOPE_KEY);
    
    if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) {
        //暴露本地服务
        if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {
            exportLocal(url);
        }
        //暴露远程服务
        if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
           //...
           Exporter<?> exporter = protocol.export(wrapperInvoker);
           exporters.add(exporter);
        }
    }
    //...
}

上述代码中属于暴露服务中的核心代码,根据配置的scope判断暴露本地服务还是暴露远程服务。如果没有配置,则默认先暴露本地服务,再暴露远程服务。因此doExport方法中主要做的事情就是暴露本地服务和暴露远程服务。

暴露本地服务和远程服务的区别:

(1)暴露本地服务表示在同一个JVM中,不用通过远程通信来调用。即,在同一个服务中,可以自己调用自己的接口。

(2)暴露远程服务表示暴露给远程客户端IP和端口号,需要通过远程通信来调用。

· 暴露本地服务

代码语言:javascript
复制
private void exportLocal(URL url) {
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {        //将远程暴露的URL协议 指定为本地暴露的Url协议
        URL local = URL.valueOf(url.toFullString())
                .setProtocol(Constants.LOCAL_PROTOCOL)
                .setHost(LOCALHOST).setPort(0);        //保存dubbo api-ref的calss,是一个简单的单例实现        ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));        Exporter<?> exporter = protocol.export(                //将TestApi封装成Invoker接口,进行暴露
                proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
        exporters.add(exporter);
        //日志打印        logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
    }
}

ProxyFactory:

代码语言:javascript
复制
ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

ProxyFactory通过spi机制进行加载,默认编译方式为javassist,在getProxy和getInvoker方法被@Adaptive注解修饰,因此ProxyFactory会新生成一个adaptive动态代理类。

getInvoker,针对服务端,将服务对象,TestApiImpl包装成一个Invoker对象。

getProxy,针对客户端,将TestApi接口创建成一个动态代理对象。

代码语言:javascript
复制
public class JavassistProxyFactory extends AbstractProxyFactory {
    @Override
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {        //创建代理对象        return (T) Proxy.getProxy(interfaces).newInstance                        (new InvokerInvocationHandler(invoker));
    }
    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        //首先将接口封装成wrapper对象
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                //封装成invoker对象                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
}

Invoker是一个可执行的对象,能根据方法名、参数得到相应的返回结果。Invoker后面单独写一篇知识点来讲解。

protocol.export:

代码语言:javascript
复制
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
代码语言:javascript
复制
InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
    super(invoker);
    this.key = key;
    this.exporterMap = exporterMap;    //最终会将本地暴露的invoke接口信息 放到exporterMap缓存中。    exporterMap.put(key, this);
}

以上就是暴露本地服务的全部内容,总结:dubbo的provider-api接口被暴露在本地服务时,会被封装成invoke对象,最终进入injvmExproter类中,将本地需要暴露的invoke接口信息放入到exporterMap中,map的key为接口全路径名。

· 暴露远程服务

在执行export方法之前的原理和本地服务的暴露一样,会将api封装成invoker对象。远程服务的暴露的实现类为RegistryProtocol。

代码语言:javascript
复制
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    //暴露远程服务
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    //...
代码语言:javascript
复制
}
代码语言:javascript
复制
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {    String key = getCacheKey(originInvoker);
    ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);    //缓存判断    if (exporter == null) {
        synchronized (bounds) {
            exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                //调用dubboProtocol暴露远程服务                exporter = new ExporterChangeableWrapper<T>((Exporter<T>)
                    protocol.export(invokerDelegete), originInvoker);
                bounds.put(key, exporter);
            }
        }
    }
    return exporter;
}

步骤一:org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export暴露远程服务

代码语言:javascript
复制
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();
    //key-api路径名+dubbo端口号
代码语言:javascript
复制
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);    //和本地远程服务一样,放到exportMap中    exporterMap.put(key, exporter);
    //...
    openServer(url);
    optimizeSerialization(url);
    return exporter;
}

暴露远程服务的原理和暴露本地服务的原理相似,都会将api封装成invoker对象,最终进入dubboProtocol类中,将需要暴露的远程服务invoke接口信息放入到exporterMap中,map的key与本地服务不同的是key为接口全路径名+dubbo端口号。

步骤二org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#openServer启动Server。

暴露远程服务后需要调用openServer方法创建并启动Server。具体源码分析如下:

代码语言:javascript
复制
private void openServer(URL url) {
    //从url信息中获取key - ip+端口
    String key = url.getAddress();
    //是否为客户端暴露的服务 默认为true
代码语言:javascript
复制
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
    if (isServer) {        //从缓存中获取服务
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            synchronized (this) {
                server = serverMap.get(key);
                if (server == null) {                    //创建服务 放到缓存中
                    serverMap.put(key, createServer(url));
                }
            }
        } else {
            // 服务重置
            server.reset(url);
        }
    }
}
代码语言:javascript
复制
private ExchangeServer createServer(URL url) {
    //...    ExchangeServer server;
    try {        //服务信息交互 进入HeaderExchanger类
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    //...
    return server;
}

org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind

在openServer方法中会调用createServer方法创建一个信息交换层对象ExchangeServer。该对象最终会进入到HeaderExchanger类中进行初始化创建。

代码语言:javascript
复制
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url,            new DecodeHandler(new HeaderExchangeHandler(handler))));
}
代码语言:javascript
复制
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
//...//指定bindAddress0.0.0.0:20880
代码语言:javascript
复制
    bindAddress = new InetSocketAddress(bindIp, bindPort);
    //指定accepts 默认为0    this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
    //指定idleTimeout 默认为600000    this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
    try {        //暴露netty
        doOpen();
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
        }
    //...
}

以上述代码为exchanger对象的封装,如url信息、handler信息、连接超时时间(我配置的timeOut为3000毫秒)、bindAddress、accepts、idleTimeout等信息。

exchanger实际是一个信息交互层。主要用于封装请求响应服务,同步转异步。

步骤三:org.apache.dubbo.remoting.Transporters#bind(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.ChannelHandler...)暴露netty服务。

在步骤二的openServer中在封装信息交互层exchanger对象时,存在doOpen方法,该方法的目的实际为暴露netty服务。

代码分析如下:

代码语言:javascript
复制
protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();
    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker", true));
    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();
    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("handler", nettyServerHandler);
                }
            });
    // bind
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}

上述代码主要设置了NioSercerSocketChannelFactory、boss的线程池信息、worker的线程池信息,以及设置了编解码handler信息。最后调用bootstrap.bind来暴露netty服务。

因此步骤三的transporter属于网络传输层,用来抽象netty的统一接口,暴露netty。

步骤四:org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeServer#startHeartbeatTimer dubbo的心跳机制

步骤二的最后会将信息交互层和网络传输层封装的信息构造成一个HeadExhcangeServer返回,并且在初始化HeadExchangeServer对象时,存在心跳机制的启动。具体代码如下:

代码语言:javascript
复制
private void startHeartbeatTimer() {    //先关闭原来的心跳定时器    stopHeartbeatTimer();
    if (heartbeat > 0) {        //开启一个心跳定时器
        heartbeatTimer = scheduled.scheduleWithFixedDelay(
                new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                    @Override
                    public Collection<Channel> getChannels() {
                        return Collections.unmodifiableCollection(
                                HeaderExchangeServer.this.getChannels());
                    }
                }, heartbeat, heartbeatTimeout),
                heartbeat, heartbeat, TimeUnit.MILLISECONDS);
    }
}

心跳定时器的目的是检测provider和consumer之间的连接是否有效,如果连接断了,需要作出响应的处理。

provider:如上图源码所示heartbeat设置了60s,heatbeatTimeOut为(180s),表示如果在60秒内没接受到消息,就会发送心跳消息,如果连着3次没有收到心跳响应,则会关闭连接channel。

consumer:表示如果在60秒内如果没有接收到消息,就会发送心跳消息,如果连着3次没有收到心跳响应,则尝试重连。

心跳线程池任务原理代码如下:

代码语言:javascript
复制
public void run() {
    try {
        long now = System.currentTimeMillis();
        for (Channel channel : channelProvider.getChannels()) {
            if (channel.isClosed()) {
                continue;
            }
try {                //获取最后一次读操作的时间
                Long lastRead = (Long) channel.getAttribute(
                HeaderExchangeHandler.KEY_READ_TIMESTAMP);
                //获取最后一次写操作的时间                Long lastWrite = (Long) channel.getAttribute(
                HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
                //如果在heartbeat时间内没有操作读操作 或 写操作,则发送心跳请求                if ((lastRead != null && now - lastRead > heartbeat)
                        || (lastWrite != null && now - lastWrite > heartbeat)) {
                    Request req = new Request();
                    req.setVersion(Version.getProtocolVersion());
                    req.setTwoWay(true);
                    req.setEvent(Request.HEARTBEAT_EVENT);
                    channel.send(req);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
                                + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
                    }
}                //正常消息和心跳在heartbeatTimeout设置的时间内都没接收到的话,进入if
                if (lastRead != null && now - lastRead > heartbeatTimeout) {
                    logger.warn("Close channel " + channel
                            + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
                    if (channel instanceof Client) {
                    try {                            //客户端则重连
                            ((Client) channel).reconnect();
                        } catch (Exception e) {
                            }
                        } else {                        //服务端关闭channel
                        channel.close();
                    }
                }
            } catch (Throwable t) {
                logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
            }
        }
    } catch (Throwable t) {
        logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
    }
}

三、provider发布总结

在ServiceBean初始化后监听到了spring刷新事件开始发布dubbo-provider服务,根据配置开始进行本地服务发布和远程服务发布,两者服务发布的原理有相似处,将api对象封装成invoker对象,本地服务发布的invoker对象会被封装为InjvmExporter对象放到exportMap中key为api的全路径名,远程服务发布的invoker对象会被封装为dubboExporter对象放到exportMap中并且key为api的全路径名+端口号来做区分。

远程服务发布好后,则会封装信息交换层exchanger对象和网络传输层transporter对象,在网络传输层对象的封装时,会调用doOpen方法来暴露netty服务。最后exchanger和transporter对象都会被封装成一个HeaderExchangeServer服务对象,并且初始化中会开启心跳机制的定时器,来管理服务端和客户端的心跳重连。

由于篇幅有限,本章只写了服务发布的暴露服务和暴露netty的原理,后面的打开zk、注册zk、监听zk放到后续的篇幅中。

补充:

在本地服务暴露和远程服务暴露时,都使用了protocol.export来暴露对象。Protocol协议可以在dubbo-provider.xml中被配置,如果没配置则默认使用http协议。

Protocol接口中存在两个关键方法:

(1)export:用于服务端暴露远程服务,实际上是将invoker对象通过协议暴露给外部。

(2)refer:用于客户端引用远程服务。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-09-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码云大作战 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
微服务引擎 TSE
微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档