前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >TAF 必修课(二):Reactor多线程模型

TAF 必修课(二):Reactor多线程模型

原创
作者头像
serena
修改2021-08-03 14:56:08
2.3K0
修改2021-08-03 14:56:08
举报
文章被收录于专栏:社区的朋友们社区的朋友们

作者:温昂展

一、概念理解

1. 同步 VS 异步

首先理解IO操作需要调用系统服务,那么:

同步IO,每个IO请求逐一处理,用户发起请求后需要等待或者轮询内核,直到IO操作完成继续执行;

异步IO,多个IO请求并发执行,用户发起请求后仍然继续执行,当内核完成后通知用户线程。

2. 阻塞 VS 非阻塞

阻塞,请求发出后,若条件不满足,线程Blocking;

非阻塞,请求发出后,若条件不满足返回标志信息,告知条件不满足,线程仍然Running。

个人理解:

以上两个概念,在某些场景下其实是没有什么区别的,因为通常异步不会阻塞。但是如果严格去探讨,两者是不等价的,同步可以实现非阻塞,异步也可以阻塞。理解上有点绕,我认为同步和异步表述的是线程间的协作关系,即之前的任务需要等先来的完成之后才进行(比如需要拿到上一次的返回值),而阻塞非阻塞强调的是发起一个请求是否立即返回结果(严格来说也可以是void,理解为返回空)。

二、Java NIO + Reactor多线程

下面回归正题,TAF的网络线程采用了JDK原生NIO实现Reactor模型,此时服务端一个IO线程可以同时处理客户端请求,如果放到Unix的IO模型去理解,这就是大名鼎鼎的IO多路复用!其实传统BIO+多线程也并非一无是处,如果考虑只是处理并发量不大的长连接,用这种模式实现可能更加简便,性能上也不会有多大差别。但是如果是应对高并发场景,IO多路复用势在必行。

在理解NIO上可以参照本系列上一节的两张附图,注意和BIO最大的两点区别是:

  1. 从面向流改为面向缓冲区ByteBuffer;
  2. 引入选择器selector,允许一个单独的线程同时监视多个通道。即:通过调用select/poll方法由一个用户态线程负责轮询多个Channel,直到某个阶段的数据就绪,再通知相应的用户线程(当前线程)执行IO操作,以此实现非阻塞和模拟异步化。

进一步考虑,上述实现是否还存在不足呢?

答案是肯定的,

  1. 尽管一个线程可同时监控多个请求(Channel),但是所有读/写请求以及对新连接请求的处理都在同一个线程中处理,无法充分利用多CPU的优势,同时读/写操作也会阻塞对新连接请求的处理,因此需要将IO线程和业务线程解耦。
  2. 单Reactor局限性,可引入了多Reactor,也即一个主Reactor负责监控所有的连接请求,多个Reactor负责监控并处理读/写请求,减轻了主Reactor的压力,降低了主Reactor压力太大而造成的延迟,同时具有一定的容错性。并且每个子Reactor分别属于一个独立的线程,每个成功连接后的Channel的所有操作由同一个线程处理。这样保证了同一请求的所有状态和上下文在同一个线程中,避免了不必要的上下文切换,同时也方便了监控请求响应状态。另外,在选择Reactor进行注册时可以通过轮询的方式达到一定的均衡。

这里贴一个简易实现代码,方便阅读:

NIOServer:

代码语言:javascript
复制
public class NIOServer {
  private static final Logger
          LOGGER = LoggerFactory.getLogger(NIOServer.class);
  public static void main(String[] args) throws IOException {
    Selector selector = Selector.open();
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);
    serverSocketChannel.bind(new InetSocketAddress(1234));
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    int coreNum = Runtime.getRuntime().availableProcessors();
    Reactor[] reactors = new Reactor[coreNum];
    for (int i = 0; i < reactors.length; i++) {
      reactors[i] = new Reactor();
    }
    int index = 0;
    while (selector.select() > 0) {
      Set<SelectionKey> keys = selector.selectedKeys();
      for (SelectionKey key : keys) {
        keys.remove(key);
        if (key.isAcceptable()) {
          ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
          SocketChannel socketChannel = acceptServerSocketChannel.accept();
          socketChannel.configureBlocking(false);
          LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress());
          Reactor reactor = reactors[(int) ((index++) % coreNum)];
          reactor.addChannel(socketChannel);
          reactor.wakeup();
        }
      }
    }
  }
}

