Netty 源码深度解析(九) - 编码概述1 抽象类 MessageToByteEncoder2 抽象类 MessageToMessageEncoder一个java对象最后是如何转变成字节流,写到s

概述

一个问题

编码器实现了ChannelOutboundHandler,并将出站数据从 一种格式转换为另一种格式,和我们方才学习的解码器的功能正好相反。Netty 提供了一组类, 用于帮助你编写具有以下功能的编码器:

  • 将消息编码为字节
  • 将消息编码为消息 我们将首先从抽象基类 MessageToByteEncoder 开始来对这些类进行考察

1 抽象类 MessageToByteEncoder

MessageToByteEncoder API

解码器通常需要在Channel关闭之后产生最后一个消息(因此也就有了 decodeLast()方法) 这显然不适于编码器的场景——在连接被关闭之后仍然产生一个消息是毫无意义的

1.1 ShortToByteEncoder

其接受一Short 型实例作为消息,编码为Short的原子类型值,并写入ByteBuf,随后转发给ChannelPipeline中的下一个 ChannelOutboundHandler 每个传出的 Short 值都将会占用 ByteBuf 中的 2 字节

ShortToByteEncoder

1.2 Encoder

Netty 提供了一些专门化的 MessageToByteEncoder,可基于此实现自己的编码器 WebSocket08FrameEncoder类提供了一个很好的实例

2 抽象类 MessageToMessageEncoder

你已经看到了如何将入站数据从一种消息格式解码为另一种 为了完善这幅图,将展示 对于出站数据将如何从一种消息编码为另一种。MessageToMessageEncoder类的 encode()方法提供了这种能力

MessageToMessageEncoderAPI

为了演示,使用IntegerToStringEncoder 扩展了 MessageToMessageEncoder

  • 编码器将每个出站 Integer 的 String 表示添加到了该 List 中

IntegerToStringEncoder的设计

关于有趣的 MessageToMessageEncoder 的专业用法,请查看 io.netty.handler. codec.protobuf.ProtobufEncoder类,它处理了由 Google 的 Protocol Buffers 规范所定义 的数据格式。

一个java对象最后是如何转变成字节流,写到socket缓冲区中去的

pipeline中的标准链表结构 java对象编码过程 write:写队列 flush:刷新写队列 writeAndFlush: 写队列并刷新

pipeline中的标准链表结构

标准的pipeline链式结构

数据从head节点流入,先拆包,然后解码成业务对象,最后经过业务Handler处理,调用write,将结果对象写出去 而写的过程先通过tail节点,然后通过encoder节点将对象编码成ByteBuf,最后将该ByteBuf对象传递到head节点,调用底层的Unsafe写到JDK底层管道

Java对象编码过程

为什么我们在pipeline中添加了encoder节点,java对象就转换成netty可以处理的ByteBuf,写到管道里?

我们先看下调用write的code

业务处理器接受到请求之后,做一些业务处理,返回一个user

  • 然后,user在pipeline中传递

AbstractChannel#

DefaultChannelPipeline#

AbstractChannelHandlerContext#

AbstractChannelHandlerContext#

  • 情形一

AbstractChannelHandlerContext#

AbstractChannelHandlerContext#

  • 情形二

AbstractChannelHandlerContext#

AbstractChannelHandlerContext#invokeWrite0

AbstractChannelHandlerContext#invokeFlush0 handler 如果不覆盖 flush 方法,就会一直向前传递直到 head 节点

落到 Encoder节点,下面是 Encoder 的处理流程

按照简单自定义协议,将Java对象 User 写到传入的参数 out中,这个out到底是什么?

需知User对象,从BizHandler传入到 MessageToByteEncoder时,首先传到 write

1. 判断当前Handelr是否能处理写入的消息(匹配对象)

  • 判断该对象是否是该类型参数匹配器实例可匹配到的类型

TypeParameterMatcher#

具体实例

2 分配内存

3 编码实现

  • 调用encode,这里就调回到 Encoder 这个Handler
  • 其为抽象方法,因此自定义实现类实现编码方法

4 释放对象

            try {
            // 调用encode,这里就调回到  `Encoder` 这个Handelr中    
                encode(ctx, cast, buf);
            } finally {
                // 既然自定义java对象转换成ByteBuf了,那么这个对象就已经无用了,释放掉
                // (当传入的msg类型是ByteBuf的时候,就不需要自己手动释放了)
                ReferenceCountUtil.release(cast);
            }
            // 如果buf中写入了数据,就把buf传到下一个节点
            if (buf.isReadable()) {
                ctx.write(buf, promise);
            } else {
            // 否则,释放buf,将空数据传到下一个节点    
                buf.release();
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            buf = null;
        } else {
            // 如果当前节点不能处理传入的对象,直接扔给下一个节点处理
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable e) {
        throw new EncoderException(e);
    } finally {
        // 当buf在pipeline中处理完之后,释放
        if (buf != null) {
            buf.release();
        }
    }
}

