前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >NIO学习四-Selector

NIO学习四-Selector

作者头像
路行的亚洲
发布2020-07-16 15:58:24
3660
发布2020-07-16 15:58:24
举报
文章被收录于专栏:后端技术学习后端技术学习

前面我们已经简单的学习了channel,知道channel作为通道,可以在通道中进行读写操作,同时知道ByteChannel是双向的。对于NIO的优势在于多路复用选择器上,在Nginx、Redis、Netty中都有多路复用的体现。因此学习Selector是有必要的。

1.使用多路复用选择器的方式

/**
 * selector 选择器 多路复用,选择器结合selectable-channel实现非阻塞效果,提高效率
 * 可以将通道注册进选择器中,其主要注意是使用一个线程来对多个通道中的已就绪进行选择,然后就可以对选择
 * 的通道进行数据处理,属于一对多的关系
 */
public class SelectorTest {
    public static void main(String[] args) throws IOException {
        //创建serverSocketChannel对象
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //设置websocket通道为非阻塞方式
        serverSocketChannel.configureBlocking(false);
        //获取websocket
        ServerSocket serverSocket = serverSocketChannel.socket();
        //进行绑定操作
        serverSocket.bind(new InetSocketAddress("localhost", 8888));

        //核心代码开始
        Selector selector = Selector.open();
        SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        //核心代码结束
        System.out.println("selector=" + selector);
        System.out.println("key=" + key);
        serverSocket.close();
        serverSocketChannel.close();
    }


}

通常的步骤是:打开ServerSocket通道,然后将通道配置成非阻塞模式,同时拿到socket进行绑定操作。然后打开选择器,将通道注册到选择器中,进行业务处理操作,然后关闭socket,如果需要长连接,此时就不关闭了。

2.判断当前是否向任何选择器进行了注册

/**
 * 判断注册的状态:判断当前是否向任何选择器进行了注册。可以看到新创建的通道总是未注册的
 */
public class SelectorTest1 {
    public static void main(String[] args) throws IOException {
        //打开serverSocket通道,同时设置为非阻塞,拿到serverSocket,进行ip和端口绑定
        //将选择器打开,将选择器key进行注册,关闭socket和socket通道
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false); //需要部分,通常需要将其设置为非阻塞
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress("localhost", 8888));

        System.out.println("A isRegistered=" + serverSocketChannel.isRegistered());

        Selector selector = Selector.open();
        SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("B isRegistered=" + serverSocketChannel.isRegistered());

        serverSocket.close();
        serverSocketChannel.close();
    }
}

3.获取支持的socketOption列表

/**
 * 获取支持的socketOption列表
 * Set<SocketOption<?> supportedOption()方法:返回通道支持的Socket Option
 */
public class SelectorTest2 {
    public static void main(String[] args) throws IOException {
        Thread t = new Thread() {
            public void run() {
                try {
                    Thread.sleep(2000);
                    Socket socket = new Socket("localhost", 8088);
                    socket.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        t.start();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress("localhost", 8088));
        SocketChannel socketChannel = serverSocketChannel.accept();

        Set<SocketOption<?>> set1 = serverSocketChannel.supportedOptions();
        Set<SocketOption<?>> set2 = socketChannel.supportedOptions();

        Iterator iterator1 = set1.iterator();
        Iterator iterator2 = set2.iterator();

        System.out.println("ServerSocketChannel supportedOptions:");
        while (iterator1.hasNext()) {
            SocketOption each = (SocketOption) iterator1.next();
            System.out.println(each.name() + " " + each.getClass().getName());
        }
        System.out.println();
        System.out.println();
        System.out.println("SocketChannel supportedOptions:");
        while (iterator2.hasNext()) {
            SocketOption each1 = (SocketOption) iterator2.next();
            System.out.println(each1.name() + " " + each1.getClass().getName());
        }
        socketChannel.close();
        serverSocketChannel.close();
    }
}

4.进行socket地址获取、设置阻塞模式

/**
 * 进行socket地址获取、设置阻塞模式
 */
public class SocketAddressTest {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress("localhost",8888));
        InetSocketAddress address = (InetSocketAddress)serverSocketChannel.getLocalAddress();

        //获取ip和端口
        System.out.println(address.getHostString());
        System.out.println(address.getPort());
        //查看阻塞模式
        System.out.println(serverSocketChannel.isBlocking());
        serverSocketChannel.configureBlocking(false);
        System.out.println(serverSocketChannel.isBlocking());
        //获取选择器
        Selector selector = Selector.open();
        SelectionKey selectionKey = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
        System.out.println("A = "+selectionKey+" "+selectionKey.hashCode());
        SelectionKey selectionKey1 = serverSocketChannel.keyFor(selector);
        System.out.println("B = "+selectionKey1.hashCode());
        serverSocketChannel.close();
    }
}

