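ACE - Reactor Pattern: Source Code Walkthrough and a Concrete Implementation (Lots of Source Code Ahead)

Earlier articles covered the Reactor and Proactor patterns. This one uses ACE's Reactor to build a server on top of the Reactor framework.

First, a quick review of the Reactor and Proactor patterns.

Reactor pattern:

The Reactor pattern is simple to implement. It uses the synchronous I/O model: the business thread has to wait for, or poll for, its data. Its main feature is that a multiplexer such as epoll watches the listen descriptor and, as soon as a client connects, places the connection information on a queue. Both epoll and the queue live in the main process/thread; child processes/threads take over the descriptors and do the follow-up work on them, including connect and data reads and writes. The main thread only handles read/write readiness events. Throughout, the descriptor's state is obtained first, and a task runs only once that state allows it.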
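To make the "wait for readiness, then do the I/O yourself" idea concrete, here is a minimal sketch that does not use ACE at all. It uses POSIX select() rather than epoll so that it stays portable, and a pipe stands in for a client connection. MiniReactor, reactor_demo and every other name in it are hypothetical and exist only for this illustration.

```cpp
#include <sys/select.h>
#include <unistd.h>
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical minimal reactor: maps a descriptor to a "readable" callback.
class MiniReactor {
public:
    using Callback = std::function<void(int)>;

    void register_handler(int fd, Callback cb) { handlers_[fd] = std::move(cb); }
    void remove_handler(int fd) { handlers_.erase(fd); }

    // Waits until a registered descriptor is readable or the timeout expires,
    // then calls the callback of every ready descriptor.
    // Returns the number of callbacks invoked, 0 on timeout, -1 on error.
    int handle_events(int timeout_ms) {
        fd_set read_set;
        FD_ZERO(&read_set);
        int max_fd = -1;
        for (const auto& entry : handlers_) {
            FD_SET(entry.first, &read_set);
            if (entry.first > max_fd) max_fd = entry.first;
        }
        timeval tv;
        tv.tv_sec = timeout_ms / 1000;
        tv.tv_usec = (timeout_ms % 1000) * 1000;
        int ready = ::select(max_fd + 1, &read_set, nullptr, nullptr, &tv);
        if (ready <= 0) return ready;

        // Collect the ready descriptors first so that a callback may
        // register or remove handlers without invalidating the iteration.
        std::vector<int> ready_fds;
        for (const auto& entry : handlers_)
            if (FD_ISSET(entry.first, &read_set)) ready_fds.push_back(entry.first);

        int dispatched = 0;
        for (int fd : ready_fds) {
            auto it = handlers_.find(fd);
            if (it == handlers_.end()) continue;  // removed by an earlier callback
            Callback cb = it->second;             // copy: the callback may remove itself
            cb(fd);
            ++dispatched;
        }
        return dispatched;
    }

private:
    std::map<int, Callback> handlers_;
};

// Demo: the handler is told the pipe is readable and then reads it itself.
std::string reactor_demo() {
    int fds[2];
    if (::pipe(fds) != 0) return "";
    std::string received;
    MiniReactor reactor;
    reactor.register_handler(fds[0], [&received](int fd) {
        char buf[64];
        ssize_t n = ::read(fd, buf, sizeof(buf));  // synchronous read by the handler
        if (n > 0) received.assign(buf, static_cast<std::size_t>(n));
    });
    if (::write(fds[1], "hello", 5) == 5) {
        int dispatched = reactor.handle_events(1000);
        assert(dispatched == 1);
        (void)dispatched;
    }
    ::close(fds[0]);
    ::close(fds[1]);
    return received;
}
```

Calling reactor_demo() from main should return the five bytes written to the pipe. The point of the sketch is only the division of labour: the loop reports readiness, and the callback performs the actual read.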

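[Figure: rough flow of the Reactor pattern]

Proactor pattern:

The Proactor pattern separates I/O handling from business logic completely. It uses the asynchronous I/O model: the kernel finishes the data transfer and then notifies the application. The main process/thread not only performs the listen task but also maps the kernel data buffers and passes the data buffer straight to the business thread, which only has to run the business logic. Tasks are pushed directly; whether a descriptor's state allows a task to run is left to the kernel to schedule.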
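The difference from the Reactor is easiest to see in the shape of the callback: a Proactor-style handler receives data that has already been read. The following is only an emulation under that assumption. complete_read performs an ordinary blocking read() and then invokes the handler, whereas a real Proactor would have the operating system complete the read in the background. All names here are hypothetical.

```cpp
#include <unistd.h>
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>

// Hypothetical completion callback: it is handed the finished buffer and
// never touches the descriptor.
using CompletionHandler = std::function<void(const std::string&)>;

// Emulated asynchronous read. The "I/O layer" does the read and only then
// calls the handler. This is a stand-in for a true asynchronous operation.
bool complete_read(int fd, const CompletionHandler& on_complete) {
    char buf[256];
    ssize_t n = ::read(fd, buf, sizeof(buf));
    if (n <= 0) return false;  // error or end of stream: nothing to deliver
    on_complete(std::string(buf, static_cast<std::size_t>(n)));
    return true;
}

// Demo: the business callback only sees the data, not the pipe.
std::string proactor_demo() {
    int fds[2];
    if (::pipe(fds) != 0) return "";
    std::string seen;
    if (::write(fds[1], "ping", 4) == 4) {
        bool completed = complete_read(
            fds[0], [&seen](const std::string& data) { seen = data; });
        assert(completed);
        (void)completed;
    }
    ::close(fds[0]);
    ::close(fds[1]);
    return seen;
}
```

Compared with a Reactor-style callback, which is told "this descriptor is readable" and must call read itself, the handler here contains business logic only.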

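[Figure: rough flow of the Proactor pattern]

ACE's Reactor pattern

Every server can be reduced to the following three layers:

  • I/O: handles low-level I/O events
  • Dispatch: dispatches event messages
  • Service: business processing

ACE's Reactor sits in the I/O and Dispatch layers, providing I/O monitoring and message dispatch. The I/O to monitor must be handed to ACE_Reactor by the user in the form of a handle.

Dispatch uses ACE_Event_Handler as its carrier, so the ACE_Reactor class alone is not enough to build a complete Reactor.

The previous post showed that, with ACE's Socket wrappers, one ACE_SOCK_Acceptor and one ACE_SOCK_Stream are enough to write a server. What remains is to:

1. Introduce the Reactor: put the two I/O objects, the Acceptor and the Stream, into two classes derived from ACE_Event_Handler and register them with ACE_Reactor.

2. In the main function, register the class that holds the ACE_SOCK_Acceptor with ACE_Reactor. When the ACE_SOCK_Acceptor becomes readable, meaning a client has connected, create an ACE_SOCK_Stream channel for that client and register it with ACE_Reactor as well.

Server code using ACE_Reactor: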

#include <ace/INET_Addr.h>
#include <ace/SOCK_Acceptor.h>
#include <ace/SOCK_Stream.h>
#include <ace/Reactor.h>
#include <ace/Log_Msg.h>
#include <list>

#define MAX_BUFF_SIZE     1024
#define LISTEN_PORT     5010
#define SERVER_IP        ACE_LOCALHOST

class ServerStream : public ACE_Event_Handler
{
public:
    ServerStream();
    ~ServerStream();
    ACE_SOCK_Stream& GetStream(){return Svr_stream;}    // lets the acceptor bind the accepted connection to this stream
    virtual int handle_input(ACE_HANDLE fd);    // called by the reactor when the socket is readable
    void close();
    virtual ACE_HANDLE get_handle(void) const {return Svr_stream.get_handle();}    // without this override the handle must be passed to ACE_Reactor explicitly
private:
    ACE_INET_Addr Cli_addr;
    ACE_SOCK_Stream Svr_stream;
};

ServerStream::ServerStream()
{

}

ServerStream::~ServerStream()
{
    close();
}

int ServerStream::handle_input(ACE_HANDLE fd)
{
    char strBuffer[MAX_BUFF_SIZE];
    ssize_t byte = Svr_stream.recv(strBuffer,MAX_BUFF_SIZE - 1); // read what is available, leaving room for the terminator
    if (-1 == byte)
    {
        ACE_DEBUG((LM_INFO, ACE_TEXT("receive data failed\n")));
    }
    else if(0 == byte)
    {
        close();
        ACE_DEBUG((LM_INFO, ACE_TEXT("client closed!\n")));
    }
    else
    {
        strBuffer[byte] = '\0';    // recv() does not null-terminate the data
        ACE_DEBUG((LM_INFO, ACE_TEXT("receive from client: %s\n"),strBuffer));
    }
    return 0;    // the original fell off the end of a non-void function
}

void ServerStream::close()
{
    if (Svr_stream.get_handle() == ACE_INVALID_HANDLE)
    {
        return;    // already closed
    }
    // Deregister before closing: remove_handler() looks the handler up through
    // get_handle(), which is no longer valid once the stream has been closed.
    ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);
    Svr_stream.close();
    //delete this;
}

class ServerAcceptor : public ACE_Event_Handler
{
public:
    ServerAcceptor(int port,const char* ip);
    ~ServerAcceptor();
    bool open();
    virtual int handle_input(ACE_HANDLE fd);  // a client is connecting
    void close();
    virtual ACE_HANDLE get_handle(void) const {return Svr_aceept.get_handle();}
private:
    ACE_INET_Addr Svr_addr;
    ACE_SOCK_Acceptor Svr_aceept;
    std::list<ServerStream*> m_streamPool;  //stream pool
};

ServerAcceptor::ServerAcceptor(int port,const char* ip):Svr_addr(port,ip)
{
    if (!open())    //open listen port
    {
        ACE_DEBUG((LM_INFO, ACE_TEXT("open failed!\n")));
    }
    else
    {
        ACE_DEBUG((LM_INFO, ACE_TEXT("open success!\n")));
    }
}

ServerAcceptor::~ServerAcceptor()
{
    close();
    std::list<ServerStream*>::iterator it;
    for (it = m_streamPool.begin();it != m_streamPool.end();++it)
    {
        delete (*it);    // ~ServerStream() closes the stream and deregisters it
    }
}

bool ServerAcceptor::open()
{
    if (-1 == Svr_aceept.open(Svr_addr,1))
    {
        ACE_DEBUG((LM_ERROR,ACE_TEXT("failed to open the listen port\n")));
        Svr_aceept.close();
        return false;
    }
    return true;
}

int ServerAcceptor::handle_input(ACE_HANDLE fd )
{
    ServerStream *stream = new ServerStream();    // create a new channel for this client
    if (Svr_aceept.accept(stream->GetStream()) == -1)  // bind the connection to the channel
    {
        ACE_DEBUG((LM_ERROR,ACE_TEXT("accept client fail\n")));
        delete stream;    // do not leak the unused channel
        return -1;        // note: -1 makes the reactor remove this acceptor
    }
    m_streamPool.push_back(stream);
    ACE_Reactor::instance()->register_handler(stream,ACE_Event_Handler::READ_MASK);  // register the channel with ACE_Reactor
    ACE_DEBUG((LM_INFO,ACE_TEXT("User connect success!\n")));
    return 0;
}

void ServerAcceptor::close()
{
    ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::ACCEPT_MASK);
    Svr_aceept.close();
}

int ACE_TMAIN(int, ACE_TCHAR *[])
{
    ServerAcceptor server(LISTEN_PORT,SERVER_IP);    // assumes a narrow-character ACE build
    ACE_Reactor::instance()->register_handler(&server,ACE_Event_Handler::ACCEPT_MASK);    // register the listen port with ACE_Reactor
    ACE_Reactor::instance()->run_reactor_event_loop();  // enter the event loop; I/O events call back into handle_input
    return 0;
}

Test results:

[Screenshots: output of terminals 1, 2 and 3]

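ACE_Reactor already implements I/O multiplexing for us internally.

With the Reactor demo in place, the following steps look at how ACE_Reactor works internally.

ACE_Reactor registers an event through register_handler; one of its overloads is: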

 1 int
 2 ACE_Reactor::register_handler (ACE_Event_Handler *event_handler,
 3                                ACE_Reactor_Mask mask)
 4 {
 5   // Remember the old reactor.
 6   ACE_Reactor *old_reactor = event_handler->reactor ();
 7 
 8   // Assign *this* <Reactor> to the <Event_Handler>.
 9   event_handler->reactor (this);
10 
11   int result = this->implementation ()->register_handler (event_handler, mask);
12   if (result == -1)
13     // Reset the old reactor in case of failures.
14     event_handler->reactor (old_reactor);
15 
16   return result;
17 }

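Line 11 shows that the real work is done by the object returned from ACE_Reactor_Impl *implementation (void) const;. The next step is to see how that implementation registers the handler.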

1 template <class ACE_SELECT_REACTOR_TOKEN> int
2 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler
3   (ACE_Event_Handler *handler,
4    ACE_Reactor_Mask mask)
5 {
6   ACE_TRACE ("ACE_Select_Reactor_T::register_handler");
7   ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1));
8   return this->register_handler_i (handler->get_handle (), handler, mask);
9 }

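Templates are used heavily from here on. There are two overloads, and both end up calling register_handler_i. Line 8 calls get_handle, the function we overrode, which is why we did not need to pass a handle to ACE_Reactor: at this step it calls our overridden virtual function to obtain the handle. Alternatively, skip the override and pass the handle in explicitly. The handle is the I/O object to monitor, and the handler is our class derived from ACE_Event_Handler.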

 1 template <class ACE_SELECT_REACTOR_TOKEN> int
 2 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler_i
 3   (ACE_HANDLE handle,
 4    ACE_Event_Handler *event_handler,
 5    ACE_Reactor_Mask mask)
 6 {
 7   ACE_TRACE ("ACE_Select_Reactor_T::register_handler_i");
 8 
 9   // Insert the <handle, event_handle> tuple into the Handler
10   // Repository.
11   return this->handler_rep_.bind (handle, event_handler, mask);
12 }
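The two registration routes (ask the handler for its handle, or have the caller pass it) can be sketched without ACE as follows. This is an illustration only: Handle, EventHandler, Registry and SocketHandler are hypothetical names, and a std::map stands in for the reactor's handler repository.

```cpp
#include <cassert>
#include <map>

using Handle = int;
constexpr Handle kInvalidHandle = -1;

// Hypothetical stand-in for an event handler base class.
class EventHandler {
public:
    virtual ~EventHandler() = default;
    // Default: the handler does not know its handle.
    virtual Handle get_handle() const { return kInvalidHandle; }
};

class Registry {
public:
    // Overload 1: obtain the handle through the virtual get_handle() call.
    int register_handler(EventHandler* handler) {
        if (handler == nullptr) return -1;
        return register_handler(handler->get_handle(), handler);
    }

    // Overload 2: the caller supplies the handle explicitly.
    int register_handler(Handle handle, EventHandler* handler) {
        if (handler == nullptr || handle == kInvalidHandle) return -1;
        table_[handle] = handler;
        return 0;
    }

    EventHandler* find(Handle handle) const {
        auto it = table_.find(handle);
        return it == table_.end() ? nullptr : it->second;
    }

private:
    std::map<Handle, EventHandler*> table_;
};

// A handler that overrides get_handle(), like the two server classes above.
class SocketHandler : public EventHandler {
public:
    explicit SocketHandler(Handle handle) : handle_(handle) {}
    Handle get_handle() const override { return handle_; }

private:
    Handle handle_;
};

// Demo of both routes.
void registry_demo() {
    Registry registry;
    SocketHandler with_override(7);
    EventHandler without_override;

    assert(registry.register_handler(&with_override) == 0);      // handle found via get_handle()
    assert(registry.register_handler(&without_override) == -1);  // no handle available
    assert(registry.register_handler(9, &without_override) == 0);
    assert(registry.find(7) == &with_override);
}
```

A handler that does not override get_handle() can still be registered, but only through the overload that takes the handle as an argument.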

In register_handler_i, handler_rep_ binds the handle to the event_handler. handler_rep_ is a member variable of ACE_Select_Reactor_Impl, declared in Select_Reactor_Base.h. The next step is to see what bind actually does.

 1 // Bind the <ACE_Event_Handler *> to the <ACE_HANDLE>.
 2 int
 3 ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle,
 4                                              ACE_Event_Handler *event_handler,
 5                                              ACE_Reactor_Mask mask)
 6 {
 7   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::bind");
 8 
 9   if (event_handler == 0)
10     return -1;
11 
12   if (handle == ACE_INVALID_HANDLE)
13     handle = event_handler->get_handle ();
14 
15   if (this->invalid_handle (handle))
16     return -1;
17 
18   // Is this handle already in the Reactor?
19   bool existing_handle = false;
20 
21 #if defined (ACE_WIN32)
22 
23   map_type::ENTRY * entry = 0;
24 
25   int const result =
26     this->event_handlers_.bind (handle, event_handler, entry);
27 
28   if (result == -1)
29     {
30       return -1;
31     }
32   else if (result == 1)  // Entry already exists.
33     {
34       // Cannot use a different handler for an existing handle.
35       if (event_handler != entry->item ())
36         {
37           return -1;
38         }
39       else
40         {
41           // Remember that this handle is already registered in the
42           // Reactor.
43           existing_handle = true;
44         }
45     }
46 
47 #else
48 
49   // Check if this handle is already registered.
50   ACE_Event_Handler * const current_handler =
51     this->event_handlers_[handle];
52 
53   if (current_handler)
54     {
55       // Cannot use a different handler for an existing handle.
56       if (current_handler != event_handler)
57         return -1;
58 
59       // Remember that this handle is already registered in the
60       // Reactor.
61       existing_handle = true;
62     }
63 
64   this->event_handlers_[handle] = event_handler;
65 
66   if (this->max_handlep1_ < handle + 1)
67     this->max_handlep1_ = handle + 1;
68 
69 #endif /* ACE_WIN32 */
70 
71   if (this->select_reactor_.is_suspended_i (handle))
72     {
73       this->select_reactor_.bit_ops (handle,
74                                      mask,
75                                      this->select_reactor_.suspend_set_,
76                                      ACE_Reactor::ADD_MASK);
77     }
78   else
79     {
80       this->select_reactor_.bit_ops (handle,
81                                      mask,
82                                      this->select_reactor_.wait_set_,
83                                      ACE_Reactor::ADD_MASK);
84 
85       // Note the fact that we've changed the state of the <wait_set_>,
86       // which is used by the dispatching loop to determine whether it can
87       // keep going or if it needs to reconsult select().
88       // this->select_reactor_.state_changed_ = 1;
89     }
90 
91   // If new entry, call add_reference() if needed.
92   if (!existing_handle)
93     event_handler->add_reference ();
94 
95   return 0;
96 }

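This part is crucial. At line 50, event_handlers_ is a container: the handle is used as the index under which the event_handler is stored (the store itself happens at line 64). Line 80 is also key: the handle is added to the wait_set_ member with an ADD_MASK operation.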
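The non-Windows branch of bind boils down to three steps: index a table by descriptor, track the highest descriptor plus one, and set a bit in the wait set. A minimal sketch of those steps, assuming POSIX fd_set and using hypothetical names (Handler, HandlerRepository) with only a read set:

```cpp
#include <sys/select.h>
#include <cassert>
#include <cstddef>
#include <vector>

struct Handler { int id; };  // hypothetical stand-in for an event handler

class HandlerRepository {
public:
    HandlerRepository() : handlers_(FD_SETSIZE, nullptr) { FD_ZERO(&wait_read_set_); }

    // Binds fd to handler. Returns -1 if the arguments are invalid or the
    // descriptor already belongs to a different handler.
    int bind(int fd, Handler* handler) {
        if (handler == nullptr || fd < 0 || fd >= FD_SETSIZE) return -1;
        std::size_t index = static_cast<std::size_t>(fd);
        Handler* current = handlers_[index];
        if (current != nullptr && current != handler) return -1;

        handlers_[index] = handler;          // the descriptor is the index
        if (max_handle_plus_1_ < fd + 1)     // later passed to select() as its width
            max_handle_plus_1_ = fd + 1;
        FD_SET(fd, &wait_read_set_);         // the descriptor is now monitored
        return 0;
    }

    Handler* find(int fd) const {
        if (fd < 0 || fd >= FD_SETSIZE) return nullptr;
        return handlers_[static_cast<std::size_t>(fd)];
    }

    int max_handle_plus_1() const { return max_handle_plus_1_; }

    bool is_waiting(int fd) {
        return fd >= 0 && fd < FD_SETSIZE && FD_ISSET(fd, &wait_read_set_);
    }

private:
    std::vector<Handler*> handlers_;
    fd_set wait_read_set_;
    int max_handle_plus_1_ = 0;
};

// Demo: rebinding the same pair is allowed, a second owner is refused.
void repository_demo() {
    HandlerRepository repo;
    Handler a{1};
    Handler b{2};
    assert(repo.bind(3, &a) == 0);
    assert(repo.bind(3, &a) == 0);
    assert(repo.bind(3, &b) == -1);
    assert(repo.find(3) == &a);
    assert(repo.max_handle_plus_1() == 4);
    assert(repo.is_waiting(3));
}
```

Indexing by descriptor makes lookup at dispatch time a single array access, at the cost of a table sized for the largest descriptor the reactor may see.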

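That completes registration. ACE_Reactor goes through several layers of calls across a number of files, but all it really does is bind the handle (the I/O) and the handler (the class derived from ACE_Event_Handler) together in a container. Next, see how ACE_Reactor runs its event loop.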

 1 int
 2 ACE_Reactor::run_reactor_event_loop (REACTOR_EVENT_HOOK eh)
 3 {
 4   ACE_TRACE ("ACE_Reactor::run_reactor_event_loop");
 5 
 6   if (this->reactor_event_loop_done ())
 7     return 0;
 8 
 9   while (1)
10     {
11       int const result = this->implementation_->handle_events ();
12 
13       if (eh != 0 && (*eh)(this))
14         continue;
15       else if (result == -1 && this->implementation_->deactivated ())
16         return 0;
17       else if (result == -1)
18         return -1;
19     }
20 
21   ACE_NOTREACHED (return 0;)
22 }

Again, the loop is handed over to the ACE_Reactor_Impl implementation, here through the implementation_ member.

Tracing further:

1 template <class ACE_SELECT_REACTOR_TOKEN> int
2 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handle_events
3   (ACE_Time_Value &max_wait_time)
4 {
5   ACE_TRACE ("ACE_Select_Reactor_T::handle_events");
6 
7   return this->handle_events (&max_wait_time);
8 }

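Templates again: this overload forwards to the pointer version of handle_events, which leads to handle_events_i, the key code below.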

 1 template <class ACE_SELECT_REACTOR_TOKEN> int
 2 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handle_events_i
 3   (ACE_Time_Value *max_wait_time)
 4 {
 5   int result = -1;
 6 
 7   ACE_SEH_TRY
 8     {
 9       // We use the data member dispatch_set_ as the current dispatch
10       // set.
11 
12       // We need to start from a clean dispatch_set
13       this->dispatch_set_.rd_mask_.reset ();
14       this->dispatch_set_.wr_mask_.reset ();
15       this->dispatch_set_.ex_mask_.reset ();
16 
17       int number_of_active_handles =
18         this->wait_for_multiple_events (this->dispatch_set_,
19                                         max_wait_time);
20 
21       result =
22         this->dispatch (number_of_active_handles,
23                         this->dispatch_set_);
24     }
25   ACE_SEH_EXCEPT (this->release_token ())
26     {
27       // As it stands now, we catch and then rethrow all Win32
28       // structured exceptions so that we can make sure to release the
29       // <token_> lock correctly.
30     }
31 
32   return result;
33 }

Line 18, wait_for_multiple_events, and line 22, dispatch, do the two most important jobs.

 1 // Must be called with lock held.
 2 
 3 template <class ACE_SELECT_REACTOR_TOKEN> int
 4 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::wait_for_multiple_events
 5   (ACE_Select_Reactor_Handle_Set &dispatch_set,
 6    ACE_Time_Value *max_wait_time)
 7 {
 8   ACE_TRACE ("ACE_Select_Reactor_T::wait_for_multiple_events");
 9   ACE_Time_Value timer_buf (0);
10   ACE_Time_Value *this_timeout = 0;
11 
12   int number_of_active_handles = this->any_ready (dispatch_set);
13 
14   // If there are any bits enabled in the <ready_set_> then we'll
15   // handle those first, otherwise we'll block in <select>.
16 
17   if (number_of_active_handles == 0)
18     {
19       do
20         {
21           if (this->timer_queue_ == 0)
22             return 0;
23 
24           this_timeout =
25             this->timer_queue_->calculate_timeout (max_wait_time,
26                                                    &timer_buf);
27 #ifdef ACE_WIN32
28           // This arg is ignored on Windows and causes pointer
29           // truncation warnings on 64-bit compiles.
30           int const width = 0;
31 #else
32           int const width = this->handler_rep_.max_handlep1 ();
33 #endif  /* ACE_WIN32 */
34 
35           dispatch_set.rd_mask_ = this->wait_set_.rd_mask_;
36           dispatch_set.wr_mask_ = this->wait_set_.wr_mask_;
37           dispatch_set.ex_mask_ = this->wait_set_.ex_mask_;
38           number_of_active_handles = ACE_OS::select (width,
39                                                      dispatch_set.rd_mask_,
40                                                      dispatch_set.wr_mask_,
41                                                      dispatch_set.ex_mask_,
42                                                      this_timeout);
43         }
44       while (number_of_active_handles == -1 && this->handle_error () > 0);
45 
46       if (number_of_active_handles > 0)
47         {
48 #if !defined (ACE_WIN32)
49           // Resynchronize the fd_sets so their "max" is set properly.
50           dispatch_set.rd_mask_.sync (this->handler_rep_.max_handlep1 ());
51           dispatch_set.wr_mask_.sync (this->handler_rep_.max_handlep1 ());
52           dispatch_set.ex_mask_.sync (this->handler_rep_.max_handlep1 ());
53 #endif /* ACE_WIN32 */
54         }
55       else if (number_of_active_handles == -1)
56         {
57           // Normally, select() will reset the bits in dispatch_set
58           // so that only those filed descriptors that are ready will
59           // have bits set.  However, when an error occurs, the bit
60           // set remains as it was when the select call was first made.
61           // Thus, we now have a dispatch_set that has every file
62           // descriptor that was originally waited for, which is not
63           // correct.  We must clear all the bit sets because we
64           // have no idea if any of the file descriptors is ready.
65           //
66           // NOTE: We dont have a test case to reproduce this
67           // problem. But pleae dont ignore this and remove it off.
68           dispatch_set.rd_mask_.reset ();
69           dispatch_set.wr_mask_.reset ();
70           dispatch_set.ex_mask_.reset ();
71         }
72     }
73 
74   // Return the number of events to dispatch.
75   return number_of_active_handles;
76 }

Line 35 uses the familiar wait_set_, and line 38 calls select. So the Reactor's I/O monitoring is simply select watching the handles (the I/O) that were registered earlier and added to wait_set_.
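The copy from wait_set_ into dispatch_set before the call matters because select() overwrites the sets it is given, leaving only the ready descriptors. A small sketch of that step, assuming POSIX and only a read set (wait_for_events and select_copy_demo are hypothetical names):

```cpp
#include <sys/select.h>
#include <unistd.h>
#include <cassert>

// Copies the long-lived wait set into a scratch dispatch set and blocks in
// select(). On return dispatch_set holds only the ready descriptors, while
// wait_set is untouched and can be reused for the next iteration.
int wait_for_events(const fd_set& wait_set, int width,
                    fd_set& dispatch_set, timeval* timeout) {
    assert(width >= 0);
    dispatch_set = wait_set;  // select() modifies its arguments in place
    int ready = ::select(width, &dispatch_set, nullptr, nullptr, timeout);
    if (ready == -1)
        FD_ZERO(&dispatch_set);  // on error the contents are not meaningful
    return ready;
}

// Demo: two pipes are monitored, only the first one has data.
bool select_copy_demo() {
    int a[2];
    int b[2];
    if (::pipe(a) != 0) return false;
    if (::pipe(b) != 0) {
        ::close(a[0]);
        ::close(a[1]);
        return false;
    }

    fd_set wait_set;
    FD_ZERO(&wait_set);
    FD_SET(a[0], &wait_set);
    FD_SET(b[0], &wait_set);
    int width = (a[0] > b[0] ? a[0] : b[0]) + 1;

    bool ok = (::write(a[1], "x", 1) == 1);

    fd_set dispatch_set;
    timeval tv;
    tv.tv_sec = 1;
    tv.tv_usec = 0;
    int ready = wait_for_events(wait_set, width, dispatch_set, &tv);

    ok = ok && ready == 1
            && FD_ISSET(a[0], &dispatch_set)    // ready
            && !FD_ISSET(b[0], &dispatch_set)   // cleared by select()
            && FD_ISSET(a[0], &wait_set)        // registration is intact
            && FD_ISSET(b[0], &wait_set);

    ::close(a[0]);
    ::close(a[1]);
    ::close(b[0]);
    ::close(b[1]);
    return ok;
}
```

Without the copy, a descriptor that was idle during one call would silently drop out of monitoring on the next.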

When there is an I/O event, i.e. the returned number_of_active_handles is nonzero, dispatch delivers it.

  1 template <class ACE_SELECT_REACTOR_TOKEN> int
  2 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch
  3   (int active_handle_count,
  4    ACE_Select_Reactor_Handle_Set &dispatch_set)
  5 {
  6   ACE_TRACE ("ACE_Select_Reactor_T::dispatch");
  7 
  8   int io_handlers_dispatched = 0;
  9   int other_handlers_dispatched = 0;
 10   int signal_occurred = 0;
 11   // The following do/while loop keeps dispatching as long as there
 12   // are still active handles.  Note that the only way we should ever
 13   // iterate more than once through this loop is if signals occur
 14   // while we're dispatching other handlers.
 15 
 16   do
 17     {
 18       // We expect that the loop will decrease the number of active
 19       // handles in each iteration.  If it does not, then something is
 20       // inconsistent in the state of the Reactor and we should avoid
 21       // the loop.  Please read the comments on bug 2540 for more
 22       // details.
 23       int initial_handle_count = active_handle_count;
 24 
 25       // Note that we keep track of changes to our state.  If any of
 26       // the dispatch_*() methods below return -1 it means that the
 27       // <wait_set_> state has changed as the result of an
 28       // <ACE_Event_Handler> being dispatched.  This means that we
 29       // need to bail out and rerun the select() loop since our
 30       // existing notion of handles in <dispatch_set> may no longer be
 31       // correct.
 32       //
 33       // In the beginning, our state starts out unchanged.  After
 34       // every iteration (i.e., due to signals), our state starts out
 35       // unchanged again.
 36 
 37       this->state_changed_ = false;
 38 
 39       // Perform the Template Method for dispatching all the handlers.
 40 
 41       // First check for interrupts.
 42       if (active_handle_count == -1)
 43         {
 44           // Bail out -- we got here since <select> was interrupted.
 45           if (ACE_Sig_Handler::sig_pending () != 0)
 46             {
 47               ACE_Sig_Handler::sig_pending (0);
 48 
 49               // If any HANDLES in the <ready_set_> are activated as a
 50               // result of signals they should be dispatched since
 51               // they may be time critical...
 52               active_handle_count = this->any_ready (dispatch_set);
 53 
 54               // Record the fact that the Reactor has dispatched a
 55               // handle_signal() method.  We need this to return the
 56               // appropriate count below.
 57               signal_occurred = 1;
 58             }
 59           else
 60             return -1;
 61         }
 62 
 63       // Handle timers early since they may have higher latency
 64       // constraints than I/O handlers.  Ideally, the order of
 65       // dispatching should be a strategy...
 66       else if (this->dispatch_timer_handlers (other_handlers_dispatched) == -1)
 67         // State has changed or timer queue has failed, exit loop.
 68         break;
 69 
 70       // Check to see if there are no more I/O handles left to
 71       // dispatch AFTER we've handled the timers...
 72       else if (active_handle_count == 0)
 73         return io_handlers_dispatched
 74           + other_handlers_dispatched
 75           + signal_occurred;
 76 
 77       // Next dispatch the notification handlers (if there are any to
 78       // dispatch).  These are required to handle multi-threads that
 79       // are trying to update the <Reactor>.
 80 
 81       else if (this->dispatch_notification_handlers
 82                (dispatch_set,
 83                 active_handle_count,
 84                 other_handlers_dispatched) == -1)
 85         // State has changed or a serious failure has occured, so exit
 86         // loop.
 87         break;
 88 
 89       // Finally, dispatch the I/O handlers.
 90       else if (this->dispatch_io_handlers
 91                (dispatch_set,
 92                 active_handle_count,
 93                 io_handlers_dispatched) == -1)
 94         // State has changed, so exit loop.
 95         break;
 96 
 97       // if state changed, we need to re-eval active_handle_count,
 98       // so we will not end with an endless loop
 99       if (initial_handle_count == active_handle_count
100           || this->state_changed_)
101       {
102         active_handle_count = this->any_ready (dispatch_set);
103       }
104     }
105   while (active_handle_count > 0);
106 
107   return io_handlers_dispatched + other_handlers_dispatched + signal_occurred;
108 }

dispatch checks and dispatches step by step, in order. Entering dispatch_notification_handlers leads to dispatch_notifications in Select_Reactor_Base.cpp, where a familiar function finally appears.

 1 // Handles pending threads (if any) that are waiting to unblock the
 2 // Select_Reactor.
 3 
 4 int
 5 ACE_Select_Reactor_Notify::dispatch_notifications (int &number_of_active_handles,
 6                                                    ACE_Handle_Set &rd_mask)
 7 {
 8   ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications");
 9 
10   ACE_HANDLE const read_handle =
11     this->notification_pipe_.read_handle ();
12 
13   if (read_handle != ACE_INVALID_HANDLE
14       && rd_mask.is_set (read_handle))
15     {
16       --number_of_active_handles;
17       rd_mask.clr_bit (read_handle);
18       return this->handle_input (read_handle);
19     }
20   else
21     return 0;
22 }

Line 18 calls the notifier's own handle_input, which is still not the handle_input we overrode at the start. Here is that function.

 1 int
 2 ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle)
 3 {
 4   ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input");
 5   // Precondition: this->select_reactor_.token_.current_owner () ==
 6   // ACE_Thread::self ();
 7 
 8   int number_dispatched = 0;
 9   int result = 0;
10   ACE_Notification_Buffer buffer;
11 
12   // If there is only one buffer in the pipe, this will loop and call
13   // read_notify_pipe() twice.  The first time will read the buffer, and
14   // the second will read the fact that the pipe is empty.
15   while ((result = this->read_notify_pipe (handle, buffer)) > 0)
16     {
17       // Dispatch the buffer
18       // NOTE: We count only if we made any dispatches ie. upcalls.
19       if (this->dispatch_notify (buffer) > 0)
20         ++number_dispatched;
21 
22       // Bail out if we've reached the <notify_threshold_>.  Note that
23       // by default <notify_threshold_> is -1, so we'll loop until all
24       // the notifications in the pipe have been dispatched.
25       if (number_dispatched == this->max_notify_iterations_)
26         break;
27     }
28 
29   // Reassign number_dispatched to -1 if things have gone seriously
30   // wrong.
31   if (result < 0)
32     number_dispatched = -1;
33 
34   // Enqueue ourselves into the list of waiting threads.  When we
35   // reacquire the token we'll be off and running again with ownership
36   // of the token.  The postcondition of this call is that
37   // <select_reactor_.token_.current_owner> == <ACE_Thread::self>.
38   this->select_reactor_->renew ();
39   return number_dispatched;
40 }
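The mechanism behind this function is the classic "self-pipe" idea: a notification is written into a pipe as a small record, the read end of the pipe is monitored like any other handle, and the reactor thread drains the records and dispatches them. A minimal sketch, assuming POSIX pipes and a non-blocking read end; Handler, NotificationBuffer and Notifier are hypothetical names, not ACE's types:

```cpp
#include <fcntl.h>
#include <unistd.h>
#include <cassert>

struct Handler {
    int calls = 0;
    int handle_input() { ++calls; return 0; }
};

// Hypothetical record written into the pipe: which handler, which event.
struct NotificationBuffer {
    Handler* handler;
    int mask;
};

class Notifier {
public:
    Notifier() = default;
    Notifier(const Notifier&) = delete;
    Notifier& operator=(const Notifier&) = delete;
    ~Notifier() {
        if (fds_[0] != -1) {
            ::close(fds_[0]);
            ::close(fds_[1]);
        }
    }

    // Creates the pipe and makes the read end non-blocking so that
    // handle_input() can drain it without hanging.
    bool open() {
        if (::pipe(fds_) != 0) return false;
        int flags = ::fcntl(fds_[0], F_GETFL, 0);
        return flags != -1 && ::fcntl(fds_[0], F_SETFL, flags | O_NONBLOCK) != -1;
    }

    // Descriptor to add to the reactor's wait set.
    int read_handle() const { return fds_[0]; }

    // Queues a notification; a null handler only wakes the loop up.
    int notify(Handler* handler, int mask) {
        assert(fds_[1] != -1);
        NotificationBuffer buffer{handler, mask};
        ssize_t n = ::write(fds_[1], &buffer, sizeof(buffer));
        return n == static_cast<ssize_t>(sizeof(buffer)) ? 0 : -1;
    }

    // Reads every queued record and dispatches it. Returns the number of
    // handlers that were actually called.
    int handle_input() {
        int dispatched = 0;
        NotificationBuffer buffer;
        while (::read(fds_[0], &buffer, sizeof(buffer)) ==
               static_cast<ssize_t>(sizeof(buffer))) {
            if (buffer.handler != nullptr) {
                buffer.handler->handle_input();
                ++dispatched;
            }
        }
        return dispatched;
    }

private:
    int fds_[2] = {-1, -1};
};
```

In a real reactor notify() would typically be called from another thread while the reactor thread is blocked in select(); the write makes the pipe readable, which wakes the loop.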

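In ACE_Select_Reactor_Notify::handle_input, line 15 reads the data from the notification pipe and line 19 calls dispatch_notify: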

 1 int
 2 ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
 3 {
 4   int result = 0;
 5 
 6 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
 7   // Dispatch one message from the notify queue, and put another in
 8   // the pipe if one is available.  Remember, the idea is to keep
 9   // exactly one message in the pipe at a time.
10 
11   bool more_messages_queued = false;
12   ACE_Notification_Buffer next;
13 
14   result = notification_queue_.pop_next_notification(buffer,
15                                                      more_messages_queued,
16                                                      next);
17 
18   if (result == 0 || result == -1)
19     {
20       return result;
21     }
22 
23   if(more_messages_queued)
24     {
25       (void) ACE::send(this->notification_pipe_.write_handle(),
26             (char *)&next, sizeof(ACE_Notification_Buffer));
27     }
28 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
29 
30   // If eh == 0 then another thread is unblocking the
31   // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
32   // internal structures.  Otherwise, we need to dispatch the
33   // appropriate handle_* method on the <ACE_Event_Handler> pointer
34   // we've been passed.
35   if (buffer.eh_ != 0)
36     {
37       ACE_Event_Handler *event_handler = buffer.eh_;
38 
39       bool const requires_reference_counting =
40         event_handler->reference_counting_policy ().value () ==
41         ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
42 
43       switch (buffer.mask_)
44         {
45         case ACE_Event_Handler::READ_MASK:
46         case ACE_Event_Handler::ACCEPT_MASK:
47           result = event_handler->handle_input (ACE_INVALID_HANDLE);
48           break;
49         case ACE_Event_Handler::WRITE_MASK:
50           result = event_handler->handle_output (ACE_INVALID_HANDLE);
51           break;
52         case ACE_Event_Handler::EXCEPT_MASK:
53           result = event_handler->handle_exception (ACE_INVALID_HANDLE);
54           break;
55         case ACE_Event_Handler::QOS_MASK:
56           result = event_handler->handle_qos (ACE_INVALID_HANDLE);
57           break;
58         case ACE_Event_Handler::GROUP_QOS_MASK:
59           result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
60           break;
61         default:
62           // Should we bail out if we get an invalid mask?
63           ACE_ERROR ((LM_ERROR,
64                       ACE_TEXT ("invalid mask = %d\n"),
65                       buffer.mask_));
66         }
67 
68       if (result == -1)
69         event_handler->handle_close (ACE_INVALID_HANDLE,
70                                      ACE_Event_Handler::EXCEPT_MASK);
71 
72       if (requires_reference_counting)
73         {
74           event_handler->remove_reference ();
75         }
76     }
77 
78   return 1;
79 }

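Here, at last, handle_input() is called on an ACE_Event_Handler, the callback we overrode at the start. Note that this particular path serves notifications queued through the reactor's notify mechanism, which is why it passes ACE_INVALID_HANDLE. For socket readiness the same callback is reached through the dispatch_io_handlers branch of dispatch shown earlier, which looks up the handler bound to the ready handle and calls handle_input with that handle.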
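For ready sockets, the select reactor's dispatch_io_handlers step ends in the same kind of upcall, made through a pointer to a member function of the handler, with a negative return value causing the handler to be closed. The sketch below shows that general technique only; it is not ACE's code, and EventHandler, Callback, notify_handle and CountingHandler are hypothetical names.

```cpp
#include <cassert>

class EventHandler {
public:
    virtual ~EventHandler() = default;
    virtual int handle_input(int) { return 0; }
    virtual int handle_output(int) { return 0; }
    virtual int handle_close(int) { return 0; }
};

// One dispatcher serves every event type: the event is selected by passing
// a pointer to the member function that should be called.
using Callback = int (EventHandler::*)(int);

// Calls the chosen callback with the ready descriptor. A negative result
// means "stop monitoring me", so handle_close() runs and false is returned
// to tell the caller to drop the registration.
bool notify_handle(int fd, EventHandler* handler, Callback callback) {
    assert(callback != nullptr);
    if (handler == nullptr) return false;
    int status = (handler->*callback)(fd);
    if (status < 0) {
        handler->handle_close(fd);
        return false;
    }
    return true;
}

// Test handler: accepts one input, then asks to be closed.
class CountingHandler : public EventHandler {
public:
    int inputs = 0;
    int last_fd = -1;
    bool closed = false;

    int handle_input(int fd) override {
        last_fd = fd;
        ++inputs;
        return inputs < 2 ? 0 : -1;
    }
    int handle_close(int) override {
        closed = true;
        return 0;
    }
};
```

Under this convention a stream handler can signal end of connection simply by returning -1 from handle_input and doing its cleanup in handle_close, instead of deregistering itself by hand.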

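That concludes the walk through ACE_Reactor's internals. ACE does nothing very special: registration binds the I/O handle to its callback object in a container, I/O multiplexing uses select, and when an I/O event occurs the matching event function, handle_input, is looked up and executed.

No wonder people online complain that ACE's code is bloated: even the "simple" process we care about involves this much intricate usage and this many calls, not to mention the parts we have not used yet. Still, the Reactor framework ACE provides really is convenient to use, and it offers reliable portability and performance.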
