Linux进程间通信(二) - 消息队列

消息队列

消息队列是Linux IPC中很常用的一种通信方式,它通常用来在不同进程间发送特定格式的消息数据。

消息队列和之前讨论过的管道和FIFO有很大的区别,主要有以下两点(管道请查阅我的另一篇文章:https://cloud.tencent.com/developer/article/1021159):

Ø 一个进程向消息队列写入消息之前,并不需要某个进程在该队列上等待该消息的到达,而管道和FIFO是相反的,进程向其中写消息时,管道和FIFO必须已经打开来读,否则写进程就会阻塞(默认情况下)。

Ø IPC的持续性不同。管道和FIFO是随进程的持续性,当管道和FIFO最后一次关闭发生时,仍在管道和FIFO中的数据会被丢弃。消息队列是随内核的持续性,即一个进程向消息队列写入消息后,然后终止,另外一个进程可以在以后某个时刻打开该队列读取消息。只要内核没有重新自举,消息队列没有被删除。

消息队列中的每条消息通常具有以下属性:

Ø 一个表示优先级的整数;

Ø 消息的数据部分的长度;

Ø 消息数据本身;

下面我们分别阐述POSIX消息队列和System V消息队列,这2种消息队列目前Linux都支持。

POSIX消息队列

数据结构

先给出mq_attr 结构的定义

#include <bits/mqueue.h>
struct mq_attr
{
 long int mq_flags; /* Message queue flags. 0 or O_NONBLOCK */
 long int mq_maxmsg; /* Maximum number of messages. */
 long int mq_msgsize; /* Maximum message size. */
 long int mq_curmsgs; /* Number of messages currently queued. */
 long int __pad[4];
};

函数说明

// 打开一个已经创建的消息队列

mqd_t mq_open(const char *name, int oflag);

// 创建消息队列

mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);

name:表示消息队列的名字,它符合POSIX IPC的名字规则。

oflag:表示打开的方式,和 open函数的类似。有必须的选项:O_RDONLY,O_WRONLY,O_RDWR,还有可选的选项:O_NONBLOCK,O_CREAT,O_EXCL。

mode:是一个可选参数,在oflag中含有O_CREAT标志且消息队列不存在时,才需要提供该参数。表示默认访问权限。可以参考open。

attr:也是一个可选参数,在 oflag中含有O_CREAT标志且消息队列不存在时才需要。该参数用于给新队列设定某些属性,如果是空指针,那么就采用默认属性。

mq_open返回值是mqd_t类型的值,被称为消息队列描述符。

在Linux 2.6中该类型的定义为整型:

#include <bits/mqueue.h>

typedef int mqd_t;

// 关闭消息队列

mqd_t mq_close(mqd_t mqdes);

mq_close用于关闭一个消息队列,和文件的close类型一样,关闭后,消息队列并不从系统中删除。一个进程结束,会自动调用关闭打开着的消息队列。

// 删除消息队列

mqd_t mq_unlink(const char *name);

mq_unlink用于删除一个消息队列。消息队列创建后只有通过调用该函数或者是内核自举才能进行删除。每个消息队列都有一个保存当前打开着描述符数的引用计数器,和文件一样,因此本函数能够实现类似于unlink函数删除一个文件的机制。

// 获取消息队列参数

mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr);

// 设置消息队列参数

mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);

mq_getattr用于获取当前消息队列的属性,mq_setattr用于设置当前消息队列的属性。其中mq_setattr中的oldattr用于保存修改前的消息队列的属性,可以为空。

mq_setattr可以设置的属性只有mq_flags,用来设置或清除消息队列的非阻塞标志。newattr结构的其他属性被忽略。mq_maxmsg和mq_msgsize属性只能在创建消息队列时通过mq_open来设置。mq_open只会设置该两个属性,忽略另外两个属性。mq_curmsgs属性只能被获取而不能被设置。

// 发送接收消息

mqd_t mq_send(mqd_t mqdes, const char *msg_ptr,

size_t msg_len, unsigned msg_prio); //成功返回0,出错返回-1

mqd_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len,

unsigned *msg_prio); //成功返回接收到消息的字节数,出错返回-1

如果mq为空,mq_receive默认阻塞,如果设置了O_NONBLOCK,mq_receive立即返回,并将errno设置为EAGAIN。

