前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ACE - Reactor实现I/O,Dispatch,Service三层完整服务器(完结)

ACE - Reactor实现I/O,Dispatch,Service三层完整服务器(完结)

作者头像
Aichen
发布2018-05-18 10:53:23
1.4K0
发布2018-05-18 10:53:23
举报
文章被收录于专栏:白驹过隙
框架描述

服务器层次:

  • I/O层:对应具体的文件描述符处理,对应ACE中的handle。
  • Dispatch层:事件分发,将I/O事件分发到对应绑定的处理队列等待业务处理,对应ACE中的Event_handle。
  • 业务层:处理具体业务,包含一组线程或进程,并发处理业务。对应ACE中的ACE_Task。

三层结构与五层网络的网络层,传输层,应用层类似对应。

Reactor模式:

  • I/O处理:ACE_Reactor使用select复用完成,将注册进去的IOhandle进行事件监听。
  • 消息队列:ACE_Task中包含一个消息队列。I/O产生事件后执行绑定的Event函数将消息插入对应的消息队列。
  • 服务进程:ACE_Task内可以构造一个线程池,获取消息队列进行业务并发处理。

下面是Reactor代码实现,包含I/O、dispatch、反应器、和线程池:

代码语言:javascript
复制
  1 /*-----------------------------------------------------------------
  2 *  filename:    Reactor.cpp
  3 *  author:        bing
  4 *  time:        2016-06-29 15:26
  5 *  function:    using ACE Reactor implement I/O multiplex server, 
  6 *               include service thread pool.
  7 *-----------------------------------------------------------------*/
  8 #include <ace/INET_Addr.h>
  9 #include <ace/SOCK_Acceptor.h>
 10 #include <ace/SOCK_Stream.h>
 11 #include <ace/Reactor.h> 
 12 #include <ace/Log_Msg.h>
 13 #include "ace/Task.h"
 14 #include "ace/OS.h" 
 15 #include <list>
 16 
 17 #define MAX_BUFF_SIZE     1024
 18 #define LISTEN_PORT     5010
 19 #define SERVER_IP        ACE_LOCALHOST
 20 #define THREAD_NUM        10
 21 
 22 struct MsgData
 23 {
 24     ACE_HANDLE* IOHandle;
 25     int DataFlag;
 26     char Data[MAX_BUFF_SIZE];
 27     MsgData()
 28     {
 29         IOHandle = NULL;
 30         DataFlag = -1;
 31         ACE_OS::memset(Data, 0, sizeof(Data));
 32     }
 33 };
 34 
 35 class TaskThread;
 36 
 37 class ServerStream : public ACE_Event_Handler
 38 {
 39 public:
 40     ServerStream(TaskThread* pMsgQueue);
 41     ~ServerStream();
 42     ACE_SOCK_Stream& GetStream(){return m_Svr_stream;}      //给accept提供接口绑定数据通道
 43     virtual int handle_input(ACE_HANDLE fd);        //I/O触发事件后调用
 44     void close();
 45     virtual ACE_HANDLE get_handle(void) const {return m_Svr_stream.get_handle();}   //不重载需要手动将handle传入ACE_Reactor
 46 private:
 47     ACE_SOCK_Stream m_Svr_stream;
 48     TaskThread* m_MsgQueue;
 49 };
 50 
 51 std::list<ServerStream*> g_StreamPool;  //stream pool
 52 
 53 class TaskThread: public ACE_Task<ACE_MT_SYNCH>
 54 {
 55 public:
 56     virtual int svc(void)
 57     {
 58         ACE_Message_Block *Msg;// = new ACE_Message_Block();
 59         while(1)
 60         {
 61             getq(Msg);        //空闲线程阻塞
 62             
 63             ACE_Data_Block *Data_Block = Msg->data_block();
 64             MsgData *pData = reinterpret_cast <MsgData*>(Data_Block->base());
 65             if (0 == pData->DataFlag)
 66             {
 67                 std::list<ServerStream*>::iterator it;
 68                 for (it = g_StreamPool.begin();it != g_StreamPool.end();++it)
 69                 {
 70                     if (get_handle() == (*it)->get_handle())
 71                     {
 72                         g_StreamPool.erase(it);
 73                         delete *it;
 74                         break;  
 75                     }
 76                 }
 77                 return 0;
 78             }
 79             char strBuffer[MAX_BUFF_SIZE];
 80             ACE_OS::memset(strBuffer, 0, sizeof(strBuffer));
 81             ACE_OS::memcpy(strBuffer, pData->Data, sizeof(strBuffer));
 82             /*
 83                 这里接口业务代码分发数据
 84             */
 85             ACE_DEBUG((LM_INFO,"[time:%d]recevie msg:%s\n",(int)ACE_OS::time(),strBuffer));
 86             //ACE_SOCK_Stream Stream(*(pData->IOHandle));
 87             //Stream.send("server recive data!\n",sizeof("server recive data!"));  //响应client数据
 88             //ACE_OS::sleep(1);        //模拟业务耗时
 89             Msg->release();         //release,inclue data_block
 90             //ACE_DEBUG((LM_INFO,"thread end queue count:%d\n",msg_queue_->message_count()));
 91         }
 92         return 0;
 93     }
 94 };
 95 typedef ACE_Singleton<TaskThread, ACE_Thread_Mutex> TaskThreadPool;
 96 
 97 ServerStream::ServerStream(TaskThread* pMsgQueue)
 98 {
 99     m_MsgQueue = pMsgQueue;
100 }
101 
102 ServerStream::~ServerStream()
103 {
104     close();
105 }
106 
107 /*------------------------------------------------------
108 *    IO上报流数据,使用select复用上报,这里单线程处理
109 *   原来考虑直接把IO插队列给线程池处理,但是线程池和
110 *   这里是异步操作,线程没有处理队列这条消息ACE底层会
111 *   一直上报这个IO插消息队列,暂时在这里做单线程revc
112 *   考虑epoll边沿触发,一次上报处理
113 *------------------------------------------------------*/
114 int ServerStream::handle_input(ACE_HANDLE fd)
115 {
116     MsgData Message;
117     char strBuffer[MAX_BUFF_SIZE];
118     Message.DataFlag = m_Svr_stream.recv(strBuffer,MAX_BUFF_SIZE); //获取数据回select响应避免反复通知
119     if (-1 == Message.DataFlag)
120     {
121         ACE_DEBUG((LM_INFO, ACE_TEXT("recive data error!\n")));
122         return -1;
123     }
124     else if(0 == Message.DataFlag)
125     {
126         close();
127         ACE_DEBUG((LM_INFO, ACE_TEXT("client closed!\n")));
128     }        
129     ACE_Data_Block *Data_Block = new ACE_Data_Block; //线程做释放
130     ACE_HANDLE Cli_IO = get_handle();
131 
132     Message.IOHandle = &Cli_IO;
133     ACE_OS::memcpy(Message.Data,strBuffer,sizeof(strBuffer));//传的data可带length信息来适配消息大小
134 
135     char *p = reinterpret_cast <char*>(&Message);
136     Data_Block->base(p,sizeof(Message));
137     ACE_Message_Block* msg = new ACE_Message_Block(Data_Block);    
138     m_MsgQueue->putq(msg);    //put
139     //Data_Block->release();
140     return 0;
141 }
142 
143 void ServerStream::close()
144 {
145     m_Svr_stream.close();
146     ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);
147 }
148 
149 class ServerAcceptor : public ACE_Event_Handler
150 {
151 public:
152     ServerAcceptor(int port,char* ip);
153     ~ServerAcceptor();
154     bool open();
155     virtual int handle_input(ACE_HANDLE fd);  //有client连接
156     void close();
157     virtual ACE_HANDLE get_handle(void) const {return m_Svr_aceept.get_handle();}
158 private:
159     ACE_INET_Addr m_Svr_addr;
160     ACE_SOCK_Acceptor m_Svr_aceept;
161 };
162 
163 ServerAcceptor::ServerAcceptor(int port,char* ip):m_Svr_addr(port,ip)
164 {
165     if (!open())    //open listen port
166     {
167         ACE_DEBUG((LM_INFO, ACE_TEXT("open failed!\n")));
168     }
169     else
170     {
171         ACE_DEBUG((LM_INFO, ACE_TEXT("open success!\n")));
172         TaskThreadPool::instance()->activate(THR_NEW_LWP | THR_JOINABLE |THR_INHERIT_SCHED , THREAD_NUM);//创建10个线程处理业务
173     }
174 }
175 
176 ServerAcceptor::~ServerAcceptor()
177 {
178     close();
179     std::list<ServerStream*>::iterator it;
180     for (it = g_StreamPool.begin();it != g_StreamPool.end();++it)
181     {
182         if (NULL != (*it))
183         {
184             (*it)->close();
185             delete (*it);
186         }
187     }
188 }
189 
190 bool ServerAcceptor::open()
191 {
192     if (-1 == m_Svr_aceept.open(m_Svr_addr,1))
193     {
194         ACE_DEBUG((LM_ERROR,ACE_TEXT("failed to accept\n")));
195         m_Svr_aceept.close();
196         return false;
197     }
198     return true;
199 }
200 
201 int ServerAcceptor::handle_input(ACE_HANDLE fd )  
202 {
203     ServerStream *stream = new ServerStream(TaskThreadPool::instance());    //产生新通道
204     if (NULL != stream)
205     {
206         g_StreamPool.push_back(stream);//暂时存储全局变量用于内存管理,优化可增加一个连接管理类管理连接通道
207     }
208     if (m_Svr_aceept.accept(stream->GetStream()) == -1)  //绑定通道
209     {  
210         printf("accept client fail\n");  
211         return -1;  
212     }
213     ACE_Reactor::instance()->register_handler(stream,ACE_Event_Handler::READ_MASK);  //通道注册到ACE_Reactor
214     ACE_DEBUG((LM_INFO,"User connect success!,ClientPool num = %d\n",g_StreamPool.size()));
215     return 0;
216 }  
217     
218 void ServerAcceptor::close()
219 {
220     ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::ACCEPT_MASK);
221     m_Svr_aceept.close();
222 }
223 
224 int ACE_TMAIN()
225 {
226     ServerAcceptor server(LISTEN_PORT,(char *)SERVER_IP);
227     ACE_Reactor::instance()->register_handler(&server,ACE_Event_Handler::ACCEPT_MASK);    //listen port注册到ACE_Reactor
228     
229     ACE_Reactor::instance()->run_reactor_event_loop();  //进入消息循环,有I/O事件回调handle_input
230     return 0;
231 }

代码实现了最简单的完整并发服务器,有部分还值得思考和优化:

1.dispatch进行类封装

2.回话通道的数据流管理进行类封装

3.dispatch消息结构优化

4.dispatch处为单线程,直接传递I/O给线程获取数据流还是获取数据流完成后给线程,如何实现两个线程同步

5.底层I/O复用使用epoll边沿优化

6.业务buff处理优化,进行消息类型划分,进入不同业务处理

由于实现完整服务器代码以最简单形式实现,上述优化在实际商用代码中还需要大量封装优化考虑。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2016-07-01 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 框架描述
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档