linux网络编程之POSIX 消息队列 和 系列函数

一、在前面介绍了system v 消息队列的相关知识,现在来稍微看看posix 消息队列。

posix消息队列的一个可能实现如下图:

其实消息队列就是一个可以让进程间交换数据的场所,而两个标准的消息队列最大的不同可能只是api 函数的不同,如system v 的系列函数是msgxxx,而posix 是mq_xxx。posix 消息队列也有一些对消息长度等的限制,man 7 mq_overview:

simba@ubuntu:~/Documents/code/linux_programming/UNP/posix$ cat /proc/sys/fs/mqueue/msg_max  10 simba@ubuntu:~/Documents/code/linux_programming/UNP/posix$ cat /proc/sys/fs/mqueue/msgsize_max  8192 simba@ubuntu:~/Documents/code/linux_programming/UNP/posix$ cat /proc/sys/fs/mqueue/queues_max  256

即一个消息队列最多能有10条消息,每条消息的最大长度为8192字节,一个系统最多能有256个消息队列。

还有一点是,在Linux上,posix 消息队列是以虚拟文件系统实现的,必须将其挂载到某个目录才能看见,如

           # mkdir /dev/mqueue            # mount -t mqueue none /dev/mqueue

通过cat 命令查看消息队列的状态,假设mymq 是创建的一条消息队列的名字            $ cat /dev/mqueue/mymq            QSIZE:129     NOTIFY:2    SIGNO:0    NOTIFY_PID:8260

QSIZE:消息队列所有消息的数据长度

NOTIFY_PID:某个进程使用mq_notify 注册了消息到达异步通知事件,即此进程的pid

NOTIFY:通知方式: 0 is SIGEV_SIGNAL; 1 is SIGEV_NONE; and 2 is SIGEV_THREAD.

SIGNO:当以信号方式通知的时候,表示信号的编号.

二、系列函数,编译时候加上 -lrt 选项,即连接librt 库 (实时库)

      #include <fcntl.h>           /* For O_* constants */       #include <sys/stat.h>        /* For mode constants */       #include <mqueue.h>

功能:用来创建和访问一个消息队列 原型 mqd_t mq_open(const char *name, int oflag); mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr); 参数 name: 某个消息队列的名字,必须以/打头,并且后续不能有其它/ ,形如/somename长度不能超过NAME_MAX(255) oflag:与open函数类似,可以是O_RDONLY、O_WRONLY、O_RDWR,还可以按位或上O_CREAT、O_EXCL、O_NONBLOCK; mode:如果oflag指定了O_CREAT,需要设置mode。 返回值:成功返回消息队列文件描述符;失败返回-1

功能:关闭消息队列 原型 mqd_t mq_close(mqd_t mqdes); 参数 mqdes : 消息队列描述符 返回值:成功返回0;失败返回-1

功能:删除消息队列 原型 mqd_t mq_unlink(const char *name); 参数 name: 消息队列的名字 返回值:成功返回0;失败返回-1

功能:获取/设置消息队列属性 原型 mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr); mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr); 返回值:成功返回0;失败返回-1

           struct mq_attr {                long mq_flags;       /* Flags: 0 or O_NONBLOCK */                long mq_maxmsg;      /* Max. # of messages on queue */                 long mq_msgsize;     /* Max. message size (bytes) */                long mq_curmsgs;     /* # of messages currently in queue */            };

mq_flags 是标志;mq_maxmsg 即一个消息队列消息个数上限;mq_msgsize即一条消息的数据上限;mq_curmsgs即当前消息个数

功能:发送消息 原型 mqd_t mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio); 参数 mqdes:消息队列描述符 msg_ptr:指向消息的指针 msg_len:消息长度 msg_prio:消息优先级 返回值:成功返回0;失败返回-1

功能:接收消息 原型 ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio); 参数 mqdes:消息队列描述符 msg_ptr:返回接收到的消息 msg_len:消息长度,一般需要指定为msgsize_max msg_prio:返回接收到的消息优先级 返回值:成功返回接收到的消息字节数;失败返回-1 注意:返回指定消息队列中最高优先级的最早消息,优先级最低为0

功能:建立或者删除消息到达通知事件 原型 mqd_t mq_notify(mqd_t mqdes, const struct sigevent *notification); 参数 mqdes:消息队列描述符 notification: 非空表示当消息到达且消息队列先前为空,那么将得到通知; NULL表示撤消已注册的通知 返回值:成功返回0;失败返回-1 通知方式: 产生一个信号 创建一个线程执行一个指定的函数

union sigval {          /* Data passed with notification */ int     sival_int;/* Integer value */ void   *sival_ptr;/* Pointer value */ }; struct sigevent { int           sigev_notify; /* Notification method */ int           sigev_signo; /* Notification signal */ union sigval sigev_value; /* Data passed with notification */ void (*sigev_notify_function) (union sigval); /* Function for thread notification */ void *sigev_notify_attributes; /* Thread function attributes */ };

sigev_notify 的取值有3个:

SIGEV_NONE:消息到达不会发出通知

SIGEV_SIGNAL:以信号方式发送通知,当设置此选项时,sigev_signo 设置信号的编号,且只有当信号为实时信号时才可以通过sigev_value顺带数据,参考这里

SIGEV_THREAD:以线程方式通知,当设置此选项时,sigev_notify_function 即一个函数指针,sigev_notify_attributes 即线程的属性

mq_notify 函数注意点:

1、任何时刻只能有一个进程可以被注册为接收某个给定队列的通知 2、当有一个消息到达某个先前为空的队列,而且已有一个进程被注册为接收该队列的通知时,只有在没有任何线程阻塞在该队列的mq_receive调用的前提下,通知才会发出。 3、当通知被发送给它的注册进程时,其注册被撤消。进程必须再次调用mq_notify以重新注册(如果需要的话),重新注册要放在从消息队列读出消息之前而不是之后。

下面写两个小程序测试一下:

mq_send.c

#include<stdio.h>
#include<stdlib.h>
#include<sys/ipc.h>
#include<sys/msg.h>
#include<sys/types.h>
#include<unistd.h>
#include<errno.h>
#include<mqueue.h>
#include<fcntl.h>
#include<sys/stat.h>
#include<string.h>

#define ERR_EXIT(m) \
    do { \
        perror(m); \
        exit(EXIT_FAILURE); \
    } while(0)

typedef struct stu
{
    char name[32];
    int age;
} STU;

int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        fprintf(stderr, "Usage: %s <prio>\n", argv[0]);
        exit(EXIT_FAILURE);
    }
    mqd_t mqid;
    mqid = mq_open("/abc", O_WRONLY);
    if (mqid == (mqd_t) - 1)
        ERR_EXIT("mq_open");
    printf("mq_open succ\n");

    STU stu;
    strcpy(stu.name, "test");
    stu.age = 20;
    unsigned int prio = atoi(argv[1]);
    mq_send(mqid, (const char *)&stu, sizeof(stu), prio);
    mq_close(mqid);

    return 0;
}

mq_notify.c

#include<stdio.h>
#include<stdlib.h>
#include<sys/ipc.h>
#include<sys/msg.h>
#include<sys/types.h>
#include<unistd.h>
#include<errno.h>
#include<mqueue.h>
#include<fcntl.h>
#include<sys/stat.h>
#include<string.h>
#include<signal.h>

#define ERR_EXIT(m) \
    do { \
        perror(m); \
        exit(EXIT_FAILURE); \
    } while(0)

typedef struct stu
{
    char name[32];
    int age;
} STU;

size_t size;

mqd_t mqid;

struct sigevent sigev;

void handle(int sig)
{
    mq_notify(mqid, &sigev);
    STU stu;
    unsigned int prio;
    mq_receive(mqid, (char *)&stu, size, &prio);
    printf("name=%s, age=%d, prio=%d\n", stu.name, stu.age, prio);

}


int main(int argc, char *argv[])
{
    mqid = mq_open("/abc", O_RDONLY);
    if (mqid == (mqd_t) - 1)
        ERR_EXIT("mq_open");
    printf("mq_open succ\n");

    struct mq_attr attr;
    mq_getattr(mqid, &attr);
    size = attr.mq_msgsize;

    signal(SIGUSR1, handle);

    sigev.sigev_notify = SIGEV_SIGNAL;
    sigev.sigev_signo = SIGUSR1;

    mq_notify(mqid, &sigev);

    for (; ;)
        pause();

    mq_close(mqid);

    return 0;
}

mq_open.c

#include<stdio.h>
#include<stdlib.h>
#include<sys/ipc.h>
#include<sys/msg.h>
#include<sys/types.h>
#include<unistd.h>
#include<errno.h>
#include<mqueue.h>
#include<fcntl.h>
#include<sys/stat.h>

#define ERR_EXIT(m) \
    do { \
        perror(m); \
        exit(EXIT_FAILURE); \
    } while(0)

int main(void)
{
    mqd_t mqid;
    mqid = mq_open("/abc", O_CREAT | O_RDWR, 0666, NULL);
    if (mqid == (mqd_t) - 1)
        ERR_EXIT("mq_open");
    mq_close(mqid);
    return 0;
}

先运行./mq_open 用mq_open 创建了一个消息队列并mount 到/dev/mqueue 上,可以查看状态:

simba@ubuntu:/dev/mqueue$ cat /dev/mqueue/abc  QSIZE:0          NOTIFY:0     SIGNO:0     NOTIFY_PID:0  

接着运行./mq_notify 再查看状态:

simba@ubuntu:/dev/mqueue$ cat /dev/mqueue/abc  QSIZE:0          NOTIFY:0     SIGNO:10    NOTIFY_PID:3572 

准备通知的信号是10号,即SIGUSR1,等待的进程pid即./mq_notify ,此时./mq_notify 进程是阻塞的,正在等待其他进程往消息队列写入消息,现在运行./mq_send

simba@ubuntu:~/Documents/code/linux_programming/UNP/posix$ ./mq_send  Usage: ./mq_send <prio> simba@ubuntu:~/Documents/code/linux_programming/UNP/posix$ ./mq_send 1 mq_open succ simba@ubuntu:~/Documents/code/linux_programming/UNP/posix$ ./mq_send 1 mq_open succ simba@ubuntu:~/Documents/code/linux_programming/UNP/posix$ 

回头看./mq_notify 的输出:

simba@ubuntu:~/Documents/code/linux_programming/UNP/posix$ ./mq_notify  mq_open succ name=test, age=20, prio=1 name=test, age=20, prio=1

...........................................

即./mq_send 发送的两条消息都接收到了,需要注意的是虽然通知是一次性的,但我们在消息处理函数再次注册了通知,故能多次接收到通知,但通知只发生在消息队列由空到不空的时候,如果先运行./mq_send 先往消息队列发送了n条消息,那么执行./mq_notify 是不会接收到通知的,一直阻塞着。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏惨绿少年

Redis 数据库

1.1 Redis简介 ? 1.1.1 介绍 Redis是一个使用ANSI C编写的开源、支持网络、基于内存、可选持久性的键值对(key-value)存储数据库...

55013
来自专栏Kirito的技术分享

研究优雅停机时的一点思考

最近瞥了一眼项目的重启脚本,发现运维一直在使用 kill-9<pid> 的方式重启 springboot embedded tomcat,其实大家几乎一致认为...

1K6
来自专栏idba

轻量级分布式任务调度系统-RQ

一 前言 Redis Queue 一款轻量级的P分布式异步任务队列,基于Redis作为broker,将任务存到redis里面,然后在后台执行指定...

1243
来自专栏吴伟祥

Redis分布式锁的正确实现方式(Java版)

分布式锁一般有三种实现方式:1. 数据库乐观锁;2. 基于Redis的分布式锁;3. 基于ZooKeeper的分布式锁。本篇博客将介绍第二种方式,基于Redis...

602
来自专栏大内老A

[WCF安全系列]消息的保护等级[上篇]

到目前为止,对于WCF安全传输的三个方面,我们已经对认证进行了详细的介绍,现在我们来关注另外两个话题:消息的一致性和机密性,两者又统称为消息保护(Message...

18310
来自专栏腾讯NEXT学位

Nodejs探秘:深入理解单线程实现高并发原理

为什么单线程的nodejs可以支持高并发呢?很多人都不明白其原理,自己也在很长一段时间内被这些概念搞的是云里雾里。下面我们就来一步一步揭开其神秘的面纱。

4842
来自专栏大内老A

WCF服务端运行时架构体系详解[上篇]

WCF的服务端架构体系又可以成为服务寄宿端架构体系。我们知道,对于一个基于某种类型的服务进行寄宿只需要使用到一个唯一的对象,那就是ServiceHost。甚至在...

1819
来自专栏L宝宝聊IT

Mysql性能优化——慢查询分析

MYSQL数据库是常见的两个瓶颈是CPU和I/O的瓶颈,CPU在饱和的时候一般发生在数据装入内存或从磁盘上读取数据时候。磁盘I/O瓶颈发生在装入...

752
来自专栏前端小叙

koa2入门学习

koa模块 koa-route 路由 route.get("路径",路由函数) koa-static 静态资源加载     const serve(路径) k...

3348
来自专栏一名叫大蕉的程序员

关于Java健壮性的一些思考与实践 No.102

上面两种模式都可以实现标准的 response 的封装,那么具体要封装哪些东西呢?其实最主要的就是统一的 try catch,防止出现任何的 500 错误给到调...

792

扫码关注云+社区