多进程情况下,如果多个进程阻塞在mq_receive调用,当消息到来时,具有最高优先级和等待时间最长的进程将得到这条消息。因此可以确认,mq接收消息在应用层看来是原子操作。

#ifdef __USE_XOPEN2K

mqd_t mq_timedsend(mqd_t mqdes, const char *msg_ptr,

size_t msg_len, unsigned msg_prio,

const struct timespec *abs_timeout);

mqd_t mq_timedreceive(mqd_t mqdes, char *msg_ptr,

size_t msg_len, unsigned *msg_prio,

const struct timespec *abs_timeout);

#endif

mq_send向消息队列中写入一条消息,mq_receive从消息队列中读取一条消息。

mqdes:消息队列描述符;

msg_ptr:指向消息体缓冲区的指针;

msg_len:消息体的长度,其中mq_receive 的该参数不能小于能写入队列中消息的最大大小,即一定要大于等于该队列的mq_attr结构中mq_msgsize的大小。如果 mq_receive中的msg_len小于该值,就会返回EMSGSIZE错误。POXIS消息队列发送的消息长度可以为0。

msg_prio:消息的优先级;它是一个小于 MQ_PRIO_MAX的数,数值越大,优先级越高。POSIX消息队列在调用mq_receive时总是返回队列中最高优先级的最早消息。如果消息不需要设定优先级,那么可以在 mq_send是置msg_prio为0,mq_receive的msg_prio置为NULL。

还有两个XSI定义的扩展接口限时发送和接收消息的函数:mq_timedsend和mq_timedreceive函数。默认情况下mq_send和mq_receive是阻塞进行调用,可以通过mq_setattr来设置为O_NONBLOCK。

mq使用详解

创建一个mq
#include <iostream>
#include <cstring>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>

using namespace std;

int main(int argc, char** argv)
{
    mqd_t mqID;
    if (argc!=2)
    {
        printf("usage: ./ipc_posix_mq_create <mq name>\n");
        exit(0);
    }
    
    mqID = mq_open(argv[optind], O_RDWR|O_CREAT|O_EXCL, 0666, NULL);
    if (mqID < 0)
    {
        printf("open message queue %s error[%s]\n", argv[optind], strerror(errno));
        return -1;
    }
    printf("open message queue succ, mqID = %d\n", mqID);
    
    mq_attr mqAttr;
    if (mq_getattr(mqID, &mqAttr) < 0)
    {
        printf("get the message queue attribute error\n");
        return -1;
    }
    
    printf("mq_flags = %d, mq_maxmsg = %d, mq_msgsize = %d, mq_curmsgs = %d\n",
        mqAttr.mq_flags, mqAttr.mq_maxmsg, mqAttr.mq_msgsize, mqAttr.mq_curmsgs);
    
    return 0;
}

结果说明:

[root@rocket ipc]# g++ -g -o ipc_posix_mq_create ipc_posix_mq_create.cpp -lrt

[root@rocket ipc]# ./ipc_posix_mq_create /a.txt

open message queue succ, mqID = 3

mq_flags = 0, mq_maxmsg = 10, mq_msgsize = 8192, mq_curmsgs = 0

[root@rocket ipc]# ./ipc_posix_mq_create /a.txt

open message queue /a.txt error[File exists]

[root@rocket ipc]# ./ipc_posix_mq_create /b.txt

open message queue succ, mqID = 3

mq_flags = 0, mq_maxmsg = 10, mq_msgsize = 8192, mq_curmsgs = 0

[root@rocket ipc]# ./ipc_posix_mq_create /b.txt

open message queue /b.txt error[File exists]

删除一个mq
#include <iostream>
#include <cstring>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>

using namespace std;

int main(int argc, char** argv)
{
    mqd_t mqID;
    if (argc!=2)
    {
        printf("usage: ./ipc_posix_mq_unlink <mq name>\n");
        exit(0);
    }
    
    mq_unlink(argv[optind]);
    printf("error = %s\n", strerror(errno));
    return 0;
}

结果说明:

[root@rocket ipc]# g++ -g -o ipc_posix_mq_unlink ipc_posix_mq_unlink.cpp -lrt

[root@rocket ipc]# ./ipc_posix_mq_create /a.txt

