TAF 必修课(二):Reactor多线程模型

作者:温昂展

一、概念理解

1. 同步 VS 异步

首先理解IO操作需要调用系统服务,那么:

同步IO,每个IO请求逐一处理,用户发起请求后需要等待或者轮询内核,直到IO操作完成继续执行;

异步IO,多个IO请求并发执行,用户发起请求后仍然继续执行,当内核完成后通知用户线程。

2. 阻塞 VS 非阻塞

阻塞,请求发出后,若条件不满足,线程Blocking;

非阻塞,请求发出后,若条件不满足返回标志信息,告知条件不满足,线程仍然Running。

个人理解:

以上两个概念,在某些场景下其实是没有什么区别的,因为通常异步不会阻塞。但是如果严格去探讨,两者是不等价的,同步可以实现非阻塞,异步也可以阻塞。理解上有点绕,我认为同步和异步表述的是线程间的协作关系,即之前的任务需要等先来的完成之后才进行(比如需要拿到上一次的返回值),而阻塞非阻塞强调的是发起一个请求是否立即返回结果(严格来说也可以是void,理解为返回空)。

二、Java NIO + Reactor多线程

下面回归正题,TAF的网络线程采用了JDK原生NIO实现Reactor模型,此时服务端一个IO线程可以同时处理客户端请求,如果放到Unix的IO模型去理解,这就是大名鼎鼎的IO多路复用!其实传统BIO+多线程也并非一无是处,如果考虑只是处理并发量不大的长连接,用这种模式实现可能更加简便,性能上也不会有多大差别。但是如果是应对高并发场景,IO多路复用势在必行。

在理解NIO上可以参照本系列上一节的两张附图,注意和BIO最大的两点区别是:

  1. 从面向流改为面向缓冲区ByteBuffer;
  2. 引入选择器selector,允许一个单独的线程同时监视多个通道。即:通过调用select/poll方法由一个用户态线程负责轮询多个Channel,直到某个阶段的数据就绪,再通知相应的用户线程(当前线程)执行IO操作,以此实现非阻塞和模拟异步化。

进一步考虑,上述实现是否还存在不足呢?

答案是肯定的,

  1. 尽管一个线程可同时监控多个请求(Channel),但是所有读/写请求以及对新连接请求的处理都在同一个线程中处理,无法充分利用多CPU的优势,同时读/写操作也会阻塞对新连接请求的处理,因此需要将IO线程和业务线程解耦。
  2. 单Reactor局限性,可引入了多Reactor,也即一个主Reactor负责监控所有的连接请求,多个Reactor负责监控并处理读/写请求,减轻了主Reactor的压力,降低了主Reactor压力太大而造成的延迟,同时具有一定的容错性。并且每个子Reactor分别属于一个独立的线程,每个成功连接后的Channel的所有操作由同一个线程处理。这样保证了同一请求的所有状态和上下文在同一个线程中,避免了不必要的上下文切换,同时也方便了监控请求响应状态。另外,在选择Reactor进行注册时可以通过轮询的方式达到一定的均衡。

这里贴一个简易实现代码,方便阅读:

NIOServer:

public class NIOServer {
  private static final Logger
          LOGGER = LoggerFactory.getLogger(NIOServer.class);
  public static void main(String[] args) throws IOException {
    Selector selector = Selector.open();
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);
    serverSocketChannel.bind(new InetSocketAddress(1234));
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    int coreNum = Runtime.getRuntime().availableProcessors();
    Reactor[] reactors = new Reactor[coreNum];
    for (int i = 0; i < reactors.length; i++) {
      reactors[i] = new Reactor();
    }
    int index = 0;
    while (selector.select() > 0) {
      Set<SelectionKey> keys = selector.selectedKeys();
      for (SelectionKey key : keys) {
        keys.remove(key);
        if (key.isAcceptable()) {
          ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
          SocketChannel socketChannel = acceptServerSocketChannel.accept();
          socketChannel.configureBlocking(false);
          LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress());
          Reactor reactor = reactors[(int) ((index++) % coreNum)];
          reactor.addChannel(socketChannel);
          reactor.wakeup();
        }
      }
    }
  }
}

Reactor类:

public class Reactor {

  private static final Logger LOGGER = LoggerFactory.getLogger(Reactor.class);
  private static final ExecutorService service =
      Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors());
  private Selector selector;
  public Reactor() throws IOException {
    this.selector = SelectorProvider.provider().openSelector();
    start();
  }
  public void addChannel(SocketChannel socketChannel) throws ClosedChannelException {
    socketChannel.register(this.selector, SelectionKey.OP_READ);
  }
  public void wakeup() {
    this.selector.wakeup();
  }
  public void start() {
    service.submit(() -> {
      while (true) {
        if (selector.select(500) <= 0) {
          continue;
        }
        Set<SelectionKey> keys = selector.selectedKeys();
        Iterator<SelectionKey> iterator = keys.iterator();
        while (iterator.hasNext()) {
          SelectionKey key = iterator.next();
          iterator.remove();
          if (key.isReadable()) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            SocketChannel socketChannel = (SocketChannel) key.channel();
            int count = socketChannel.read(buffer);
            if (count < 0) {
              socketChannel.close();
              key.cancel();
              LOGGER.info("{}\t Read ended", socketChannel);
              continue;
            } else if (count == 0) {
              LOGGER.info("{}\t Message size is 0", socketChannel);
              continue;
            } else {
              LOGGER.info("{}\t Read message {}", socketChannel, new String(buffer.array()));
            }
          }
        }
      }
    });
  }
}

