TAF 必修课(二):Reactor多线程模型

作者:温昂展

一、概念理解

1. 同步 VS 异步

首先理解IO操作需要调用系统服务,那么:

同步IO,每个IO请求逐一处理,用户发起请求后需要等待或者轮询内核,直到IO操作完成继续执行;

异步IO,多个IO请求并发执行,用户发起请求后仍然继续执行,当内核完成后通知用户线程。

2. 阻塞 VS 非阻塞

阻塞,请求发出后,若条件不满足,线程Blocking;

非阻塞,请求发出后,若条件不满足返回标志信息,告知条件不满足,线程仍然Running。

个人理解:

以上两个概念,在某些场景下其实是没有什么区别的,因为通常异步不会阻塞。但是如果严格去探讨,两者是不等价的,同步可以实现非阻塞,异步也可以阻塞。理解上有点绕,我认为同步和异步表述的是线程间的协作关系,即之前的任务需要等先来的完成之后才进行(比如需要拿到上一次的返回值),而阻塞非阻塞强调的是发起一个请求是否立即返回结果(严格来说也可以是void,理解为返回空)。

二、Java NIO + Reactor多线程

下面回归正题,TAF的网络线程采用了JDK原生NIO实现Reactor模型,此时服务端一个IO线程可以同时处理客户端请求,如果放到Unix的IO模型去理解,这就是大名鼎鼎的IO多路复用!其实传统BIO+多线程也并非一无是处,如果考虑只是处理并发量不大的长连接,用这种模式实现可能更加简便,性能上也不会有多大差别。但是如果是应对高并发场景,IO多路复用势在必行。

在理解NIO上可以参照本系列上一节的两张附图,注意和BIO最大的两点区别是:

  1. 从面向流改为面向缓冲区ByteBuffer;
  2. 引入选择器selector,允许一个单独的线程同时监视多个通道。即:通过调用select/poll方法由一个用户态线程负责轮询多个Channel,直到某个阶段的数据就绪,再通知相应的用户线程(当前线程)执行IO操作,以此实现非阻塞和模拟异步化。

进一步考虑,上述实现是否还存在不足呢?

答案是肯定的,

  1. 尽管一个线程可同时监控多个请求(Channel),但是所有读/写请求以及对新连接请求的处理都在同一个线程中处理,无法充分利用多CPU的优势,同时读/写操作也会阻塞对新连接请求的处理,因此需要将IO线程和业务线程解耦。
  2. 单Reactor局限性,可引入了多Reactor,也即一个主Reactor负责监控所有的连接请求,多个Reactor负责监控并处理读/写请求,减轻了主Reactor的压力,降低了主Reactor压力太大而造成的延迟,同时具有一定的容错性。并且每个子Reactor分别属于一个独立的线程,每个成功连接后的Channel的所有操作由同一个线程处理。这样保证了同一请求的所有状态和上下文在同一个线程中,避免了不必要的上下文切换,同时也方便了监控请求响应状态。另外,在选择Reactor进行注册时可以通过轮询的方式达到一定的均衡。

这里贴一个简易实现代码,方便阅读:

NIOServer:

public class NIOServer {
  private static final Logger
          LOGGER = LoggerFactory.getLogger(NIOServer.class);
  public static void main(String[] args) throws IOException {
    Selector selector = Selector.open();
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);
    serverSocketChannel.bind(new InetSocketAddress(1234));
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    int coreNum = Runtime.getRuntime().availableProcessors();
    Reactor[] reactors = new Reactor[coreNum];
    for (int i = 0; i < reactors.length; i++) {
      reactors[i] = new Reactor();
    }
    int index = 0;
    while (selector.select() > 0) {
      Set<SelectionKey> keys = selector.selectedKeys();
      for (SelectionKey key : keys) {
        keys.remove(key);
        if (key.isAcceptable()) {
          ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
          SocketChannel socketChannel = acceptServerSocketChannel.accept();
          socketChannel.configureBlocking(false);
          LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress());
          Reactor reactor = reactors[(int) ((index++) % coreNum)];
          reactor.addChannel(socketChannel);
          reactor.wakeup();
        }
      }
    }
  }
}

Reactor类:

public class Reactor {

  private static final Logger LOGGER = LoggerFactory.getLogger(Reactor.class);
  private static final ExecutorService service =
      Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors());
  private Selector selector;
  public Reactor() throws IOException {
    this.selector = SelectorProvider.provider().openSelector();
    start();
  }
  public void addChannel(SocketChannel socketChannel) throws ClosedChannelException {
    socketChannel.register(this.selector, SelectionKey.OP_READ);
  }
  public void wakeup() {
    this.selector.wakeup();
  }
  public void start() {
    service.submit(() -> {
      while (true) {
        if (selector.select(500) <= 0) {
          continue;
        }
        Set<SelectionKey> keys = selector.selectedKeys();
        Iterator<SelectionKey> iterator = keys.iterator();
        while (iterator.hasNext()) {
          SelectionKey key = iterator.next();
          iterator.remove();
          if (key.isReadable()) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            SocketChannel socketChannel = (SocketChannel) key.channel();
            int count = socketChannel.read(buffer);
            if (count < 0) {
              socketChannel.close();
              key.cancel();
              LOGGER.info("{}\t Read ended", socketChannel);
              continue;
            } else if (count == 0) {
              LOGGER.info("{}\t Message size is 0", socketChannel);
              continue;
            } else {
              LOGGER.info("{}\t Read message {}", socketChannel, new String(buffer.array()));
            }
          }
        }
      }
    });
  }
}

