linux网络编程之System V 消息队列(二):消息队列实现回射客户/服务器和 msgsnd、msgrcv 函数

一、msgsnd 和 msgrcv 函数

  #include <sys/types.h>   #include <sys/ipc.h>   #include <sys/msg.h>

功能:把一条消息添加到消息队列中 原型 int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg); 参数 msgid: 由msgget函数返回的消息队列标识码 msgp:是一个指针,指针指向准备发送的消息结构体 msgsz:是msgp指向的消息长度,这个长度不含保存消息类型的那个long int长整型 msgflg:控制着当前消息队列满或到达系统上限时将要发生的事情 返回值:成功返回0;失败返回-1

msgflg=IPC_NOWAIT表示队列满不等待,返回EAGAIN错误。为0表示阻塞等待 消息结构在两方面受到制约。首先,它的具体数据必须小于系统规定的上限值MSGMAX;其次,它必须以一个long int长整数开始,接收者函数将利用这个长整数确定消息的类型。

消息结构参考形式如下: struct msgbuf { long  mtype; char mtext[1]; };

The  mtext  field  is an array (or other structure) whose size is specified by msgsz, a nonnegative integer value.Messages of zero length (i.e., no mtext field) are permitted. 

即mtex 这块区域可以是个数组或者结构体,大小由参数msgsz 指明。

功能:是从一个消息队列接收消息 原型 ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg); 参数 msgid: 由msgget函数返回的消息队列标识码 msgp:是一个指针,指针指向准备接收的消息结构体 msgsz:是msgp指向的最大消息长度,这个长度不含保存消息类型的那个long int长整型 msgtype:它可以实现接收优先级的简单形式 msgflg:控制着队列中没有相应类型的消息可供接收时将要发生的事 返回值:成功返回实际放到接收缓冲区里去的字符个数,失败返回-1

msgtype=0返回队列第一条信息 msgtype>0返回队列第一条类型等于msgtype的消息  msgtype<0返回队列第一条类型小于等于msgtype绝对值的消息,并且是满足条件的消息类型最小的消息 msgflg=IPC_NOWAIT,队列没有可读消息不等待,返回ENOMSG错误。 msgflg=MSG_NOERROR,消息大小超过msgsz时被截断 msgtype>0且msgflg=MSG_EXCEPT,接收类型不等于msgtype的第一条消息。

二、消息队列实现回射客户/服务器

在前面的系列文章中,我们都是使用socket 套接字来实现回射客户/服务器程序,现在尝试使用消息队列来实现,主要就是利用上面介绍的两个函数msgsnd,msgrcv 。

对于服务器端来说,接收到一个消息结构体的类型如果为1,表示是客户请求,而mtex 字段的前4个字节存放着不同进程的pid ,后续字节才是真正的数据,服务器回射客户端时,将pid 作为类型,mtex 为实际数据,客户端只接收对应类型的数据,故可以区分不同客户端。

程序如下:

echoser.c

#include<stdlib.h>
#include<sys/ipc.h>
#include<sys/msg.h>
#include<sys/types.h>
#include<unistd.h>
#include<errno.h>
#include<string.h>

#define ERR_EXIT(m) \
    do { \
        perror(m); \
        exit(EXIT_FAILURE); \
    } while(0)

#define MSGMAX 8192
struct msgbuf
{
    long mtype;
    char mtext[MSGMAX];
};


void echo_ser(int msgid)
{
    struct msgbuf msg;
    memset(&msg, 0, sizeof(msg));
    int nrcv = 0;
    while (1)
    {

        if ((nrcv = msgrcv(msgid, &msg, MSGMAX, 1, 0)) < 0);
        int pid = *((int *)msg.mtext);
        fputs(msg.mtext + 4, stdout);
        msg.mtype = pid;
        msgsnd(msgid, &msg, nrcv, 0);
        memset(&msg, 0, sizeof(msg));

    }
}

int main(int argc, char *argv[])
{
    int msgid;
    msgid = msgget(1234, IPC_CREAT | 0666);
    if (msgid == -1)
        ERR_EXIT("msgget");

    echo_ser(msgid);


    return 0;
}

echocli.c

#include<stdio.h>
#include<stdlib.h>
#include<sys/ipc.h>
#include<sys/msg.h>
#include<sys/types.h>
#include<unistd.h>
#include<errno.h>
#include<string.h>

#define ERR_EXIT(m) \
    do { \
        perror(m); \
        exit(EXIT_FAILURE); \
    } while(0)

#define MSGMAX 8192