open message queue /a.txt error[File exists]

[root@rocket ipc]# ./ipc_posix_mq_unlink /a.txt

error = Success

[root@rocket ipc]# ./ipc_posix_mq_create /a.txt

open message queue succ, mqID = 3

mq_flags = 0, mq_maxmsg = 10, mq_msgsize = 8192, mq_curmsgs = 0

获取mq的属性
#include <iostream>
#include <cstring>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>

using namespace std;

int main(int argc, char** argv)
{
    mqd_t mqID;
    if (argc!=2)
    {
        printf("usage: ./ipc_posix_mq_getattr <mq name>\n");
        exit(0);
    }
    
    mqID = mq_open(argv[optind], O_RDONLY);
    if (mqID < 0)
    {
        printf("open message queue %s error[%s]\n", argv[optind], strerror(errno));
        return -1;
    }
    printf("open message queue succ, mqID = %d\n", mqID);
    
    mq_attr mqAttr;
    if (mq_getattr(mqID, &mqAttr) < 0)
    {
        printf("get the message queue attribute error\n");
        return -1;
    }
    
    printf("mq_flags = %d, mq_maxmsg = %d, mq_msgsize = %d, mq_curmsgs = %d\n",
        mqAttr.mq_flags, mqAttr.mq_maxmsg, mqAttr.mq_msgsize, mqAttr.mq_curmsgs);
    mq_close(mqID);
    
    return 0;
}

结果说明:

[root@rocket ipc]# g++ -g -o ipc_posix_mq_getattr ipc_posix_mq_getattr.cpp -lrt

[root@rocket ipc]# ./ipc_posix_mq_create /a.txt

open message queue succ, mqID = 3

mq_flags = 0, mq_maxmsg = 10, mq_msgsize = 8192, mq_curmsgs = 0

[root@rocket ipc]# ./ipc_posix_mq_getattr /a.txt

open message queue succ, mqID = 3

mq_flags = 0, mq_maxmsg = 10, mq_msgsize = 8192, mq_curmsgs = 0

[root@rocket ipc]# ./ipc_posix_mq_unlink /a.txt

error = Success

[root@rocket ipc]# ./ipc_posix_mq_getattr /a.txt

open message queue /a.txt error[No such file or directory]

设置mq属性
#include <iostream>
#include <cstring>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>

using namespace std;

int main(int argc, char** argv)
{
    mqd_t mqID;
    if (argc!=4)
    {
        printf("usage: ./ipc_posix_mq_open_setattr <mq name> <max msg> <msgsize>\n");
        exit(0);
    }
    
    mq_attr mqAttr;
    mqAttr.mq_maxmsg = atoi(argv[2]);
    mqAttr.mq_msgsize = atoi(argv[3]);
    
    mqID = mq_open(argv[optind], O_RDWR|O_CREAT|O_EXCL, 0666, &mqAttr);
    if (mqID < 0)
    {
        printf("open message queue %s error[%s]\n", argv[optind], strerror(errno));
        return -1;
    }
    printf("open message queue succ, mqID = %d\n", mqID);
    
    if (mq_getattr(mqID, &mqAttr) < 0)
    {
        printf("get the message queue attribute error\n");
        return -1;
    }
    
    printf("mq_flags = %d, mq_maxmsg = %d, mq_msgsize = %d, mq_curmsgs = %d\n",
        mqAttr.mq_flags, mqAttr.mq_maxmsg, mqAttr.mq_msgsize, mqAttr.mq_curmsgs);
    mq_close(mqID);
    
    return 0;
}

结果说明:

[root@rocket ipc]# ./ipc_posix_mq_create /a.txt

open message queue succ, mqID = 3

mq_flags = 0, mq_maxmsg = 10, mq_msgsize = 8192, mq_curmsgs = 0

[root@rocket ipc]# ./ipc_posix_mq_unlink /a.txt

error = Success

[root@rocket ipc]# ./ipc_posix_mq_open_setattr /a.txt 100 1024

open message queue succ, mqID = 3

mq_flags = 0, mq_maxmsg = 100, mq_msgsize = 1024, mq_curmsgs = 0

这里可以看出,属性修改符合预期,已经和默认属性不一样了。

发送接收mq消息

发送mq消息

