前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Dubbo源码,详解dubbo协议数据包及解包过程

Dubbo源码,详解dubbo协议数据包及解包过程

作者头像
吴就业
发布2020-07-10 10:47:23
1.9K0
发布2020-07-10 10:47:23
举报
文章被收录于专栏:Java艺术Java艺术

RPC协议即远程进程调用协议,自己的话理解就是,让两个进程之间的调用像同进程内调用一个函数那么简单。Dubbo框架的传输层默认使用dubbo协议,这也是一种RPC远程通信协议。学习Dubbo,我们有必要了解dubbo协议长什么样,最好的办法就是从源码中寻找答案。本篇将通过源码分析了解dubbo协议,以及了解dubbo协议数据包的解码过程。

dubbo协议数据包

dubbo协议数据包无论是请求数据包还是响应数据包,大体分为两部分,一部分是头部,一部分是body。头部的长度为16字节,这是固定的。而头部又可分为5个小部分:魔数 + 序列化标志和消息类型 + 状态码 + 序列化后的消息body的长度 + 请求id。

  • 0~1字节:共两个字节,存储魔数,标志这是一个dubbo协议的数据包;
  • 2~2字节:共一个字节,高3位存储消息类型,低5位存储序列化协议id;
  • 3~3字节:共一个字节,请求数据包并未使用,响应数据包用来存储响应的状态码;
  • 4~7字节:共四个字节,描述body的长度;
  • 8~15字节:共八字节,请求id。

魔数是固定的,存储的是0xdabb,标志这是一个dubbo协议的请求或响应数据包;序列化标志就是序列化协议的id,每种序列化协议都对应一个id,比如hession2序列化协议的id是2;消息类型标志这是一个请求消息还是一个响应消息,以及是否事件类型的消息;请求id用于标识一次请求,客户端需要根据这个请求id识别一个响应是哪个请求的回复。

整个数据包非常的简单,而唯一算得上复杂一点的,就是第3个字节码。使用高3位标志一个消息类型,如第8位为0表示这是一个响应数据包,第8位为1表示这是一个请求数据包。低5位表示使用的序列化协议。

在发送消息时,先计算出序列化协议,比如hession2序列化协议的id为2,将序列化协议id‘或’FLAG_REQUEST得到10000010;服务端只需要将该byte‘与’10000000,结果为1则拿到消息类型为FLAG_REQUEST说明这是一个请求数据包,将byte‘与’SERIALIZATION_MASK就能得到序列化协议的id。

代码语言:javascript
复制
protected static final byte FLAG_REQUEST = (byte) 0x80;//10000000
protected static final byte FLAG_TWOWAY = (byte) 0x40; //01000000
protected static final byte FLAG_EVENT = (byte) 0x20;  //00100000
protected static final int SERIALIZATION_MASK = 0x1f;  //00011111

可以看出,dubbo在努力的减少数据包的大小。与http协议的数据包相比,是不是感觉数据包小很多?当然,只是这样对比是没有意义的,假设一个接口调用,不需要任何参数,dubbo协议要描述请求哪个接口的哪个方法需要填写在body,http协议可以只在请求行上修改uri,那么此时,哪种协议的数据包更大,就需要通过抓包分析了。我的直觉告诉我,还是会比http协议的数据包小,因为http协议的请求头会比描述一个接口的某个方法的字符串更长。

dubbo协议的数据包解析过程会比http协议的请求效率高很多,因为dubbo协议不需要一行行探测是否是一个数据包的开始,只要找到魔术,就找到了一个数据包,按顺序读取16字节的数据就是请求头,根据请求头中的body长度就能拿到body,根据序列化算法就能解析body。

dubbo协议数据包解码源码分析

源码在dubbo-rpc模块的子模块dubbo-rpc-dubbo,主要分析的类:DubboCodecExchangeCodec

  • ExchangeCodec:负责编解码数据包
  • DubboCodec:负责编解码body

解码数据包入口在ExchangeCodec类的decode方法

