专栏首页白驹过隙ACE - Reactor实现I/O,Dispatch,Service三层完整服务器(完结)

ACE - Reactor实现I/O,Dispatch,Service三层完整服务器(完结)

框架描述

服务器层次:

  • I/O层:对应具体的文件描述符处理,对应ACE中的handle。
  • Dispatch层:事件分发,将I/O事件分发到对应绑定的处理队列等待业务处理,对应ACE中的Event_handle。
  • 业务层:处理具体业务,包含一组线程或进程,并发处理业务。对应ACE中的ACE_Task。

三层结构与五层网络的网络层,传输层,应用层类似对应。

Reactor模式:

  • I/O处理:ACE_Reactor使用select复用完成,将注册进去的IOhandle进行事件监听。
  • 消息队列:ACE_Task中包含一个消息队列。I/O产生事件后执行绑定的Event函数将消息插入对应的消息队列。
  • 服务进程:ACE_Task内可以构造一个线程池,获取消息队列进行业务并发处理。

下面是Reactor代码实现,包含I/O、dispatch、反应器、和线程池:

  1 /*-----------------------------------------------------------------
  2 *  filename:    Reactor.cpp
  3 *  author:        bing
  4 *  time:        2016-06-29 15:26
  5 *  function:    using ACE Reactor implement I/O multiplex server, 
  6 *               include service thread pool.
  7 *-----------------------------------------------------------------*/
  8 #include <ace/INET_Addr.h>
  9 #include <ace/SOCK_Acceptor.h>
 10 #include <ace/SOCK_Stream.h>
 11 #include <ace/Reactor.h> 
 12 #include <ace/Log_Msg.h>
 13 #include "ace/Task.h"
 14 #include "ace/OS.h" 
 15 #include <list>
 16 
 17 #define MAX_BUFF_SIZE     1024
 18 #define LISTEN_PORT     5010
 19 #define SERVER_IP        ACE_LOCALHOST
 20 #define THREAD_NUM        10
 21 
 22 struct MsgData
 23 {
 24     ACE_HANDLE* IOHandle;
 25     int DataFlag;
 26     char Data[MAX_BUFF_SIZE];
 27     MsgData()
 28     {
 29         IOHandle = NULL;
 30         DataFlag = -1;
 31         ACE_OS::memset(Data, 0, sizeof(Data));
 32     }
 33 };
 34 
 35 class TaskThread;
 36 
 37 class ServerStream : public ACE_Event_Handler
 38 {
 39 public:
 40     ServerStream(TaskThread* pMsgQueue);
 41     ~ServerStream();
 42     ACE_SOCK_Stream& GetStream(){return m_Svr_stream;}      //给accept提供接口绑定数据通道
 43     virtual int handle_input(ACE_HANDLE fd);        //I/O触发事件后调用
 44     void close();
 45     virtual ACE_HANDLE get_handle(void) const {return m_Svr_stream.get_handle();}   //不重载需要手动将handle传入ACE_Reactor
 46 private:
 47     ACE_SOCK_Stream m_Svr_stream;
 48     TaskThread* m_MsgQueue;
 49 };
 50 
 51 std::list<ServerStream*> g_StreamPool;  //stream pool
 52 
 53 class TaskThread: public ACE_Task<ACE_MT_SYNCH>
 54 {
 55 public:
 56     virtual int svc(void)
 57     {
 58         ACE_Message_Block *Msg;// = new ACE_Message_Block();
 59         while(1)
 60         {
 61             getq(Msg);        //空闲线程阻塞
 62             
 63             ACE_Data_Block *Data_Block = Msg->data_block();
 64             MsgData *pData = reinterpret_cast <MsgData*>(Data_Block->base());
 65             if (0 == pData->DataFlag)
 66             {
 67                 std::list<ServerStream*>::iterator it;
 68                 for (it = g_StreamPool.begin();it != g_StreamPool.end();++it)
 69                 {
 70                     if (get_handle() == (*it)->get_handle())
 71                     {
 72                         g_StreamPool.erase(it);
 73                         delete *it;
 74                         break;  
 75                     }
 76                 }
 77                 return 0;
 78             }
 79             char strBuffer[MAX_BUFF_SIZE];
 80             ACE_OS::memset(strBuffer, 0, sizeof(strBuffer));
 81             ACE_OS::memcpy(strBuffer, pData->Data, sizeof(strBuffer));
 82             /*
 83                 这里接口业务代码分发数据
 84             */
 85             ACE_DEBUG((LM_INFO,"[time:%d]recevie msg:%s\n",(int)ACE_OS::time(),strBuffer));
 86             //ACE_SOCK_Stream Stream(*(pData->IOHandle));
 87             //Stream.send("server recive data!\n",sizeof("server recive data!"));  //响应client数据
 88             //ACE_OS::sleep(1);        //模拟业务耗时
 89             Msg->release();         //release,inclue data_block
 90             //ACE_DEBUG((LM_INFO,"thread end queue count:%d\n",msg_queue_->message_count()));
 91         }
 92         return 0;
 93     }
 94 };
 95 typedef ACE_Singleton<TaskThread, ACE_Thread_Mutex> TaskThreadPool;
 96 
 97 ServerStream::ServerStream(TaskThread* pMsgQueue)
 98 {
 99     m_MsgQueue = pMsgQueue;
100 }
101 
102 ServerStream::~ServerStream()
103 {
104     close();
105 }
106 
107 /*------------------------------------------------------
108 *    IO上报流数据,使用select复用上报,这里单线程处理
109 *   原来考虑直接把IO插队列给线程池处理,但是线程池和
110 *   这里是异步操作,线程没有处理队列这条消息ACE底层会
111 *   一直上报这个IO插消息队列,暂时在这里做单线程revc
112 *   考虑epoll边沿触发,一次上报处理
113 *------------------------------------------------------*/
114 int ServerStream::handle_input(ACE_HANDLE fd)
115 {
116     MsgData Message;
117     char strBuffer[MAX_BUFF_SIZE];
118     Message.DataFlag = m_Svr_stream.recv(strBuffer,MAX_BUFF_SIZE); //获取数据回select响应避免反复通知
119     if (-1 == Message.DataFlag)
120     {
121         ACE_DEBUG((LM_INFO, ACE_TEXT("recive data error!\n")));
122         return -1;
123     }
124     else if(0 == Message.DataFlag)
125     {
126         close();
127         ACE_DEBUG((LM_INFO, ACE_TEXT("client closed!\n")));
128     }        
129     ACE_Data_Block *Data_Block = new ACE_Data_Block; //线程做释放
130     ACE_HANDLE Cli_IO = get_handle();
131 
132     Message.IOHandle = &Cli_IO;
133     ACE_OS::memcpy(Message.Data,strBuffer,sizeof(strBuffer));//传的data可带length信息来适配消息大小
134 
135     char *p = reinterpret_cast <char*>(&Message);
136     Data_Block->base(p,sizeof(Message));
137     ACE_Message_Block* msg = new ACE_Message_Block(Data_Block);    
138     m_MsgQueue->putq(msg);    //put
139     //Data_Block->release();
140     return 0;
141 }
142 
143 void ServerStream::close()
144 {
145     m_Svr_stream.close();
146     ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);
147 }
148 
149 class ServerAcceptor : public ACE_Event_Handler
150 {
151 public:
152     ServerAcceptor(int port,char* ip);
153     ~ServerAcceptor();
154     bool open();
155     virtual int handle_input(ACE_HANDLE fd);  //有client连接
156     void close();
157     virtual ACE_HANDLE get_handle(void) const {return m_Svr_aceept.get_handle();}
158 private:
159     ACE_INET_Addr m_Svr_addr;
160     ACE_SOCK_Acceptor m_Svr_aceept;
161 };
162 
163 ServerAcceptor::ServerAcceptor(int port,char* ip):m_Svr_addr(port,ip)
164 {
165     if (!open())    //open listen port
166     {
167         ACE_DEBUG((LM_INFO, ACE_TEXT("open failed!\n")));
168     }
169     else
170     {
171         ACE_DEBUG((LM_INFO, ACE_TEXT("open success!\n")));
172         TaskThreadPool::instance()->activate(THR_NEW_LWP | THR_JOINABLE |THR_INHERIT_SCHED , THREAD_NUM);//创建10个线程处理业务
173     }
174 }
175 
176 ServerAcceptor::~ServerAcceptor()
177 {
178     close();
179     std::list<ServerStream*>::iterator it;
180     for (it = g_StreamPool.begin();it != g_StreamPool.end();++it)
181     {
182         if (NULL != (*it))
183         {
184             (*it)->close();
185             delete (*it);
186         }
187     }
188 }
189 
190 bool ServerAcceptor::open()
191 {
192     if (-1 == m_Svr_aceept.open(m_Svr_addr,1))
193     {
194         ACE_DEBUG((LM_ERROR,ACE_TEXT("failed to accept\n")));
195         m_Svr_aceept.close();
196         return false;
197     }
198     return true;
199 }
200 
201 int ServerAcceptor::handle_input(ACE_HANDLE fd )  
202 {
203     ServerStream *stream = new ServerStream(TaskThreadPool::instance());    //产生新通道
204     if (NULL != stream)
205     {
206         g_StreamPool.push_back(stream);//暂时存储全局变量用于内存管理,优化可增加一个连接管理类管理连接通道
207     }
208     if (m_Svr_aceept.accept(stream->GetStream()) == -1)  //绑定通道
209     {  
210         printf("accept client fail\n");  
211         return -1;  
212     }
213     ACE_Reactor::instance()->register_handler(stream,ACE_Event_Handler::READ_MASK);  //通道注册到ACE_Reactor
214     ACE_DEBUG((LM_INFO,"User connect success!,ClientPool num = %d\n",g_StreamPool.size()));
215     return 0;
216 }  
217     
218 void ServerAcceptor::close()
219 {
220     ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::ACCEPT_MASK);
221     m_Svr_aceept.close();
222 }
223 
224 int ACE_TMAIN()
225 {
226     ServerAcceptor server(LISTEN_PORT,(char *)SERVER_IP);
227     ACE_Reactor::instance()->register_handler(&server,ACE_Event_Handler::ACCEPT_MASK);    //listen port注册到ACE_Reactor
228     
229     ACE_Reactor::instance()->run_reactor_event_loop();  //进入消息循环,有I/O事件回调handle_input
230     return 0;
231 }

