前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Dubbo源码之网络通信

Dubbo源码之网络通信

作者头像
spilledyear
发布2019-12-12 17:54:21
9320
发布2019-12-12 17:54:21
举报
文章被收录于专栏:小白鼠小白鼠

介绍了Dubbo通信流程,跟着源码调试过来的,如果有问题还请各位大佬指出

服务暴露将做哪些事情?

  1. 注册ZK,监听动态配置节点
  2. 开启Server端
  3. 创建代理服务
  4. Exporter -> Invoker -> proxyService

服务引用将做哪些事情?

  1. 注册ZK,监听动态配置节点、providr节点、路由节点
  2. 开启Client端
  3. 创建代理服务
  4. proxyService -> Invoker

客户端请求

代码语言:javascript
复制
ConsumerProxyService -> Invoker【DubboInvoker】 -> Exchanger【HeaderExchangeClient】 -> Transporter【NettyClient】 -> 编码 -> SEND-TO-SERVER (创建了DefaultFuture,Request带唯一标识)

服务端响应

代码语言:javascript
复制
解码 -> Transporter【NettyServer】-> 系列Handlers -> 线程池 -> Exporter#getInvoker -> Invoker#invoke -> ProviderProxyService -> callback 

Exchanger

Exchangers

门面类,提供各种便捷方法,先通过SPI获取Exchanger,然后调用Exchanger的相关方法创建ExchangeServerExchangeClient

Exchanger

SPI接口,默认实现类HeaderExchanger,提供了两个快捷方法创建ExchangeServerExchangeClient

代码语言:javascript
复制
@SPI(HeaderExchanger.NAME)
public interface Exchanger {
    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;

    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
}

public class HeaderExchanger implements Exchanger {
    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
}

ExchangeServer

Server端使用,默认实现类HeaderExchangeServer,内部调用Transporter开启Server服务

代码语言:javascript
复制
public interface ExchangeServer extends Server {
    Collection<ExchangeChannel> getExchangeChannels();

    ExchangeChannel getExchangeChannel(InetSocketAddress remoteAddress);
}

ExchangeClient

Client端使用,默认实现类HeaderExchangeClient,核心request方法,内部调用Transporter发送请求

代码语言:javascript
复制
public interface ExchangeClient extends Client, ExchangeChannel {
}

ExchangeChannel

默认实现类 HeaderExchangeChannel,作为HeaderExchangeClient的一个属性

Transporter

Transporters

门面类,提供各种便捷方法,先通过SPI获取Transporter,然后调用Transporter的相关方法创建ServerClient

Transporter

SPI接口,默认实现类NettyTransporter,提供了两个快捷方法创建ServerClient

代码语言:javascript
复制
@SPI("netty")
public interface Transporter {
    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    Server bind(URL url, ChannelHandler handler) throws RemotingException;

    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;
}

public class NettyTransporter implements Transporter {
    public static final String NAME = "netty";

    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }
}

Server

Server端使用,默认实现类NettyServer,用于开启Server服务,核心方法doOpen

代码语言:javascript
复制
public class NettyServer extends AbstractServer implements Server {
}

Client

Client端使用,默认实现类NettyClient,核心request方法用于发送请求,doOpen用于与服务端建立连接

代码语言:javascript
复制
public class NettyClient extends AbstractClient {
}

服务端启动服务

代码语言:javascript
复制
DubboProtocol#export =>
DubboProtocol#openServer => 
DubboProtocol#createServer =>
Exchangers#bind => 
NettyServer#doOpen

最终,在NettyServer#doOpen中通过Netty开启了一个Server端

代码语言:javascript
复制
DubboProtocol#createServer
    => Exchangers#bind(url, requestHandler)
        => HeaderExchanger#bind(url, requestHandler)
            => return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))))
                
// Transporters#bind 语句可以拆解为 
Transporters#bind
    => NettyTransporter#bind(url, handler)
        => return new NettyServer(url, handler)
            =>  NettyServer#doOpen【NettyServer构造函数中调用了doOpen方法】

