TAF 必修课(二):Reactor多线程模型

作者:温昂展

一、概念理解

1. 同步 VS 异步

首先理解IO操作需要调用系统服务,那么:

同步IO,每个IO请求逐一处理,用户发起请求后需要等待或者轮询内核,直到IO操作完成继续执行;

异步IO,多个IO请求并发执行,用户发起请求后仍然继续执行,当内核完成后通知用户线程。

2. 阻塞 VS 非阻塞

阻塞,请求发出后,若条件不满足,线程Blocking;

非阻塞,请求发出后,若条件不满足返回标志信息,告知条件不满足,线程仍然Running。

个人理解:

以上两个概念,在某些场景下其实是没有什么区别的,因为通常异步不会阻塞。但是如果严格去探讨,两者是不等价的,同步可以实现非阻塞,异步也可以阻塞。理解上有点绕,我认为同步和异步表述的是线程间的协作关系,即之前的任务需要等先来的完成之后才进行(比如需要拿到上一次的返回值),而阻塞非阻塞强调的是发起一个请求是否立即返回结果(严格来说也可以是void,理解为返回空)。

二、Java NIO + Reactor多线程

下面回归正题,TAF的网络线程采用了JDK原生NIO实现Reactor模型,此时服务端一个IO线程可以同时处理客户端请求,如果放到Unix的IO模型去理解,这就是大名鼎鼎的IO多路复用!其实传统BIO+多线程也并非一无是处,如果考虑只是处理并发量不大的长连接,用这种模式实现可能更加简便,性能上也不会有多大差别。但是如果是应对高并发场景,IO多路复用势在必行。

在理解NIO上可以参照本系列上一节的两张附图,注意和BIO最大的两点区别是:

  1. 从面向流改为面向缓冲区ByteBuffer;
  2. 引入选择器selector,允许一个单独的线程同时监视多个通道。即:通过调用select/poll方法由一个用户态线程负责轮询多个Channel,直到某个阶段的数据就绪,再通知相应的用户线程(当前线程)执行IO操作,以此实现非阻塞和模拟异步化。

进一步考虑,上述实现是否还存在不足呢?

答案是肯定的,

  1. 尽管一个线程可同时监控多个请求(Channel),但是所有读/写请求以及对新连接请求的处理都在同一个线程中处理,无法充分利用多CPU的优势,同时读/写操作也会阻塞对新连接请求的处理,因此需要将IO线程和业务线程解耦。
  2. 单Reactor局限性,可引入了多Reactor,也即一个主Reactor负责监控所有的连接请求,多个Reactor负责监控并处理读/写请求,减轻了主Reactor的压力,降低了主Reactor压力太大而造成的延迟,同时具有一定的容错性。并且每个子Reactor分别属于一个独立的线程,每个成功连接后的Channel的所有操作由同一个线程处理。这样保证了同一请求的所有状态和上下文在同一个线程中,避免了不必要的上下文切换,同时也方便了监控请求响应状态。另外,在选择Reactor进行注册时可以通过轮询的方式达到一定的均衡。

这里贴一个简易实现代码,方便阅读:

NIOServer:

public class NIOServer {
  private static final Logger
          LOGGER = LoggerFactory.getLogger(NIOServer.class);
  public static void main(String[] args) throws IOException {
    Selector selector = Selector.open();
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);
    serverSocketChannel.bind(new InetSocketAddress(1234));
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    int coreNum = Runtime.getRuntime().availableProcessors();
    Reactor[] reactors = new Reactor[coreNum];
    for (int i = 0; i < reactors.length; i++) {
      reactors[i] = new Reactor();
    }
    int index = 0;
    while (selector.select() > 0) {
      Set<SelectionKey> keys = selector.selectedKeys();
      for (SelectionKey key : keys) {
        keys.remove(key);
        if (key.isAcceptable()) {
          ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
          SocketChannel socketChannel = acceptServerSocketChannel.accept();
          socketChannel.configureBlocking(false);
          LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress());
          Reactor reactor = reactors[(int) ((index++) % coreNum)];
          reactor.addChannel(socketChannel);
          reactor.wakeup();
        }
      }
    }
  }
}

Reactor类:

public class Reactor {

  private static final Logger LOGGER = LoggerFactory.getLogger(Reactor.class);
  private static final ExecutorService service =
      Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors());
  private Selector selector;
  public Reactor() throws IOException {
    this.selector = SelectorProvider.provider().openSelector();
    start();
  }
  public void addChannel(SocketChannel socketChannel) throws ClosedChannelException {
    socketChannel.register(this.selector, SelectionKey.OP_READ);
  }
  public void wakeup() {
    this.selector.wakeup();
  }
  public void start() {
    service.submit(() -> {
      while (true) {
        if (selector.select(500) <= 0) {
          continue;
        }
        Set<SelectionKey> keys = selector.selectedKeys();
        Iterator<SelectionKey> iterator = keys.iterator();
        while (iterator.hasNext()) {
          SelectionKey key = iterator.next();
          iterator.remove();
          if (key.isReadable()) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            SocketChannel socketChannel = (SocketChannel) key.channel();
            int count = socketChannel.read(buffer);
            if (count < 0) {
              socketChannel.close();
              key.cancel();
              LOGGER.info("{}\t Read ended", socketChannel);
              continue;
            } else if (count == 0) {
              LOGGER.info("{}\t Message size is 0", socketChannel);
              continue;
            } else {
              LOGGER.info("{}\t Read message {}", socketChannel, new String(buffer.array()));
            }
          }
        }
      }
    });
  }
}

