在之前的文章中提到过Reactor模式和Preactor模式,现在利用ACE的Reactor来实现一个基于Reactor框架的服务器。
首先回顾下Reactor模式和Preactor模式。
Reactor模式:
Reactor模式实现非常简单,使用同步IO模型,即业务线程处理数据需要主动等待或询问,主要特点是利用epoll监听listen描述符是否有响应,及时将客户连接信息放于一个队列,epoll和队列都是在主进程/线程中,由子进程/线程来接管描述符传输的数据,对描述符进行下一步操作,包括connect和数据读写。主程读写就绪事件。整个过程都需要先获取描述符状态,在状态允许下再执行任务。
大致流程图如下:
Preactor模式:
Preactor模式完全将IO处理和业务分离,使用异步IO模型,即内核完成数据处理后主动通知给应用处理,主进程/线程不仅要完成listen任务,还需要完成内核数据缓冲区的映射,直接将数据buff传递给业务线程,业务线程只需要处理业务逻辑即可。整个过程直接推送任务,描述符状态是否允许执行任务由内核去调度处理。
大致流程如下:
ACE的Reactor模式
所有服务器都可以归纳为以下三层:
ACE的Reactor处于I/O和Dispatch层。提供了I/O监控和消息Dispatch。其中I/O需要用户以handle的形式提供到ACE_Reactor内。
Dispatch需要以ACE_Event_Handler为载体,也就是说要实现一个完整的Reactor只依赖ACE_Reactor类是无法完成的。
上篇博文利用ACE的Socket可以看出一个ACE_SOCK_Acceptor和ACE_SOCK_Stream就可以完成服务器代码。现在要做的是,
1.引入Reactor,把Acceptor和Stream两个I/O分别放在两个继承于ACE_Event_Handler的类中注册给ACE_Reactor。
2.主函数注册包含ACE_SOCK_Acceptor的类到ACE_Reactor中,当ACE_SOCK_Acceptor收到数据即有客户端连接后再给对应的客户端创建一个ACE_SOCK_Stream通道并注册到ACE_Reactor中。
使用ACE_Reactor实现的Server代码:
1 #include <ace/INET_Addr.h>
2 #include <ace/SOCK_Acceptor.h>
3 #include <ace/SOCK_Stream.h>
4 #include <ace/Reactor.h>
5 #include <ace/Log_Msg.h>
6 #include <list>
7
8 #define MAX_BUFF_SIZE 1024
9 #define LISTEN_PORT 5010
10 #define SERVER_IP ACE_LOCALHOST
11
12 class ServerStream : public ACE_Event_Handler
13 {
14 public:
15 ServerStream();
16 ~ServerStream();
17 ACE_SOCK_Stream& GetStream(){return Svr_stream;} //给accept提供接口绑定数据通道
18 virtual int handle_input(ACE_HANDLE fd); //I/O触发事件后调用
19 void close();
20 virtual ACE_HANDLE get_handle(void) const {return Svr_stream.get_handle();} //不重载需要手动将handle传入ACE_Reactor
21 private:
22 ACE_INET_Addr Cli_addr;
23 ACE_SOCK_Stream Svr_stream;
24 };
25
26 ServerStream::ServerStream()
27 {
28
29 }
30
31 ServerStream::~ServerStream()
32 {
33 close();
34 }
35
36 int ServerStream::handle_input(ACE_HANDLE fd)
37 {
38 char strBuffer[MAX_BUFF_SIZE];
39 int byte = Svr_stream.recv(strBuffer,MAX_BUFF_SIZE); //可读数据
40 if (-1 == byte)
41 {
42 ACE_DEBUG((LM_INFO, ACE_TEXT("receive data failed\n")));
43 }
44 else if(0 == byte)
45 {
46 close();
47 ACE_DEBUG((LM_INFO, ACE_TEXT("client closed!\n")));
48 }
49 else
50 {
51 ACE_DEBUG((LM_INFO, ACE_TEXT("receive from client: %s\n"),strBuffer));
52 }
53 }
54
55 void ServerStream::close()
56 {
57 Svr_stream.close();
58 ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);
59 //delete this;
60 }
61
62 class ServerAcceptor : public ACE_Event_Handler
63 {
64 public:
65 ServerAcceptor(int port,char* ip);
66 ~ServerAcceptor();
67 bool open();
68 virtual int handle_input(ACE_HANDLE fd); //有client连接
69 void close();
70 virtual ACE_HANDLE get_handle(void) const {return Svr_aceept.get_handle();}
71 private:
72 ACE_INET_Addr Svr_addr;
73 ACE_SOCK_Acceptor Svr_aceept;
74 std::list<ServerStream*> m_streamPool; //stream pool
75 };
76
77 ServerAcceptor::ServerAcceptor(int port,char* ip):Svr_addr(port,ip)
78 {
79 if (!open()) //open listen port
80 {
81 ACE_DEBUG((LM_INFO, ACE_TEXT("open failed!\n")));
82 }
83 else
84 {
85 ACE_DEBUG((LM_INFO, ACE_TEXT("open success!\n")));
86 }
87 }
88
89 ServerAcceptor::~ServerAcceptor()
90 {
91 close();
92 std::list<ServerStream*>::iterator it;
93 for (it = m_streamPool.begin();it != m_streamPool.end();++it)
94 {
95 if (NULL != (*it))
96 {
97 (*it)->close();
98 delete (*it);
99 }
100 }
101 }
102
103 bool ServerAcceptor::open()
104 {
105 if (-1 == Svr_aceept.open(Svr_addr,1))
106 {
107 ACE_DEBUG((LM_ERROR,ACE_TEXT("failed to accept\n")));
108 Svr_aceept.close();
109 return false;
110 }
111 return true;
112 }
113
114 int ServerAcceptor::handle_input(ACE_HANDLE fd )
115 {
116 ServerStream *stream = new ServerStream(); //产生新通道
117 if (NULL != stream)
118 {
119 m_streamPool.push_back(stream);
120 }
121 if (Svr_aceept.accept(stream->GetStream()) == -1) //绑定通道
122 {
123 printf("accept client fail\n");
124 return -1;
125 }
126 ACE_Reactor::instance()->register_handler(stream,ACE_Event_Handler::READ_MASK); //通道注册到ACE_Reactor
127 ACE_DEBUG((LM_ERROR,ACE_TEXT("User connect success!\n")));
128 }
129
130 void ServerAcceptor::close()
131 {
132 ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::ACCEPT_MASK);
133 Svr_aceept.close();
134 }
135
136 int ACE_TMAIN()
137 {
138 ServerAcceptor server(LISTEN_PORT,(char *)SERVER_IP);
139 ACE_Reactor::instance()->register_handler(&server,ACE_Event_Handler::ACCEPT_MASK); //listen port注册到ACE_Reactor
140 ACE_Reactor::instance()->run_reactor_event_loop(); //进入消息循环,有I/O事件回调handle_input
141 return 0;
142 }
测试结果:
终端1:
终端2:
终端3:
ACE_Reactor内部已经帮我们实现了IO复用。
有了Reactor的demo后,下面一步步查看ACE_Reactor内部是如何运作的:
ACE_Reactor注册EVENT,重载了一个register_handler:
1 int
2 ACE_Reactor::register_handler (ACE_Event_Handler *event_handler,
3 ACE_Reactor_Mask mask)
4 {
5 // Remember the old reactor.
6 ACE_Reactor *old_reactor = event_handler->reactor ();
7
8 // Assign *this* <Reactor> to the <Event_Handler>.
9 event_handler->reactor (this);
10
11 int result = this->implementation ()->register_handler (event_handler, mask);
12 if (result == -1)
13 // Reset the old reactor in case of failures.
14 event_handler->reactor (old_reactor);
15
16 return result;
17 }
第11行实际是ACE_Reactor_Impl *implementation (void) const;在做实际功能。进一步查看implementation 是如何注册的。
1 template <class ACE_SELECT_REACTOR_TOKEN> int
2 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler
3 (ACE_Event_Handler *handler,
4 ACE_Reactor_Mask mask)
5 {
6 ACE_TRACE ("ACE_Select_Reactor_T::register_handler");
7 ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1));
8 return this->register_handler_i (handler->get_handle (), handler, mask);
9 }
这里开始大量使用模板,这里重载了两个在最后调用register_handler_i,在第8行,可以看到调用了get_handle,也就是我们重载的那个函数,所以我们不需要传入ACE_Reactor,它在这一步调用了我们重新的虚函数,获得了handle,当然也可以不做重写,手动传入handle。这个handle就是我们要处理的I/O,而handler则是我们继承ACE_Event_Handler的类。
1 template <class ACE_SELECT_REACTOR_TOKEN> int
2 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler_i
3 (ACE_HANDLE handle,
4 ACE_Event_Handler *event_handler,
5 ACE_Reactor_Mask mask)
6 {
7 ACE_TRACE ("ACE_Select_Reactor_T::register_handler_i");
8
9 // Insert the <handle, event_handle> tuple into the Handler
10 // Repository.
11 return this->handler_rep_.bind (handle, event_handler, mask);
12 }
到这里,我们看到代码handler_rep_将hande和event_handler绑定了起来,handler_rep_在Select_Reactor_Base.h内为ACE_Select_Reactor_Impl的成员变量。下面我们继续看bind实际是在做什么操作。
1 // Bind the <ACE_Event_Handler *> to the <ACE_HANDLE>.
2 int
3 ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle,
4 ACE_Event_Handler *event_handler,
5 ACE_Reactor_Mask mask)
6 {
7 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::bind");
8
9 if (event_handler == 0)
10 return -1;
11
12 if (handle == ACE_INVALID_HANDLE)
13 handle = event_handler->get_handle ();
14
15 if (this->invalid_handle (handle))
16 return -1;
17
18 // Is this handle already in the Reactor?
19 bool existing_handle = false;
20
21 #if defined (ACE_WIN32)
22
23 map_type::ENTRY * entry = 0;
24
25 int const result =
26 this->event_handlers_.bind (handle, event_handler, entry);
27
28 if (result == -1)
29 {
30 return -1;
31 }
32 else if (result == 1) // Entry already exists.
33 {
34 // Cannot use a different handler for an existing handle.
35 if (event_handler != entry->item ())
36 {
37 return -1;
38 }
39 else
40 {
41 // Remember that this handle is already registered in the
42 // Reactor.
43 existing_handle = true;
44 }
45 }
46
47 #else
48
49 // Check if this handle is already registered.
50 ACE_Event_Handler * const current_handler =
51 this->event_handlers_[handle];
52
53 if (current_handler)
54 {
55 // Cannot use a different handler for an existing handle.
56 if (current_handler != event_handler)
57 return -1;
58
59 // Remember that this handle is already registered in the
60 // Reactor.
61 existing_handle = true;
62 }
63
64 this->event_handlers_[handle] = event_handler;
65
66 if (this->max_handlep1_ < handle + 1)
67 this->max_handlep1_ = handle + 1;
68
69 #endif /* ACE_WIN32 */
70
71 if (this->select_reactor_.is_suspended_i (handle))
72 {
73 this->select_reactor_.bit_ops (handle,
74 mask,
75 this->select_reactor_.suspend_set_,
76 ACE_Reactor::ADD_MASK);
77 }
78 else
79 {
80 this->select_reactor_.bit_ops (handle,
81 mask,
82 this->select_reactor_.wait_set_,
83 ACE_Reactor::ADD_MASK);
84
85 // Note the fact that we've changed the state of the <wait_set_>,
86 // which is used by the dispatching loop to determine whether it can
87 // keep going or if it needs to reconsult select().
88 // this->select_reactor_.state_changed_ = 1;
89 }
90
91 // If new entry, call add_reference() if needed.
92 if (!existing_handle)
93 event_handler->add_reference ();
94
95 return 0;
96 }
这里非常关键,第50行,event_handlers_实则是一个容器,handle和event_hander以index的方式绑定了起来,存储在了一个容器内,第80行还有一行关键代码,ADD_MASK形式的操作加入到了wait_set_成员内。
注册的代码到这里为止,ACE_Reactor实际上调用了几层N个文件,其实就是把handle,即I/O和handler,即继承ACE_Event_Handler的类绑定在了一个容器里。下面看ACE_Reactor是如何进行消息循环的。
1 int
2 ACE_Reactor::run_reactor_event_loop (REACTOR_EVENT_HOOK eh)
3 {
4 ACE_TRACE ("ACE_Reactor::run_reactor_event_loop");
5
6 if (this->reactor_event_loop_done ())
7 return 0;
8
9 while (1)
10 {
11 int const result = this->implementation_->handle_events ();
12
13 if (eh != 0 && (*eh)(this))
14 continue;
15 else if (result == -1 && this->implementation_->deactivated ())
16 return 0;
17 else if (result == -1)
18 return -1;
19 }
20
21 ACE_NOTREACHED (return 0;)
22 }
同样,将loop交给了ACE_Reactor_Impl *implementation (void) const;操作。
继续跟踪
1 template <class ACE_SELECT_REACTOR_TOKEN> int
2 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handle_events
3 (ACE_Time_Value &max_wait_time)
4 {
5 ACE_TRACE ("ACE_Select_Reactor_T::handle_events");
6
7 return this->handle_events (&max_wait_time);
8 }
再次到了模板,调用handle_events,下面到了关键代码
1 template <class ACE_SELECT_REACTOR_TOKEN> int
2 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handle_events_i
3 (ACE_Time_Value *max_wait_time)
4 {
5 int result = -1;
6
7 ACE_SEH_TRY
8 {
9 // We use the data member dispatch_set_ as the current dispatch
10 // set.
11
12 // We need to start from a clean dispatch_set
13 this->dispatch_set_.rd_mask_.reset ();
14 this->dispatch_set_.wr_mask_.reset ();
15 this->dispatch_set_.ex_mask_.reset ();
16
17 int number_of_active_handles =
18 this->wait_for_multiple_events (this->dispatch_set_,
19 max_wait_time);
20
21 result =
22 this->dispatch (number_of_active_handles,
23 this->dispatch_set_);
24 }
25 ACE_SEH_EXCEPT (this->release_token ())
26 {
27 // As it stands now, we catch and then rethrow all Win32
28 // structured exceptions so that we can make sure to release the
29 // <token_> lock correctly.
30 }
31
32 return result;
33 }
第18行wait_for_multiple_events和第22行dispatch。分别做了两件非常关键的事。
1 // Must be called with lock held.
2
3 template <class ACE_SELECT_REACTOR_TOKEN> int
4 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::wait_for_multiple_events
5 (ACE_Select_Reactor_Handle_Set &dispatch_set,
6 ACE_Time_Value *max_wait_time)
7 {
8 ACE_TRACE ("ACE_Select_Reactor_T::wait_for_multiple_events");
9 ACE_Time_Value timer_buf (0);
10 ACE_Time_Value *this_timeout = 0;
11
12 int number_of_active_handles = this->any_ready (dispatch_set);
13
14 // If there are any bits enabled in the <ready_set_> then we'll
15 // handle those first, otherwise we'll block in <select>.
16
17 if (number_of_active_handles == 0)
18 {
19 do
20 {
21 if (this->timer_queue_ == 0)
22 return 0;
23
24 this_timeout =
25 this->timer_queue_->calculate_timeout (max_wait_time,
26 &timer_buf);
27 #ifdef ACE_WIN32
28 // This arg is ignored on Windows and causes pointer
29 // truncation warnings on 64-bit compiles.
30 int const width = 0;
31 #else
32 int const width = this->handler_rep_.max_handlep1 ();
33 #endif /* ACE_WIN32 */
34
35 dispatch_set.rd_mask_ = this->wait_set_.rd_mask_;
36 dispatch_set.wr_mask_ = this->wait_set_.wr_mask_;
37 dispatch_set.ex_mask_ = this->wait_set_.ex_mask_;
38 number_of_active_handles = ACE_OS::select (width,
39 dispatch_set.rd_mask_,
40 dispatch_set.wr_mask_,
41 dispatch_set.ex_mask_,
42 this_timeout);
43 }
44 while (number_of_active_handles == -1 && this->handle_error () > 0);
45
46 if (number_of_active_handles > 0)
47 {
48 #if !defined (ACE_WIN32)
49 // Resynchronize the fd_sets so their "max" is set properly.
50 dispatch_set.rd_mask_.sync (this->handler_rep_.max_handlep1 ());
51 dispatch_set.wr_mask_.sync (this->handler_rep_.max_handlep1 ());
52 dispatch_set.ex_mask_.sync (this->handler_rep_.max_handlep1 ());
53 #endif /* ACE_WIN32 */
54 }
55 else if (number_of_active_handles == -1)
56 {
57 // Normally, select() will reset the bits in dispatch_set
58 // so that only those filed descriptors that are ready will
59 // have bits set. However, when an error occurs, the bit
60 // set remains as it was when the select call was first made.
61 // Thus, we now have a dispatch_set that has every file
62 // descriptor that was originally waited for, which is not
63 // correct. We must clear all the bit sets because we
64 // have no idea if any of the file descriptors is ready.
65 //
66 // NOTE: We dont have a test case to reproduce this
67 // problem. But pleae dont ignore this and remove it off.
68 dispatch_set.rd_mask_.reset ();
69 dispatch_set.wr_mask_.reset ();
70 dispatch_set.ex_mask_.reset ();
71 }
72 }
73
74 // Return the number of events to dispatch.
75 return number_of_active_handles;
76 }
第35行熟悉的变量wait_set_和第38行函数select,到这里发现,Reactor的I/O监控,就是利用select函数监控之前注册进去且ADD到wait_set_的handle,即I/O。
当有I/O事件,即返回值的number_of_active_handles不为0时,将进行dispatch。
1 template <class ACE_SELECT_REACTOR_TOKEN> int
2 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch
3 (int active_handle_count,
4 ACE_Select_Reactor_Handle_Set &dispatch_set)
5 {
6 ACE_TRACE ("ACE_Select_Reactor_T::dispatch");
7
8 int io_handlers_dispatched = 0;
9 int other_handlers_dispatched = 0;
10 int signal_occurred = 0;
11 // The following do/while loop keeps dispatching as long as there
12 // are still active handles. Note that the only way we should ever
13 // iterate more than once through this loop is if signals occur
14 // while we're dispatching other handlers.
15
16 do
17 {
18 // We expect that the loop will decrease the number of active
19 // handles in each iteration. If it does not, then something is
20 // inconsistent in the state of the Reactor and we should avoid
21 // the loop. Please read the comments on bug 2540 for more
22 // details.
23 int initial_handle_count = active_handle_count;
24
25 // Note that we keep track of changes to our state. If any of
26 // the dispatch_*() methods below return -1 it means that the
27 // <wait_set_> state has changed as the result of an
28 // <ACE_Event_Handler> being dispatched. This means that we
29 // need to bail out and rerun the select() loop since our
30 // existing notion of handles in <dispatch_set> may no longer be
31 // correct.
32 //
33 // In the beginning, our state starts out unchanged. After
34 // every iteration (i.e., due to signals), our state starts out
35 // unchanged again.
36
37 this->state_changed_ = false;
38
39 // Perform the Template Method for dispatching all the handlers.
40
41 // First check for interrupts.
42 if (active_handle_count == -1)
43 {
44 // Bail out -- we got here since <select> was interrupted.
45 if (ACE_Sig_Handler::sig_pending () != 0)
46 {
47 ACE_Sig_Handler::sig_pending (0);
48
49 // If any HANDLES in the <ready_set_> are activated as a
50 // result of signals they should be dispatched since
51 // they may be time critical...
52 active_handle_count = this->any_ready (dispatch_set);
53
54 // Record the fact that the Reactor has dispatched a
55 // handle_signal() method. We need this to return the
56 // appropriate count below.
57 signal_occurred = 1;
58 }
59 else
60 return -1;
61 }
62
63 // Handle timers early since they may have higher latency
64 // constraints than I/O handlers. Ideally, the order of
65 // dispatching should be a strategy...
66 else if (this->dispatch_timer_handlers (other_handlers_dispatched) == -1)
67 // State has changed or timer queue has failed, exit loop.
68 break;
69
70 // Check to see if there are no more I/O handles left to
71 // dispatch AFTER we've handled the timers...
72 else if (active_handle_count == 0)
73 return io_handlers_dispatched
74 + other_handlers_dispatched
75 + signal_occurred;
76
77 // Next dispatch the notification handlers (if there are any to
78 // dispatch). These are required to handle multi-threads that
79 // are trying to update the <Reactor>.
80
81 else if (this->dispatch_notification_handlers
82 (dispatch_set,
83 active_handle_count,
84 other_handlers_dispatched) == -1)
85 // State has changed or a serious failure has occured, so exit
86 // loop.
87 break;
88
89 // Finally, dispatch the I/O handlers.
90 else if (this->dispatch_io_handlers
91 (dispatch_set,
92 active_handle_count,
93 io_handlers_dispatched) == -1)
94 // State has changed, so exit loop.
95 break;
96
97 // if state changed, we need to re-eval active_handle_count,
98 // so we will not end with an endless loop
99 if (initial_handle_count == active_handle_count
100 || this->state_changed_)
101 {
102 active_handle_count = this->any_ready (dispatch_set);
103 }
104 }
105 while (active_handle_count > 0);
106
107 return io_handlers_dispatched + other_handlers_dispatched + signal_occurred;
108 }
这里一步步按顺序进行判断分发,进入dispatch_notification_handlers,调用到Select_Reactor_Base.cpp的dispatch_notifications,到这里终于看到熟悉的函数。
1 // Handles pending threads (if any) that are waiting to unblock the
2 // Select_Reactor.
3
4 int
5 ACE_Select_Reactor_Notify::dispatch_notifications (int &number_of_active_handles,
6 ACE_Handle_Set &rd_mask)
7 {
8 ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications");
9
10 ACE_HANDLE const read_handle =
11 this->notification_pipe_.read_handle ();
12
13 if (read_handle != ACE_INVALID_HANDLE
14 && rd_mask.is_set (read_handle))
15 {
16 --number_of_active_handles;
17 rd_mask.clr_bit (read_handle);
18 return this->handle_input (read_handle);
19 }
20 else
21 return 0;
22 }
第18行,调用了自己的handle_input,还不是最开始外部重载的handle_input,查看最后这个函数。
1 int
2 ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle)
3 {
4 ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input");
5 // Precondition: this->select_reactor_.token_.current_owner () ==
6 // ACE_Thread::self ();
7
8 int number_dispatched = 0;
9 int result = 0;
10 ACE_Notification_Buffer buffer;
11
12 // If there is only one buffer in the pipe, this will loop and call
13 // read_notify_pipe() twice. The first time will read the buffer, and
14 // the second will read the fact that the pipe is empty.
15 while ((result = this->read_notify_pipe (handle, buffer)) > 0)
16 {
17 // Dispatch the buffer
18 // NOTE: We count only if we made any dispatches ie. upcalls.
19 if (this->dispatch_notify (buffer) > 0)
20 ++number_dispatched;
21
22 // Bail out if we've reached the <notify_threshold_>. Note that
23 // by default <notify_threshold_> is -1, so we'll loop until all
24 // the notifications in the pipe have been dispatched.
25 if (number_dispatched == this->max_notify_iterations_)
26 break;
27 }
28
29 // Reassign number_dispatched to -1 if things have gone seriously
30 // wrong.
31 if (result < 0)
32 number_dispatched = -1;
33
34 // Enqueue ourselves into the list of waiting threads. When we
35 // reacquire the token we'll be off and running again with ownership
36 // of the token. The postcondition of this call is that
37 // <select_reactor_.token_.current_owner> == <ACE_Thread::self>.
38 this->select_reactor_->renew ();
39 return number_dispatched;
40 }
第15行,取数据,第19行dispatch_notify
1 int
2 ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
3 {
4 int result = 0;
5
6 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
7 // Dispatch one message from the notify queue, and put another in
8 // the pipe if one is available. Remember, the idea is to keep
9 // exactly one message in the pipe at a time.
10
11 bool more_messages_queued = false;
12 ACE_Notification_Buffer next;
13
14 result = notification_queue_.pop_next_notification(buffer,
15 more_messages_queued,
16 next);
17
18 if (result == 0 || result == -1)
19 {
20 return result;
21 }
22
23 if(more_messages_queued)
24 {
25 (void) ACE::send(this->notification_pipe_.write_handle(),
26 (char *)&next, sizeof(ACE_Notification_Buffer));
27 }
28 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
29
30 // If eh == 0 then another thread is unblocking the
31 // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
32 // internal structures. Otherwise, we need to dispatch the
33 // appropriate handle_* method on the <ACE_Event_Handler> pointer
34 // we've been passed.
35 if (buffer.eh_ != 0)
36 {
37 ACE_Event_Handler *event_handler = buffer.eh_;
38
39 bool const requires_reference_counting =
40 event_handler->reference_counting_policy ().value () ==
41 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
42
43 switch (buffer.mask_)
44 {
45 case ACE_Event_Handler::READ_MASK:
46 case ACE_Event_Handler::ACCEPT_MASK:
47 result = event_handler->handle_input (ACE_INVALID_HANDLE);
48 break;
49 case ACE_Event_Handler::WRITE_MASK:
50 result = event_handler->handle_output (ACE_INVALID_HANDLE);
51 break;
52 case ACE_Event_Handler::EXCEPT_MASK:
53 result = event_handler->handle_exception (ACE_INVALID_HANDLE);
54 break;
55 case ACE_Event_Handler::QOS_MASK:
56 result = event_handler->handle_qos (ACE_INVALID_HANDLE);
57 break;
58 case ACE_Event_Handler::GROUP_QOS_MASK:
59 result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
60 break;
61 default:
62 // Should we bail out if we get an invalid mask?
63 ACE_ERROR ((LM_ERROR,
64 ACE_TEXT ("invalid mask = %d\n"),
65 buffer.mask_));
66 }
67
68 if (result == -1)
69 event_handler->handle_close (ACE_INVALID_HANDLE,
70 ACE_Event_Handler::EXCEPT_MASK);
71
72 if (requires_reference_counting)
73 {
74 event_handler->remove_reference ();
75 }
76 }
77
78 return 1;
79 }
到这里,终于看到调用到我们最开始继承ACE_Event_Handler重写的那个回调handle_input()了。
至此,ACE_Reactor内部源码的执行过程全部结束,其实ACE并没有做非常特别的事,注册利用一个容器进行I/O和回调方法的绑定,I/O复用利用select,最后发生I/O事件找到对应的event函数handle_input执行。
怪不得网上有人抱怨ACE代码臃肿了,这些我们关心“简单”过程的代码就这么多的复杂用法和调用,更不用说我们还没用上的了,但是ACE提供的Reactor框架确实方便了我们使用,也提供了可靠的移植性和性能。