[Linux]条件变量:实现线程同步(什么是条件变量、为什么需要条件变量,怎么使用条件变量(接口)、例子,代码演示(生产者消费者模型))

这博客不写也罢 2024-07-19 09:07:06 阅读 85

目录

一、条件变量

1.什么是条件变量

故事说明

2、为什么需要使用条件变量

竞态条件

 3.什么是同步 

饥饿问题

二、条件变量的接口

1.pthread_cond_t

2.初始化(pthread_cond_init)

3.销毁(pthread_cond_destroy)

4.等待(pthread_cond_wait)

5.唤醒(pthread_cond_signal && pthread_cond_broadcast)

pthread_cond_signal

pthread_cond_broadcast

三、使用演示 (模拟生产者消费者模型)


一、条件变量

1.什么是条件变量

条件变量(Condition Variable)是一种用于线程同步的机制,通常与互斥锁(Mutex)一起使用。条件变量提供了一种线程间的通信机制,允许一个线程等待另一个线程满足某个条件后再继续执行。

故事说明

现在小明要在在一张桌子上放一个苹果,而旁边有一群蒙着眼睛的人,因为他们的眼睛被蒙着,他们如果想拿到这个苹果,就会时不时来桌子前摸一摸看看桌子是否有苹果,并且谁来桌子前摸苹果是无序的,这时的场面就很混乱,小明一看不行,于是小明就桌子上放了个铃铛,并且组织需要苹果的人排好队,有苹果小明就会摇响铃铛,排在第一个的人就拿走苹果,排到队尾排队等待。此时混乱的场面就显得井然有序了。在本故事中,小明就是操作系统,苹果就是临界资源,一群蒙着眼睛都人就是多线程,铃铛就是条件变量,排队就是实现同步,摇响铃铛就是唤醒线程。

2、为什么需要使用条件变量

使用条件变量主要是因为它们提供了在多线程编程中一种有效的同步机制。当多个线程需要等待某个特定条件成立才能继续执行时,条件变量就显得尤为重要。通过条件变量,线程可以安全地进入等待状态,直到被其他线程显式地唤醒或满足等待的条件。这有助于避免线程的无谓轮询或忙等待,提高了系统的响应能力和效率。

注意:在使用条件变量时,必须确保与互斥锁一起使用,以避免竞态条件的发生

竞态条件

竞态条件(Race Condition)是指在设备或系统尝试同时执行两个或多个操作时,由于操作顺序不当而导致的不期望的结果。简单来说就是因为时序问题,而导致程序异常。

 

 3.什么是同步 

在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步。

饥饿问题

饥饿问题指的是某些线程由于某种原因无法获得它们所需要的资源或执行机会,导致它们长时间得不到处理,甚至永远得不到处理的现象。这种情况通常发生在多个线程竞争有限资源时,其中一些线程可能因为优先级过低、调度算法的不公平性、同步机制使用不当或其他原因而无法获得足够的执行时间。

二、条件变量的接口

1.pthread_cond_t

<code>pthread_cond_t 是 POSIX 线程库(Pthreads)中用于表示条件变量的数据类型。

2.初始化(pthread_cond_init)

功能:初始化条件变量

原型

#include <pthread.h>

方式一(pthread_cond_t是局部全局都可以用):

int pthread_cond_init(pthread_cond_t *restrict cond,

                       const pthread_condattr_t *restrict attr);

方式二(pthread_cond_t是全局变量时):

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

注意:restrict 是一个类型限定符,它用于告知编译器两个指针不会指向同一个内存位置,这样编译器可以生成更高效的代码

参数

cond:一个指向 pthread_cond_t 类型的指针,用于存储初始化后的条件变量。attr:一个指向 pthread_condattr_t 类型的指针,用于指定条件变量的属性。通常可以传递 NULL(nullptr),以使用默认属性。

返回值

如果成功,返回 0。如果失败,返回错误码。

使用例子:

#include <pthread.h>

#include <stdio.h>

pthread_cond_t cond; // 全局 pthread_cond_t 变量