#include <iostream>
#include <cstring>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>

using namespace std;

int main(int argc, char** argv)
{
    mqd_t mqID;
    unsigned int iprio;
    if (argc!=4)
    {
        printf("usage: ./ipc_posix_mq_send <mq name> <message> <priority>\n");
        exit(0);
    }
    iprio = atoi(argv[3]);
    
    mqID = mq_open(argv[optind], O_WRONLY);
    if (mqID < 0)
    {
        printf("open message queue %s error[%s]\n", argv[optind], strerror(errno));
        return -1;
    }
    printf("open message queue succ, mqID = %d\n", mqID);
    mq_send(mqID, argv[2], strlen(argv[2]), iprio);
    return 0;
}

接收mq消息

#include <iostream>
#include <cstring>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>

using namespace std;

int main(int argc, char** argv)
{
    mqd_t mqID;
    mq_attr mqAttr;
    unsigned int iprio;
    unsigned int n;
    char buff[8192];
    
    if (argc!=2)
    {
        printf("usage: ./ipc_posix_mq_recv <mq name>\n");
        exit(0);
    }
    
    mqID = mq_open(argv[optind], O_RDONLY);
    if (mqID < 0)
    {
        printf("open message queue %s error[%s]\n", argv[optind], strerror(errno));
        return -1;
    }
    mq_getattr(mqID, &mqAttr);
    n = mq_receive(mqID, buff, mqAttr.mq_msgsize, &iprio);
    printf("read from mq`s msg = %s\n", buff);
    return 0;
}

结果说明:

[root@rocket ipc]# g++ -g -o ipc_posix_mq_recv ipc_posix_mq_recv.cpp -lrt

[root@rocket ipc]# g++ -g -o ipc_posix_mq_send ipc_posix_mq_send.cpp -lrt

[root@rocket ipc]# ./ipc_posix_mq_unlink /a.txt

error = No such file or directory

[root@rocket ipc]# ./ipc_posix_mq_create /a.txt

open message queue succ, mqID = 3

mq_flags = 0, mq_maxmsg = 10, mq_msgsize = 8192, mq_curmsgs = 0

[root@rocket ipc]# ./ipc_posix_mq_send /a.txt "hello" 10

open message queue succ, mqID = 3

[root@rocket ipc]# ./ipc_posix_mq_send /a.txt "how are you?" 10

open message queue succ, mqID = 3

[root@rocket ipc]# ./ipc_posix_mq_recv /a.txt

read from mq`s msg = hello

[root@rocket ipc]# ./ipc_posix_mq_recv /a.txt

read from mq`s msg = how are you?

多进程阻塞接收mq消息,发送进程跟前面一样,接收进程修改为循环接收消息

#include <iostream>
#include <cstring>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>

using namespace std;

int main(int argc, char** argv)
{
    mqd_t mqID;
    mq_attr mqAttr;
    unsigned int iprio;
    unsigned int n;
    char buff[8192];
    
    if (argc!=2)
    {
        printf("usage: ./ipc_posix_mq_recv <mq name>\n");
        exit(0);
    }
    
    mqID = mq_open(argv[optind], O_RDONLY);
    if (mqID < 0)
    {
        printf("open message queue %s error[%s]\n", argv[optind], strerror(errno));
        return -1;
    }
    mq_getattr(mqID, &mqAttr);
    while(1)
    {
        n = mq_receive(mqID, buff, mqAttr.mq_msgsize, &iprio);
        printf("read from mq`s msg = %s\n", buff);
    }
    return 0;
}

结果说明:可以看到当2个进程调用mq_receive,当消息到来时,只有1个进程能接收到这条消息,2个进程轮流的接收mq_send发出的消息

tty1发送消息:

[root@rocket ipc]# ./ipc_posix_mq_send /a.txt "hello" 10

open message queue succ, mqID = 3

[root@rocket ipc]# ./ipc_posix_mq_send /a.txt "hello" 10

open message queue succ, mqID = 3

[root@rocket ipc]# ./ipc_posix_mq_send /a.txt "hello222" 10

open message queue succ, mqID = 3

[root@rocket ipc]# ./ipc_posix_mq_send /a.txt "hello223" 10

open message queue succ, mqID = 3

