首先,整理NIO进行服务端开发的步骤:
(1)创建ServerSocketChannel,配置它为非阻塞模式。
(2)绑定监听,配置TCP参数,backlog的大小。
(3)创建一个独立的I/O线程,用于轮询多路复用器Selector。
(4)创建Selector,将之前创建的ServerSocketChannel注册到Selector上,监听SelectionKeyACCEPT。
(5)启动I/O线程,在循环体中执行Selector.select()方法,轮训就绪的Channel。
(6)当轮询到了处于就绪状态的Channel时,需要对其进行判断,如果是OP_ACCEPT状态,说明是新的客户端接入,则调用ServerSocketChannel.accept()方法接受新的客户端。
(7)设置新接入的客户端链路SocketChannel为非阻塞模式,配置其他的一些TCP参数。
(8)将SocketChannel注册到Selector,监听OP_READ操作位。
(9)如果轮询的Channel为OP_READ,则说明SocketChannel中有新的就绪的数据包需要读取,则构造ByteBuffer对象,读取数据包。
(10)如果轮询的Channel为OP_WRITE,则说明还有数据没有发送完成,需要继续发送。
Netty时间服务器服务端 TimeServer:
1 package netty;
2
3 import io.netty.bootstrap.ServerBootstrap;
4 import io.netty.channel.ChannelFuture;
5 import io.netty.channel.ChannelInitializer;
6 import io.netty.channel.ChannelOption;
7 import io.netty.channel.EventLoopGroup;
8 import io.netty.channel.nio.NioEventLoopGroup;
9 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioServerSocketChannel;
11
12
13 public class TimeServer {
14
15 public void bind(int port) throws Exception{
16 //配置服务端的NIO线程组 一个用于服务端接收客户端的连接,另一个用于进行SocketChannel的网络读写
17 EventLoopGroup bossGroup = new NioEventLoopGroup();
18 EventLoopGroup workerGroup = new NioEventLoopGroup();
19 try{
20 ServerBootstrap b = new ServerBootstrap();
21 b.group(bossGroup,workerGroup)
22 .channel(NioServerSocketChannel.class)
23 .option(ChannelOption.SO_BACKLOG, 1024)
24 .childHandler(new ChildChannelHandler());
25 //绑定端口,同步等待成功
26 ChannelFuture f = b.bind(port).sync();
27 //等待服务器监听端口关闭
28 f.channel().closeFuture().sync();
29 }finally{
30 //优雅退出,释放线程池资源
31 bossGroup.shutdownGracefully();
32 workerGroup.shutdownGracefully();
33 }
34 }
35
36 private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
37 protected void initChannel(SocketChannel arg0) throws Exception{
38 arg0.pipeline().addLast(new TimeServerHandler());
39 }
40 }
41 }
ServerBootstrap是Netty用于启动NIO服务端的辅助类,目的是降低服务端的开发难度。
绑定childChannelHandler,其作用类似于Reactor模式中的handler类,主要用于处理网络I/O事件,例如记录日志、对消息进行编解码等。
使用bind绑定监听端口,随后,调用它的同步阻塞方法sync等待绑定操作完成,完成后Netty会返回一个ChannelFuture,主要用于异步操作的通知回调。
Netty时间服务器服务端 TimeServerHandler:
1 package netty;
2
3 import java.io.IOException;
4 import io.netty.buffer.ByteBuf;
5 import io.netty.buffer.Unpooled;
6 import io.netty.channel.ChannelHandlerAdapter;
7 import io.netty.channel.ChannelHandlerContext;
8
9 public class TimeServerHandler extends ChannelHandlerAdapter{
10
11 public void channelRead(ChannelHandlerContext ctx,Object msg) throws IOException{
12 //将msg转换成Netty的ByteBuf对象
13 ByteBuf buf = (ByteBuf)msg;
14 //将缓冲区中的字节数组复制到新建的byte数组中,
15 byte[] req = new byte[buf.readableBytes()];
16 buf.readBytes(req);
17 //获取请求消息
18 String body = new String(req,"UTF-8");
19 System.out.println("The time server receive order:" + body);
20 //如果是"QUERY TIME ORDER"则创建应答消息
21 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
22 System.currentTimeMillis()).toString() : "BAD ORDER";
23 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
24 //异步发送应答消息给客户端
25 ctx.write(resp);
26 }
27
28 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception{
29 ctx.flush();
30 }
31
32 public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
33 ctx.close();
34 }
35 }
相比昨天原生的NIO服务端,代码量大大减少。
Netty时间服务器客户端 TimeClient:
1 package netty;
2
3 import io.netty.bootstrap.Bootstrap;
4 import io.netty.channel.ChannelFuture;
5 import io.netty.channel.ChannelInitializer;
6 import io.netty.channel.ChannelOption;
7 import io.netty.channel.EventLoopGroup;
8 import io.netty.channel.nio.NioEventLoopGroup;
9 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioSocketChannel;
11
12 public class TimeClient {
13
14 public void connect(int port,String host) throws Exception{
15 //创建客户端处理I/O读写的NioEventLoopGroup Group线程组
16 EventLoopGroup group = new NioEventLoopGroup();
17 try{
18 //创建客户端辅助启动类Bootstrap
19 Bootstrap b = new Bootstrap();
20 b.group(group).channel(NioSocketChannel.class)
21 .option(ChannelOption.TCP_NODELAY, true)
22 .handler(new ChannelInitializer<SocketChannel>(){
23 //将ChannelHandler设置到ChannelPipleline中,用于处理网络I/O事件
24 @Override
25 protected void initChannel(SocketChannel ch) throws Exception {
26 ch.pipeline().addLast(new TimeClientHandler());
27 }
28 });
29 //发起异步连接操作,然后调用同步方法等待连接成功。
30 ChannelFuture f = b.connect(host,port).sync();
31
32 //等待客户端链路关闭
33 f.channel().closeFuture().sync();
34 }finally{
35 //优雅退出,释放NIO线程组
36 group.shutdownGracefully();
37 }
38 }
39
40 public static void main(String[] args) throws Exception{
41 int port = 8080;
42 if(args != null && args.length > 0){
43 try{
44 port = Integer.valueOf(args[0]);
45 }catch(NumberFormatException e){
46 //采用默认值
47 }
48 }
49 new TimeClient().connect(port, "127.0.0.1");
50 }
51
52 }
Netty时间服务器客户端 TimeClientHandler:
1 package netty;
2
3 import io.netty.buffer.ByteBuf;
4 import io.netty.buffer.Unpooled;
5 import io.netty.channel.ChannelHandlerAdapter;
6 import io.netty.channel.ChannelHandlerContext;
7
8 import java.util.logging.Logger;
9
10 public class TimeClientHandler extends ChannelHandlerAdapter{
11
12 private static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName());
13
14 private final ByteBuf firstMessage;
15
16 public TimeClientHandler(){
17 byte[] req = "QUERY TIME ORDER".getBytes();
18 firstMessage = Unpooled.buffer(req.length);
19 firstMessage.writeBytes(req);
20 }
21
22 //当客户端与服务端TCP链路简历成功后,Netty的NIO线程会调用该方法,发送查询时间的指令给服务器
23 public void channelActive(ChannelHandlerContext ctx){
24 //将请求消息发送给服务端
25 ctx.writeAndFlush(firstMessage);
26 }
27
28 //当服务器返回应答消息时,该方法被调用
29 public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
30 ByteBuf buf = (ByteBuf) msg;
31 byte[] req = new byte[buf.readableBytes()];
32 buf.readBytes(req);
33 String body = new String(req,"UTF-8");
34 System.out.println("Now is :" + body);
35 }
36
37 public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
38
39 //释放资源
40 logger.warning("Unexpected exception from downstream :" + cause.getMessage());
41 ctx.close();
42 }
43 }
运行结果:
Server:
Client: