前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Linux进程间通信 消息队列

Linux进程间通信 消息队列

作者头像
开源519
发布2021-07-30 15:38:22
4.5K0
发布2021-07-30 15:38:22
举报
文章被收录于专栏:开源519开源519

消息队列 是消息的链接表,存储内核中,由消息标识符标识。 --《UNIX环境高级编程》

简单理解,消息队列就是一堆消息的有序集合,并缓存于内核中。如此一来,多个进程就可通过访问内核来实现多个进程之间的通信。目前存在的消息队列有POSIX与System V标准的接口,本篇主要介绍System V接口的使用。

简介

消息队列的本质是位于内核空间的链表,其中每个节点都是一个独立的消息,每个消息都有类型,相同类型的消息组成一个链表。

当各种各样的消息发出时,就如同下图所示排列在内核空间中。形状看成消息的类型,相同的形状则表示相同的消息类型。

这些看似杂乱无章的消息,通过消息队列发出来后,根据其发送的类型与发送的时间,在接收端中则是有规律的排序。

如上图,内核中杂乱无章的消息,接收端可通过消息类型与发送的顺序来逐一接收处理。可通过消息类型查看指定类型的消息,若指定类型为0,则按时间顺序输出所有接收到的消息。

接口

主要用到msgget、msgsnd、msgrcv和msgctl四个接口。其使用方式man手册说明的比较清晰了,这里简单描述一下函数形式及功能。

msgget

代码语言:javascript
复制
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgget(key_t key, int msgflg);

主要功能是根据key值获取一个消息队列的ID。msgflag主要有两个值IPC_CREAT 和IPC_EXC,指的是需要新创建消息队列ID。

msgsnd、msgrcv

代码语言:javascript
复制
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,
               int msgflg);

msgsnd与msgrcv主要用于消息队列的发送与接收。这里需要注意的是发送的msgp一般定义为结构体,首个成员为long型,表示消息的类型。如此msgrcv通过指定msgtype来筛选出需要的消息。

msgctl

代码语言:javascript
复制
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgctl(int msqid, int cmd, struct msqid_ds *buf);

msgctl是用来控制消息队列的,其中cmd指进行的操作,buf记录了消息队列的信息。cmd:

  • IPC_STAT: 将msg相关的内核信息存储到buf指向的msqid_ds 结构体中。调用者需拥有阅读权限才可读取。
  • IPC_SET:该命令用来设置消息队列的属性,要设置的属性存储在buf指向的msqid结构中;可设置属性包括:msg_perm.uid、msg_perm.gid、msg_perm.mode以及msg_qbytes,同时,也影响msg_ctime成员。
  • IPC_RMID:删除msqid标识的消息队列

buf:

代码语言:javascript
复制
struct msqid_ds {
    struct ipc_perm msg_perm;     /* Ownership and permissions */
 time_t          msg_stime;    /* Time of last msgsnd(2) */
 time_t          msg_rtime;    /* Time of last msgrcv(2) */
 time_t          msg_ctime;    /* Time of last change */
 unsigned long   __msg_cbytes; /* Current number of bytes in
         queue (nonstandard) */
 msgqnum_t       msg_qnum;     /* Current number of messages
         in queue */
 msglen_t        msg_qbytes;   /* Maximum number of bytes
         allowed in queue */
 pid_t           msg_lspid;    /* PID of last msgsnd(2) */
 pid_t           msg_lrpid;    /* PID of last msgrcv(2) */
};

如上信息可看到buf中存储了与消息队列相关的属性,设置cmd后,可通过buf拿到这些信息。

实例演示

功能: 用消息队列实现server接client的数据,server可筛选显示指定消息类型的数据。

效果:server接收所有消息:

server 筛选消息类型为2的数据:

注:代码里可将消息类型封装成枚举,此demo作为演示不做过多封装。

总结

消息队列在进程间通信的优势总结起来有以下几点:

  • 缓存:数据较大的消息处理起来时间较长,此时将其写入消息队列更快,待系统空闲时再处理。提高系统任务执行效率。
  • 送达:消息队列存储的消息,会一直保留在队列中直到消息被处理,且被取走后就会被队列释放。因此无论多少个进程在获取,每个消息仅会被处理一次。
  • 排序:消息在队列中一直按照“先入先出”的顺序来执行。因此任务被处理的时序不会错乱。
  • 异步:消息队列因为会缓存消息,且顺序处理不会丢失。因此多个进程可通过消息队列实现异步通信,互不阻塞。

代码

client.cpp

代码语言:javascript
复制
/*
********************************************************************************
* Copyright (C) 2021, xiang.D <dx_65535@163.com>.
* All right reserved.
*
* File Name   : client.cpp
* Author      :
* Version     : V1.0
* Description :
* Journal     : 2021-03-19 init v1.0
* Brief       : Blog address: https://blog.csdn.net/qq_38750572?spm=1001.2014.3001.5343
* Others      :
********************************************************************************
*/

#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <iostream>
#include <errno.h>
#include <string.h>
#include "common.h"

const char TEXT[2][50] = {"this is client1!", "this is client2!"};

int main(int argc, char *argv[])
{
    int msg_id, key, ret = 0;
    struct MsgFrame msg_buf = {0, {0}};

    if (argc < 2) {
        PRINT_ERR("usage: %s [msgid]\n", argv[0]);
        goto exit;
    }

    if (strspn(argv[1], "0123456789") != strlen(argv[1])) {
        PRINT_ERR("Params invalid!\n");
        goto exit;
    }

    /* Obtain the standard key according to the file path */
    key = ftok(MSGQ_FILE_PATH, MSGQ_ID);
    if (key < 0) {
        PRINT_ERR("ftok failed! errno = %d(%s)\n", errno, strerror(errno));
        goto exit;
    }

    msg_id = msgget(key, IPC_EXCL);
    if (msg_id < 0) {
        PRINT_ERR("msgget failed! errno = %d(%s)\n", errno, strerror(errno));
        goto exit;
    }

    do {
        memset(&msg_buf, 0x00, sizeof(msg_buf));
#if 0
        if (fgets(msg_buf.buffer, sizeof(msg_buf.buffer), stdin) == NULL) {
            PRINT_ERR("scanf failed! errno = %d(%s)\n", errno, strerror(errno));
            goto exit_msgid;
        }
#else
        msg_buf.type = atoi(argv[1]);
        strncpy(msg_buf.buffer, TEXT[msg_buf.type - 1], strlen(TEXT[msg_buf.type-1]));
#endif
        ret = msgsnd(msg_id, &msg_buf, sizeof(msg_buf.buffer), IPC_NOWAIT);
        if (ret < 0) {
            PRINT_ERR("msgnd failed! errno = %d(%s)\n", errno, strerror(errno));
            goto exit_msgid;
        } else {
            PRINT_INFO("[Send %ld %ld] %s\n", msg_buf.type,
                                        strlen(msg_buf.buffer), msg_buf.buffer);
        }
        sleep(1);
    } while (strncmp(msg_buf.buffer, "end", msg_buf.type) != 0);

exit_msgid:
    ret = msgctl(msg_id, IPC_RMID, 0);
    if (ret < 0) {
        PRINT_ERR("msgctl failed! errno = %d(%s)\n", errno, strerror(errno));
    }
exit:
    return 0;
}

server.cpp

代码语言:javascript
复制
/*
********************************************************************************
* Copyright (C) 2021, xiang.D <dx_65535@163.com>.
* All right reserved.
*
* File Name   : server.cpp
* Author      :
* Version     : V1.0
* Description :
* Journal     : 2021-03-21 init v1.0
* Brief       : Blog address: https://blog.csdn.net/qq_38750572?spm=1001.2014.3001.5343
* Others      :
********************************************************************************
*/

#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <iostream>
#include <signal.h>
#include <errno.h>
#include <string.h>
#include "common.h"

static int msg_id;

static void SignalHandler(int sig)
{
    switch (sig)
    {
      case SIGSTOP:
        if (msg_id != 0) {
            msgctl(msg_id, IPC_RMID, 0);
        }
        break;
      default:
        break;
    }
}

int main(int argc, char *argv[])
{
    int key, ret = 0;
    long msg_type = 0;
    struct MsgFrame msg_recv = {0, {0}};

    signal(SIGSTOP, SignalHandler);

    if (argc == 1) {
       msg_type = 0;
    } else if (argc == 2) {
        if (strspn(argv[1], "0123456789") != strlen(argv[1])) {
            PRINT_ERR("Params invalid!\n");
            goto exit;
        } else {
            msg_type = atoi(argv[1]);
            //PRINT_INFO("Receive msg type is %ld\n", msg_type);
        }
    } else {
        PRINT_ERR("Params invalid\n");
        goto exit;
    }

    key = ftok(MSGQ_FILE_PATH, MSGQ_ID);
    if (key < 0)
    {
        PRINT_ERR("ftok failed! errno = %d(%s)\n", errno, strerror(errno));
        goto exit;
    }

    msg_id = msgget(key, IPC_CREAT|0666);
    if (msg_id < 0) {
        PRINT_ERR("msgget failed! errno = %d(%s)\n", errno, strerror(errno));
        goto exit;
    }

    do {
        memset(msg_recv.buffer, 0x00, MAX_SIZE);
        ret = msgrcv(msg_id,(void*)&msg_recv, MAX_SIZE, msg_type, 0);
        if (ret != -1) {
            PRINT_INFO("[Receive %ld %ld] %s\n",
                       msg_recv.type, strlen(msg_recv.buffer), msg_recv.buffer);
        } else {
            PRINT_ERR("msgrcv failed!\n");
        }
    } while (1);

exit:
    PRINT_INFO("Exit from %s\n", __func__);
    return 0;
}

最后

用心感悟,认真记录,写好每一篇文章,分享每一框干货。愿每一篇文章不负自己,不负看客!

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-07-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开源519 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简介
  • 接口
    • msgget
      • msgsnd、msgrcv
        • msgctl
        • 实例演示
        • 总结
        • 代码
          • 最后
          相关产品与服务
          命令行工具
          腾讯云命令行工具 TCCLI 是管理腾讯云资源的统一工具。使用腾讯云命令行工具,您可以快速调用腾讯云 API 来管理您的腾讯云资源。此外,您还可以基于腾讯云的命令行工具来做自动化和脚本处理,以更多样的方式进行组合和重用。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档