三、TAF线程模型

现在终于到了正题,理解了上面的问题,再来看TAF的线程模型就很容易理解了,其设计思路基本如上,并无二异。

TAF源码的线程模型部分相关类图如下:

其中,Reactor即为IO线程,SelecorManager就是负责启动多个Reactor线程并分发事件,代码实现上采用Round Robin方式;

ServerThreadPoolDispatcher为业务线程池,TCPSession将解码后request和response创建一个workThread任务放进去跑,实现了IO线程和业务线程的解耦。

TAF中核心代码如下:

/**
 *  SelectorManager 管理多个Reactor
 */
public synchronized void start()
{
    if (this.started)
    {
        return ;
    }

    this.started = true;
    for (Reactor reactor : this.reactorSet)
    {
        reactor.start();
    }
}

//轮询
public final Reactor nextReactor()
{
    return this.reactorSet[(int)(this.sets.incrementAndGet() % this.selectorPoolSize)];
}


public void run()
{
    try
    {
        for (;;)
        {
            selector.select();
            processRegister();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext())
            {
                SelectionKey key = iter.next();
                iter.remove();

                if( !key.isValid() ) continue;

                try {
                    //1. Update the last operation time
                    if (key.attachment() != null
                            && key.attachment() instanceof Session) {
                        ((Session)key.attachment()).updateLastOperationTime();
                    }

                    //2. Dispatch I/O event
                    dispatchEvent(key);

                } catch (Throwable ex) {
                    disConnectWithException(key, ex);
                }
            }
            processUnRegister();
        }
    } catch (Throwable e)
    {
        crashed = true;
        e.printStackTrace();
    }
}

当然从类图中也可以看出,具体实现上TAF还考虑了很多其他的问题和优化方案,如:过载保护等,这些留待下节说明。

感谢阅读,有错误之处还请不吝赐教。

致谢,在TAF的学习上要感谢实习中leader毛哥和导师jack的指导,让我少走了不少弯路,特别是思考问题的方式上,受到了极大的启发; 还有组里像terry浩哥,kahn哥、jeff哥,菠菜、loki 等所有小伙伴给予热情的解答和帮助。

原创声明,本文系作者授权云+社区-专栏发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Janti

Java中的NIO基础知识

上一篇介绍了五种NIO模型,本篇将介绍Java中的NIO类库,为学习netty做好铺垫

833
来自专栏Java Edge

Tomcat架构解析之3 Connector NIOAcceptorPollerWorkerNioSelectorPool

2484
来自专栏Java3y

JDK10都发布了,nio你了解多少?

1509
来自专栏菩提树下的杨过

webservice今日遇到的二个问题:DataTable + Namespace

自从ms推出wcf以后,几乎没在web项目中用过webservice了,基本上都是以wcf host在iis中替代的。今天利用公司以前的web框架做一个新项目,...

1848
来自专栏芋道源码1024

【Netty 专栏】Netty 源码分析之 accept 过程

在Netty源码分析之NioEventLoop章节中,已经分析了NioEventLoop的工作机制,当有客户端connect请求,selector可以返回其对应...

760
来自专栏熊二哥

JavaNIO快速入门

NIO是Jdk中非常重要的一个组成部分,基于它的Netty开源框架可以很方便的开发高性能、高可靠性的网络服务器和客户端程序。本文将就其核心基础类型Channel...

3869
来自专栏猿天地

Netty4自带编解码器详解

前言 本篇文章是Netty专题的第五篇,前面四篇文章如下: 高性能NIO框架Netty入门篇 高性能NIO框架Netty-对象传输 高性能NIO框架Netty-...

2596
来自专栏分布式系统进阶

Kafka源码分析-网络层-2

这里面最主要的就是accept(key, processors(currentProcessor)) (4) accept: 设置新连接socket的参数后交...

651
来自专栏java 成神之路

NIO 之 Channel实现原理

41914
来自专栏Cloud Native - 产品级敏捷

三分钟学会 Java 单元测试

前言: 此篇文章使用 Junit 4.0, 希望给无任何单元测试经验的开发者, 能在最短的时间内, 开展单元测试的工作◦ 本文:  学习 Junit 的测试框架...

2068

扫码关注云+社区