【Linux学习】进程间通信之 匿名管道 与 基于管道的进程池
Jupiter· 2024-08-16 14:07:06 阅读 100
🍑个人主页:Jupiter.
🚀 所属专栏:Linux从入门到进阶
欢迎大家点赞收藏评论😊
目录
`🍑进程间通信``🐬进程间通信目的 `
`📚管道 ``📕管道的原理``🐧用fork来共享管道原理``🦌站在文件描述符角度-深度理解管道`
`🚀匿名管道 ``🔒管道读写规则`` 🐟管道特点`
`🦅基于管道实现一个进程池`
<code>🍑进程间通信
🐬进程间通信目的
数据传输:一个进程需要将它的数据发送给另一个进程资源共享:多个进程之间共享同样的资源。通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。
进程间通信的实质
:让不同的进程看到一份资源
。
📚管道
什么是管道?
管道是Unix中最古老的进程间通信的形式。
我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”,其中,管道只能被设计为单向通信
的。
<code>📕管道的原理
🐧用fork来共享管道原理
<code>🦌站在文件描述符角度-深度理解管道
当一个进程以读和以写
打开同一个文件
的时候,会有两个struct file对象
(如下图),一个被打开的文件只有一个文件缓冲区,所以这两个struct file对象指向的是同一个缓冲区。(struct file允许多个指针指向的,里面有引用计数,当上层调用close的时候,实际上是将对应的struct file* fd_struct里面的数据清空,然后引用计数–,为0就会回收对应的文件缓冲区)当我们fork创建子进程后,会以父进程为模板,将进程独有的资源拷贝给子进程(浅拷贝)。这时,子进程也指向父进程所指向的那两个struct file对象,两个进程就看到了同一份资源(缓冲区)。将父子进程对应的读或则写端关闭,就形成了单向的管道。
<code>🚀匿名管道
OS提供的一个系统调用pipe
,调用后OS还是使用文件那一套,只需创建一个内存级文件
对象与文件缓冲区,但并不是真的打开了一个文件,其中磁盘中并不存在这个文件,不需要向磁盘做刷新;
头文件: #include <unistd.h>
功能:创建一无名管道
原型:
int pipe(int fd[2]);参数:
fd:文件描述符数组,其中fd[0]
表示读端
, fd[1]
表示写端
返回值:成功返回0,失败返回错误代码。
示例代码:
<code>void writer(int wfd)
{ -- -->
const char *str = "hello father, I am child";
char buffer[128];
snprintf(buffer, sizeof(buffer), "%s", str);
write(wfd, buffer, strlen(buffer));
}
void reader(int rfd)
{
char buffer[1024];
ssize_t n = read(rfd, buffer, sizeof(buffer)-1);
(void)n;
printf("father get a message: %s", buffer);
}
int main()
{
// 1. 创建管道
int pipefd[2];
int n = pipe(pipefd);
if(n < 0) return 1;
printf("pipefd[0]: %d, pipefd[1]: %d\n", pipefd[0], pipefd[1]); // 3, 4
// 2. 创建子进程
pid_t id = fork();
if(id == 0)
{
//子进程作为写端,关闭读
close(pipefd[0]);
writer(pipefd[1]);
exit(0);
}
//父进程作为读端,关闭写
close(pipefd[1]);
reader(pipefd[0]);
wait(NULL);
return 0;
}
🔒管道读写规则
管道内部没有数据
并且写端没有关闭自己的fd,读端
就会阻塞等待
,直到pipe中有数据。管道内部写满数据
并且读端没有关闭自己的fd,写端
就会阻塞等待
,直到pipe中有空间可以写。如果所有管道写端
对应的文件描述符被关闭
,则read返回0
,表示读结束
,类似读到了文件结尾。如果所有管道读端
对应的文件描述符被关闭
,则write操作
会产生信号SIGPIPE
,进而可能导致write进程退出
。
🐟管道特点
只能用于具有共同祖先的进程(具有亲缘关系
的进程)之间进行通信;通常,一个管道由一个进程创建,然后该进程调用fork,此后父、子进程之间就可应用该管道。管道是面向字节流
的。一般而言,进程退出,管道释放,所以管道的生命周期随进程。一般而言,内核会对管道操作进行同步与互斥。当要写入的数据量不大于PIPE_BUF
时,linux将保证写入的原子性
。当要写入的数据量大于PIPE_BUF时,linux将不再保证写入的原子性。(PIPE_BUF==4096字节
)管道是半双工的
,数据只能向一个方向流动(单向通信 );需要双方通信时,需要建立起两个管道 。
<code>🦅基于管道实现一个进程池
实现目标
:创建多个子进程,父进程通过管道向子进程发送任务,对子进程进行控制,使子进程能够负载均衡的完成父进程提供的任务。
#pragma once
#include<iostream>
#include<cstdlib>
#include<unistd.h>
using namespace std;
typedef void(*work_t)();
typedef void(*task_t)();
void Printlog()
{ -- -->
cout<<"Printlog"<<endl;
}
void callone()
{
cout<<"callone"<<endl;
}
void ConnectMysql()
{
cout<<"ConnectMysql"<<endl;
}
task_t tasks[3]={ Printlog , callone , ConnectMysql};
uint32_t nextwork()
{
return rand()%3;
}
void worker()
{
while(true)
{
uint32_t command_code = 0;
ssize_t n = read(0,&command_code,sizeof(command_code));
if(n==sizeof(command_code))
{
if(command_code>=3) continue;
tasks[command_code]();
}
else if(n==0)
{
break;
}
}
}
#include <iostream>
#include <unistd.h>
#include <string>
#include <cstdlib>
#include<sys/wait.h>
#include <vector>
#include <ctime>
#include "task.hpp"
using namespace std;
#define Subprocnum 5
enum
{
UsageError = 1,
NumError,
PipeError
};
void Usage(const string &proc)
{
cout << "Usage:" << proc << "-Subprocnum" << endl;
}
//./myprocess 6
class channel
{
public:
channel(int wfd, pid_t Sub_id, const string &name)
: _wfd(wfd),
_id(Sub_id),
_name(name)
{
}
int wfd()
{
return _wfd;
}
void Close()
{
close(_wfd);
}
pid_t id()
{
return _id;
}
~channel()
{
}
private:
int _wfd;
pid_t _id;
string _name;
};
class processpool
{
public:
processpool(int sub_num)
: _Sub_num(sub_num)
{
}
int Nextchannel()
{
static int next = 0;
int n = next++;
next %= channels.size();
return n;
}
int Creatproc(work_t worker)
{
vector<int> fds;
for (int num = 0; num < _Sub_num; num++)
{
int pipefd[2] = { 0};
int n = pipe(pipefd);
if (n < 0)
return PipeError;
pid_t id = fork();
if (id == 0)
{
if(!fds.empty())
{
for(auto fd:fds)
{
close(fd);
}
}
// child r
close(pipefd[1]);
dup2(pipefd[0], 0);
worker();
exit(0);
}
string cname = "channel-" + to_string(num);
// father w
close(pipefd[0]);
channels.push_back(channel(pipefd[1], id, cname));
fds.push_back(pipefd[1]);
}
return 0;
}
void Send_task(int nextchannel, uint32_t code)
{
int wfd = channels[nextchannel].wfd();
write(wfd, &code, sizeof(code));
}
void Killall()
{
for(auto &channel:channels)
{
channel.Close();
}
}
void wait()
{
for(auto &channel:channels)
{
int status = 0;
pid_t rid = waitpid(channel.id(),&status,0);
if(rid==channel.id())
{
cout<<"wait success..."<<endl;
}
}
}
~processpool()
{
}
private:
int _Sub_num;
vector<channel> channels;
};
int Col(processpool *processpool_ptr)
{
while (true)
{
// a.选择进程与通道
int nextchannel = processpool_ptr->Nextchannel();
// b.选择任务
uint16_t code = nextwork();
// c.发送任务
processpool_ptr->Send_task(nextchannel, code);
}
return 0;
}
int main(int argc, char *argv[])
{
if (argc != 2)
{
Usage(argv[0]);
return UsageError;
}
int Subnum = stoi(argv[1]);
if (Subnum < 0)
return NumError;
srand((uint32_t)time(NULL));
// 1. 创建进程
processpool *processpool_ptr = new processpool(Subnum);
processpool_ptr->Creatproc(worker);
// 2.控制进程
Col(processpool_ptr);
// 3.回收进程
// wait process
//杀掉所有的进程
processpool_ptr->Killall();
//等待所有进程
processpool_ptr->wait();
delete(processpool_ptr);
return 0;
}
代码中细节处理:在创建子进程代码中已经处理。
声明
本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。