代码语言:javascript
复制
@Override
    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        int readable = buffer.readableBytes();
        byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
        buffer.readBytes(header);
        return decode(channel, buffer, readable, header);
    }
    
     * 数据包头部长度:16byte
     * 2byte魔数:0xdabb
     * 1字节序列化标志|消息类型
     * 1字节状态码
     * 4byte数据包body长度
     * 8byte请求id
     * 请求body
     *
     * @param channel
     * @param buffer
     * @param readable
     * @param header
     * @return
     * @throws IOException
     */
    @Override
    protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
        // check magic number.检查魔数是否是数据包的开始
        if (readable > 0 && header[0] != MAGIC_HIGH || readable > 1 && header[1] != MAGIC_LOW) {
            int length = header.length;
            if (header.length < readable) {
                header = Bytes.copyOf(header, readable);
                buffer.readBytes(header, length, readable - length);
            }
            // 读取请求头
            for (int i = 1; i < header.length - 1; i++) {
                if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                    buffer.readerIndex(buffer.readerIndex() - header.length + i);
                    header = Bytes.copyOf(header, i);
                    break;
                }
            }
            return super.decode(channel, buffer, readable, header);
        }
        // check length. 检查读取到的字节长度是否大于请求头长度16字节码,如果不是,则等待接收到更多的字节再解析
        if (readable < HEADER_LENGTH) {
            return DecodeResult.NEED_MORE_INPUT;
        }

        // get data length. 从请求头中获取数据包长度
        int len = Bytes.bytes2int(header, 12);
        checkPayload(channel, len);

        // 如果当前读取的字节长度少于body长度+请求头长度,则继续等待接收到一个完整的数据包
        int tt = len + HEADER_LENGTH;
        if (readable < tt) {
            return DecodeResult.NEED_MORE_INPUT;
        }

        // limit input stream.
        // 根据长度截取一个完整的数据包
        ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

        try {
            return decodeBody(channel, is, header);
        } finally {
            if (is.available() > 0) {
                try {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Skip input stream " + is.available());
                    }
                    StreamUtils.skipUnusedStream(is);
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
    }

decode方法首先从buffer中读取固定长度的请求头,检查请求头的前两个字节是否是0xdabb,如果不是,则循环探测找出0xdabb,这是解决粘包问题,如前一个数据包解析出错遗留下的一部分字节数据,需要去掉这部分脏数据。

接着检查当前buffer的长度是否少于请求头的长度,少于则数据包不完整,需要等待接收更多的字节数据再解析。否则从请求头中获取body长度,如果body长度加上请求头长度大于当前buffer的长度,则数据包不完整,继续等待接收更多的字节数据再解析。

如果数据包检验通过,则根据body长度从buffer中读取body,调用decodeBody继续解码数据包。子类DubboCodec覆写了decodeBody方法,因此我们需要分析DubboCodec的decodeBody方法。

代码语言:javascript
复制
 @Override
    protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
        // 获取消息类型与序列化协议id
        byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
        // 获取请求id
        long id = Bytes.bytes2long(header, 4);
        if ((flag & FLAG_REQUEST) == 0) {
            // 解码响应数据包(客户端)
            Response res = new Response(id);
            if ((flag & FLAG_EVENT) != 0) {
                res.setEvent(true);
            }
            // 获取响应状态码
            byte status = header[3];
            res.setStatus(status);
            try {
                if (status == Response.OK) {
                    Object data;
                    // 心跳包
                    if (res.isHeartbeat()) {
                        ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                        data = decodeHeartbeatData(channel, in);
                    }
                    // 事件
                    else if (res.isEvent()) {
                        ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                        data = decodeEventData(channel, in);
                    }
                    // 正常请求响应
                    else {
                        DecodeableRpcResult result;
                        if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
                            result = new DecodeableRpcResult(channel, res, is,
                                    (Invocation) getRequestData(id), proto);
                            result.decode();
                        } else {
                            result = new DecodeableRpcResult(channel, res,
                                    new UnsafeByteArrayInputStream(readMessageData(is)),
                                    (Invocation) getRequestData(id), proto);
                        }
                        data = result;
                    }
                    res.setResult(data);
                } else {
                    ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                    res.setErrorMessage(in.readUTF());
                }
            } catch (Throwable t) {
                if (log.isWarnEnabled()) {
                    log.warn("Decode response failed: " + t.getMessage(), t);
                }
                res.setStatus(Response.CLIENT_ERROR);
                res.setErrorMessage(StringUtils.toString(t));
            }
            return res;
        } else {
            // 解码请求数据包
            Request req = new Request(id);
            req.setVersion(Version.getProtocolVersion());
            req.setTwoWay((flag & FLAG_TWOWAY) != 0);
            if ((flag & FLAG_EVENT) != 0) {
                req.setEvent(true);
            }
            try {
                Object data;
                ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                // 心跳包
                if (req.isHeartbeat()) {
                    data = decodeHeartbeatData(channel, in);
                }
                // 事件
                else if (req.isEvent()) {
                    data = decodeEventData(channel, in);
                }
                // 正常请求
                else {
                    DecodeableRpcInvocation inv;
                    if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
                        inv = new DecodeableRpcInvocation(channel, req, is, proto);
                        inv.decode();
                    } else {
                        inv = new DecodeableRpcInvocation(channel, req,
                                new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                    }
                    data = inv;
                }
                req.setData(data);
            } catch (Throwable t) {
                if (log.isWarnEnabled()) {
                    log.warn("Decode request failed: " + t.getMessage(), t);
                }
                // bad request
                req.setBroken(true);
                req.setData(t);
            }
            return req;
        }
    }

