前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ACE - ACE_Task源码剖析及线程池实现

ACE - ACE_Task源码剖析及线程池实现

作者头像
Aichen
发布2018-05-18 10:47:04
1.6K0
发布2018-05-18 10:47:04
举报
文章被收录于专栏:白驹过隙

上篇提到用Reactor模式,利用I/O复用,获得Socket数据并且实现I/O层单线程并发,和dispatch层把不同的I/O绑定到了不同的Event中去处理。也就是已经实现了多个client连接和通信,且可以把不同的I/O与Event句柄绑定,指定处理函数。

但是问题来了,多个用户连接时,I/O层可以通过复用以较快的速度处理连接和把过来的数据关联到绑定的Event函数执行。但是绑定Event函数获得数据后,需要逐个处理,当大量数据从I/O层过来,所有数据共享线程,而且业务代码又是非常耗时的。每个socket通道过来的数据都要以单线程的方式执行业务,效率就非常低了。必须当线程空闲时才能占有,所以这里有必要引入线程池去处理业务。

ACE_Task类封装了线程和消息队列,使这些功能以面向对象的方式提供给用户,其中消息队列并非IPC中的消息队列,在ACE实现实际为普通队列。利用ACE_Task可以生产一个或一组线程,并且为线程池之间的消息交互提供了接口和队列。在Event部分,我们可以利用线程池来解决效率问题。

下面提供线程池实现代码:

代码语言:javascript
复制
 1 #include "ace/Task.h"
 2 #include "ace/OS.h" 
 3 #include <string>
 4 
 5 using namespace std;
 6 
 7 string g_strMsg[] = {
 8 "MESSAGE 1",
 9 "MESSAGE 2",
10 "MESSAGE 3",
11 "MESSAGE 4",
12 "MESSAGE 5",
13 "MESSAGE 6",
14 };
15 
16 class TaskThread: public ACE_Task<ACE_MT_SYNCH>
17 {
18 public:
19     virtual int svc(void)
20     {
21         ACE_Message_Block *Msg;// = new ACE_Message_Block();
22         while(1)
23         {
24             getq(Msg);            //空闲线程阻塞
25             ACE_DEBUG((LM_INFO,"recevie msg:%s,time:%d\n",Msg->rd_ptr(),(int)ACE_OS::time()));
26             Msg->release();        //release
27             ACE_OS::sleep(1);    //延时1s模拟业务处理耗时
28         }
29     }
30 };
31 
32 class Message
33 {
34 public:
35     Message(TaskThread* mb);
36 };
37 
38 Message::Message(TaskThread* mb)
39 {
40     for (int i = 0;i < 6;++i)
41     {
42         string m_data = g_strMsg[i];
43         ACE_Message_Block* msg = new ACE_Message_Block(sizeof(m_data));
44         msg->copy(m_data.c_str());
45         mb->putq(msg);    //put
46     }
47 }
48 
49 int main(int argc, char *argv[]) 
50 { 
51     TaskThread task;
52     Message initMsg(&task);
53     task.activate(THR_NEW_LWP | THR_JOINABLE |THR_INHERIT_SCHED , 10);//创建10个线程
54     while(1);
55     return 0; 
56 }

修改线程数量进行测试:

10个线程效果:

3个线程效果:

1个线程效果:

有了这个ACE_TASK的demo,我们跟踪查看ACE源码。

在Task.cpp的ACE_Task_Base内的activate函数,可以看到线程是怎么创建出来的。

