Linux 多进程通信开发(四): 消息队列

版权声明:本文为博主原创文章,未经博主允许不得转载。

这会是一系列文章,讲解的内容也很简单,文章的目的是让自己的知识固话和文档化,以备自己不时的复习,同时也希望能够给予初学者一些帮助。

前面的文章有介绍了如何利用管道进行进程间的通信,但是那只适合比较简单的场景,发送一些简单的通知,我只在 Android 的 Framework 源码中见过这么一回。而 Linux 下 IPC 的手段有很多,今天介绍的消息队列就是其中的一种。

什么是消息队列?

顾名思义,消息队列,首先得是一个队列,这个队列里面存放了许许多多的消息。

---msg----|---msg----|---msg----|---msg---|

特别的是,这个队列是一个内核链表。

什么意思是,进程间通过消息队列进行通信的时候,操作系统内核站了出来,做了一个中间人,它维护了消息队列,并提供了如下操作。

  • 创建消息队列
  • 添加消息
  • 读取消息

进程间要传递数据只要按照相应的格式封装好,然后发送到消息队列当中就好了。

下面对于这些特性开始一一讲解。

消息队列中的消息

消息在实际运行当中是一种数据结构,叫做消息缓冲结构体,在 linux 下有如下定义。

#include <linux/msg.h>

struct msgbuf {
	__kernel_long_t mtype;          /* type of message */
	char mtext[1];                  /* message text */
};
  • mtype 标志消息的类型
  • mtext 是一个 char 类型的数组,但是你可以自定义替换它。

消息队列里面的消息不能无限制添加内容,它的大小又 MSGMAX 这个宏指定,我电脑中这个值是 8192 代表 8K 的容量。

也就是说消息队列适合短小的数据通信。

创建消息队列

创建消息队列分 2 个步骤。

  1. 获取一个键值
  2. 通过键值创建或者使用现存的消息队列

获取键值

通过 ftok()可以获取一个键值。

#include <sys/ipc.h>

key_t ftok (const char *__pathname, int __proj_id) 
  • __pathname 是一个文件或者是目录的路劲,可以任意指定
  • __proj_id 范围是 0 ~ 255

ftok 的思路是通过读取一个文件的 inode 节点信息,然后生成一个唯一的键值用来标识队列。

要注意的是,你要确保你的程序有能够正常访问 pathname 的权限!

创建消息队列

有了键值之后,可以通过调用 msgget()来创建一个消息队列。

int msgget (key_t key, int msgflg)
  • key 就是前面通过 ftok 获取到的键值
  • msgflg 有两个取值可能 IPC_CREATE 或 IPC_EXCL
  • IPC_CREATE 如果消息队列不存在,则新建,否则访问已经存在的消息队列
  • IPC_EXCL 不单独使用,和 IPC_CREATE 配合使用,当访问的消息队列已经存在时,则返回 -1.

msgget 返回结果为 -1 时表明异常,否则返回消息队列的 id。

下面是代码示例。

testmsgget.cpp

#include <iostream>
#include <sys/msg.h>

using namespace std;

int main(int argc,char** argv)
{
    key_t keyid = ftok ("./formsgqueue",0);

    if (keyid < 0)
    {
        cerr << "get keyid failed !" << endl;
        return -1;
    }

    int queueid = msgget(keyid,IPC_CREAT);

    if (queueid < 0)
    {
        cerr << "create messagequeue error!" << endl;
        return -1;
    }

    cout << "Message queue keyid " << keyid;
    cout << " Message queue id: " << queueid << endl;

    return 0;
}

在程序编译之前,我在当前目录下创建了一个空文件。

touch formsgqueue

这个文件的作用是提供给 ftok 使用的。

然后编译代码,并执行。

g++ testmsgget.cpp -o testget
./testget

运行的结果如下:

Message queue keyid 529891 Message queue id: 32769

这个说明,消息队列创建正常。

我们就可以考虑往里面读写消息了。

发送消息

消息队列创建成功后,就可以向其中发送信息了。

通过下面 msgsend 这个 API.

#include <sys/msg.h>

int msgsnd (int __msqid, const void *__msgp, size_t __msgsz,
		   int __msgflg)
  • __msqid 前面讲的消息队列的 id 号
  • __msgp 要发送的消息结构体指针
  • __msgsz 消息体中数据的大小,这个不包括 mtype 在内。
  • __msgflg 0 或者是 IPC_NOWAIT,取 0 时,消息队列满了之后,msgsend 再发消息时会阻塞,否则直接返回结果。

