Reactor模式的.net版本简单实现--DEMO

     近期在学习DotNetty,遇到不少的问题。由于dotnetty是次netty的.net版本的实现。导致在网上叙述dotnetty的原理,以及实现技巧方面的东西较少,这还是十分恼人的。在此建议学习和使用Dotnetty的和位小伙伴,真心阅读下netty的相关书籍,如《netty权威指南》。

     闲话少说,进入正题。netty的性能之所以能够达到如此的高度。主要由于他使用Reactor模式处理socket的请求,让服务器的使用率最大化,且尽量减少线程的开销。本文章主要简单介绍下Reactor模式。

一、reactor概论

reactor模式主要解决处理多个客户端请求的设计模式。

首先从类图我们可以得知:

Dispatcher:Handler管理器,以及调用度。他依赖于Demultiplexer类

Demultiplexer:事件管理器,接受外部的事件,并提供给Dispatch使用。

Handle:事件源,表示触发了那些事件

EventHandler:各种类型的处理器,用于处理具体的业务,以及I/O的读写

当然,也可以通过序列图看出首先需要初始化Dispatcher, Demultiplexer等相关类,以及注册具体的事件处理器。

二、代码的具体实现

类图如下[源码下载]:

2.1 多路复用事件处理器的代码

public class Demultiplexer
    {
        private ConcurrentQueue<Event> eventQuene = new ConcurrentQueue<Event>();
        private Object lockObj = new Object();

        public List<Event> Select()
        {
            return this.Select(0);
        }
        public List<Event> Select(int time)
        {
            if(time > 0)
            {
                if (this.eventQuene.IsEmpty)
                {
                    lock (lockObj)
                    {
                        if (this.eventQuene.IsEmpty)
                        {
                            System.Threading.Thread.Sleep(time);
                        }
                    }
                }
            }
            List<Event> events = new List<Event>();
            while(this.eventQuene.Count > 0)
            {
                Event tmp;
                if(this.eventQuene.TryDequeue(out tmp))
                {
                    events.Add(tmp);
                }
            }
            return events;
        }
        public void AddEvent(Event argEvent)
        {
            this.eventQuene.Enqueue(argEvent);
        }
    }

此类主要防止多线程的共同竞争,因为多路径复用选择器会被多个线程同时使用。所以使用的线程安全的Queue。

2.2 Handler触发器和管理器

/// <summary>
    /// Reactor的事件Handler触发器,提供事件Handler的注册,移除
    /// </summary>
    public class EventDispatch
    {
        private Demultiplexer demultiplexer;
        Dictionary<EventType, EventHandler> eventHandlerMap = new Dictionary<EventType, EventHandler>();

        public EventDispatch(Demultiplexer demultiplexer)
        {
            this.demultiplexer = demultiplexer;
        }

        public void RegisterHandler(EventType eventType, EventHandler eventHandler)
        {
            this.eventHandlerMap.Add(eventType, eventHandler);
        }
        public void RemoveHandler(EventType eventType)
        {
            this.eventHandlerMap.Remove(eventType);
        }

        public void HandleEvents()
        {
            this.Dispatch();
        }
        public void Dispatch()
        {
            string log = string.Format("thread id: {0} Dispatch", System.Threading.Thread.CurrentThread.ManagedThreadId);
            Console.WriteLine(log);
            while (true)
            {
                List<Event> events = this.demultiplexer.Select();
                foreach(var itemEvent in events)
                {
                    EventHandler eventHandler = this.eventHandlerMap[itemEvent.EventType];
                    eventHandler.Handle(itemEvent);
                }
                System.Threading.Thread.Sleep(1000);
            }
        }
    }

主要职责,对Handler的注册、移除的管理,以及通过 多路复用选择器 选择相应的Handler进行处理。

2.3 服务端的实现

