专栏首页个人分享Netty服务端与客户端(源码一)

Netty服务端与客户端(源码一)

首先,整理NIO进行服务端开发的步骤:

  (1)创建ServerSocketChannel,配置它为非阻塞模式

  (2)绑定监听配置TCP参数backlog的大小

  (3)创建一个独立的I/O线程,用于轮询多路复用器Selector

  (4)创建Selector,将之前创建的ServerSocketChannel注册到Selector上监听SelectionKeyACCEPT

  (5)启动I/O线程,在循环体中执行Selector.select()方法,轮训就绪的Channel。

  (6)当轮询到了处于就绪状态的Channel时,需要对其进行判断,如果是OP_ACCEPT状态,说明是新的客户端接入,则调用ServerSocketChannel.accept()方法接受新的客户端。

  (7)设置新接入的客户端链路SocketChannel为非阻塞模式,配置其他的一些TCP参数。

  (8)将SocketChannel注册到Selector,监听OP_READ操作位

  (9)如果轮询的Channel为OP_READ,则说明SocketChannel中有新的就绪的数据包需要读取,则构造ByteBuffer对象,读取数据包。

  (10)如果轮询的Channel为OP_WRITE,则说明还有数据没有发送完成,需要继续发送。  

 Netty时间服务器服务端 TimeServer:

 1 package netty;
 2 
 3 import io.netty.bootstrap.ServerBootstrap;
 4 import io.netty.channel.ChannelFuture;
 5 import io.netty.channel.ChannelInitializer;
 6 import io.netty.channel.ChannelOption;
 7 import io.netty.channel.EventLoopGroup;
 8 import io.netty.channel.nio.NioEventLoopGroup;
 9 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioServerSocketChannel;
11 
12 
13 public class TimeServer {
14     
15     public void bind(int port) throws Exception{
16         //配置服务端的NIO线程组 一个用于服务端接收客户端的连接,另一个用于进行SocketChannel的网络读写
17         EventLoopGroup bossGroup = new NioEventLoopGroup();
18         EventLoopGroup workerGroup = new NioEventLoopGroup();
19         try{
20             ServerBootstrap b = new ServerBootstrap();
21             b.group(bossGroup,workerGroup)
22             .channel(NioServerSocketChannel.class)
23             .option(ChannelOption.SO_BACKLOG, 1024)
24             .childHandler(new ChildChannelHandler());
25             //绑定端口,同步等待成功
26             ChannelFuture f = b.bind(port).sync();
27             //等待服务器监听端口关闭
28             f.channel().closeFuture().sync();
29         }finally{
30             //优雅退出,释放线程池资源
31             bossGroup.shutdownGracefully();
32             workerGroup.shutdownGracefully();
33         }
34     }
35     
36     private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
37         protected void initChannel(SocketChannel arg0) throws Exception{
38             arg0.pipeline().addLast(new TimeServerHandler());
39         }
40     }
41 }

ServerBootstrap是Netty用于启动NIO服务端的辅助类,目的是降低服务端的开发难度。

绑定childChannelHandler,其作用类似于Reactor模式中的handler类,主要用于处理网络I/O事件,例如记录日志、对消息进行编解码等。

使用bind绑定监听端口,随后,调用它的同步阻塞方法sync等待绑定操作完成,完成后Netty会返回一个ChannelFuture,主要用于异步操作的通知回调

Netty时间服务器服务端 TimeServerHandler:

 1 package netty;
 2 
 3 import java.io.IOException;
 4 import io.netty.buffer.ByteBuf;
 5 import io.netty.buffer.Unpooled;
 6 import io.netty.channel.ChannelHandlerAdapter;
 7 import io.netty.channel.ChannelHandlerContext;
 8 
 9 public class TimeServerHandler extends ChannelHandlerAdapter{
10     
11     public void channelRead(ChannelHandlerContext ctx,Object msg) throws IOException{
12         //将msg转换成Netty的ByteBuf对象
13         ByteBuf buf = (ByteBuf)msg;
14         //将缓冲区中的字节数组复制到新建的byte数组中,
15         byte[] req = new byte[buf.readableBytes()];
16         buf.readBytes(req);
17         //获取请求消息
18         String body = new String(req,"UTF-8");
19         System.out.println("The time server receive order:" + body);
20         //如果是"QUERY TIME ORDER"则创建应答消息
21         String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
22                 System.currentTimeMillis()).toString() : "BAD ORDER";
23                 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
24         //异步发送应答消息给客户端
25         ctx.write(resp);
26     }
27     
28     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception{
29         ctx.flush();
30     }
31     
32     public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
33         ctx.close();
34     }
35 }

相比昨天原生的NIO服务端,代码量大大减少。

 Netty时间服务器客户端 TimeClient:

 1 package netty;
 2 
 3 import io.netty.bootstrap.Bootstrap;
 4 import io.netty.channel.ChannelFuture;
 5 import io.netty.channel.ChannelInitializer;
 6 import io.netty.channel.ChannelOption;
 7 import io.netty.channel.EventLoopGroup;
 8 import io.netty.channel.nio.NioEventLoopGroup;
 9 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioSocketChannel;