5.SelectionKey不是同一个对象

/**
 * 相同的通道可以注册不同的选择器,返回的SelectionKey不是同一个对象
 */
public class SelectorKeyDemo {
    public static void main(String[] args) throws IOException {
        //相同的通道可以注册不同的选择器,返回的SelectionKey不是同一个对象
        selectionKeyTest1();
        selectionKeyTest2();
    }

    private static void selectionKeyTest1() throws IOException {
        //打开ServerSocketChannel
        ServerSocketChannel serverSocketChannel =ServerSocketChannel.open();
        //进行ip和端口绑定
        serverSocketChannel.bind(new InetSocketAddress("localhost",8888));


        //配置非阻塞状态
        serverSocketChannel.configureBlocking(false);

        //打开选择器
        Selector selector1 = Selector.open();
        Selector selector2 = Selector.open();

        //将通道注册到选择器中,返回key
        SelectionKey selectionKey1 = serverSocketChannel.register(selector1,SelectionKey.OP_ACCEPT);
        System.out.println("SelectionKey1="+selectionKey1.hashCode());
        SelectionKey selectionKey2 = serverSocketChannel.register(selector2,SelectionKey.OP_ACCEPT);
        System.out.println("SelectionKey2="+selectionKey2.hashCode());
        serverSocketChannel.close();
    }

    //不同的通道注册到相同的选择器中,返回的SelectionKey不是同一个对象
    private static void selectionKeyTest2() throws IOException {
        ServerSocketChannel serverSocketChannel1 =ServerSocketChannel.open();
        serverSocketChannel1.bind(new InetSocketAddress("localhost",8888));
        serverSocketChannel1.configureBlocking(false);

        ServerSocketChannel serverSocketChannel2 =ServerSocketChannel.open();
        serverSocketChannel2.bind(new InetSocketAddress("localhost",8888));
        serverSocketChannel2.configureBlocking(false);

        Selector selector = Selector.open();

        SelectionKey selectionKey1 = serverSocketChannel1.register(selector,SelectionKey.OP_ACCEPT);
        System.out.println("SelectionKey1="+selectionKey1.hashCode());
        SelectionKey selectionKey2 = serverSocketChannel2.register(selector,SelectionKey.OP_ACCEPT);
        System.out.println("SelectionKey2="+selectionKey2.hashCode());
        serverSocketChannel1.close();
        serverSocketChannel2.close();
    }


}

6.获取selectorProvider

/**
 * 获取selectorProvider
 */
public class SelectorProviderTest {
    public static void main(String[] args) throws IOException {
        SelectorProvider selectorProvider = SelectorProvider.provider();
        System.out.println(selectorProvider);

        ServerSocketChannel serverSocketChannel = null;
        serverSocketChannel =serverSocketChannel.open();
        SelectorProvider provider = SelectorProvider.provider();
        System.out.println(provider);
        serverSocketChannel.close();
    }
}

学习了Selector,我们来学习应答模式案例

BIO模式下的客户端:

/**
 * BIO服务端
 */
public class BIOServer {
    public static void main(String[] args) throws IOException {
        //创建一个ServerSocket对象,带端口
        ServerSocket serverSocket = new ServerSocket(8888);
        while(true){
            //监听客户端,阻塞
            Socket socket = serverSocket.accept();
            //从serverSocket中拿到输入流,进行消息的接收,阻塞
            InputStream is = socket.getInputStream();
            byte[] b =new byte[20];
            is.read(b);
            String clientIp = socket.getInetAddress().getHostAddress();
            System.out.println(clientIp + "说:" + new String(b).trim());
            //从serverScoket中拿到输出流,进行消息的响应
            OutputStream os = socket.getOutputStream();
            os.write("你好,客户端".getBytes());
            //关闭socket
            socket.close();
        }

    }
}

