因为NIO本身是非阻塞的,所以他的消息选择器Selector可以在单线程下连接多台客户端的访问。
为了加强NIO的性能,我们加入多线程的操作,当然NIO并不能简单的把Selector.select()放入Executor.execute(Runnable)的run方法中。
为完成NIO的多线程,我们应该有一个调度类,一个服务类。
调度类的目的是初始化一定数量的线程,以及线程交接。
package com.netty.nionetty.pool;
import com.netty.nionetty.NioServerBoss;
import com.netty.nionetty.NioServerWorker;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by Administrator on 2018-05-17.
*/
public class NioSelectorRunnablePool {
private final AtomicInteger bossIndex = new AtomicInteger();
//欢迎线程数组
private Boss[] bosses;
private final AtomicInteger workerIndex = new AtomicInteger();
//工作线程数组
private Worker[] workers;
public NioSelectorRunnablePool(Executor boss,Executor worker) {
initBoss(boss,1);
initWorker(worker,Runtime.getRuntime().availableProcessors() * 2);
}
//初始化1个欢迎线程
private void initBoss(Executor boss,int count) {
this.bosses = new NioServerBoss[count];
for (int i = 0;i < bosses.length;i++) {
bosses[i] = new NioServerBoss(boss,"boss thread " + (i + 1),this);
}
}
//初始化2倍计算机核数的工作线程
private void initWorker(Executor worker,int count) {
this.workers = new NioServerWorker[count];
for (int i = 0; i < workers.length;i++) {
workers[i] = new NioServerWorker(worker,"worker thread" + (i + 1),this);
}
}
//交接工作线程(从工作线程数组中挑出)
public Worker nextWorker() {
return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
}
//交接欢迎线程(从欢迎线程数组中挑出)
public Boss nextBoss() {
return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)];
}
}
另外带一个欢迎线程接口,一个工作线程接口
package com.netty.nionetty.pool;
import java.nio.channels.ServerSocketChannel;
/**
* Created by Administrator on 2018-05-17.
*/
public interface Boss {
void registerAcceptChannelTask(ServerSocketChannel serverChannel);
}
package com.netty.nionetty.pool;
import java.nio.channels.SocketChannel;
/**
* Created by Administrator on 2018-05-17.
*/
public interface Worker {
void registerNewChannelTask(SocketChannel channel);
}
有两种线程(欢迎线程和工作线程),所以我们有一个抽象线程类
package com.netty.nionetty;
import com.netty.nionetty.pool.NioSelectorRunnablePool;
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Created by Administrator on 2018-05-17.
*/
public abstract class AbstractNioSelector implements Runnable {
//线程池
private final Executor executor;
//NIO消息选择器
protected Selector selector;
protected final AtomicBoolean wakeUp = new AtomicBoolean();
//线程任务队列
private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
//线程名
private String threadName;
//线程调度器
protected NioSelectorRunnablePool selectorRunnablePool;
AbstractNioSelector(Executor executor,String threadName,NioSelectorRunnablePool selectorRunnablePool) {
this.executor = executor;
this.threadName = threadName;
this.selectorRunnablePool = selectorRunnablePool;
openSelector();
}
private void openSelector() {
try {
this.selector = Selector.open(); //打开消息选择器
} catch (IOException e) {
e.printStackTrace();
}
//把线程放入线程池,开始执行run方法
executor.execute(this);
}
public void run() {
Thread.currentThread().setName(this.threadName);
while (true) {
try {
wakeUp.set(false); //把消息选择器的状态定为未唤醒状态
select(selector); //消息选择器选择消息方式
processTaskQueue(); //因为在主程序中绑定端口的时候已经注册了接收通道任务线程,所以这里是读出任务。
process(selector); //任务处理,欢迎线程跟工作线程各不相同
} catch (IOException e) {
e.printStackTrace();
}
}
}
//欢迎线程跟工作线程各自添加不同的线程,再把消息选择器唤醒
protected final void registerTask(Runnable task) {
taskQueue.add(task);
Selector selector = this.selector;
if (selector != null) {
if (wakeUp.compareAndSet(false,true)) {
selector.wakeup();
}
}else {
taskQueue.remove(task);
}
}
public NioSelectorRunnablePool getSelectorRunnablePool() {
return selectorRunnablePool;
}
private void processTaskQueue() {
for (;;) {
final Runnable task = taskQueue.poll();
if (task == null) {
break;
}
task.run();
}
}
protected abstract int select(Selector selector) throws IOException;
protected abstract void process(Selector selector) throws IOException;
}
欢迎线程跟工作线程的具体实现
package com.netty.nionetty;
import com.netty.nionetty.pool.Boss;
import com.netty.nionetty.pool.NioSelectorRunnablePool;
import com.netty.nionetty.pool.Worker;
import java.io.IOException;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;
/**
* Created by Administrator on 2018-05-17.
*/
public class NioServerBoss extends AbstractNioSelector implements Boss {
public NioServerBoss(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
super(executor,threadName,selectorRunnablePool);
}
//注册接收任务,会先调用抽象类,把任务线程先添加到任务队列,再注册接收消息类型
public void registerAcceptChannelTask(final ServerSocketChannel serverChannel) {
final Selector selector = this.selector;
registerTask(new Runnable() {
public void run() {
try {
serverChannel.register(selector,SelectionKey.OP_ACCEPT);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
}
});
}
@Override
protected int select(Selector selector) throws IOException {
return selector.select();
}
//NIO操作,开始接收,接收后再启用工作线程,接收线程依然存在,而且工作线程也不断给到线程池未使用线程
//具体看初始化的时候初始了多少工作线程,但是是几个连接对应一个工作线程。
@Override
protected void process(Selector selector) throws IOException {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
if (selectedKeys.isEmpty()) {
return;
}
for (Iterator<SelectionKey> i = selectedKeys.iterator();i.hasNext();) {
SelectionKey key = i.next();
i.remove();
ServerSocketChannel server = (ServerSocketChannel)key.channel();
SocketChannel channel = server.accept();
channel.configureBlocking(false);
Worker nextWorker = getSelectorRunnablePool().nextWorker();
nextWorker.registerNewChannelTask(channel);
System.out.println("新客户端连接");
}
}
}
package com.netty.nionetty;
import com.netty.nionetty.pool.NioSelectorRunnablePool;
import com.netty.nionetty.pool.Worker;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;
/**
* Created by Administrator on 2018-05-17.
*/
public class NioServerWorker extends AbstractNioSelector implements Worker {
public NioServerWorker(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
super(executor,threadName,selectorRunnablePool);
}
public void registerNewChannelTask(final SocketChannel channel) {
final Selector selector = this.selector;
registerTask(new Runnable() {
public void run() {
try {
channel.register(selector,SelectionKey.OP_READ);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
}
});
}
@Override
protected int select(Selector selector) throws IOException {
return selector.select(500);
}
@Override
protected void process(Selector selector) throws IOException {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
while (ite.hasNext()) {
SelectionKey key = (SelectionKey)ite.next();
ite.remove();
SocketChannel channel = (SocketChannel)key.channel();
int ret = 0;
boolean failure = true;
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
ret = channel.read(buffer);
failure = false;
} catch (IOException e) {
e.printStackTrace();
}
if (ret <= 0 || failure) {
key.cancel();
System.out.println("客户端断开连接");
}else {
System.out.println("收到数据:" + new String(buffer.array()));
ByteBuffer outBuffer = ByteBuffer.wrap("收到\n".getBytes());
channel.write(outBuffer);
}
}
}
}
服务类
package com.netty.nionetty;
import com.netty.nionetty.pool.Boss;
import com.netty.nionetty.pool.NioSelectorRunnablePool;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;
/**
* Created by Administrator on 2018-05-17.
*/
public class ServerBootstap {
private NioSelectorRunnablePool selectorRunnablePool;
public ServerBootstap(NioSelectorRunnablePool selectorRunnablePool) {
this.selectorRunnablePool = selectorRunnablePool;
}
public void bind(final SocketAddress localAddress) {
try {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(localAddress);
Boss nextBoss = selectorRunnablePool.nextBoss();
nextBoss.registerAcceptChannelTask(serverChannel);
} catch (IOException e) {
e.printStackTrace();
}
}
}
主程序
package com.netty.nionetty;
import com.netty.nionetty.pool.NioSelectorRunnablePool;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
/**
* Created by Administrator on 2018-05-17.
*/
public class Start {
public static void main(String[] args) {
NioSelectorRunnablePool nioSelectorRunnablePool = new NioSelectorRunnablePool(Executors.newCachedThreadPool(),Executors.newCachedThreadPool());
ServerBootstap bootstrap = new ServerBootstap(nioSelectorRunnablePool);
bootstrap.bind(new InetSocketAddress(10101));
System.out.println("start");
}
}
其实最主要的就是在线程调度器中,各种线程已经被初始化存在于线程池内存中了,所以后面只是把这些线程拿出来,并注册消息类型,进行处理,这就是NIO的多线程处理了。