前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[网络通信] Netty 入门实战

[网络通信] Netty 入门实战

作者头像
架构探险之道
发布2023-03-04 10:58:29
6720
发布2023-03-04 10:58:29
举报
文章被收录于专栏:架构探险之道架构探险之道

[网络通信] Netty 入门实战

简介

什么是 Netty?让我们带着问题来跟着官网的 Demo 教程先入个门。

  • 依赖
  • 实战
    • 丢弃服务器
    • 响应服务器
    • 时间服务器
  • 流数据传输
  • 对象序列化传输
  • 关闭
  • 小结

Netty 是异步事件驱动的Java开源网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。

  • Netty 项目旨在为可维护的高性能和高可伸缩性协议服务器和客户端的快速开发提供一个异步事件驱动的网络应用框架和工具。
  • Netty 是一个 NIO 客户机服务器框架,可以快速简单地开发网络应用程序,如协议服务器和客户机。它极大地简化了网络编程,如 TCP 和 UDP 套接字服务器的开发。
  • “快速和简单”并不意味着产生的应用程序会受到可维护性或性能问题的影响。Netty 是根据实现许多协议(如 FTP、 SMTP、 HTTP 以及各种二进制和基于文本的遗留协议)的经验而精心设计的。因此,Netty 成功地找到了一种方法来实现简单的开发、性能、稳定性和灵活性。
  • 一些用户可能已经发现了其他声称具有同样优势的网络应用程序框架,您可能想要问是什么使 Netty 与他们如此不同。答案就是它所建立的哲学。Netty 的目的是从第一天开始就在 API 和实现方面为您提供最舒适的体验。它不是什么实实在在的东西,但是当你阅读本指南和玩 Netty 的时候,你会意识到这种哲学会让你的生活变得更加轻松。

依赖

代码语言:javascript
复制
dependencies {
    implementation "io.netty:netty-all:4.1.56.Final"
}

实战

世界上最简单的协议实现不是发送Hello World消息,被服务器接受到返回相应的响应结果。而是服务器接收到消息后直接丢弃,不做任何响应。

丢弃服务器

要实现 DISCARD 协议,您需要做的唯一一件事就是忽略所有接收到的数据。让我们直接从处理程序实现开始,它处理 Netty 生成的 I/O 事件。

代码语言:javascript
复制
// [1]
public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
   
    // [2]
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // super.channelRead(ctx, msg);
        // 不处理消息,直接释放
        // [3]
        ((ByteBuf) msg).release();
    }

    // [4]
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // super.exceptionCaught(ctx, cause);
        // 当异常发生的时候关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}
  1. ChannelInboundHandlerAdapter 实现了接口 ChannelInboundHandler。充当适配器的角色提供了各种可以重写的事件处理程序方法,通过适配器的标准实现方式,可以避免我们自己实现处理程序接口。
  2. 我们可以覆盖channelRead()的事件处理器方法。只要从客户机接收到新数据,就会使用接收到的消息调用此方法。
  3. 为了实现DISCARD 协议,处理程序必须忽略接收到的消息。ByteBuf是一个引用计数的对象,必须通过release()方法来进行释放。通常的事项方式是这样的
代码语言:javascript
复制
// [2]
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // super.channelRead(ctx, msg);
        // 不处理消息,直接释放
        // [3]
        //((ByteBuf) msg).release();
        try {
            // 针对消息 msg 进行处理
        } finally {
         // 释放引用
            ReferenceCountUtil.release(msg);
        }
    }
  1. exceptionCaught()作为异常处理,当 Netty 由于I/O 错误或处理程序实现由于处理事件时抛出的异常而引发异常时,使用 Throwable 调用事件处理程序方法。在大多数情况下,被捕获的异常应该被记录,其相关的通道应该在这里关闭,尽管这个方法的实现可以根据您想要处理的异常情况而有所不同。例如,您可能希望在关闭连接之前发送带有错误代码的响应消息。

启动服务器

代码语言:javascript
复制
public class DiscardServer {
    /**
     * 端口
     */
    private int port;

    public DiscardServer(int port) {
        this.port = port;
    }

