Linux之信号量 | 消费者生产者模型的循环队列

dbln 2024-07-31 17:07:02 阅读 61

目录

一、信号量

1、概念

2、信号量操作函数

二、基于环形队列的生产者消费者模型

1、模型分析

2、代码实现

1、单生产单消费的生产者消费者模型

2、多生产多消费的生产者消费者模型


一、信号量

1、概念

引入:前面我们讲到了,对临界资源进行访问时,为了保证数据的一致性,我们需要对临界资源进行加锁保护。当我们用一个互斥锁对临界资源进行保护时,相当于我们将这块临界资源看作一个整体,同一时刻只允许一个执行流对这块临界资源进行访问。这样做非常合理,但是效率太低了。

但是,我们最理想的方案,其实是:如果在同一时刻,不同的执行流要访问的是临界资源的不同区域,那么我们是允许它们同时进行临界资源的访问,这样就大大提高了效率。

比如,如果一个临界资源是一个大小为10数组,我们可以对其加锁保护。可是,现在来看,如果有10个线程同时访问这个数组,可以吗?当然可以,只要这10个线程在同一时刻访问的是数组的不同位置,即10个位置一个线程访问一个。这样我们就可以让多个执行流同时访问临界资源了。

这时,我们就使用了信号量来帮助我们实现这个方案。

信号量:信号量的本质是一个计数器,通常用来表示临界资源中,资源数的多少。申请信号量实际上就是对临界资源的预定机制。信号量主要用于同步和互斥。

每个执行流在进入临界区之前都应该先申请信号量,申请成功就有了访问临界资源的权限,当访问完毕后就应该释放信号量。 

信号量的PV操作:

~  P操作:我们将申请信号量称为P操作。申请信号量的本质就是申请获得临界资源中某块资源的访问权限,当申请成功时临界资源中资源的数目应该减一,因此P操作的本质就是让信号量减一。

~  V操作:我们将释放信号量称为V操作。释放信号量的本质就是归还临界资源中某块资源的访问权限,当释放成功时临界资源中资源的数目就应该加一,因此V操作的本质就是让信号量加一。

2、信号量操作函数

sem_init:初始化信号量的函数,该函数的函数原型如下:返回值,初始化信号量成功返回0,失败返回-1。

<code>NAME

sem_init - initialize an unnamed semaphore

SYNOPSIS

#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);

Link with -pthread.

参数说明:

sem:需要初始化的信号量。

pshared:传入0值表示线程间共享,传入非零值表示进程间共享。

value:信号量的初始值。

sem_destroy:销毁信号量的函数,该函数的函数原型如下:

NAME

sem_destroy - destroy an unnamed semaphore

SYNOPSIS

#include <semaphore.h>

int sem_destroy(sem_t *sem);

Link with -pthread.

参数说明:sem:需要销毁的信号量。

返回值:销毁信号量成功返回0,失败返回-1。

sem_wait:等待信号量(申请信号量)的函数,该函数的函数原型如下:

NAME

sem_wait, sem_timedwait, sem_trywait - lock a semaphore

SYNOPSIS

#include <semaphore.h>

int sem_wait(sem_t *sem);

Link with -pthread.

参数:sem:需要等待的信号量。

返回值:等待信号量成功返回0,信号量的值减一。等待信号量失败返回-1,信号量的值保持不变。

sem_post:释放信号量(发布信号量)的函数,该函数的函数原型如下:

NAME

sem_post - unlock a semaphore

SYNOPSIS

#include <semaphore.h>

int sem_post(sem_t *sem);

Link with -pthread.

参数:sem:需要释放的信号量。

返回值:释放信号量成功返回0,信号量的值加一。释放信号量失败返回-1,信号量的值保持不变。

有了对信号量的各种操作,我们下面来通过一个具体的例子来使用一下他们。

二、基于环形队列的生产者消费者模型

1、模型分析

实际上并不是真正的环形队列,因为我们没有这种数据结构,它的实现是通过数组模拟的,当数据加入到最后的位置时直接模等于数组的大小即可,这样就可以回到数组的起始位置。

我们在对环形队列进行访问时,当队列为空或者为满,生产者和消费者就会指向同一个位置,这时我们就需要生产者和消费者互斥和同步了,如果为空,让生产者先访问,为满就让消费者先访问。

而当队列不为空,也不为满时,生产者和消费者可以访问队列不同的位置,以实现并发。这样就可以提高效率了。

为了实现消费者和生产者的并发访问,我们需要使用信号量。我们使用信号量来表示队列中相关资源的个数。

1、对于生产者,在意的是队列中的空间资源,只要有空间生产者就可以进行生产。空间资源定义成一个生产者需要的信号量(space_sem),在初始化时,它的初始值我们应该设置为环形队列的容量,因为刚开始时环形队列当中全是空间。

2、对于消费者,在意的是队列中的数据资源,只要有数据消费者就可以进行消费。数据资源定义成一个消费者需要的信号量(data_sem),在初始化时,它的初始值我们应该设置为0,因为刚开始时环形队列当中没有数据。

当生产者线程生产数据时,它需要先申请信号量,即对space_sem进行P操作,然后生产数据,放入到队列中。生产完成后,这时,队列中就多出了一个数据资源,需要对data_sem进行V操作。

当消费者线程消费数据时,它也需要先申请信号量,即对data_sem进行P操作,然后消费数据。消费完成后,队列中就会多出一个空间资源,需要对space_sem进行V操作。

