ACE - ACE_Task源码剖析及线程池实现

原文出自http://www.cnblogs.com/binchen-china,禁止转载。

上篇提到用Reactor模式,利用I/O复用,获得Socket数据并且实现I/O层单线程并发,和dispatch层把不同的I/O绑定到了不同的Event中去处理。也就是已经实现了多个client连接和通信,且可以把不同的I/O与Event句柄绑定,指定处理函数。

但是问题来了,多个用户连接时,I/O层可以通过复用以较快的速度处理连接和把过来的数据关联到绑定的Event函数执行。但是绑定Event函数获得数据后,需要逐个处理,当大量数据从I/O层过来,所有数据共享线程,而且业务代码又是非常耗时的。每个socket通道过来的数据都要以单线程的方式执行业务,效率就非常低了。必须当线程空闲时才能占有,所以这里有必要引入线程池去处理业务。

ACE_Task类封装了线程和消息队列,使这些功能以面向对象的方式提供给用户,其中消息队列并非IPC中的消息队列,在ACE实现实际为普通队列。利用ACE_Task可以生产一个或一组线程,并且为线程池之间的消息交互提供了接口和队列。在Event部分,我们可以利用线程池来解决效率问题。

下面提供线程池实现代码:

 1 #include "ace/Task.h"
 2 #include "ace/OS.h" 
 3 #include <string>
 4 
 5 using namespace std;
 6 
 7 string g_strMsg[] = {
 8 "MESSAGE 1",
 9 "MESSAGE 2",
10 "MESSAGE 3",
11 "MESSAGE 4",
12 "MESSAGE 5",
13 "MESSAGE 6",
14 };
15 
16 class TaskThread: public ACE_Task<ACE_MT_SYNCH>
17 {
18 public:
19     virtual int svc(void)
20     {
21         ACE_Message_Block *Msg;// = new ACE_Message_Block();
22         while(1)
23         {
24             getq(Msg);            //空闲线程阻塞
25             ACE_DEBUG((LM_INFO,"recevie msg:%s,time:%d\n",Msg->rd_ptr(),(int)ACE_OS::time()));
26             Msg->release();        //release
27             ACE_OS::sleep(1);    //延时1s模拟业务处理耗时
28         }
29     }
30 };
31 
32 class Message
33 {
34 public:
35     Message(TaskThread* mb);
36 };
37 
38 Message::Message(TaskThread* mb)
39 {
40     for (int i = 0;i < 6;++i)
41     {
42         string m_data = g_strMsg[i];
43         ACE_Message_Block* msg = new ACE_Message_Block(sizeof(m_data));
44         msg->copy(m_data.c_str());
45         mb->putq(msg);    //put
46     }
47 }
48 
49 int main(int argc, char *argv[]) 
50 { 
51     TaskThread task;
52     Message initMsg(&task);
53     task.activate(THR_NEW_LWP | THR_JOINABLE |THR_INHERIT_SCHED , 10);//创建10个线程
54     while(1);
55     return 0; 
56 }

修改线程数量进行测试:

10个线程效果:

3个线程效果:

1个线程效果:

有了这个ACE_TASK的demo,我们跟踪查看ACE源码。