[root@rocket ipc]# ./ipc_posix_mq_send /a.txt "hello123" 10

open message queue succ, mqID = 3

[root@rocket ipc]# ./ipc_posix_mq_send /a.txt "hello333" 10

open message queue succ, mqID = 3

tty2接收消息:

[root@rocket ipc]# ./ipc_posix_mq_recv_loop /a.txt

read from mq`s msg = hello

read from mq`s msg = hello222

read from mq`s msg = hello123

tty3接收消息:

[root@rocket ipc]# ./ipc_posix_mq_recv_loop /a.txt

read from mq`s msg = hello

read from mq`s msg = hello223

read from mq`s msg = hello333

POSIX消息队列的限制

POSIX消息队列本身的限制就是mq_attr中的mq_maxmsg和mq_msgsize,分别用于限定消息队列中的最大消息数和每个消息的最大字节数。在前面已经说过了,这两个参数可以在调用mq_open创建一个消息队列的时候设定。但这个设定是受到系统内核限制的。

下面是在Linux 2.6下shell对启动进程的POSIX消息队列大小的限制:

[root@rocket ipc]# ulimit -a|grep message

POSIX message queues (bytes, -q) 819200

当然你可以调大这个参数

[root@rocket ipc]# ulimit -q 8192000

[root@rocket ipc]# ulimit -a|grep message

POSIX message queues (bytes, -q) 8192000

System V消息队列

数据结构

控制结构:

struct msqid_ds {

struct ipc_perm msg_perm; /* Ownership and permissions */

time_t msg_stime; /* Time of last msgsnd(2) */

time_t msg_rtime; /* Time of last msgrcv(2) */

time_t msg_ctime; /* Time of last change */

unsigned long __msg_cbytes; /* Current number of bytes in

queue (non-standard) */

msgqnum_t msg_qnum; /* Current number of messages

in queue */

msglen_t msg_qbytes; /* Maximum number of bytes

allowed in queue */

pid_t msg_lspid; /* PID of last msgsnd(2) */

pid_t msg_lrpid; /* PID of last msgrcv(2) */

};

发送接收数据:

struct msgbuf {

long mtype; /* message type, must be > 0 */

char mtext[1]; /* message data */

};

函数说明

msgget函数

该函数用来创建和访问一个消息队列。它的原型为:

int msgget(key_t key, int msgflg);

与其他的IPC机制一样,程序必须提供一个键来命名某个特定的消息队列。msgflg是一个权限标志,表示消息队列的访问权限,它与文件的访问权限一样。msgflg可以与IPC_CREAT做或操作,表示当key所命名的消息队列不存在时创建一个消息队列,如果key所命名的消息队列存在时,IPC_CREAT标志会被忽略,而只返回一个标识符。

它返回一个以key命名的消息队列的标识符(非零整数),失败时返回-1。

msgsnd函数

该函数用来把消息添加到消息队列中。它的原型为:

int msgsend(int msgid, const void *msg_ptr, size_t msg_sz, int msgflg);

int msgsend(int msgid, const void *msg_ptr, size_t msg_sz, int msgflg);

msgid是由msgget函数返回的消息队列标识符。

msg_ptr是一个指向准备发送消息的指针,但是消息的数据结构却有一定的要求,指针msg_ptr所指向的消息结构一定要是以一个长整型成员变量开始的结构体,接收函数将用这个成员来确定消息的类型。所以消息结构要定义成这样:

struct my_message{

long int message_type;

/* The data you wish to transfer*/

};

struct my_message{

long int message_type;

/* The data you wish to transfer*/

};

msg_sz是msg_ptr指向的消息的长度,注意是消息的长度,而不是整个结构体的长度,也就是说msg_sz是不包括长整型消息类型成员变量的长度。

msgflg用于控制当前消息队列满或队列消息到达系统范围的限制时将要发生的事情。

如果调用成功,消息数据的一分副本将被放到消息队列中,并返回0,失败时返回-1.

msgrcv函数

该函数用来从一个消息队列获取消息,它的原型为

int msgrcv(int msgid, void *msg_ptr, size_t msg_st, long int msgtype, int msgflg);

msgid, msg_ptr, msg_st的作用也函数msgsnd函数的一样。

