【Linux学习】进程间通信之 匿名管道 与 基于管道的进程池

Jupiter· 2024-08-16 14:07:06 阅读 100

🍑个人主页:Jupiter.

🚀 所属专栏:Linux从入门到进阶

欢迎大家点赞收藏评论😊

在这里插入图片描述

在这里插入图片描述

目录

`🍑进程间通信``🐬进程间通信目的 `

`📚管道 ``📕管道的原理``🐧用fork来共享管道原理``🦌站在文件描述符角度-深度理解管道`

`🚀匿名管道 ``🔒管道读写规则`` 🐟管道特点`

`🦅基于管道实现一个进程池`


<code>🍑进程间通信

🐬进程间通信目的

数据传输:一个进程需要将它的数据发送给另一个进程资源共享:多个进程之间共享同样的资源。通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。

进程间通信的实质让不同的进程看到一份资源

📚管道

什么是管道?

管道是Unix中最古老的进程间通信的形式。

我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”,其中,管道只能被设计为单向通信的。

在这里插入图片描述

<code>📕管道的原理

🐧用fork来共享管道原理

在这里插入图片描述

<code>🦌站在文件描述符角度-深度理解管道

当一个进程以读和以写打开同一个文件的时候,会有两个struct file对象(如下图),一个被打开的文件只有一个文件缓冲区,所以这两个struct file对象指向的是同一个缓冲区。(struct file允许多个指针指向的,里面有引用计数,当上层调用close的时候,实际上是将对应的struct file* fd_struct里面的数据清空,然后引用计数–,为0就会回收对应的文件缓冲区)当我们fork创建子进程后,会以父进程为模板,将进程独有的资源拷贝给子进程(浅拷贝)。这时,子进程也指向父进程所指向的那两个struct file对象,两个进程就看到了同一份资源(缓冲区)。将父子进程对应的读或则写端关闭,就形成了单向的管道。

在这里插入图片描述

<code>🚀匿名管道

OS提供的一个系统调用pipe,调用后OS还是使用文件那一套,只需创建一个内存级文件对象与文件缓冲区,但并不是真的打开了一个文件,其中磁盘中并不存在这个文件,不需要向磁盘做刷新;

头文件: #include <unistd.h>

功能:创建一无名管道

原型:

int pipe(int fd[2]);参数:

fd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端返回值:成功返回0,失败返回错误代码。

在这里插入图片描述

示例代码:

<code>void writer(int wfd)

{ -- -->

const char *str = "hello father, I am child";

char buffer[128];

snprintf(buffer, sizeof(buffer), "%s", str);

write(wfd, buffer, strlen(buffer));

}

void reader(int rfd)

{

char buffer[1024];

ssize_t n = read(rfd, buffer, sizeof(buffer)-1);

(void)n;

printf("father get a message: %s", buffer);

}

int main()

{

// 1. 创建管道

int pipefd[2];

int n = pipe(pipefd);

if(n < 0) return 1;

printf("pipefd[0]: %d, pipefd[1]: %d\n", pipefd[0], pipefd[1]); // 3, 4

// 2. 创建子进程

pid_t id = fork();

if(id == 0)

{

//子进程作为写端,关闭读

close(pipefd[0]);

writer(pipefd[1]);

exit(0);

}

//父进程作为读端,关闭写

close(pipefd[1]);

reader(pipefd[0]);

wait(NULL);

return 0;

}

🔒管道读写规则

管道内部没有数据并且写端没有关闭自己的fd,读端就会阻塞等待,直到pipe中有数据。管道内部写满数据并且读端没有关闭自己的fd,写端就会阻塞等待,直到pipe中有空间可以写。如果所有管道写端对应的文件描述符被关闭,则read返回0,表示读结束,类似读到了文件结尾。如果所有管道读端对应的文件描述符被关闭,则write操作会产生信号SIGPIPE,进而可能导致write进程退出

🐟管道特点

只能用于具有共同祖先的进程(具有亲缘关系的进程)之间进行通信;通常,一个管道由一个进程创建,然后该进程调用fork,此后父、子进程之间就可应用该管道。管道是面向字节流的。一般而言,进程退出,管道释放,所以管道的生命周期随进程。一般而言,内核会对管道操作进行同步与互斥。当要写入的数据量不大于PIPE_BUF时,linux将保证写入的原子性。当要写入的数据量大于PIPE_BUF时,linux将不再保证写入的原子性。(PIPE_BUF==4096字节)管道是半双工的,数据只能向一个方向流动(单向通信 );需要双方通信时,需要建立起两个管道 。