int main() {

int rc;

// 显式初始化全局 pthread_cond_t 变量

rc = pthread_cond_init(&cond, NULL);

if (rc != 0) {

printf("Cond init failed: %d\n", rc);

return 1;

}

// ... 其他代码,包括线程创建和同步 ...

// 在不再需要条件变量时销毁它

//...

return 0;

}

3.销毁(pthread_cond_destroy)

功能:销毁条件变量

原型

#include <pthread.h>

int pthread_cond_destroy(pthread_cond_t *cond);

参数

cond:指向要销毁的条件变量的指针。

返回值

如果成功,返回 0。如果失败,返回错误码。

4.等待(pthread_cond_wait)

pthread_cond_wait 的行为如下:

解锁互斥锁:调用 pthread_cond_wait 的线程首先会释放(解锁)它当前持有的互斥锁。这一步是必要的,因为条件变量通常与互斥锁一起使用,以确保对共享数据的访问是同步的。解锁互斥锁允许其他线程获取该锁,从而可以安全地修改共享数据。

加入等待队列:在解锁互斥锁之后,调用 pthread_cond_wait 的线程会将自己添加到与该条件变量相关联的等待队列中。此时,线程进入阻塞状态,等待被唤醒。

阻塞并等待信号:线程在等待队列中保持阻塞状态,直到它收到一个针对该条件变量的信号(通过 pthread_cond_signal 或 pthread_cond_broadcast 发出)。需要注意的是,仅仅因为线程在等待队列中并不意味着它会立即收到信号;它必须等待直到有其他线程显式地发出信号。

重新获取互斥锁:当线程收到信号并准备从 pthread_cond_wait 返回时,它首先会尝试重新获取之前释放的互斥锁。如果此时锁被其他线程持有,那么该线程会阻塞在互斥锁的等待队列中,直到获得锁为止。这一步确保了线程在继续执行之前能够重新获得对共享数据的独占访问权。

检查条件:一旦线程成功获取到互斥锁,它会再次检查导致它调用 pthread_cond_wait 的条件是否现在满足。虽然通常认为在收到信号时条件已经满足,但这是一个编程错误的常见来源。正确的做法是在每次从 pthread_cond_wait 返回后都重新检查条件,因为可能有多个线程在等待相同的条件,或者条件可能在信号发出和线程被唤醒之间发生变化。

返回并继续执行:如果条件满足,线程会从 pthread_cond_wait 返回,并继续执行后续的代码。如果条件仍然不满足,线程可以选择再次调用 pthread_cond_wait 进入等待状态,或者执行其他操作。

功能:解锁互斥锁,并使线程进入等待状态,直到指定的条件变量被其他线程信号通知或广播。

原型

#include <pthread.h>

int pthread_cond_wait(pthread_cond_t *restrict cond,

                  pthread_mutex_t *restrict mutex);

参数

cond:指向条件变量的指针。mutex:指向互斥锁的指针,该互斥锁应该在调用 pthread_cond_wait 之前由当前线程锁定。调用该函数时互斥锁mutex会被解锁。

返回值

如果成功,返回 0。如果失败,返回错误码。

5.唤醒(pthread_cond_signal && pthread_cond_broadcast)

pthread_cond_signal

功能:唤醒正在等待特定条件变量的一个线程

原型

#include <pthread.h>

int pthread_cond_signal(pthread_cond_t *cond);

参数

cond:指向要发送信号(广播)的条件变量的指针。

返回值

如果成功,返回 0。如果失败,返回错误码。

问题:在调用pthread_cond_wait前如果已经提前收到唤醒通知会怎么样?

答:

如果在调用pthread_cond_wait之前线程已经收到了条件变量的唤醒通知(通过pthread_cond_signalpthread_cond_broadcast),那么该通知实际上会被“记住”,直到线程真正进入pthread_cond_wait并准备返回。这是因为条件变量的实现通常包含一个等待队列,用于存储那些正在等待条件变量的线程。当调用pthread_cond_signalpthread_cond_broadcast时,会唤醒等待队列中的一个或多个线程,但如果没有线程实际在pthread_cond_wait中等待,那么这个通知就会被保留,直到有线程调用pthread_cond_wait

