【Linux】进程间通信(匿名管道)

秦jh_ 2024-10-22 08:37:02 阅读 66

 🌈个人主页:秦jh__https://blog.csdn.net/qinjh_?spm=1010.2135.3001.5343

🔥 系列专栏:https://blog.csdn.net/qinjh_/category_12625432.html

9efbcbc3d25747719da38c01b3fa9b4f.gif

目录

进程间通信目的 

进程间通信发展

 进程间通信分类

管道

System V IPC 

POSIX IPC 

管道 

匿名管道

使用管道通信的demo 

 进程池实现


前言

💬 hello! 各位铁子们大家好哇。

             今日更新了Linux进程间通信的内容

🎉 欢迎大家关注🔍点赞👍收藏⭐️留言📝

进程间通信目的 

数据传输:一个进程需要将它的数据发送给另一个进程资源共享:多个进程之间共享同样的资源。通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止 时要通知父进程)。进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另 一个进程的所有陷入和异常,并能够及时知道它的状态改变。 

进程间通信发展

管道System V进程间通信POSIX进程间通信

进程间通信的前提:先让不同的进程,看到同一份(操作系统)资源(”一段内存“)。

进程间通信一定是某一个进程先需要通信,让OS创建一个共享资源。此时OS必须提供很多系统调用。

OS创建的共享资源的不同,系统调用接口也就不同,所以进程间通信会有不同的种类。

 进程间通信分类

管道

匿名管道pipe命名管道 

System V IPC 

System V 消息队列System V 共享内存System V 信号量 

POSIX IPC 

消息队列共享内存信号量互斥量条件变量读写锁 

管道 

匿名管道

一个进程将同一个文件打开两次,一次以写方式打开,另一次以读方式打开。此时会创建两个struct file,而文件的属性会共用,不会额外创建。

如果此时又创建了子进程,子进程会继承父进程的文件描述符表,指向同一个文件。我们把上面父子进程都看到的文件,叫管道文件。

管道只允许单向通信。

管道里的内容不需要刷新到磁盘。

未来要用父进程写,子进程读的话,在fork之后,各自关闭掉不用的文件描述符即可。 不用的描述符建议关闭,因为未来可能会误用,或者导致文件描述符泄露。

功能:创建匿名管道

参数:

pipefd[2]:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端。它是输出型参数。

返回值:成功返回0,失败返回错误代码

使用管道通信的demo 

 上图是创建管道,pipe的使用的例子。

下面是测试的完整代码:

<code>#include <iostream>

#include <string>

#include <cerrno> // errno.h

#include <cstring> // string.h

#include <unistd.h>

#include <sys/types.h>

#include <sys/wait.h>

const int size = 1024;

std::string getOtherMessage()

{

static int cnt = 0;

std::string messageid = std::to_string(cnt);

cnt++;

pid_t self_id = getpid();

std::string stringpid = std::to_string(self_id);

std::string message = "messageid: ";

message += messageid;

message += " my pid is : ";

message += stringpid;

return message;

}

// 子进程进行写入

void SubProcessWrite(int wfd)

{

int pipesize = 0;

std::string message = "father, I am your son prcess!";

char c = 'A';

while (true)

{

std::cerr << "+++++++++++++++++++++++++++++++++" << std::endl;

std::string info = message + getOtherMessage(); // 这条消息,就是我们子进程发给父进程的消息

write(wfd, info.c_str(), info.size()); // 写入管道的时候,没有写入\0,

std::cerr << info << std::endl;

// sleep(1); // 子进程写慢一点

// write(wfd, &c, 1);

// std::cout << "pipesize: " << ++pipesize << " write charator is : "<< c++ << std::endl;

// // if(c == 'G') break;

// sleep(1);

}

std::cout << "child quit ..." << std::endl;

}

// 父进程进行读取

void FatherProcessRead(int rfd)

{

char inbuffer[size]; // c99 , gnu g99

while (true)

{

sleep(2);

std::cout << "-------------------------------------------" << std::endl;

// sleep(500);

ssize_t n = read(rfd, inbuffer, sizeof(inbuffer) - 1);

if (n > 0)

{

inbuffer[n] = 0; // == '\0'

std::cout << inbuffer << std::endl;

}

else if (n == 0)

{

// 如果read的返回值是0,表示写端直接关闭了,我们读到了文件的结尾

std::cout << "client quit, father get return val: " << n << " father quit too!" << std::endl;

break;

}

else if(n < 0) //读取失败

{

std::cerr << "read error" << std::endl;

break;

}

// sleep(1);

// break;

}

}

int main()

