前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >dubbo通信消息解析过程分析(1)

dubbo通信消息解析过程分析(1)

作者头像
技术蓝海
修改2020-10-26 13:42:02
1.7K0
修改2020-10-26 13:42:02
举报
文章被收录于专栏:wannshan(javaer,RPC)

概述

由于rpc底层涉及网络编程接口,线程模型,网络数据结构,服务协议,细到字节的处理。牵涉内容较多,本文就先从一个点说起。通过源码说明下dubbo通过netty框架做传输层,从接到数据字节流到把字节转换为dubbo上层可读的Request消息对象的过程。dubbo还支持mina,grizzly做底层传输层。这里包括两部,反序列化和解码。这篇主要是说解码的过程。 本文是说明下图dubbo架构图中红框中的部分

netty

既然是netty做传输层,netty的基础得提一点。 netty框架是通过管道(ChannelPipeline)模型处理网络数据流的,每个管道中有多个处理接点(ChannelHandler), 节点分为,进站(client请求进服务端口)和出站(请求响应出服务端口)两种。比如一个进站消息总是,顺序的(顺序是程序中编码指定的)通过进站处理节点。 同理出站消息,总是顺序的通过出站节点到达网络接口。

dubbo2.5.6版本,传输层dubbo提供有netty3和netty4两种实现,初始化netty通道都在NettyServer类里,两个类同名,包名不同。

具体,netty3在NettyServer类里doOpen()方法:

代码语言:javascript
复制
protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);

        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
	       //编解码器的初始化 
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                pipeline.addLast("decoder", adapter.getDecoder());//设置解码hander
                pipeline.addLast("encoder", adapter.getEncoder());//设置编码hander
                pipeline.addLast("handler", nettyHandler);//自定义NettyHandler 扩展自netty双向handler基类,可以接受进站和出站数据流
                return pipeline;
		//进站的请求,先经过adapter.getDecoder()handler处理,再由nettyHandler处理
		//出站的请求,先经过nettyHandler处理 再由adapter.getEncoder()handler处理
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());
    }

netty4版本NettyServer类里doOpen()方法:

代码语言:javascript
复制
  protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();

        bootstrap = new ServerBootstrap();
        //acceptor 事件循环线程
        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        //client channel事件循环线程
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        //编解码器的初始化 
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder())//设置解码hander
                                .addLast("encoder", adapter.getEncoder())//设置编码hander
                                .addLast("handler", nettyServerHandler);//自定义NettyServerHandler 扩展netty双向handler基类,可以接受进站和出站数据流
                        //进站的请求,先经过adapter.getDecoder()handler处理,再由nettyServerHandler处理
                        //出站的请求,先经过nettyServerHandler处理 再由adapter.getEncoder()handler处理
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }

本次说的是进站流程,请求数据解析成Request对象过程,通过上面的代码和netty特性可知, 进站数据先通过解码hander,经解码成消息Request对象后,再到自定义handler,然后由自定义hanlder通过装饰模式,调用实际服务。

先看下解码handler的实现: 由adapter.getDecoder()这句跟踪到NettyCodecAdapter类的getDecoder()方法 public ChannelHandler getDecoder() { return decoder; } 可以看到,这个方法获取的解码handler,decoder,是NettyCodecAdapter类的私有属性

private final ChannelHandler decoder = new InternalDecoder();

看下InternalDecoder类定义,netty3版本:

