前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty入门(Netty4.x使用指南)

Netty入门(Netty4.x使用指南)

作者头像
小诸葛
发布2020-04-14 15:55:48
3640
发布2020-04-14 15:55:48
举报
文章被收录于专栏:方法论方法论

前言

问题

现如今,我们使用通用的应用程序或库来相互通信。例如,我们经常使用HTTP客户端库从服务器上获取信息并通过web服务执行远程过程调用。但是,通用协议或它的实现有时并不能很好的伸缩。这就像我们不会使用通用HTTP服务器来交换大文件、电子邮件、还有像金融信息、游戏数据等实时信息。这些业务所需要的是高度优化实现协议,用于专门的目的。例如,您可能希望实现一个针对基于ajax的聊天应用程序、媒体流应用、大文件传输进行优化的http服务器。您甚至可能想要设计并实现一个完全符合您的需求的新协议。另一个不可避免的情况是,你不得不去处理一个遗留的专有协议,来保证和旧系统的互操作性。在这些情况下,重要的是在不牺牲最终应用程序的稳定性和性能的前提前,如何尽可能快的实现该协议。

解决方案

Netty项目致力于提供一个异步事件驱动的网络应用框架和工具,以便快速开发可维护的高性能和高扩展性协议的服务器和客户端。

换句话说,Netty是一个NIO服务器客户端框架,它支持快速简单的开发协议服务器和客户端等网络应用程序。它极大的简化和流线化了网络开发(例如TCP和UDP安全套接字服务器开发)。

快速和简单并不意味着最终的应用程序会出现可维护性和性能问题。Netty经过精心设计,从许多已实现协议(例如FTP、SMTP、HTTP)和众多二进制和基于文本的协议中吸取经验。最终,Netty在不妥协的前提下成功找到一个方法来实现简单的开发、高性能、稳定、灵活的应用。

一些开发者可能已经找到其他声称具有相同优势的网络应用框架,那么你可能会问Netty与它们有什么不同。答案是它所基于的哲学,Netty旨在从第一天起就为你提供最舒服的API和实现体验。它并不是什么有形的东西,但是当你阅读这份指南并使用Netty时你会发现:这个哲学将使你的生活更加的简单.

开始之前

运行本章中的案例有两点最低要求:Netty和JDK 1.6或以上的最低版本。Netty最新版本可以在项目下载页下载。为了下载和JDK版本相匹配的版本,请参考您首选的jdk运营商网站。

当你阅读的时候,你可能会对本章中介绍的一些类有一些疑问,当你想要了解更多关于这些类的信息的时候,你可以参考API。为了方便,文中的所有类都有链接到显示API文档。并且,如果哪里有不正确的信息、错误的语法或错别字或你有一些好的建议来提高这篇文档,请毫不犹豫的联系Netty项目社区让我们知道。

编写一个’丢弃’服务器

世界上最简单的协议不是“Hello,World!”而是“丢弃”,它是一种丢弃任何接收到的数据而没有任何响应的协议。

想要实现“丢弃”协议,你唯一要做的一件事就是忽略所有接收到的数据。让我们直接从处理器实现者开始,它处理Netty生成的I/O事件。

代码语言:javascript
复制
package io.netty.example.discard;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Handles a server-side channel.
 */
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        // Discard the received data silently.
        ((ByteBuf) msg).release(); // (3)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 1.DiscardServerHandler 继承自ChannelInboundHandlerAdapter,ChannelInboundHandlerAdapter是ChannelInboundHandler的一个实现。ChannelInboundHandler提供多种你可以覆写的事件处理方法。现在,只需继承ChannelInboundHandlerAdapter 就可以了,而不必自己实现ChannelInboundHandler这个接口;
  2. 2.在这里我们覆写了channelRead()这个事件处理方法。每当接收到客户端的新消息时,这个方法就会被调用。在这个案例中,接收到的消息的类型是ByteBuf(字节缓冲);
  3. 3.为了实现丢弃协议,处理器必须忽略接收到的消息。ByteBuf是一个必须明确通过release()方法释放的引用计数对象。请记住:释放任何传递到处理器的引用计数对象都是处理器的责任。通常,channelRead()处理器方法是像下面这样实现的.
代码语言:javascript
复制
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        // Do something with msg
    } finally {
        ReferenceCountUtil.release(msg);
    }
}
  1. 4.当Netty由于I/O引发异常或者处理器实现在处理事件时发生异常,exceptionCaught()事件处理方法就会被调用。在多数情况下,被捕获的异常应该被记录并且与之关联的通道也应该被关闭,尽管这个方法的实现取决于你打算怎样处理这样的异常情况。例如,在关闭连接之前,你可能会发送一个含有错误码的响应消息。

