前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >1 producer — n consumers 模型 实现

1 producer — n consumers 模型 实现

作者头像
小爷毛毛_卓寿杰
发布2019-02-13 12:24:41
3690
发布2019-02-13 12:24:41
举报
文章被收录于专栏:Soul Joy HubSoul Joy Hub
1 #include<stdio.h>
  2 #include<string.h>
  3 #include<pthread.h>
  4 #include<stdlib.h>
  5 #include<unistd.h>
  6 #include<queue>
  7 using namespace std;
  8 #define LEN 128
  9 typedef struct task_que
 10 {
 11     queue<char*> string_queue;
 12     int m_flag ;        //结束标志
 13     int m_capacity;
 14     pthread_mutex_t m_lock;
 15     pthread_cond_t m_pro , m_con;
 16 }QUE , *pQUE;
 17 void* thd_func(void* arg);
 18 void put(pQUE pq, char* src);
 19 void get(pQUE pq, char* dest);
 20 int main(int argc, char *argv[])
 21 {
 22     QUE aque;
 23     aque.m_flag = 0;
 24     int nthds = atoi( argv[1] );   //线程的个数
 25     aque.m_capacity = atoi( argv[2] );//字符串队列的大小
 26     pthread_mutex_init( &aque.m_lock , NULL ); //锁
 27     pthread_cond_init( &aque.m_pro ,NULL );   //生产者条件变量
 28     pthread_cond_init( &aque.m_con , NULL );  //消费者条件变量
 29     //开辟线程空间
 30     pthread_t *thd_arr = ( pthread_t*)calloc(nthds, sizeof(pthread_t));
 31     int* ntasks = (int*)calloc(nthds, sizeof(int));//用来记录 线程工作了几次
 32     //创建线程
 33     int index;
 34     for(index = 0; index < nthds; index ++)
 35     {    
 36         pthread_create( thd_arr + index, NULL, thd_func,(void*)&aque );
 37     }
 38 
 39 
 40     //输入字符串到队列中
 41     char buf[LEN] ;
 42     while( memset(buf, 0, LEN), fgets(buf, LEN, stdin) != NULL)
 43     {
 44         put(&aque, buf);
 45     }
 46 
 47 
 48 
 49     //发出结束字符串
 50     strcpy(buf, "over");
 51     put(&aque, buf);
 52 
 53     for(index = 0; index < nthds; index ++)
 54     {
 55         pthread_join(thd_arr[index], (void**)(ntasks + index ));
 56     }
 57     for(index = 0; index < nthds; index ++)
 58     {
 59         printf("%d ", ntasks[index]);
 60     }
 61     printf("\n");
 62 
 63     pthread_mutex_destroy(&aque.m_lock);
 64     pthread_cond_destroy(&aque.m_pro);
 65     pthread_cond_destroy(&aque.m_con);
 66     return 0 ;
 67 }
 68 void put(pQUE pq,  char* src)      //把字符串写到队列中
 69 {
 70     pthread_mutex_lock(&pq ->m_lock);   //加锁
 71     while(pq ->string_queue.size() == pq ->m_capacity)  //队列满则阻塞
 72     {
 73         pthread_cond_wait(&pq -> m_pro, &pq ->m_lock);
 74 
 75     }
 76     //插入队列
 77     char* tem =  ( char*)calloc( LEN , sizeof( char ));
 78     strcpy(tem,src);
 79     pq->string_queue.push(tem);
 80     pthread_mutex_unlock(&pq -> m_lock);  //解锁
 81     pthread_cond_broadcast(&pq ->m_con);  //唤醒所有消费者线程
 82 
 83 
 84 }
 85 
 86 void get(pQUE pq, char* dest)
 87 {
 88     pthread_mutex_lock(&pq ->m_lock); //加锁
 89     while(pq ->m_flag == 0 &&  pq ->string_queue.empty()  ) //队列空 并且未结束 则阻塞
 90     {
 91         pthread_cond_wait(&pq ->m_con, &pq ->m_lock);
 92     }
 93     if(pq ->m_flag == 1)                 //判断结束标志
 94     {
 95         pthread_mutex_unlock(&pq ->m_lock); //解锁
 96         return ;                         
 97     }
 98     //出队
 99     strcpy(dest, pq ->string_queue.front());
100     pq->string_queue.pop();
101 
102     pthread_mutex_unlock(&pq ->m_lock);
103     pthread_cond_signal(&pq ->m_pro);
104 
105 }
106 
107 void* thd_func(void* arg)
108 {
109     pQUE pq = (pQUE)arg ;
110     char buf[LEN] ;
111     int ncnt = 0 ;
112     while(1)
113     {
114         memset(buf, 0, LEN) ;
115         get(pq, buf);
116         if(pq ->m_flag == 1)            //判断结束标志
117         {
118             printf("%u exit!\n", pthread_self());
119             pthread_exit((void*)ncnt);             //退出
120         }
121         ncnt ++ ;
122         printf("%u: %s\n", pthread_self(), buf);  //打印字符串
123         if(strcmp("over", buf) == 0)              //判断结束字符串        
124         {
125             pq ->m_flag = 1 ;                    //把结束符号置为 1
126             pthread_cond_broadcast(&pq ->m_con); //唤醒所有线程
127             pthread_exit((void*)ncnt);           //退出
128         }
129         if(ncnt & 1 == 1) sleep(1);     //简单的负载平衡
130     }
131 }
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2016年06月13日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档