11 
12 public class TimeClient {
13     
14     public void connect(int port,String host) throws Exception{
15         //创建客户端处理I/O读写的NioEventLoopGroup Group线程组
16         EventLoopGroup group = new NioEventLoopGroup();
17         try{
18             //创建客户端辅助启动类Bootstrap
19             Bootstrap b = new Bootstrap();
20             b.group(group).channel(NioSocketChannel.class)
21             .option(ChannelOption.TCP_NODELAY, true)
22             .handler(new ChannelInitializer<SocketChannel>(){
23                 //将ChannelHandler设置到ChannelPipleline中,用于处理网络I/O事件
24                 @Override
25                 protected void initChannel(SocketChannel ch) throws Exception {
26                     ch.pipeline().addLast(new TimeClientHandler());
27                 }
28             });
29             //发起异步连接操作,然后调用同步方法等待连接成功。
30             ChannelFuture f = b.connect(host,port).sync();
31             
32             //等待客户端链路关闭
33             f.channel().closeFuture().sync();
34         }finally{
35             //优雅退出,释放NIO线程组
36             group.shutdownGracefully();
37         }
38     }
39     
40     public static void main(String[] args) throws Exception{
41         int port = 8080;
42         if(args != null && args.length > 0){
43             try{
44                 port = Integer.valueOf(args[0]);
45             }catch(NumberFormatException e){
46                 //采用默认值
47             }
48         }
49         new TimeClient().connect(port, "127.0.0.1");
50     }
51     
52 }

 Netty时间服务器客户端 TimeClientHandler:

 1 package netty;
 2 
 3 import io.netty.buffer.ByteBuf;
 4 import io.netty.buffer.Unpooled;
 5 import io.netty.channel.ChannelHandlerAdapter;
 6 import io.netty.channel.ChannelHandlerContext;
 7 
 8 import java.util.logging.Logger;
 9 
10 public class TimeClientHandler extends ChannelHandlerAdapter{
11     
12     private static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName());
13     
14     private final ByteBuf firstMessage;
15     
16     public TimeClientHandler(){
17         byte[] req = "QUERY TIME ORDER".getBytes();
18         firstMessage = Unpooled.buffer(req.length);
19         firstMessage.writeBytes(req);
20     }
21     
22     //当客户端与服务端TCP链路简历成功后,Netty的NIO线程会调用该方法,发送查询时间的指令给服务器
23     public void channelActive(ChannelHandlerContext ctx){
24         //将请求消息发送给服务端
25         ctx.writeAndFlush(firstMessage);
26     }
27     
28     //当服务器返回应答消息时,该方法被调用
29     public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
30         ByteBuf buf = (ByteBuf) msg;
31         byte[] req = new byte[buf.readableBytes()];
32         buf.readBytes(req);
33         String body = new String(req,"UTF-8");
34         System.out.println("Now is :" + body);
35     }
36     
37     public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
38         
39         //释放资源
40         logger.warning("Unexpected exception from downstream :" + cause.getMessage());
41         ctx.close();
42     }
43 }

 运行结果:

Server:

Client:

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Netty的TCP粘包/拆包(源码二)

    假设客户端分别发送了两个数据包D1和D2给服务器,由于服务器端一次读取到的字节数是不确定的,所以可能发生四种情况:

    用户3003813
  • Hadoop常用命令

    HDFS基本命令: hadoop fs -cmd cmd: 具体的操作,基本上与UNIX的命令行相同 args:参数 HDFS资源URI格式: scheme:/...

    用户3003813
  • storm系统架构学习

        运行Storm nimbus后台服务的节点(Nimbus),它是storm系统的中心,负责接收用户提交的作业(如同spark submit一样 即为ja...

    用户3003813
  • 基于netty的websocket协议的对话小程序

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

    suveng
  • Netty分隔符和定长解码器使用

      TCP以流的方式进行数据传输,上层的应用协议为了对消息进行区分,往往采用如下4种方式

    用户4919348
  • Netty之入门案例

      前面给大家介绍了NIO,我们会发现用NIO实现异步非阻塞的网络通信代码量非常大,而且并不是很好理解,在实际的开发中一般我们也都是会实现基于NIO的框架来操作...

    用户4919348
  • Netty的TCP粘包/拆包(源码二)

    假设客户端分别发送了两个数据包D1和D2给服务器,由于服务器端一次读取到的字节数是不确定的,所以可能发生四种情况:

    用户3003813
  • Netty入门(一)

    在文章开始之前首先明确一个问题,为什么要使用Netty,Netty解决了什么问题,围绕着这个问题我们开始本篇文章的学习

    tanoak
  • Netty入门HelloWorld

    今天下班之后无聊,学习了一下长链接的一款非常秀的框架——netty,netty在很多?java开发的中间件中都有很坚实的地位。于是,在下班之余我学习了一下这款优...

    奕仁
  • java架构之路-(netty专题)netty的基本使用和netty聊天室

      上次博客,我们主要说了我们的IO模型,BIO同步阻塞,NIO同步非阻塞,AIO基于NIO二次封装的异步非阻塞,最重要的就是我们的NIO,脑海中应该有NIO的...

    小菜的不能再菜

扫码关注云+社区

领取腾讯云代金券