    public void run() throws InterruptedException {
        // [1]
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // [2]
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                 // [3]
                    .channel(NioServerSocketChannel.class)
                 // [4]
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new DiscardServerHandler());
                        }
                    })
                 // [5]
                    .option(ChannelOption.SO_BACKLOG, 128)
                 // [6]
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
   // [7]
            ChannelFuture cf = bootstrap.bind(port).sync();
            cf.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }


    public static void main(String[] args) throws InterruptedException {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }
        new DiscardServer(port).run();
    }
}

  1. NioEventLoopGroup 是一个处理I/O操作的多线程事件循环的处理器定义。例子中定义了2个处理器:
    • 第一个通常被称为“ boss” ,接受一个传入的连接。
    • 第二个通常被称为“工人” ,一旦老板接受了连接并注册了与工人接受的连接,就处理接受连接的通信。
  2. ServerBootstrap 是服务器构造的辅助类,一般不推荐此方式进行服务器的创建。
  3. 此处指定NioServerSocketChannel类,用于实例化一个新的Channel来接受传入的连接。
  4. 此处指定的处理程序将始终由新接受的ChannelChannelInitializer作为特殊的处理程序,用于帮助用户配置新的Channel。往往适用于为新的Channel添加一些处理程序来实现更为复杂的应用程序。
  5. option参数设置,支持设置特定的套接字选项。来满足特定的协议需求,如.option(ChannelOption.TCP_NODELAY, true)来编写TCP/IP 服务协议。
  6. childOptionoption不同之处在于:
    • option 适用于NioServerSocketChannel来接受传入的连接。
    • childOption适用于被父级的ServerChannel接受的Channels
  7. 绑定到指定端口。

模拟通信

  • 调整代码,打印接受的消息
代码语言:javascript
复制
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        try {
            // 针对消息 msg 进行处理
            while (in.isReadable()) { // [4]
                System.out.print((char) in.readByte());
                System.out.flush();
            }        
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
  • windows 环境下使用 powershell 输入命令 telnet localhost 8080,进行通信。
  • powershell 终端输入的字符会同步在控制台打印出来。

响应服务器

目前为止,我们只接受但是没有任何响应。一台服务器,通常应该响应该请求。让我们学习如何通过实现ECHO协议向客户端写入响应消息,其中任何接收到的数据都被发送回来。 与前面部分实现的丢弃服务器的唯一区别在于它将接收到的数据发回,而不是将接收的数据输出到控制台。因此,再次修改channelRead()方法是足够的: 参考地址:https://netty.io/4.1/xref/io/netty/example/echo/package-summary.html

代码语言:javascript
复制
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.write(msg);

    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // super.exceptionCaught(ctx, cause);
        // 当异常发生的时候关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}

public class EchoServer {
    /**
     * 端口
     */
    private int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void run() throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture cf = bootstrap.bind(port).sync();
            cf.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }


    public static void main(String[] args) throws InterruptedException {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }
        new EchoServer(port).run();
    }
}

通过终端输入telnet localhost 8080后输入英文字符会得到响应,原字符返回。如依次输入abc,终端打印结果:

代码语言:javascript
复制
aabbcc
  • ChannelHandlerConetxt 提供了很多方法让你去触发 IO 事件或操作。这里我们调用 write(object)来逐字的写入接受到的消息。注意,我们不像 DISCARD 例子里的那样,我们没有释放我们收到的消息。这是因为当它被写回到 wire 时,Netty 替我们释放它。
  • ctx.write(Object) 不会让消息发送,它存在于内部缓冲区,通过调用 ctx.flush() 来把消息发送出去,或者,您可以简洁的调用 ctx.writeAndFlush(msg)。

时间服务器

接下来要实现的协议是 TIME 协议。它不同于前面的示例,因为它发送包含32位整数的消息,而不接收任何请求,并在消息发送后关闭连接。在本例中,您将学习如何构造和发送消息,以及如何在完成时关闭连接。

因为我们将忽略任何接收到的数据,但是一旦建立连接就发送消息,所以这次不能使用 channelRead() 方法。相反,我们应该重写 channelActive()方法。代码如下:

服务端
代码语言:javascript
复制
public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception { // [1]
        final ByteBuf time = ctx.alloc().buffer(4); // [2]
        time.writeInt(89); //ASCII 10进制,对应 Y
        time.writeInt(105); //ASCII 10进制,对应 i
        final ChannelFuture f = ctx.writeAndFlush(time); // [3]
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // [4]
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // super.exceptionCaught(ctx, cause);
        // 当异常发生的时候关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}

如前所述,当建立连接并准备生成通信量时,将调用 channelActive ()方法。让我们编写一个32位的整数,它表示此方法中的当前时间。

要发送一个新消息,我们需要分配一个新的缓冲区,其中将包含消息。我们要写一个32位的整数,因此我们需要一个容量至少为4字节的 ByteBuf。通过 ChannelHandlerContext.alloc ()获取当前 ByteBufAllocator 并分配一个新缓冲区。

像往常一样,我们写入一条构造好的消息。但是,等等,哪里冒险了?我们以前不是叫 java.nio 吗。在 NIO 中发送消息之前使用 ByteBuffer.flip () ?ByteBuf 没有这样的方法,因为它有两个指针: 一个用于读操作,另一个用于写操作。当您将某些内容写入 ByteBuf 而读取器索引不变时,写入器索引会增加。读者索引和写者索引分别表示消息的开始和结束位置。

  • 相比之下,NIO 缓冲区并不提供一种清晰的方法来确定消息内容的开始和结束位置而不调用 flip 方法。当您忘记翻转缓冲区时,您将遇到麻烦,因为不会发送任何内容或错误的数据。这种错误在 Netty 不会发生,因为我们对不同的操作类型有不同的指针。当你习惯了它,你会发现它会让你的生活变得更加轻松。
  • 另一点需要注意的是 ChannelHandlerContext.write () 和 writeAndFlush () 方法返回 ChannelFuture。ChannelFuture 表示尚未发生的 I/O操作。这意味着,任何请求的操作可能尚未执行,因为所有操作在 Netty 都是异步的。例如,下面的代码可能会在发送消息之前关闭连接: Channel ch = ...; ch.writeAndFlush(message); ch.close();
  • 因此,您需要在 ChannelFuture 完成之后调用 close ()方法,该方法由 write ()方法返回,并在完成写操作后通知其侦听器。请注意,close () 也可能不会立即关闭连接,而是返回一个 ChannelFuture。

那么,当写请求完成时,我们如何得到通知呢?这很简单,可以添加一个ChannelFutureListener来监听返回的结果ChannelFuture。在这里,我们创建了一个新的匿名通道 ChannelFutureListener,当操作完成时它会关闭通道。

  • 或者,您可以使用预定义的侦听器简化代码:
代码语言:javascript
复制
f.addListener(ChannelFutureListener.CLOSE);
  • 要测试我们的时间服务器是否正常工作,可以使用 telnet localhost 8080 命令,终端在连接上后,打印消息后直接失去连接:
代码语言:javascript
复制
Yi

遗失对主机的连接。
客户端

与 DISCARD 和 ECHO 服务器不同,我们需要 TIME 协议的客户端,因为人不能将32位二进制数据转换为日历上的日期。在本节中,我们将讨论如何确保服务器正常工作,并学习如何使用 Netty 编写客户机。

  • 调整服务端接受请求并返回时间戳
代码语言:javascript
复制
 @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception { // [1]
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        // 2208988800为1900年1月1日00:00:00~1970年1月1日00:00:00的总秒数
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));

        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
  • 客户端接收服务端的响应并转换为时间格式输出
代码语言:javascript
复制
public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // super.exceptionCaught(ctx, cause);
        // 当异常发生的时候关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}

public class TimeServer {
    /**
     * 端口
     */
    private int port;

    public TimeServer(int port) {
        this.port = port;
    }

    public void run() throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 在 Netty,服务器和客户机之间最大也是唯一的区别是使用了不同的 Bootstrap 和 Channel 实现。请看下面的代码:
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TimeServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture cf = bootstrap.bind(port).sync();
            cf.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }


    public static void main(String[] args) throws InterruptedException {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }
        new TimeServer(port).run();
    }
}