在Task.cpp的ACE_Task_Base内的activate函数,可以看到线程是怎么创建出来的。

  1 int
  2 ACE_Task_Base::activate (long flags,
  3                          int n_threads,
  4                          int force_active,
  5                          long priority,
  6                          int grp_id,
  7                          ACE_Task_Base *task,
  8                          ACE_hthread_t thread_handles[],
  9                          void *stack[],
 10                          size_t stack_size[],
 11                          ACE_thread_t thread_ids[],
 12                          const char* thr_name[])
 13 {
 14   ACE_TRACE ("ACE_Task_Base::activate");
 15 
 16 #if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
 17   ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
 18 
 19   // If the task passed in is zero, we will use <this>
 20   if (task == 0)
 21     task = this;
 22 
 23   if (this->thr_count_ > 0 && force_active == 0)
 24     return 1; // Already active.
 25   else
 26     {
 27       if (this->thr_count_ > 0 && this->grp_id_ != -1)
 28         // If we're joining an existing group of threads then make
 29         // sure to use its group id.
 30         grp_id = this->grp_id_;
 31       this->thr_count_ += n_threads;
 32     }
 33 
 34   // Use the ACE_Thread_Manager singleton if we're running as an
 35   // active object and the caller didn't supply us with a
 36   // Thread_Manager.
 37   if (this->thr_mgr_ == 0)
 38 # if defined (ACE_THREAD_MANAGER_LACKS_STATICS)
 39     this->thr_mgr_ = ACE_THREAD_MANAGER_SINGLETON::instance ();
 40 # else /* ! ACE_THREAD_MANAGER_LACKS_STATICS */
 41     this->thr_mgr_ = ACE_Thread_Manager::instance ();
 42 # endif /* ACE_THREAD_MANAGER_LACKS_STATICS */
 43 
 44   int grp_spawned = -1;
 45   if (thread_ids == 0)
 46     // Thread Ids were not specified
 47     grp_spawned =
 48       this->thr_mgr_->spawn_n (n_threads,
 49                                &ACE_Task_Base::svc_run,
 50                                (void *) this,
 51                                flags,
 52                                priority,
 53                                grp_id,
 54                                task,
 55                                thread_handles,
 56                                stack,
 57                                stack_size,
 58                                thr_name);
 59   else
 60     // thread names were specified
 61     grp_spawned =
 62       this->thr_mgr_->spawn_n (thread_ids,
 63                                n_threads,
 64                                &ACE_Task_Base::svc_run,
 65                                (void *) this,
 66                                flags,
 67                                priority,
 68                                grp_id,
 69                                stack,
 70                                stack_size,
 71                                thread_handles,
 72                                task,
 73                                thr_name);
 74   if (grp_spawned == -1)
 75     {
 76       // If spawn_n fails, restore original thread count.
 77       this->thr_count_ -= n_threads;
 78       return -1;
 79     }
 80 
 81   if (this->grp_id_ == -1)
 82     this->grp_id_ = grp_spawned;
 83 
 84 #if defined (ACE_MVS) || defined(__TANDEM)
 85   ACE_OS::memcpy( &this->last_thread_id_, '\0', sizeof(this->last_thread_id_));
 86 #else
 87   this->last_thread_id_ = 0;    // Reset to prevent inadvertant match on ID
 88 #endif /* defined (ACE_MVS) */
 89 
 90   return 0;
 91 
 92 #else
 93   {
 94     // Keep the compiler from complaining.
 95     ACE_UNUSED_ARG (flags);
 96     ACE_UNUSED_ARG (n_threads);
 97     ACE_UNUSED_ARG (force_active);
 98     ACE_UNUSED_ARG (priority);
 99     ACE_UNUSED_ARG (grp_id);
100     ACE_UNUSED_ARG (task);
101     ACE_UNUSED_ARG (thread_handles);
102     ACE_UNUSED_ARG (stack);
103     ACE_UNUSED_ARG (stack_size);
104     ACE_UNUSED_ARG (thread_ids);
105     ACE_UNUSED_ARG (thr_name);
106     ACE_NOTSUP_RETURN (-1);
107   }
108 #endif /* ACE_MT_SAFE */
109 }

第49行,把线程执行函数指向了svc_run。

 1 ACE_THR_FUNC_RETURN
 2 ACE_Task_Base::svc_run (void *args)
 3 {
 4   ACE_TRACE ("ACE_Task_Base::svc_run");
 5 
 6   ACE_Task_Base *t = (ACE_Task_Base *) args;
 7 
 8   // Register ourself with our <Thread_Manager>'s thread exit hook
 9   // mechanism so that our close() hook will be sure to get invoked
10   // when this thread exits.
11 
12 #if defined ACE_HAS_SIG_C_FUNC
13   t->thr_mgr ()->at_exit (t, ACE_Task_Base_cleanup, 0);
14 #else
15   t->thr_mgr ()->at_exit (t, ACE_Task_Base::cleanup, 0);
16 #endif /* ACE_HAS_SIG_C_FUNC */
17 
18   // Call the Task's svc() hook method.
19   int const svc_status = t->svc ();
20   ACE_THR_FUNC_RETURN status;
21 #if defined (ACE_HAS_INTEGRAL_TYPE_THR_FUNC_RETURN)
22   // Reinterpret case between integral types is not mentioned in the C++ spec
23   status = static_cast<ACE_THR_FUNC_RETURN> (svc_status);
24 #else
25   status = reinterpret_cast<ACE_THR_FUNC_RETURN> (svc_status);
26 #endif /* ACE_HAS_INTEGRAL_TYPE_THR_FUNC_RETURN */
27 
28 // If we changed this zero change the other if in OS.cpp Thread_Adapter::invoke
29 #if 1
30   // Call the <Task->close> hook.
31   ACE_Thread_Manager *thr_mgr_ptr = t->thr_mgr ();
32 
33   // This calls the Task->close () hook.
34   t->cleanup (t, 0);
35 
36   // This prevents a second invocation of the cleanup code
37   // (called later by <ACE_Thread_Manager::exit>.
38   thr_mgr_ptr->at_exit (t, 0, 0);
39 #endif
40   return status;
41 }

