我们经常使用的就是BIO,在我们学习编程基础javaSE的时候,大家应该都会学过socket通信,这里面使用的就是同步阻塞。我们先看下BIO的模型:
在这里插入图片描述
在BIO模型中,一个连接会对应一个处理线程,如果服务端使用单线程进行处理,后续连接将会一直阻塞;
// 客户端
package com.example.netty.bio;
import java.io.IOException;
import java.net.Socket;
public class SocketClient {
public static void main(String[] args) throws IOException {
Socket socket = new Socket("127.0.0.1",9000);
socket.getOutputStream().write("我是客户端".getBytes());
socket.getOutputStream().flush();
System.out.println("向服务端发送数据结束");
byte[] bytes = new byte[1024];
try {
int read = socket.getInputStream().read(bytes);
System.out.println("服务端发送过来的数据为:"+new String(bytes,0,read));
} catch (IOException e) {
e.printStackTrace();
}finally {
}
}
}
//服务端
package com.example.netty.bio;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
public class SocketServer{
public static void main(String[] args) throws IOException {
ServerSocket serverSocket=new ServerSocket(9000);
while (true){
System.out.println("等待连接");
// 阻塞等待
Socket client = serverSocket.accept();
System.out.println("有客户端连接了");
handleRead(client);
}
}
/**
*
* @param client
*/
private static void handleRead(Socket client) {
new Thread(new Runnable() {
@Override
public void run() {
byte[] bytes = new byte[1024];
try {
int read = client.getInputStream().read(bytes);
System.out.println("客户端发送过来的数据为:"+new String(bytes,0,read));
// Thread.sleep(Integer.MAX_VALUE);
client.getOutputStream().write("你好,我收到你的数据了".getBytes());
client.getOutputStream().flush();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
}
}
}).start();
}
}
NIO在BIO的基础上进行了升级,将阻塞换成非阻塞,虽然只是模式变了下,但是代码复杂量却增加了不少。在NIO模型中,服务端可以开启一个线程处理多个连接,它是非阻塞的,客户端发送的数据都会注册到多路复用器selector上面,当selector(selector的select方法是阻塞的)轮询到有读、写或者连接请求时,才会转发到后端程序进行处理,没有数据的时候,业务程序并不需要阻塞等待。
在这里插入图片描述
看模型图大家有可能都知道,客户端所有的连接通道都会注册到selector上面,select会通过轮询去获取这些通道的状态,这些状态有accpet(连接请求)、READ读请求。
如果在轮询过程中发现已经有一个连接请求状态的话,这说明已经有一个客户端想要和服务端进行连接,直接把这个通道传给后端程序去处理连接操作;如果是在BIO模型下的话,会一直阻塞在accept上,直到有连接请求才会释放,继续执行后续的代码。
如果在轮询过程中发现已经有一个读请求状态的话,这说明已经有一个客户端把数据发送给服务端了,服务端可以直接把通道交给后端程序进行读操作的处理;如果是在BIO模型下的话,会一直阻塞的read上,直到有连接请求才会释放,继续执行后续的代码。
以上可以总结为:在NIO模型中,如果服务端执行了read操作的话,就说明已经有可用的数据进行读取了,如果执行了accpet操作的话,就说明此时一定有客户端发起了与服务端的连接,能够保证这种操作的前提是selector轮询到了可读可连接的通道状态。
接下来我们结合代码来整体看下NIO的工作机制
package com.example.netty.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class NioServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(9000));
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
System.out.println("等待事件发生");
// 这里还是阻塞等待,
int select = selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
handleChannel(selectionKey);
}
}
}
private static void handleChannel(SelectionKey selectionKey) {
if (selectionKey.isAcceptable()) {
System.out.println("有客户端发生了连接");
ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
try {
SocketChannel client = channel.accept();
client.configureBlocking(false);
// 连接之后立即注册读操作,客户端发送数据过来才能监听到
client.register(selectionKey.selector(), SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
} else if (selectionKey.isReadable()) {
System.out.println("收到客户端发送数据的请求");
SocketChannel channel = (SocketChannel) selectionKey.channel();
// 如果这里你不读取数据,读事件会一直触发,这是有NIO属于水平触发决定的,
ByteBuffer allocate = ByteBuffer.allocate(1024);
try {
int read = channel.read(allocate);
if (read != -1) {
System.out.println("客户端发送的数据:" + new String(allocate.array(), 0, read));
}
channel.write(ByteBuffer.wrap("你好,我是服务端".getBytes()));
// 处理完读操作之后,需要重新注册下读操作,
// 如果下面一行被放开,将会一直会有可写操作触发,因为网络中99.999%的情况下都是可写的,一般不监听
// selectionKey.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ);
selectionKey.interestOps(SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
} else if (selectionKey.isWritable()) {
System.out.println("触发写事件");
}
}
}
在这里插入图片描述
package com.example.netty.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class NioClient {
private Selector selector;
public static void main(String[] args) throws IOException {
NioClient client = new NioClient();
client.initClient("127.0.0.1", 9000);
client.connect();
}
private void connect() throws IOException {
while (true) {
// 阻塞等待
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
handler(selectionKey);
}
}
}
private void handler(SelectionKey selectionKey) throws IOException {
// 收到服务端发送的数据
if (selectionKey.isReadable()) {
SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len = channel.read(buffer);
if (len != -1) {
System.out.println("客户端收到信息:" + new String(buffer.array(), 0, len));
}
// 连接事件发生
} else if (selectionKey.isConnectable()) {
SocketChannel channel = (SocketChannel) selectionKey.channel();
// 一般发起了连接后,会立即返回,需要使用isConnectionPending判断是否完成连接,如果正在连接,则调用finishConnect,如果不能连接则会抛出异常
if (channel.isConnectionPending()) {
channel.finishConnect();
}
channel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.wrap("你好,我是客户端".getBytes());
channel.write(buffer);
selectionKey.interestOps(SelectionKey.OP_READ);
}
}
private void initClient(String s, int i) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
selector = Selector.open();
socketChannel.connect(new InetSocketAddress(s, i));
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
}
在这里插入图片描述
poll相比selelct,没有最大连接的限制;epoll相对于前两者,是一个不一样的机制,基于事件通知的方式,通知调用者;
异步非阻塞, 由操作系统完成后回调通知服务端程序启动线程去处理, 一般适用于连接数较多且连接时间较长的应用 应用场景:AIO方式适用于连接数目多且连接比较长(重操作) 的架构,JDK7 开始支持
// 服务端代码
package com.example.netty.aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class AioServer {
public static void main(String[] args) throws IOException, InterruptedException {
final AsynchronousServerSocketChannel serverChannel =
AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(9000));
serverChannel.accept(null,new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
serverChannel.accept(attachment, this);
try {
System.out.println(socketChannel.getRemoteAddress());
ByteBuffer buffer = ByteBuffer.allocate(1024);
socketChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
buffer.flip();
System.out.println(new String(buffer.array(), 0, result));
socketChannel.write(ByteBuffer.wrap("HelloAioClient".getBytes()));
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Object attachment) {
}
});
Thread.sleep(Integer.MAX_VALUE);
}
}
// 客户端代码
package com.example.netty.aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;
public class AioClient {
public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 9000)).get();
socketChannel.write(ByteBuffer.wrap("HelloServer".getBytes()));
ByteBuffer buffer = ByteBuffer.allocate(512);
Integer len = socketChannel.read(buffer).get();
if (len!=-1) {
System.out.println("客户端收到信息:" + new String(buffer.array(), 0, len));
}
}
}
总结:可以看出异步非阻塞模式的代码是非常简单的,所有的操作都是通过回调机制触发的,我们只需要在回调方法中处理我们自己的逻辑就行了,其实AIO是基于NIO进行封装,后面会讲到netty也是基于NIO进行封装的