前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ByteToMessageDecoder分析

ByteToMessageDecoder分析

作者头像
用户1418372
发布2019-12-16 17:59:42
6590
发布2019-12-16 17:59:42
举报
文章被收录于专栏:清晨我上码清晨我上码

ByteToMessageDecoder

该方法提供了将 ByteBuf 转化为对象的解码器处理流程,具体的解码规则交由子类去实现。

我们以读操作 channelRead 为例来研究一下:

代码语言:javascript
复制
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            // out 是一个链表,存放解码成功的对象
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                // cumulation 中存放的是上次未处理完的半包消息
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    // 本次处理,需要把上次遗留的半包和本次数据拼接后,一起处理
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                // 调用解码器解码消息
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    numReads = 0;
                    discardSomeReadBytes();
                }

                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                // 如果有解码成功的数据,需要向后传递,让其他 ChannelHandler 继续处理
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while (in.isReadable()) {
                int outSize = out.size();

                if (outSize > 0) {
                    // 如果有解码成功的数据,需要向后传递,让其他 ChannelHandler 继续处理
                    fireChannelRead(ctx, out, outSize);
                    out.clear();
                    // 如果当前 ChannelHandler 所属 ctx 被剔除 pipeline 上下文,就不需要继续处理了
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }

                int oldInputLength = in.readableBytes();
                // 解码
                decodeRemovalReentryProtection(ctx, in, out);

                if (ctx.isRemoved()) {
                    break;
                }

                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        continue;
                    }
                }

                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                                    ".decode() did not read anything but decoded a message.");
                }

                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception cause) {
            throw new DecoderException(cause);
        }
    }

    final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
            throws Exception {
        // 设置解码器状态为正在解码,避免解码过程中另一个线程调用了 handlerRemoved 把数据销毁,造成混乱
        decodeState = STATE_CALLING_CHILD_DECODE;
        try {
            decode(ctx, in, out);
        } finally {
            // STATE_HANDLER_REMOVED_PENDING 表示在解码过程中,ctx 被移除,需要由当前线程来调用 handlerRemoved 
            boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
            decodeState = STATE_INIT;
            if (removePending) {
                handlerRemoved(ctx);
            }
        }
    }

    // 具体的消息解码算法,交给子类实现
    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;    
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ByteToMessageDecoder
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档