msgsnd正常返回值是 0,否则返回 -1,异常时它会重置一些错误码。

  • EAGAIN 消息队列已经满了
  • EIDRM 消息队列已经被删除
  • EACCESS 无权访问这个消息队列

下面是示例代码

#include <iostream>
#include <sys/msg.h>
#include <string.h>
#include <errno.h>

using namespace std;


struct testmsginfo{
    long type;
    char info[512];
}tmp_msg;


int main(int argc,char** argv){

    key_t msgkey = ftok ("./formsgqueue",11);

    if (msgkey < 0){
        cout << "get msg id faild!" << endl;
        return -1;
    }


    int msgqid = msgget(msgkey,IPC_CREAT|0660);

    if ( msgqid < 0){
        cout << "Create msgqueue failed!" << strerror(errno) << endl;
        return -1;
    }


    tmp_msg.type = 3;
    char* tmp = "Hello msgqueue!\n";


    strcpy(tmp_msg.info,tmp);
    int msglen = sizeof(struct testmsginfo) - 4;

    if (msgsnd(msgqid,(void*)&tmp_msg,msglen,0) < 0){
        cerr << "msg send failed " << strerror(errno) << endl;
        return -1;
    }

    return 0;
}

假设这段代码源文件是 testmsgsend.cpp

g++ -o testsend testmsgsend.cpp
./testsend

执行可执行文件.

然后在终端中输入 ipcs这个命令就可以查看消息是否在内核当中了。

--------- 消息队列 -----------
键        msqid      拥有者  权限     已用字节数 消息
0x0b0815e3 98307      frank      660        516         1

这个可以说明消息队列里面有消息了。

消息队列有了消息之后,我们就可以读取了。

读取消息

通过 msgrcv搞定。

ssize_t msgrcv (int msqid, void *msgp, size_t msgsz,
		       long int msgtyp, int msgflg)
  • msqid 消息队列的 id
  • msgp 消息指针,指的是消息队列中的消息将内容赋值到哪里去
  • msgsz 消息的容量,这个容量是请求读取的容量。
  • msgtype 请求读取的消息类型
  • msgflg 读取标志,取值可能 IPC_NOWAIT、IPC_EXCEPT、IPC_NOERROR
  • IPC_NOWAIT 读取不到消息直接返回
  • IPC_EXECPT 返回消息队列中第一个不是 msgtype 的消息
  • IPC_NOERROR 如果消息队列中的消息容量大于请求的消息容量,则截断

msgrcv 如果异常就会返回 -1,正常的话就会返回读写到的消息容量。

下面是示例代码。

#include <iostream>
#include <sys/msg.h>
#include <string.h>
#include <errno.h>

using namespace std;


struct testmsginfo{
    long type;
    char info[512];
}tmp_msg;


int main(int argc,char** argv){

    key_t msgkey = ftok ("./formsgqueue",11);

    if (msgkey < 0){
        cout << "get msg id faild!" << endl;
        return -1;
    }


    int msgqid = msgget(msgkey,IPC_CREAT|0660);

    if ( msgqid < 0){
        cout << "Create msgqueue failed!" << strerror(errno) << endl;
        return -1;
    }

    cout << "Msgqueue key " <<  msgkey;
    cout << " msgqueue id " << msgqid << endl;

    int msglen = sizeof(struct testmsginfo) - 4;
    if (msgrcv(msgqid,(void*)&tmp_msg,msglen,3,0) < 0){
        cerr << "msg receive failed " << strerror(errno) << endl;
        return -1;
    }

    cout << "Message recv: " << tmp_msg.info << endl;

    return 0;
}

编译代码

g++ testmsgrcv.cpp -o testrcv
./testrcv

结果如下:

Msgqueue key 185079267 msgqueue id 98307
Message recv: Hello msgqueue!

这个说明,msgrcv 可以读取到先前发送的消息了。

有了消息队列中消息的发送和读取能力,那么我们就可以编写进程之间的通信了。

但是,下面还有一个很重要的知识点,那就是对于消息队列的操纵。

消息队列的属性管理

消息队列本身有很多属性需要管理,而 Linux 下提供了系统调用 msgctl进行相应的设置。

而的 Linux 内核中,每一个消息队列都有一个数据结构体来指示它的状态,这个结构体叫做 msqid_ds。

#include <linux/msg.h>

