[Linux#44][线程] CP模型2.0 | 信号量接口 | 基于环形队列

lvy- 2024-08-28 08:37:01 阅读 86

目录

1.回忆

Task.hpp

1. #pragma once

2. 头文件和命名空间

3. 类 CallTask

4. 操作符字符串

5. 回调函数 mymath

阻塞队列 BlockQueue 的实现

BlockQueue

生产者和消费者线程

生产者productor

消费者 consumer

主函数 main

代码整体说明

2. 信号量

2.1 回忆:多线程对锁的凌乱竞争✔️

2.2 信号量函数

1. 初始化信号量

2. 销毁信号量

3. 等待信号量(P操作)

4. 发布信号量(V操作)

CP 模型中的锁与信号量使用总结

3.基于环形队列的生产者消费者模型


1.回忆

生产的数据从哪里来?

用户,网络等

生产者生产的数据也是要花时间获取的!

获取数据还有 生产数据到队列

同样的,消费者

消费数据还有 加工处理数据

真正的高效:非临界区(获取/加工)之间高并发的同时进行

实现派发任务的代码的代码细节讲解

我们先把上一篇文章的代码拷过来解释一下,思路如下

下面这段代码定义了类 <code>CallTask,并且展示了如何使用这些类来处理计算任务和保存任务。下面会逐行解释代码的各个部分。

Task.hpp

1. #pragma once

#pragma once

这是一个预处理指令,用来防止头文件被多次包含,避免重复定义。

2. 头文件和命名空间

#include <iostream>

#include <functional>

#include <string>

using namespace std;

包含了标准输入输出库、函数对象库和字符串库。使用 std 命名空间,以便在代码中直接使用标准库中的类型和函数。

3. 类 CallTask

class CallTask

{

typedef function<int(int, int, char)> func_t;

public:

CallTask() {}

CallTask(int x, int y, char op, func_t func) : _x(x), _y(y), _op(op), _callback(func)

{

}

string operator()()

{

int result = _callback(_x, _y, _op);

char buffer[1024];

snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, result);

return buffer;

}//调用()实现对结果的打印

string toTaskString()

{

char buffer[1024];

snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);

return buffer;

}

private:

int _x;

int _y;

char _op;

func_t _callback;

};

CallTask 类定义了一个计算任务。func_t 是一个 typedef,定义了一个函数类型,它接受三个参数(两个整数和一个字符)并返回一个整数。CallTask 的构造函数可以初始化两个整数 _x_y,一个字符操作符 _op,以及一个回调函数 _callbackoperator() 重载了 () 操作符,使得对象可以像函数一样被调用。调用时,它会执行 _callback 函数(即用户定义的计算逻辑),并返回计算结果的字符串形式。toTaskString() 返回一个描述任务的字符串,但不包含实际计算结果,仅展示操作符和操作数。

4. 操作符字符串

string oper = "+-*/%";

定义了一个包含常见数学操作符的字符串。

5. 回调函数 mymath

int mymath(int x, int y, char op)

{

int result = 0;

switch (op)

{

case '+':

result = x + y;

break;

case '-':

result = x - y;

break;

case '*':

result = x * y;

break;

case '/':

{

if (y == 0)

{

cout << "div zero error" << endl;

result = -1;

}

else

{

result = x / y;

}

}

break;

case '%':

{

if (y == 0)

{

cout << "mod zero error" << endl;

result = -1;

}

else

{

result = x % y;

}

}

break;

default:

break;

}

return result;

}

mymath 函数根据 op 操作符执行不同的数学运算。如果 op/% 并且 y 为 0,会输出错误信息并返回 -1 以表示出错。

阻塞队列 BlockQueue 的实现

进入线程,准备工作做完后,对线程进行判断处理

#pragma once

#include <iostream>

#include <queue>

#include <pthread.h>

using namespace std;

const int maxcapacity = 5;

这部分代码是 BlockQueue 类的定义。#pragma once 防止头文件被多次包含。maxcapacity 定义了队列的最大容量。

BlockQueue

这两部分代码分别展示了一个多线程生产者-消费者模型的实现,其中使用了一个阻塞队列 BlockQueue 来管理生产者和消费者之间的任务流。下面我将详细解释每一部分的代码。

template <class T>

class BlockQueue

