前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >NIO源码阅读

NIO源码阅读

作者头像
用户3003813
发布2018-09-06 14:31:51
4610
发布2018-09-06 14:31:51
举报
文章被收录于专栏:个人分享个人分享

  自己对着源码敲一遍练习,写上注释。发现NIO编程难度好高啊。。虽然很复杂,但是NIO编程的有点还是很多:

  1、客户端发起的连接操作是异步的,可以通过在多路复用器注册OP_CONNECTION等待后续结果,不需要像BIO的客户端一样被同步阻塞。

  2、SocketChannel的读写操作都是异步的,如果没有可读写的数据它不会同步等待,直接返回,这样I/O通信模型就可以处理其他的链路,不需要同步等待这个链路可用。

  3、线程模型的优化:由于JDK的Selector在Linux等主流操作系统上通过epoll实现,没有连接句柄的限制,那么Selector线程可以同时处理成千上万个客户端连接,而且性能不会随着客户端的增加而线性下降。所以它非常适合做高性能、高负载的网络服务器。

  TimeClient:

代码语言:javascript
复制
 1 package nio;
 2 
 3 public class TimeClient {
 4     public static void main(String args[]){
 5         int port = 8080;
 6         if(args != null && args.length > 0){
 7             try{
 8                 port = Integer.valueOf(args[0]);
 9             }catch(NumberFormatException e){
10                 //采用默认值
11             }
12         }
13         new Thread(new TimeClientHandle("120.0.0.1",port),"TimeClient-001").start();
14     }
15 }

TimeClientHandler:

代码语言:javascript
复制
  1 package nio;
  2 
  3 import java.io.IOException;
  4 import java.net.InetSocketAddress;
  5 import java.nio.ByteBuffer;
  6 import java.nio.channels.SelectionKey;
  7 import java.nio.channels.Selector;
  8 import java.nio.channels.SocketChannel;
  9 import java.util.Iterator;
 10 import java.util.Set;
 11 
 12 public class TimeClientHandle implements Runnable{
 13     private String host;
 14     private int port;
 15     private Selector selector;
 16     private SocketChannel socketChannel;
 17     private volatile boolean stop;
 18     
 19     public TimeClientHandle(String host,int port){
 20         this.host = host == null ? "127.0.0.1" : host;
 21         this.port = port;
 22         try{
 23             selector = Selector.open();
 24             socketChannel = SocketChannel.open();
 25             socketChannel.configureBlocking(false);
 26         }catch(IOException e){
 27             e.printStackTrace();
 28             System.exit(1);
 29         }
 30     }
 31     
 32     
 33     public void run() {
 34         //发送请求连接
 35         try{
 36             doConnect();
 37         }catch(IOException e){
 38             e.printStackTrace();
 39             System.exit(1);
 40         }
 41         while(!stop){
 42             try{
 43                 selector.select(1000);
 44                 Set<SelectionKey> selectedKeys = selector.selectedKeys();
 45                 Iterator<SelectionKey> it = selectedKeys.iterator();
 46                 SelectionKey key = null;
 47                 //当有就绪的Channel时,执行handleInput(key)方法
 48                 while(it.hasNext()){
 49                     key = it.next();
 50                     it.remove();
 51                     try{
 52                         handleInput(key);
 53                     }catch(Exception e){
 54                         if(key != null){
 55                         key.cancel();
 56                             if(key.channel() != null){
 57                                 key.channel().close();
 58                             }
 59                         }
 60                     }
 61                 }
 62             }catch(Exception e){
 63                 e.printStackTrace();
 64                 System.exit(1);
 65             }
 66         }
 67         
 68         //多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
 69         if(selector != null){
 70             try{
 71                 selector.close();
 72             }catch(IOException e){
 73                 e.printStackTrace();
 74             }
 75         }
 76 
 77     }
 78     
 79     
 80     private void handleInput(SelectionKey key) throws IOException{
 81         if(key.isValid()){
 82             SocketChannel sc = (SocketChannel)key.channel();
 83             //判断是否连接成功
 84             if(key.isConnectable()){
 85                 if(sc.finishConnect()){
 86                     sc.register(selector, SelectionKey.OP_READ);
 87                 }else{
 88                     System.exit(1);
 89                 }
 90             }
 91             
 92             if(key.isReadable()){
 93                 ByteBuffer readBuffer = ByteBuffer.allocate(1024);
 94                 int readBytes = sc.read(readBuffer);
 95                 if(readBytes > 0){
 96                     readBuffer.flip();
 97                         byte[] bytes = new byte[readBuffer.remaining()];
 98                         readBuffer.get(bytes);
 99                         String body = new String(bytes,"UTF-8");
100                         System.out.println("Now is :" + body);
101                         this.stop = true;
102                 }else if(readBytes < 0){
103                     //对端链路关闭
104                     key.cancel();
105                     sc.close();
106                 }else{
107                     ; //读到0字节,忽略
108                 }
109             }
110         }
111     }
112     
113     private void doConnect() throws IOException{
114         //如果直接连接成功,则注册到多路复用器上,发送请求信息,读应答
115         if(socketChannel.connect(new InetSocketAddress(host,port))){
116             socketChannel.register(selector, SelectionKey.OP_READ);
117             doWrite(socketChannel);
118         }else{
119             //说明服务器没有返回TCP祸首应答消息,但这并不代表连接失败,当服务器返回TCP syn-ack消息后,Selector就能够轮训这个SocketChannel处于连接就绪状态
120             socketChannel.register(selector, SelectionKey.OP_CONNECT);
121         }
122     }
123     
124     private void doWrite(SocketChannel sc) throws IOException{
125         byte[] req = "QUERY TIME ORDER".getBytes();
126         ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
127         writeBuffer.put(req);
128         writeBuffer.flip();
129         sc.write(writeBuffer);
130         if(!writeBuffer.hasRemaining()){
131             System.out.println("Send order 2 server succeed.");
132         }
133     }
134 
135 }