客户端接收到响应打印结果:

代码语言:javascript
复制
Tue Dec 29 12:01:58 CST 2020

流数据传输

在基于流的传输(如 TCP/IP)中,接收到的数据被存储到套接字接收缓冲区中。不幸的是,基于流的传输的缓冲区不是一个包队列,而是一个字节队列。这意味着,即使您将两条消息作为两个独立的数据包发送,操作系统也不会将其视为两条消息,而只是将其视为一堆字节。因此,不能保证您所读到的内容与远程对等方所写的内容完全一致。

例如,假设一个操作系统的 TCP/IP 协议栈已经接收了三个数据包:

1

由于基于流的协议的这个一般属性,在应用程序中很有可能以下面的碎片形式读取它们:

2

因此,接收部分,无论是服务器端还是客户端,都应该将接收到的数据碎片整理成一个或多个有意义的帧,应用程序逻辑可以很容易地理解这些帧。在上面的例子中,接收到的数据应该如下所示:

3

第一个解决方案

现在让我们回到 TIME 客户端示例。我们这里也有同样的问题。32位整数是一个非常小的数据量,它不太可能经常被分段。然而,问题在于它可能是支离破碎的,并且随着流量的增加,支离破碎的可能性也会增加。

最简单的解决方案是创建一个内部累积缓冲区,并等待所有4个字节都被接收到内部缓冲区。以下是修改后的 TimeClientHandler 实现,它解决了这个问题:

代码语言:javascript
复制
public class TimeClientWithBufferHandler extends ChannelInboundHandlerAdapter {

    private ByteBuf buff;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        buff = ctx.alloc().buffer(4);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        buff.release();
        buff = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf m = (ByteBuf) msg; // (1)
        buff.writeBytes(m);
        m.release();
        if (buff.readableBytes() >= 4) {
            long currentTimeMillis = (buff.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // super.exceptionCaught(ctx, cause);
        // 当异常发生的时候关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}

  • 有两种生命周期监听方法:handlerAdded() and 及handlerRemoved()
  • 您可以执行任意初始化任务,只要它不长时间阻塞;
  • 首先,所有接收到的数据应该累积成buff
  • 然后,处理程序必须检查buff有足够的数据,在这个例子中是4个字节,然后继续进行实际的业务逻辑,当更多的数据到达时,这个函数会重新调用一个方法,最终所有的4个字节都会被累积;

4

  • 非 4 字节的数据会直接被丢弃掉。
第二种解决方案

尽管第一个解决方案已经解决了 TIME 客户机的问题,但是修改后的处理程序看起来并不那么干净。想象一个更复杂的协议,它由多个字段组成,比如一个可变长度的字段。您的 ChannelInboundHandler 实现将很快变得不可维护。

正如您可能已经注意到的,您可以向 ChannelPipeline 添加多个 ChannelHandler,因此,您可以将一个单片 ChannelHandler 分割为多个模块化的 ChannelHandler,以降低应用程序的复杂性。例如,你可以将 TimeClientHandler 分成两个处理器:

  • TimeDecoder 处理碎片化问题
  • 最初的简单版本 TimeClientHandler

幸运的是,Netty 提供了一个可扩展的类,可以帮助你写出第一个开箱即用的类:

代码语言:javascript
复制
public class TimeDecoder extends ByteToMessageDecoder { // (1)

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }

        out.add(in.readBytes(4)); // (4)
    }
}
public class TimeClientWithDecoder {

    public static void main(String[] args) throws Exception {
        String host = "localhost";
        int port = 8080;
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap(); 
            b.group(workerGroup);
            b.channel(NioSocketChannel.class); 
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            .addLast(new TimeDecoder()) // (5)
                            .addLast(new TimeClientHandler());
                }
            });

            // Start the client.
            ChannelFuture f = b.connect(host, port).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
  • ByteToMessageDecoder 使得处理分裂问题变得容易;
  • 每当接收到新数据时,ByteToMessageDecoder利用内部维护的累积缓冲区,调用decode方法来处理新数据;
  • 当累积缓冲区中没有足够的数据时ByteToMessageDecoder什么都不会添加到out缓冲区中。当收到更多的数据时会再次调用decode()
  • 如果decode()将一个数据添加到out, 这意味着解码器成功解码了一条信息,将丢弃累积缓冲区的读取部分。请记住,您不需要解码多个消息,ByteToMessageDecoder将继续调用方法,直到它没什么数据可以放入out了;
  • ChannelPipeline添加处理程序TimeDecoder来实现数据的分解。