BIO模式下的客户端

/**
 * BIO客户端
 */
public class BIOClient {
    public static void main(String[] args) throws IOException {
        while (true){
            //创建客户端socket
            Socket socket = new Socket("localhost",8888);
            //从客户端socket中拿到输出流,进行消息发送
            OutputStream os = socket.getOutputStream();
            System.out.println("输入信息:");
            //你好,服务端
            Scanner sc = new Scanner(System.in);
            String msg = sc.nextLine();
            os.write(msg.getBytes());
            //从客户端socket中拿到输入流,进行消息回复
            InputStream is = socket.getInputStream();
            byte[] b= new byte[20];
            is.read(b);
            System.out.println("服务端说:"+new String(b).trim());
        }
    }
}

运行:客户端输入

可以看到服务端

NIO的服务端

/**
 * NIO服务端
 */
public class NIOServer {
    public static void main(String[] args) throws IOException {
        //开启ServerScoketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //开启selector
        Selector selector = Selector.open();
        //绑定端口号
        serverSocketChannel.bind(new InetSocketAddress(8888));
        //设置非阻塞模式
        serverSocketChannel.configureBlocking(false);
        //将serverSocketChannel对象注册给Selector对象
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        //进行操作
        while(true){
            //如果在限定时间没有客户端的请求,则进行别的操作
            if(selector.select(2000)==0){
                System.out.println("server:没有客户端信息需要处理,做别的事情");
                continue;
            }
            //拿到所以的selectionkey,进行迭代,获取SelectorKey,判断通道里的时间
            Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
            while (keyIterator.hasNext()){
                SelectionKey key = keyIterator.next();
                //可接收
                if(key.isAcceptable()){
                    System.out.println("OP_ACCEPT");
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                }
                //可读
                if (key.isReadable()){
                    SocketChannel channel = (SocketChannel) key.channel();
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    channel.read(buffer);
                    System.out.println("客户端发来请求:"+new String(buffer.array()));
                }
                //移除所有的key
                keyIterator.remove();
            }
        }
    }
}

NIO的客户端

/**
 * NIO客户端
 */
public class NIOClient {
    public static void main(String[] args) throws IOException {
       //开启网络通道
        SocketChannel channel = SocketChannel.open();//
        //设置非阻塞
        channel.configureBlocking(false);
        //绑定ip和端口
        InetSocketAddress address = new InetSocketAddress("localhost",8888);
        if(!channel.connect(address)){
            while (!channel.finishConnect()){
                System.out.println("连接服务器socket进行对话,做别的事情");

            }
            //获取缓冲区并存入数据
            String msg = "hello,l'm Client";
            ByteBuffer witerBuffer = ByteBuffer.wrap(msg.getBytes());
         //发送数据信息
            channel.write(witerBuffer);
            System.in.read();
        }

    }
}

基于NIO的聊天:

服务器端

/**
 * 聊天室服务端
 */
public class ChatServer {
    private ServerSocketChannel listenerChannel; //监听通道  老大
    private Selector selector;//选择器对象  间谍
    private static final int PORT = 9999; //服务器端口

