前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty 入门篇 Day 3---网络编程

Netty 入门篇 Day 3---网络编程

作者头像
猫头虎
发布2024-04-08 10:16:02
810
发布2024-04-08 10:16:02
举报

6.网络编程

6.1阻塞和非阻塞
阻塞

在阻塞模式下,会导致 线程暂停 ssc.accept(); // 阻塞的方法 会导致线程暂停,一直等到有client连接 才继续工作 channel.read(buffer); // 阻塞的方法 会导致线程暂停,一直等client发送信息 才继续进行读操作 服务器端的单线程模式下,阻塞方法会导致这个线程暂停(闲置); 同时 多个client相到受影响,几乎不能正确工作,需要服务器端的多线程支持 服务器端的多线程模式缺点:1) 占用内存多 2)多线程切换,带来比较大的内存开销

阻塞模式的服务器端代码:

代码语言:javascript
复制
public static void main(String[] args) throws IOException {
    ByteBuffer buffer = ByteBuffer.allocate(16);

    // 1. 创建服务器端对象
    ServerSocketChannel ssc = ServerSocketChannel.open();

    // 2. 绑定服务监听端口
    ssc.bind(new InetSocketAddress(9999));

    // 3. 连接集合
    List<SocketChannel> channels = new ArrayList<>();

    while (true) {
        System.out.println("等待连接...");
        // 4. 接收client的连接
        SocketChannel sc = ssc.accept(); // 阻塞的方法 会导致线程暂停,一直等到有client连接 才继续工作
        System.out.println("已连接... " + sc);
        channels.add(sc);

        for (SocketChannel channel : channels) {
            System.out.println("before read.....");
            // 5. 接收client发送的数据
            channel.read(buffer); // 阻塞的方法 会导致线程暂停,一直等client发送信息 才继续进行读操作
            buffer.flip();
            ByteBufferUtil.debugRead(buffer);
            buffer.clear();
            System.out.println("after read.....");
        }

    }
}

阻塞模式的client端代码:

代码语言:javascript
复制
public static void main(String[] args) throws IOException {
    // 1 创建client连接对象
    SocketChannel sc = SocketChannel.open();
    System.out.println(sc);

    // 2 和server建议连接
    sc.connect(new InetSocketAddress("localhost", 9999));

    System.out.println(".....");
}

client的运行,使用debug+运行时多实例模式,进行演示。

非阻塞

设置非阻塞模式: serverSocketChannel.configureBlocking(false); // 设置非阻塞模式 socketChannel.configureBlocking(false); // 非阻塞模式 非阻塞模式下,不会导致线程暂停

SocketChannel sc = ssc.accept(); // 非阻塞的方法 不会导致线程暂停。没有连接时返回null;有连接时正确获取 int len = channel.read(buffer); // 非阻塞的方法 不会导致线程暂停。 没发消息则返回0;client发送信息正常读取 非阻塞模式下,线程不会暂停,即使 没有连接或没有可读数据,线程仍然不断运行,浪费CPU

非阻塞模式的server端代码:

代码语言:javascript
复制
public static void main(String[] args) throws IOException {
    ByteBuffer buffer = ByteBuffer.allocate(16);

    // 1. 创建服务器端对象
    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.configureBlocking(false); // 设置非阻塞模式

    // 2. 绑定服务监听端口
    ssc.bind(new InetSocketAddress(9999));

    // 3. 连接集合
    List<SocketChannel> channels = new ArrayList<>();

    while (true) {
        // System.out.println("等待连接...");
        // 4. 接收client的连接
        SocketChannel sc = ssc.accept(); // 非阻塞的方法 不会导致线程暂停。没有连接时返回null;有连接时正确获取
        if (sc != null) { // 有client连接
            System.out.println("已连接... " + sc);
            sc.configureBlocking(false); // 非阻塞模式
            channels.add(sc);
        }

        for (SocketChannel channel : channels) {
            // System.out.println("before read.....");
            // 5. 接收client发送的数据
            int len = channel.read(buffer); // 非阻塞的方法 不会导致线程暂停。 没发消息则返回0;client发送信息正常读取
            if (len > 0 ) { // 读到消息
                buffer.flip();
                ByteBufferUtil.debugRead(buffer);
                buffer.clear();
                System.out.println("after read.....");
            }
        }

    }
}

阻塞模式的client端代码:

代码语言:javascript
复制
public static void main(String[] args) throws IOException {
    // 1 创建client连接对象
    SocketChannel sc = SocketChannel.open();
    System.out.println(sc);

    // 2 和server建议连接
    sc.connect(new InetSocketAddress("localhost", 9999));

    System.out.println(".....");
}

client的运行,使用debug+运行时多实例模式,进行演示。