{

public:

BlockQueue(const int& capacity = maxcapacity)

: _capacity(capacity)

{

// 构造时初始化

pthread_mutex_init(&_mutex, nullptr);

pthread_cond_init(&_pcond, nullptr);

pthread_cond_init(&_ccond, nullptr);

}

void push(const T& in)

{

pthread_mutex_lock(&_mutex);

while (is_full())

{

pthread_cond_wait(&_pcond, &_mutex);

}

_q.push(in);

pthread_cond_signal(&_ccond);

pthread_mutex_unlock(&_mutex);

}

void pop(T* out)

{

pthread_mutex_lock(&_mutex);

while (is_empty())

{

pthread_cond_wait(&_ccond, &_mutex);

}

*out = _q.front();

_q.pop();

pthread_cond_signal(&_pcond);

pthread_mutex_unlock(&_mutex);

}

~BlockQueue()

{

pthread_mutex_destroy(&_mutex);

pthread_cond_destroy(&_pcond);

pthread_cond_destroy(&_ccond);

}

private:

bool is_full()

{

return _q.size() == _capacity;

}

bool is_empty()

{

return _q.empty();

}

private:

queue<T> _q;

int _capacity;

pthread_mutex_t _mutex;

pthread_cond_t _pcond;

pthread_cond_t _ccond;

};

BlockQueue 是一个模板类,使用标准库的 queue 来存储队列元素。_mutex 是一个互斥锁,用于保护队列的并发访问。_pcond_ccond 是条件变量,用于协调生产者和消费者的等待与唤醒。push 方法用于将元素放入队列。如果队列满了,生产者会等待,直到队列有空余位置。pop 方法用于从队列中取出元素。如果队列为空,消费者会等待,直到队列有元素可取。析构函数负责销毁互斥锁和条件变量。

生产者和消费者线程

#include "BlockQueue.hpp"

#include <ctime>

#include <unistd.h>

#include "Task.hpp"

这部分代码包含了阻塞队列 BlockQueue 和任务类 CallTask 的头文件,并引入了 ctime 用于时间相关操作和 unistd.h 用于线程的休眠。

生产者productor

void* productor(void* args)

{

BlockQueue<Task>* _c_bq = static_cast<BlockQueue<CallTask>*>(args);

while (true)

{

// 生产活动

int x = rand() % 10 + 1;

int y = rand() % 5;

char op = oper[rand() % oper.size()];

Task t(x, y, op, mymath);

_c_bq->push(t);

cout << "productor thread, 生产计算任务: " << t.toTaskString() << endl;

sleep(1); // 生产的慢一些

}

}

该函数是生产者线程的执行体。生产者从 args 参数中获得阻塞队列 _c_bq 的指针,然后进入一个无限循环。在循环中,生产者随机生成两个整数 xy 以及一个操作符 op,创建一个 CallTask 对象 t,并将其推入阻塞队列。每生成一个任务后,生产者线程输出一个消息,显示任务的描述,并且线程会休眠1秒,以降低生产速度。

消费者 consumer

void* consumer(void* args)

{

BlockQueue<Task>* _c_bq = static_cast<BlockQueue<CallTask>*>(args);

while (true)

{

// 消费活动

Task t;

_c_bq->pop(&t);

cout << "cal thread, 完成计算任务: " << t() << endl;

}

}

该函数是消费者线程的执行体。消费者从 args 参数中获得阻塞队列 _c_bq 的指针(是从生产者中传入的),然后进入一个无限循环。在循环中,消费者从阻塞队列中弹出一个 CallTask 对象 t,并执行该任务(通过 t() 调用),然后输出计算结果。

主函数 main

队列类当中的函数类 类型

int main()

{

srand((unsigned int)time(nullptr));

BlockQueue<Task>* bq = new BlockQueue<Task>();

pthread_t p[3], c[2];

for (int i = 0; i < 3; ++i)

{

pthread_create(p + i, nullptr, productor, bq);

}

for (int i = 0; i < 2; ++i)

{

pthread_create(c + i, nullptr, consumer, bq);

}

for (int i = 0; i < 3; ++i)

{

pthread_join(p[i], nullptr);

}

for (int i = 0; i < 2; ++i)

{

pthread_join(c[i], nullptr);

}

return 0;

}

埋种子:srand((unsigned int)time(nullptr));

main 函数中首先设置了随机数种子,然后创建一个 BlockQueue<CallTask> 对象,用于存放生产者生成的任务。创建了3个生产者线程和2个消费者线程,所有线程都使用同一个阻塞队列对象 bq。主线程等待所有生产者和消费者线程结束,确保所有任务都被正确处理。

代码整体说明