代码语言:javascript
复制
     * 这里需要些netty的知识,继承SimpleChannelUpstreamHandler,表明它是进站handler
     * 所以进站的数据流,都会经过本handler对象,具体就是messageReceived方法。
     */
    private class InternalDecoder extends SimpleChannelUpstreamHandler {

        private com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
                com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;//这是dubbo根据nio自己实现的buffer

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
            Object o = event.getMessage();
            if (!(o instanceof ChannelBuffer)) {//这里ChnnelBuffer是netty基于jdk nio ByteBuffer 实现
                ctx.sendUpstream(event);
                return;
            }

            ChannelBuffer input = (ChannelBuffer) o;
            int readable = input.readableBytes();//到这就是从netty event对象取数据的过程。
            if (readable <= 0) {
                return;
            }

            com.alibaba.dubbo.remoting.buffer.ChannelBuffer message;
            if (buffer.readable()) {
                if (buffer instanceof DynamicChannelBuffer) {
                    buffer.writeBytes(input.toByteBuffer());
                    message = buffer;
                } else {
                    int size = buffer.readableBytes() + input.readableBytes();
                    message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(
                            size > bufferSize ? size : bufferSize);//bufferSize 不指定是8k,这里表示最小8K
                    message.writeBytes(buffer, buffer.readableBytes());
                    message.writeBytes(input.toByteBuffer());//把netty 读到的字节流写入message
                }
            } else {//直接通过构造器,构造message
                message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer(
                        input.toByteBuffer());
            }

            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
            Object msg;
            int saveReaderIndex;
            //从netty框架,读取到的数据,放入message后,下面就是针对message的反序列化和解码过程。
            try {
                // decode object.
                do {
                    saveReaderIndex = message.readerIndex();
                    try {
                        //解码,这里面包括的反序列化
                        msg = codec.decode(channel, message);//重要!!!通过具体编解码实例codec完成解码
                    } catch (IOException e) {
                        buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                        throw e;
                    }
                    if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {//如果解码结果是,Codec2.DecodeResult.NEED_MORE_INPUT,表示,需要更多数据
                        message.readerIndex(saveReaderIndex);//很重要,设置readerIndex为,解码读取前的位置,为了下次再从头读取。
                        break;
                    } else {
                        if (saveReaderIndex == message.readerIndex()) {
                            buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                            throw new IOException("Decode without read data.");
                        }
                        if (msg != null) {//解码完成,这里的msg已经是Request对象。!!
                            Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());
                        }
                    }
                } while (message.readable());
            } finally {
                if (message.readable()) {
                    message.discardReadBytes();
                    buffer = message;
                } else {
                    buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                }
                NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
            }
        }
        //处理异常
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            ctx.sendUpstream(e);//向下一个传递。一般在最后一个handler处理异常
        }
    }

netty4版本:

代码语言:javascript
复制
/***
     * netty4 扩展了ByteToMessageDecoder
     * 重写decode 方法,解码完成后,不用像netty3手动Channels.fireMessageReceived 发送事件,
     * netty4自动把对象,传递到下一个handler
     */
    private class InternalDecoder extends ByteToMessageDecoder {

        protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {

            ChannelBuffer message = new NettyBackedChannelBuffer(input);

            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);

            Object msg;

            int saveReaderIndex;

            try {
                // decode object.
                do {
                    //保存初始,读取位置
                    saveReaderIndex = message.readerIndex();
                    try {
                        msg = codec.decode(channel, message);//重要!!!通过具体编解码实例codec完成解码
                    } catch (IOException e) {
                        throw e;
                    }
                    if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                        //解码失败后,重新设置readerIndex为读取前位置
                        message.readerIndex(saveReaderIndex);
                        break;
                    } else {
                        //is it possible to go here ?
                        if (saveReaderIndex == message.readerIndex()) {
                            throw new IOException("Decode without read data.");
                        }
                        if (msg != null) {
                            //解码成功后,加入到out list中,传递到下一个处理handler
                            out.add(msg);
                        }
                    }
                } while (message.readable());
            } finally {
                NettyChannel.removeChannelIfDisconnected(ctx.channel());
            }
        }
    }

可以看到,无论netty3,还是netty4都是通过,NettyCodecAdapter的codec属性完成解码的, 这里有个概念,编解码handler是通过编解码实例完成编解码的,这里的编解码实例就是codec 而codec实例是由它构造函数从上层方法传递的。如下