到目前还好,我们已经实现了丢弃服务器的前半部分,剩下的就是写main()方法来启动丢弃服务器处理器这个服务器。

代码语言:javascript
复制
import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
    
/**
 * Discards any incoming data.
 */
public class DiscardServer {
    
    private int port;
    
    public DiscardServer(int port) {
        this.port = port;
    }
    
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
    
            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)
    
            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        new DiscardServer(port).run();
    }
}
  1. 1.NioEventLoopGroup是一个用来处理I/O操作的多线程事件循环。Netty为不同类型的传输提供了多种EventLoopGroup的实现。在这个案例中,我们实现了服务端应用程序,因此会使用两个NioEventLoopGroup。第一个,通常被称为“老板”,接受传入连接。第二个,通常被称为“工人”,当“老板”接受了连接并将接受的连接注册给工人后,“工人”会处理接受的连接的传输。使用多少线程以及如何将它们映射到创建的Channels取决于EventLoopGroup的实现,甚至可以通过构造参数进行配置;
  2. 2.ServerBootstrap是一个工具类,可以用来设置服务器。你可以直接通过通道设置服务器。但是,请注意这是一个乏味的过程,在大多数情况下你不必这么做;
  3. 3.在这里,我们指定使用NioServerSocketChannel类,它可以实例化一个新的通道来接受传入的连接;
  4. 4.这里指定的处理器总是在接收到一个新的通道的时候实例化。ChannelInitializer是一个用来帮助用户配置一个新通道的特殊的处理器。您很有可能希望通过添加一些类似DiscardServerHandler的处理器来配置一个新通道的通道管道线来实现您的网络应用。随着应用变得复杂,您可能给管道线添加更多的处理器,最终将这个匿名类抽象到一个顶级类里。
  5. 5.您还可以给通道的实现设置特殊的参数。我们在开发一个TCP/IP服务器,因此我们可以给套接字的参数设置值,例如tcpDelay和keepAlive等参数。详情可参考ChannelOption类和ChannelConfig的实现类的api文档来获取ChannelOption更多的特性。
  6. 6.你注意到option()和childOption()这两个方法了吗?option()方法是NioServerSocketChannel用来接收到来的连接的。childOption()是父服务器通道接收通道的,在本例中是NioServerSocketChannel。
  7. 7.至此已经准备好了。剩下的就是绑定端口并启动服务器。在这里,我们绑定到机器上所有NICS(网络接口卡)的8080端口。现在你可以任意多次调用bind()方法(绑定的地址要不同才可以)。

恭喜您,你刚刚完成了Netty上的第一个服务器。

查看收到的数据

现在我们已经编写了第一个服务器,我们需要测试它是否真的可以工作。测试的最简单的方法就是使用telnet命令,例如,你可以在命令行界面输入 telnet localhost 8080,回车,然后再输入一些其他的内容。

但是,我们能说这个服务器是正常工作的吗?我们并不能真正知道,因为这是一个丢弃服务器,你根本就不会得到任何响应。为了证明它真的工作,我们来修改下服务器让它打印它接收到的数据。

我们已经知道当数据到达后channelRead方法会被调用。让我们向DiscardServerHandler的channelRead方法中添加下面这些代码。

代码语言:javascript
复制
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ByteBuf in = (ByteBuf) msg;
    try {
        while (in.isReadable()) { // (1)
            System.out.print((char) in.readByte());
            System.out.flush();
        }
    } finally {
        ReferenceCountUtil.release(msg); // (2)
    }
}
  1. 1.这个低效的循环可以直接简化为:
代码语言:javascript
复制
System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
  1. 2.同样,你可以在这里执行
代码语言:javascript
复制
in.release()

如果你再运行telnet命令,你会看到服务器会打印它接收到的数据。

丢弃服务器的完整源码位于发行版的io.netty.example.discard包中。

写一个应答服务器

到目前为止,我们消费的数据完全没有应答。但是,一个服务器,通常都是用来响应请求的。现在我们来学习如何通过实现应答协议来向客户端响应消息,将接收到的数据发回原处。

应答服务器与我们在前面的章节中实现的丢弃服务器的唯一区别是:它将接收到的数据发回原处,而不是打印到控制台。因此,仅需要修改channelRead方法就可以了。

