本文介绍常见的进程间通信方式,分为互斥锁和条件变量,共享内存和信号量两部分,并分别给出样例使用方式和运行结果:
1. 生产者和消费者使用互斥锁和条件变量通信
详细代码如下:
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <memory.h>
#define NUM_OF_PRODUCER 2
#define NUM_OF_CONSUMER 3
#define MAX_TASK_QUEUE_LEN 10
char task_queue[MAX_TASK_QUEUE_LEN];
int head=0,tail=0,quit=0;
pthread_mutex_t g_task_lock;
pthread_cond_t g_task_cv;
void print_task_status(){
for(int i=0;i<MAX_TASK_QUEUE_LEN;i++){
if(i!=0)
printf(",\t");
printf("%c", task_queue[i]);
if(head==i)
printf("%c", 'H');
if(tail==i)
printf("%c", 'T');
}
printf("\n");
}
void *consumer(void *not_used){
pthread_t tid=pthread_self();
while(!quit){
pthread_mutex_lock(&g_task_lock);
while(head==tail){
if(quit){
printf("Thread consumer %u is quiting!\n", (unsigned int)tid);
pthread_mutex_unlock(&g_task_lock);
pthread_exit((void*)0);
}
printf("No task now! Thread %u is waiting!\n", (unsigned int)tid);
pthread_cond_wait(&g_task_cv, &g_task_lock);
printf("Have task now! Thread %u is grabing the task !\n", (unsigned int)tid);
}
char task=task_queue[tail];
task_queue[tail]='.';
tail=(tail+1)%MAX_TASK_QUEUE_LEN;
printf("consumer %u has a task %c now!\n", (unsigned int)tid, task);
print_task_status();
pthread_mutex_unlock(&g_task_lock);
sleep(2);
printf("consumer %u finish the task %c!\n", (unsigned int)tid, task);
}
pthread_exit((void*)0);
}
void *producer(void *not_used){
pthread_t tid=pthread_self();
while(!quit){
pthread_mutex_lock(&g_task_lock);
if((head+1)%MAX_TASK_QUEUE_LEN != tail){
char task='A' + rand()%26;
task_queue[head] = task;
head=(head+1)%MAX_TASK_QUEUE_LEN;
printf("producer %u gen a task %c!\n", (unsigned int)tid, task);
print_task_status();
pthread_cond_broadcast(&g_task_cv);
}
else{
printf("producer %u find the queue is full !\n", (unsigned int)tid);
}
pthread_mutex_unlock(&g_task_lock);
sleep(rand()%2);
}
printf("Thread %u producer is quiting!\n", (unsigned int)tid);
pthread_exit((void *)0);
}
int main(){
pthread_t pros[NUM_OF_PRODUCER];
pthread_t cons[NUM_OF_CONSUMER];
int rc,t;
memset(task_queue, '.', sizeof(char) * MAX_TASK_QUEUE_LEN);
pthread_mutex_init(&g_task_lock, NULL);
pthread_cond_init(&g_task_cv, NULL);
for(t=0;t<NUM_OF_PRODUCER;t++){
rc=pthread_create(&pros[t], NULL, producer, NULL);
if(rc){
printf("ERROR; return code from producer pthread_create() is %d\n", rc);
exit(-1);
}
}
for(t=0;t<NUM_OF_CONSUMER;t++){
rc=pthread_create(&cons[t], NULL, consumer, NULL);
if(rc){
printf("ERROR; return code from consumer pthread_create() is %d\n", rc);
exit(-1);
}
}
sleep(20);
pthread_mutex_lock(&g_task_lock);
quit=1;
pthread_cond_broadcast(&g_task_cv);
pthread_mutex_unlock(&g_task_lock);
pthread_mutex_destroy(&g_task_lock);
pthread_cond_destroy(&g_task_cv);
pthread_exit(NULL);
}
其中H表示head位置,T表示tail位置,点表示内存区域为空,可能未生产数据或者已经被消费完成了。
shared.h包含了consumer.cpp和producer.cpp共同用到的方法。producer中让用户输入几个整数,并将输入的整数保存到共享内存中,然后consumer从共享内存中读取整数相加产生结果。这里的信号量只设定为1,起到了互斥锁的作用。
//shared.h
#include <stdio.h>
#include <stdlib.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/types.h>
#include <sys/sem.h>
#include <string.h>
#define MAX_NUM 128
struct shm_data {
int data[MAX_NUM];
int data_len;
};
union semun {
int val;
struct semid_ds *buf;
unsigned short int *array;
struct seminfo *__buf;
};
int get_shmid(){
int shmid;
key_t key;
if((key=ftok("/data/angelotong/multi_task/shm_key", 1024)) < 0){
perror("ftok error");
return -1;
}
shmid = shmget(key, sizeof(shm_data), IPC_CREAT|0777);
return shmid;
}
int get_semid(){
int semid;
int key;
if((key=ftok("/data/angelotong/multi_task/sem_key", 1024)) < 0){
perror("ftok error");
return -1;
}
semid = semget(key, 1, IPC_CREAT|0777);
return semid;
}
int sem_init(int semid){
union semun argument;
unsigned short values[1];
values[0]=1;
argument.array = values;
return semctl(semid, 0, SETALL, argument);
}
int sem_p(int semid){
struct sembuf operations[1];
operations[0].sem_num = 0;
operations[0].sem_op = -1;
operations[0].sem_flag = SEM_UNDO; //这里的flag去掉a
return semop(semid, operations, 1);
}
int sem_v(int semid){
struct sembuf operations[1];
operations[0].sem_num = 0;
operations[0].sem_op = 1;
operations[0].sem_flag = SEM_UNDO; //这里的flag去掉a
return semop(semid, operations, 1);
}
//producer.cpp
#include <unistd.h>
#include "shared.h"
int main(){
void *shm = NULL;
struct shm_data *shared = NULL;
int shmid = get_shmid();
int semid = get_semid();
int i;
shm = shmat(shmid, 0, 0);
if(shm == (void*)-1){
exit(0);
}
shared = (struct shm_data*)shm;
memset(shared, 0, sizeof(struct shm_data));
sem_init(semid);
while(true){
sem_p(semid);
if(shared->data_len>0){
sem_v(semid);
sleep(1);
}
else{
printf("input integers num: ");
scanf("%d", &shared->data_len);
if(shared->data_len > MAX_NUM){
perror("too many integers.");
shared->data_len = 0;
sem_v(semid);
exit(1);
}
for(i=0;i<shared->data_len;i++){
printf("Input the %dth integer: ", i);
scanf("%d", &shared->data[i]);
}
sem_v(semid);
}
}
}
//consumer.cpp
#include <unistd.h>
#include "shared.h"
int main(){
void *shm = NULL;
struct shm_data *shared = NULL;
int shmid = get_shmid();
int semid = get_semid();
int i;
shm = shmat(shmid, (void*)0, 0);
if(shm == (void*)-1){
exit(0);
}
shared = (struct shm_data*)shm;
while(true){
sem_p(semid);
if(shared->data_len>0){
int sum = 0;
for(i=0;i<shared->data_len-1;i++){
printf("%d+",shared->data[i]);
sum+=shared->data[i];
}
printf("%d",shared->data[i]);
sum+=shared->data[shared->data_len-1];
printf("=%d\n", sum);
memset(shared, 0, sizeof(struct shm_data));
sem_v(semid);
}
else{
sem_v(semid);
printf("no tasks, waiting.\n");
sleep(1);
}
}
}
使用ipcs命令可以查看到我们创建的共享内存和信号量:
三、在使用锁时,需要注意不要产生死锁
1. 死锁产生必要条件:
2. 死锁预防--破坏必要条件:
3. 死锁避免:
4. 死锁检测:
5. 死锁恢复:
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。