生产者-消费者模型:这是一个经典的多线程同步问题,生产者生成任务并放入队列,消费者从队列中取出任务并执行。阻塞队列:通过互斥锁和条件变量,阻塞队列保证了生产者和消费者在多线程环境下的正确协调,防止竞争条件和资源浪费。生产者在队列满时等待,消费者在队列空时等待,从而保证了任务的有序处理。

💡小注意:

在 Visual Studio Code (VSCode) 中进行代码的批量替换操作非常简单,可以通过以下步骤完成:按下 <code>Ctrl + H 快捷键如果确认无误,点击“Replace All”按钮来一次性替换所有匹配项。


为什么要先加锁?

因为判断临界资源调试是否满足,也是在访问临界资源!判断资源是否就绪,是通过再临界资源内部判断的。

为何什么要用 while?解决伪唤醒 ✔️if->while 等待的实现原理

while 一直判断情况,充当休眠等待

当前判断生产条件不满足就把自己挂起,但是这有个问题pthread_cond_wait这是一个函数,只要是函数就有调用失败的可能。另外还存在伪唤醒的情况,假设只有一个消费者,十个生产者。只消费了一个但是却唤醒了一批。但是你这里是if判断,都去push肯定是有问题的。因此充当条件判断的语法必须是while,不能用if

单线程模型->多线程模型的思考✔️

多线程对生产和消费队列加了锁的管控,也是只能有一个人的进/出,但是对于非共享区的准备工作,是可以高并发进行的,真正的优势在于:对于非共享的部分可以多线程的提前执行好

#include <iostream>

#include <queue>

#include <thread>

#include <mutex>

#include <condition_variable>

std::queue<int> buffer;

const int BUFFER_SIZE = 10;

std::mutex mtx;

std::condition_variable cv;

bool done = false; // 生产者是否完成标志

// 生产者

void producer() {

for (int i = 0; i < 20; ++i) {

std::unique_lock<std::mutex> lock(mtx);

cv.wait(lock, [] { return buffer.size() < BUFFER_SIZE; });

buffer.push(i);

std::cout << "Produced: " << i << std::endl;

lock.unlock();

cv.notify_one();

}

done = true; // 生产者完成

cv.notify_one(); // 通知消费者

}

// 消费者

void consumer() {

while (true) {

std::unique_lock<std::mutex> lock(mtx);

cv.wait(lock, [] { return !buffer.empty() || done; });

if (done && buffer.empty()) break; // 如果生产者完成且缓冲区为空,则退出

int item = buffer.front();

buffer.pop();

std::cout << "Consumed: " << item << std::endl;

lock.unlock();

cv.notify_one();

}

}

int main() {

std::thread t1(producer);

std::thread t2(consumer);

t1.join();

t2.join();

return 0;

}

2. 信号量

例如:电影院买票是一种对座位的预定机制

信号量:原子性进行 P(- -)V(++) 操作的计数器

这把计数器的本质是什么?

计数器用来描述资源数目的,把资源是否就绪放在了临界区之外

申请信号量的时候,其实就已经间接的再做判断了

P()---访问资源-- V()

2.1 回忆:多线程对锁的凌乱竞争✔️

解释

当多个线程试图同时访问一个共享资源(如全局变量)时,通常会使用锁(mutex)来确保一次只有一个线程能够访问该资源。然而,如果没有正确的同步机制,线程可能会以一种不可预测的顺序尝试获取锁,导致以下问题:

竞态条件多个线程几乎同时尝试获取同一把锁,但只有一个线程能够成功获取。其他线程需要等待锁释放才能继续执行,这可能导致资源访问的顺序混乱。饥饿:某些线程可能长时间无法获取锁,因为其他线程总是抢先一步获取锁。死锁:两个或多个线程相互等待对方释放锁,导致所有线程都被阻塞。活锁:线程不断尝试获取锁但始终失败,浪费计算资源。

解决方案

为了防止这种“凌乱竞争”,可以采取以下措施:

使用原子操作:确保对共享资源的访问是原子的,即不可分割的。使用互斥锁:确保一次只有一个线程可以访问共享资源。使用条件变量:结合互斥锁使用,允许线程在特定条件下等待,直到满足条件后再继续执行。使用读写锁:允许多个线程同时读取共享资源,但一次只能有一个线程写入。使用信号量:控制多个线程对有限资源的访问。

例如对互斥锁进行回忆:

#include <pthread.h>

#include <stdio.h>

int x = 0;

pthread_mutex_t lock;

void* incrementX(void* arg) {

while (1) {

pthread_mutex_lock(&lock); // 获取锁

x++;

printf("Thread incremented x: %d\n", x);

pthread_mutex_unlock(&lock); // 释放锁

usleep(100000); // 等待一段时间

}

return NULL;

}