代码语言:javascript
复制
  1 int
  2 ACE_Task_Base::activate (long flags,
  3                          int n_threads,
  4                          int force_active,
  5                          long priority,
  6                          int grp_id,
  7                          ACE_Task_Base *task,
  8                          ACE_hthread_t thread_handles[],
  9                          void *stack[],
 10                          size_t stack_size[],
 11                          ACE_thread_t thread_ids[],
 12                          const char* thr_name[])
 13 {
 14   ACE_TRACE ("ACE_Task_Base::activate");
 15 
 16 #if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
 17   ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
 18 
 19   // If the task passed in is zero, we will use <this>
 20   if (task == 0)
 21     task = this;
 22 
 23   if (this->thr_count_ > 0 && force_active == 0)
 24     return 1; // Already active.
 25   else
 26     {
 27       if (this->thr_count_ > 0 && this->grp_id_ != -1)
 28         // If we're joining an existing group of threads then make
 29         // sure to use its group id.
 30         grp_id = this->grp_id_;
 31       this->thr_count_ += n_threads;
 32     }
 33 
 34   // Use the ACE_Thread_Manager singleton if we're running as an
 35   // active object and the caller didn't supply us with a
 36   // Thread_Manager.
 37   if (this->thr_mgr_ == 0)
 38 # if defined (ACE_THREAD_MANAGER_LACKS_STATICS)
 39     this->thr_mgr_ = ACE_THREAD_MANAGER_SINGLETON::instance ();
 40 # else /* ! ACE_THREAD_MANAGER_LACKS_STATICS */
 41     this->thr_mgr_ = ACE_Thread_Manager::instance ();
 42 # endif /* ACE_THREAD_MANAGER_LACKS_STATICS */
 43 
 44   int grp_spawned = -1;
 45   if (thread_ids == 0)
 46     // Thread Ids were not specified
 47     grp_spawned =
 48       this->thr_mgr_->spawn_n (n_threads,
 49                                &ACE_Task_Base::svc_run,
 50                                (void *) this,
 51                                flags,
 52                                priority,
 53                                grp_id,
 54                                task,
 55                                thread_handles,
 56                                stack,
 57                                stack_size,
 58                                thr_name);
 59   else
 60     // thread names were specified
 61     grp_spawned =
 62       this->thr_mgr_->spawn_n (thread_ids,
 63                                n_threads,
 64                                &ACE_Task_Base::svc_run,
 65                                (void *) this,
 66                                flags,
 67                                priority,
 68                                grp_id,
 69                                stack,
 70                                stack_size,
 71                                thread_handles,
 72                                task,
 73                                thr_name);
 74   if (grp_spawned == -1)
 75     {
 76       // If spawn_n fails, restore original thread count.
 77       this->thr_count_ -= n_threads;
 78       return -1;
 79     }
 80 
 81   if (this->grp_id_ == -1)
 82     this->grp_id_ = grp_spawned;
 83 
 84 #if defined (ACE_MVS) || defined(__TANDEM)
 85   ACE_OS::memcpy( &this->last_thread_id_, '\0', sizeof(this->last_thread_id_));
 86 #else
 87   this->last_thread_id_ = 0;    // Reset to prevent inadvertant match on ID
 88 #endif /* defined (ACE_MVS) */
 89 
 90   return 0;
 91 
 92 #else
 93   {
 94     // Keep the compiler from complaining.
 95     ACE_UNUSED_ARG (flags);
 96     ACE_UNUSED_ARG (n_threads);
 97     ACE_UNUSED_ARG (force_active);
 98     ACE_UNUSED_ARG (priority);
 99     ACE_UNUSED_ARG (grp_id);
100     ACE_UNUSED_ARG (task);
101     ACE_UNUSED_ARG (thread_handles);
102     ACE_UNUSED_ARG (stack);
103     ACE_UNUSED_ARG (stack_size);
104     ACE_UNUSED_ARG (thread_ids);
105     ACE_UNUSED_ARG (thr_name);
106     ACE_NOTSUP_RETURN (-1);
107   }
108 #endif /* ACE_MT_SAFE */
109 }

第49行,把线程执行函数指向了svc_run。

代码语言:javascript
复制
 1 ACE_THR_FUNC_RETURN
 2 ACE_Task_Base::svc_run (void *args)
 3 {
 4   ACE_TRACE ("ACE_Task_Base::svc_run");
 5 
 6   ACE_Task_Base *t = (ACE_Task_Base *) args;
 7 
 8   // Register ourself with our <Thread_Manager>'s thread exit hook
 9   // mechanism so that our close() hook will be sure to get invoked
10   // when this thread exits.
11 
12 #if defined ACE_HAS_SIG_C_FUNC
13   t->thr_mgr ()->at_exit (t, ACE_Task_Base_cleanup, 0);
14 #else
15   t->thr_mgr ()->at_exit (t, ACE_Task_Base::cleanup, 0);
16 #endif /* ACE_HAS_SIG_C_FUNC */
17 
18   // Call the Task's svc() hook method.
19   int const svc_status = t->svc ();
20   ACE_THR_FUNC_RETURN status;
21 #if defined (ACE_HAS_INTEGRAL_TYPE_THR_FUNC_RETURN)
22   // Reinterpret case between integral types is not mentioned in the C++ spec
23   status = static_cast<ACE_THR_FUNC_RETURN> (svc_status);
24 #else
25   status = reinterpret_cast<ACE_THR_FUNC_RETURN> (svc_status);
26 #endif /* ACE_HAS_INTEGRAL_TYPE_THR_FUNC_RETURN */
27 
28 // If we changed this zero change the other if in OS.cpp Thread_Adapter::invoke
29 #if 1
30   // Call the <Task->close> hook.
31   ACE_Thread_Manager *thr_mgr_ptr = t->thr_mgr ();
32 
33   // This calls the Task->close () hook.
34   t->cleanup (t, 0);
35 
36   // This prevents a second invocation of the cleanup code
37   // (called later by <ACE_Thread_Manager::exit>.
38   thr_mgr_ptr->at_exit (t, 0, 0);
39 #endif
40   return status;
41 }

svc_run则在第19行调用了具体的我们在外部重写的虚函数函数执行。

下面是消息队列的维护,putq和getq在内联函数文件Task_T.inl内。

代码语言:javascript
复制
 1 template <ACE_SYNCH_DECL> ACE_INLINE int
 2 ACE_Task<ACE_SYNCH_USE>::getq (ACE_Message_Block *&mb, ACE_Time_Value *tv)
 3 {
 4   ACE_TRACE ("ACE_Task<ACE_SYNCH_USE>::getq");
 5   return this->msg_queue_->dequeue_head (mb, tv);
 6 }
 7 
 8 template <ACE_SYNCH_DECL> ACE_INLINE int
 9 ACE_Task<ACE_SYNCH_USE>::putq (ACE_Message_Block *mb, ACE_Time_Value *tv)
10 {
11   ACE_TRACE ("ACE_Task<ACE_SYNCH_USE>::putq");
12   return this->msg_queue_->enqueue_tail (mb, tv);
13 }

实则就是在维护ACE_Task类里面的ACE_Message_Queue<ACE_SYNCH_USE> *msg_queue_;这个队列。

ACE_Task相对简单,里面关于线程池的创建和调度还是有很多值得学习的地方。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2016-06-24 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档