前面对NIO原理进行了大篇幅的分析,最后我们举几个案例,教大家如何更好的使用NIO。
package NIO.luban.chat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
//聊天室服务端
public class ChatServer {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
private long timeout=2000;
public ChatServer(){
try {
//服务端channel
serverSocketChannel=ServerSocketChannel.open();
//选择器对象,底层就是IO多路复用
selector=Selector.open();
//绑定端口
serverSocketChannel.bind(new InetSocketAddress(9090));
//设置非阻塞式
serverSocketChannel.configureBlocking(false);
//注册"监听连接"给Selector
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务端准备就绪");
start();
}catch (Exception e){
e.printStackTrace();
}
}
public void start() throws Exception{
int count=0;
long start=System.nanoTime();
while (true){
//等待感兴趣的事件,没有事件就会阻塞2秒钟,2秒钟没有感兴趣事件发生,程序继续往下执行
selector.select(timeout);
// System.out.println("2秒了");
long end=System.nanoTime();
if(end-start>= TimeUnit.MILLISECONDS.toNanos(timeout)){
count=1;
}else{
count++;//记录空轮询的次数
}
//空轮询次数太多的话,重新建立连接
if(count>=10){
System.out.println("有可能发生空轮询"+count+"次");
rebuildSelector();
count=0;
selector.selectNow();
continue;
}
//得到所有就绪的SelectionKey对象
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
//遍历就绪事件,并判断就绪的事件类型
while (iterator.hasNext()){
SelectionKey selectionKey=iterator.next();
//连接事件
if(selectionKey.isAcceptable()){
//获取网络通道,有客户端来链接啦
SocketChannel accept = serverSocketChannel.accept();
//设置非阻塞式
accept.configureBlocking(false);
//连接上了 注册读取事件
accept.register(selector,SelectionKey.OP_READ);
System.out.println(accept.getRemoteAddress().toString()+"上线了");
}
//读事件
if(selectionKey.isReadable()){ //读取客户端数据事件
//读取客户端发来的数据
readClientData(selectionKey);
}
//手动从当前集合将本次运行完的对象删除,事件处理完了就要删除
iterator.remove();
}
}
}
//重新建立链接
private void rebuildSelector() throws IOException {
Selector newSelector=Selector.open();
Selector oldSelect=selector;
for (SelectionKey selectionKey : oldSelect.keys()) {
//感兴趣事件对应的数值
int i = selectionKey.interestOps();
//取消旧的键
selectionKey.cancel();
//将channel注册到新的选择器上
selectionKey.channel().register(newSelector,i);
}
selector=newSelector;
oldSelect.close();//关闭旧的
}
//读取客户端发来的数据
private void readClientData(SelectionKey selectionKey) throws IOException {
//获取跟客户端连接的通道
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
//生成缓冲区,用于接收客户端传输进来的数据
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//读取数据到缓冲区,返回实际读取到的字节数,没有数据返回-1
int read = socketChannel.read(byteBuffer);
//读之前,将缓冲区设置为读状态
byteBuffer.flip();
if(read>0){//判断确实读到数据了
//创建临时发送字节数组
byte[] bytes=new byte[read];
//将缓冲区数据写到临时数组
byteBuffer.get(bytes,0,read);
//读取了数据 广播
String s = new String(bytes,"utf-8");
//将此数据发送到其他客户端
writeClientData(socketChannel,s);
}
}
//广播 将读取的数据群发
private void writeClientData(SocketChannel socketChannel,String s) throws IOException {
//获取到所有的注册事件,不管有没有就绪
Set<SelectionKey> keys = selector.keys();
//遍历事件
for (SelectionKey key : keys) {
//判断事件是否还有效
if(key.isValid()){
//获取事件对应的channel
SelectableChannel channel = key.channel();
//注意,我们只需要将信息发送给客户端
if(channel instanceof SocketChannel){
SocketChannel socketChannel1= (SocketChannel) channel;
//不需要发送给自己了
if(channel!=socketChannel){
ByteBuffer wrap = ByteBuffer.wrap(s.getBytes());
socketChannel1.write(wrap);
}
}
}
}
}
public static void main(String[] args) throws Exception {
new ChatServer().start();
}
}
package NIO.luban.chat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
public class ChatClient implements Runnable{
private SocketChannel socketChannel;
private Selector selector;
public ChatClient(){
try {
//得到一个网络通道
socketChannel=SocketChannel.open();
//打开一个选择器
selector=Selector.open();
//设置非阻塞式
socketChannel.configureBlocking(false);
}catch (Exception e){
e.printStackTrace();
}
}
public void doCon(){
//提供服务器ip与端口
InetSocketAddress inetSocketAddress=new InetSocketAddress("127.0.0.1",9090);
//连接服务器端
try {
//连接服务器,如果成功了
if(socketChannel.connect(inetSocketAddress)){
//注册读事件
socketChannel.register(selector,SelectionKey.OP_READ);
//写数据
writeData(socketChannel);
}else{
//注册连接事件
socketChannel.register(selector, SelectionKey.OP_CONNECT);//如果连接不上
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void writeData(SocketChannel socketChannel) throws IOException {
new Thread(new Runnable() {
@Override
public void run() {
try {
while (true){
//等待你的输入
Scanner scanner=new Scanner(System.in);
String str = scanner.nextLine();
if(str.equals("by")){
socketChannel.close();
return;
}
//将你的输入包装成缓冲区
ByteBuffer byteBuffer=ByteBuffer.wrap((socketChannel.getLocalAddress().toString()+"说:"+str).getBytes());
//发送你的数据
socketChannel.write(byteBuffer);
}
}catch (Exception e){
e.printStackTrace();
}
}
}).start();
}
//读数据
public void readData() throws IOException {
ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
int read = socketChannel.read(byteBuffer);
if(read>0){
byte[] array = byteBuffer.array();
System.out.println(new String(array,"utf-8"));
}
}
public static void main(String[] args) throws IOException {
new Thread(new ChatClient()).start();
}
@Override
public void run() {
doCon();
try {
while (true){
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey selectionKey = iterator.next();
if(selectionKey.isValid()){
if(selectionKey.isConnectable()){
SocketChannel channel = (SocketChannel) selectionKey.channel();
if (channel.finishConnect()){
channel.register(selector,SelectionKey.OP_READ);
System.out.println("bbbbbbbbbbbbb");
//写数据
writeData(channel);
}else{
System.exit(1);
}
}
if(selectionKey.isReadable()){
readData();
}
}
iterator.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
上面的聊天案例无论是服务端还是客户端,都是单线程的,所有的链接及读写都是在一个main方法所在的主线程内运行。
拿服务器代码来说,一个main线程,要做以下工作
一个线程,在同一时刻只能做上面的一件事情,如果线程在读取数据的时候阻塞了,那其他三件事都不能做,新的客户端也无法链接成功。我们可以让服务器端只处理链接,读和写交给另一个线程处理。如下图所示:
// Reactor線程
package NIO.luban.oneReactor;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;
public class TCPReactor implements Runnable {
private final ServerSocketChannel ssc;
private final Selector selector;
public TCPReactor(int port) throws IOException {
//打开选择器进行IO多路复用
selector = Selector.open();
//打开服务器通道
ssc = ServerSocketChannel.open();
InetSocketAddress addr = new InetSocketAddress(port);
//绑定端口
ssc.socket().bind(addr); // 在ServerSocketChannel綁定監聽端口
//設置ServerSocketChannel為非阻塞
ssc.configureBlocking(false);
//注册链接事件
SelectionKey sk = ssc.register(selector, SelectionKey.OP_ACCEPT);
//将时间绑定一个处理器,事件发生后由这个处理器完成后续操作
sk.attach(new Acceptor(selector, ssc));
}
@Override
public void run() {
while (!Thread.interrupted()) { // 在線程被中斷前持續運行
System.out.println("Waiting for new event on port: " + ssc.socket().getLocalPort() + "...");
try {
// 若沒有事件就緒則不往下執行
if (selector.select() == 0)
continue;
} catch (IOException e) {
e.printStackTrace();
}
// 取得所有已就緒事件的key集合
Set<SelectionKey> selectedKeys = selector.selectedKeys();
//遍历事件
Iterator<SelectionKey> it = selectedKeys.iterator();
while (it.hasNext()) {
//调度事件,在这里我们开启另一个线程进行读写操作
dispatch((it.next()));
it.remove();
}
}
}
/*
* name: dispatch(SelectionKey key)
* description: 調度方法,根據事件綁定的對象開新線程
*/
private void dispatch(SelectionKey key) {
Runnable r = (Runnable) (key.attachment()); // 根據事件之key綁定的對象開新線程
if (r != null) {
r.run();
}
}
}
public class Acceptor implements Runnable {
private final ServerSocketChannel ssc;
private final Selector selector;
public Acceptor(Selector selector, ServerSocketChannel ssc) {
this.ssc=ssc;
this.selector=selector;
}
@Override
public void run() {
try {
// 接受client链接请求
SocketChannel sc= ssc.accept();
System.out.println(sc.socket().getRemoteSocketAddress().toString() + " is connected.");
if(sc!=null) {
// 設置為非阻塞
sc.configureBlocking(false);
// SocketChannel向selector註冊一個读事件,然後返回該通道的key
SelectionKey sk = sc.register(selector, SelectionKey.OP_READ);
// 使一個阻塞住的selector操作立即返回
selector.wakeup();
// 給定key一個附加的TCPHandler對象,用来处理后续读写操作
sk.attach(new TCPHandler(sk, sc));
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
// Handler線程
package NIO.luban.oneReactor;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
public class TCPHandler implements Runnable {
private final SelectionKey sk;
private final SocketChannel sc;
int state;
public TCPHandler(SelectionKey sk, SocketChannel sc) {
this.sk = sk;
this.sc = sc;
state = 0; // 初始狀態設定為READING ,第一次肯定是先读客户端数据
}
@Override
public void run() {
try {
if (state == 0)
read(); // 读取数据
else
send(); // 发送
} catch (IOException e) {
System.out.println("[Warning!] A client has been closed.");
closeChannel();
}
}
private synchronized void read() throws IOException {
byte[] arr = new byte[1024];
ByteBuffer buf = ByteBuffer.wrap(arr);
int numBytes = sc.read(buf); // 讀取字符串
if(numBytes == -1)
{
System.out.println("[Warning!] A client has been closed.");
closeChannel();
return;
}
String str = new String(arr); // 將读取到的byte內容转换字符串
if ((str != null) && !str.equals(" ")) {
//处理数据
process(str); //
System.out.println(sc.socket().getRemoteSocketAddress().toString()
+ " > " + str);
//在这个通道读完了后,下一步往这个通道写数据
//改成写状态
state = 1;
sk.interestOps(SelectionKey.OP_WRITE); // 通過key改變通道註冊的事件
sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回
}
}
private void send() throws IOException {
// get message from message queue
String str = "Your message has sent to "
+ sc.socket().getLocalSocketAddress().toString() + "\r\n";
ByteBuffer buf = ByteBuffer.wrap(str.getBytes()); // wrap自動把buf的position設為0,所以不需要再flip()
while (buf.hasRemaining()) {
sc.write(buf); // 回傳給client回應字符串,發送buf的position位置 到limit位置為止之間的內容
}
state = 0; // 改變狀態
sk.interestOps(SelectionKey.OP_READ); // 通過key改變通道註冊的事件
sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回
}
void process(String str) {
// do process(decode, logically process, encode)..
// ..
try {
//等待6秒,模拟数据处理
Thread.sleep(60000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//关闭通道
private void closeChannel() {
try {
sk.cancel();
sc.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
public class Main {
public static void main(String[] args) {
System.out.println(Main.class.getName());
// TODO Auto-generated method stub
try {
TCPReactor reactor = new TCPReactor(1333);
reactor.run();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public class Client {
/**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub
String hostname="127.0.0.1";
int port = 9999;
//String hostname="127.0.0.1";
//int port=1333;
try {
Socket client = new Socket(hostname, port); // 連接至目的地
System.out.println("連接至目的地:"+ hostname);
PrintWriter out = new PrintWriter(client.getOutputStream());
BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in));
String input;
while((input=stdIn.readLine()) != null) { // 讀取輸入
out.println(input); // 發送輸入的字符串
out.flush(); // 強制將緩衝區內的數據輸出
if(input.equals("exit"))
{
break;
}
System.out.println("server: "+in.readLine());
}
client.close();
System.out.println("client stop.");
} catch (UnknownHostException e) {
// TODO Auto-generated catch block
System.err.println("Don't know about host: " + hostname);
} catch (IOException e) {
// TODO Auto-generated catch block
System.err.println("Couldn't get I/O for the socket connection");
}
}
}
上面的单线程模型有个缺点,就是每一个连接都要开启一个线程,如果有10000个请求,服务器需要开启10000个线程,显然是不合理的,我们可以使用线程池技术来实现多线程模型。 首先,编写服务端ServerSocketChannel对应的Selector
public TCPReactor(int port) throws IOException {
//打开一个selector IO多路复用器
selector = Selector.open();
//打开服务端通道
ssc = ServerSocketChannel.open();
InetSocketAddress addr = new InetSocketAddress(port);
//绑定端口
ssc.socket().bind(addr);
ssc.configureBlocking(false); // 設置ServerSocketChannel為非阻塞
//注册连接请求事件
SelectionKey sk = ssc.register(selector, SelectionKey.OP_ACCEPT); // ServerSocketChannel向selector註冊一個OP_ACCEPT事件,然後返回該通道的key
//绑定连接处理器,连接进来后用Acceptor做后续处理
sk.attach(new Acceptor(selector, ssc));
}
@Override
public void run() {
while (!Thread.interrupted()) { // 在線程被中斷前持續運行
System.out.println("Waiting for new event on port: " + ssc.socket().getLocalPort() + "...");
try {
//轮询查看是否有事件就绪, 若沒有事件就緒則不往下執行
if (selector.select() == 0)
continue;
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//程序执行到这里说明有连接事件发生了,也就是说有客户端请求连接了
//获取所有的连接事件,遍历处理
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
while (it.hasNext()) {
//连接请求转发
dispatch((SelectionKey) (it.next())); // 根據事件的key進行調度
//删除事件,表示已经处理完了,下次循环不再处理已经处理过的连接
it.remove();
}
}
}
//获取事件的处Acceptor理器,开启一个线程进行处理
private void dispatch(SelectionKey key) {
Runnable r = (Runnable) (key.attachment());
if (r != null)
r.run();
}}
下一步,我们看Acceptor处理器是如何处理的:
public class Acceptor implements Runnable {
private final ServerSocketChannel ssc;
private final Selector selector;
public Acceptor(Selector selector, ServerSocketChannel ssc) {
this.ssc=ssc;
this.selector=selector;
}
@Override
public void run() {
try {
// 接受client连接请求
SocketChannel sc= ssc.accept();
System.out.println(sc.socket().getRemoteSocketAddress().toString() + " is connected.");
if(sc!=null) {
sc.configureBlocking(false); // 設置為非阻塞
//注册读事件
SelectionKey sk = sc.register(selector, SelectionKey.OP_READ); // SocketChannel向selector註冊一個OP_READ事件,然後返回該通道的key
// System.out.println(sk.selector()==selector);
selector.wakeup(); // 使一個阻塞住的selector操作立即返回
// 将读事件交给TCPHandler进行处理
sk.attach(new TCPHandler(sk, sc));
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Acceptor的处理很简单,就是接收请求,然后注册读事件,同事读事件的后续处理交给处理器TCPHandler处理,我们看一下TCPHandler如何处理的:
public class TCPHandler implements Runnable {
private final SelectionKey sk;
private final SocketChannel sc;
private static final int THREAD_COUNTING = 10;
//读写事件交给线程池处理
private static ThreadPoolExecutor pool = new ThreadPoolExecutor(
THREAD_COUNTING, THREAD_COUNTING, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>()); // 線程池
//读写状态处理器
HandlerState state;
public TCPHandler(SelectionKey sk, SocketChannel sc) {
this.sk = sk;
this.sc = sc;
// 初始状态设置为读状态
state = new ReadState();
pool.setMaximumPoolSize(32); // 設置線程池最大線程數
}
@Override
public void run() {
try {
//利用线程池处理读写
state.handle(this, sk, sc, pool);
} catch (IOException e) {
System.out.println("[Warning!] A client has been closed.");
closeChannel();
}
}
public void closeChannel() {
try {
sk.cancel();
sc.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
//读写状态的更改,读事件处理完改为写状态,写状态处理完改为读状态
public void setState(HandlerState state) {
this.state = state;
}
}
TCPHandler 处理器维护一个线程池,用于处理真正的读写事件,客户端连接服务器后初始时处理读事件,读事件处理完后处理写事件,写事件处理完后继续处理读事件,来回反复处理。我们看一下读事件是如何处理的
public class ReadState implements HandlerState{
private SelectionKey sk;
public ReadState() {
}
@Override
public void changeState(TCPHandler h) {
// TODO Auto-generated method stub
h.setState(new WorkState());
}
@Override
public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
ThreadPoolExecutor pool) throws IOException { // read()
this.sk = sk;
// non-blocking下不可用Readers,因為Readers不支援non-blocking
byte[] arr = new byte[1024];
ByteBuffer buf = ByteBuffer.wrap(arr);
int numBytes = sc.read(buf); // 讀取字符串
if(numBytes == -1)
{
System.out.println("[Warning!] A client has been closed.");
h.closeChannel();
return;
}
String str = new String(arr); // 將讀取到的byte內容轉為字符串型態
if ((str != null) && !str.equals(" ")) {
h.setState(new WorkState()); // 改變狀態(READING->WORKING)
pool.execute(new WorkerThread(h, str)); // do process in worker thread
System.out.println(sc.socket().getRemoteSocketAddress().toString()
+ " > " + str);
}
}
/*
* 執行邏輯處理之函數
*/
synchronized void process(TCPHandler h, String str) {
// do process(decode, logically process, encode)..
// ..
h.setState(new WriteState()); // 改變狀態(WORKING->SENDING)
this.sk.interestOps(SelectionKey.OP_WRITE); // 通過key改變通道註冊的事件
this.sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回
}
/*
* 工作者線程
*/
class WorkerThread implements Runnable {
TCPHandler h;
String str;
public WorkerThread(TCPHandler h, String str) {
this.h = h;
this.str=str;
}
@Override
public void run() {
process(h, str);
}
}
}
读完后,将写事件注册。写一次轮询到读事件后,交由WriteState处理器处理
public class WriteState implements HandlerState{
public WriteState() {
}
@Override
public void changeState(TCPHandler h) {
// TODO Auto-generated method stub
h.setState(new ReadState());
}
@Override
public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
ThreadPoolExecutor pool) throws IOException { // send()
// get message from message queue
String str = "Your message has sent to "
+ sc.socket().getLocalSocketAddress().toString() + "\r\n";
ByteBuffer buf = ByteBuffer.wrap(str.getBytes()); // wrap自動把buf的position設為0,所以不需要再flip()
while (buf.hasRemaining()) {
sc.write(buf); // 回傳給client回應字符串,發送buf的position位置 到limit位置為止之間的內容
}
h.setState(new ReadState()); // 改變狀態(SENDING->READING)
sk.interestOps(SelectionKey.OP_READ); // 通過key改變通道註冊的事件
sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回
}
}
WriteState 写处理器和ReadState读处理器都继承了HandlerState接口,
public interface HandlerState {
void changeState(TCPHandler h);
void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
ThreadPoolExecutor pool) throws IOException ;
}
上面的工作状态转换有WorkState完成
public class WorkState implements HandlerState {
public WorkState() {
}
@Override
public void changeState(TCPHandler h) {
// TODO Auto-generated method stub
h.setState(new WriteState());
}
@Override
public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
ThreadPoolExecutor pool) throws IOException {
// TODO Auto-generated method stub
}
}
编写测试类
public class Main {
public static void main(String[] args) {
// TODO Auto-generated method stub
try {
TCPReactor reactor = new TCPReactor(1333);
// new Thread(reactor).start();
reactor.run();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
客户端用单线程模式的就可以啦。
上面的代码,有没有发现所有的事件都注册在同一个selector上,selector表示好累!讲道理,ServerSocketChannel只是用来处理链接就可以了,它不需要处理读事件和写事件。读事件和写事件完全可以交给另一个选择器。这就是NIO的主从Reactor模型。
主线程只负责接收客户端连接,然后交其他从线程,使当有客户端连接时,可以很快的受到处理。同时,从线程专门负责读取注册到自己selector上面的客户端数据。并发读写能力得到了大大的提高。当然,如果,每一个SocketChannel的读写事件都注册到单独的selector上显然是浪费资源的,我们可以用一个selecort管理N个SocketChannel,也就是说对selector进行了分组。比如,用户管理模块注册一个selector,权限模块注册一个selector,日志模块注册一个selector,这样模块间的读写互不影响。selector数量取决你电脑CPU的核数,一般来说selecor数量为cpu核数2。也就是说,我们的主selector有1个,从selector有cpu2个。
OK!下面我们看这种主从Reactor模式的代码如何编写。 首先编写服务端
public class TCPReactor implements Runnable {
private final ServerSocketChannel ssc;
private final Selector selector; // mainReactor用的selector
public TCPReactor(int port) throws IOException {
selector = Selector.open();
ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); // 設置ServerSocketChannel為非阻塞
Acceptor acceptor = new Acceptor(ssc);
SelectionKey sk = ssc.register(selector,SelectionKey.OP_ACCEPT); // ServerSocketChannel向selector註冊一個OP_ACCEPT事件,然後返回該通道的key
sk.attach(acceptor); // 給定key一個附加的Acceptor對象
InetSocketAddress addr = new InetSocketAddress(port);
ssc.socket().bind(addr); // 在ServerSocketChannel綁定監聽端口
}
@Override
public void run() {
while (!Thread.interrupted()) { // 在線程被中斷前持續運行
System.out.println("mainReactor waiting for new event on port: "
+ ssc.socket().getLocalPort() + "...");
try {
if (selector.select() == 0) // 若沒有事件就緒則不往下執行
continue;
} catch (IOException e) {
e.printStackTrace();
}
Set<SelectionKey> selectedKeys = selector.selectedKeys(); // 取得所有已就緒事件的key集合
Iterator<SelectionKey> it = selectedKeys.iterator();
while (it.hasNext()) {
dispatch((SelectionKey) (it.next())); // 根據事件的key進行調度
it.remove();
}
}
}
/*
* name: dispatch(SelectionKey key)
* description: 調度方法,根據事件綁定的對象開新線程
*/
private void dispatch(SelectionKey key) {
Runnable r = (Runnable) (key.attachment()); // 根據事件之key綁定的對象開新線程
if (r != null)
r.run();
}
}
代码跟多线层模式基本一样,不解释了。 再来看Acceptor处理器
public class Acceptor implements Runnable {
private final ServerSocketChannel ssc; // mainReactor監聽的socket通道
private final int cores = Runtime.getRuntime().availableProcessors(); // 取得CPU核心數
private final Selector[] selectors = new Selector[cores]; // 創建核心數個selector給subReactor用
private int selIdx = 0; // 當前可使用的subReactor索引
private TCPSubReactor[] r = new TCPSubReactor[cores]; // subReactor線程
private Thread[] t = new Thread[cores]; // subReactor線程
public Acceptor(ServerSocketChannel ssc) throws IOException {
this.ssc = ssc;
// 創建多個selector以及多個subReactor線程
for (int i = 0; i < cores; i++) {
selectors[i] = Selector.open();
r[i] = new TCPSubReactor(selectors[i], ssc, i);
t[i] = new Thread(r[i]);
t[i].start();
}
}
@Override
public synchronized void run() {
try {
SocketChannel sc = ssc.accept(); // 接受client連線請求
System.out.println(sc.socket().getRemoteSocketAddress().toString()
+ " is connected.");
if (sc != null) {
sc.configureBlocking(false); // 設置為非阻塞
r[selIdx].setRestart(true); // 暫停線程
selectors[selIdx].wakeup(); // 使一個阻塞住的selector操作立即返回
SelectionKey sk = sc.register(selectors[selIdx],
SelectionKey.OP_READ); // SocketChannel向selector[selIdx]註冊一個OP_READ事件,然後返回該通道的key
selectors[selIdx].wakeup(); // 使一個阻塞住的selector操作立即返回
r[selIdx].setRestart(false); // 重啟線程
sk.attach(new TCPHandler(sk, sc)); // 給定key一個附加的TCPHandler對象
if (++selIdx == selectors.length)
selIdx = 0;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
此时,我们将读写事件注册到其他selector中,读写事件轮询注册到不同的子selector上,实现高并发处理。
private final int cores = Runtime.getRuntime().availableProcessors(); // 取得CPU核心數
private final Selector[] selectors = new Selector[cores]; // 創建核心數個selector給subReactor用
子selector
public class TCPReactor implements Runnable {
private final ServerSocketChannel ssc;
private final Selector selector; // mainReactor用的selector
public TCPReactor(int port) throws IOException {
selector = Selector.open();
ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); // 設置ServerSocketChannel為非阻塞
Acceptor acceptor = new Acceptor(ssc);
SelectionKey sk = ssc.register(selector,SelectionKey.OP_ACCEPT); // ServerSocketChannel向selector註冊一個OP_ACCEPT事件,然後返回該通道的key
sk.attach(acceptor); // 給定key一個附加的Acceptor對象
InetSocketAddress addr = new InetSocketAddress(port);
ssc.socket().bind(addr); // 在ServerSocketChannel綁定監聽端口
}
@Override
public void run() {
while (!Thread.interrupted()) { // 在線程被中斷前持續運行
System.out.println("mainReactor waiting for new event on port: "
+ ssc.socket().getLocalPort() + "...");
try {
if (selector.select() == 0) // 若沒有事件就緒則不往下執行
continue;
} catch (IOException e) {
e.printStackTrace();
}
Set<SelectionKey> selectedKeys = selector.selectedKeys(); // 取得所有已就緒事件的key集合
Iterator<SelectionKey> it = selectedKeys.iterator();
while (it.hasNext()) {
dispatch((SelectionKey) (it.next())); // 根據事件的key進行調度
it.remove();
}
}
}
/*
* name: dispatch(SelectionKey key)
* description: 調度方法,根據事件綁定的對象開新線程
*/
private void dispatch(SelectionKey key) {
Runnable r = (Runnable) (key.attachment()); // 根據事件之key綁定的對象開新線程
if (r != null)
r.run();
}
}
读写处理器
public class TCPHandler implements Runnable {
private final SelectionKey sk;
private final SocketChannel sc;
private static final int THREAD_COUNTING = 10;
private static ThreadPoolExecutor pool = new ThreadPoolExecutor(
THREAD_COUNTING, THREAD_COUNTING, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>()); // 線程池
HandlerState state; // 以狀態模式實現Handler
public TCPHandler(SelectionKey sk, SocketChannel sc) {
this.sk = sk;
this.sc = sc;
state = new ReadState(); // 初始狀態設定為READING
pool.setMaximumPoolSize(32); // 設置線程池最大線程數
}
@Override
public void run() {
try {
state.handle(this, sk, sc, pool);
} catch (IOException e) {
System.out.println("[Warning!] A client has been closed.");
closeChannel();
}
}
public void closeChannel() {
try {
sk.cancel();
sc.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
public void setState(HandlerState state) {
this.state = state;
}
}
真正的读
public class ReadState implements HandlerState {
private SelectionKey sk;
public ReadState() {
}
@Override
public void changeState(TCPHandler h) {
// TODO Auto-generated method stub
h.setState(new WorkState());
}
@Override
public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
ThreadPoolExecutor pool) throws IOException { // read()
this.sk = sk;
// non-blocking下不可用Readers,因為Readers不支援non-blocking
byte[] arr = new byte[1024];
ByteBuffer buf = ByteBuffer.wrap(arr);
int numBytes = sc.read(buf); // 讀取字符串
if(numBytes == -1)
{
System.out.println("[Warning!] A client has been closed.");
h.closeChannel();
return;
}
String str = new String(arr); // 將讀取到的byte內容轉為字符串型態
if ((str != null) && !str.equals(" ")) {
h.setState(new WorkState()); // 改變狀態(READING->WORKING)
pool.execute(new WorkerThread(h, str)); // do process in worker thread
System.out.println(sc.socket().getRemoteSocketAddress().toString()
+ " > " + str);
}
}
/*
* 執行邏輯處理之函數
*/
synchronized void process(TCPHandler h, String str) {
// do process(decode, logically process, encode)..
// ..
h.setState(new WriteState()); // 改變狀態(WORKING->SENDING)
this.sk.interestOps(SelectionKey.OP_WRITE); // 通過key改變通道註冊的事件
this.sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回
}
/*
* 工作者線程
*/
class WorkerThread implements Runnable {
TCPHandler h;
String str;
public WorkerThread(TCPHandler h, String str) {
this.h = h;
this.str=str;
}
@Override
public void run() {
process(h, str);
}
}
}
真正的写
public class WriteState implements HandlerState {
public WriteState() {
}
@Override
public void changeState(TCPHandler h) {
// TODO Auto-generated method stub
h.setState(new ReadState());
}
@Override
public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
ThreadPoolExecutor pool) throws IOException { // send()
// get message from message queue
String str = "Your message has sent to "
+ sc.socket().getLocalSocketAddress().toString() + "\r\n";
ByteBuffer buf = ByteBuffer.wrap(str.getBytes()); // wrap自動把buf的position設為0,所以不需要再flip()
while (buf.hasRemaining()) {
sc.write(buf); // 回傳給client回應字符串,發送buf的position位置 到limit位置為止之間的內容
}
h.setState(new ReadState()); // 改變狀態(SENDING->READING)
sk.interestOps(SelectionKey.OP_READ); // 通過key改變通道註冊的事件
sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回
}
}
编写客户端
public class Main {
public static void main(String[] args) {
// TODO Auto-generated method stub
try {
TCPReactor reactor = new TCPReactor(1333);
// reactor.run();
Thread thread = new Thread(reactor);
thread.start();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
如果第一次接触NIO,上面的代码读起来比较费劲,NIO编程确实麻烦,而且很容易出错,现实开发中不会用原生NIO库,小编都是用netty这个NIO框架进行编程,简单 高效 稳定,所以看不懂上面的代码没关系,只要理解上面的三幅图即可,这三幅图是netty最最核心的。下篇开始讲netty应用及源码。