decodeBody方法虽然很长,但很简单。从请求头中获取到消息类型与序列化协议id、请求id。根据消息类型判断这是一个请求数据包还是响应数据包。如果flag & FLAG_REQUEST = FLAG_REQUEST,表示这是一个请求数据包,否则就是一个响应数据包。

响应数据包:创建Response对象,设置响应是否事件类型,如果flag & FLAG_EVENT = FLAG_EVENT,则说明此响应还是一个事件。响应数据包要求有状态码,请求头的第四个字节。根据响应状态码解码响应的data,响应数据包也分三种类型,分别是心跳包、事件响应、正常的请求响应。这里不讨论事件类型与心跳包。

请求数据包:与解码响应数据包一样,根据请求id创建一个Request对象,根据请求头的事件类型标志设置是否event,最大的区别是请求数据包不使用请求头的第四个字节。根据从请求头获取的序列化协议id调用CodecSupport.deserialize方法解码body。

代码语言:javascript
复制
public static ObjectInput deserialize(URL url, InputStream is, byte proto) throws IOException {
        // 根据序列化标志获取序列化器
        Serialization s = getSerialization(url, proto);
        return s.deserialize(url, is);
    }
    
    public static Serialization getSerialization(URL url, Byte id) throws IOException {
        Serialization serialization = getSerializationById(id);
        // 从url获取序列器名称,默认为hession2
        String serializationName = url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION);
        // 为了安全起见,检查从网络传递的“序列化id”是否与此端的id匹配(仅对JDK序列化生效)
        if (serialization == null
                || ((id == JAVA_SERIALIZATION_ID || id == NATIVE_JAVA_SERIALIZATION_ID || id == COMPACTED_JAVA_SERIALIZATION_ID)
                && !(serializationName.equals(ID_SERIALIZATIONNAME_MAP.get(id))))) {
            throw new IOException("Unexpected serialization id:" + id + " received from network, please check if the peer send the right id.");
        }
        return serialization;
    }

根据序列化协议id获取Serialization,调用Serializationdeserialize方法完成反序列化操作,序列化和反序列是我们最熟悉不过的,常用的api接口开发使用的就是json协议的序列化与反序列,而dubbo提供了很多种序列化协议的支持,如json、hession2。dubbo会为每种序列化协议分配一个id,使用map做映射,通过id拿到Serialization序列化器。

代码语言:javascript
复制
 /**
     * id-序列化器 映射
     */
private static Map<Byte, Serialization> ID_SERIALIZATION_MAP = new HashMap<Byte, Serialization>();

dubbo协议的数据包解码流程相比http协议的数据包解码流程简单许多,编码过程没有分析完整,但其实就是解码的反向操作而已。看完http协议数据包的解码过程与dubbo协议数据包的解码过程,也了解到http协议的数据包和dubbo协议的数据包长啥样了,你觉得哪种协议的解码效率更高、对内存的使用消耗更少?欢迎留言,说说你的看法。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-12-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Java艺术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档