代码语言:javascript
复制
public NettyCodecAdapter(Codec2 codec, URL url, com.alibaba.dubbo.remoting.ChannelHandler handler) {
        this.codec = codec;
        this.url = url;
        this.handler = handler;
        int b = url.getPositiveParameter(Constants.BUFFER_KEY, Constants.DEFAULT_BUFFER_SIZE);
        this.bufferSize = b >= Constants.MIN_BUFFER_SIZE && b <= Constants.MAX_BUFFER_SIZE ? b : Constants.DEFAULT_BUFFER_SIZE;
    }

再回到NettySever类中NettyCodecAdapter的构造语句 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); 跟进getCodec()方法,这个方就是获取实际编解码方案的。这个方法的实现在NettyServer的祖先类AbstractEndpoint中:

代码语言:javascript
复制
    protected Codec2 getCodec() {
        return codec;
    }
    //可以看到codec在构造方法里创建的
     public AbstractEndpoint(URL url, ChannelHandler handler) {
        super(url, handler);
        this.codec = getChannelCodec(url);//根据url配置,构造编码解码器(通过spi得到DubboCountCodec类实例)
        this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
    }
   // 再跟到getChannelCodec方法
     protected static Codec2 getChannelCodec(URL url) {
        //通过spi机制,从url里获取编解码方案,这里是dubbo。取不到就是telnet
	//dubbo编解码方案,实现类是DubboCountCodec
        String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
        if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
            return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
        } else {
            return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class)
                    .getExtension(codecName));
        }
    }

这里看下,dubbo中的所有编解码类结构:

Codec2接口

可以看到,所有编解码器实现,都扩展了Codec2接口。同时Codec2也是个spi扩展点。 接口Codec2,如下:

代码语言:javascript
复制
@SPI
public interface Codec2 {
    /**
     * spi 获取编码器
     * @param channel
     * @param buffer
     * @param message
     * @throws IOException
     */
    @Adaptive({Constants.CODEC_KEY})
    void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;

    /***
     * spi 获取解码器
     * @param channel
     * @param buffer
     * @return
     * @throws IOException
     */
    @Adaptive({Constants.CODEC_KEY})
    Object decode(Channel channel, ChannelBuffer buffer) throws IOException;


    enum DecodeResult {
        NEED_MORE_INPUT, SKIP_SOME_INPUT
    }

}

具体实现通过spi获取,dubbo编解码方案实例就是DubboCountCodec 那么看下DubboCountCodec类,以及decode方法:

代码语言:javascript
复制
public final class DubboCountCodec implements Codec2 {

    private DubboCodec codec = new DubboCodec();//具体dubbo协议编解码方案实现

    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        codec.encode(channel, buffer, msg);
    }

    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        int save = buffer.readerIndex();//记录读取初始位置
        MultiMessage result = MultiMessage.create();//解码后对象容器。list,可放多个消息
        do {
            Object obj = codec.decode(channel, buffer);//解码过程在DubboCodec类中的decode方法里
            if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {//需要接受更多信息
                buffer.readerIndex(save);//恢复读取位置
                break;
            } else {//解码完成,加到对象容器
                result.addMessage(obj);
                logMessageLength(obj, buffer.readerIndex() - save);//记录日志,可忽略
                save = buffer.readerIndex();//更新读取位置
            }
        } while (true);
        if (result.isEmpty()) {
            return Codec2.DecodeResult.NEED_MORE_INPUT;
        }
        if (result.size() == 1) {
            return result.get(0);//返回解码后对象
        }
        return result;
    }

    private void logMessageLength(Object result, int bytes) {
        if (bytes <= 0) {
            return;
        }
        if (result instanceof Request) {
            try {
                ((RpcInvocation) ((Request) result).getData()).setAttachment(
                        Constants.INPUT_KEY, String.valueOf(bytes));
            } catch (Throwable e) {
                /* ignore */
            }
        } else if (result instanceof Response) {
            try {
                ((RpcResult) ((Response) result).getResult()).setAttachment(
                        Constants.OUTPUT_KEY, String.valueOf(bytes));
            } catch (Throwable e) {
                /* ignore */
            }
        }
    }

}