struct msqid_ds {
	struct ipc_perm msg_perm;
	struct msg *msg_first;		/* first message on queue,unused  */
	struct msg *msg_last;		/* last message in queue,unused */
	__kernel_time_t msg_stime;	/* last msgsnd time */
	__kernel_time_t msg_rtime;	/* last msgrcv time */
	__kernel_time_t msg_ctime;	/* last change time */
	unsigned long  msg_lcbytes;	/* Reuse junk fields for 32 bit */
	unsigned long  msg_lqbytes;	/* ditto */
	unsigned short msg_cbytes;	/* current number of bytes on queue */
	unsigned short msg_qnum;	/* number of messages in queue */
	unsigned short msg_qbytes;	/* max number of bytes on queue */
	__kernel_ipc_pid_t msg_lspid;	/* pid of last msgsnd */
	__kernel_ipc_pid_t msg_lrpid;	/* last receive pid */
};
  • msg_perm 比较重要,它标志了消息队列的权限
  • msg_qnum 标志了消息队列中有多少条消息
  • msg_qbytes 标志了消息队列最多能存储的容量

其它的参数就不一一解释了。

#include <linux/ipc.h>

struct ipc_perm
{
	__kernel_key_t	key;
	__kernel_uid_t	uid;
	__kernel_gid_t	gid;
	__kernel_uid_t	cuid;
	__kernel_gid_t	cgid;
	__kernel_mode_t	mode; 
	unsigned short	seq;
};
  • key 是文章前面反复提到过的 key
  • uid 消息队列的用户id
  • gid 消息队列的组id
  • cuid 创建消息队列的那个进程的用户id
  • cgid 创建消息队列的那个进程的组id

熟悉了基本的数据结构后,再看看 msgctl 这个调用

int msgctl (int msqid, int cmd, struct msqid_ds *buf)

msgctl 最核心的参数是 cmd,它代表对与消息队列的操作。

  • msqid 前面介绍的消息队列的 id
  • buf 数据指针,可以用来读取也可以用来赋值,具体看 cmd 取值
  • cmd 取值可能 IPC_STAT、IPC_SET、IPC_RMID
  • IPC_STAT 用来读,读取的结构存放到 buf
  • IPC_SET 用来设置,将 buf 里面的内容设置给消息队列
  • IPC_RMID 从内核中删除 msqid 的消息队列

下面用代码示例,讲解一下如何读取消息队列的状态。

#include <iostream>
#include <sys/msg.h>
#include <string.h>
#include <errno.h>

using namespace std;


struct testmsginfo{
    long type;
    char info[512];
}tmp_msg;


int main(int argc,char** argv){

    key_t msgkey = ftok ("./formsgqueue",11);

    if (msgkey < 0){
        cout << "get msg id faild!" << endl;
        return -1;
    }


    int msgqid = msgget(msgkey,IPC_CREAT|0660);

    if ( msgqid < 0){
        cout << "Create msgqueue failed!" << strerror(errno) << endl;
        return -1;
    }

    cout << "Msgqueue key " <<  msgkey;
    cout << " msgqueue id " << msgqid << endl;

    msqid_ds stats;
    if (msgctl(msgqid,IPC_STAT,&stats) < 0){
        cerr << "msg get statu failed " << strerror(errno) << endl;
        return -1;
    }

    cout << "Message stats  max bytes: " << stats.msg_qbytes << endl;
    cout << "Message stats  current message: " << stats.msg_qnum << endl;

    return 0;
}

编译代码

g++ testmsgstat.cpp -o teststat
./teststat

结果如下:

Msgqueue key 185079267 msgqueue id 98307
Message stats  max bytes: 16384
Message stats  current message: 1

我们获取了当前只有 1 条消息在内核当中,消息队列的数据容量是 16384,所以不能够存太多的消息。

进程间通过消息队列进行通信

前面的内容讲解了如何通过读取消息和发送消息,那么就已经具备了进程间通信的能力了。

问题的关键在于定义好 消息的类型,也就是 msgtype。

比如有两个进程 A 和 B,A 发送的消息可以定义为 1,B 发送的消息可以定义为 2.

那么进程 B 到消息队列读取类型为 1 的消息就好,进程 A 读取类型为 2 的消息。

代码比较简单,读者可以自己去尝试编写一下。

和管道相比,消息队列显得很轻量,因为它不涉及文件的打开与关闭,开销比管道要小,使用更加灵活。并且,内核有介入通信,充当了中间人的角色,进程本身反而更轻松了。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券