[Linux#43][线程] 死锁 | 同步 | 基于 BlockingQueue 的生产者消费者模型

lvy- 2024-08-26 12:37:01 阅读 75

目录

1. 死锁

解决死锁问题

2. 同步

2.1 条件变量函数 cond

2.2 条件变量的使用:

3.CP 问题--理论

4. 基于 BlockingQueue 的生产者消费者模型

1. 基本概念

2.BlockQueue.hpp

基本设置:

生产关系控制:

消费关系的控制

⭕思考点

test 函数:

进化执行 Task.hpp

3. 注意点


1. 死锁

• 死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所占用不会释放的资源,而处于的一种永久等待状态。

死锁四个必要条件 (必须同时满足)

互斥条件:一个资源每次只能被一个执行流使用--前提请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放--原则不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺--原则循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系--重要条件

解决死锁问题

理念:破坏四个必要条件--只需要一个不满足就可以的

方法:

加锁顺序一致

我们在申请锁的时候,A线程先申请A锁,在申请B锁,而B线程先申请B锁,在申请A锁,所以两个线程天然申请锁的顺序就是环状的。我们可以尽量不让线程出现这个环路情况,我们让两个线程申请锁的顺序保持一致,就可以破坏循环等待问题。两个线程都是先申请A锁在申请B锁。

避免锁未释放的场景

接口:<code>pthread_mutex_trylock,失败了就会返回退出,释放锁

资源一次性分配

资源一次性分配,比如说你有一万行代码,有五处要申请锁,你可以最开始一次就给线程分配好,而不要把五处申请打散到代码各个区域里所导致加锁场景非常复杂

避免死锁算法

死锁检测算法(了解)银行家算法(了解)


2. 同步

同步!同步问题是保证数据安全的情况下,让我们的线程访问资源具有一定的顺序性

保证线程安全同步了,为什么还要设置锁?

注意前言和后果。排队是结果。例如突然新来了一个线程,被锁挡在了门外,才开始到后面排队的。

分配均衡的可以使用纯互斥,同步是解决分配不均衡问题的

快速提出解决方案 条件变量

锁和铃铛(条件变量--布尔类型)都是一个结构体,OS 先描述再组织条件变量必须依赖于锁的使用(条件就是被锁了,所以才加入等待队列)

条件变量

• 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它 什么也做不了。

• 例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个 节点添加到队列中。这种情况就需要用到条件变量。


2.1 条件变量函数 cond

条件变量就相当于是铃铛,和锁的设置非常的相似

初始化 – pthread_cond_init()

静态分配:

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

动态分配:

原型:

int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);

参数:

cond: 需要初始化的条件变量attr: 初始化条件变量的属性,一般设置为 nullptr

返回值: 成功返回 0,失败返回错误码

销毁 – pthread_cond_destroy()

原型:

int pthread_cond_destroy(pthread_cond_t *cond);

参数:

cond: 需要销毁的条件变量

返回值: 成功返回 0,失败返回错误码注意: 使用 PTHREAD_COND_INITIALIZER 初始化的条件变量不需要销毁

等待条件变量 – pthread_cond_wait()

原型:

int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);

参数:

cond: 需要等待的条件变量mutex: 当前线程所处临界区对应的互斥锁

⭕ (为什么要传这个锁变量呢?1. pthread_cond_wait让线程等待的时候,会自动释放锁,将其加入到等待队列中,不用管临界资源的状态情况

返回值: 成功返回 0,失败返回错误码注意: wait 一定要在加锁和解锁之间进行!

唤醒等待

唤醒全部线程 – pthread_cond_broadcast()

原型:

int pthread_cond_broadcast(pthread_cond_t *cond);

功能: 唤醒等待队列中的全部线程参数:

cond: 需要等待的条件变量

返回值: 成功返回 0,失败返回错误码

唤醒首个线程 – pthread_cond_signal()

原型:

int pthread_cond_signal(pthread_cond_t *cond);

功能: 唤醒等待队列中的首个线程参数:

cond: 需要等待的条件变量

返回值: 成功返回 0,失败返回错误码

测试:

错乱原因:多线程打印出现错乱,显示器是文件,看作一个共享资源

uint64 是什么?

一种跨平台的方式来表示至少 64 位的无符号整数

2.2 条件变量的使用:

原理图,以我们的单人自习室为例

对于线程的管理: 先所有都锁上,再依次唤醒,就实现了每个人进去执行一次,退出,下一个执行

<code> while(true)

{

pthread_mutex_lock(&mutex);

pthread_cond_wait(&cond, &mutex);

std::cout << "pthread: " << number << " , cnt: " << cnt++ << std::endl;

pthread_mutex_unlock(&mutex);

}

}

int main()

