【Linux】生产消费模型实践 --- 基于信号量的环形队列

CSDN 2024-08-27 15:07:04 阅读 60

在这里插入图片描述

你送出去的每颗糖都去了该去的地方,

其实地球是圆的,

你做的好事终会回到你身上。

--- 何炅 ---


基于信号量的环形队列

1 信号量2 框架构建3 代码实现4 测试运行

1 信号量

信号量本质是一个计数器,可以在初始化时对设置资源数量,进程 / 线程 可以获取信号量来对资源进行操作和结束操作可以释放信号量!

用于多进程 / 多线程 对共享数据对象的读取,它和管道有所不同,它不以传送数据为主要目的,它主要是用来保护共享资源(信号量也属于临界资源),使得资源在一个时刻只有一个进程独享。 在资源只有一个时就一把互斥锁!

信号量只能进行两种操作获取等待和释放信号,即PV操作:

P(sv):我们将申请获取信号量称为P操作,申请信号量的本质就是申请获得临界资源中某块资源的使用权限,当申请成功时临界资源中资源的数目应该减去一。所以P操作的本质就是让计数器减一,如果sv的值大于零,就给它减1;如果它的值为零,就挂起该进程的执行。对应的接口为,使用很简单:

<code>#include <semaphore.h>

//阻塞等待获取

int sem_wait(sem_t *sem);

//只进行一次获取,非阻塞等待

int sem_trywait(sem_t *sem);

//时间片内进行等待,超出就退出阻塞!

int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);

V(sv):我们将释放信号量称为V操作,释放信号量的本质就是归还临界资源中某块资源的使用权限,当释放成功时临界资源中资源的数目就应该加一。所以V操作本质就是让计数器加一,如果有其他进程 / 线程因等待sv而被挂起,就发送信号让它恢复运行,如果没有进程 / 线程因等待信号量而挂起,就给他加1。对应接口为:

#include <semaphore.h>

//释放获取的信号量

int sem_post(sem_t *sem);

PV操作都是原子的,不用担心线程安全!此外信号量初始化和销毁的接口是:

信号量初始化:

#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);

参数分别为:

sem_t *sem:传入信号量的地址pshared:传入0值表示线程间共享,传入非零值表示进程间共享。value:信号量的初始值(计数器的初始值)。 信号量销毁:

#include <semaphore.h>

int sem_destroy(sem_t *sem);

2 框架构建

环形队列的成员变量

线性容器vector模拟环形队列最大容量 int _max_step消费者位置 _c_step 与 生产者位置 _p_step两个信号量来表示生产与消费的剩余容量

sem_t _data_sem : 当前有多少数据

sem_t _space_sem: 当前剩余空间还有多少

构造函数初始化

最大容量需要给值初始化两个初始位置都为 0信号量初始化 sem_init() 数据为 0 ,空间为 最大容量

Push接口用来加入数据

首先需要申请信号量 P 来对空间信号量进行获取 sem_wait (&sem_t _space_sem)(申请信号量是原子的)

获取信号量的本质是对资源 –生产进行插入 , 对应下标向后移动 , 注意不能越界最后进行释放信号量 V 来对资源信号量进行释放 sem_post()

释放信号量的本质是对资源 ++

Pop接口用来获取数据

首先需要申请信号量 P 来对资源信号量进行获取 sem_wait (&sem_t _space_sem)(申请信号量是原子的)

获取信号量的本质是对资源 –获取队列资源,并进行释放, 对应下标向后移动 , 注意不能越界最后进行释放信号量 V 来对空间信号量进行释放 sem_post()

释放信号量的本质是对资源 ++

多生产多消费改造:多个生产 / 消费线程存在 消费对消费 生产对生产的问题!

信号量保证了单生产单消费中,两个线程可以通过信号量来保证不会出现访问越界 / 访问重叠的问题!多线程的情况下可能会发生访问同一位置的可能,获取到信号量之后由于中间的处理是临界区,可能会发生线程的切换,就会导致对同一位置进行处理,进而发生问题!为了保证线程安全,需要两把锁,分别管理生产者和消费者!锁的处理:

获取信号量之后再进行加锁,获取信号量是原子的,先申请信号量可以保证多个线程在获取中进行排队等待。如果先加锁,就只能使一个线程进入到获取信号量的队列中,效率低(电影院先买票在排队 ,先排队再买票)

6.为什么信号量不加条件判断?:

在环形队列的实现中,没有使用条件变量,像阻塞队列一样进行条件的判断 而是直接来不管三七二十一进行获取信号量,因为信号量本身就是判断条件,信号量是用来描述内部资源的多少的,是原子的!本质是一个计数器 通过预订机制来保证内部资源的合理使用,当信号量的资源数量为1时和锁时等价的!

3 代码实现

#pragma once

#include <vector>

#include <semaphore.h>

const int default_cap = 5;

template <class T>

class RingQueue

{

public:

RingQueue(int max_cap = default_cap) : _rq(max_cap), _max_cap(max_cap), _p_step(0), _c_step(0)

{

// 信号量初始化

sem_init(&_space_sem, 0, _max_cap);

sem_init(&_data_sem, 0, 0);

//锁进行初始化

pthread_mutex_init(&_c_mtx , nullptr);

pthread_mutex_init(&_p_mtx , nullptr);

}

// 获取信号量

void P(sem_t &sp)

{

sem_wait(&sp);

}

// 释放信号量

void V(sem_t &sp)

{

sem_post(&sp);

}

// 插入操作

void Push(const T &t)

{

// 获取空间信号量 --

P(_space_sem);

//临界区上锁

pthread_mutex_lock(&_p_mtx );

_rq[_p_step] = t;

_p_step++;

_p_step %= _max_cap;

//解锁

pthread_mutex_unlock(&_p_mtx);

// 释放信号量 ++

V(_data_sem);

}

// 获取操作

void Pop(T *t)

{

// 获取资源信号量

P(_data_sem);

pthread_mutex_lock(&_c_mtx);

*t = _rq[_c_step];

_c_step++;

_c_step %= _max_cap;

pthread_mutex_unlock(&_c_mtx);

// 释放信号量

V(_space_sem);

}

~RingQueue()

{

// 销毁对应信号量!

sem_destroy(&_space_sem);

sem_destroy(&_data_sem);

//锁进行释放

pthread_mutex_destroy(&_c_mtx);

pthread_mutex_destroy(&_p_mtx);

}

private:

// 底层线性结构,模拟环形队列

std::vector<T> _rq;

// 最大容量

int _max_cap;

// 生产者/消费者 下标

int _p_step;

int _c_step;

// 空间/资源 信号量

sem_t _space_sem;

sem_t _data_sem;

// 生产 / 消费 锁

pthread_mutex_t _p_mtx;

pthread_mutex_t _c_mtx;

};

4 测试运行

我们来做一些简单测试,我们设计了Task类,用于执行加法操作。它包含两个整型参数_x_y,并提供方法来执行加法并获取结果。通过重载括号运算符,Task对象可以被直接调用以执行计算。此外,类还提供了调试信息和结果输出的功能。

我写了一段代码段用于测试。在该测试中:定义了两个线程函数ConsumerProductor,分别模拟消费者和生产者行为:

Consumer线程不断从环形队列中取出Task对象,执行其操作,并打印消费结果。Productor线程则持续生成新的Task对象并将其放入队列中,同时打印出生产信息。

主函数main中创建了一个容量为5的RingQueue<Task>实例,并启动了两个线程。pthread_create用于创建线程,pthread_join确保主线程等待子线程执行完毕。通过这种方式,我们验证了环形队列在多线程环境下的线程安全性和功能正确性。

#include <iostream>

#include "RingQueue.hpp"

#include <pthread.h>

#include <stdlib.h>

#include <sys/types.h>

#include <unistd.h>

#include "Task.hpp"

void *Consumer(void *args)

{

RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);

srand(time(nullptr) ^ getpid());

while (true)

{

// 不断的进行获取

Task data ;

rq->Pop(&data);

data();

std::cout << "Consumer 消费者消费 -> " << data.result() << std::endl;

sleep(1);

}

}

void *Productor(void *args)

{

RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);

srand(time(nullptr) ^ getpid());

while (true)

{

// 不断的进行写入

int num1 = rand() % 10;

usleep(1000);

int num2 = rand() % 10;

Task t(num1 , num2);

rq->Push(t);

std::cout << "Productor 生产者生产 -> " << t.debug() << std::endl;

usleep(10000);

}

}

int main()

{

// 环形队列

RingQueue<Task> rq(5);

// 使用两个线程来测试

pthread_t t1, t2;

pthread_create(&t1, nullptr, Consumer, &rq);

pthread_create(&t2, nullptr, Productor, &rq);

pthread_join(t1, nullptr);

pthread_join(t2, nullptr);

}

运行效果:

在这里插入图片描述

很好的完成了任务!!!



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。