pthread_cond_broadcast

功能:用于唤醒所有正在等待指定条件变量的线程

原型

#include <pthread.h>

int pthread_cond_broadcast(pthread_cond_t *cond);

参数

cond:指向要发送信号(广播)的条件变量的指针。

返回值

如果成功,返回 0。如果失败,返回错误码。

三、使用演示 (模拟生产者消费者模型)

说明:模拟生产者消费者模式

注意:使用pthrad原生线程库(POSIX库)要链接库:-lpthread

不会连接动态库的可以看我这篇文章:[Linux]动静态库(什么是动静态库,怎么生成动静态库,怎么使用(连接)动静态库)-CSDN博客

cond.cc

#include <iostream>

#include <pthread.h>

#include <vector>

#include <string>

#include <unistd.h>

using namespace std;

// 定义条件变量和互斥锁

// 全局的初始化方式

pthread_cond_t cond_var = PTHREAD_COND_INITIALIZER;

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

// 共享变量,用于线程间的同步

int shared_data = 0;

// 线程函数,模拟生产者

void *producer(void *args)

{

string producer_name = static_cast<char *>(args);

// 生产数据,并通知消费者

while (1)

{

// 锁定互斥锁

pthread_mutex_lock(&mutex);

// 生产数据(这里只是简单地递增shared_data)

shared_data++;

cout << "I is " << producer_name << " "

<< " Producer produced data: "

<< shared_data << endl;

// 唤醒等待的消费者线程

pthread_cond_signal(&cond_var);

// 解锁互斥锁

pthread_mutex_unlock(&mutex);

// 模拟生产耗时

sleep(1);

}

return nullptr;

}

// 线程函数,模拟消费者

void *consumer(void *args)

{

string consumer_name = static_cast<char *>(args);

// 消费数据

while (1)

{

// 锁定互斥锁

pthread_mutex_lock(&mutex);

// 等待生产者生产数据

while (shared_data == 0)

{

// 等待条件变量,解锁互斥锁,进入等待状态

pthread_cond_wait(&cond_var, &mutex);

}

// 消费数据(这里只是简单地递减shared_data)

shared_data--;

cout << "I is " << consumer_name << " "

<< " Consumer consumed data: "

<< shared_data << endl;

cout << "-----------------------------------"

<< endl;

// 解锁互斥锁

pthread_mutex_unlock(&mutex);

// 模拟消费耗时

sleep(4);

}

return nullptr;

}

int main()

{

int producer_thread_num = 5; // 生产者人数

int consumer_thread_num = 10; // 消费者人数

vector<pthread_t> producers;

vector<pthread_t> consumers;

for (int i = 0; i < producer_thread_num; i++)

{

pthread_t producer_thread; // 生产者

char buffer[64];

sprintf(buffer, "producer-%d", i + 1);

// 创建生产者线程

if (pthread_create(&producer_thread, nullptr, producer, buffer) != 0)

{

perror("pthread_create producer");

exit(EXIT_FAILURE);

}

producers.push_back(producer_thread);//保存pthread_t,以备等待回收

}

for (int i = 0; i < consumer_thread_num; i++)

{

pthread_t consumer_thread; // 消费者

// 创建消费者线程

char buffer[64];

sprintf(buffer, "consumer-%d", i + 1);

if (pthread_create(&consumer_thread, nullptr, consumer, buffer) != 0)

{

perror("pthread_create consumer");

exit(EXIT_FAILURE);

}

consumers.push_back(consumer_thread);//保存pthread_t,以备等待回收

}

// 等待线程结束

for (auto& thraed:producers)

{

pthread_join(thraed, nullptr);

}

for (auto& thraed:consumers)

{

pthread_join(thraed, nullptr);

}

// 销毁条件变量

pthread_cond_destroy(&cond_var);

// 销毁锁

pthread_mutex_destroy(&mutex);

return 0;

}

Makefile

mycond:cond.cc

g++ -o $@ $^ -std=c++11 -lpthread

.PHONY:clean

clean:

rm -f mycond

结果

0d98aa1785264b589c7f43f857e454a2.png



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。