struct msgbuf
{
    long mtype;
    char mtext[MSGMAX];
};

void echo_cli(int msgid)
{
    int nrcv;
    int pid = getpid();
    struct msgbuf msg;
    memset(&msg, 0, sizeof(msg));
    msg.mtype = 1;
    *((int *)msg.mtext) = pid;
    while (fgets(msg.mtext + 4, MSGMAX, stdin) != NULL)
    {

        if (msgsnd(msgid, &msg, 4 + strlen(msg.mtext + 4), IPC_NOWAIT) < 0)
            ERR_EXIT("msgsnd");

        memset(msg.mtext + 4, 0, MSGMAX - 4);
        if ((nrcv = msgrcv(msgid, &msg, MSGMAX, pid, 0)) < 0)
            ERR_EXIT("msgsnd");
        fputs(msg.mtext + 4, stdout);
        memset(msg.mtext + 4, 0, MSGMAX - 4);

    }
}

int main(int argc, char *argv[])
{

    int msgid;
    msgid = msgget(1234, 0);
    if (msgid == -1)
        ERR_EXIT("msgget");

    echo_cli(msgid);

    return 0;
}

程序逻辑不复杂,就不多说了,编译运行服务器端,再开两个客户端,可以看到正常回射输出。

但上述程序是存在死锁的风险的,当开了多个客户端,将队列写满了,此时服务器端想要写入就会阻塞,而因为客户端一旦发送了数据就阻塞等待服务器端回射类型为pid的消息,即队列的消息不会减少,此时就会形成死锁,即使服务器端是非阻塞地写入,此时会返回EAGAIN 的错误,程序逻辑来说我们也会使其不断地尝试去写入,而不是粗暴地将其退出进程,这样还是会死锁。

对此问题可以多开几个私有的队列进行服务,如下:

即某个客户端先创建一个私有消息队列,然后将私有消息队列标识符和具体数据发到共享的队列,服务器fork 出一个子进程,此时根据私有队列标识符就可以将数据回射到这个队列,这个客户端就可以从私有队列读取到回射的数据。

参考:

《UNP》

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏java 成神之路

NIO 之 Channel

34613
来自专栏java一日一条

java高并发锁的3种实现

提到锁,大家可能都会想到synchronized关键字,使用它的确可以解决一切并发问题,但是对于系统吞吐要求更高的,在这里提供了几个小技巧,帮助大家减小锁粒度,...

1193
来自专栏菩提树下的杨过

java: web应用中不经意的内存泄露

前面有一篇讲解如何在spring mvc web应用中一启动就执行某些逻辑,今天无意发现如果使用不当,很容易引起内存泄露,测试代码如下: 1、定义一个类App ...

2109
来自专栏逆向技术

内核开发知识第二讲,编写Kerner 程序中注意的问题.

什么是函数多线程安全. 简单来说就是 ,一个函数在调用过程中.还没有返回的时候.再次被其他线程调用了.但是函数执行的结果是可靠的.就可以了说这个函数是安全的.

903
来自专栏猿人谷

unix共享内存要点

共享内存优点:     1.在进程之间不通过内核传递数据,即不通过系统调用拷贝数据,达到快速,高效的数据传输。     2.随内核持续     *nix的共享内...

16910
来自专栏技术记录

RunTime.getRuntime().exec()运行脚本命令介绍和阻塞

 java在企业级项目开发中,无论是强制性的功能需要,还是为了简便java的实现,需要调用服务器命令脚本来执行。在java中,RunTime.getRuntim...

2579
来自专栏Seebug漏洞平台

Jenkins 未授权远程代码执行漏洞(CVE-2017-1000353)

漏洞概要 Jenkins 未授权远程代码执行漏洞, 允许攻击者将序列化的Java SignedObject对象传输给Jenkins CLI处理,反序列化Obj...

3656
来自专栏服务端技术杂谈

Motan源码阅读--ShutDownHook使用

任何一个中间件系统,都需要有个“平滑部署,平滑下线”的功能。 如果基于Java开发,往往采用ShutDownHook去做这件事情。 比如我们在tomcat关闭时...

752
来自专栏与神兽党一起成长

封装一个FTP工具类

前人的代码中把FTP操作和业务逻辑实现耦合在一起,据说经过多次的修改,在性能表现方面已经非常靠谱。 在原来的代码中可以看到使用了commons-net进行FTP...

1214
来自专栏Linux驱动

45.INIT_WORK()工作队列使用

中断中通过调用schedule_work(work)来通知内核线程,然后中断结束后,再去继续执行work对应的func函数

611

扫码关注云+社区