【Linux】基于环形队列RingQueue的生产消费者模型
2301_79585944 2024-08-03 12:07:02 阅读 66
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
目录
前言
环形队列的概念及定义
POSIX信号量
RingQueue的实现方式
RingQueue.hpp的构建
Thread.hpp
Main.cc主函数的编写
Task.hpp function包装器的使用
总结
前言
<code>世上有两种耀眼的光芒,一种是正在升起的太阳,一种是正在努力学习编程的你!一个爱学编程的人。各位看官,我衷心的希望这篇博客能对你们有所帮助,同时也希望各位看官能对我的文章给与点评,希望我们能够携手共同促进进步,在编程的道路上越走越远!
提示:以下是本篇文章正文内容,下面案例可供参考
环形队列的概念及定义
环形队列采用数组模拟,用模运算来模拟环状特性
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来 判断满或者空。另外也可以预留一个空的位置,作为满的状态
但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程
为什么判断阻塞队列为空为满时,要在我们对应的加锁和解锁之间呢?
判断阻塞队列是否为满,本身就是对阻塞队列内部的成员属性做访问、做比较,如果判断在加锁和解锁的区间之外判断时,可能会出现pop和push的操作,会导致并发访问出问题,判断也就不准了。加锁是对内部资源(阻塞队列)进行整体使用的,虽然对阻塞队列保护起来了,但是对阻塞队列的使用情况,我们在锁这里看不出来,我们只能证明当前阻塞队列有人想要使用它,但是队列中的情况是不清楚的。
当队列为空的时候,生产者和消费者的下标是同一个位置;
当队列为满的时候,生产者和消费者下标是用同一个位置。
环形队列判空盘满的时候,难度有点大,因为生产者和消费者在队列为空为满时,都是用同一个位置,所以
方法一:在环形队列中引入了一个元素的计数器count,count==0时,为空;count==N时,为满;方法二:当生产者下标加1等于消费者下标,则说明队列为满。
环形队列中共分三种情况:
生产者和消费者下标为同一个位置:
队列满:当队列为满时,生产者就不能在盘子中放入数据了,否则会覆盖之前的数据;所以要访问临界资源,要让消费者先跑。队列空:当队列为空时,必须保证要生产者先跑,因为生产者和消费者为空,指向同一个位置,那么这个位置就是一个局部性的临界资源,不能让两者同时跑,否则会出现二义性(盘子和苹果),所以要保证两者互斥,其次必须生产者先跑。
通过队列为空为满这两种情况得出结论:生产者不能把消费者套一个圈;消费者不能超过生产者。
生产者和消费者的下标不是同一个位置:
队列一定不为空&&队列一定不为满
生产者和消费者的下标不在同一个位置,就意味着生产者和消费者的动作,可以真正的并发。
POSIX信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
我们在电影院中买票,把票买到了,(申请信号量成功了)能证明电影院中一定有资源给我。所以当我申请信号量成功之后,我根本就不用判断这里面的资源是否满足我的条件,所以信号量是一把计数器,这个计数器是用来衡量资源数目的,只要申请成功,就一定会有对应的资源提供给你,从而有效减少内部的判断!!!
在进入临界资源之前,申请信号量时,就已经知道要的资源有还是没有了;而阻塞队列加锁那里,我们还要在对应的临界区里做判断。
定义一个信号量跟定义一个整型一样。
理解:信号量内部维护了一个计数器和队列
初始化信号量
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:值为0表示线程间共享,非零表示进程间共享
value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
等待信号量:对指定信号量的计数器做--,
减之前先判断:信号量值是否大于0,大于0,继续往后走;小于0,则阻塞
发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
发布信号量:对指定信号量的计数器做++
RingQueue的实现方式
RingQueue.hpp的构建
将和环形队列相关的控制方法进行封装,通过模板传入Thread模板之中,之后每个线程都能看到环形队列的相关方法及规则,从而更好的对所有的线程进行管理,依旧是遵循Linux中的先描述,再组织。
#pragma once
// 环形队列
#include <iostream>
#include <string>
#include <vector>
#include <semaphore.h>
#include <pthread.h>
// 单生产,单消费
// 多生产,多消费
// "321":
// 3: 三种关系
// a: 生产和消费互斥和同步
// b: 生产者之间:一把锁
// c: 消费者之间:一把锁
// b和c的解决方案:加锁,是因为下标只有一个
// 1. 需要几把锁?2把
// 2. 如何加锁?
template<typename T>
class RingQueue
{
private:
void P(sem_t& sem)
{
sem_wait(&sem);// -1
}
void V(sem_t& sem)
{
sem_post(&sem);// +1
}
// 加锁和解锁的接口封装
void Lock(pthread_mutex_t& mutex)
{
pthread_mutex_lock(&mutex);
}
void Unlock(pthread_mutex_t& mutex)
{
pthread_mutex_unlock(&mutex);
}
public:
RingQueue(int cap) : _ring_queue(cap), _cap(cap), _productor_step(0), _consumer_step(0)
{
sem_init(&_room_sem, 0, _cap);// 0:线程间共享
sem_init(&_data_sem, 0, 0);
pthread_mutex_init(&_productor_mutex, nullptr);
pthread_mutex_init(&_consumer_mutex, nullptr);
}
// 入队列操作
void Enqueue(const T& in)
{
// 先申请信号量再加锁的优点:
// 多个线程来了,抢信号量都不互相干扰;虽然竞争锁的时候,只有一个线程竞争成功了,
// 其余线程都在锁这里等待下次竞争锁,但是这些线程都提前预定了信号量,当这些线程被唤醒时,
// 直接进来生产就行了,效率提高了
// 生产行为
P(_room_sem);// 先申请空间资源,申请信号量:本质是对资源的预定机制,是原子的
Lock(_productor_mutex);// 加锁使得多生产多消费--->转为单生产但消费
// 一定有空间!!!
_ring_queue[_productor_step++] = in; // 生产,先访问再++
_productor_step %= _cap;// 防止越界,维持环状特性
Unlock(_productor_mutex);
V(_data_sem);// 释放数据信号量,数据信号量+1
}
void Pop(T* out)
{
// 消费行为
P(_data_sem);// 先申请数据资源
Lock(_consumer_mutex);
*out = _ring_queue[_consumer_step++];
_consumer_step %= _cap;
Unlock(_consumer_mutex);
V(_room_sem);// 释放空间信号量:数据取走,空间露出来,把空间资源释放掉
}
~RingQueue()
{
sem_destroy(&_room_sem);
sem_destroy(&_data_sem);
pthread_mutex_destroy(&_productor_mutex);
pthread_mutex_destroy(&_consumer_mutex);
}
private:
// 1. 环形队列
std::vector<T> _ring_queue;
int _cap; // 环形队列的容量上限
// 2. 生产和消费的下标
int _productor_step;// 多个生产者线程都要竞争这个下标,下标只有一个,所以得争锁
int _consumer_step;
// 3. 定义信号量
// 当队列为空为满时,生产和消费线程都在各自的信号量中的队列中休眠
sem_t _room_sem; // 生产者关心(空间信号量)
sem_t _data_sem; // 消费者关心(数据信号量)
// 4. 定义锁,维护多生产之间、多消费之间的互斥关系
pthread_mutex_t _productor_mutex;
pthread_mutex_t _consumer_mutex;
};
Thread.hpp
#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>
namespace ThreadModule
{
template<typename T>
using func_t = std::function<void(T&, std::string name)>;
// typedef std::function<void(const T&)> func_t;
template<typename T>
class Thread
{
public:
void Excute()
{
// std::cout << _threadname << std::endl;
// 回调线程方法(生产者和消费者执行的函数)
_func(_data, _threadname);
}
public:
Thread(func_t<T> func, T& data, std::string name = "none-name")
: _func(func), _data(data), _threadname(name), _stop(true)
{}
static void* threadroutine(void* args) // 类成员函数,形参是有this指针的!!
{
Thread<T>* self = static_cast<Thread<T> *>(args);
self->Excute();
return nullptr;
}
bool Start()
{
int n = pthread_create(&_tid, nullptr, threadroutine, this);
if (!n)
{
_stop = false;
return true;
}
else
{
return false;
}
}
void Detach()
{
if (!_stop)
{
pthread_detach(_tid);
}
}
void Join()
{
if (!_stop)
{
pthread_join(_tid, nullptr);
}
}
std::string name()
{
return _threadname;
}
void Stop()
{
_stop = true;
}
~Thread() {}
private:
pthread_t _tid;
std::string _threadname;
T& _data; // 为了让所有的线程访问同一个全局变量
func_t<T> _func;
bool _stop;
};
} // namespace ThreadModule
#endif
Main.cc主函数的编写
#define _CRT_SECURE_NO_WARNINGS 1
#include "RingQueue.hpp"
#include "Thread.hpp"
#include "Task.hpp"
#include <string>
#include <vector>
#include <unistd.h>
#include <ctime>
// 我们需要的是向队列中投递任务
using namespace ThreadModule;
using ringqueue_t = RingQueue<Task>;
void Consumer(ringqueue_t& rq, std::string name)
{
while (true)
{
sleep(2);
// 1. 消费任务
Task t;
rq.Pop(&t);
std::cout << "Consumer handler task: " << "[" << name << "]" << std::endl;
// 2. 处理任务
t();
}
}
void Productor(ringqueue_t& rq, std::string name)
{
srand(time(nullptr) ^ pthread_self());
//int cnt = 10;
while (true)
{
// 获取任务
// 生产任务
rq.Enqueue(Download);
std::cout << "Productor : " << "[" << name << "]" << std::endl;
// cnt--;
}
}
void InitComm(std::vector<Thread<ringqueue_t>>* threads, int num, ringqueue_t& rq, func_t<ringqueue_t> func, const std::string& who)
{
for (int i = 0; i < num; i++)
{
std::string name = "thread-" + std::to_string(i + 1) + "-" + who;
threads->emplace_back(func, rq, name);// 构建出一个线程对象
// threads->back()->Start();// 真正的创建出新线程
// 当前线程对象在创建新线程和执行函数方法时,主线程可能会先一步回来又创建了一个线程对象,
// 那么vector容器中最后一个元素就改变了,那么又执行容器的最后一个线程可能会出错,
// 因为上一个线程的执行函数的过程还没有执行完,刚拿到最后一个线程的数据时,还没来得及使用,
// 容器中最后一个线程变化了,那么就拿新线程的数据,但是新线程的数据并没有初始化完成,
// 此时访问的对象那个就是一个空对象。
// 所以我们应该把线程构建起来,最后统一启动StartAll
}
}
void InitConsumer(std::vector<Thread<ringqueue_t>>* threads, int num, ringqueue_t& rq)
{
InitComm(threads, num, rq, Consumer, "consumer");
}
void InitProductor(std::vector<Thread<ringqueue_t>>* threads, int num, ringqueue_t& rq)
{
InitComm(threads, num, rq, Productor, "productor");
}
void WaitAllThread(std::vector<Thread<ringqueue_t>>& threads)
{
for (auto& thread : threads)
{
thread.Join();
}
}
void StartAll(std::vector<Thread<ringqueue_t>>& threads)
{
for (auto& thread : threads)
{
std::cout << "start: " << thread.name() << std::endl;
thread.Start();
}
}
int main()
{
ringqueue_t* rq = new ringqueue_t(10);
std::vector<Thread<ringqueue_t>> threads;
// std::vector<Thread<ThreadData>> threads;
InitProductor(&threads, 1, *rq);
InitConsumer(&threads, 1, *rq);
StartAll(threads);
WaitAllThread(threads);
return 0;
}
Task.hpp function包装器的使用
Task是一个function<void()>的类型,也就是说用Task实例化出的模板可以接收任意类型的函数方法(也就是生产消费者模型中的任务)这样就最大的实现了来什么执行什么,大大提高了代码的灵活性可拓展性。
#pragma once
#include <iostream>
#include <functional>
using Task = std::function<void()>;
void Download()
{
std::cout << "this is a download task" << std::endl;
}
总结
好了,本篇博客到这里就结束了,如果有更好的观点,请及时留言,我会认真观看并学习。
不积硅步,无以至千里;不积小流,无以成江海。
上一篇: Docker:安装和使用教程
下一篇: 运维系列(亲测有效):Docker pull拉取镜像报错“Error response from daemon: Get “https://registry-1.docker.io/v2”解决办法
本文标签
声明
本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。