三、TAF线程模型

现在终于到了正题,理解了上面的问题,再来看TAF的线程模型就很容易理解了,其设计思路基本如上,并无二异。

TAF源码的线程模型部分相关类图如下:

其中,Reactor即为IO线程,SelecorManager就是负责启动多个Reactor线程并分发事件,代码实现上采用Round Robin方式;

ServerThreadPoolDispatcher为业务线程池,TCPSession将解码后request和response创建一个workThread任务放进去跑,实现了IO线程和业务线程的解耦。

TAF中核心代码如下:

/**
 *  SelectorManager 管理多个Reactor
 */
public synchronized void start()
{
    if (this.started)
    {
        return ;
    }

    this.started = true;
    for (Reactor reactor : this.reactorSet)
    {
        reactor.start();
    }
}

//轮询
public final Reactor nextReactor()
{
    return this.reactorSet[(int)(this.sets.incrementAndGet() % this.selectorPoolSize)];
}


public void run()
{
    try
    {
        for (;;)
        {
            selector.select();
            processRegister();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext())
            {
                SelectionKey key = iter.next();
                iter.remove();

                if( !key.isValid() ) continue;

                try {
                    //1. Update the last operation time
                    if (key.attachment() != null
                            && key.attachment() instanceof Session) {
                        ((Session)key.attachment()).updateLastOperationTime();
                    }

                    //2. Dispatch I/O event
                    dispatchEvent(key);

                } catch (Throwable ex) {
                    disConnectWithException(key, ex);
                }
            }
            processUnRegister();
        }
    } catch (Throwable e)
    {
        crashed = true;
        e.printStackTrace();
    }
}

当然从类图中也可以看出,具体实现上TAF还考虑了很多其他的问题和优化方案,如:过载保护等,这些留待下节说明。

感谢阅读,有错误之处还请不吝赐教。

致谢,在TAF的学习上要感谢实习中leader毛哥和导师jack的指导,让我少走了不少弯路,特别是思考问题的方式上,受到了极大的启发; 还有组里像terry浩哥,kahn哥、jeff哥,菠菜、loki 等所有小伙伴给予热情的解答和帮助。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏大数据架构

Java进阶(五)Java I/O模型从BIO到NIO和Reactor模式

1505
来自专栏个人分享

NIO源码阅读

  自己对着源码敲一遍练习,写上注释。发现NIO编程难度好高啊。。虽然很复杂,但是NIO编程的有点还是很多:

814
来自专栏Java Edge

MySQL的锁1 MySql的三种锁2 表锁的锁模式3 MyISAM的并发锁4 InnoDB锁问题5 关于死锁6 总结7 索引与锁

5496
来自专栏青枫的专栏

MySQL学习小结

771
来自专栏小灰灰

Mysql之锁与事务

平时的业务中,顶多也就是写写简单的sql,连事务都用的少,对锁这一块的了解就更加欠缺了,之前一个大神分享了下mysql的事务隔离级别,感觉挺有意思的,正好发现一...

43913
来自专栏飞总聊IT

总结一下SQL NULL吧

这篇文章主要回答网友姜锐(森原)。 网上并没有太好的文章总结NULL,比较有效的办法是自己去读SQL标准了。通常SQL98最重要。 我总结一下NULL在标准里...

30211
来自专栏搜云库

MySQL/InnoDB中,乐观锁、悲观锁、共享锁、排它锁、行锁、表锁、死锁概念的理解

MySQL/InnoDB的加锁,一直是一个面试中常问的话题。例如,数据库如果有高并发请求,如何保证数据完整性?产生死锁问题如何排查并解决?我在工作过程中,也会经...

1584
来自专栏数据和云

如何理解并正确使用MySql索引

索引是存储引擎用于快速查找记录的一种数据结构,通过合理的使用数据库索引可以大大提高系统的访问性能,本文主要介绍在MySql数据库中索引类型,以及如何创建出更加合...

3386
来自专栏决胜机器学习

优化页面访问速度(二) ——数据库优化

数据库优化,主要包括数据表设计、索引、sql语句、表拆分、数据库服务器架构等方向的优化。

965
来自专栏琯琯博客

Yii2 学习笔记之数据库篇

3327

扫码关注云+社区