{

// 1. 创建管道

int pipefd[2];

int n = pipe(pipefd); // 输出型参数,rfd, wfd

if (n != 0)

{

std::cerr << "errno: " << errno << ": "

<< "errstring : " << strerror(errno) << std::endl;

return 1;

}

std::cout << "pipefd[0]: " << pipefd[0] << ", pipefd[1]: " << pipefd[1] << std::endl;

sleep(1);

// 2. 创建子进程

pid_t id = fork();

if (id == 0)

{

std::cout << "子进程关闭不需要的fd了, 准备发消息了" << std::endl;

sleep(1);

// 子进程 --- write

// 3. 关闭不需要的fd

close(pipefd[0]);

// if(fork() > 0) exit(0);

SubProcessWrite(pipefd[1]);

close(pipefd[1]);

exit(0);

}

std::cout << "父进程关闭不需要的fd了, 准备收消息了" << std::endl;

sleep(1);

// 父进程 --- read

// 3. 关闭不需要的fd

close(pipefd[1]);

FatherProcessRead(pipefd[0]);

std::cout << "5s, father close rfd" << std::endl;

sleep(5);

close(pipefd[0]);

int status = 0;

pid_t rid = waitpid(id, &status, 0);

if (rid > 0)

{

std::cout << "wait child process done, exit sig: " << (status&0x7f) << std::endl;

std::cout << "wait child process done, exit code(ign): " << ((status>>8)&0xFF) << std::endl;

}

return 0;

}

管道的四种情况:

如果管道内部是空的,并且写端fd没有关闭,此时读取条件不具备,读进程会被阻塞,读进程会等待,直到写端写入数据。 如果管道被写满,并且读端fd不读且没有关闭,此时写进程会被阻塞,写端会等待,直到数据被读取。如果管道一直在读并且写端关闭了wfd,读端read返回值会读到0,表示读到文件结尾。如果读端rfd直接关闭,写端wfd一直在写入,那么写端进程会被OS直接用13号信号关掉,相当于进程出现了异常。

管道的特征:

匿名管道:只用来进行具有血缘关系的进程之间,进行通信,常用于父子进程之间通信管道文件的生命周期是随进程的管道内部,自带进程之间同步的机制(多执行流执行代码的时候,具有明显的顺序性)管道文件在通信的时候,是面向字节流的。(写的次数和读取的次数不是一一匹配的)管道的通信模式,是一种特殊的半双工模式,数据只能向一个方向流动;需要双方通信时,需要建立起两个管道

 

当要写入的数据量不大于PIPE_BUF时,linux将保证写入的原子性。当要写入的数据量大于PIPE_BUF时,linux将不再保证写入的原子性。 

原子的意思就是这次的写入操作不会被中断。写的时候,不会写一半就被读走。在读方看来,要么不写,要么写完了。 

当shell执行用管道连接起来的多条命令时,shell内部会把他们各自变成一个进程,他们是同时启动的。他们的父进程都是bash,他们是兄弟关系。所以命令行上的 | 就是匿名管道。 

 进程池实现

 ProcessPool.cc

<code>#include <iostream>

#include <string>

#include <vector>

#include <unistd.h>

#include <sys/types.h>

#include <sys/wait.h>

#include "Task.hpp"

// void work(int rfd)

// {

// while (true)

// {

// int command = 0;

// int n = read(rfd, &command, sizeof(command));

// if (n == sizeof(int))

// {

// std::cout << "pid is : " << getpid() << " handler task" << std::endl;

// ExcuteTask(command);

// }

// else if (n == 0)

// {

// std::cout << "sub process : " << getpid() << " quit" << std::endl;

// break;

// }

// }

// }

// master

class Channel

{

public:

Channel(int wfd, pid_t id, const std::string &name)

: _wfd(wfd), _subprocessid(id), _name(name)

{

}

int GetWfd() { return _wfd; }

pid_t GetProcessId() { return _subprocessid; }

std::string GetName() { return _name; }

void CloseChannel()

{

close(_wfd);

}

void Wait()

{

pid_t rid = waitpid(_subprocessid, nullptr, 0);

if (rid > 0)

{

std::cout << "wait " << rid << " success" << std::endl;

}

}

~Channel()

{

}

private:

int _wfd;

pid_t _subprocessid;

std::string _name;

};

// 形参类型和命名规范

// const &: 输入

// & : 输入输出型参数

// * : 输出型参数

// task_t task: 回调函数

void CreateChannelAndSub(int num, std::vector<Channel> *channels, task_t task)

{

for (int i = 0; i < num; i++)

{

// 1. 创建管道

int pipefd[2] = {0};

int n = pipe(pipefd);

if (n < 0)

exit(1);

// 2. 创建子进程

pid_t id = fork();

if (id == 0)

{

if (!channels->empty())

{

// 第二次之后,开始创建的管道要关闭继承下来的写端

for(auto &channel : *channels) channel.CloseChannel();

}

// child - read

close(pipefd[1]);

dup2(pipefd[0], 0); // 将管道的读端,重定向到标准输入

task();

close(pipefd[0]);

exit(0);

}

// 3.构建一个channel名称

std::string channel_name = "Channel-" + std::to_string(i);

// 父进程

close(pipefd[0]);

// a. 子进程的pid b. 父进程关心的管道的w端

channels->push_back(Channel(pipefd[1], id, channel_name));

}

}