代码语言:javascript
复制
@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg); // (1)
        ctx.flush(); // (2)
    }
  1. 1.一个ChannelHandlerContext对象(通道处理器上下文,也可理解为通道处理器容器)提供了许多操作来允许你你触发各种I/O事件和操作。在这里,我们调用了write(Object)方法来逐字的编写收到的消息。请注意,我们并没有像丢弃服务器案例中那样释放收到的消息。这是因为Netty会在它被写出来的时候释放掉。
  2. 2.ctx.write(Object)并不会将消息写到通道里,它在内部进行缓冲,然后通过ctx.flush()方法将数据刷新到通道里。或者,为了简便,你可以调用ctx.writeAndFlush(msg)方法。

如果你再次运行telnet命令,你会看到服务器将你发送的任何消息返回。

应答服务器的完整源码的位置在发行版的io.netty.example.echo包里。

写一个时间服务器

本节要实现的协议是时间协议。它与前面的例子不同,它会发送一个32位整数的消息,不接收任何请求,并在发送消息之后关闭连接。在这个例子中,你会学到如何构造并发送一个消息,当发送完成后关闭连接。

因为我们会忽略任何接收到的数据,并且,当一个连接建立后,尽可能快的发送一个消息,因而这次我们不能使用channelRead方法了。替代的是覆写channelActive()方法。具体实现如下:

代码语言:javascript
复制
package io.netty.example.time;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        
        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 1.正如上面所提到的,当一个连接建立并准备进行数据传输时,channelActive()方法会被调用,让我们在这个方法中写一个代表当前时间的32位整数。
  2. 2.为了发送一个新的消息,我们需要分配一个容纳这个消息的新的buffer(缓冲)。我们会写入一个32位的整数,因而我们需要一个容量至少为4个字节的字节缓冲。通过ChannelHandlerContext.alloc()方法获取当前ByteBufAllocator(字节缓冲分配器)并分配一个新的buffer。
  3. 3.像往常一样,我们编写构造好的消息。

但是,等等,flip在哪呢?在NIO中,我们发送消息之前不是要调用java.nio.ByteBuffer.flip()方法的吗?ByteBuf并没有这样的方法,因为它有两个指针:一个用于读操作,一个用于写操作。当你往ByteBuf写入数据的时候写索引增加,但是读索引并未改变。读索引和写索引各自代表着消息的读写位置。

相比之下,如果不调用flip方法,NIO buffer并没有提供一种清晰发方法来确定消息内容的开始位置和结束位置。当你忘记反转(flip)buffer的时候你会困惑,因为错误数据甚至空数据会被发送。这样的错误在Netty中不会发生,因为对于不同类型的操作(读和写)有不同的指针。当你使用它后,你会发现他使得你的生活非常简单——一个没有反转的生活。

另一点需要注意的是,ChannelHandlerContext.write()(包括writeAndFlush())方法返回一个ChannelFuture。一个ChannelFuture代表一个还未发生的I/O操作。这意味着,由于Netty是异步操作,所以可能还没有对请求执行任何处理操作。 例如,下面的代码可能在消息到来之前就把连接关闭了。

代码语言:javascript
复制
Channel ch = ...;
ch.writeAndFlush(message);
ch.close();

因此,你需要在ChannelFuture完成之后再调用close()方法,ChannelFuture是write()方法返回的,当写操作完成后,它通知它的监听器。请注意,close()方法也可能不会立即关闭,它也返回一个ChannelFuture方法。

  1. 4.当请求完成后我们是如何得到通知的呢?这就像在返回的ChannelFuture中添加一个ChannelFutureListener一样简单。这里,我们创建一个新的ChannelFutureListener,当操作完成后,它会关闭Channel。

或者,你可以使用预定义的监听器简化代码:

代码语言:javascript
复制
f.addListener(ChannelFutureListener.CLOSE);

为了测试我们的时间服务器像我们期望的那样工作,你可以使用UNIX命令 rdate:

代码语言:javascript
复制
$ rdate -o <port> -p <host>

标签替换成main()方法中指定的端口号,标签一般使用localhost。

编写一个时间客户端

不同于丢弃服务器和应答服务器,我们需要一个时间协议的客户端,因为用户不可能让用户将一个32位二进制数据转化为一个时间。在本节中,我们学习如何确定服务端是否正常工作和如何编写一个Netty客户端。

Netty客户端和服务端的最大区别也是唯一区别就是它们使用不同的Bootstrap(启动器)和Channel(通道)实现。请看下面的代码:

代码语言:javascript
复制
public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
  1. 1.Bootstrap和ServerBootstrap比较相似,不同的是Bootstrap是用于非服务端通道的,例如客户端或者无连接通道;
  2. 2.如果只指定一个EventLoopGroup,它将既用于boos组,也用于worker组。但是boss和worder并不用于客户端;
  3. 3.不同于NioServerSocketChannel,NioSocketChannel被用于创建客户端的通道(Channel);
  4. 4.注意,在这里我们没有像在ServerBootstrap中那样使用childOption()这个方法,因为客户端的SocketChannel没有父通道。
  5. 5.我们应该调用connet()方法,而不是bind()方法。

正如你所能看到的,它真的和服务端的代码不一样。ChannelHandler的实现应该是怎样的呢?它应该从服务端接收32位整数,然后转换为用户可读的格式,并打印转换后的时间,然后关闭连接:

代码语言:javascript
复制
import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 1.在TCP/IP中,Netty将对等点发来的数据读入ByteBuf。

它看起来非常简单,并且和服务端示例没有任何不同。但是,这个处理器有时会拒绝工作,并抛出一个IndexOutBoundsException异常。我们在后续的章节中研究为何会这样。

处理基于流的传输
套接字缓冲区的一个小警告

在像TCP/IP那样基于流的传输中,接收到的数据被存入一个套接字接收缓冲区中。不幸的是,基于流的传输所传输的缓冲不是数据包队列,而是字节队列。也就是说,即使你发送的两条消息是独立的数据包,操作系统不会将它们当做两条消息,而是仅将它们当做一组字节处理。因此,并不能保证你读到的内容与远程对等点写的内容完全一致。例如,让我们假设操作系统的TCP/IP栈已经接收到三个数据包:

因为这是基于流的协议的一般属性,在您的应用程序中,很有可能以以下片段形式阅读它们:

因此,接收到的部分,不管是服务端还是客户端,都应该将接收到的数据整理成一个或多个有意义的数据帧,以便应用程序能够轻松理解。在上面的例子种,接收到的数据应该像下面的帧:

第一种解决方法

现在让我们回到时间客户端的例子中,例子中有着同样的问题。一个32位的整数是非常小的数据,通常不会被碎片化。但是,问题是,它还是可以被碎片化,随着流量的增加,碎片化的可能性也会增加。

最简单的解决方法就是创建一个内部累积缓冲,一直等待到4byte数据全部被接受近内部缓冲里。下面是时间客户端处理器实现的修改,这样就解决了这个碎片化问题:

代码语言:javascript
复制
import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;
    
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)
    }
    
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)
        buf = null;
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();
        
        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 1.ChannelHandler有两个生命周期监听方法:handlerAdded()和handlerRemoved()。你可以执行任意初始化任务,只要不会阻塞很长时间;
  2. 2.首先,所有接收到的数据应该被累积进buf内;
  3. 然后,处理器必须检查buf是否有足够的数据,在这个例子中是4byte,
  4. 3.然后进行实际的业务逻辑。否则,当更多的数据到来的时候,Netty会再次调用channelRead()方法,最终将累加够4byte的数据。
第二种解决方法

尽管第一种方法解决了时间客户端的问题,修改后的处理器看起来不是很整洁。设想一个更复杂的协议,它由多个字段(例如可变长度的字段)组成。你的ChannelInboundHandler实现很快会变得无法维护。

正如你注意到的,你可以给通道管道线添加不止一个ChannelHandler(通道处理器),因此,你可以将单个ChannelHandler拆分成多个模块处理程序,以降低应用程序的复杂性。例如,你可以将TimeClientHandler拆分成两个处理器:

  • 1.TimeDecoder,处理碎片问题的时间解码器,以及其他
  • 2.TimeClientHandler最初的版本。

幸运的是,Netty提供了扩展类,帮助你编写第一个开箱即用的类:

代码语言:javascript
复制
public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }
        
        out.add(in.readBytes(4)); // (4)
    }
}
  1. 1.ByteToMessageDecoder是ChannelInboundHandler的一个实现,它使得处理碎片化问题变得非常简单;
  2. 2.每当接收到新数据的时候,ByteToMessageDecoder都会使用内部维护的累加缓冲区调用decode()方法;
  3. 3.decode可以决定在累加缓冲区没有足够数据的地方不添加任何数据。ByteToMessageDecoder在接收到足够多的数据后会再次调用decode()方法;
  4. 4.如果decode()方法往out中添加了一个对象,也就意味着解码器成功解码了一个消息。ByteToMessageDecoder会丢弃累加缓冲区中已读的数据。请记住:你不需要解码多个消息。ByteToMessageDecoder会持续调用decode()方法,知道它没有任何内容可以添加为止。