dubbo协议消息格式

分析DubboCodec类之前,先说下dubbo协议消息格式,它包括消息头和消息体:

前2个字节: 为协议魔数,固定值oxdabb

第三字节: 第1比特(0/1)表示是请求消息,还是响应消息 第2比特(0/1)表示是是否必须双向通信,即有请求,必有响应 第3比特(0/1)表示是是否是,心跳消息 第低5位比特,表示一个表示消息序列化的方式(1,是dubbo ,2,是hessian...)

第四字节: 只在响应消息中用到,表示响应消息的状态,是成功,失败等

第5-12字节: 8个字节,表示一个long型数字,是reqeustId

第13—16字节: 4个字节,表示消息体的长度(字节数)

消息体,不固定长度 是请求消息时,表示请求数据 是响应消息时,表示方法调用返回结果。

编码和解码主要是对消息头的设置和解析。序列化和反序列化主要是对消息体的操作。

先看DubboCodec的关系图:

DubboCodec类decode方法的实现在其父类ExchangeCodec中:

代码语言:javascript
复制
     //先看下类中定义的常量:
    // header length.消息头长度
    protected static final int HEADER_LENGTH = 16;
    // magic header.
    protected static final short MAGIC = (short) 0xdabb;//1101 1010 1011 1011魔数
    protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];//高字节
    protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];//低字节
    // message flag.
    protected static final byte FLAG_REQUEST = (byte) 0x80;//1000,0000//表示消息类型
    protected static final byte FLAG_TWOWAY = (byte) 0x40;//0100,0000//表示是否双向通信
    protected static final byte FLAG_EVENT = (byte) 0x20;//0010,0000//表示是否是心跳事件
    protected static final int SERIALIZATION_MASK = 0x1f;//0001,1111/表示是序列化实现类型
  /***
     * 解码入口方法
     * @param channel
     * @param buffer
     * @return
     * @throws IOException
     */
    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        //取得可读的字节数
        int readable = buffer.readableBytes();
        //header 最大16字节
        byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
        //从buffer中读16字节到header数组中
        buffer.readBytes(header);
        //调用本身decode方法
        return decode(channel, buffer, readable, header);
    }

     /***
     * 具体协议解析方法,本方法主要是读取验证消息头的过程
     * @param channel
     * @param buffer
     * @param readable
     * @param header
     * @return
     * @throws IOException
     */
    protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
        // check magic number.
        if (readable > 0 && header[0] != MAGIC_HIGH
                || readable > 1 && header[1] != MAGIC_LOW) {
            //前两字节,不是dubbo魔数
            int length = header.length;
            //如果可读的字节数目,大于16字节
            if (header.length < readable) {
                //给header扩容到readable大小
                header = Bytes.copyOf(header, readable);
                //把buffer剩下的字节读到header中,这里多于16字节
                buffer.readBytes(header, length, readable - length);
            }
            //上层方法,第一字节已经验证过,这个从第二字节开始验证。
            for (int i = 1; i < header.length - 1; i++) {
                //如果发现,后续字节有dubbo协议的开头
                if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                    //重置readerIndex 位置,到魔数开始的地方。
                    buffer.readerIndex(buffer.readerIndex() - header.length + i);
                    //把魔数开始位置,以前的数据,覆盖赋给header(下面调用super.decode解析)
                    header = Bytes.copyOf(header, i);
                    break;
                }
            }
            //上层方法是dubbo telnet 编解码实现
            return super.decode(channel, buffer, readable, header);
        }
        // check length.
        if (readable < HEADER_LENGTH) {//小于16字节,返回 需要更多对象
            return DecodeResult.NEED_MORE_INPUT;
        }

        //get data length.读取header[]最后四字节,构造一个int的数据,
        //根据dubbo协议,这个是消息体的长度
        int len = Bytes.bytes2int(header, 12);
        //检查数据大小
        checkPayload(channel, len);//默认为8M

        int tt = len + HEADER_LENGTH;//总的消息大小,消息头加消息实体
        if (readable < tt) {//如果可读取的,不够消息总大小,就返回 需要更多数据
            return DecodeResult.NEED_MORE_INPUT;
        }

        // limit input stream.这个时候,buffer的readerIndex位置已是,读完header后的位置,接下来的len长度的数据,全是消息体的数据。
        ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

        try {
            //解码反序列化成Reqeust或者Response对象,decodeBody方法被子类DubboCodec重写了
            //这里要看DubboCodec的decodeBody的方法
            return decodeBody(channel, is, header);
        } finally {
            if (is.available() > 0) {
                try {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Skip input stream " + is.available());
                    }
                    StreamUtils.skipUnusedStream(is);
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
    }

DubboCodec类的decodeBody方法:

代码语言:javascript
复制
/***
     * 解码,是从输出流 is 取字节数据,经反序列化,构造Request 和Response对象的过程。
     * @param channel
     * @param is
     * @param header
     * @return
     * @throws IOException
     */
    protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
        //消息头第3字节和SERIALIZATION_MASK&操作后,就可以得到,序列化/反序列化方案
        byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
        //根据序列化方案id,或者url指定,通过spi机制去获取序列化实现。dubbo协议默认用hession2序列化方案
        //是放在消息头flag 里的。这里proto 值是2
        //获取具体用序列化/反序列化实现
        Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
        // get request id.header字节从第5到12 8个字节,是请求id
        long id = Bytes.bytes2long(header, 4);
        if ((flag & FLAG_REQUEST) == 0) {//根据flag&FLAG_REQUEST后,判断需要解码的消息类型
            // decode response.
            Response res = new Response(id);
            if ((flag & FLAG_EVENT) != 0) {
                res.setEvent(Response.HEARTBEAT_EVENT);
            }
            // get status. 获取响应 状态 成功,失败等
            byte status = header[3];
            res.setStatus(status);
            if (status == Response.OK) {//返回结果状态 ok成功
                try {
                    Object data;
                    if (res.isHeartbeat()) {
                        data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                    } else if (res.isEvent()) {//事件消息,反序列化
                        data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                    } else {//业务调用结果消息 解码构造 DecodeableRpcResult 对象的过程
                        DecodeableRpcResult result;
                        if (channel.getUrl().getParameter(
                                Constants.DECODE_IN_IO_THREAD_KEY,
                                Constants.DEFAULT_DECODE_IN_IO_THREAD)) {//是否在io 线程内解码
                            result = new DecodeableRpcResult(channel, res, is,
                                    (Invocation) getRequestData(id), proto);
                            //!!!Response消息反序列化 就是把调用结果返回值 从is里反序列化出来,放在 DecodeableRpcResult类的result 字段的过程。
                            result.decode();
                        } else {
                            //不在io线程解码,要先通过readMessageData方法把调用结果数组取出后,
                            //放在UnsafeByteArrayInputStream对象,存在DecodeableRpcResult对象里,后续通过上层方法解码。
                            result = new DecodeableRpcResult(channel, res,
                                    new UnsafeByteArrayInputStream(readMessageData(is)),
                                    (Invocation) getRequestData(id), proto);
                        }
                        data = result;
                    }

                    //同时把DecodeableRpcResult对象放入Response result字段。
                    res.setResult(data);
                } catch (Throwable t) {
                    if (log.isWarnEnabled()) {
                        log.warn("Decode response failed: " + t.getMessage(), t);
                    }
                    //异常处理,设置status和异常信息
                    res.setStatus(Response.CLIENT_ERROR);
                    res.setErrorMessage(StringUtils.toString(t));
                }
            } else {
                //异常处理,设置异常信息
                res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
            }
            return res;
        } else {//反解码Requset 消息类型
            // decode request.
            Request req = new Request(id);
            req.setVersion("2.0.0");
            req.setTwoWay((flag & FLAG_TWOWAY) != 0);
            if ((flag & FLAG_EVENT) != 0) {
                req.setEvent(Request.HEARTBEAT_EVENT);
            }
            try {
                Object data;
                if (req.isHeartbeat()) {
                    data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                } else if (req.isEvent()) {
                    data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                } else {//业务调用请求消息 解码构造 DecodeableRpcInvocation 对象的过程
                    DecodeableRpcInvocation inv;
                    if (channel.getUrl().getParameter(
                            Constants.DECODE_IN_IO_THREAD_KEY,
                            Constants.DEFAULT_DECODE_IN_IO_THREAD)) {//是否 io 线程内解码
                        inv = new DecodeableRpcInvocation(channel, req, is, proto);
                        //!!!Requset 类型反序列化方法
                        inv.decode();
                    } else {
                        inv = new DecodeableRpcInvocation(channel, req,
                                new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                    }
                    data = inv;
                }
                //同时把DecodeableRpcInvocation对象放入Request data字段。
                req.setData(data);
            } catch (Throwable t) {
                if (log.isWarnEnabled()) {
                    log.warn("Decode request failed: " + t.getMessage(), t);
                }
                // bad request 异常请求对象设置
                req.setBroken(true);
                req.setData(t);
            }
            return req;
        }
    }

    /***
     * 获取反序列化方案
     * @param serialization
     * @param url
     * @param is
     * @return
     * @throws IOException
     */
    private ObjectInput deserialize(Serialization serialization, URL url, InputStream is)
            throws IOException {
        return serialization.deserialize(url, is);
    }

    /***
     * 读取is 里的可用数据
     * @param is
     * @return
     * @throws IOException
     */
    private byte[] readMessageData(InputStream is) throws IOException {
        if (is.available() > 0) {
            byte[] result = new byte[is.available()];
            is.read(result);
            return result;
        }
        return new byte[]{};
    }

RPC调用请求:DecodeableRpcInvocation 类反序列化方法:

代码语言:javascript
复制
public void decode() throws Exception {
        if (!hasDecoded && channel != null && inputStream != null) {
            try {//具体在decode重载方法里
                decode(channel, inputStream);
            } catch (Throwable e) {//异常请求设置
                if (log.isWarnEnabled()) {
                    log.warn("Decode rpc invocation failed: " + e.getMessage(), e);
                }
                request.setBroken(true);
                request.setData(e);
            } finally {
                hasDecoded = true;
            }
        }
    }

    /**
     * 反序列化,解码 通过反序列化还原
     * RpcInvocation 类的
     * private String methodName;
     private Class<?>[] parameterTypes;
     private Object[] arguments;
     private Map<String, String> attachments; 是个属性值,就像在客户端请求时设置的一样。
     * @param channel channel.
     * @param input   input stream.
     * @return
     * @throws IOException
     */
    public Object decode(Channel channel, InputStream input) throws IOException {
        //获取反序列化方案
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
                .deserialize(channel.getUrl(), input);
        //反序列化出,dubbo版本,路径,服务版本信息,设置到attachments里,这读取顺序和序列化时的写入顺序也一致
        //当然,序列化方案也一致。这里默认都是hissen2
        //调用ObjectInput的readUTF()反序列化方法,依次获取调用信息
        setAttachment(Constants.DUBBO_VERSION_KEY, in.readUTF());
        setAttachment(Constants.PATH_KEY, in.readUTF());
        setAttachment(Constants.VERSION_KEY, in.readUTF());
        //读取方法名
        setMethodName(in.readUTF());
        //反序列化,方法请求参数类型,
        try {
            Object[] args;
            Class<?>[] pts;
            String desc = in.readUTF();
            if (desc.length() == 0) {
                pts = DubboCodec.EMPTY_CLASS_ARRAY;
                args = DubboCodec.EMPTY_OBJECT_ARRAY;
            } else {
                pts = ReflectUtils.desc2classArray(desc);
                args = new Object[pts.length];
                for (int i = 0; i < args.length; i++) {
                    try {
                        //更具类型读取请求参数值
                        //调用ObjectInput的readObject()反序列化方法,反序列化出参数值
                        args[i] = in.readObject(pts[i]);
                    } catch (Exception e) {
                        if (log.isWarnEnabled()) {
                            log.warn("Decode argument failed: " + e.getMessage(), e);
                        }
                    }
                }
            }
            //设置保存请求参数类型
            setParameterTypes(pts);
            //反序列化,attachment map
            //调用ObjectInput的readObject()反序列化方法,反序列化出attachemnet值
            Map<String, String> map = (Map<String, String>) in.readObject(Map.class);
            if (map != null && map.size() > 0) {
                Map<String, String> attachment = getAttachments();
                if (attachment == null) {
                    attachment = new HashMap<String, String>();
                }
                attachment.putAll(map);
                setAttachments(attachment);
            }
            //decode argument ,may be callback 回调参数设置,这个再说。
            for (int i = 0; i < args.length; i++) {
                args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
            }
            //保存请求参数值
            setArguments(args);

        } catch (ClassNotFoundException e) {
            throw new IOException(StringUtils.toString("Read invocation data failed.", e));
        }
        return this;
    }

RPC调用结果:DecodeableRpcResult 反序列化 方法decode()

代码语言:javascript
复制
public void decode() throws Exception {
        if (!hasDecoded && channel != null && inputStream != null) {
            try {//具体实现在decode(channel, inputStream)重载方法里
                decode(channel, inputStream);
            } catch (Throwable e) {//设置异常返回response
                if (log.isWarnEnabled()) {
                    log.warn("Decode rpc result failed: " + e.getMessage(), e);
                }
                response.setStatus(Response.CLIENT_ERROR);
                response.setErrorMessage(StringUtils.toString(e));
            } finally {
                hasDecoded = true;
            }
        }
    }

     /***
     * 反序列化,解码过程,读取input的调用结果字节数据,经反序列化成方法返回类型对象
     * 并发放回结果设置到RpcResult的result字段里
     * 以及异常返回字段的设置。
     * @param channel channel.
     * @param input   input stream.
     * @return
     * @throws IOException
     */
    public Object decode(Channel channel, InputStream input) throws IOException {
        //获取反序列化方案
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
                .deserialize(channel.getUrl(), input);
        //反序列化出 结果标识 调用ObjectInput的readByte方法,反序列化出一个byte值
        byte flag = in.readByte();//
        switch (flag) {
            case DubboCodec.RESPONSE_NULL_VALUE://null 值
                break;
            case DubboCodec.RESPONSE_VALUE: //非空值
                try {
                    //根据invocation获取调用方法的放回类型
                    Type[] returnType = RpcUtils.getReturnTypes(invocation);
                    //根据返回类型,反序列出结果并这是到RpcResult 的result字段里。
                    //void 类型 结果值 是null ;int 等基本类型,自动装箱 Integer;
                    //具体调用ObjectInput的readObject重载的两个方法,反序列出结果对象
                    setValue(returnType == null || returnType.length == 0 ? in.readObject() :
                            (returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
                                    : in.readObject((Class<?>) returnType[0], returnType[1])));
                } catch (ClassNotFoundException e) {
                    throw new IOException(StringUtils.toString("Read response data failed.", e));
                }
                break;
            case DubboCodec.RESPONSE_WITH_EXCEPTION://异常信息反序列化,设置到exception字段
                try {
                   //具体调用ObjectInput的readObject重载的两个方法,反序列出异常对象
                    Object obj = in.readObject();
                    if (obj instanceof Throwable == false)
                        throw new IOException("Response data error, expect Throwable, but get " + obj);
                    setException((Throwable) obj);
                } catch (ClassNotFoundException e) {
                    throw new IOException(StringUtils.toString("Read response data failed.", e));
                }
                break;
            default:
                throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag);
        }
        return this;
    }

下一篇再说说反序列化。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概述
  • netty
  • Codec2接口
  • dubbo协议消息格式
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档