Linux--生产消费模型

诡异森林。 2024-07-13 16:37:01 阅读 83

线程系列:

Linux–线程的认识(一)

Linux–线程的分离、线程库的地址关系的理解、线程的简单封装(二)

线程的互斥:临界资源只能在同一时间被一个线程使用

生产消费模型

生产消费模型是多线程编程和分布式系统中的一个经典概念,它描述了生产者和消费者之间的交互方式。在这个模型中,生产者负责生成数据或任务,而消费者则负责处理这些数据或任务。这种模型在处理并发和异步操作时非常有用,尤其是在需要平衡生产速率和消费速率的情况下。

基本概念

生产者(Producer):负责生成数据或任务的实体。在多线程环境中,这通常是一个线程或一组线程。

消费者(Consumer):负责处理数据或任务的实体。同样,这也可以是一个线程或一组线程。

缓冲区(Buffer):生产者和消费者之间的中间存储区域,用于临时存放生产者生成的数据,直到消费者准备好处理它们。

工作原理

生产者:当生产者生成数据时,它将数据放入缓冲区。如果缓冲区已满,生产者可能需要等待或停止生产,直到缓冲区有足够的空间。消费者:消费者从缓冲区取出数据进行处理。如果缓冲区为空,消费者可能需要等待或暂停,直到有新的数据可用。

关键技术

同步机制:如信号量、互斥锁、条件变量等,用于控制对共享资源的访问,确保生产者和消费者不会同时访问或修改缓冲区。阻塞队列:一种特殊的队列,当尝试添加或移除元素时,如果队列已满或为空,操作会被阻塞,直到条件满足。

应用场景

并发编程:在多线程环境中,生产消费模型可以帮助有效地管理资源和任务分配。分布式系统:在网络服务中,如消息队列系统,生产者可以是发送消息的服务,消费者则是接收并处理这些消息的服务。

生产消费模型是理解和实现高效并发和分布式系统的关键,通过合理设计和优化,可以显著提高系统的性能和稳定性

单生产-单消费

BlockQueue.hpp: 阻塞队列

阻塞队列是一种支持两个附加操作的队列。这两个附加的操作是:当队列为空时,获取元素的线程会等待队列变为非空;当队列已满时,存储元素的线程会等待队列可用。

<code>#ifndef __BLOCK_QUEUE_HPP__

#define __BLOCK_QUEUE_HPP__

#include <iostream>

#include <string>

#include <queue>

#include <pthread.h>

using namespace std;

template <class T>

class BlockQueue

{

public:

BlockQueue(int cap) :_cap(cap),_product_wait_num(0),_consum_wait_num(0)

{

pthread_mutex_init(&_mutex,nullptr);

pthread_cond_init(&_product_cond,nullptr);

pthread_cond_init(&_consum_cond,nullptr);

}

void Enqueue(T& in)//生产者所用接口

{

pthread_mutex_lock(&_mutex);//对临界资源开启保护

while(IsFull())//当队列存满后需要让生产者停止生产,进入阻塞状态

{

_product_wait_num++;

pthread_cond_wait(&_product_cond,&_mutex);

_product_wait_num--;

}

//开始生产

_block_queue.push(move(in));

//让消费者来消费

if(_consum_wait_num>0)

pthread_cond_signal(&_consum_cond);

pthread_mutex_unlock(&_mutex);

}

void Pop(T* out)

{

pthread_mutex_lock(&_mutex);//对临界资源开启保护

while(IsEmpty())//当队列空缺后需要让消费者停止消费,进入阻塞状态

{

_consum_wait_num++;

pthread_cond_wait(&_consum_cond,&_mutex);

_consum_wait_num--;

}

//进行消费

*out=_block_queue.front();

_block_queue.pop();

//通知生产者

if(_product_wait_num>0)

pthread_cond_signal(&_product_cond);

pthread_mutex_unlock(&_mutex);

}

~BlockQueue()

{

pthread_mutex_destroy(&_mutex);

pthread_cond_destroy(&_product_cond);

pthread_cond_destroy(&_consum_cond);

}

private:

bool IsFull()

{

return _block_queue.size() == _cap;

}

bool IsEmpty()

{

return _block_queue.empty();

}

queue<T> _block_queue;//阻塞队列

int _cap; //总上限

pthread_mutex_t _mutex; //互斥锁

pthread_cond_t _product_cond; //生产者的条件变量

pthread_cond_t _consum_cond; //消费者的条件变量

int _product_wait_num;

int _consum_wait_num;

};

#endif

代码解释:

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

main.cc: 主函数

在这里插入图片描述

<code>#include"BlockQueue.hpp"

#include"Thread.hpp"

#include<string>

#include<vector>

#include<unistd.h>

using namespace ThreadMdule;

int a=10;

//生产者

void Productor(BlockQueue<int>& bq)

{

int cnt=1;

while (true)

{

bq.Enqueue(cnt);

std::cout << "Productor product data is : " << cnt << " addr: " << &bq << std::endl;

cnt++;

//sleep(3);

}

}

//消费者

void Consumer(BlockQueue<int>& bq)

{

while (true)

{

int data;

bq.Pop(&data);

std::cout << "Consumer Consum data is : " << data << " addr: " << &bq << std::endl;

sleep(5);

}

}

//执行创建线程的函数

void StartComm(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq, func_t<BlockQueue<int>> func)

{

for (int i = 0; i < num; i++)

{

std::string name = "thread-" + std::to_string(i + 1);

//将线程放入threads中,记录信息

threads->emplace_back(func, bq, name);

threads->back().start();

}

}

void StartProductor(vector<Thread<BlockQueue<int>>>* threads,int num,BlockQueue<int>& bq)

{

StartComm(threads,num,bq,Productor);

}

void StartConsumer(vector<Thread<BlockQueue<int>>>* threads,int num,BlockQueue<int>& bq)

{

StartComm(threads,num,bq,Consumer);

}

void WaitAllThread(std::vector<Thread<BlockQueue<int>>> &threads)

{

for (auto &thread : threads)

{

thread.Join();

}

}

int main()

{

BlockQueue<int>* bq=new BlockQueue<int>(5);

vector<Thread<BlockQueue<int>>> threads;//用threads来记录线程的信息

StartProductor(&threads,1,*bq);

StartConsumer(&threads,1,*bq);

WaitAllThread(threads);

}

在这里插入图片描述

在这里插入图片描述

细节:

在这里插入图片描述

在这里插入图片描述

多生产-单消费

这里在上面主函数代码上更改生产者的数量即可。

直接验证:

在这里插入图片描述

这里用任务类来作为阻塞队列的任务,让生产者产出对应任务,消费者来解决任务;生产出来的任务先放入阻塞队列作为缓冲;

<code>#include<iostream>

#include<string>

#include<functional>

using namespace std;

class Task

{

public:

Task(){ }

Task(int a,int b): _a(a),_b(b),_result(0)

{ }

void Excute()

{

_result=_a+_b;

}

string ResultToString()

{

return to_string(_a) + "+"+to_string(_b)+"="+to_string(_result);

}

string DebugToString()

{

return to_string(_a) + "+" + to_string(_b) + "= ?";

}

private:

int _a;

int _b;

int _result;

};

//类型

在这里插入图片描述

//生产者:

在这里插入图片描述

//消费者:

在这里插入图片描述



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。