既然我们有另一个处理器可以插入ChannelPipeline(通道管道线)中,我们应该在时间客户端中修改ChannelInitializer的实现了:

代码语言:javascript
复制
b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});

如果你是一个喜欢探索的人,你可能想尝试使用ReplayingDecoder,它极大简化了解码器。更多信息你可以参考API文档。

代码语言:javascript
复制
public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}

此外,Netty提供了开箱即用的解码器可以让你非常容易的实现更多的协议,并且帮助你避免最终得到不可维护的单一处理器实现,更多详细的案例可以参考下面包里的案例:

  • 1.io.netty.example.factorial 用于二进制协议
  • 2.io.netty.example.telnet 用于基于文本行的协议
用POJO替代ByteBuf

到目前为止,我们看到的所有案例的协议消息使用的数据结构都是ByteBuf。在本节中,我们会通过使用POJO替代ByteBuf来改进时间协议的客户端和服务端案例。

在你的通道处理器(ChannelHandler)中使用POJO的好处是很明显的。通过将ByteBuf信息代码从处理器的中抽取出来,你的处理器变得可维护和重用。在时间客户端和服务端例子中,我们直接使用ByteBuf来读取32位整数并没有什么问题,但是,你会发现,在实现实际协议时,有必要进行分离。

首先,我们来定义一个新的类,叫做:UnixTime

代码语言:javascript
复制
import java.util.Date;

public class UnixTime {

    private final long value;
    
    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }
    
    public UnixTime(long value) {
        this.value = value;
    }
        
    public long value() {
        return value;
    }
        
    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

我们现在可以修改时间解码器来生成一个UnixTime而不是ByteBuf.

代码语言:javascript
复制
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    if (in.readableBytes() < 4) {
        return;
    }

    out.add(new UnixTime(in.readUnsignedInt()));
}

由于修改了解码器,时间客户端处理器不在需要使用ByteBuf:

代码语言:javascript
复制
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    UnixTime m = (UnixTime) msg;
    System.out.println(m);
    ctx.close();
}

非常简单而优雅,是不是?同样的技术可以应用在服务端。这次我们首先更新时间服务器处理器:

代码语言:javascript
复制
@Override
public void channelActive(ChannelHandlerContext ctx) {
    ChannelFuture f = ctx.writeAndFlush(new UnixTime());
    f.addListener(ChannelFutureListener.CLOSE);
}

现在,唯一缺少的就是编码器了,它是ChannelOutboundHandler的实现,用来将UnixTime转成ByteBuf。相比较编写一个解码器,编写编码器非常简单,因为当编码一个消息时不需要处理数据包的碎片化和装配。

代码语言:javascript
复制
public class TimeEncoder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int)m.value());
        ctx.write(encoded, promise); // (1)
    }
}
  1. 1.单单这么一行中却有挺多重要的东西。 首先,我们传递了传统的ChannelPromise以便当编码的数据被实际写入数据链路中后,Netty将其标记为成功或失败。 第二,我们没有调用ctx.flush()。有一个单独的处理器方法 :void flush(ChannelHandlerContext ctx),这个方法的目的就是覆写flush()操作。
  2. 2.想要进一步的简化,你可以使用MessagetToByteEncoder:
代码语言:javascript
复制
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
        out.writeInt((int)msg.value());
    }
}

最后的工作就是在服务器的通道管道线的时间服务器处理器的前面添加时间编码器(TimeEncoder),这就很容易了。

关闭你的应用

关闭一个Netty应用通常就像通过shutdownGracefully()关闭你所创建的所有EventLoopGroup一样简单。它将返回一个Future,当EventLoopGroup完全被终止,并且所有的通道已被关闭,它将通知你。

总结

在本章中,我们简要介绍了Netty,并演示了如何在Netty上编写一个完整工作的网络应用程序。

在下面的章节中将有更多关于Netty的详细信息。我们也鼓励你复习io.netty.example包中的Netty案例。

请注意,社区永远都期待你的问题和建议,你的反馈可以帮助你理解Netty也帮助改进Netty和它的文档。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-09-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 小诸葛的博客 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
    • 问题
      • 解决方案
        • 开始之前
          • 编写一个’丢弃’服务器
            • 查看收到的数据
              • 写一个应答服务器
                • 写一个时间服务器
                  • 编写一个时间客户端
                    • 处理基于流的传输
                      • 套接字缓冲区的一个小警告
                      • 第一种解决方法
                      • 第二种解决方法
                    • 用POJO替代ByteBuf
                      • 关闭你的应用
                        • 总结
                        相关产品与服务
                        容器服务
                        腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
                        领券
                        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档