TimeServer:

代码语言:javascript
复制
 1 package nio;
 2 
 3 import java.io.IOException;
 4 
 5 public class TimeServer {
 6     
 7     public static void main(String[] args) throws IOException{
 8         int port = 8080;
 9         if(args != null && args.length >0){
10             try{
11                 port = Integer.valueOf(args[0]);
12             }catch(NumberFormatException e){
13                 //采用默认值
14             }
15         }
16         //多路复用类,是一个独立的线程,负责轮训多路复用器Selctor,处理多个客户端的并发接入。
17         MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
18         new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start();
19         }
20 }

MultiplexerTimeServer:

代码语言:javascript
复制
  1 package nio;
  2 
  3 import java.io.IOException;
  4 import java.net.InetSocketAddress;
  5 import java.nio.ByteBuffer;
  6 import java.nio.channels.SelectionKey;
  7 import java.nio.channels.Selector;
  8 import java.nio.channels.ServerSocketChannel;
  9 import java.nio.channels.SocketChannel;
 10 import java.util.Iterator;
 11 import java.util.Set;
 12 
 13 public class MultiplexerTimeServer implements Runnable {
 14     
 15     private Selector selector;
 16     
 17     private ServerSocketChannel servChannel;
 18     
 19     private volatile boolean stop;
 20 
 21     public MultiplexerTimeServer(int port){
 22         try{
 23             
 24             selector = Selector.open();
 25             servChannel.configureBlocking(false);
 26             //将ServerSocketChannel 设置为异步非阻塞,backlog设置为1024 
 27             servChannel.socket().bind(new InetSocketAddress(port),1024);
 28             //将ServerSocket Channel注册到Selector,监听SelectionKey.OP_ACCEPT操作位,如果初始化失败,则退出
 29             servChannel.register(selector,SelectionKey.OP_ACCEPT);
 30             System.out.println("The time server is start in port:" + port);
 31         }catch(IOException e){
 32             e.printStackTrace();
 33             System.exit(1);
 34         }
 35     }
 36     
 37     public void stop(){
 38         this.stop = true;
 39     }
 40     
 41     public void run() {
 42         while(!stop){
 43             try{
 44                 //遍历时间设置1秒,每隔一秒唤醒一次,当有处于就绪状态的Channel时,selector将返回就绪状态的Channel的SelectionKey集合
 45                 selector.select(1000);
 46                 Set<SelectionKey> selectedKeys = selector.selectedKeys();
 47                 Iterator<SelectionKey> it = selectedKeys.iterator();
 48                 SelectionKey key = null;
 49                 //通过对就绪状态的Channel集合进行迭代,可以进行网络的异步读写操作
 50                 while(it.hasNext()){
 51                     key = it.next();
 52                     it.remove();
 53                     try{
 54                         handleInput(key);
 55                     }catch(Exception e){
 56                         if(key != null){
 57                             key.cancel();
 58                             if(key.channel() != null){
 59                                 key.channel().close();
 60                             }
 61                         }
 62                     }
 63                 }
 64             }catch(Throwable t){
 65                 t.printStackTrace();
 66             }
 67         }
 68         
 69         //多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
 70         if(selector != null){
 71             try{
 72                 selector.close();
 73             }catch(IOException e){
 74                 e.printStackTrace();
 75             }
 76         }
 77     }
 78     
 79     //处理新接入的请求消息
 80     private void handleInput(SelectionKey key) throws IOException{
 81         if(key.isValid()){
 82             
 83             //根据SelectionKey的操作位进行判断即可获知网络事件的类型,通过accept接收客户端的连接请求并创建SocketChannel实例,完成上述操作相当于
 84             //完成了TCP的三次握手,TCP物理链路正式建立
 85             if(key.isAcceptable()){
 86                 ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
 87                 SocketChannel sc = ssc.accept();
 88                 sc.configureBlocking(false);
 89                 //Add the new connection tothe selector
 90                 sc.register(selector, SelectionKey.OP_READ);
 91             }
 92             
 93             if(key.isReadable()){
 94                 //Read the data
 95                 
 96                 SocketChannel sc = (SocketChannel)key.channel();
 97                 ByteBuffer readBuffer = ByteBuffer.allocate(1024);
 98                 int readBytes = sc.read(readBuffer);
 99                 if(readBytes > 0){
100                     //将缓冲区当前的limit设置为position,position设置为0,用于后续对缓冲区的读取操作
101                     readBuffer.flip();
102                     byte[] bytes = new byte[readBuffer.remaining()];
103                     readBuffer.get(bytes);
104                     String body = new String(bytes,"UTF-8");
105                     System.out.println("The time server receive order: + body");
106                     String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER";
107                     doWrite(sc,currentTime);
108                 }else if(readBytes < 0){
109                     //对端链路关闭
110                     key.cancel();
111                     sc.close();
112                 }else{
113                     ; //读到0字节,忽略
114                 }
115             }
116         }
117     }
118     
119     private void doWrite(SocketChannel channel,String response) throws IOException{
120         if(response != null && response.trim().length() >0){
121             byte[] bytes = response.getBytes();
122             ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
123             writeBuffer.put(bytes);
124             writeBuffer.flip();
125             channel.write(writeBuffer);
126         }
127     }
128 }
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2016-12-05 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档