// 0 1 2 3 4 channelnum

int NextChannel(int channelnum)

{

static int next = 0;

int channel = next;

next++;

next %= channelnum;

return channel;

}

void SendTaskCommand(Channel &channel, int taskcommand)

{

write(channel.GetWfd(), &taskcommand, sizeof(taskcommand));

}

void ctrlProcessOnce(std::vector<Channel> &channels)

{

sleep(1);

// a. 选择一个任务

int taskcommand = SelectTask();

// b. 选择一个信道和进程

int channel_index = NextChannel(channels.size());

// c. 发送任务

SendTaskCommand(channels[channel_index], taskcommand);

std::cout << std::endl;

std::cout << "taskcommand: " << taskcommand << " channel: "

<< channels[channel_index].GetName() << " sub process: " << channels[channel_index].GetProcessId() << std::endl;

}

void ctrlProcess(std::vector<Channel> &channels, int times = -1)

{

if (times > 0)

{

while (times--)

{

ctrlProcessOnce(channels);

}

}

else

{

while (true)

{

ctrlProcessOnce(channels);

}

}

}

void CleanUpChannel(std::vector<Channel> &channels)

{

// int num = channels.size() -1;

// while(num >= 0)

// {

// channels[num].CloseChannel();

// channels[num--].Wait();

// }

for (auto &channel : channels)

{

channel.CloseChannel();

channel.Wait();

}

// // 注意

// for (auto &channel : channels)

// {

// channel.Wait();

// }

}

// ./processpool 5

int main(int argc, char *argv[])

{

if (argc != 2)

{

std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;

return 1;

}

int num = std::stoi(argv[1]);

LoadTask();

std::vector<Channel> channels;

// 1. 创建信道和子进程

CreateChannelAndSub(num, &channels, work1);

// 2. 通过channel控制子进程

ctrlProcess(channels, 5);

// 3. 回收管道和子进程. a. 关闭所有的写端 b. 回收子进程

CleanUpChannel(channels);

// sleep(100);

return 0;

}

 如上图,左边是父进程,右边是子进程。创建子进程的时候,从第二个子进程开始,创建的时候会继承父进程之前的文件描述符,也就会连接到进程1的写端。随着子进程的增加,越来越多描述符指向先前的管道的写端 ,就会导致要关闭管道时,写端没关完,读端读不到,就会造成阻塞,进程就退出不了。所以要在创建第二个及以后的进程的时候,把继承的写端关掉,如下图:

 Task.hpp

<code>#pragma once

#include <iostream>

#include <ctime>

#include <cstdlib>

#include <sys/types.h>

#include <unistd.h>

#define TaskNum 3

typedef void (*task_t)(); // task_t 函数指针类型

void Print()

{

std::cout << "I am print task" << std::endl;

}

void DownLoad()

{

std::cout << "I am a download task" << std::endl;

}

void Flush()

{

std::cout << "I am a flush task" << std::endl;

}

task_t tasks[TaskNum];

void LoadTask()

{

srand(time(nullptr) ^ getpid() ^ 17777);

tasks[0] = Print;

tasks[1] = DownLoad;

tasks[2] = Flush;

}

void ExcuteTask(int number)

{

if (number < 0 || number > 2)

return;

tasks[number]();

}

int SelectTask()

{

return rand() % TaskNum;

}

void work()

{

while (true)

{

int command = 0;

int n = read(0, &command, sizeof(command));

if (n == sizeof(int))

{

std::cout << "pid is : " << getpid() << " handler task" << std::endl;

ExcuteTask(command);

}

else if (n == 0)

{

std::cout << "sub process : " << getpid() << " quit" << std::endl;

break;

}

}

}

void work1()

{

while (true)

{

int command = 0;

int n = read(0, &command, sizeof(command));

if (n == sizeof(int))

{

std::cout << "pid is : " << getpid() << " handler task" << std::endl;

ExcuteTask(command);

}

else if (n == 0)

{

std::cout << "sub process : " << getpid() << " quit" << std::endl;

break;

}

}

}

void work2()

{

while (true)

{

int command = 0;

int n = read(0, &command, sizeof(command));

if (n == sizeof(int))

{

std::cout << "pid is : " << getpid() << " handler task" << std::endl;

ExcuteTask(command);

}

else if (n == 0)

{

std::cout << "sub process : " << getpid() << " quit" << std::endl;

break;

}

}

}



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。