NettyServer中的hander属性,最终指向的是new DecodeHandler(new HeaderExchangeHandler(handler))。最终Server端返回HeaderExchangeServer,然后在NettyServer的构造函数中,对handle其实还做了一些封装

代码语言:javascript
复制
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    // the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}

public class ChannelHandlers {
    private static ChannelHandlers INSTANCE = new ChannelHandlers();

    protected ChannelHandlers() {}

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }
    protected static ChannelHandlers getInstance() {
        return INSTANCE;
    }
    static void setTestingChannelHandlers(ChannelHandlers instance) {
        INSTANCE = instance;
    }
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}

所以,最终NettyServer中的hander属性指向MultiMessageHandler -> HeartbeatHandler -> AllDispatcher -> DecodeHandler

客户端连接服务

调用链太长了,而且隐藏的非常深,重点省略了一些,在应用启动时为Reference对象生成Invoker时创建的

代码语言:javascript
复制
RegistryProtocol#doRefer =>
RegistryDirectory#subscribe =>
RegistryDirectory#toInvokers => 
ProtocolFilterWrapper#refer =>
AbstractProtocol#refer =>
DubboProtocol#protocolBindingRefer =>
DubboProtocol#getClients =>
DubboProtocol#getSharedClient =>
DubboProtocol#buildReferenceCountExchangeClientList =>
DubboProtocol#buildReferenceCountExchangeClient =>
DubboProtocol#initClient =>
Exchangers#connect =>
HeaderExchanger#connect =>
Transporters#connect =>
NettyTransporter#connect =>
NettyClient#<init> =>
NettyClient#doOpen

最终,在NettyClient#doOpen中通过Netty与Server建立连接

代码语言:javascript
复制
Exchangers#connect
    => HeaderExchanger#connect(url, handler)
        => return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true)
                
// Transporters#connect 语句可以拆解为 
Transporters#connect
    => NettyTransporter#connect(url, handler)
        => return new NettyClient(url, handler)
            =>  NettyClient#doOpen【NettyClient构造函数中调用了doOpen方法】

NettyClient中的hander属性,最终指向的是new DecodeHandler(new HeaderExchangeHandler(handler))。最终Client端返回HeaderExchangeClient,其中的client属性也对NettyClient做了包装处理

不过在DubboProtocol#buildReferenceCountExchangeClient方法中对HeaderExchangeClient包装了一层,最终Invoker中的Client类型是ReferenceCountExchangeClient

代码语言:javascript
复制
private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
    ExchangeClient exchangeClient = initClient(url);

    return new ReferenceCountExchangeClient(exchangeClient);
}

ReferenceCountExchangeClientHeaderExchangeClient没什么区别,只不过包装了一层,然后还有一个比较重要的属性referenceCount,用于记录客户端的个数?

客户端发送请求

代码语言:javascript
复制
调用方代理类 ->
InvokerInvocationHandler#invoke ->
MockClusterInvoker#invoke ->
AbstractClusterInvoker#invoke【获取LoadBalance】 -> 
FailoverClusterInvoker#doInvoke【处理重试次数】 ->
ProtocolFilterWrapper#invoke【处理Filter链路】 ->
AbstractInvoker#invoke【设置Attachments参数】 ->
DubboInvoker#doInvoke【Exchange交接层】 ->
ReferenceCountExchangeClient#request ->
HeaderExchangeClient#request ->
HeaderExchangeChannel#request【return CompletableFuture】 ->
AbstractPeer#send ->
AbstractClient#send ->
NettyChannel#send ->
Channel#writeAndFlush【发消息给服务端】

DubboInvoker#doInvoke开始与Exchange层交互,核心代码如下