三、TAF线程模型

现在终于到了正题,理解了上面的问题,再来看TAF的线程模型就很容易理解了,其设计思路基本如上,并无二异。

TAF源码的线程模型部分相关类图如下:

其中,Reactor即为IO线程,SelecorManager就是负责启动多个Reactor线程并分发事件,代码实现上采用Round Robin方式;

ServerThreadPoolDispatcher为业务线程池,TCPSession将解码后request和response创建一个workThread任务放进去跑,实现了IO线程和业务线程的解耦。

TAF中核心代码如下:

/**
 *  SelectorManager 管理多个Reactor
 */
public synchronized void start()
{
    if (this.started)
    {
        return ;
    }

    this.started = true;
    for (Reactor reactor : this.reactorSet)
    {
        reactor.start();
    }
}

//轮询
public final Reactor nextReactor()
{
    return this.reactorSet[(int)(this.sets.incrementAndGet() % this.selectorPoolSize)];
}


public void run()
{
    try
    {
        for (;;)
        {
            selector.select();
            processRegister();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext())
            {
                SelectionKey key = iter.next();
                iter.remove();

                if( !key.isValid() ) continue;

                try {
                    //1. Update the last operation time
                    if (key.attachment() != null
                            && key.attachment() instanceof Session) {
                        ((Session)key.attachment()).updateLastOperationTime();
                    }

                    //2. Dispatch I/O event
                    dispatchEvent(key);

                } catch (Throwable ex) {
                    disConnectWithException(key, ex);
                }
            }
            processUnRegister();
        }
    } catch (Throwable e)
    {
        crashed = true;
        e.printStackTrace();
    }
}

当然从类图中也可以看出,具体实现上TAF还考虑了很多其他的问题和优化方案,如:过载保护等,这些留待下节说明。

感谢阅读,有错误之处还请不吝赐教。

致谢,在TAF的学习上要感谢实习中leader毛哥和导师jack的指导,让我少走了不少弯路,特别是思考问题的方式上,受到了极大的启发; 还有组里像terry浩哥,kahn哥、jeff哥,菠菜、loki 等所有小伙伴给予热情的解答和帮助。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏SAP最佳业务实践

SAP最佳业务实践:SD–客户寄售(119)-2寄售补货

一、VA01寄售补货订单 寄售补货不完全代表实际向客户销售货物。但是,订单输入使用的机制与标准订单输入相同。例如,当输入订单时,系统将对物料执行可用性检查并自动...

3855
来自专栏专注于主流技术和业务

SpringMVC源码阅读:ContextLoaderListener初始化过程

ContextLoaderListener监听器的作用就是启动web容器时,自动装配ApplicationContext的配置信息。它实现了ServletCon...

2214
来自专栏chenssy

【死磕Netty】-----NIO基础详解

原文出处http://cmsblogs.com/ 『chenssy』 转载请注明原创出处,谢谢! Netty 是基于Java NIO 封装的网络通讯框架,只有充...

4546
来自专栏JavaEdge

Tomcat架构解析之3 Connector NIOAcceptorPollerWorkerNioSelectorPool

2824
来自专栏实战docker

Docker下部署dubbo,消费者应用无法使用link参数的问题

在前一篇文章《Docker下dubbo开发,三部曲之一:极速体验》中,我们快速体验了部署在Docker环境下的dubbo服务,当时一共启动了四个容器,具体情况为...

2729
来自专栏SAP最佳业务实践

SAP最佳业务实践:SD–外贸出口处理(118)-4发货

一、VL10C创建交货 1. 在 销售订单项目 屏幕上,进行以下输入: 字段名称用户操作和值注释装运点/接收点<装运点> 交货创建日期(从)<输入交货创建日期>...

38510
来自专栏蓝天

基于zookeeper的主备切换方法

继承CZookeeperHelper即可快速实现主备切换: https://github.com/eyjian/mooon/blob/master/mooo...

1292
来自专栏YG小书屋

ES节点丢失导致实时数据导入速度特别慢

8172
来自专栏java 成神之路

NIO 之 Channel实现原理

46014
来自专栏coolblog.xyz技术专栏

基于 Java NIO 实现简单的 HTTP 服务器

本文是上一篇文章实践篇,在上一篇文章中,我分析了选择器 Selector 的原理。本篇文章,我们来说说 Selector 的应用,如标题所示,这里我基于 Jav...

69312

扫码关注云+社区

领取腾讯云代金券