作者:温昂展
1. 同步 VS 异步
首先理解IO操作需要调用系统服务,那么:
同步IO,每个IO请求逐一处理,用户发起请求后需要等待或者轮询内核,直到IO操作完成继续执行;
异步IO,多个IO请求并发执行,用户发起请求后仍然继续执行,当内核完成后通知用户线程。
2. 阻塞 VS 非阻塞
阻塞,请求发出后,若条件不满足,线程Blocking;
非阻塞,请求发出后,若条件不满足返回标志信息,告知条件不满足,线程仍然Running。
个人理解:
以上两个概念,在某些场景下其实是没有什么区别的,因为通常异步不会阻塞。但是如果严格去探讨,两者是不等价的,同步可以实现非阻塞,异步也可以阻塞。理解上有点绕,我认为同步和异步表述的是线程间的协作关系,即之前的任务需要等先来的完成之后才进行(比如需要拿到上一次的返回值),而阻塞非阻塞强调的是发起一个请求是否立即返回结果(严格来说也可以是void,理解为返回空)。
下面回归正题,TAF的网络线程采用了JDK原生NIO实现Reactor模型,此时服务端一个IO线程可以同时处理客户端请求,如果放到Unix的IO模型去理解,这就是大名鼎鼎的IO多路复用!其实传统BIO+多线程也并非一无是处,如果考虑只是处理并发量不大的长连接,用这种模式实现可能更加简便,性能上也不会有多大差别。但是如果是应对高并发场景,IO多路复用势在必行。
在理解NIO上可以参照本系列上一节的两张附图,注意和BIO最大的两点区别是:
进一步考虑,上述实现是否还存在不足呢?
答案是肯定的,
这里贴一个简易实现代码,方便阅读:
NIOServer:
public class NIOServer {
private static final Logger
LOGGER = LoggerFactory.getLogger(NIOServer.class);
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(1234));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
int coreNum = Runtime.getRuntime().availableProcessors();
Reactor[] reactors = new Reactor[coreNum];
for (int i = 0; i < reactors.length; i++) {
reactors[i] = new Reactor();
}
int index = 0;
while (selector.select() > 0) {
Set<SelectionKey> keys = selector.selectedKeys();
for (SelectionKey key : keys) {
keys.remove(key);
if (key.isAcceptable()) {
ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = acceptServerSocketChannel.accept();
socketChannel.configureBlocking(false);
LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress());
Reactor reactor = reactors[(int) ((index++) % coreNum)];
reactor.addChannel(socketChannel);
reactor.wakeup();
}
}
}
}
}
Reactor类:
public class Reactor {
private static final Logger LOGGER = LoggerFactory.getLogger(Reactor.class);
private static final ExecutorService service =
Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors());
private Selector selector;
public Reactor() throws IOException {
this.selector = SelectorProvider.provider().openSelector();
start();
}
public void addChannel(SocketChannel socketChannel) throws ClosedChannelException {
socketChannel.register(this.selector, SelectionKey.OP_READ);
}
public void wakeup() {
this.selector.wakeup();
}
public void start() {
service.submit(() -> {
while (true) {
if (selector.select(500) <= 0) {
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel socketChannel = (SocketChannel) key.channel();
int count = socketChannel.read(buffer);
if (count < 0) {
socketChannel.close();
key.cancel();
LOGGER.info("{}\t Read ended", socketChannel);
continue;
} else if (count == 0) {
LOGGER.info("{}\t Message size is 0", socketChannel);
continue;
} else {
LOGGER.info("{}\t Read message {}", socketChannel, new String(buffer.array()));
}
}
}
}
});
}
}
现在终于到了正题,理解了上面的问题,再来看TAF的线程模型就很容易理解了,其设计思路基本如上,并无二异。
TAF源码的线程模型部分相关类图如下:
其中,Reactor即为IO线程,SelecorManager就是负责启动多个Reactor线程并分发事件,代码实现上采用Round Robin方式;
ServerThreadPoolDispatcher为业务线程池,TCPSession将解码后request和response创建一个workThread任务放进去跑,实现了IO线程和业务线程的解耦。
TAF中核心代码如下:
/**
* SelectorManager 管理多个Reactor
*/
public synchronized void start()
{
if (this.started)
{
return ;
}
this.started = true;
for (Reactor reactor : this.reactorSet)
{
reactor.start();
}
}
//轮询
public final Reactor nextReactor()
{
return this.reactorSet[(int)(this.sets.incrementAndGet() % this.selectorPoolSize)];
}
public void run()
{
try
{
for (;;)
{
selector.select();
processRegister();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext())
{
SelectionKey key = iter.next();
iter.remove();
if( !key.isValid() ) continue;
try {
//1. Update the last operation time
if (key.attachment() != null
&& key.attachment() instanceof Session) {
((Session)key.attachment()).updateLastOperationTime();
}
//2. Dispatch I/O event
dispatchEvent(key);
} catch (Throwable ex) {
disConnectWithException(key, ex);
}
}
processUnRegister();
}
} catch (Throwable e)
{
crashed = true;
e.printStackTrace();
}
}
当然从类图中也可以看出,具体实现上TAF还考虑了很多其他的问题和优化方案,如:过载保护等,这些留待下节说明。
感谢阅读,有错误之处还请不吝赐教。
致谢,在TAF的学习上要感谢实习中leader毛哥和导师jack的指导,让我少走了不少弯路,特别是思考问题的方式上,受到了极大的启发; 还有组里像terry浩哥,kahn哥、jeff哥,菠菜、loki 等所有小伙伴给予热情的解答和帮助。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。