前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty入门之消息边界处理以及ByteBuffer大小分配

Netty入门之消息边界处理以及ByteBuffer大小分配

作者头像
@派大星
发布2023-06-28 14:26:07
2110
发布2023-06-28 14:26:07
举报
文章被收录于专栏:码上遇见你

以上三篇内容主要讲了NIO的三大组件ByteBuffer文件编程阻塞非阻塞Selector等,需要了解像详情的请移步查看。

本章主要讲解如何处理在消息传递过程中的边界问题。

处理消息边界(如图)

如图所示:在实际项目中,消息有可能要比ByteBuffer长,或者比ByteBuffer短; 针对以上的几种情况,应该如何去处理呢?有两种方案:

  1. 固定消息长度,数据包大小一样,服务器按照预定长度读取,缺点是浪费带宽。
  2. 按分隔符拆分,但是效率低。
  3. TLV格式,即Type类型、Length长度、Value数据,类型和长度已知的情况下,就可以方便获取消息的大小,从而分配合适的buffer,缺点是buffer需要提前分配,如果内容过大,则会影响server的吞吐量
  • Http1.1 是TLV格式
  • Http2.0 是LTV格式

上代码(⚠️一定要注意代码中的注释):

  • server
代码语言:javascript
复制
@Slf4j
public class Server {

    private static void split(ByteBuffer source) {
        source.flip();
        for (int i = 0; i < source.limit(); i++) {
            // 找到一条完整消息
            if (source.get(i) == '\n') {
                int length = i + 1 - source.position();
                // 把这条完整消息存入新的 ByteBuffer
                ByteBuffer target = ByteBuffer.allocate(length);
                // 从 source 读,向 target 写
                for (int j = 0; j < length; j++) {
                    target.put(source.get());
                }
                debugAll(target);
            }
        }
        source.compact();
    }


    public static void main(String[] args) throws IOException {
        // 1. 创建 selector, 管理多个 channel
        Selector selector = Selector.open();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        // 2. 建立 selector 和 channel 的联系(注册)
        // SelectionKey 就是将来事件发生后,通过它可以知道事件和哪个channel的事件
        SelectionKey sscKey = ssc.register(selector, 0, null);
        // key 只关注 accept 事件
        sscKey.interestOps(SelectionKey.OP_ACCEPT);
        log.debug("sscKey:{}", sscKey);
        ssc.bind(new InetSocketAddress(8080));
        while (true) {
            // 3. select 方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行
            // select 在事件未处理时,它不会阻塞, 事件发生后要么处理,要么取消,不能置之不理
            selector.select();
            // 4. 处理事件, selectedKeys 内部包含了所有发生的事件
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, read
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                // 处理key 时,要从 selectedKeys 集合中删除,否则下次处理就会有问题
                iter.remove();
                log.debug("key: {}", key);
                // 5. 区分事件类型
                if (key.isAcceptable()) { // 如果是 accept
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel sc = channel.accept();
                    sc.configureBlocking(false);
                    ByteBuffer buffer = ByteBuffer.allocate(16) // 利用 附件的方式将buffer注册关联到selectionKey上
                    SelectionKey scKey = sc.register(selector, 0, buffer);
                    scKey.interestOps(SelectionKey.OP_READ);
                    log.debug("{}", sc);
                    log.debug("scKey:{}", scKey);
                } else if (key.isReadable()) { // 如果是 read
                    try {
                        SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channel
                        // 取上次注册的buffer
                        ByteBuffer buffer = (ByteBuffer)key.attachment();
                        int read = channel.read(buffer); // 如果是正常断开,read 的方法的返回值是 -1
                        if(read == -1) {
                            key.cancel();
                        } else {
                            split(buffer)
                            // 这里说明原有的buffer满了
                            if (buffer.position() == buffer.limit()) {
                                ByteBuffer newByteBuffer = ByteBuffer.allocate(buffer.capacity() * 2 )
                                buffer.flip();
                                newByteBuffer.put(buffer);
                                // 替换原有的附件buffer
                                key.attach(newByteBuffer); 
                            }
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                        key.cancel();  // 因为客户端断开了,因此需要将 key 取消(从 selector 的 keys 集合中真正删除 key)
                    }
                }
            }
        }
    }
}
  • client
代码语言:javascript
复制
public class Client {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8080));
        SocketAddress address = sc.getLocalAddress();
        sc.write(Chaset.defaultCharset().encode("012345677890abcdef"))
        System.in.read();
    }
}

demo 解析: 上述server端代码用到了附件这个概念

代码语言:javascript
复制
ByteBuffer buffer = ByteBuffer.allocate(16) // 利用 附>件的方式将buffer注册关联到selectionKey上
代码语言:javascript
复制
SelectionKey scKey = sc.register(selector, 0, buffer);

重新关联附件buffer key.attach(newByteBuffer);

说明:

上述代码就是简单做了一个消息边界的处理,相信大家也看到了一些问题,它只能是做到自动扩容,无法自适应,也就是缩小。暂时先提前告诉大家Netty是可以做到自适应的。

如何处理消息边界问题以及ByteBuffer大小分配的问题已经说完了,接下来给大家说一下ByteBuffer的大小如何分配的注意点。

每个Channel都需要记录可能被切分的消息,因为ByteBuffer不能够被多个Channel共同使用,因此需要为每个channel维护一个独立的ByteBUffer

  • ByteBuffer不能太大,比如一个ByteBuffer1Mb的话,需要支持百万连接就要1Tb内存,因此需要设计大小可变的ByteBUffer
    • 思路一:首先分配一个较小的buffer,例如4k,如果发现数据不够,再分配8k的buffer,将4kbuffer内容拷贝至8k的buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能。
    • 思路二:用多个数组组成buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

好了本次的文章就到这里了后续再为大家带来关于Netty的更多内容。切记:一定要好好消化上述的demo案例。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-09-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码上遇见你 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 处理消息边界(如图)
    • 说明:
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档