/// <summary>
    /// 开启接受请求的服务端
    /// </summary>
    public class AcceptRuner
    {
        private System.Collections.Concurrent.ConcurrentQueue<object> sourceQueue = new System.Collections.Concurrent.ConcurrentQueue<object>();

        private Demultiplexer demultiplexer;

        public AcceptRuner(Demultiplexer demultiplexer)
        {
            this.demultiplexer = demultiplexer;
        }

        public void adConnection(object source)
        {
            this.sourceQueue.Enqueue(source);
        }

        public void Run()
        {
            string log = string.Format("thread id: {0} AcceptRunner", System.Threading.Thread.CurrentThread.ManagedThreadId);
            Console.WriteLine(log);
            while (true)
            {
                object source;
                if(this.sourceQueue.TryDequeue(out source))
                {
                    Event acceptEvent = new Event()
                    {
                        EventType = EventType.Accept,
                        Source = source
                    };
                    this.demultiplexer.AddEvent(acceptEvent);
                }
            }
        }
    }

此类效仿netty的serverBoostrap的实现,将外部新的连接以事件对象的形式添加到 多路复用选择器上。

2.4 其他类

Event:事件基类

EventHandler:事件处理器抽象基类。他派生了:AcceptEventHandler,ReadEventHandler。

EventType:事件类型

三、备注说明

1. 代码没有贴完整。但下载包就是完整的。

2. 这只我对Reactor模式的理解,如有偏颇之处,还望各拉指点一二。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏GuZhenYin

使用localResizeIMG3+WebAPI实现手机端图片上传

前言 惯例~惯例~昨天发表的使用OWIN作为WebAPI的宿主..嗯..有很多人问..是不是缺少了什么 - - 好吧,如果你要把OWIN寄宿在其他的地方...代...

23280
来自专栏恰童鞋骚年

自己动手写一个简单的MVC框架(第一版)

  路由(Route)、控制器(Controller)、行为(Action)、模型(Model)、视图(View)

16020
来自专栏Jackson0714

WCF传输1-你是否使用过压缩或Json序列化?

16740
来自专栏知识分享

51采集PCF8591数据通过ESP8266上传C#上位机android 之TCP客户端编程ESP8266使用详解NodeMCU初探ESP8266刷AT固件与nodemcu固件ESP8266使用详解-

这两天测试程序还发现一个bug就是如果客户端断开了,应该检测一下哪个断开了,数据就不应该发向那个连接,,,否则就会报错,然后模块会复位重启 所以加上这段代码 c...

59650
来自专栏Jackson0714

WCF传输1-你是否使用过压缩或Json序列化?

414100
来自专栏林德熙的博客

UWP WPF 解决 xaml 设计显示异常

例如我创建一个用户控件 TsjcyubtnTtqtjem 那么就可以在构造函数添加下面代码

21520
来自专栏菩提树下的杨过

FluorineFx:基于RSO(远程共享对象)的文本聊天室

在前一篇“FluorineFx:远程共享对象(Remote SharedObjects)”里,已经大致知道了在FluorineFX中如何使用RSO,这一篇将利用...

27580
来自专栏linjinhe的专栏

LevelDB:使用介绍

Get 接口和 Put 接口比较像,除了 leveldb::ReadOptions 参数是用来控制读操作的,具体见链接指向的代码。

77150
来自专栏哲学驱动设计

精通 WPF UI Virtualization

    本篇博客主要说明如何使用 UI Virtualization(以下简称为 UIV) 来提升 OEA 框架中 TreeGrid 控件的性能,同时,给出了一...

25790
来自专栏技术小讲堂

探寻ASP.NET MVC鲜为人知的奥秘(3):寻找多语言的最佳实践方式

如果你的网站需要被世界各地的人访问,访问者会使用各种不同的语言和文字书写习惯,那么创建一个支持多语言的网站就是十分必要的了,这一篇文章就讲述怎么快速合理的创建网...

28980

扫码关注云+社区

领取腾讯云代金券