2、代码实现

对信号量进行封装:sem.hpp

<code>#pragma once

#include <iostream>

#include <semaphore.h>

using namespace std;

class sem

{

public:

sem(int sem)

{

sem_init(&sem_, 0, sem);

}

void p()

{

sem_wait(&sem_);

}

void v()

{

sem_post(&sem_);

}

~sem()

{

sem_destroy(&sem_);

}

private:

sem_t sem_;

};

1、单生产单消费的生产者消费者模型

RingQueue.hpp:环形队列的实现:

#include <iostream>

#include <vector>

#include <pthread.h>

#include "sem.hpp"

#include <sys/types.h>

#include <unistd.h>

using namespace std;

#define GVAL 5

template <class T>

class RingQueue

{

public:

RingQueue(const int num = GVAL)

: num_(num), rq_(num), space_sem(num), data_sem(0), p_step(0), c_step(0)

{

}

void push(const T &in)

{

space_sem.p(); // 申请信号量

rq_[p_step++] = in;

p_step %= num_;

data_sem.v(); // 释放信号量

}

void pop(T *out)

{

data_sem.p(); // 申请信号量

*out = rq_[c_step++];

c_step %= num_;

space_sem.v(); // 释放信号量

}

~RingQueue()

{

}

private:

vector<T> rq_;

int num_; // 大小

sem space_sem; // 空间资源信号量

sem data_sem; // 数据资源信号量

int c_step; // 消费者下标

int p_step; // 生产者下标

};

单生产单消费的生产者消费者模型:

#include <iostream>

#include <pthread.h>

#include "RingQueue.hpp"

using namespace std;

void *consumer(void *arg)

{

RingQueue<int> *rq = (RingQueue<int> *)arg;

while(true)

{

int x;

rq->pop(&x);

cout << "消费:" << x << endl;

sleep(1);

}

}

void *productor(void *arg)

{

RingQueue<int> *rq = (RingQueue<int> *)arg;

while(true)

{

int x = rand() % 100 + 1;

rq->push(x);

cout << "生产:" << x << endl;

}

}

int main()

{

srand((uint64_t)time(nullptr) ^ getpid() ^ 2321);

RingQueue<int> *rq = new RingQueue<int>();

pthread_t c, p;

pthread_create(&c, nullptr, consumer, (void *)rq);

pthread_create(&p, nullptr, productor, (void *)rq);

pthread_join(c, nullptr);

pthread_join(p, nullptr);

return 0;

}

运行结果:

2、多生产多消费的生产者消费者模型

我们只要保证,最终进入临界区的是一个生产者,一个消费就行,即生产者和生产者之间是互斥的,消费者和消费者之间是互斥的。所以我们需要提供两把锁。

RingQueue.hpp:

<code>#include <iostream>

#include <vector>

#include <pthread.h>

#include "sem.hpp"

#include <sys/types.h>

#include <unistd.h>

using namespace std;

#define GVAL 5

template <class T>

class RingQueue

{

public:

RingQueue(const int num = GVAL)

: num_(num), rq_(num), space_sem(num), data_sem(0), p_step(0), c_step(0)

{

pthread_mutex_init(&clock, nullptr);

pthread_mutex_init(&plock, nullptr);

}

void push(const T &in)

{

space_sem.p(); // 申请信号量

pthread_mutex_lock(&plock);

rq_[p_step++] = in;

p_step %= num_;

pthread_mutex_unlock(&plock);

data_sem.v(); // 释放信号量

}

void pop(T *out)

{

data_sem.p(); // 申请信号量

pthread_mutex_lock(&clock);

*out = rq_[c_step++];

c_step %= num_;

pthread_mutex_unlock(&clock);

space_sem.v(); // 释放信号量

}

~RingQueue()

{

pthread_mutex_destroy(&clock);

pthread_mutex_destroy(&plock);

}

private:

vector<T> rq_;

int num_; // 大小

sem space_sem; // 空间资源信号量

sem data_sem; // 数据资源信号量

int c_step; // 消费者下标

int p_step; // 生产者下标

pthread_mutex_t plock;

pthread_mutex_t clock;

};

#include <iostream>

#include <pthread.h>

#include "RingQueue.hpp"

using namespace std;

void *consumer(void *arg)

{

RingQueue<int> *rq = (RingQueue<int> *)arg;

while (true)

{

int x;

rq->pop(&x);

cout << "消费:" << x << " " << pthread_self() << endl;

sleep(1);

}

}

void *productor(void *arg)

{

RingQueue<int> *rq = (RingQueue<int> *)arg;

while (true)

{

int x = rand() % 100 + 1;

rq->push(x);

cout << "生产:" << x << " " << pthread_self() << endl;

}

}

int main()

{

srand((uint64_t)time(nullptr) ^ getpid() ^ 2321);

RingQueue<int> *rq = new RingQueue<int>();

pthread_t c[2], p[3];

for (int i = 0; i < 2; i++)

pthread_create(c + i, nullptr, consumer, (void *)rq);

for (int i = 0; i < 3; i++)

pthread_create(p + i, nullptr, productor, (void *)rq);

for (int i = 0; i < 2; i++)

pthread_join(c[i], nullptr);

for (int i = 0; i < 3; i++)

pthread_join(p[i], nullptr);

return 0;

}

运行结果:



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。