msgtype可以实现一种简单的接收优先级。如果msgtype为0,就获取队列中的第一个消息。如果它的值大于零,将获取具有相同消息类型的第一个信息。如果它小于零,就获取类型等于或小于msgtype的绝对值的第一个消息。

msgflg用于控制当队列中没有相应类型的消息可以接收时将发生的事情。

调用成功时,该函数返回放到接收缓存区中的字节数,消息被复制到由msg_ptr指向的用户分配的缓存区中,然后删除消息队列中的对应消息。失败时返回-1.

msgctl函数

该函数用来控制消息队列,它与共享内存的shmctl函数相似,它的原型为:

int msgctl(int msgid, int command, struct msgid_ds *buf);

int msgctl(int msgid, int command, struct msgid_ds *buf);

command是将要采取的动作,它可以取3个值,

IPC_STAT:把msgid_ds结构中的数据设置为消息队列的当前关联值,即用消息队列的当前关联值覆盖msgid_ds的值。

IPC_SET:如果进程有足够的权限,就把消息列队的当前关联值设置为msgid_ds结构中给出的值

IPC_RMID:删除消息队列

buf是指向msgid_ds结构的指针,它指向消息队列模式和访问权限的结构。msgid_ds结构至少包括以下成员:

struct msgid_ds 
{ 
uid_t shm_perm.uid; 
uid_t shm_perm.gid; 
mode_t shm_perm.mode; 
}; 
struct msgid_ds
{
 uid_t shm_perm.uid;
 uid_t shm_perm.gid;
 mode_t shm_perm.mode;
};

成功时返回0,失败时返回-1.

使用详解

mq创建,代码说明:
#include <unistd.h>  
#include <stdlib.h>  
#include <stdio.h>  
#include <string.h>  
#include <errno.h>  
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define BUFF_SIZE 1024
struct mq_msg_st  
{  
    long msg_type;  
    char text[BUFF_SIZE];  
};  

int main(int argc, char** argv)  
{    
    int msgid = -1;  
    struct mq_msg_st data;  
    long msgtype = 0;
    int iret = 0;
  
    //建立消息队列  
    msgid = msgget((key_t)1234, 0666 | IPC_CREAT | IPC_EXCL);  
    if(msgid == -1)  
    {  
        printf("msgget failed with error: %s\n", strerror(errno));  
        return -1;
    }
    printf("msgget succ, msgid = %d\n", msgid);
    
    // 获取消息队列状态
    struct msqid_ds ds;
    iret = msgctl(msgid, IPC_STAT, (struct msqid_ds *)&ds);
    if(iret == -1)
    {
        printf("msgctl IPC_STAT failed\n");
        return -2;
    }
    
    // 设置消息队列最大容量
    const unsigned int QBYTES_NUM = 10000000;
    ds.msg_qbytes = QBYTES_NUM;
    iret = msgctl(msgid, IPC_SET, (struct msqid_ds *)&ds);
    if(iret == -1)
    {
        printf("msgctl IPC_SET failed\n");
        return -3;
    }
    
    return 0;
}

结果说明:

[root@rocket ipc]# g++ -g -o ipc_systemv_mq_create ipc_systemv_mq_create.cpp

[root@rocket ipc]# ./ipc_systemv_mq_create

msgget succ, msgid = 0

[root@rocket ipc]# ./ipc_systemv_mq_create

msgget failed with error: File exists

[root@rocket ipc]# ipcs

------ Shared Memory Segments --------

key shmid owner perms bytes nattch status

0x00000000 0 gdm 600 393216 2 dest

0x00000000 32769 gdm 600 393216 2 dest

0x00000000 65538 gdm 600 393216 2 dest

0x00000000 98307 gdm 600 393216 2 dest

------ Semaphore Arrays --------

key semid owner perms nsems

0x00000000 0 root 600 1

0x00000000 32769 root 600 1

------ Message Queues --------

key msqid owner perms used-bytes messages

0x000004d2 0 root 666 0 0

这里看到已经创建了一个key为1234(16进制为4d2)的消息队列。