在这里插入图片描述

<code>🦅基于管道实现一个进程池

实现目标:创建多个子进程,父进程通过管道向子进程发送任务,对子进程进行控制,使子进程能够负载均衡的完成父进程提供的任务。

#pragma once

#include<iostream>

#include<cstdlib>

#include<unistd.h>

using namespace std;

typedef void(*work_t)();

typedef void(*task_t)();

void Printlog()

{ -- -->

cout<<"Printlog"<<endl;

}

void callone()

{

cout<<"callone"<<endl;

}

void ConnectMysql()

{

cout<<"ConnectMysql"<<endl;

}

task_t tasks[3]={ Printlog , callone , ConnectMysql};

uint32_t nextwork()

{

return rand()%3;

}

void worker()

{

while(true)

{

uint32_t command_code = 0;

ssize_t n = read(0,&command_code,sizeof(command_code));

if(n==sizeof(command_code))

{

if(command_code>=3) continue;

tasks[command_code]();

}

else if(n==0)

{

break;

}

}

}

#include <iostream>

#include <unistd.h>

#include <string>

#include <cstdlib>

#include<sys/wait.h>

#include <vector>

#include <ctime>

#include "task.hpp"

using namespace std;

#define Subprocnum 5

enum

{

UsageError = 1,

NumError,

PipeError

};

void Usage(const string &proc)

{

cout << "Usage:" << proc << "-Subprocnum" << endl;

}

//./myprocess 6

class channel

{

public:

channel(int wfd, pid_t Sub_id, const string &name)

: _wfd(wfd),

_id(Sub_id),

_name(name)

{

}

int wfd()

{

return _wfd;

}

void Close()

{

close(_wfd);

}

pid_t id()

{

return _id;

}

~channel()

{

}

private:

int _wfd;

pid_t _id;

string _name;

};

class processpool

{

public:

processpool(int sub_num)

: _Sub_num(sub_num)

{

}

int Nextchannel()

{

static int next = 0;

int n = next++;

next %= channels.size();

return n;

}

int Creatproc(work_t worker)

{

vector<int> fds;

for (int num = 0; num < _Sub_num; num++)

{

int pipefd[2] = { 0};

int n = pipe(pipefd);

if (n < 0)

return PipeError;

pid_t id = fork();

if (id == 0)

{

if(!fds.empty())

{

for(auto fd:fds)

{

close(fd);

}

}

// child r

close(pipefd[1]);

dup2(pipefd[0], 0);

worker();

exit(0);

}

string cname = "channel-" + to_string(num);

// father w

close(pipefd[0]);

channels.push_back(channel(pipefd[1], id, cname));

fds.push_back(pipefd[1]);

}

return 0;

}

void Send_task(int nextchannel, uint32_t code)

{

int wfd = channels[nextchannel].wfd();

write(wfd, &code, sizeof(code));

}

void Killall()

{

for(auto &channel:channels)

{

channel.Close();

}

}

void wait()

{

for(auto &channel:channels)

{

int status = 0;

pid_t rid = waitpid(channel.id(),&status,0);

if(rid==channel.id())

{

cout<<"wait success..."<<endl;

}

}

}

~processpool()

{

}

private:

int _Sub_num;

vector<channel> channels;

};

int Col(processpool *processpool_ptr)

{

while (true)

{

// a.选择进程与通道

int nextchannel = processpool_ptr->Nextchannel();

// b.选择任务

uint16_t code = nextwork();

// c.发送任务

processpool_ptr->Send_task(nextchannel, code);

}

return 0;

}

int main(int argc, char *argv[])

{

if (argc != 2)

{

Usage(argv[0]);

return UsageError;

}

int Subnum = stoi(argv[1]);

if (Subnum < 0)

return NumError;

srand((uint32_t)time(NULL));

// 1. 创建进程

processpool *processpool_ptr = new processpool(Subnum);

processpool_ptr->Creatproc(worker);

// 2.控制进程

Col(processpool_ptr);

// 3.回收进程

// wait process

//杀掉所有的进程

processpool_ptr->Killall();

//等待所有进程

processpool_ptr->wait();

delete(processpool_ptr);

return 0;

}

代码中细节处理:在创建子进程代码中已经处理。

在这里插入图片描述




声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。