前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >第十二节 netty前传-NIO 实现reactor模式

第十二节 netty前传-NIO 实现reactor模式

作者头像
用户1418372
发布2018-12-28 12:08:24
4120
发布2018-12-28 12:08:24
举报
文章被收录于专栏:清晨我上码清晨我上码

本节所要了解java nio的reactor模式,参考来源Doug lee java并发的作者。当然作为netty的底层实现,对于nio的reactor模式的实现,对于学习netty也是尤为重要的一步。

首先先作为对比先看下经典BIO模型

图片.png

  • 伪代码片段
代码语言:javascript
复制
class Server implements Runnable {
  public void run() {
    try {
//创建socket server套接字
ServerSocket ss = new ServerSocket(PORT);
while (!Thread.interrupted())
//启动额外线程处理socket客户端连接后的业务处理
        new Thread(new Handler(ss.accept())).start();
// or, single-threaded, or a thread pool
    } catch (IOException ex) { /* ... */ }
  }
//业务处理
  static class Handler implements Runnable {
    final Socket socket;
    Handler(Socket s) { socket = s; }
    public void run() {
      try {
byte[] input = new byte[MAX_INPUT];
        socket.getInputStream().read(input);
        byte[] output = process(input);
        socket.getOutputStream().write(output);
      } catch (IOException ex) { /* ... */ }
    }
    private byte[] process(byte[] cmd) { /* ... */ }
  }
}

nio不再详细介绍,可参考前面文章

主要说下reactor模式:简单来说reactor模式用于同时处理一个或多个传递给服务端的请求的事件的处理模式。 然后,服务端处理程序解析输入别的请求,并将它们同步分派给与之关联的请求异步处理程序。不恰当可类比web页面事件,当点击某个按钮时,浏览器收到这个信号(监听),分派给相关的js处理程序处理(handler)。

  • 注意两点 1、同步分派 2、异步处理程序 而上面两点的分派由Dispatch承担,异步处理由handler处理。 详细可参考reactor wiki介绍 3、 reactor实现
  • 借用doug的图 直观的感受下:

图片.png

关于Channels、Buffers、Selectors、SelectionKeys核心前面已介绍,下面实现会用到

基本代码讲解

  • Reactor类 作为nio 响应器模式的核心部分。承载了selector选择器、ServerSocketChannel 服务端的通道。三个重要的功能
  1. 对ServerSocketChannel、Selector初始化。 serverSocket.register方法将服务通道注册到选择器,并绑定ACCEPT兴趣事件(初始绑定,当客户端通道连接后会先处理accpet事件)
  2. 初始附加一个Acceptor对象(通过SelectionKey 的attach方法)。目的是用于分发时,借助Acceptor处理相关逻辑.
  3. dispatch方法的同步分发功能
  • Acceptor 类的作用其一是获取客户端与服务端的连接,其二是获取连接后调用handler处理(为了简化,handler使用状态模式来模拟其他事件,所以这里一旦有客户端连接,就会通过初始设置READING = 0 读事件)
代码语言:javascript
复制
class Reactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocket;

    Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        serverSocket.configureBlocking(false);
        //注册ServerSocketChannel的兴趣事件为连接OP_ACCEPT
        SelectionKey sk = serverSocket.register(selector,SelectionKey.OP_ACCEPT);
        //附加Acceptor,稍后调用attachment可以取得该对象
        sk.attach(new Acceptor());
    }
    public void run() {  //normally in a new Thread
        try {
            while (!Thread.interrupted()) {
                //阻塞 直到有有一個通道返回
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                //循环检测是否有新事件注册
                while (it.hasNext())
                    //同步分发
                    dispatch((SelectionKey)(it.next()));
                selected.clear();
            }
        } catch (IOException ex) { /* ... */ }
    }
    void dispatch(SelectionKey k) {
        //取得attach附加的对象处理
        Runnable r = (Runnable)(k.attachment());
        if (r != null)
            r.run();
    }

    // class Reactor continued
class Acceptor implements Runnable {
        // inner
        public void run() {
            try {
          //接受到通道套接字的连接
                SocketChannel c = serverSocket.accept();
                if (c != null)
                    new Handler(selector, c);
            }
            catch(IOException ex) { /* ... */ }
        }
    }
}
  • Handler 类 1、将自身绑定到选择器、并注册读事件( sk = socket.register( sel, 0); sk.attach(this);)2、 根据READING SENDING 状态判断事件
代码语言:javascript
复制
/**
 * handler用到状态模式,根据当前读写的状态分别处理
 */
final class Handler implements Runnable {
    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(1024);
    ByteBuffer output = ByteBuffer.allocate(1024);
    static final int READING  = 0 ,SENDING = 1;
    int state = READING;
    Handler(Selector sel , SocketChannel c) throws IOException {
        socket = c;
        //设置通道为非阻塞
        c.configureBlocking(false);
// Optionally try first read now
        sk = socket.register( sel, 0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        //select阻塞后,可以用wakeup唤醒;执行wakeup时,如果没有阻塞的select  那么执行完wakeup后下一个执行select就会立即返回。
        sel.wakeup();
    }
    boolean inputIsComplete()  {/* 相关处理略... */ return true; }
    boolean outputIsComplete() { /*相关处理略 ... */return true; }
    void process(){}
    public void run() {
        try {
            if (state == READING)
                read();
            else if (state ==SENDING)
                send();
        } catch (IOException ex) { /* ... */ }
    }
    void read()throws IOException {
        socket.read(input);
        if (inputIsComplete()) {
            process();
            // 处理完成后设置发送状态
            state =SENDING;
            //注册写事件
            sk.interestOps(SelectionKey.OP_WRITE);
        }
    }
    void send()throws IOException {
        socket.write(output);
        if (outputIsComplete())
            sk.cancel();
    }
}

参考: github java设计模式 Doug Lea - java 并发包的作者讲解NIO

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.12.24 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 首先先作为对比先看下经典BIO模型
  • nio不再详细介绍,可参考前面文章
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档