为什么选择Netty
netty是业界最流行的NIO框架之一,它的健壮型,功能,性能,可定制性和可扩展性都是首屈一指的,Hadoop的RPC框架Avro就使用了netty作为底层的通信框架,此外netty在互联网,大数据,网络游戏,企业应用,电信软件等众多行业都得到了成功的商业应用。正因为以上的一些特性,使得netty已经成为java NIO编程的首选框架。
构建netty开发环境
其实使用netty很简单,直接将其jar包引入到工程中即可使用。 去 http://netty.io/网站上下载最新版本的jar包(由于官网上netty5已经被废弃,但是这里仍然使用netty5进行开发, 可以考虑从csnd下载),我这里下载的为:netty-5.0.0.Alpha1.tar.bz2。这其实是一个压缩文件,解压这个文件,取里面的所有类集合到一起的那个jar包netty-all-5.0.0.Alpha1.jar即可。另外还需要注意的是,我这里使用的jdk版本是1.8。
第一个netty程序 这里利用netty来实现一个时钟的小程序,服务器端接收特定的指令然后将服务器时间返回给客户端,客户端按照一定的时间间隔往服务器端发送命令。
1 package com.rampage.netty.time;
2
3 import io.netty.bootstrap.ServerBootstrap;
4 import io.netty.channel.ChannelFuture;
5 import io.netty.channel.ChannelInitializer;
6 import io.netty.channel.ChannelOption;
7 import io.netty.channel.EventLoopGroup;
8 import io.netty.channel.nio.NioEventLoopGroup;
9 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioServerSocketChannel;
11
12 /**
13 * 时钟程序的服务器端
14 * @author zyq
15 *
16 */
17 public class TimeServer {
18
19 public static void main(String[] args) throws Exception {
20 new TimeServer().bind(8080);
21 }
22
23 public void bind(int port) throws Exception {
24 // 配置服务器的NIO线程组
25 EventLoopGroup bossGroup = new NioEventLoopGroup();
26 EventLoopGroup workerGroup = new NioEventLoopGroup();
27
28 try {
29 ServerBootstrap bootStrap = new ServerBootstrap();
30
31 // 进行链式调用(每一次调用的返回结果都是ServerBootstrap)
32 // group带两个参数第一个表示给父(acceptor)用的EventExecutorGroup(其实就是线程池)
33 // 第二个参数表示子(client)线程池
34 // channel方法可以带一个ServerChannel类来创建进行IO操作的通道。
35 // option方法给Channel定制对应的选项
36 // childHandler方法用来处理Channel中的请求
37 bootStrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
38 .option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChildChannelHandler());
39
40 // 绑定端口,等待同步成功
41 // bind方法返回一个ChannelFuture类,就是相当于绑定端口并且创建一个新的channel
42 // sync方法会等待ChannelFuture的处理结束
43 ChannelFuture future = bootStrap.bind(port).sync();
44
45 // 等待服务器监听端口关闭
46 future.channel().closeFuture().sync();
47 } finally {
48 // 优雅地退出,释放线程资源
49 bossGroup.shutdownGracefully();
50 workerGroup.shutdownGracefully();
51 }
52 }
53
54 private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
55
56 @Override
57 protected void initChannel(SocketChannel arg0) throws Exception {
58 arg0.pipeline().addLast(new TimeServerHandler());
59 }
60
61 }
62 }
1 package com.rampage.netty.time;
2
3 import java.util.Date;
4
5 import io.netty.buffer.ByteBuf;
6 import io.netty.buffer.Unpooled;
7 import io.netty.channel.ChannelHandlerAdapter;
8 import io.netty.channel.ChannelHandlerContext;
9
10 /**
11 * 时间服务器的处理类,只有netty5中的ChannelHandlerAdapter中才有ChannelRead和ChannelReadComplete方法。
12 * @author zyq
13 *
14 */
15 public class TimeServerHandler extends ChannelHandlerAdapter {
16 @Override
17 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
18 // netty中的ByteBuf类相当于jdk中的ByteBuffer类,但是功能更加强大
19 ByteBuf buf = (ByteBuf) msg;
20
21 // readableBytes返回缓冲区可读的字节数
22 byte[] req = new byte[buf.readableBytes()];
23
24 // 将缓冲区的字节数复制到新的字节数组中去
25 buf.readBytes(req);
26
27 // 根据客户端传来的信息得到应答信息
28 String body = new String(req, "UTF-8");
29 System.out.println("The time server receive order:" + body);
30 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
31
32 // 给客户端的回应
33 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
34
35 // 为了防止频繁唤醒Selector进行消息发送,Netty的write方法并不直接将消息写入到SocketChannel中,而是把消息放入到缓冲数组
36 ctx.write(resp);
37 }
38
39 @Override
40 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
41 // 将放到缓冲数组中的消息写入到SocketChannel中去
42 ctx.flush();
43 }
44
45 @Override
46 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
47 ctx.close();
48 }
49
50
51 }
1 package com.rampage.netty.time;
2
3 import io.netty.bootstrap.Bootstrap;
4 import io.netty.channel.ChannelFuture;
5 import io.netty.channel.ChannelInitializer;
6 import io.netty.channel.ChannelOption;
7 import io.netty.channel.EventLoopGroup;
8 import io.netty.channel.nio.NioEventLoopGroup;
9 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioSocketChannel;
11
12 /**
13 * 时钟程序的客户端
14 * @author zyq
15 *
16 */
17 public class TimeClient {
18
19 public static void main(String[] args) throws Exception {
20 new TimeClient().connect("127.0.0.1", 8080);
21 }
22
23 public void connect(String host, int port) throws Exception {
24 // 配置客户端NIO线程池
25 EventLoopGroup group = new NioEventLoopGroup();
26
27 Bootstrap strap = new Bootstrap();
28 try {
29 // 这里用了匿名内部类,各个函数的含义同Server端
30 strap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
31 .handler(new ChannelInitializer<SocketChannel>() {
32
33 @Override
34 protected void initChannel(SocketChannel arg0) throws Exception {
35 arg0.pipeline().addLast(new TimeClientHandler());
36 }
37
38 });
39
40 // 发起异步连接操作
41 ChannelFuture future = strap.connect(host, port).sync();
42
43 // 等待客户端关闭(注意调用的是closeFuture如果直接调用close会立马关闭)
44 future.channel().closeFuture().sync();
45 } finally {
46 // 优雅的关闭
47 group.shutdownGracefully();
48 }
49 }
50 }
1 package com.rampage.netty.time;
2
3 import java.util.logging.Logger;
4
5 import io.netty.buffer.ByteBuf;
6 import io.netty.buffer.Unpooled;
7 import io.netty.channel.ChannelHandlerAdapter;
8 import io.netty.channel.ChannelHandlerContext;
9
10 public class TimeClientHandler extends ChannelHandlerAdapter {
11
12 private static final Logger LOGGER = Logger.getLogger(TimeClientHandler.class.getName());
13
14 private final ByteBuf firstMsg;
15
16 public TimeClientHandler() {
17 byte[] req = "QUERY TIME ORDER".getBytes();
18 firstMsg = Unpooled.buffer(req.length);
19 firstMsg.writeBytes(req);
20 }
21
22 /**
23 * channel连通之后的处理
24 */
25 @Override
26 public void channelActive(ChannelHandlerContext ctx) throws Exception {
27 ctx.writeAndFlush(firstMsg);
28 }
29
30 @Override
31 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
32 ByteBuf buf = (ByteBuf) msg;
33 byte[] resp = new byte[buf.readableBytes()];
34 buf.readBytes(resp);
35
36 String body = new String(resp, "UTF-8");
37 System.out.println("Now is:" + body);
38
39 // 两秒钟后继续向服务器端发送消息
40 Thread.sleep(2000);
41 byte[] req = "QUERY TIME ORDER".getBytes();
42 ByteBuf sendMsg = Unpooled.buffer(req.length);
43 sendMsg.writeBytes(req);
44 ctx.writeAndFlush(sendMsg);
45 }
46
47 @Override
48 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
49 LOGGER.warning("Unexpected exception from downstream:" + cause.getMessage());
50 ctx.close();
51 }
52
53
54 }
服务器端的运行结果如下:
The time server receive order:QUERY TIME ORDER The time server receive order:QUERY TIME ORDER The time server receive order:QUERY TIME ORDER The time server receive order:QUERY TIME ORDER
...
客户端的运行结果如下:
Now is:Wed Aug 03 05:55:30 PDT 2016 Now is:Wed Aug 03 05:55:33 PDT 2016 Now is:Wed Aug 03 05:55:35 PDT 2016 Now is:Wed Aug 03 05:55:37 PDT 2016
...
可以发现通过Netty框架来实现NIO极大的简化了编码和维护难度。代码调理更加清晰。
TimeClientHandler