本节所要了解java nio的reactor模式,参考来源Doug lee java并发的作者。当然作为netty的底层实现,对于nio的reactor模式的实现,对于学习netty也是尤为重要的一步。
图片.png
class Server implements Runnable {
public void run() {
try {
//创建socket server套接字
ServerSocket ss = new ServerSocket(PORT);
while (!Thread.interrupted())
//启动额外线程处理socket客户端连接后的业务处理
new Thread(new Handler(ss.accept())).start();
// or, single-threaded, or a thread pool
} catch (IOException ex) { /* ... */ }
}
//业务处理
static class Handler implements Runnable {
final Socket socket;
Handler(Socket s) { socket = s; }
public void run() {
try {
byte[] input = new byte[MAX_INPUT];
socket.getInputStream().read(input);
byte[] output = process(input);
socket.getOutputStream().write(output);
} catch (IOException ex) { /* ... */ }
}
private byte[] process(byte[] cmd) { /* ... */ }
}
}
主要说下reactor模式:简单来说reactor模式用于同时处理一个或多个传递给服务端的请求的事件的处理模式。 然后,服务端处理程序解析输入别的请求,并将它们同步分派给与之关联的请求异步处理程序。不恰当可类比web页面事件,当点击某个按钮时,浏览器收到这个信号(监听),分派给相关的js处理程序处理(handler)。
图片.png
关于Channels、Buffers、Selectors、SelectionKeys核心前面已介绍,下面实现会用到
基本代码讲解
class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false);
//注册ServerSocketChannel的兴趣事件为连接OP_ACCEPT
SelectionKey sk = serverSocket.register(selector,SelectionKey.OP_ACCEPT);
//附加Acceptor,稍后调用attachment可以取得该对象
sk.attach(new Acceptor());
}
public void run() { //normally in a new Thread
try {
while (!Thread.interrupted()) {
//阻塞 直到有有一個通道返回
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
//循环检测是否有新事件注册
while (it.hasNext())
//同步分发
dispatch((SelectionKey)(it.next()));
selected.clear();
}
} catch (IOException ex) { /* ... */ }
}
void dispatch(SelectionKey k) {
//取得attach附加的对象处理
Runnable r = (Runnable)(k.attachment());
if (r != null)
r.run();
}
// class Reactor continued
class Acceptor implements Runnable {
// inner
public void run() {
try {
//接受到通道套接字的连接
SocketChannel c = serverSocket.accept();
if (c != null)
new Handler(selector, c);
}
catch(IOException ex) { /* ... */ }
}
}
}
/**
* handler用到状态模式,根据当前读写的状态分别处理
*/
final class Handler implements Runnable {
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(1024);
ByteBuffer output = ByteBuffer.allocate(1024);
static final int READING = 0 ,SENDING = 1;
int state = READING;
Handler(Selector sel , SocketChannel c) throws IOException {
socket = c;
//设置通道为非阻塞
c.configureBlocking(false);
// Optionally try first read now
sk = socket.register( sel, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
//select阻塞后,可以用wakeup唤醒;执行wakeup时,如果没有阻塞的select 那么执行完wakeup后下一个执行select就会立即返回。
sel.wakeup();
}
boolean inputIsComplete() {/* 相关处理略... */ return true; }
boolean outputIsComplete() { /*相关处理略 ... */return true; }
void process(){}
public void run() {
try {
if (state == READING)
read();
else if (state ==SENDING)
send();
} catch (IOException ex) { /* ... */ }
}
void read()throws IOException {
socket.read(input);
if (inputIsComplete()) {
process();
// 处理完成后设置发送状态
state =SENDING;
//注册写事件
sk.interestOps(SelectionKey.OP_WRITE);
}
}
void send()throws IOException {
socket.write(output);
if (outputIsComplete())
sk.cancel();
}
}