代码实现了最简单的完整并发服务器,有部分还值得思考和优化:

1.dispatch进行类封装

2.回话通道的数据流管理进行类封装

3.dispatch消息结构优化

4.dispatch处为单线程,直接传递I/O给线程获取数据流还是获取数据流完成后给线程,如何实现两个线程同步

5.底层I/O复用使用epoll边沿优化

6.业务buff处理优化,进行消息类型划分,进入不同业务处理

由于实现完整服务器代码以最简单形式实现,上述优化在实际商用代码中还需要大量封装优化考虑。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • ACE - ACE_Task源码剖析及线程池实现

    Aichen
  • ACE - ACE_Task源码剖析及线程池实现

    原文出自http://www.cnblogs.com/binchen-china,禁止转载。

    Aichen
  • ACE - Reactor模式源码剖析及具体实现(大量源码慎入)

    Aichen
  • ACE - ACE_Task源码剖析及线程池实现

    Aichen
  • ACE - ACE_Task源码剖析及线程池实现

    原文出自http://www.cnblogs.com/binchen-china,禁止转载。

    Aichen
  • ACE - Reactor模式源码剖析及具体实现(大量源码慎入)

    Aichen
  • ACE - Reactor源码总结整理

    Aichen
  • ACE - 代码层次及Socket封装

    Aichen
  • ACE - 代码层次及Socket封装

    原文出自http://www.cnblogs.com/binchen-china,禁止转载。

    Aichen
  • WinRAR曝遗留19年重大漏洞,可完全控制电脑(附解决方法)

    作为目前最受欢迎的电脑解压缩工具之一,WinRAR 号称在全球拥有 5 亿用户。不过最近的情况表明,如果你正在使用这款软件,建议立即升级到 5.70 Beta ...

    AI科技大本营

扫码关注云+社区

领取腾讯云代金券