mq删除,代码说明:
#include <unistd.h>  
#include <stdlib.h>  
#include <stdio.h>  
#include <string.h>  
#include <errno.h>  
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int main(int argc, char** argv)  
{    
    int msgid = -1;   
    
    //建立消息队列  
    msgid = msgget((key_t)1234, 0666);  
    if(msgid == -1)  
    {  
        printf("msgget failed with error: %s\n", strerror(errno));  
        return -1;
    }
    printf("msgget succ, msgid = %d\n", msgid);
    
    if (msgctl(msgid, IPC_RMID, 0) == -1)  
    {  
        printf("msgctl IPC_RMID failed\n");  
        return -1;
    }  
    
    return 0;
}
mq发送,代码说明:
#include <unistd.h>  
#include <stdlib.h>  
#include <stdio.h>  
#include <string.h>  
#include <errno.h>  
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>  

#define BUFF_SIZE 1024
struct mq_msg_st  
{  
    long msg_type;  
    char text[BUFF_SIZE];  
};

int main(int argc, char** argv)  
{    
    int msgid = -1;  
    struct mq_msg_st data;
    long msgtype = 0;
    int iret = 0;
    char buffer[BUFF_SIZE];
  
    //建立消息队列  
    msgid = msgget((key_t)1234, 0666);  
    if(msgid == -1)  
    {  
        printf("msgget failed with error: %s\n", strerror(errno));  
        return -1;
    }
    printf("msgget succ, msgid = %d\n", msgid);
    
    // 获取消息队列状态
    struct msqid_ds ds;
    iret = msgctl(msgid, IPC_STAT, (struct msqid_ds *)&ds);
    if(iret == -1)
    {
        printf("msgctl IPC_STAT failed\n");
        return -2;
    }
    
    while(1)  
    {  
        //输入数据  
        printf("Enter some text: ");  
        fgets(buffer, BUFF_SIZE, stdin);  
        data.msg_type = 1;      
        strcpy(data.text, buffer);  
        //向队列发送数据  
        iret = msgsnd(msgid, (void*)&data, strlen(data.text)+1, IPC_NOWAIT);
        if(iret == -1)
        {  
            if (errno == EAGAIN)
            {
                continue;
            }
            else
            {
                printf("msgsnd failed, error = %s\n", strerror(errno));
            return -1;
            }
        }
        //输入end结束输入  
        if(strncmp(buffer, "end", 3) == 0)
        {
            break;
        }            
    }      
    return 0;
}

结果说明:

[root@rocket ipc]# g++ -g -o ipc_systemv_mq_send ipc_systemv_mq_send.cpp

[root@rocket ipc]# ./ipc_systemv_mq_send

msgget succ, msgid = 32768

Enter some text: hello

Enter some text: world

Enter some text: end

[root@rocket ipc]# ipcs

------ Shared Memory Segments --------

key shmid owner perms bytes nattch status

0x00000000 0 gdm 600 393216 2 dest

0x00000000 32769 gdm 600 393216 2 dest

0x00000000 65538 gdm 600 393216 2 dest

0x00000000 98307 gdm 600 393216 2 dest

------ Semaphore Arrays --------

key semid owner perms nsems

0x00000000 0 root 600 1

0x00000000 32769 root 600 1

------ Message Queues --------

key msqid owner perms used-bytes messages

0x000004d2 32768 root 666 19 3

这里看到发送3条消息之后这里的messages为3。

mq接收,代码说明:
#include <unistd.h>  
#include <stdlib.h>  
#include <stdio.h>  
#include <string.h>  
#include <errno.h>  
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define BUFF_SIZE 1024
struct mq_msg_st  
{  
    long msg_type;  
    char text[BUFF_SIZE];  
};

int main(int argc, char** argv)  
{    
    int msgid = -1;  
    struct mq_msg_st data;
    long msgtype = 0;
    int iret = 0;
  
    //建立消息队列  
    msgid = msgget((key_t)1234, 0666);  
    if(msgid == -1)  
    {  
        printf("msgget failed with error: %s\n", strerror(errno));  
        return -1;
    }
    printf("msgget succ, msgid = %d\n", msgid);
    
    // 获取消息队列状态
    struct msqid_ds ds;
    iret = msgctl(msgid, IPC_STAT, (struct msqid_ds *)&ds);
    if(iret == -1)
    {
        printf("msgctl IPC_STAT failed\n");
        return -2;
    }
    
    //从队列中获取消息,直到遇到end消息为止  
    while(1)
    {
        iret = msgrcv(msgid, (void*)&data, BUFF_SIZE, msgtype, IPC_NOWAIT);
        if (iret == -1)
        {  
            if (errno == ENOMSG)
            {
                usleep(100);
                continue;
            }
            else
            {
                printf("msgrcv failed, error = %s\n", strerror(errno));
                return -1;
            }
        }
        
        printf("get message: %s\n", data.text);  
        //遇到end结束  
        if(strncmp(data.text, "end", 3) == 0)
        {
            break;
        }  
    }
    
    return 0;
}