int main() {

pthread_t thread1, thread2;

pthread_mutex_init(&lock, NULL);

pthread_create(&thread1, NULL, incrementX, NULL);

pthread_create(&thread2, NULL, incrementX, NULL);

pthread_join(thread1, NULL);

pthread_join(thread2, NULL);

pthread_mutex_destroy(&lock);

return 0;

}

在这个示例中,每个线程在递增 x 的值之前先获取锁,完成递增后释放锁。这样可以确保任何时候只有一个线程能够修改 x 的值,避免了竞态条件的发生。

总结

并发问题:多个线程同时访问同一变量可能导致竞态条件。解决方案:使用互斥锁、原子操作、条件变量等同步机制来保护共享资源。


2.2 信号量函数

信号量是一种用于同步和互斥的机制,在多线程或多进程编程中非常关键。以下是一些常用的信号量操作函数。

1. 初始化信号量

原型int sem_init(sem_t *sem, int pshared, unsigned int value);参数

sem:需要初始化的信号量。pshared:传入0表示线程间共享,传入非0表示进程间共享。value:信号量的初始值,即计数器的初始值。

返回值:成功返回0,失败返回-1。

2. 销毁信号量

原型int sem_destroy(sem_t *sem);参数

sem:需要销毁的信号量。

返回值:成功返回0,失败返回-1。

3. 等待信号量(P操作)

原型int sem_wait(sem_t *sem);参数

sem:需要等待的信号量。

返回值:成功返回0,信号量的值减一;失败返回-1,信号量的值保持不变。

4. 发布信号量(V操作)

原型int sem_post(sem_t *sem);参数

sem:需要发布的信号量。

返回值:成功返回0,信号量的值加一;失败返回-1,信号量的值保持不变。

#include <iostream>

#include <thread>

#include <semaphore.h>

// 定义信号量

sem_t sem;

// 生产者线程函数

void producer() {

for (int i = 0; i < 5; ++i) {

// 发布信号量(V操作)

sem_post(&sem);

std::cout << "Producer incremented semaphore." << std::endl;

std::this_thread::sleep_for(std::chrono::seconds(1));

}

}

// 消费者线程函数

void consumer() {

for (int i = 0; i < 5; ++i) {

// 等待信号量(P操作)

sem_wait(&sem);

std::cout << "Consumer decremented semaphore." << std::endl;

std::this_thread::sleep_for(std::chrono::seconds(1));

}

}

int main() {

// 初始化信号量

if (sem_init(&sem, 0, 0) == -1) {

std::cerr << "Failed to initialize semaphore" << std::endl;

return 1;

}

// 创建生产者和消费者线程

std::thread t1(producer);

std::thread t2(consumer);

// 等待线程结束

t1.join();

t2.join();

// 销毁信号量

sem_destroy(&sem);

return 0;

}


CP 模型中的锁与信号量使用总结

生产多消费的意义

生产与消费的本质

生产:将私有的任务转移到公共空间中。消费:从公共空间中取出任务进行处理。

意义:多生产多消费模型不仅仅局限于将任务或数据放入交易场所,而是包含了任务生产前和消费后的处理过程,这两个阶段往往是最耗时的。

信号量的本质与意义

信号量:本质上是一把计数器。计数器的意义

与互斥量相比,信号量可以预设临界资源的情况。在执行 PV(Proberen/Vergrendelen,即测试/锁定)操作过程中,可以在不进入临界区的情况下得知资源情况。这样可以减少临界区内部的判断,提高系统效率。


3.基于环形队列的生产者消费者模型

常见的环形,例如基于%实现

空和满的时候,tail 和 head 指向的是同一个位置,无法判断是空还是满

解决:

空一个位置(temp=head+1)来判断 temp 及下一个位置是不是==tail交给信号量来处理

生产和消费没有指向同一个格子,就可以运行下去,遵循着三个原则

例如:我围着圆桌进行放苹果,你跟在我后面拿

指向同一个位置的时候,只能一个人访问你不能超过我我不能把你套个圈

正常追逐游戏,必须满足这三个条件

我们两个什么情况才会指向同一个位置?空或满

空:我,生产者执行

满:你,消费者执行

添加信号量

P 关注什么资源呢?还有多少剩余空间 SpaceSem:NC 关注什么资源呢?还有多少剩余数据 DataSem:0

生产者执行时:P(SpaceSem) V(DataSem)

消费者执行时:P(DataSem) V(SpaceSem)

下篇文章将继续讲解~



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。