服务器层次:
三层结构与五层网络的网络层,传输层,应用层类似对应。
Reactor模式:
下面是Reactor代码实现,包含I/O、dispatch、反应器、和线程池:
1 /*-----------------------------------------------------------------
2 * filename: Reactor.cpp
3 * author: bing
4 * time: 2016-06-29 15:26
5 * function: using ACE Reactor implement I/O multiplex server,
6 * include service thread pool.
7 *-----------------------------------------------------------------*/
8 #include <ace/INET_Addr.h>
9 #include <ace/SOCK_Acceptor.h>
10 #include <ace/SOCK_Stream.h>
11 #include <ace/Reactor.h>
12 #include <ace/Log_Msg.h>
13 #include "ace/Task.h"
14 #include "ace/OS.h"
15 #include <list>
16
17 #define MAX_BUFF_SIZE 1024
18 #define LISTEN_PORT 5010
19 #define SERVER_IP ACE_LOCALHOST
20 #define THREAD_NUM 10
21
22 struct MsgData
23 {
24 ACE_HANDLE* IOHandle;
25 int DataFlag;
26 char Data[MAX_BUFF_SIZE];
27 MsgData()
28 {
29 IOHandle = NULL;
30 DataFlag = -1;
31 ACE_OS::memset(Data, 0, sizeof(Data));
32 }
33 };
34
35 class TaskThread;
36
37 class ServerStream : public ACE_Event_Handler
38 {
39 public:
40 ServerStream(TaskThread* pMsgQueue);
41 ~ServerStream();
42 ACE_SOCK_Stream& GetStream(){return m_Svr_stream;} //给accept提供接口绑定数据通道
43 virtual int handle_input(ACE_HANDLE fd); //I/O触发事件后调用
44 void close();
45 virtual ACE_HANDLE get_handle(void) const {return m_Svr_stream.get_handle();} //不重载需要手动将handle传入ACE_Reactor
46 private:
47 ACE_SOCK_Stream m_Svr_stream;
48 TaskThread* m_MsgQueue;
49 };
50
51 std::list<ServerStream*> g_StreamPool; //stream pool
52
53 class TaskThread: public ACE_Task<ACE_MT_SYNCH>
54 {
55 public:
56 virtual int svc(void)
57 {
58 ACE_Message_Block *Msg;// = new ACE_Message_Block();
59 while(1)
60 {
61 getq(Msg); //空闲线程阻塞
62
63 ACE_Data_Block *Data_Block = Msg->data_block();
64 MsgData *pData = reinterpret_cast <MsgData*>(Data_Block->base());
65 if (0 == pData->DataFlag)
66 {
67 std::list<ServerStream*>::iterator it;
68 for (it = g_StreamPool.begin();it != g_StreamPool.end();++it)
69 {
70 if (get_handle() == (*it)->get_handle())
71 {
72 g_StreamPool.erase(it);
73 delete *it;
74 break;
75 }
76 }
77 return 0;
78 }
79 char strBuffer[MAX_BUFF_SIZE];
80 ACE_OS::memset(strBuffer, 0, sizeof(strBuffer));
81 ACE_OS::memcpy(strBuffer, pData->Data, sizeof(strBuffer));
82 /*
83 这里接口业务代码分发数据
84 */
85 ACE_DEBUG((LM_INFO,"[time:%d]recevie msg:%s\n",(int)ACE_OS::time(),strBuffer));
86 //ACE_SOCK_Stream Stream(*(pData->IOHandle));
87 //Stream.send("server recive data!\n",sizeof("server recive data!")); //响应client数据
88 //ACE_OS::sleep(1); //模拟业务耗时
89 Msg->release(); //release,inclue data_block
90 //ACE_DEBUG((LM_INFO,"thread end queue count:%d\n",msg_queue_->message_count()));
91 }
92 return 0;
93 }
94 };
95 typedef ACE_Singleton<TaskThread, ACE_Thread_Mutex> TaskThreadPool;
96
97 ServerStream::ServerStream(TaskThread* pMsgQueue)
98 {
99 m_MsgQueue = pMsgQueue;
100 }
101
102 ServerStream::~ServerStream()
103 {
104 close();
105 }
106
107 /*------------------------------------------------------
108 * IO上报流数据,使用select复用上报,这里单线程处理
109 * 原来考虑直接把IO插队列给线程池处理,但是线程池和
110 * 这里是异步操作,线程没有处理队列这条消息ACE底层会
111 * 一直上报这个IO插消息队列,暂时在这里做单线程revc
112 * 考虑epoll边沿触发,一次上报处理
113 *------------------------------------------------------*/
114 int ServerStream::handle_input(ACE_HANDLE fd)
115 {
116 MsgData Message;
117 char strBuffer[MAX_BUFF_SIZE];
118 Message.DataFlag = m_Svr_stream.recv(strBuffer,MAX_BUFF_SIZE); //获取数据回select响应避免反复通知
119 if (-1 == Message.DataFlag)
120 {
121 ACE_DEBUG((LM_INFO, ACE_TEXT("recive data error!\n")));
122 return -1;
123 }
124 else if(0 == Message.DataFlag)
125 {
126 close();
127 ACE_DEBUG((LM_INFO, ACE_TEXT("client closed!\n")));
128 }
129 ACE_Data_Block *Data_Block = new ACE_Data_Block; //线程做释放
130 ACE_HANDLE Cli_IO = get_handle();
131
132 Message.IOHandle = &Cli_IO;
133 ACE_OS::memcpy(Message.Data,strBuffer,sizeof(strBuffer));//传的data可带length信息来适配消息大小
134
135 char *p = reinterpret_cast <char*>(&Message);
136 Data_Block->base(p,sizeof(Message));
137 ACE_Message_Block* msg = new ACE_Message_Block(Data_Block);
138 m_MsgQueue->putq(msg); //put
139 //Data_Block->release();
140 return 0;
141 }
142
143 void ServerStream::close()
144 {
145 m_Svr_stream.close();
146 ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);
147 }
148
149 class ServerAcceptor : public ACE_Event_Handler
150 {
151 public:
152 ServerAcceptor(int port,char* ip);
153 ~ServerAcceptor();
154 bool open();
155 virtual int handle_input(ACE_HANDLE fd); //有client连接
156 void close();
157 virtual ACE_HANDLE get_handle(void) const {return m_Svr_aceept.get_handle();}
158 private:
159 ACE_INET_Addr m_Svr_addr;
160 ACE_SOCK_Acceptor m_Svr_aceept;
161 };
162
163 ServerAcceptor::ServerAcceptor(int port,char* ip):m_Svr_addr(port,ip)
164 {
165 if (!open()) //open listen port
166 {
167 ACE_DEBUG((LM_INFO, ACE_TEXT("open failed!\n")));
168 }
169 else
170 {
171 ACE_DEBUG((LM_INFO, ACE_TEXT("open success!\n")));
172 TaskThreadPool::instance()->activate(THR_NEW_LWP | THR_JOINABLE |THR_INHERIT_SCHED , THREAD_NUM);//创建10个线程处理业务
173 }
174 }
175
176 ServerAcceptor::~ServerAcceptor()
177 {
178 close();
179 std::list<ServerStream*>::iterator it;
180 for (it = g_StreamPool.begin();it != g_StreamPool.end();++it)
181 {
182 if (NULL != (*it))
183 {
184 (*it)->close();
185 delete (*it);
186 }
187 }
188 }
189
190 bool ServerAcceptor::open()
191 {
192 if (-1 == m_Svr_aceept.open(m_Svr_addr,1))
193 {
194 ACE_DEBUG((LM_ERROR,ACE_TEXT("failed to accept\n")));
195 m_Svr_aceept.close();
196 return false;
197 }
198 return true;
199 }
200
201 int ServerAcceptor::handle_input(ACE_HANDLE fd )
202 {
203 ServerStream *stream = new ServerStream(TaskThreadPool::instance()); //产生新通道
204 if (NULL != stream)
205 {
206 g_StreamPool.push_back(stream);//暂时存储全局变量用于内存管理,优化可增加一个连接管理类管理连接通道
207 }
208 if (m_Svr_aceept.accept(stream->GetStream()) == -1) //绑定通道
209 {
210 printf("accept client fail\n");
211 return -1;
212 }
213 ACE_Reactor::instance()->register_handler(stream,ACE_Event_Handler::READ_MASK); //通道注册到ACE_Reactor
214 ACE_DEBUG((LM_INFO,"User connect success!,ClientPool num = %d\n",g_StreamPool.size()));
215 return 0;
216 }
217
218 void ServerAcceptor::close()
219 {
220 ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::ACCEPT_MASK);
221 m_Svr_aceept.close();
222 }
223
224 int ACE_TMAIN()
225 {
226 ServerAcceptor server(LISTEN_PORT,(char *)SERVER_IP);
227 ACE_Reactor::instance()->register_handler(&server,ACE_Event_Handler::ACCEPT_MASK); //listen port注册到ACE_Reactor
228
229 ACE_Reactor::instance()->run_reactor_event_loop(); //进入消息循环,有I/O事件回调handle_input
230 return 0;
231 }
代码实现了最简单的完整并发服务器,有部分还值得思考和优化:
1.dispatch进行类封装
2.回话通道的数据流管理进行类封装
3.dispatch消息结构优化
4.dispatch处为单线程,直接传递I/O给线程获取数据流还是获取数据流完成后给线程,如何实现两个线程同步
5.底层I/O复用使用epoll边沿优化
6.业务buff处理优化,进行消息类型划分,进入不同业务处理
由于实现完整服务器代码以最简单形式实现,上述优化在实际商用代码中还需要大量封装优化考虑。