结果说明:

[root@rocket ipc]# g++ -g -o ipc_systemv_mq_recv ipc_systemv_mq_recv.cpp

[root@rocket ipc]# ./ipc_systemv_mq_recv

msgget succ, msgid = 32768

get message: hello

get message: world

get message: end

[root@rocket ipc]# ipcs

------ Shared Memory Segments --------

key shmid owner perms bytes nattch status

0x00000000 0 gdm 600 393216 2 dest

0x00000000 32769 gdm 600 393216 2 dest

0x00000000 65538 gdm 600 393216 2 dest

0x00000000 98307 gdm 600 393216 2 dest

------ Semaphore Arrays --------

key semid owner perms nsems

0x00000000 0 root 600 1

0x00000000 32769 root 600 1

------ Message Queues --------

key msqid owner perms used-bytes messages

0x000004d2 32768 root 666 0 0

这里看到消息接收完了,messages为0。

msgrcv接收消息类型说明:

The argument msgtyp specifies the type of message requested as follows:

* If msgtyp is 0, then the first message in the queue is read.

* If msgtyp is greater than 0, then the first message in the queue of type msgtyp is read, unless MSG_EXCEPT was specified in msgflg, in which case the first message in the queue of type not equal to msgtyp will be read.

* If msgtyp is less than 0, then the first message in the queue with the lowest type less than or equal to the absolute value of msgtyp will be read.

这几段也说得比较清楚了,这里就不翻译了,在开发的过程中我们可以方便的使用msgtype来分发消息到不同的进程。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏开发与安全

linux网络编程之posix 线程(一):线程模型、pthread 系列函数 和 简单多线程服务器端程序

一、线程有3种模型,分别是N:1用户线程模型,1:1核心线程模型和N:M混合线程模型,posix thread属于1:1模型。 (一)、N:1用户线程模型 “线...

62900
来自专栏博岩Java大讲堂

Java日志体系(logback)

3.3K50
来自专栏码洞

深入Python多进程通信原理与实战——图文

继上节使用原生多进程并行运行,基于Redis作为消息队列完成了圆周率的计算,本节我们使用原生操作系统消息队列来替换Redis。

10620
来自专栏凉城

[教程]黑客级别的批量处理文件

27350
来自专栏coolblog.xyz技术专栏

MyBatis 源码分析 - 内置数据源

本篇文章将向大家介绍 MyBatis 内置数据源的实现逻辑。搞懂这些数据源的实现,可使大家对数据源有更深入的认识。同时在配置这些数据源时,也会更清楚每种属性的意...

13710
来自专栏逸鹏说道

在 ASP.NET MVC 中使用异步控制器

可以通过 AsyncController 类编写异步操作方法。 可以对长时间运行的、非 CPU 绑定的请求使用异步操作方法。 这样可避免在处理请求时阻塞 Web...

390110
来自专栏主机笔记

centos安装ab工具给网站进行压力测试

在配置好网站服务器后,我们可以进行压力测试看一看实际环境中的效果怎么样,判断服务器质量、网站程序设计是否合理、提前预防突发事件。今天就介绍一款开源免费的压力测试...

320100
来自专栏CSDN技术头条

使用 Dubbo 搭建一个简单的分布式系统

随着阿里巴巴开源的高性能分布式 RPC 框架 Dubbo 正式进入 Apache 孵化器,Dubbo 又火了一把。本文作为 Dubbo 系列开端,先教大家使用 ...

19720
来自专栏Android机动车

Android热修复简单总结

DexClassLoader -> DexPathList -> DexFile -> -> Element -> dexElements.add(elemen...

11820
来自专栏java系列博客

maven 构建第一个HelloWorld

18620

扫码关注云+社区

领取腾讯云代金券