前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >System|网络|SEDA Web Server

System|网络|SEDA Web Server

作者头像
朝闻君
发布2021-11-22 10:47:57
2800
发布2021-11-22 10:47:57
举报
文章被收录于专栏:用户9199536的专栏

SEDA(分阶段事件驱动模型)是高性能异步流水线服务器模型,每个阶段由一个事件队列和一个线程池组成。

线程将会对事件进行批处理并制造新的事件到其他阶段,从而提供异步的IO与业务逻辑,并进行流水线化。和Netty实现不同,netty的流水线本质上是职责链模式,同一个handler由单一线程完成,而SEDA的流水线每个阶段都可以由独立的线程完成,线程只负责当前stage。

主从Reactor

主Reactor负责accept,从Reactor负责监听IO。

Acceptor负责将建立的连接端口注册到从Reactor上

代码语言:javascript
复制
private void accept(SelectionKey key) throws IOException {
    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
    SocketChannel clientChannel = ssc.accept();
    clientChannel.configureBlocking(false);
    SelectionKey newkey = clientChannel.register(subSelector, SelectionKey.OP_READ);
    newkey.attach(ByteBuffer.allocate(1024));
    System.out.println("a new client connected "+clientChannel.getRemoteAddress());
}

而从Reactor单纯地发出事件,并屏蔽信号,因此开销很低。

这里屏蔽信号的原因是因为异步处理时由于NIO是水平触发,会产生大量重复事件。

代码语言:javascript
复制
               if (!key.isValid()) {
                    continue;
                }
               if (key.isReadable()) {
                    key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
                    Event event = new Event(key, Event.Type.Read);
                    StageMap.getInstance().stageMap.get("read").Enqueue(event);
                }
               if(key.isWritable()){
                   key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                   Event event = new Event(key, Event.Type.Flush);
                   StageMap.getInstance().stageMap.get("flush").Enqueue(event);
               }

Stage

每个Stage都是通过信箱的方式进行通信,然后通过线程池创建线程进行消费。

ReadStage(Input + Encode)

负责读取TCP流并进行Http请求的Decode,读取一旦完成,立刻恢复OP_READ的监听。

此后,通知AppStage处理业务逻辑

这里的ReadStage可能需要拆分成Read和Encode两个Stage

代码语言:javascript
复制
                        int numRead = socketChannel.read(readBuffer);
                        e.key.interestOps(e.key.interestOps() | SelectionKey.OP_READ);
                        String str = new String(readBuffer.array(), 0, numRead);
                        System.out.println("Async " + str);
                        Event event = new Event(e.key, Event.Type.ReadRepsonse);
                        String lines[] = str.split("\\r?\\n");
                        String param[] = lines[0].split(" ");
                        event.httpType = parseHttpType(param[0]);
                        event.Packet = param[1];
                        StageMap.getInstance().stageMap.get("app").Enqueue(event);

AppStage(Application)

负责处理业务逻辑并将结果通过事件传递给Write Stage,并且处理Write的回调。

在这里,我实现了两种业务逻辑,分别是求幂和加法

代码语言:javascript
复制
                    if (e.type == Event.Type.ReadRepsonse) {
                        System.out.println("APP Read " + e.Packet);
                        HashMap<String,String >params = RestfulParser.parse(e.Packet);
                        Event event = new Event(e.key, Event.Type.Write);
                        event.Packet = Dispatcher.dispatch(e.httpType,params);
                        //event.Packet = params.toString();
                        StageMap.getInstance().stageMap.get("write").Enqueue(event);
                    }
                    else if(e.type == Event.Type.WriteResponse){
                        System.out.println("Write Done");
                    }

WriteStage(Decode)

负责将业务逻辑的结果进行Http响应的Encode并写入缓冲区,并且监听OP_WRITE。此后,从Reactor将会在writable时要求进行异步的flush。

并向AppStage发送事件处理回调(暂时没想好要怎么回调)

代码语言:javascript
复制
                        ByteBuffer sendBuffer = (ByteBuffer) e.key.attachment();
                        SocketChannel channel = (SocketChannel)e.key.channel();
                        sendBuffer.put((header+String.format(template,e.Packet)).getBytes());
                        e.key.interestOps(e.key.interestOps() | SelectionKey.OP_WRITE);
                        Event event = new Event(e.key, Event.Type.WriteResponse);
                        StageMap.getInstance().stageMap.get("app").Enqueue(event);

FlushStage(Output)

负责将缓冲区的数据进行输出

代码语言:javascript
复制
                        ByteBuffer sendBuffer = (ByteBuffer) e.key.attachment();
                        SocketChannel channel = (SocketChannel)e.key.channel();
                        sendBuffer.flip();
                        if(sendBuffer.hasRemaining())
                            channel.write(sendBuffer);
                        sendBuffer.clear();

演示

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 主从Reactor
  • Stage
  • 演示
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档