6.2selector选择器

单个线程配合selector完成 对多个channel事件的监控,称为多路复用。

6.2.1处理accept事件

服务器端代码:

代码语言:javascript
复制
	public static void main(String[] args) throws IOException {
        // 1 创建selector 管理多个channel
        Selector selector = Selector.open();

        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);

        // 2 注册 channel 到 selector 上(建议channel和selector的关系)
        SelectionKey sscKey = ssc.register(selector, 0, null);
        sscKey.interestOps(SelectionKey.OP_ACCEPT); // accept事件
        System.out.println("sscKey: " + sscKey);

        ssc.bind(new InetSocketAddress(9999));
        while (true){
            // 3 通过select监控事件,有事件 线程正常工作,没有事件 线程阻塞
            selector.select();

            // 4 处理事件
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()){
                SelectionKey key = iter.next();
                ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                SocketChannel sc = channel.accept();
                System.out.println(sc);
            }
        }
    }

client端代码:

代码语言:javascript
复制
public static void main(String[] args) throws IOException {
    // 1 创建client连接对象
    SocketChannel sc = SocketChannel.open();
    System.out.println(sc);

    // 2 和server建议连接
    sc.connect(new InetSocketAddress("localhost", 9999));

    System.out.println(".....");
}
6.2.2cancel事件

tex // select() 在事件未处理时,不阻塞 (事件发生后 要么处理 要么cancel) selector.select();

注意: 事件发生后 要么处理,要么cancel取消,否则 下次该事件 仍会触发。

代码语言:javascript
复制
public static void main(String[] args) throws IOException {
        // 1 创建selector 管理多个channel
        Selector selector = Selector.open();

        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);

        // 2 注册 channel 到 selector 上(建议channel和selector的关系)
        SelectionKey sscKey = ssc.register(selector, 0, null);
        sscKey.interestOps(SelectionKey.OP_ACCEPT); // accept事件
        System.out.println("sscKey: " + sscKey);

        ssc.bind(new InetSocketAddress(9999));

        while (true){
            // 3 通过select监控事件,有事件 线程正常工作,没有事件 线程阻塞
            // select()方法 在事件未处理时,不阻塞  (事件发生后 要么处理 要么cancel)
            selector.select();
            System.out.println(".......");

            // 4 处理事件
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()){
                SelectionKey key = iter.next();
                System.out.println("key: " + key);

                /*ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                SocketChannel sc = channel.accept();
                System.out.println(sc);*/

                // 注意: 事件发生后, 要么处理, 要么cancel取消,否则下次该事件 仍会触发
                key.cancel(); // 取消事件
            }
        }
    }
6.2.3 处理read事件
代码语言:javascript
复制
public static void main(String[] args) throws IOException {
        // 1 创建selector 管理多个channel
        Selector selector = Selector.open();

        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);

        // 2 注册 channel 到 selector 上(建议channel和selector的关系)
        SelectionKey sscKey = ssc.register(selector, 0, null);
        sscKey.interestOps(SelectionKey.OP_ACCEPT); // accept事件
        System.out.println("sscKey: " + sscKey);

        ssc.bind(new InetSocketAddress(9999));

        while (true){
            // 3 通过select监控事件,有事件 线程正常工作,没有事件 线程阻塞
            // select()方法 在事件未处理时,不阻塞  (事件发生后 要么处理 要么cancel)
            selector.select();
            System.out.println(".......");

            // 4 处理事件
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()){
                SelectionKey key = iter.next();
                // 处理key时, 要从selectedKeys中进行删除,否则下次处理时会遇到问题
                iter.remove();
                System.out.println("key: " + key);

                // 区分不同的事件 进行处理
                if (key.isAcceptable()){ // accept事件
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel sc = channel.accept();
                    System.out.println(sc);

                    sc.configureBlocking(false);
                    // 把channel 注册到 selector上
                    SelectionKey scKey = sc.register(selector, 0, null);
                    scKey.interestOps(SelectionKey.OP_READ); // read事件
                } else if(key.isReadable()) { // read事件
                    try {
                        SocketChannel sc = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(16);
                        int len = sc.read(buffer);
                        if (len == -1){ // 没读到内容
                            key.cancel(); // 正常close client时的处理 cancel事件
                        } else { // 读到内容
                            buffer.flip();
                            ByteBufferUtil.debugRead(buffer);
                        }
                    }catch (Exception e){  
                        // e.printStackTrace();
                        key.cancel(); // 非正常关闭client时的处理 cancel事件
                    }
                }

            }
        }
    }

解决读事件时边界信息的问题

服务器端代码:

代码语言:javascript
复制
public static void main(String[] args) throws IOException {
        // 1 创建selector 管理多个channel
        Selector selector = Selector.open();

        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);

        // 2 注册 channel 到 selector 上(建议channel和selector的关系)
        SelectionKey sscKey = ssc.register(selector, 0, null);
        sscKey.interestOps(SelectionKey.OP_ACCEPT); // accept事件
        System.out.println("sscKey: " + sscKey);

        ssc.bind(new InetSocketAddress(9999));

        while (true){
            // 3 通过select监控事件,有事件 线程正常工作,没有事件 线程阻塞
            selector.select();

            // 4 处理事件
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()){
                SelectionKey key = iter.next();
                iter.remove(); // 处理key时, 要从selectedKeys中进行删除,否则下次处理时会遇到问题
                System.out.println("key: " + key);

                // 区分不同的事件 进行处理
                if (key.isAcceptable()){ // accept事件
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel sc = channel.accept();
                    System.out.println(sc);

                    sc.configureBlocking(false);
                    // 把channel 注册到 selector上
                    // 第3个参数 attachment附件
                    ByteBuffer buffer = ByteBuffer.allocate(10);
                    SelectionKey scKey = sc.register(selector, 0, buffer);
                    scKey.interestOps(SelectionKey.OP_READ); // read事件
                } else if(key.isReadable()) { // read事件
                    try {
                        SocketChannel sc = (SocketChannel) key.channel();
                        // ByteBuffer buffer = ByteBuffer.allocate(5);
                        // 获取 channel注册时的attr (已存在的buffer)
                        ByteBuffer buffer = (ByteBuffer) key.attachment();

                        int len = sc.read(buffer);
                        if (len == -1){ // 没读到内容(正常close时)
                            key.cancel();
                        } else { // 读到内容
                            // 以\n做为分隔进行读取
                            split(buffer);

                            if (buffer.position() == buffer.limit()){ // 对buffer进行扩容
                                ByteBuffer buffer2 = ByteBuffer.allocate(buffer.capacity() * 2);
                                buffer.flip();
                                buffer2.put(buffer);
                                key.attach(buffer2);
                            }
                        }
                    }catch (Exception e){
                        // e.printStackTrace();
                        key.cancel();
                    }
                }

            }
        }
    }

    // 拆分缓冲区中的数据
    private static void split(ByteBuffer source){
        source.flip(); // 切换原缓冲区为 读模式

        // 依次按字节读取原缓冲区内容
        for(int i=0; i<source.limit(); i++){
            // 当读到 \n 时  (格式正确 的一项完整内容)
            if (source.get(i) == '\n'){
                // 把格式正确的数据 存入 新的缓冲区
                // 获取实际读到的字节数量
                int length = i - source.position() + 1;
                // System.out.println(i);
                // 分配新缓冲区内存大小 (实际读到的字节数量)
                ByteBuffer target = ByteBuffer.allocate(length);
                // 从source中读, 边读 边写入target
                for(int j=0; j<length; j++){
                    target.put(source.get()); // 使用get()读内容时,position会发生改变
                }
                ByteBufferUtil.debugAll(target);
            }
            // System.out.println((char)source.get(i));
        }

        // 切换原缓冲区为 写模式
        // 注意:此外必须为compact,把未读的内容 自动前移(相当于删除了已读过的内容),且position为未读内容的长度
        //   未读的内容  将和 新内容 连接到一起
        source.compact();
    }

客户端代码:

代码语言:javascript
复制
public static void main(String[] args) throws IOException {
        // 1 创建client连接对象
        SocketChannel sc = SocketChannel.open();

        // 2 和server建议连接
        sc.connect(new InetSocketAddress("localhost", 9999));
        sc.write(Charset.defaultCharset().encode("12\n34567890\naaaa"));
        sc.write(Charset.defaultCharset().encode("123\n4567890\nbb"));
        sc.write(Charset.defaultCharset().encode("123456\n7890cccdddeee\n"));

        System.out.println(".....");
    }
6.2.4处理write事件
代码语言:javascript
复制
 sc = SocketChannel.open();

        // 2 和server建议连接
        sc.connect(new InetSocketAddress("localhost", 9999));
        sc.write(Charset.defaultCharset().encode("12\n34567890\naaaa"));
        sc.write(Charset.defaultCharset().encode("123\n4567890\nbb"));
        sc.write(Charset.defaultCharset().encode("123456\n7890cccdddeee\n"));

        System.out.println(".....");
    }

结语

如果这篇文章对您有所帮助,或者有所启发的话,求一键三连:点赞、评论、收藏➕关注,您的支持是我坚持写作最大的动力。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-07-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 6.网络编程
    • 6.1阻塞和非阻塞
      • 6.2selector选择器
      • 解决读事件时边界信息的问题
      • 结语
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档