    //构造方法
    public ChatServer() {
        try {
            // 1. 得到监听通道
            listenerChannel = ServerSocketChannel.open();
            // 2. 得到选择器
            selector = Selector.open();
            // 3. 绑定端口
            listenerChannel.bind(new InetSocketAddress(PORT));
            // 4. 设置为非阻塞模式
            listenerChannel.configureBlocking(false);
            // 5. 将选择器绑定到监听通道并监听accept事件
            listenerChannel.register(selector, SelectionKey.OP_ACCEPT);
            printInfo("Chat Server is ready.......");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //6.业务处理,首先匹配selectorkey的状态,是连接请求事件还是读取数据事件
    //如果是连接请求事件,则进行key的迭代,进行连接请求操作,否者进行数据的读取
    //读取完成或者请求之后,将selectorkey进行删除,避免重复处理
    public void start() throws  Exception{
        try {
            while (true) { //不停监控
                if (selector.select(2000) == 0) {
                    System.out.println("Server:没有客户端找我, 我就干别的事情");
                    continue;
                }
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    if (key.isAcceptable()) { //连接请求事件
                        SocketChannel sc=listenerChannel.accept();
                        sc.configureBlocking(false);
                        sc.register(selector,SelectionKey.OP_READ);
                        System.out.println(sc.getRemoteAddress().toString().substring(1)+"上线了...");
                    }
                    if (key.isReadable()) { //读取数据事件
                        readMsg(key);
                    }
                    //一定要把当前key删掉,防止重复处理
                    iterator.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //读取客户端发来的消息并广播出去
    public void readMsg(SelectionKey key) throws Exception{
        SocketChannel channel=(SocketChannel) key.channel();
        ByteBuffer buffer=ByteBuffer.allocate(1024);
        int count=channel.read(buffer);
        if(count>0){
            String msg=new String(buffer.array());
            printInfo(msg);
            //发广播
            broadCast(channel,msg);
        }
    }

    //给所有的客户端发广播
    public void broadCast(SocketChannel except,String msg) throws Exception{
        System.out.println("服务器发送了广播...");
        for(SelectionKey key:selector.keys()){
            Channel targetChannel=key.channel();
            if(targetChannel instanceof SocketChannel && targetChannel!=except){
                SocketChannel destChannel=(SocketChannel)targetChannel;
                ByteBuffer buffer=ByteBuffer.wrap(msg.getBytes());
                destChannel.write(buffer);
            }
        }
    }

    private void printInfo(String str) { //往控制台打印消息
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("[" + sdf.format(new Date()) + "] -> " + str);
    }

    public static void main(String[] args) throws Exception {
        new ChatServer().start();
    }
}

客户端

//聊天程序客户端
public class ChatClient {
    private final String HOST = "127.0.0.1"; //服务器地址
    private int PORT = 9999; //服务器端口
    private SocketChannel socketChannel; //网络通道
    private String userName; //聊天用户名

    //构造方法
    public ChatClient() throws IOException {
        //1. 得到一个网络通道
        socketChannel=SocketChannel.open();
        //2. 设置非阻塞方式
        socketChannel.configureBlocking(false);
        //3. 提供服务器端的IP地址和端口号
        InetSocketAddress address=new InetSocketAddress(HOST,PORT);
        //4. 连接服务器端
        if(!socketChannel.connect(address)){
            while(!socketChannel.finishConnect()){  //nio作为非阻塞式的优势
                System.out.println("Client:连接服务器端的同时,我还可以干别的一些事情");
            }
        }
        //5. 得到客户端IP地址和端口信息,作为聊天用户名使用
        userName = socketChannel.getLocalAddress().toString().substring(1);
        System.out.println("---------------Client(" + userName + ") is ready---------------");
    }

    //向服务器端发送数据
    public void sendMsg(String msg) throws Exception{
        if(msg.equalsIgnoreCase("bye")){
            socketChannel.close();
            return;
        }
        msg = userName + "说:"+ msg;
        ByteBuffer buffer=ByteBuffer.wrap(msg.getBytes());
        socketChannel.write(buffer);
    }

    //从服务器端接收数据
    public void receiveMsg() throws  Exception{
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int size=socketChannel.read(buffer);
        if(size>0){
            String msg=new String(buffer.array());
            System.out.println(msg.trim());
        }
    }

}
//启动聊天程序客户端
public class TestChat {
    public static void main(String[] args) throws Exception {
        ChatClient chatClient=new ChatClient();

        new Thread(){
            public void run(){
                while(true){
                    try {
                        chatClient.receiveMsg();
                        Thread.sleep(2000);
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }
        }.start();

        Scanner scanner=new Scanner(System.in);
        while (scanner.hasNextLine()){
            String msg=scanner.nextLine();
            chatClient.sendMsg(msg);
        }

    }
}

启动运行:

客户端输入信息和服务端看到的信息

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-06-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端技术学习 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档