我们详细阐述一下Encoder是如何处理传入的Java对象的 1.判断当前Handler是否能处理写入的消息,如果能处理,进入下面的流程,否则,直接扔给下一个节点处理 2.将对象强制转换成Encoder可以处理的 Response对象 3.分配一个ByteBuf 4.调用encoder,即进入到 Encoder 的 encode方法,该方法是用户代码,用户将数据写入ByteBuf 5.既然自定义java对象转换成ByteBuf了,那么这个对象就已经无用了,释放掉,(当传入的msg类型是ByteBuf的时候,就不需要自己手动释放了) 6.如果buf中写入了数据,就把buf传到下一个节点,否则,释放buf,将空数据传到下一个节点 7.最后,当buf在pipeline中处理完之后,释放节点

总结就是,Encoder节点分配一个ByteBuf,调用encode方法,将java对象根据自定义协议写入到ByteBuf,然后再把ByteBuf传入到下一个节点,在我们的例子中,最终会传入到head节点

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

这里的msg就是前面在Encoder节点中,载有java对象数据的自定义ByteBuf对象

write:写队列

  • 首先,调用assertEventLoop确保该方法的调用是在reactor线程中
  • 然后,调用 filterOutboundMessage(),将待写入的对象过滤,把非ByteBuf对象和FileRegion过滤,把所有的非直接内存转换成直接内存DirectBuffer

AbstractNioByteChannel

  • 接下来,估算出需要写入的ByteBuf的size
  • 最后,调用 ChannelOutboundBuffer 的addMessage(msg, size, promise) 方法,所以,接下来,我们需要重点看一下这个方法干了什么事情

ChannelOutboundBuffer

想要理解上面这段代码,须掌握写缓存中的几个消息指针

ChannelOutboundBuffer 里面的数据结构是一个单链表结构,每个节点是一个 Entry,Entry 里面包含了待写出ByteBuf 以及消息回调 promise下面分别是

三个指针的作用

  • flushedEntry 表第一个被写到OS Socket缓冲区中的节点
  • unFlushedEntry 表第一个未被写入到OS Socket缓冲区中的节点
  • tailEntry 表ChannelOutboundBuffer缓冲区的最后一个节点

图解过程

  • 初次调用 addMessage

fushedEntry指向空,unFushedEntrytailEntry都指向新加入节点

  • 第二次调用 addMessage
  • 第n次调用 addMessage

可得,调用n次addMessage

  • flushedEntry指针一直指向null,表此时尚未有节点需写到Socket缓冲区
  • unFushedEntry后有n个节点,表当前还有n个节点尚未写到Socket缓冲区

flush:刷新写队列

  • 不管调用channel.flush(),还是ctx.flush(),最终都会落地到pipeline中的head节点

DefaultChannelPipeline#flush

  • 之后进入到AbstractUnsafe
  • flush方法中,先调用

ChannelOutboundBuffer#addFlush 结合前面的图来看,首先拿到 unflushedEntry 指针,然后将flushedEntry指向unflushedEntry所指向的节点,调用完毕后

  • 接下来,调用 flush0()

发现这里的核心代码就一个 doWrite

继续跟

protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = -1;

    boolean setOpWrite = false;
    for (;;) {
        // 拿到第一个需要flush的节点的数据
        Object msg = in.current();

        if (msg instanceof ByteBuf) {
            // 强转为ByteBuf,若发现没有数据可读,直接删除该节点
            ByteBuf buf = (ByteBuf) msg;

            boolean done = false;
            long flushedAmount = 0;
            // 拿到自旋锁迭代次数
            if (writeSpinCount == -1) {
                writeSpinCount = config().getWriteSpinCount();
            }
            // 自旋,将当前节点写出
            for (int i = writeSpinCount - 1; i >= 0; i --) {
                int localFlushedAmount = doWriteBytes(buf);
                if (localFlushedAmount == 0) {
                    setOpWrite = true;
                    break;
                }

                flushedAmount += localFlushedAmount;
                if (!buf.isReadable()) {
                    done = true;
                    break;
                }
            }

            in.progress(flushedAmount);

            // 写完之后,将当前节点删除
            if (done) {
                in.remove();
            } else {
                break;
            }
        } 
    }
}
  • 第一步,调用current()先拿到第一个需要flush的节点的数据

