前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >dubbo学习(八)远程调用原理

dubbo学习(八)远程调用原理

作者头像
虞大大
发布2020-11-06 08:12:38
1.5K0
发布2020-11-06 08:12:38
举报
文章被收录于专栏:码云大作战码云大作战

一、Dubbo远程调用流程

首先在客户端启动时会从注册中心拉取和订阅对应的服务列表,Cluster会把拉取的服务列表聚合成一个invoker,每次RPC调用前会通过Directory#list获取providers地址。获取这些服务列表给后续路由和负载均衡使用。在①中主要是将多个服务聚合成一个invoker。在框架内部另外一个实现Directory接口是RegistryDirectory类,它和服务接口名是一对一关系即每一个服务接口都有一个RegistryDirectory实例,主要负责拉取和订阅服务提供者、动态配置和路由项。

Cluster.join - 服务列表聚合代码:

org.apache.dubbo.rpc.cluster.Cluster#join

这里由于没配置路由规则,因此默认会走FailoverCluster实现。

代码语言:javascript
复制
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
    return new FailoverClusterInvoker<T>(directory);
}

Directory接口:

代码语言:javascript
复制
public interface Directory<T> extends Node {
    //获取接口

Class<T> getInterface();

代码语言:javascript
复制
    //获取服务列表
代码语言:javascript
复制
    List<Invoker<T>> list(Invocation invocation) throws RpcException;
}

在Dubbo发起服务调用时,所有理由和负载均衡都是在客户端实现的,如上图步骤②③④所示。客户端服务调用首先触发路由操作,然后将路由结果得到的服务列表作为负载均衡参数,经过负载均衡后会选出一台机器进行rpc调用,会将请求交给底层I/O线程池(比如netty)处理,线程池中主要处理读写、序列化、反序列化等逻辑。在处理反序列化对象时会在业务线程池中处理。在步骤⑤中包含两种线程池,一种是I/0线程池-netty,另一种是Dubbo业务线程池。

最后服务提供方会根据传递过来的接口、分组和版本查找invoker对应的实例进行反射调用。

二、服务提供方的查找和调用

服务提供方查询服务方法和调用主要实现在ExchangeHandlerAdapter中,内部实现代码:

代码语言:javascript
复制
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
    if (message instanceof Invocation) {
        Invocation inv = (Invocation) message;              //从exportedMap缓存中获取invoker        Invoker<?> invoker = getInvoker(channel, inv);

//...

//调用业务方具体方法

代码语言:javascript
复制
        Result result = invoker.invoke(inv);
        if (result instanceof AsyncRpcResult) {
            return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r);
        } else {
            return CompletableFuture.completedFuture(result);
        }
    }
    //...
}
代码语言:javascript
复制
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
    //...    //根据服务接口名、端口、接口分组、版本构建缓存的key    String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
    //从缓存中获取exporter对象    DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
    if (exporter == null)
        throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
    //返回invoker对象    return exporter.getInvoker();
}

服务提供方查询服务方法和调用的源码比较简单,这里应用了exporterMap,这个map在之前服务暴露中提到过,服务暴露时会把暴露的服务根据特点的规则生成构造key-服务接口名+端口+分组+版本,value为服务被包装成的exporter对象。

因此在服务查找中,也会根据服务接口名+端口+分组+版本进行key的构建,从缓存中查找exporter对象,然后通过getInvoer方法找到invoker对象。最后通过invoker.invoke调用服务的具体业务逻辑。

三、Dubbo的请求响应的处理

Dubbo框架内部,所有方法的调用会被抽象成Request/Response,即每一次调用都会创建一个Request请求,方法的返回则会返回一个Response对象。请求/响应的处理主要在HeaderExchangeHandler中。

请求响应实现代码:

代码语言:javascript
复制
public void received(Channel channel, Object message) throws RemotingException {
    //更新事件时间戳    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
            Request request = (Request) message;
            if (request.isEvent()) {                //如果是只读事件,在channel中打标
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) {                    //处理方法调用并返回
                    handleRequest(exchangeChannel, request);
                } else {
                    handler.received(exchangeChannel, request.getData());
                }
            }
        } else if (message instanceof Response) {            //接收响应
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {            //客户端不能进行telnet调用
            if (isClientSide(channel)) {
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {                //触发telnet调用
                String echo = handler.telnet(channel, (String) message);
                if (echo != null && echo.length() > 0) {
                    channel.send(echo);
                }
            }
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}//请求处理void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    if (req.isBroken()) {
        Object data = req.getData();
        String msg;
        if (data == null) msg = null;        //处理请求格式不正确,把异常转换字符串返回        else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
        else msg = data.toString();
        res.setErrorMessage("Fail to decode request due to: " + msg);
        res.setStatus(Response.BAD_REQUEST);
        channel.send(res);
        return;
    }
    // find handler by message class.
    Object msg = req.getData();
    try {
        //调用上述提到过的DubboProtocol.reply方法进行方法调用
        CompletableFuture<Object> future = handler.reply(channel, msg);
        if (future.isDone()) {
            res.setStatus(Response.OK);
            res.setResult(future.get());
            channel.send(res);
            return;
        }
        //...
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
        channel.send(res);
    }
}//响应处理static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {        //唤醒阻塞的线程并通知结果
        DefaultFuture.received(channel, response);
    }
}

上述代码中首先会更新时间戳,然后判断是请求模式还是响应模式还是telnet调用模式,进行相应处理。

请求模式,主要处理只读事件,用于Dubbo优雅停机。当注册中心反注册元数据时,因为网络问题,客户端不能及时感应注册中心事件,服务端会发送readonly报文告知下线。如果不是只读事件,则调用Dubboprotocol.reply方法进行方法调用。

响应模式,唤醒阻塞的线程并告知业务调用方。

telnet模式,触发telnet调用。

四、总结

本章偏理论,原理也容易懂。主要分析了Dubbo调用原理和流程。后面也分析了服务提供方的查找和方法调用、dubbo的请求/响应处理。

参考资料:

《深入理解Apache Dubbo与实战》 - 第6章 Dubbo远程调用

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-10-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码云大作战 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
负载均衡
负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档