前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java学习笔记——dubbo服务之底层通讯协议Protocol

Java学习笔记——dubbo服务之底层通讯协议Protocol

作者头像
慕容千语
发布2019-06-11 23:26:20
1.1K0
发布2019-06-11 23:26:20
举报

我们先来找到通讯协议的入口点吧。通过Protocol接口查找通讯协议入口点,我们根据接口的export方法搜索发现入口了,在ServiceConfig的doExportUrlsFor1Protocol方法,如下图:

然后我们进入 protocol.export(invoker)方法发现有很多实现类,根据spi(不懂的请看之前写的容器篇)查看配置文件能找到如下

registry=com.alibaba.dubbo.registry.integration.RegistryProtocol

dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol //这个是默认的,我们在Protocol接口上可以看到spi的注解

filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper

listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper

mock=com.alibaba.dubbo.rpc.support.MockProtocol

injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol

rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol

hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol

com.alibaba.dubbo.rpc.protocol.http.HttpProtocol

com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol

thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol

memcached=com.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol

redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol

rest=com.alibaba.dubbo.rpc.protocol.rest.RestProtocol

进入DubboProtocol.export(Invoker<T> invoker)方法里面有个 openServer(url);

代码:

代码语言:javascript
复制
  private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client 也可以暴露一个只有server可以调用的服务。
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
        if (isServer) {
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
        serverMap.put(key, createServer(url)); //createServer是创建服务
        } else {
        //server支持reset,配合override功能使用
        server.reset(url);
        }
        }
    }

继续进入createServer,上源码

代码语言:javascript
复制
    private ExchangeServer createServer(URL url) {
        //默认开启server关闭时发送readonly事件
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        //默认开启heartbeat
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
        if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }

dubbo从要暴漏的服务的URL中取得相关的配置(host,port等)进行服务端server的创建,同上面的server = Exchangers.bind(url, requestHandler) 正式创建服务。

所以基本的创建步骤是

export() --> openServer() --> createServer() --> server = Exchangers.bind(url, requestHandler);

我们进行来看 Exchangers.bind(url, requestHandler)

源码:

代码语言:javascript
复制
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
      if (url == null) {
          throw new IllegalArgumentException("url == null");
      }
      if (handler == null) {
          throw new IllegalArgumentException("handler == null");
      }
      url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
      return getExchanger(url).bind(url, handler);
  }
  然后通过getExchanger(url).bind(url, handler)的bing进入 HeaderExchanger类
  public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

在进入Transporters类的bing的

代码语言:javascript
复制
  public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
         if (url == null) {
             throw new IllegalArgumentException("url == null");
         }
         if (handlers == null || handlers.length == 0) {
             throw new IllegalArgumentException("handlers == null");
         }
         ChannelHandler handler;
         if (handlers.length == 1) {
             handler = handlers[0];
         } else {
             handler = new ChannelHandlerDispatcher(handlers);
         }
         return getTransporter().bind(url, handler);
     }

通过bing可以知道他讲调用:GrizzlyTransporter,MinaTransporter,NettyTransporter 通过spi默认是调用NettyTransporter

到这里我们基本明白dubbo的通讯默认是交给了netty来处理,

我们在看下doOPen方法

@Override

代码语言:javascript
复制
        protected void doOpen() throws Throwable {
            NettyHelper.setNettyLoggerFactory();
            ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
            ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
            ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
            bootstrap = new ServerBootstrap(channelFactory);
            final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
            channels = nettyHandler.getChannels();
            // https://issues.jboss.org/browse/NETTY-365
            // https://issues.jboss.org/browse/NETTY-379
            // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
            bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
                public ChannelPipeline getPipeline() {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
                    ChannelPipeline pipeline = Channels.pipeline();
                    /*int idleTimeout = getIdleTimeout();
                    if (idleTimeout > 10000) {
                        pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                    }*/
                    pipeline.addLast("decoder", adapter.getDecoder());//解码
                    pipeline.addLast("encoder", adapter.getEncoder());//编码
                    pipeline.addLast("handler", nettyHandler);
                    return pipeline;
                }
            });
            // bind
            channel = bootstrap.bind(getBindAddress());
        }

了解netty的同学,肯定早已习惯这个方法的写法,就是创建了netty的server嘛,到这里dubbo的服务创建完毕了,这个时候控制台见打印:

[DUBBO] Start NettyServer bind /0.0.0.0:20880, export /192.168.4.241:20880, dubbo version: 2.8.4, current host: 127.0.0.1

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019.02.14 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档