svc_run则在第19行调用了具体的我们在外部重写的虚函数函数执行。

下面是消息队列的维护,putq和getq在内联函数文件Task_T.inl内。

 1 template <ACE_SYNCH_DECL> ACE_INLINE int
 2 ACE_Task<ACE_SYNCH_USE>::getq (ACE_Message_Block *&mb, ACE_Time_Value *tv)
 3 {
 4   ACE_TRACE ("ACE_Task<ACE_SYNCH_USE>::getq");
 5   return this->msg_queue_->dequeue_head (mb, tv);
 6 }
 7 
 8 template <ACE_SYNCH_DECL> ACE_INLINE int
 9 ACE_Task<ACE_SYNCH_USE>::putq (ACE_Message_Block *mb, ACE_Time_Value *tv)
10 {
11   ACE_TRACE ("ACE_Task<ACE_SYNCH_USE>::putq");
12   return this->msg_queue_->enqueue_tail (mb, tv);
13 }

实则就是在维护ACE_Task类里面的ACE_Message_Queue<ACE_SYNCH_USE> *msg_queue_;这个队列。

ACE_Task相对简单,里面关于线程池的创建和调度还是有很多值得学习的地方。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏c#开发者

lightswitch binding custom control

Listing 1: Setting up data binding for the Rating control usingSystem.Windows.Co...

29311
来自专栏程序员的SOD蜜

用事实说话,成熟的ORM性能不是瓶颈,灵活性不是问题:EF5.0、PDF.NET5.0、Dapper原理分析与测试手记

[本文篇幅较长,可以通过目录查看您感兴趣的内容,或者下载格式良好的PDF版本文件查看]  目录 一、ORM的"三国志"    2 1,PDF.NET诞生历程...

4139
来自专栏码匠的流水账

聊聊spring cloud gateway的RetryGatewayFilter

本文主要研究一下spring cloud gateway的RetryGatewayFilter

1682
来自专栏跟着阿笨一起玩NET

C#如何快速高效地导出大量数据?

本文转载:http://www.cnblogs.com/herbert/archive/2010/07/28/1787095.html

4041
来自专栏前沿技墅

Android进阶解密:探访AMS家族

资深开发工程师,Android进阶二部曲《Android进阶之光》《Android进阶解密》的作者,公众号“刘望舒”的作者,CSDN人气博主。他在博客中构建了“...

4562
来自专栏跟着阿笨一起玩NET

C#实现树型结构TreeView节点拖拽的简单功能(转)

本文摘抄博客园里面的牛人吉日嘎啦。http://www.cnblogs.com/jirigala

2321
来自专栏Laoqi's Linux运维专列

HP服务器Linux下hpacucli常用命令

3746
来自专栏Java成神之路

Java微信公众平台开发_03_消息管理之被动回复消息

上一节,我们启用服务器配置的时候,填写了一个服务器地址(url),如下图,这个url就是回调url,是开发者用来接收微信消息和事件的接口URL 。也就是说,用户...

1.2K5
来自专栏逸鹏说道

ExecuteReader在执行有输出参数的存储过程时拿不到输出参数

异常处理汇总-后端系列 http://www.cnblogs.com/dunitian/p/4523006.html 后期会在博客首发更新:http://dnt...

3537
来自专栏c#开发者

在DataGrid中创建一个弹出式Details窗口

在DataGrid中创建一个弹出式Details窗口 这篇文章来自DotNetJunkie的提议。他最初写信要求我们提供一个关于如何创建在DataGrid 中...

3828

扫码关注云+社区

领取腾讯云代金券