Reactor类:

代码语言:javascript
复制
public class Reactor {

  private static final Logger LOGGER = LoggerFactory.getLogger(Reactor.class);
  private static final ExecutorService service =
      Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors());
  private Selector selector;
  public Reactor() throws IOException {
    this.selector = SelectorProvider.provider().openSelector();
    start();
  }
  public void addChannel(SocketChannel socketChannel) throws ClosedChannelException {
    socketChannel.register(this.selector, SelectionKey.OP_READ);
  }
  public void wakeup() {
    this.selector.wakeup();
  }
  public void start() {
    service.submit(() -> {
      while (true) {
        if (selector.select(500) <= 0) {
          continue;
        }
        Set<SelectionKey> keys = selector.selectedKeys();
        Iterator<SelectionKey> iterator = keys.iterator();
        while (iterator.hasNext()) {
          SelectionKey key = iterator.next();
          iterator.remove();
          if (key.isReadable()) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            SocketChannel socketChannel = (SocketChannel) key.channel();
            int count = socketChannel.read(buffer);
            if (count < 0) {
              socketChannel.close();
              key.cancel();
              LOGGER.info("{}\t Read ended", socketChannel);
              continue;
            } else if (count == 0) {
              LOGGER.info("{}\t Message size is 0", socketChannel);
              continue;
            } else {
              LOGGER.info("{}\t Read message {}", socketChannel, new String(buffer.array()));
            }
          }
        }
      }
    });
  }
}

三、TAF线程模型

现在终于到了正题,理解了上面的问题,再来看TAF的线程模型就很容易理解了,其设计思路基本如上,并无二异。

TAF源码的线程模型部分相关类图如下:

其中,Reactor即为IO线程,SelecorManager就是负责启动多个Reactor线程并分发事件,代码实现上采用Round Robin方式;

ServerThreadPoolDispatcher为业务线程池,TCPSession将解码后request和response创建一个workThread任务放进去跑,实现了IO线程和业务线程的解耦。

TAF中核心代码如下:

代码语言:javascript
复制
/**
 *  SelectorManager 管理多个Reactor
 */
public synchronized void start()
{
    if (this.started)
    {
        return ;
    }

    this.started = true;
    for (Reactor reactor : this.reactorSet)
    {
        reactor.start();
    }
}

//轮询
public final Reactor nextReactor()
{
    return this.reactorSet[(int)(this.sets.incrementAndGet() % this.selectorPoolSize)];
}


public void run()
{
    try
    {
        for (;;)
        {
            selector.select();
            processRegister();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext())
            {
                SelectionKey key = iter.next();
                iter.remove();

                if( !key.isValid() ) continue;

                try {
                    //1. Update the last operation time
                    if (key.attachment() != null
                            && key.attachment() instanceof Session) {
                        ((Session)key.attachment()).updateLastOperationTime();
                    }

                    //2. Dispatch I/O event
                    dispatchEvent(key);

                } catch (Throwable ex) {
                    disConnectWithException(key, ex);
                }
            }
            processUnRegister();
        }
    } catch (Throwable e)
    {
        crashed = true;
        e.printStackTrace();
    }
}

当然从类图中也可以看出,具体实现上TAF还考虑了很多其他的问题和优化方案,如:过载保护等,这些留待下节说明。

感谢阅读,有错误之处还请不吝赐教。

致谢,在TAF的学习上要感谢实习中leader毛哥和导师jack的指导,让我少走了不少弯路,特别是思考问题的方式上,受到了极大的启发; 还有组里像terry浩哥,kahn哥、jeff哥,菠菜、loki 等所有小伙伴给予热情的解答和帮助。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、概念理解
  • 二、Java NIO + Reactor多线程
  • 三、TAF线程模型
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档