代码语言:javascript
复制
protected Result doInvoke(final Invocation invocation) throws Throwable {
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
    // return = false,即oneWay ,可以减少不必要的Future对象创建
    if (isOneway) {
        // send=true,即客户端发送之后再返回,否则直接返回
        boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
        currentClient.send(inv, isSent);
        RpcContext.getContext().setFuture(null);
        return AsyncRpcResult.newDefaultAsyncResult(invocation);
    } else {
        AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
        CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
        asyncRpcResult.subscribeTo(responseFuture);
        RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
        return asyncRpcResult;
    }
}
代码语言:javascript
复制
ReferenceCountExchangeClient#request => 
HeaderExchangeClient#request =>  
HeaderExchangeChannel#request
代码语言:javascript
复制
// HeaderExchangeChannel.java
public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // create request.
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
    try {
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}

在这个方法中,有以下几个需要注意的点:

  1. Request构造函数内部,会为Request生成一个递增唯一的ID,用于标识该请求
  2. channel#send调用过程中,涉及到NettyChannel#getOrAddChannel方法的调用,NettyChannel中有一个ConcurrentMap<Channel, NettyChannel> CHANNEL_MAP缓存,用于维护io.netty.channel.ChannelNettyChannel 的关系
  3. channel#send调用过程中,最终会调用到NettyChannel#send方法,该方法真正的将消息发给Server端
  4. 返回的DefaultFuture是一个CompletableFuture
代码语言:javascript
复制
// NettyChannel.java
public void send(Object message, boolean sent) throws RemotingException {
    boolean success = true;
    int timeout = 0;
    try {
        // 将消息发给Server
        ChannelFuture future = channel.writeAndFlush(message);
        if (sent) {
            // 如果配置了 send=true 参数,客户端需要等待消息发出之后再返回
            timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
            success = future.await(timeout);
        }
        Throwable cause = future.cause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }
    if (!success) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                + "in timeout(" + timeout + "ms) limit");
    }
}

从上面消息发送的流程中,好像没有看到对消息的编码工作,那是因为在Netty客户端初始化的时候,已经设置了编解码器

代码语言:javascript
复制
// NettyClient.java 
protected void doOpen() throws Throwable {
    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
    bootstrap = new Bootstrap();
    bootstrap.group(nioEventLoopGroup)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .channel(NioSocketChannel.class);
    if (getConnectTimeout() < 3000) {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
    } else {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
    }

    bootstrap.handler(new ChannelInitializer() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ch.pipeline()
                    .addLast("decoder", adapter.getDecoder())
                    .addLast("encoder", adapter.getEncoder())
                    .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
                    .addLast("handler", nettyClientHandler);
            String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
            if(socksProxyHost != null) {
                int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
                Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
                ch.pipeline().addFirst(socks5ProxyHandler);
            }
        }
    });
}

先经过编码器,即InternalEncoder#encode方法,InternalEncoder实现了MessageToByteEncoder接口,该方法内部调用了Codec2的相关方法,而Codec2是一个SPI接口,默认实现DubboCodec

代码语言:javascript
复制
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
protected static Codec2 getChannelCodec(URL url) {
    String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
    if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
        return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
    } else {
        return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class).getExtension(codecName));
    }
}

服务端响应请求

上面提到了NettyServer中的hander属性指向 MultiMessageHandler -> HeartbeatHandler -> AllDispatcher -> DecodeHandler -> HeaderExchangeHandlerNettyServer开启Server端的代码如下

代码语言:javascript
复制
protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();
    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker", true));
    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();

    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                            .addLast("handler", nettyServerHandler);
                }
            });
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}
  1. 先经过解码器,即InternalDecoder#decode方法,InternalDecoder实现了ByteToMessageDecoder接口,该方法内部调用了Codec2的相关方法,而Codec2是一个SPI接口,默认实现DubboCodec