还可以通过以下方式进一步简化解码器:

代码语言:javascript
复制
public class TimeWithReplayingDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        out.add(in.readBytes(4));
    }
}
// 同样的,别忘了在 ChannelPipeline 中添加相应的处理程序
 b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            //.addLast(new TimeDecoder())
                            .addLast(new TimeWithReplayingDecoder())
                            .addLast(new TimeClientHandler());
                }
            });

此外,Netty 提供了开箱即用的解码器,使您能够非常容易地实现大多数协议,并帮助您避免最终得到一个不可维护的整体处理程序实现。更详细的例子请参考以下软件包:

  • io.netty.example.factorial 二进制协议
  • io.netty.example.telnet 基于文本行的协议

对象序列化传输

到目前为止,我们讨论的所有示例都使用 ByteBuf 作为协议消息的主要数据结构。实际的网络通信过程远比上面的时间协议实现的要更复杂,功能也要更加强大,比如我们常用的 Json 序列化传输,如果用 Netty,能否直接传输对象呢?

在 ChannelHandlers 中使用 POJO 的优势是显而易见的;通过分离从处理程序中提取 ByteBuf 信息的代码,您的处理程序变得更加可维护和可重用。在 TIME 协议的客户端和服务器示例中,我们只读取一个32位整数,直接使用 Bytebuf 并不是一个主要问题。但是,您会发现在实现现实世界的协议时有必要进行分离。

首先,我们将我们要传输的时间戳封装成一个简单对象:

代码语言:javascript
复制
public class UnixTime {

    private final long value;

    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }

    public UnixTime(long value) {
        this.value = value;
    }

    public long value() {
        return value;
    }

    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

增加解码器:

代码语言:javascript
复制
public class TimeDecoderWithPojo extends ByteToMessageDecoder { // (1)

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }

        //out.add(in.readBytes(4)); // (4)
        out.add(new UnixTime(in.readUnsignedInt())); // (4)
    }
}

增加处理器:

代码语言:javascript
复制
public class TimeClientHandlerWithPojo extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        UnixTime m = (UnixTime) msg;
        System.out.println(m);
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // super.exceptionCaught(ctx, cause);
        // 当异常发生的时候关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}

和前面一样,设置客户端的处理器:

代码语言:javascript
复制
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            //.addLast(new TimeDecoder())
                            .addLast(new TimeDecoderWithPojo())
                            .addLast(new TimeClientHandlerWithPojo());
                }
            });

响应结果如下:

通过更新的解码器,``TimeClientHandler 不再使用 ByteBuf。

更简单和优雅,对不对?同样的技术也可以应用于服务器端。

首先是消息处理器,负责发送一个时间戳数据作为响应结果:

代码语言:javascript
复制
public class TimeServerHandlerWithPojo extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        UnixTime unixTime = new UnixTime();
        System.out.println("准备发送:"+ unixTime);
        final ChannelFuture f = ctx.writeAndFlush(unixTime);
        f.addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // super.exceptionCaught(ctx, cause);
        // 当异常发生的时候关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}

然后是编码处理器,将Pojo转换为ByteBuf进行传输:

代码语言:javascript
复制

public class TimeServerEncoderHandlerWithPojo extends ChannelOutboundHandlerAdapter {
    // 它是 ChannelOutboundHandler 的一个实现,它将 UnixTime 转换回 ByteBuf。这比编写解码器要简单得多,因为在对消息进行编码时不需要处理数据包碎片和汇编。
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int) m.value());
        ctx.write(encoded, promise); // (1)
        // 首先,我们传递原始的 ChannelPromise as-is,这样当编码的数据实际写入到连线时,Netty 将其标记为成功或失败。
        // 其次,我们没有调用 ctx.flush ()。有一个单独的处理程序方法 void flush (ChannelHandlerContext ctx) ,它旨在重写 flush ()操作。
    }
}

