前面我们已经简单的学习了channel,知道channel作为通道,可以在通道中进行读写操作,同时知道ByteChannel是双向的。对于NIO的优势在于多路复用选择器上,在Nginx、Redis、Netty中都有多路复用的体现。因此学习Selector是有必要的。
1.使用多路复用选择器的方式
/**
* selector 选择器 多路复用,选择器结合selectable-channel实现非阻塞效果,提高效率
* 可以将通道注册进选择器中,其主要注意是使用一个线程来对多个通道中的已就绪进行选择,然后就可以对选择
* 的通道进行数据处理,属于一对多的关系
*/
public class SelectorTest {
public static void main(String[] args) throws IOException {
//创建serverSocketChannel对象
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//设置websocket通道为非阻塞方式
serverSocketChannel.configureBlocking(false);
//获取websocket
ServerSocket serverSocket = serverSocketChannel.socket();
//进行绑定操作
serverSocket.bind(new InetSocketAddress("localhost", 8888));
//核心代码开始
Selector selector = Selector.open();
SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//核心代码结束
System.out.println("selector=" + selector);
System.out.println("key=" + key);
serverSocket.close();
serverSocketChannel.close();
}
}
通常的步骤是:打开ServerSocket通道,然后将通道配置成非阻塞模式,同时拿到socket进行绑定操作。然后打开选择器,将通道注册到选择器中,进行业务处理操作,然后关闭socket,如果需要长连接,此时就不关闭了。
2.判断当前是否向任何选择器进行了注册
/**
* 判断注册的状态:判断当前是否向任何选择器进行了注册。可以看到新创建的通道总是未注册的
*/
public class SelectorTest1 {
public static void main(String[] args) throws IOException {
//打开serverSocket通道,同时设置为非阻塞,拿到serverSocket,进行ip和端口绑定
//将选择器打开,将选择器key进行注册,关闭socket和socket通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false); //需要部分,通常需要将其设置为非阻塞
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress("localhost", 8888));
System.out.println("A isRegistered=" + serverSocketChannel.isRegistered());
Selector selector = Selector.open();
SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("B isRegistered=" + serverSocketChannel.isRegistered());
serverSocket.close();
serverSocketChannel.close();
}
}
3.获取支持的socketOption列表
/**
* 获取支持的socketOption列表
* Set<SocketOption<?> supportedOption()方法:返回通道支持的Socket Option
*/
public class SelectorTest2 {
public static void main(String[] args) throws IOException {
Thread t = new Thread() {
public void run() {
try {
Thread.sleep(2000);
Socket socket = new Socket("localhost", 8088);
socket.close();
} catch (Exception e) {
e.printStackTrace();
}
}
};
t.start();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress("localhost", 8088));
SocketChannel socketChannel = serverSocketChannel.accept();
Set<SocketOption<?>> set1 = serverSocketChannel.supportedOptions();
Set<SocketOption<?>> set2 = socketChannel.supportedOptions();
Iterator iterator1 = set1.iterator();
Iterator iterator2 = set2.iterator();
System.out.println("ServerSocketChannel supportedOptions:");
while (iterator1.hasNext()) {
SocketOption each = (SocketOption) iterator1.next();
System.out.println(each.name() + " " + each.getClass().getName());
}
System.out.println();
System.out.println();
System.out.println("SocketChannel supportedOptions:");
while (iterator2.hasNext()) {
SocketOption each1 = (SocketOption) iterator2.next();
System.out.println(each1.name() + " " + each1.getClass().getName());
}
socketChannel.close();
serverSocketChannel.close();
}
}
4.进行socket地址获取、设置阻塞模式
/**
* 进行socket地址获取、设置阻塞模式
*/
public class SocketAddressTest {
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress("localhost",8888));
InetSocketAddress address = (InetSocketAddress)serverSocketChannel.getLocalAddress();
//获取ip和端口
System.out.println(address.getHostString());
System.out.println(address.getPort());
//查看阻塞模式
System.out.println(serverSocketChannel.isBlocking());
serverSocketChannel.configureBlocking(false);
System.out.println(serverSocketChannel.isBlocking());
//获取选择器
Selector selector = Selector.open();
SelectionKey selectionKey = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
System.out.println("A = "+selectionKey+" "+selectionKey.hashCode());
SelectionKey selectionKey1 = serverSocketChannel.keyFor(selector);
System.out.println("B = "+selectionKey1.hashCode());
serverSocketChannel.close();
}
}
5.SelectionKey不是同一个对象
/**
* 相同的通道可以注册不同的选择器,返回的SelectionKey不是同一个对象
*/
public class SelectorKeyDemo {
public static void main(String[] args) throws IOException {
//相同的通道可以注册不同的选择器,返回的SelectionKey不是同一个对象
selectionKeyTest1();
selectionKeyTest2();
}
private static void selectionKeyTest1() throws IOException {
//打开ServerSocketChannel
ServerSocketChannel serverSocketChannel =ServerSocketChannel.open();
//进行ip和端口绑定
serverSocketChannel.bind(new InetSocketAddress("localhost",8888));
//配置非阻塞状态
serverSocketChannel.configureBlocking(false);
//打开选择器
Selector selector1 = Selector.open();
Selector selector2 = Selector.open();
//将通道注册到选择器中,返回key
SelectionKey selectionKey1 = serverSocketChannel.register(selector1,SelectionKey.OP_ACCEPT);
System.out.println("SelectionKey1="+selectionKey1.hashCode());
SelectionKey selectionKey2 = serverSocketChannel.register(selector2,SelectionKey.OP_ACCEPT);
System.out.println("SelectionKey2="+selectionKey2.hashCode());
serverSocketChannel.close();
}
//不同的通道注册到相同的选择器中,返回的SelectionKey不是同一个对象
private static void selectionKeyTest2() throws IOException {
ServerSocketChannel serverSocketChannel1 =ServerSocketChannel.open();
serverSocketChannel1.bind(new InetSocketAddress("localhost",8888));
serverSocketChannel1.configureBlocking(false);
ServerSocketChannel serverSocketChannel2 =ServerSocketChannel.open();
serverSocketChannel2.bind(new InetSocketAddress("localhost",8888));
serverSocketChannel2.configureBlocking(false);
Selector selector = Selector.open();
SelectionKey selectionKey1 = serverSocketChannel1.register(selector,SelectionKey.OP_ACCEPT);
System.out.println("SelectionKey1="+selectionKey1.hashCode());
SelectionKey selectionKey2 = serverSocketChannel2.register(selector,SelectionKey.OP_ACCEPT);
System.out.println("SelectionKey2="+selectionKey2.hashCode());
serverSocketChannel1.close();
serverSocketChannel2.close();
}
}
6.获取selectorProvider
/**
* 获取selectorProvider
*/
public class SelectorProviderTest {
public static void main(String[] args) throws IOException {
SelectorProvider selectorProvider = SelectorProvider.provider();
System.out.println(selectorProvider);
ServerSocketChannel serverSocketChannel = null;
serverSocketChannel =serverSocketChannel.open();
SelectorProvider provider = SelectorProvider.provider();
System.out.println(provider);
serverSocketChannel.close();
}
}
学习了Selector,我们来学习应答模式案例
BIO模式下的客户端:
/**
* BIO服务端
*/
public class BIOServer {
public static void main(String[] args) throws IOException {
//创建一个ServerSocket对象,带端口
ServerSocket serverSocket = new ServerSocket(8888);
while(true){
//监听客户端,阻塞
Socket socket = serverSocket.accept();
//从serverSocket中拿到输入流,进行消息的接收,阻塞
InputStream is = socket.getInputStream();
byte[] b =new byte[20];
is.read(b);
String clientIp = socket.getInetAddress().getHostAddress();
System.out.println(clientIp + "说:" + new String(b).trim());
//从serverScoket中拿到输出流,进行消息的响应
OutputStream os = socket.getOutputStream();
os.write("你好,客户端".getBytes());
//关闭socket
socket.close();
}
}
}
BIO模式下的客户端
/**
* BIO客户端
*/
public class BIOClient {
public static void main(String[] args) throws IOException {
while (true){
//创建客户端socket
Socket socket = new Socket("localhost",8888);
//从客户端socket中拿到输出流,进行消息发送
OutputStream os = socket.getOutputStream();
System.out.println("输入信息:");
//你好,服务端
Scanner sc = new Scanner(System.in);
String msg = sc.nextLine();
os.write(msg.getBytes());
//从客户端socket中拿到输入流,进行消息回复
InputStream is = socket.getInputStream();
byte[] b= new byte[20];
is.read(b);
System.out.println("服务端说:"+new String(b).trim());
}
}
}
运行:客户端输入
可以看到服务端
NIO的服务端
/**
* NIO服务端
*/
public class NIOServer {
public static void main(String[] args) throws IOException {
//开启ServerScoketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//开启selector
Selector selector = Selector.open();
//绑定端口号
serverSocketChannel.bind(new InetSocketAddress(8888));
//设置非阻塞模式
serverSocketChannel.configureBlocking(false);
//将serverSocketChannel对象注册给Selector对象
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//进行操作
while(true){
//如果在限定时间没有客户端的请求,则进行别的操作
if(selector.select(2000)==0){
System.out.println("server:没有客户端信息需要处理,做别的事情");
continue;
}
//拿到所以的selectionkey,进行迭代,获取SelectorKey,判断通道里的时间
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()){
SelectionKey key = keyIterator.next();
//可接收
if(key.isAcceptable()){
System.out.println("OP_ACCEPT");
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
}
//可读
if (key.isReadable()){
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
channel.read(buffer);
System.out.println("客户端发来请求:"+new String(buffer.array()));
}
//移除所有的key
keyIterator.remove();
}
}
}
}
NIO的客户端
/**
* NIO客户端
*/
public class NIOClient {
public static void main(String[] args) throws IOException {
//开启网络通道
SocketChannel channel = SocketChannel.open();//
//设置非阻塞
channel.configureBlocking(false);
//绑定ip和端口
InetSocketAddress address = new InetSocketAddress("localhost",8888);
if(!channel.connect(address)){
while (!channel.finishConnect()){
System.out.println("连接服务器socket进行对话,做别的事情");
}
//获取缓冲区并存入数据
String msg = "hello,l'm Client";
ByteBuffer witerBuffer = ByteBuffer.wrap(msg.getBytes());
//发送数据信息
channel.write(witerBuffer);
System.in.read();
}
}
}
基于NIO的聊天:
服务器端
/**
* 聊天室服务端
*/
public class ChatServer {
private ServerSocketChannel listenerChannel; //监听通道 老大
private Selector selector;//选择器对象 间谍
private static final int PORT = 9999; //服务器端口
//构造方法
public ChatServer() {
try {
// 1. 得到监听通道
listenerChannel = ServerSocketChannel.open();
// 2. 得到选择器
selector = Selector.open();
// 3. 绑定端口
listenerChannel.bind(new InetSocketAddress(PORT));
// 4. 设置为非阻塞模式
listenerChannel.configureBlocking(false);
// 5. 将选择器绑定到监听通道并监听accept事件
listenerChannel.register(selector, SelectionKey.OP_ACCEPT);
printInfo("Chat Server is ready.......");
} catch (IOException e) {
e.printStackTrace();
}
}
//6.业务处理,首先匹配selectorkey的状态,是连接请求事件还是读取数据事件
//如果是连接请求事件,则进行key的迭代,进行连接请求操作,否者进行数据的读取
//读取完成或者请求之后,将selectorkey进行删除,避免重复处理
public void start() throws Exception{
try {
while (true) { //不停监控
if (selector.select(2000) == 0) {
System.out.println("Server:没有客户端找我, 我就干别的事情");
continue;
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isAcceptable()) { //连接请求事件
SocketChannel sc=listenerChannel.accept();
sc.configureBlocking(false);
sc.register(selector,SelectionKey.OP_READ);
System.out.println(sc.getRemoteAddress().toString().substring(1)+"上线了...");
}
if (key.isReadable()) { //读取数据事件
readMsg(key);
}
//一定要把当前key删掉,防止重复处理
iterator.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
//读取客户端发来的消息并广播出去
public void readMsg(SelectionKey key) throws Exception{
SocketChannel channel=(SocketChannel) key.channel();
ByteBuffer buffer=ByteBuffer.allocate(1024);
int count=channel.read(buffer);
if(count>0){
String msg=new String(buffer.array());
printInfo(msg);
//发广播
broadCast(channel,msg);
}
}
//给所有的客户端发广播
public void broadCast(SocketChannel except,String msg) throws Exception{
System.out.println("服务器发送了广播...");
for(SelectionKey key:selector.keys()){
Channel targetChannel=key.channel();
if(targetChannel instanceof SocketChannel && targetChannel!=except){
SocketChannel destChannel=(SocketChannel)targetChannel;
ByteBuffer buffer=ByteBuffer.wrap(msg.getBytes());
destChannel.write(buffer);
}
}
}
private void printInfo(String str) { //往控制台打印消息
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("[" + sdf.format(new Date()) + "] -> " + str);
}
public static void main(String[] args) throws Exception {
new ChatServer().start();
}
}
客户端
//聊天程序客户端
public class ChatClient {
private final String HOST = "127.0.0.1"; //服务器地址
private int PORT = 9999; //服务器端口
private SocketChannel socketChannel; //网络通道
private String userName; //聊天用户名
//构造方法
public ChatClient() throws IOException {
//1. 得到一个网络通道
socketChannel=SocketChannel.open();
//2. 设置非阻塞方式
socketChannel.configureBlocking(false);
//3. 提供服务器端的IP地址和端口号
InetSocketAddress address=new InetSocketAddress(HOST,PORT);
//4. 连接服务器端
if(!socketChannel.connect(address)){
while(!socketChannel.finishConnect()){ //nio作为非阻塞式的优势
System.out.println("Client:连接服务器端的同时,我还可以干别的一些事情");
}
}
//5. 得到客户端IP地址和端口信息,作为聊天用户名使用
userName = socketChannel.getLocalAddress().toString().substring(1);
System.out.println("---------------Client(" + userName + ") is ready---------------");
}
//向服务器端发送数据
public void sendMsg(String msg) throws Exception{
if(msg.equalsIgnoreCase("bye")){
socketChannel.close();
return;
}
msg = userName + "说:"+ msg;
ByteBuffer buffer=ByteBuffer.wrap(msg.getBytes());
socketChannel.write(buffer);
}
//从服务器端接收数据
public void receiveMsg() throws Exception{
ByteBuffer buffer = ByteBuffer.allocate(1024);
int size=socketChannel.read(buffer);
if(size>0){
String msg=new String(buffer.array());
System.out.println(msg.trim());
}
}
}
//启动聊天程序客户端
public class TestChat {
public static void main(String[] args) throws Exception {
ChatClient chatClient=new ChatClient();
new Thread(){
public void run(){
while(true){
try {
chatClient.receiveMsg();
Thread.sleep(2000);
}catch (Exception e){
e.printStackTrace();
}
}
}
}.start();
Scanner scanner=new Scanner(System.in);
while (scanner.hasNextLine()){
String msg=scanner.nextLine();
chatClient.sendMsg(msg);
}
}
}
启动运行:
客户端输入信息和服务端看到的信息