ChannelOutboundBuffer#current

  • 第二步,拿到自旋锁的迭代次数

ChannelConfig

  • 第三步,自旋的方式将ByteBuf写到JDK NIO的Channel

doWriteBytes跟进去

NioSocketChannel# 出现了 javaChannel(),表明已进入JDK NIO Channel的领域

  • 第四步,删除该节点 节点的数据已经写入完毕,接下来就需要删除该节点

首先拿到当前被flush掉的节点(flushedEntry所指) 然后拿到该节点的回调对象 ChannelPromise, 调用 removeEntry()移除该节点

这里是逻辑移除,只是将flushedEntry指针移到下个节点,调用后

随后,释放该节点数据的内存,调用safeSuccess回调,用户代码可以在回调里面做一些记录,下面是一段Example

ctx.write(xx).addListener(new GenericFutureListener<Future<? super Void>>() {
    @Override
    public void operationComplete(Future<? super Void> future) throws Exception {
       // 回调 
    }
})

最后,调用 recycle,将当前节点回收

writeAndFlush: 写队列并刷新

writeAndFlush在某个Handler中被调用之后,最终会落到 TailContext节点

public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    write(msg, true, promise);

    return promise;
}

AbstractChannelHandlerContext#

AbstractChannelHandlerContext#

最终,通过一个boolean变量,表示是调用invokeWriteAndFlush,还是invokeWriteinvokeWrite便是我们上文中的write过程

AbstractChannelHandlerContext#

可以看到,最终调用的底层方法和单独调用writeflush一样的

由此看来,invokeWriteAndFlush基本等价于write之后再来一次flush

总结

  • pipeline中的编码器原理是创建一个ByteBuf,将Java对象转换为ByteBuf,然后再把ByteBuf继续向前传递
  • 调用write并没有将数据写到Socket缓冲区中,而是写到了一个单向链表的数据结构中,flush才是真正的写出
  • writeAndFlush等价于先将数据写到netty的缓冲区,再将netty缓冲区中的数据写到Socket缓冲区中,写的过程与并发编程类似,用自旋锁保证写成功
  • netty中的缓冲区中的ByteBuf为DirectByteBuf

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏java达人

Java中的堆和栈的区别

当一个人开始学习Java或者其他编程语言的时候,会接触到堆和栈,由于一开始没有明确清晰的说明解释,很多人会产生很多疑问,什么是堆,什么是栈,堆和栈有什么区别?更...

2216
来自专栏逸鹏说道

C# 温故而知新:Stream篇(五)上

MemoryStream 目录: 1 简单介绍一下MemoryStream 2 MemoryStream和FileStream的区别 3 通过部分源码深入了解下...

4015
来自专栏余林丰

Effective Java通俗理解(下)

第31条:用实例域代替序数   枚举类型有一个ordinal方法,它范围该常量的序数从0开始,不建议使用这个方法,因为这不能很好地对枚举进行维护,正确应该是利用...

2569
来自专栏机器之心

教程 | PyTorch内部机制解析:如何通过PyTorch实现Tensor

选自Github 机器之心编译 参与:朱乾树、黄小天 PyTorch 中的基本单位是张量(Tensor)。本文的主旨是如何在 PyTorch 中实现 Tens...

4015
来自专栏walterlv - 吕毅的博客

.NET/C# 推荐一个我设计的缓存类型(适合缓存反射等耗性能的操作,附用法)

发布于 2018-09-02 14:27 更新于 2018-09...

5571
来自专栏博岩Java大讲堂

Java虚拟机--对象内存布局

3576
来自专栏FD的专栏

写出形似QML的C++代码

我的第一个想法(居然?)是做个Embedded-DSL。不过C++又不是Ruby……随便搜了一下,发现了一篇文章,也只是利用了重载运算符和运算符优先级,看上去限...

632
来自专栏菜鸟致敬

记一次两小时的js编程学习

1.弱类型语言 2.解释型语言 3.客户端语言 对于有学习Java、C以及Python一类的人来说,最熟悉的莫过于这些都是强类型语言。它们严格的遵守自身的规定,...

812
来自专栏Java技术栈

Java 面试题经典 77 问(含答案)!

1763
来自专栏锦小年的博客

python学习笔记7.4-内建模块base64

有时候,我们用noepad++或者记事本打开图片或者程序等文件的时候会显示大量的乱码,主要原因是这些文件编码的时候并不是字符串编码的。如果我们想把这些文件正常显...

2169

扫码关注云+社区

领取腾讯云代金券