最后是服务端ChannelPipeline程序设置:

代码语言:javascript
复制
  bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(new TimeServerEncoderHandlerWithPojo())
                                    .addLast(new TimeServerHandlerWithPojo());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

服务端接收到请求发送数据:

客户端接收到请求的响应数据:

7

同样的,Netty 也为服务端的消息编码定义了很多拆箱即用的工具类:

代码语言:javascript
复制
public class TimeServerMessageToByteEncoderHandler extends MessageToByteEncoder<UnixTime> {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int) m.value());
        ctx.write(encoded, promise); // (1)
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) throws Exception {
        out.writeInt((int) msg.value());
    }
}
// ChannelPipeline 设置
bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    //.addLast(new TimeServerEncoderHandlerWithPojo())
                                    .addLast(new TimeServerMessageToByteEncoderHandler())
                                    .addLast(new TimeServerHandlerWithPojo());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

关闭

关闭 Netty 应用程序通常非常简单,只需关闭通过 shutdownly() 创建的所有 EventLoopGroups 即可。它返回一个 Future,当 EventLoopGroup 完全终止并且属于该组的所有通道都已关闭时,它会通知您。(前文示例已演示多次,此处不再赘述。)

源码:https://gitee.com/zacsnz/architectrue-adventure/tree/master/netty-examples/netty-chapter-1

小结

Netty 作为高性能的异步通信框架,提供了很多很多好用的 API。

  • Channel: Channel 接口是 Netty 对网络操作抽象类,它除了包括基本的 I/O 操作,如 bind()、connect()、read()、write() 等。比较常用的Channel接口实现类是NioServerSocketChannel(服务端)和NioSocketChannel(客户端),这两个 Channel 可以和 BIO 编程模型中的ServerSocket以及Socket两个概念对应上。Netty 的 Channel 接口所提供的 API,大大地降低了直接使用 Socket 类的复杂性。
  • EventLoop: 定义了 Netty 的核心抽象,用于处理连接的生命周期中所发生的事件。主要作用实际就是负责监听网络事件并调用事件处理器进行相关 I/O 操作的处理。那 Channel 和 EventLoop 直接有啥联系呢?Channel 为 Netty 网络操作(读写等操作)抽象类,EventLoop 负责处理注册到其上的Channel 处理 I/O 操作,两者配合参与 I/O 操作。
  • ChannelFuture: Netty 是异步非阻塞的,所有的 I/O 操作都为异步的。因此,我们不能立刻得到操作是否执行成功,但是,你可以通过 ChannelFuture 接口的 addListener() 方法注册一个 ChannelFutureListener,当操作执行成功或者失败时,监听就会自动触发返回结果。并且,你还可以通过ChannelFuture 的 channel() 方法获取关联的Channel。
  • ChannelHandler: 息的具体处理器。他负责处理读写操作、客户端连接等事情。
  • ChannelPipeline: ChannelHandler 的链,提供了一个容器并定义了用于沿着链传播入站和出站事件流的 API 。当 Channel 被创建时,它会被自动地分配到它专属的ChannelPipeline。我们可以在 ChannelPipeline 上通过 addLast() 方法添加一个或者多个ChannelHandler ,因为一个数据或者事件可能会被多个 Handler 处理。当一个 ChannelHandler 处理完之后就将数据交给下一个 ChannelHandler 。
  • EventLoopGroup 包含多个 EventLoop(每一个 EventLoop 通常内部包含一个线程),上面我们已经说了 EventLoop 的主要作用实际就是负责监听网络事件并调用事件处理器进行相关 I/O 操作的处理。
  • Bootstrap 是客户端的启动引导类/辅助类。
  • ServerBootstrap 客户端的启动引导类/辅助类。
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-01-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 架构探险之道 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • [网络通信] Netty 入门实战
    • 依赖
      • 实战
        • 丢弃服务器
        • 响应服务器
        • 时间服务器
      • 流数据传输
        • 对象序列化传输
          • 关闭
            • 小结
            相关产品与服务
            容器服务
            腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档