代码语言:javascript
复制
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
protected static Codec2 getChannelCodec(URL url) {
    String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
    if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
        return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
    } else {
        return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class).getExtension(codecName));
    }
}
  1. MultiMessageHandler用于处理数组消息,如果是消息是MultiMessage类型,MultiMessage实现了Iterable数组,则遍历调用handle的received方法;否则直接调用下一个handle的received方法
  2. AllChannelHandler收到消息,将 channel handler message封装成state为ChannelState.RECEIVED类型的ChannelEventRunnable对象,然后交给线程池执行
  3. ChannelEventRunnable#run方法中判断state为ChannelState.RECEIVED类型,直接执行下一个handler的received方法,即DecodeHandler,这个过程是由线程池执行
  4. DecodeHandler#received方法中,如果消息是Decodeable类型,对整个消息进行解码;如果消息是Request类型,对Request.getData()进行解码;如果消息是Response类型,对Response.getResult()进行解码
  5. HeaderExchangeHandler#received -> HeaderExchangeHandler#handleRequest -> requestHandler#replyrequestHandlerDubboProtocol中的一个属性,ExchangeHandlerAdapter类型
  6. HeaderExchangeHandler#handleRequest中会创建一个Response对象,它的ID属性值,就是Request对象的ID值,这样请求和响应就关联起来了
  7. requestHandler#reply方法中,从exporterMap缓存中获取对应的DubboExporter对象,然后从DubboExporter获取Invoker,最后执行Invoker#invoke方法,然后返回一个CompletableFuture对象
  8. HeaderExchangeHandler#handleRequest方法中接收返回的CompletableFuture对象,对它添加回调处理,在回调中将返回结果封装到Response对象中,然后通过channel将Response发出
代码语言:javascript
复制
// ChannelEventRunnable.java
public void run() {
    if (state == ChannelState.RECEIVED) {
        try {
            // RECEIVED 类型,直接执行下一个handle的received方法,即 DecodeHandler
            handler.received(channel, message);
        } catch (Exception e) {}
    } else {
        switch (state) {
        case CONNECTED:
            try {
                handler.connected(channel);
           } catch (Exception e) {}
            break;
        case DISCONNECTED:
            try {
                handler.disconnected(channel);
           } catch (Exception e) {}
            break;
        case SENT:
            try {
                handler.sent(channel, message);
            } catch (Exception e) {}
            break;
        case CAUGHT:
            try {
                handler.caught(channel, exception);
           } catch (Exception e) {}
            break;
        default:
            logger.warn("unknown state: " + state + ", message is " + message);
        }
    }

}
代码语言:javascript
复制
InternalDecoder#decode
    => NettyServerHandler#channelRead
        => AbstractPeer#received
            => MultiMessageHandler#received
                => HeartbeatHandler#received
                    => AllChannelHandler#received
                    
                    ------------------ 异步执行,放到线程池 ----------------------
                    => ChannelEventRunnable#run
                        => DecodeHandler#received
                            => DecodeHandler#decode
                                => DecodeableRpcInvocation#decode
                        => HeaderExchangeHandler#received
                            => HeaderExchangeHandler#handleRequest
                                => DubboProtocol.requestHandler#reply
                    ------------------ 异步执行 -----------------------

                                    ----------------扩展点-------------------
                                    => ProtocolFilterWrapper.invoke
                                    => EchoFilter.invoke
                                        => ClassLoaderFilter.invoke
                                        => GenericFilter.invoke
                                            => TraceFilter.invoke
                                            => MonitorFilter.invoke
                                                => TimeoutFilter.invoke
                                                => ExceptionFilter.invoke
                                                    => InvokerWrapper.invoke
                                    -----------------扩展点-------------------
                                                        => AbstractProxyInvoker#invoke
                                                            => JavassistProxyFactory.AbstractProxyInvoker#doInvoke
                                                                => 代理类#invokeMethod
                                                                    => 真正的service方法


                            //把接收处理的结果,数据发回consumer  future#whenComplete                                                              
                            => channel.send(response)
                                => HeaderExchangeChannel
                                    => NettyChannel.send
                                        => NioSocketChannel#writeAndFlush(message)                                                  

服务端发送结果

代码语言:javascript
复制
HeaderExchangeChannel#send =>
NettyChannel#send => 
NioSocketChannel#writeAndFlush(message) 

客户端响应结果

在客户端启动的时候,入参handler和服务端的handler是同一个

