【从浅学到熟知Linux】进程间通信之消息队列(含消息队列的特点,msgget/msgsnd/msgrcv/msgtrl接口使用详解、消息队列实现Client&Server)
CSDN 2024-08-24 16:07:04 阅读 72
🏠关于专栏:Linux的浅学到熟知专栏用于记录Linux系统编程、网络编程等内容。
🎯每天努力一点点,技术变化看得见
文章目录
消息队列的引入消息队列接口及使用示例msggetmsgsnd&&msgrcvmsgctl
消息队列的特点消息队列——实现Client&Server
消息队列的引入
在我们之前的讨论的匿名管道、命名管道、共享内存中,我们可以不可以实现两个通信进程向同一个共享资源进行写入或读取呢?答案是否定的。从专栏前几篇文章介绍可知,上述三种方式主要用于单向通信,如果通过一些控制策略,则可以实现半双工通信。
为了实现两个进程互相收发消息时使用同一资源,我们就有了消息队列。消息队列是一种在不同进程或者不同系统之间进行通信的机制,它是一种存放消息的容器,发送方向队列中发送消息,接收方从队列中接收消息。下面是消息队列的大致原理:
创建消息队列:首先需要创建一个消息队列,通常由消息队列服务程序负责管理,进程可以通过调用相关API来进行消息队列的创建。
发送消息:发送方将待发送的消息写入到消息队列中,消息包括消息类型和消息。发送方通过指定消息队列的标识符和消息类型来发送消息。
接收消息:接收方从消息队列中获取消息,根据消息类型进行消息过滤或选择性接收。接收方通过指定消息队列的标识符和消息类型来接收消息。
系统中可能存在大量的消息队列,内核需要使用如下数据结构对消息队列进行管理↓↓↓
<code>struct msqid_ds {
struct ipc_perm msg_perm; /* Ownership and permissions */
time_t msg_stime; /* Time of last msgsnd(2) */
time_t msg_rtime; /* Time of last msgrcv(2) */
time_t msg_ctime; /* Time of last change */
unsigned long __msg_cbytes; /* Current number of bytes in
queue (nonstandard) */
msgqnum_t msg_qnum; /* Current number of messages
in queue */
msglen_t msg_qbytes; /* Maximum number of bytes
allowed in queue */
pid_t msg_lspid; /* PID of last msgsnd(2) */
pid_t msg_lrpid; /* PID of last msgrcv(2) */
};
struct ipc_perm {
key_t __key; /* Key supplied to msgget(2) */
uid_t uid; /* Effective UID of owner */
gid_t gid; /* Effective GID of owner */
uid_t cuid; /* Effective UID of creator */
gid_t cgid; /* Effective GID of creator */
unsigned short mode; /* Permissions */
unsigned short __seq; /* Sequence number */
};
消息队列接口及使用示例
msgget
第一个参数key需要使用ftok函数接口生成,这个函数具体用法在[共享内存传送门]文章中有介绍。第二个参数可以选择一下几种常用选项↓↓↓
msgflg取值 | 描述 |
---|---|
IPC_CRAET | 创建共享内存时,如果共享内存已经已经存在,获取已经存在的共享内存;不存在则创建并返回 |
IPC_EXCL | 需要与IPC_CREAT组合使用,单独使用没有意义。如果带创建的共享内存存在,则出错返回;如果不存在,则创建并返回对应的共享内存 |
作为读写资源的一种,消息队列也有自己的读写权限,可以通过上述第二个参数与运算对应的权限即可,如第二个参数可以填写<code>IPC_CREAT | 0666(其中0666就表示设置的权限)。
下面代码创建了一个消息队列↓↓↓
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int main()
{
key_t key = ftok("/home/xiaoming", 777);
int msgid = msgget(key, IPC_CREAT | 0666);
return 0;
}
我们可以借助ipcs -q
查看系统中存在的消息队列,下图为创建前后创建后的情况↓↓↓
<code>ipcs -q的各个字段中,key是我们创建消息队列时传入的,msqid是系统自动生成的标识消息队列唯一性的符号,owner是该消息队列的所有者,perms是该消息队列的权限,used_types表示消息队列已经使用的空间大小,messages表示消息队列中的消息数量。
msgsnd&&msgrcv
在msgsnd和msgrcv中,第二个参数需要传入一个形式如下所示的结构体,这个结构体需要用户自己定义(msgsnd用该结构体定义要发送的数据,msgrcv用该结构体接收消息队列中的数据)↓↓↓
<code>The msgp argument is a pointer to caller-defined structure of the following general form:
struct msgbuf {
long mtype; /* message type, must be > 0 */
char mtext[1]; /* message data */
};
上述中的mtext表示要发送的数据,因而mtext的数组大小由用户自己指定;而mtype用于标识数据,例如:A进程只读取mtype为1的数据,B进程只读取mtype为2的数据,即mtype标识这个数据要发送给哪些进程。
msgsnd的第一个参数表示向哪一个消息队列中发送,需要传入能够标识队列唯一性的msgid;第三个参数msgsz用于标识msgbuf中的mtext的大小,即数据的大小;msgsnd中的msgflg的取值及含义如下表所示↓↓↓
msgflg取值 | 含义 |
---|---|
0 | 默认行为,阻塞等待消息队列,直到消息队列中有空间 |
IPC_NOWAIT | 如果队列已满,不会阻塞等待,而是会返回错误码EAGAIN(设置在errno中) |
msgrcv的第一个参数也是用于表示向哪一个消息队列中发送,需要传入具有唯一性的msgid;第三个参数msgsz表示用于接收数据的msgbuf中的mtext的大小,即接收缓冲区的大小(如果接收缓冲区的大小小于发送过来的数据,则会出错);第四个参数msgtyp表示只接收消息队列中msgbuf的mtype字段为msgtyp的数据;msgrcv中的msgflg的取值及含义如下表所示↓↓↓
msgflg取值 | 含义 |
---|---|
0 | 默认行为,阻塞等待消息队列,直到消息队列中有满足需求的数据 |
IPC_NOWAIT | 非阻塞等待消息队列,如果队列中没有对应需求的数据,则errno设置为ENOMSG,并返回 |
MSG_EXCEPT | 接收队列中msgbuf中的mtype不为msgtyp的数据 |
MSG_NOERROR | 如果数据的长度长于接收方的msgbuf的mtext大小,则会发生截断,余下数据被丢弃 |
msgsnd.c
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#define MAX_SIZE 1024
struct msgbuf
{
long mtype;
char mtext[MAX_SIZE];
};
int main()
{
key_t key = ftok("/home/xiaoming", 777);
int msgid = msgget(key, IPC_CREAT | 0666);
char *sendmsg = "Jammingpro";
struct msgbuf buffer;
buffer.mtype = 1;
snprintf(buffer.mtext, MAX_SIZE, "%s", sendmsg);
msgsnd(msgid, &buffer, MAX_SIZE, 0);
return 0;
}
msgrcv.c
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#define MAX_SIZE 1024
struct msgbuf
{
long mtype;
char mtext[MAX_SIZE];
};
int main()
{
key_t key = ftok("/home/xiaoming", 777);
int msgid = msgget(key, IPC_CREAT | 0666);
struct msgbuf buffer;
if (msgrcv(msgid, &buffer, MAX_SIZE, 1, 0) == -1)
{
perror("msgcrv error");
exit(0);
}
printf("%s\n", buffer.mtext);
return 0;
}
msgctl
msgctl的第一个参数表示对哪一个消息队列进行操作,第二个参数的取值和描述如下标所示↓↓↓
cmd取值 | 含义 |
---|---|
IPC_STAT | 获取消息队列的状态 |
IPC_SET | 设置消息队列的属性 |
IPC_RMID | 从系统中移除消息队列 |
系统中管理消息队列时,维护了一个msgid_ds的结构,如果需要设置或获取某个新消息队列,需要创建一个struct msqid_ds类型的变量,将其传入第三个参数中。
下面代码演示了如何获取消息队列的状态↓↓↓
msgctl-stat.c
<code>#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int main()
{
key_t key = ftok("/home/xiaoming", 888);
int msgid = msgget(key, IPC_CREAT | 0666);
sleep(2);
struct msqid_ds mqs;
memset(&mqs, 0, sizeof(mqs));
msgctl(msgid, IPC_STAT, &mqs);
printf("num: %d\n", mqs.msg_qnum);
printf("bytes: %d\n", mqs.msg_qbytes);
return 0;
}
下面代码演示了如何设置消息队列的状态↓↓↓
msgctl-set.c
<code>#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int main()
{
key_t key = ftok("/home/xiaoming", 888);
int msgid = msgget(key, IPC_CREAT | 0666);
sleep(2);
struct msqid_ds mqs;
memset(&mqs, 0, sizeof(mqs));
mqs.msg_perm.mode = 0444;
msgctl(msgid, IPC_SET, &mqs);
return 0;
}
下面代码演示了如何删除消息队列↓↓↓
msgctl-rm.c
<code>#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int main()
{
key_t key = ftok("/home/xiaoming", 888);
int msgid = msgget(key, IPC_CREAT);
msgctl(msgid, IPC_RMID, NULL);
return 0;
}
消息队列的特点
异步通信:发送者和接收者之间的通信是异步的,发送者可以继续执行其他操作而不用等待接收者处理消息。
独立性:消息队列是独立于发送者和接收者的,发送者和接收者之间可以是不同的进程,甚至可以在不同的计算机上。
顺序性保证:消息队列通常是一种先进先出(FIFO)的数据结构,保证消息的顺序性。
灵活性:消息队列可以传递多种类型的数据,而且消息的大小通常也比较灵活。
缓冲能力:消息队列可以作为缓冲区,当接收者暂时无法处理消息时,消息可以先存储在消息队列中,等待接收者处理。
可靠性:消息队列通常会提供一定的机制来确保消息的可靠性,例如消息确认和消息重发机制。
支持半双工通信:相比于匿名/命名管道及共享内存的单向通信特征,消息队列具备半双工通信的能力。
消息队列——实现Client&Server
由于struct msgbuf中的mtype可以标识向那个进程发送消息。假设Server接收mtype为1的数据↓↓↓
Com.hpp
<code>#pragma once
#include <iostream>
#include <cstring>
#include <cstdlib>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
const std::string PATH = "/home/xiaoming";
const int PROJ_ID = 999;
const int BUFF_SIZE = 1024;
enum
{
MSG_CREAT_ERR = 1,
MSG_GET_ERR,
MSG_DELETE_ERR
};
int CreateMsg()
{
key_t key = ftok(PATH.c_str(), PROJ_ID);
int msgid = msgget(key, IPC_CREAT | IPC_EXCL | 0666);
if (msgid < 0)
{
perror("msg create error");
exit(MSG_CREAT_ERR);
}
return msgid;
}
int GetMsg()
{
key_t key = ftok(PATH.c_str(), PROJ_ID);
int msgid = msgget(key, IPC_CREAT | 0666);
if (msgid < 0)
{
perror("msg get error");
exit(MSG_GET_ERR);
}
return msgid;
}
Server.cc
#include "Com.hpp"
int main()
{
struct msgbuf buffer;
int msgid = CreateMsg();
while (true)
{
msgrcv(msgid, &buffer, BUFF_SIZE, 1, 0);
std::cout << "Client say@ " << buffer.mtext << std::endl;
}
return 0;
}
Client.cc
#include "Com.hpp"
int main()
{
int msgid = GetMsg();
struct msgbuf buffer;
buffer.mtype = 1;
while (true)
{
std::cout << "Says # ";
std::string s;
std::getline(std::cin, s);
strcpy(buffer.mtext, s.c_str());
msgsnd(msgid, &buffer, BUFF_SIZE, 0);
}
return 0;
}
🎈欢迎进入从浅学到熟知Linux专栏,查看更多文章。
如果上述内容有任何问题,欢迎在下方留言区指正b( ̄▽ ̄)d
声明
本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。