{

for(uint64_t i = 0; i < 5; i++)

{

pthread_t tid;

pthread_create(&tid, nullptr, Count, (void*)i);

}

while(true)

{

sleep(1);

pthread_cond_broadcast(&cond);

std::cout << "signal one thread..." << std::endl;

}

return 0;

}

我们怎么知道我们要让一个线程去等待了?

一定是临界资源(自习室里面有人)不就绪,没错,临界资源也是有状态的!!

直接走人叫互斥,去后面排队叫同步

你怎么知道临界资源是就绪还是不就绪的?即怎么知道自习室里面有没有人

你判断出来的!判断是访问临界资源(自习室) 吗?必须是的,也就是判断必须在加锁之后!!!

所以等待的过程,一定要在加锁和解锁之间pthread_cond_wait让线程等待的时候,会自动释放锁,将其加入到等待队列中

sum

等待条件满足的时候往往是在临界区内等待的

当该线程进入等待的时候,互斥锁会自动释放而当该线程被唤醒时,又会自动获得对应的互斥锁

条件变量需要配合互斥锁使用,其中条件变量是用来完成同步的,而互斥锁是用来完成互斥的


3.CP 问题--理论

生产者消费者模型(consumer producter)

存在超市的原因

效率高,中转站大号的缓存,解决了忙闲不均。生产者视角:有多少存储空间,消费者:有多少商品数让生产和消费的行为,进行一定程度的解耦

在计算机中,抽象出来

生产者:线程承担超市:特定结构的内存空间->共享资源->存在并发问题消费者:线程承担

将商品理解为数据,执行流在做通信

如何高效的通信

互斥是一个保证安全的手段

研究超市的并发 三种关系:

生产者 vs 生产者(竞争的互斥关系,只允许一个)消费者 vs 消费者(互斥)生产者 vs 消费者(互斥--安全,同步--一定的顺序性)

321 原则(便与记忆和给别人介绍)

3 种关系2 种角色--生产和消费1 个交易场所--特点结构的内存空间

例如解耦 add 和 main ,实现高并发


4. 基于 BlockingQueue 的生产者消费者模型

1. 基本概念

在多线程编程中,阻塞队列(Blocking Queue)是一种常用的数据结构,用于实现生产者和消费者模型。与普通队列相比,阻塞队列具有以下特点:

当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中有新元素被放入。当队列满时,往队列里存放元素的操作也会被阻塞,直到队列中有元素被取出。其余时间就是边生产边消费,同时进行

2.BlockQueue.hpp

基本设置:

<code>template <class T>

class BlockQueue

{

static const int defalutnum = 20;

public:

BlockQueue(int maxcap = defalutnum):maxcap_(maxcap)

{

pthread_mutex_init(&mutex_, nullptr);

pthread_cond_init(&c_cond_, nullptr);

pthread_cond_init(&p_cond_, nullptr);

// low_water_ = maxcap_/3;

// high_water_ = (maxcap_*2)/3;

}

~BlockQueue()

{

pthread_mutex_destroy(&mutex_);

pthread_cond_destroy(&c_cond_);

pthread_cond_destroy(&p_cond_);

}

private:

std::queue<T> q_;

//int mincap_;

int maxcap_; // 极值

pthread_mutex_t mutex_;

pthread_cond_t c_cond_;

pthread_cond_t p_cond_;

// int low_water_;

// int high_water_;

};

队列 q_ 共享资源, q被当做整体使用的,q只有一份,加锁

生产关系控制:

void push(const T &in)

{

pthread_mutex_lock(&mutex_);

while(q_.size() == maxcap_){

pthread_cond_wait(&p_cond_, &mutex_);

}

q_.push(in);

// if(q_.size() > high_water_) pthread_cond_signal(&c_cond_);

pthread_cond_signal(&c_cond_);

pthread_mutex_unlock(&mutex_);

}

细节点:

你想生产,就直接能生产吗?不一定。你得先确保生产条件满足

pthread_mutex_lock(&mutex_);

while(q_.size() == maxcap_)

判断也是在访问临界资源,在内部进行判断的,所以锁要放在外面保护货物满了,就伪唤醒后加入等待队列

消费关系的控制

T pop()

{

pthread_mutex_lock(&mutex_);

while(q_.size() == 0)

{

pthread_cond_wait(&c_cond_, &mutex_);

}

T out = q_.front();

q_.pop();

// if(q_.size()<low_water_) pthread_cond_signal(&p_cond_);

pthread_cond_signal(&p_cond_);

pthread_mutex_unlock(&mutex_);

return out;

}

细节点:

生产和消费需要分别设置两个等待队列你想消费,就直接能消费吗?不一定。你得先确保消费条件满足,while判断队列情况

⭕思考点

谁来唤醒呢?

例如:有生产,就可以解锁唤醒消费队列了

q_.pop();

pthread_mutex_unlock(&mutex_);

对策略的添加:

发现生产和消费的同步,通过水位线来进行范围管控,例如:

if(q_.size()>high_water_) pthread_cond_signal(&c_cond_);//大于某一水位后,唤醒尽快消费if(q_.size()<low_water_) pthread_cond_signal(&p_cond_);

test 函数:

#include "BlockQueue.hpp"

#include "Task.hpp"

#include <unistd.h>

#include <ctime>

void *Consumer(void *args)

{

BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);

while (true)

{

// 消费(存在管控的加入执行,调用等待队列的封装接口)

Task t = bq->pop();

std::cout << "处理任务: " << t.GetTask() << " 运算结果是: " << t.GetResult() << " thread id: " << pthread_self() << std::endl;

t.run();//接收到的任务对象,调用接口跑起来了

sleep(1);//Pop前已经检验测试一大堆了

}

}

void *Productor(void *args)

{

int len = opers.size();

BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);

int x = 10;

int y = 20;

while (true)

{

// 模拟生产者生产数据

int data1 = rand() % 10 + 1; // [1,10]

usleep(10);

int data2 = rand() % 10;

char op = opers[rand() % len];

Task t(data1, data2, op);

// 生产

bq->push(t);

std::cout << "生产了一个任务: " << t.GetTask() << " thread id: " << pthread_self() << std::endl;

sleep(1);

}

}

int main()

{

srand(time(nullptr));

// 因为 321 原则

BlockQueue<Task> *bq = new BlockQueue<Task>();

pthread_t c[3], p[5];

for (int i = 0; i < 3; i++)

{

pthread_create(c + i, nullptr, Consumer, bq);

}

for (int i = 0; i < 5; i++)

{

pthread_create(p + i, nullptr, Productor, bq);

}

for (int i = 0; i < 3; i++)

{

pthread_join(c[i], nullptr);

}

for (int i = 0; i < 5; i++)

{

pthread_join(p[i], nullptr);

}

delete bq;

return 0;

}

BlockQueue 内部可不可以传递其他数据,比如对象?比如任务???

可以。进化为基于任务的阻塞队列了

进化执行 Task.hpp

Task t = bq->pop();cout 执行直接变为t.run()

例如执行如下任务

#pragma once

#include <iostream>

#include <string>

std::string opers="+-*/%";code>

enum{

DivZero=1,

ModZero,

Unknown

};

class Task

{

public:

Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)

{

}

void run()

{

switch (oper_)

{

case '+':

result_ = data1_ + data2_;

break;

case '-':

result_ = data1_ - data2_;

break;

case '*':

result_ = data1_ * data2_;

break;

case '/':

{

if(data2_ == 0) exitcode_ = DivZero;

else result_ = data1_ / data2_;

}

break;

case '%':

{

if(data2_ == 0) exitcode_ = ModZero;

else result_ = data1_ % data2_;

} break;

default:

exitcode_ = Unknown;

break;

}

}

void operator ()()

{

run();

}

std::string GetResult()

{

std::string r = std::to_string(data1_);

r += oper_;

r += std::to_string(data2_);

r += "=";

r += std::to_string(result_);

r += "[code: ";

r += std::to_string(exitcode_);

r += "]";

return r;

}

std::string GetTask()

{

std::string r = std::to_string(data1_);

r += oper_;

r += std::to_string(data2_);

r += "=?";

return r;

}

~Task()

{

}

private:

int data1_;

int data2_;

char oper_;

int result_;

int exitcode_;

};

生产者消费者模型高效在哪里?

答案是生产者消费者模式并不高效在队列中拿放,而是在生产之前和消费之后,让线程并行执行!!

同样生产者消费者的意义也不再队列中,而是在放之前同时生产,拿之后同时消费。


3. 注意点

判断生产消费条件

这是因为线程可能被伪唤醒(即线程被唤醒但条件仍未满足),使用 while 可以确保线程在真正满足条件时才继续执行。

pthread_cond_wait 函数

pthread_cond_wait 是让当前线程进入等待状态的函数。如果调用失败,线程将继续执行,可能导致逻辑错误(如尝试从空队列中取数据或向满队列中添加数据)。

多消费者情况下的唤醒

使用 pthread_cond_broadcast 唤醒所有等待的消费者时,若只有一个数据可供消费,则会导致其他消费者被伪唤醒。为了避免这种情况,线程在被唤醒后应再次检查条件是否满足。

使用 while 判断的必要性

在判断是否满足生产或消费条件时,应使用 while 循环而非 if 语句。

为了防止伪唤醒导致的问题,必须使用 while ,确保线程在满足条件时才继续执行。

思路:上锁访问,检查达到某一条件,等待,未达到执行,检查完后解锁,所检查完后满足的线程,接收任务同时跑,线程再上锁访问,尝试获取下一个任务以此循环。

cp 模型的思路如上,简单实现了代码,下一章将结合信号量,优化完善代码并进行测试~



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。