代码语言:javascript
复制
// DubboProtocol#initClient
Exchangers.connect(url, requestHandler);

// HeaderExchanger#connect
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

Transporters#connect =>
    NettyTransporter#connect
        return NettyClient

NettyClient构造函数中,对handler做了包装

代码语言:javascript
复制
ChannelHandlers.wrap(handler, url)

public class ChannelHandlers {
    private static ChannelHandlers INSTANCE = new ChannelHandlers();
    protected ChannelHandlers() {
    }

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }
    protected static ChannelHandlers getInstance() {
        return INSTANCE;
    }
    static void setTestingChannelHandlers(ChannelHandlers instance) {
        INSTANCE = instance;
    }
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}

所以,最终NettyClient中的handler属性指向 MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler -> DecodeHandler -> HeaderExchangeHandler -> requestHandler ,和服务端处理流程一样一样

  1. 接收消息,经过MultiMessageHandlerHeartbeatHandler 处理,到达 AllDispatcher
  2. AllChannelHandler中将消息封装成new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)类型,交由线程池执行
  3. 线程池执行任务,经过DecodeHandler到达HeaderExchangeHandler
  4. HeaderExchangeHandler#received -> HeaderExchangeHandler#handleResponse -> DefaultFuture#receivedDefaultFuture中维护了一个请求ID和DefaultFuture的映射关系,Request和Response通过请求ID可以一一对应
代码语言:javascript
复制
public static void received(Channel channel, Response response, boolean timeout) {
    try {
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            Timeout t = future.timeoutCheckTask;
            if (!timeout) {
                t.cancel();
            }
            future.doReceived(response);
        } else {
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}

private void doReceived(Response res) {
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }
    if (res.getStatus() == Response.OK) {
        this.complete(res.getResult());
    } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
        this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
    } else {
        this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
    }
}
  1. 通过response.Id获取DefaultFuture
  2. 执行CompletableFuture#complete方法可以让 执行了CompletableFuture#get的用户线程得到响应,获取结果返回。至此整个调用过程完成

同步转异步

可是我们在代码中很多时候都是同步调用,很少自己去调用CompletableFuture#get方法,这一部分逻辑又是怎么处理的。在DubboInvoker#doInvoke方法中,返回的是一个AsyncRpcResult

代码语言:javascript
复制
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(PATH_KEY, getUrl().getPath());
    inv.setAttachment(VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
        // return = false,即oneWay ,可以减少不必要的Future对象创建
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {c
            AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
            CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
            // 订阅 responseFuture ,当 responseFuture 完成的之后,执行 asyncRpcResult 的complete方法, 这样用户线程就可以响应了
            asyncRpcResult.subscribeTo(responseFuture);

            RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
            return asyncRpcResult;
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

AsyncToSyncInvoker

AsyncToSyncInvoker#invoke方法中,会判断是同步调用还是异步调用,如果是同步调用,将调用AsyncRpcResult#get方法阻塞用户线程,以达到同步效果

代码语言:javascript
复制
public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult = invoker.invoke(invocation);
    try {
        if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
            // 如果是同步调用,调用 asyncResult#get 阻塞用户线程
            asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
        }
    } catch (InterruptedException e) {
        throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return!  method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (ExecutionException e) {
        Throwable t = e.getCause();
        if (t instanceof TimeoutException) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } else if (t instanceof RemotingException) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    } catch (Throwable e) {
        throw new RpcException(e.getMessage(), e);
    }
    return asyncResult;
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Exchanger
    • Exchangers
      • Exchanger
        • ExchangeServer
          • ExchangeClient
            • ExchangeChannel
            • Transporter
              • Transporters
                • Transporter
                  • Server
                    • Client
                    • 服务端启动服务
                    • 客户端连接服务
                    • 客户端发送请求
                    • 服务端响应请求
                    • 服务端发送结果
                    • 客户端响应结果
                    • 同步转异步
                      • AsyncToSyncInvoker
                      领券
                      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档