前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >「高并发通信框架Netty4 源码解读(八)」NIO应用——聊天案例及Reactor线程模式

「高并发通信框架Netty4 源码解读(八)」NIO应用——聊天案例及Reactor线程模式

作者头像
源码之路
发布2020-09-04 10:22:38
1.8K0
发布2020-09-04 10:22:38
举报
文章被收录于专栏:源码之路源码之路

前面对NIO原理进行了大篇幅的分析,最后我们举几个案例,教大家如何更好的使用NIO。

基于NIO编写的聊天DEMO

服务端

代码语言:javascript
复制
package NIO.luban.chat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;

//聊天室服务端
public class ChatServer {

    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    private long timeout=2000;

    public ChatServer(){
        try {
            //服务端channel
            serverSocketChannel=ServerSocketChannel.open();
            //选择器对象,底层就是IO多路复用
            selector=Selector.open();
            //绑定端口
            serverSocketChannel.bind(new InetSocketAddress(9090));
            //设置非阻塞式
            serverSocketChannel.configureBlocking(false);
            //注册"监听连接"给Selector
            SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("服务端准备就绪");
            start();
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    public void start() throws Exception{
        int count=0;
        long start=System.nanoTime();

        while (true){
            //等待感兴趣的事件,没有事件就会阻塞2秒钟,2秒钟没有感兴趣事件发生,程序继续往下执行
            selector.select(timeout);
//            System.out.println("2秒了");
            long end=System.nanoTime();
            if(end-start>= TimeUnit.MILLISECONDS.toNanos(timeout)){
                count=1;
            }else{
                count++;//记录空轮询的次数
            }
            //空轮询次数太多的话,重新建立连接
            if(count>=10){
                System.out.println("有可能发生空轮询"+count+"次");
                rebuildSelector();
                count=0;
                selector.selectNow();
                continue;
            }
            //得到所有就绪的SelectionKey对象
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            //遍历就绪事件,并判断就绪的事件类型
            while (iterator.hasNext()){
                SelectionKey selectionKey=iterator.next();
                //连接事件
                if(selectionKey.isAcceptable()){
                    //获取网络通道,有客户端来链接啦
                    SocketChannel accept = serverSocketChannel.accept();
                    //设置非阻塞式
                    accept.configureBlocking(false);
                    //连接上了  注册读取事件
                    accept.register(selector,SelectionKey.OP_READ);
                    System.out.println(accept.getRemoteAddress().toString()+"上线了");
                }
                //读事件
                if(selectionKey.isReadable()){     //读取客户端数据事件
                    //读取客户端发来的数据
                    readClientData(selectionKey);
                }
                //手动从当前集合将本次运行完的对象删除,事件处理完了就要删除
                iterator.remove();
            }
        }
    }

    //重新建立链接
    private void rebuildSelector() throws IOException {
        Selector newSelector=Selector.open();
        Selector oldSelect=selector;
        for (SelectionKey selectionKey : oldSelect.keys()) {
            //感兴趣事件对应的数值
            int i = selectionKey.interestOps();
            //取消旧的键
            selectionKey.cancel();
            //将channel注册到新的选择器上
            selectionKey.channel().register(newSelector,i);
        }
        selector=newSelector;
        oldSelect.close();//关闭旧的
    }

    //读取客户端发来的数据
    private void readClientData(SelectionKey selectionKey) throws IOException {
        //获取跟客户端连接的通道
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        //生成缓冲区,用于接收客户端传输进来的数据
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        //读取数据到缓冲区,返回实际读取到的字节数,没有数据返回-1
        int read = socketChannel.read(byteBuffer);
        //读之前,将缓冲区设置为读状态
        byteBuffer.flip();
        if(read>0){//判断确实读到数据了
            //创建临时发送字节数组
            byte[] bytes=new byte[read];
            //将缓冲区数据写到临时数组
            byteBuffer.get(bytes,0,read);
            //读取了数据  广播
            String s = new String(bytes,"utf-8");
            //将此数据发送到其他客户端
            writeClientData(socketChannel,s);
        }
    }

    //广播  将读取的数据群发
    private void writeClientData(SocketChannel socketChannel,String s) throws IOException {
        //获取到所有的注册事件,不管有没有就绪
        Set<SelectionKey> keys = selector.keys();
        //遍历事件
        for (SelectionKey key : keys) {
            //判断事件是否还有效
            if(key.isValid()){
                //获取事件对应的channel
                SelectableChannel channel = key.channel();
                //注意,我们只需要将信息发送给客户端
                if(channel instanceof  SocketChannel){
                    SocketChannel socketChannel1= (SocketChannel) channel;
                    //不需要发送给自己了
                    if(channel!=socketChannel){
                        ByteBuffer wrap = ByteBuffer.wrap(s.getBytes());
                        socketChannel1.write(wrap);
                    }
                }
            }
        }
    }


    public static void main(String[] args) throws Exception {
        new ChatServer().start();
    }
}

客户端

代码语言:javascript
复制
package NIO.luban.chat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;

public class ChatClient implements  Runnable{

    private SocketChannel socketChannel;

    private Selector selector;

    public ChatClient(){
        try {
            //得到一个网络通道
            socketChannel=SocketChannel.open();
            //打开一个选择器
            selector=Selector.open();
            //设置非阻塞式
            socketChannel.configureBlocking(false);
        }catch (Exception e){
            e.printStackTrace();
        }
    }



    public void doCon(){
        //提供服务器ip与端口
        InetSocketAddress inetSocketAddress=new InetSocketAddress("127.0.0.1",9090);
        //连接服务器端
        try {
            //连接服务器,如果成功了
            if(socketChannel.connect(inetSocketAddress)){
                //注册读事件
                socketChannel.register(selector,SelectionKey.OP_READ);
                //写数据
                writeData(socketChannel);
            }else{
                //注册连接事件
                socketChannel.register(selector, SelectionKey.OP_CONNECT);//如果连接不上
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void writeData(SocketChannel socketChannel) throws IOException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (true){
                        //等待你的输入
                        Scanner scanner=new Scanner(System.in);
                        String str = scanner.nextLine();
                        if(str.equals("by")){
                            socketChannel.close();
                            return;
                        }
                        //将你的输入包装成缓冲区
                        ByteBuffer byteBuffer=ByteBuffer.wrap((socketChannel.getLocalAddress().toString()+"说:"+str).getBytes());
                        //发送你的数据
                        socketChannel.write(byteBuffer);
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }).start();
    }

    //读数据
    public void readData() throws IOException {
        ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
        int read = socketChannel.read(byteBuffer);
        if(read>0){
            byte[] array = byteBuffer.array();
            System.out.println(new String(array,"utf-8"));
        }
    }


    public static void main(String[] args) throws IOException {
        new Thread(new ChatClient()).start();
    }

    @Override
    public void run() {
        doCon();
        try {
            while (true){
                selector.select(1000);
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()){
                    SelectionKey selectionKey = iterator.next();
                    if(selectionKey.isValid()){
                        if(selectionKey.isConnectable()){
                            SocketChannel channel = (SocketChannel) selectionKey.channel();
                            if (channel.finishConnect()){
                                channel.register(selector,SelectionKey.OP_READ);
                                System.out.println("bbbbbbbbbbbbb");
                                //写数据
                                writeData(channel);
                            }else{
                                System.exit(1);
                            }
                        }
                        if(selectionKey.isReadable()){
                            readData();
                        }
                    }
                    iterator.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

NIO的Reactor单线程模型

上面的聊天案例无论是服务端还是客户端,都是单线程的,所有的链接及读写都是在一个main方法所在的主线程内运行。

拿服务器代码来说,一个main线程,要做以下工作

  1. 接收客户端连接
  2. 读取已连接上的客户端发来的数据
  3. 读到数据后要解码,处理业务逻辑
  4. 编码,响应客户端,向客户端写回数据

一个线程,在同一时刻只能做上面的一件事情,如果线程在读取数据的时候阻塞了,那其他三件事都不能做,新的客户端也无法链接成功。我们可以让服务器端只处理链接,读和写交给另一个线程处理。如下图所示:

服务端,主线程处理链接,读写交给其他线程

代码语言:javascript
复制
    // Reactor線程  
    package NIO.luban.oneReactor;

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.util.Iterator;
    import java.util.Set;

    public class TCPReactor implements Runnable {  
        private final ServerSocketChannel ssc;  
        private final Selector selector;  
        public TCPReactor(int port) throws IOException {
            //打开选择器进行IO多路复用
            selector = Selector.open();
            //打开服务器通道
            ssc = ServerSocketChannel.open();  
            InetSocketAddress addr = new InetSocketAddress(port);
            //绑定端口
            ssc.socket().bind(addr); // 在ServerSocketChannel綁定監聽端口
            //設置ServerSocketChannel為非阻塞
            ssc.configureBlocking(false);
            //注册链接事件
            SelectionKey sk = ssc.register(selector, SelectionKey.OP_ACCEPT);
            //将时间绑定一个处理器,事件发生后由这个处理器完成后续操作
            sk.attach(new Acceptor(selector, ssc));
        }  
      
        @Override  
        public void run() {  
            while (!Thread.interrupted()) { // 在線程被中斷前持續運行  
                System.out.println("Waiting for new event on port: " + ssc.socket().getLocalPort() + "...");  
                try {
                    // 若沒有事件就緒則不往下執行
                    if (selector.select() == 0)
                        continue;  
                } catch (IOException e) {  
                    e.printStackTrace();
                }
                // 取得所有已就緒事件的key集合
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                //遍历事件
                Iterator<SelectionKey> it = selectedKeys.iterator();  
                while (it.hasNext()) {
                    //调度事件,在这里我们开启另一个线程进行读写操作
                    dispatch((it.next()));
                    it.remove();  
                }  
            }  
        }  
      
        /* 
         * name: dispatch(SelectionKey key) 
         * description: 調度方法,根據事件綁定的對象開新線程 
         */  
        private void dispatch(SelectionKey key) {  
            Runnable r = (Runnable) (key.attachment()); // 根據事件之key綁定的對象開新線程  
            if (r != null)  {
                r.run();
            }
        }
      
    }  

链接处理器

代码语言:javascript
复制
    public class Acceptor implements Runnable {  
      
        private final ServerSocketChannel ssc;  
        private final Selector selector;  
          
        public Acceptor(Selector selector, ServerSocketChannel ssc) {  
            this.ssc=ssc;  
            this.selector=selector;  
        }  
          
        @Override  
        public void run() {  
            try {
                // 接受client链接请求
                SocketChannel sc= ssc.accept();
                System.out.println(sc.socket().getRemoteSocketAddress().toString() + " is connected.");  
                if(sc!=null) {
                    // 設置為非阻塞  
                    sc.configureBlocking(false);
                    // SocketChannel向selector註冊一個读事件,然後返回該通道的key  
                    SelectionKey sk = sc.register(selector, SelectionKey.OP_READ);
                    // 使一個阻塞住的selector操作立即返回
                    selector.wakeup();
                    // 給定key一個附加的TCPHandler對象,用来处理后续读写操作
                    sk.attach(new TCPHandler(sk, sc));
                }
            } catch (IOException e) {  
                // TODO Auto-generated catch block  
                e.printStackTrace();  
            }  
        }  
          
    }  

读写处理器

代码语言:javascript
复制
    // Handler線程  
    package NIO.luban.oneReactor;

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.SocketChannel;

    public class TCPHandler implements Runnable {  
      
        private final SelectionKey sk;  
        private final SocketChannel sc;  
      
        int state;   
      
        public TCPHandler(SelectionKey sk, SocketChannel sc) {  
            this.sk = sk;  
            this.sc = sc;  
            state = 0; // 初始狀態設定為READING ,第一次肯定是先读客户端数据
        }  
      
        @Override  
        public void run() {  
            try {  
                if (state == 0)  
                    read(); // 读取数据
                else  
                    send(); // 发送
      
            } catch (IOException e) {  
                System.out.println("[Warning!] A client has been closed.");  
                closeChannel();  
            }  
        }  
          
   
      
        private synchronized void read() throws IOException {  
            byte[] arr = new byte[1024];  
            ByteBuffer buf = ByteBuffer.wrap(arr);  
              
            int numBytes = sc.read(buf); // 讀取字符串  
            if(numBytes == -1)  
            {  
                System.out.println("[Warning!] A client has been closed.");  
                closeChannel();  
                return;  
            }  
            String str = new String(arr); // 將读取到的byte內容转换字符串  
            if ((str != null) && !str.equals(" ")) { 
                //处理数据
                process(str); // 
                System.out.println(sc.socket().getRemoteSocketAddress().toString()  
                        + " > " + str);  
                //在这个通道读完了后,下一步往这个通道写数据
                //改成写状态
                state = 1; 
                sk.interestOps(SelectionKey.OP_WRITE); // 通過key改變通道註冊的事件  
                sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回  
            }  
        }  
      
        private void send() throws IOException  {  
            // get message from message queue  
              
            String str = "Your message has sent to "  
                    + sc.socket().getLocalSocketAddress().toString() + "\r\n";  
            ByteBuffer buf = ByteBuffer.wrap(str.getBytes()); // wrap自動把buf的position設為0,所以不需要再flip()  
      
            while (buf.hasRemaining()) {  
                sc.write(buf); // 回傳給client回應字符串,發送buf的position位置 到limit位置為止之間的內容  
            }  
              
            state = 0; // 改變狀態  
            sk.interestOps(SelectionKey.OP_READ); // 通過key改變通道註冊的事件  
            sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回  
        }  
          
        void process(String str) {  
            // do process(decode, logically process, encode)..
            // ..
            try {
                //等待6秒,模拟数据处理
                Thread.sleep(60000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        //关闭通道
        private void closeChannel() {
            try {
                sk.cancel();
                sc.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }
    }  

启动服务器

代码语言:javascript
复制
public class Main {
   public static void main(String[] args) {
       System.out.println(Main.class.getName());
       // TODO Auto-generated method stub
       try {
           TCPReactor reactor = new TCPReactor(1333);
           reactor.run();
       } catch (IOException e) {
           // TODO Auto-generated catch block
           e.printStackTrace();
       }
   }
}

客户端

代码语言:javascript
复制
public class Client {

    /**
     * @param args
     */
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        String hostname="127.0.0.1";
        int port = 9999;
        //String hostname="127.0.0.1";
        //int port=1333;
        try {
            Socket client = new Socket(hostname, port); // 連接至目的地
            System.out.println("連接至目的地:"+ hostname);
            PrintWriter out = new PrintWriter(client.getOutputStream());
            BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
            BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in));
            String input;

            while((input=stdIn.readLine()) != null) { // 讀取輸入
                out.println(input); // 發送輸入的字符串
                out.flush(); // 強制將緩衝區內的數據輸出
                if(input.equals("exit"))
                {
                    break;
                }
                System.out.println("server: "+in.readLine());
            }
            client.close();
            System.out.println("client stop.");
        } catch (UnknownHostException e) {
            // TODO Auto-generated catch block
            System.err.println("Don't know about host: " + hostname);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            System.err.println("Couldn't get I/O for the socket connection");
        }

    }

}

NIO的Reactor多线程模型

上面的单线程模型有个缺点,就是每一个连接都要开启一个线程,如果有10000个请求,服务器需要开启10000个线程,显然是不合理的,我们可以使用线程池技术来实现多线程模型。 首先,编写服务端ServerSocketChannel对应的Selector

代码语言:javascript
复制
    public TCPReactor(int port) throws IOException {
        //打开一个selector IO多路复用器
        selector = Selector.open();
        //打开服务端通道
        ssc = ServerSocketChannel.open();  
        InetSocketAddress addr = new InetSocketAddress(port);
        //绑定端口
        ssc.socket().bind(addr);
        ssc.configureBlocking(false); // 設置ServerSocketChannel為非阻塞
        //注册连接请求事件
        SelectionKey sk = ssc.register(selector, SelectionKey.OP_ACCEPT); // ServerSocketChannel向selector註冊一個OP_ACCEPT事件,然後返回該通道的key
        //绑定连接处理器,连接进来后用Acceptor做后续处理
        sk.attach(new Acceptor(selector, ssc));
    }  
    
    @Override  
    public void run() {  
        while (!Thread.interrupted()) { // 在線程被中斷前持續運行  
            System.out.println("Waiting for new event on port: " + ssc.socket().getLocalPort() + "...");  
            try {
                //轮询查看是否有事件就绪, 若沒有事件就緒則不往下執行
                if (selector.select() == 0)
                    continue;  
            } catch (IOException e) {  
                // TODO Auto-generated catch block  
                e.printStackTrace();  
            }
            //程序执行到这里说明有连接事件发生了,也就是说有客户端请求连接了
            //获取所有的连接事件,遍历处理
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> it = selectedKeys.iterator();  
            while (it.hasNext()) {
                //连接请求转发
                dispatch((SelectionKey) (it.next())); // 根據事件的key進行調度
                //删除事件,表示已经处理完了,下次循环不再处理已经处理过的连接
                it.remove();
            }  
        }  
    }  

    //获取事件的处Acceptor理器,开启一个线程进行处理
    private void dispatch(SelectionKey key) {
        Runnable r = (Runnable) (key.attachment());  
        if (r != null)  
            r.run();  
    }}  

下一步,我们看Acceptor处理器是如何处理的:

代码语言:javascript
复制
    public class Acceptor implements Runnable {  
      
        private final ServerSocketChannel ssc;  
        private final Selector selector;  
          
        public Acceptor(Selector selector, ServerSocketChannel ssc) {  
            this.ssc=ssc;  
            this.selector=selector;  
        }  
          
        @Override  
        public void run() {  
            try {
                // 接受client连接请求
                SocketChannel sc= ssc.accept();
                System.out.println(sc.socket().getRemoteSocketAddress().toString() + " is connected.");  
                  
                if(sc!=null) {  
                    sc.configureBlocking(false); // 設置為非阻塞
                    //注册读事件
                    SelectionKey sk = sc.register(selector, SelectionKey.OP_READ); // SocketChannel向selector註冊一個OP_READ事件,然後返回該通道的key  
//                    System.out.println(sk.selector()==selector);
                    selector.wakeup(); // 使一個阻塞住的selector操作立即返回
                    // 将读事件交给TCPHandler进行处理
                    sk.attach(new TCPHandler(sk, sc));
                }  
                  
            } catch (IOException e) {  
                // TODO Auto-generated catch block  
                e.printStackTrace();  
            }  
        }        
    } 

Acceptor的处理很简单,就是接收请求,然后注册读事件,同事读事件的后续处理交给处理器TCPHandler处理,我们看一下TCPHandler如何处理的:

代码语言:javascript
复制
    public class TCPHandler implements Runnable {  
      
        private final SelectionKey sk;  
        private final SocketChannel sc;  
        private static final int THREAD_COUNTING = 10;
        //读写事件交给线程池处理
        private static ThreadPoolExecutor pool = new ThreadPoolExecutor(  
                THREAD_COUNTING, THREAD_COUNTING, 10, TimeUnit.SECONDS,  
                new LinkedBlockingQueue<Runnable>()); // 線程池  

        //读写状态处理器
        HandlerState state;
      
        public TCPHandler(SelectionKey sk, SocketChannel sc) {  
            this.sk = sk;  
            this.sc = sc;
            // 初始状态设置为读状态
            state = new ReadState();
            pool.setMaximumPoolSize(32); // 設置線程池最大線程數  
        }  
      
        @Override  
        public void run() {  
            try {
                //利用线程池处理读写
                state.handle(this, sk, sc, pool);  
            } catch (IOException e) {
                System.out.println("[Warning!] A client has been closed.");  
                closeChannel();  
            }  
        }  
          
        public void closeChannel() {  
            try {  
                sk.cancel();  
                sc.close();  
            } catch (IOException e1) {  
                e1.printStackTrace();  
            }  
        }  

        //读写状态的更改,读事件处理完改为写状态,写状态处理完改为读状态
        public void setState(HandlerState state) {  
            this.state = state;  
        }  
    }  

TCPHandler 处理器维护一个线程池,用于处理真正的读写事件,客户端连接服务器后初始时处理读事件,读事件处理完后处理写事件,写事件处理完后继续处理读事件,来回反复处理。我们看一下读事件是如何处理的

代码语言:javascript
复制
public class ReadState implements HandlerState{

    private SelectionKey sk;

    public ReadState() {
    }

    @Override
    public void changeState(TCPHandler h) {
        // TODO Auto-generated method stub
        h.setState(new WorkState());
    }

    @Override
    public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
                       ThreadPoolExecutor pool) throws IOException { // read()
        this.sk = sk;
        // non-blocking下不可用Readers,因為Readers不支援non-blocking
        byte[] arr = new byte[1024];
        ByteBuffer buf = ByteBuffer.wrap(arr);

        int numBytes = sc.read(buf); // 讀取字符串
        if(numBytes == -1)
        {
            System.out.println("[Warning!] A client has been closed.");
            h.closeChannel();
            return;
        }
        String str = new String(arr); // 將讀取到的byte內容轉為字符串型態
        if ((str != null) && !str.equals(" ")) {
            h.setState(new WorkState()); // 改變狀態(READING->WORKING)
            pool.execute(new WorkerThread(h, str)); // do process in worker thread
            System.out.println(sc.socket().getRemoteSocketAddress().toString()
                    + " > " + str);
        }

    }

    /*
     * 執行邏輯處理之函數
     */
    synchronized void process(TCPHandler h, String str) {
//             do process(decode, logically process, encode)..
        // ..
        h.setState(new WriteState()); // 改變狀態(WORKING->SENDING)
        this.sk.interestOps(SelectionKey.OP_WRITE); // 通過key改變通道註冊的事件
        this.sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回
    }

    /*
     * 工作者線程
     */
    class WorkerThread implements Runnable {

        TCPHandler h;
        String str;

        public WorkerThread(TCPHandler h, String str) {
            this.h = h;
            this.str=str;
        }

        @Override
        public void run() {
            process(h, str);
        }

    }
}

读完后,将写事件注册。写一次轮询到读事件后,交由WriteState处理器处理

代码语言:javascript
复制
public class WriteState implements HandlerState{

    public WriteState() {
    }

    @Override
    public void changeState(TCPHandler h) {
        // TODO Auto-generated method stub
        h.setState(new ReadState());
    }

    @Override
    public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
                       ThreadPoolExecutor pool) throws IOException { // send()
        // get message from message queue

        String str = "Your message has sent to "
                + sc.socket().getLocalSocketAddress().toString() + "\r\n";
        ByteBuffer buf = ByteBuffer.wrap(str.getBytes()); // wrap自動把buf的position設為0,所以不需要再flip()

        while (buf.hasRemaining()) {
            sc.write(buf); // 回傳給client回應字符串,發送buf的position位置 到limit位置為止之間的內容
        }

        h.setState(new ReadState()); // 改變狀態(SENDING->READING)
        sk.interestOps(SelectionKey.OP_READ); // 通過key改變通道註冊的事件
        sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回
    }
}

WriteState 写处理器和ReadState读处理器都继承了HandlerState接口,

代码语言:javascript
复制
public interface HandlerState {

     void changeState(TCPHandler h);

     void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
                 ThreadPoolExecutor pool) throws IOException ;
}

上面的工作状态转换有WorkState完成

代码语言:javascript
复制
public class WorkState implements HandlerState {

    public WorkState() {
    }

    @Override
    public void changeState(TCPHandler h) {
        // TODO Auto-generated method stub
        h.setState(new WriteState());
    }

    @Override
    public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
                       ThreadPoolExecutor pool) throws IOException {
        // TODO Auto-generated method stub

    }
}

编写测试类

代码语言:javascript
复制
public class Main {
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        try {
            TCPReactor reactor = new TCPReactor(1333);
//                new Thread(reactor).start();
            reactor.run();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

客户端用单线程模式的就可以啦。

NIO主从Reactor模型

上面的代码,有没有发现所有的事件都注册在同一个selector上,selector表示好累!讲道理,ServerSocketChannel只是用来处理链接就可以了,它不需要处理读事件和写事件。读事件和写事件完全可以交给另一个选择器。这就是NIO的主从Reactor模型。

主线程只负责接收客户端连接,然后交其他从线程,使当有客户端连接时,可以很快的受到处理。同时,从线程专门负责读取注册到自己selector上面的客户端数据。并发读写能力得到了大大的提高。当然,如果,每一个SocketChannel的读写事件都注册到单独的selector上显然是浪费资源的,我们可以用一个selecort管理N个SocketChannel,也就是说对selector进行了分组。比如,用户管理模块注册一个selector,权限模块注册一个selector,日志模块注册一个selector,这样模块间的读写互不影响。selector数量取决你电脑CPU的核数,一般来说selecor数量为cpu核数2。也就是说,我们的主selector有1个,从selector有cpu2个。

OK!下面我们看这种主从Reactor模式的代码如何编写。 首先编写服务端

代码语言:javascript
复制
    public class TCPReactor implements Runnable {  
      
        private final ServerSocketChannel ssc;  
        private final Selector selector; // mainReactor用的selector  
      
        public TCPReactor(int port) throws IOException {  
            selector = Selector.open();  
            ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false); // 設置ServerSocketChannel為非阻塞

            Acceptor acceptor = new Acceptor(ssc);

            SelectionKey sk = ssc.register(selector,SelectionKey.OP_ACCEPT); // ServerSocketChannel向selector註冊一個OP_ACCEPT事件,然後返回該通道的key
            sk.attach(acceptor); // 給定key一個附加的Acceptor對象

            InetSocketAddress addr = new InetSocketAddress(port);
            ssc.socket().bind(addr); // 在ServerSocketChannel綁定監聽端口
        }  
      
        @Override  
        public void run() {  
            while (!Thread.interrupted()) { // 在線程被中斷前持續運行  
                System.out.println("mainReactor waiting for new event on port: "  
                        + ssc.socket().getLocalPort() + "...");  
                try {  
                    if (selector.select() == 0) // 若沒有事件就緒則不往下執行  
                        continue;  
                } catch (IOException e) {  
                    e.printStackTrace();  
                }  
                Set<SelectionKey> selectedKeys = selector.selectedKeys(); // 取得所有已就緒事件的key集合  
                Iterator<SelectionKey> it = selectedKeys.iterator();  
                while (it.hasNext()) {  
                    dispatch((SelectionKey) (it.next())); // 根據事件的key進行調度  
                    it.remove();  
                }  
            }  
        }  
      
        /* 
         * name: dispatch(SelectionKey key) 
         * description: 調度方法,根據事件綁定的對象開新線程 
         */  
        private void dispatch(SelectionKey key) {  
            Runnable r = (Runnable) (key.attachment()); // 根據事件之key綁定的對象開新線程  
            if (r != null)  
                r.run();  
        }  
      
    }  

代码跟多线层模式基本一样,不解释了。 再来看Acceptor处理器

代码语言:javascript
复制
public class Acceptor implements Runnable {  
      
        private final ServerSocketChannel ssc; // mainReactor監聽的socket通道  
        private final int cores = Runtime.getRuntime().availableProcessors(); // 取得CPU核心數
        private final Selector[] selectors = new Selector[cores]; // 創建核心數個selector給subReactor用  
        private int selIdx = 0; // 當前可使用的subReactor索引  
        private TCPSubReactor[] r = new TCPSubReactor[cores]; // subReactor線程
        private Thread[] t = new Thread[cores]; // subReactor線程  
      
        public Acceptor(ServerSocketChannel ssc) throws IOException {  
            this.ssc = ssc;  
            // 創建多個selector以及多個subReactor線程  
            for (int i = 0; i < cores; i++) {  
                selectors[i] = Selector.open();  
                r[i] = new TCPSubReactor(selectors[i], ssc, i);
                t[i] = new Thread(r[i]);  
                t[i].start();
            }
        }
      
        @Override  
        public synchronized void run() {  
            try {  
                SocketChannel sc = ssc.accept(); // 接受client連線請求  
                System.out.println(sc.socket().getRemoteSocketAddress().toString()  
                        + " is connected.");  
      
                if (sc != null) {  
                    sc.configureBlocking(false); // 設置為非阻塞  
                    r[selIdx].setRestart(true); // 暫停線程  
                    selectors[selIdx].wakeup(); // 使一個阻塞住的selector操作立即返回  
                    SelectionKey sk = sc.register(selectors[selIdx],  
                            SelectionKey.OP_READ); // SocketChannel向selector[selIdx]註冊一個OP_READ事件,然後返回該通道的key  
                    selectors[selIdx].wakeup(); // 使一個阻塞住的selector操作立即返回  
                    r[selIdx].setRestart(false); // 重啟線程  
                    sk.attach(new TCPHandler(sk, sc)); // 給定key一個附加的TCPHandler對象
                    if (++selIdx == selectors.length)  
                        selIdx = 0;  
                }  
            } catch (IOException e) {  
                e.printStackTrace();  
            }  
        }  
      
    }  

此时,我们将读写事件注册到其他selector中,读写事件轮询注册到不同的子selector上,实现高并发处理。

代码语言:javascript
复制
private final int cores = Runtime.getRuntime().availableProcessors(); // 取得CPU核心數
private final Selector[] selectors = new Selector[cores]; // 創建核心數個selector給subReactor用  

子selector

代码语言:javascript
复制
    public class TCPReactor implements Runnable {  
      
        private final ServerSocketChannel ssc;  
        private final Selector selector; // mainReactor用的selector  
      
        public TCPReactor(int port) throws IOException {  
            selector = Selector.open();  
            ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false); // 設置ServerSocketChannel為非阻塞

            Acceptor acceptor = new Acceptor(ssc);

            SelectionKey sk = ssc.register(selector,SelectionKey.OP_ACCEPT); // ServerSocketChannel向selector註冊一個OP_ACCEPT事件,然後返回該通道的key
            sk.attach(acceptor); // 給定key一個附加的Acceptor對象

            InetSocketAddress addr = new InetSocketAddress(port);
            ssc.socket().bind(addr); // 在ServerSocketChannel綁定監聽端口
        }  
      
        @Override  
        public void run() {  
            while (!Thread.interrupted()) { // 在線程被中斷前持續運行  
                System.out.println("mainReactor waiting for new event on port: "  
                        + ssc.socket().getLocalPort() + "...");  
                try {  
                    if (selector.select() == 0) // 若沒有事件就緒則不往下執行  
                        continue;  
                } catch (IOException e) {  
                    e.printStackTrace();  
                }  
                Set<SelectionKey> selectedKeys = selector.selectedKeys(); // 取得所有已就緒事件的key集合  
                Iterator<SelectionKey> it = selectedKeys.iterator();  
                while (it.hasNext()) {  
                    dispatch((SelectionKey) (it.next())); // 根據事件的key進行調度  
                    it.remove();  
                }  
            }  
        }  
      
        /* 
         * name: dispatch(SelectionKey key) 
         * description: 調度方法,根據事件綁定的對象開新線程 
         */  
        private void dispatch(SelectionKey key) {  
            Runnable r = (Runnable) (key.attachment()); // 根據事件之key綁定的對象開新線程  
            if (r != null)  
                r.run();  
        }  
      
    }  

读写处理器

代码语言:javascript
复制
 public class TCPHandler implements Runnable {  
      
        private final SelectionKey sk;  
        private final SocketChannel sc;  
        private static final int THREAD_COUNTING = 10;  
        private static ThreadPoolExecutor pool = new ThreadPoolExecutor(  
                THREAD_COUNTING, THREAD_COUNTING, 10, TimeUnit.SECONDS,  
                new LinkedBlockingQueue<Runnable>()); // 線程池  
      
        HandlerState state; // 以狀態模式實現Handler  
      
        public TCPHandler(SelectionKey sk, SocketChannel sc) {  
            this.sk = sk;  
            this.sc = sc;  
            state = new ReadState(); // 初始狀態設定為READING
            pool.setMaximumPoolSize(32); // 設置線程池最大線程數  
        }  
      
        @Override  
        public void run() {  
            try {
                state.handle(this, sk, sc, pool);
            } catch (IOException e) {
                System.out.println("[Warning!] A client has been closed.");  
                closeChannel();  
            }  
        }  
      
        public void closeChannel() {  
            try {  
                sk.cancel();  
                sc.close();  
            } catch (IOException e1) {  
                e1.printStackTrace();  
            }  
        }  
      
        public void setState(HandlerState state) {  
            this.state = state;  
        }  
    }  

真正的读

代码语言:javascript
复制
public class ReadState implements HandlerState {

    private SelectionKey sk;

    public ReadState() {
    }

    @Override
    public void changeState(TCPHandler h) {
        // TODO Auto-generated method stub
        h.setState(new WorkState());
    }

    @Override
    public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
            ThreadPoolExecutor pool) throws IOException { // read()
        this.sk = sk;
        // non-blocking下不可用Readers,因為Readers不支援non-blocking
        byte[] arr = new byte[1024];
        ByteBuffer buf = ByteBuffer.wrap(arr);

        int numBytes = sc.read(buf); // 讀取字符串
        if(numBytes == -1)
        {
            System.out.println("[Warning!] A client has been closed.");
            h.closeChannel();
            return;
        }
        String str = new String(arr); // 將讀取到的byte內容轉為字符串型態
        if ((str != null) && !str.equals(" ")) {
            h.setState(new WorkState()); // 改變狀態(READING->WORKING)
            pool.execute(new WorkerThread(h, str)); // do process in worker thread
            System.out.println(sc.socket().getRemoteSocketAddress().toString()
                    + " > " + str);
        }

    }

    /*
     * 執行邏輯處理之函數
     */
    synchronized void process(TCPHandler h, String str) {
        // do process(decode, logically process, encode)..
        // ..
        h.setState(new WriteState()); // 改變狀態(WORKING->SENDING)
        this.sk.interestOps(SelectionKey.OP_WRITE); // 通過key改變通道註冊的事件
        this.sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回
    }

    /*
     * 工作者線程
     */
    class WorkerThread implements Runnable {

        TCPHandler h;
        String str;

        public WorkerThread(TCPHandler h, String str) {
            this.h = h;
            this.str=str;
        }

        @Override
        public void run() {
            process(h, str);
        }

    }
}

真正的写

代码语言:javascript
复制
public class WriteState implements HandlerState {

    public WriteState() {
    }

    @Override
    public void changeState(TCPHandler h) {
        // TODO Auto-generated method stub
        h.setState(new ReadState());
    }

    @Override
    public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
            ThreadPoolExecutor pool) throws IOException { // send()
        // get message from message queue

        String str = "Your message has sent to "
                + sc.socket().getLocalSocketAddress().toString() + "\r\n";
        ByteBuffer buf = ByteBuffer.wrap(str.getBytes()); // wrap自動把buf的position設為0,所以不需要再flip()

        while (buf.hasRemaining()) {
            sc.write(buf); // 回傳給client回應字符串,發送buf的position位置 到limit位置為止之間的內容
        }

        h.setState(new ReadState()); // 改變狀態(SENDING->READING)
        sk.interestOps(SelectionKey.OP_READ); // 通過key改變通道註冊的事件
        sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回
    }
}

编写客户端

代码语言:javascript
复制
public class Main {
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        try {
            TCPReactor reactor = new TCPReactor(1333);
//                reactor.run();
            Thread thread = new Thread(reactor);
            thread.start();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

如果第一次接触NIO,上面的代码读起来比较费劲,NIO编程确实麻烦,而且很容易出错,现实开发中不会用原生NIO库,小编都是用netty这个NIO框架进行编程,简单 高效 稳定,所以看不懂上面的代码没关系,只要理解上面的三幅图即可,这三幅图是netty最最核心的。下篇开始讲netty应用及源码。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 基于NIO编写的聊天DEMO
    • 服务端
      • 客户端
      • NIO的Reactor单线程模型
        • 服务端,主线程处理链接,读写交给其他线程
          • 链接处理器
            • 读写处理器
              • 启动服务器
                • 客户端
                • NIO的Reactor多线程模型
                  • NIO主从Reactor模型
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档