【Linux】线程池|单例模式|STL、智能指针线程安全|读者写者问题

დ旧言~ 2024-07-11 13:07:03 阅读 52

 > 作者:დ旧言~

> 座右铭:松树千年终是朽,槿花一日自为荣。

> 目标:理解【Linux】线程池|单例模式|STL、智能指针线程安全|读者写者问题。

> 毒鸡汤:有些事情,总是不明白,所以我不会坚持。早安!

> 专栏选自:Linux初阶

> 望小伙伴们点赞👍收藏✨加关注哟💕💕

​​

🌟前言

今天是Linux的最后一片博客,相信大家已经坚持下来了,还是那句话 " 学,然后知不足 "!!!

⭐主体

学习【Linux】线程池|单例模式|STL、智能指针线程安全|读者写者问题咱们按照下面的图解:

​🌙 线程池

线程概念:

一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

线程池的应用场景:

需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误。

线程池示例:

创建固定数量线程池,循环从任务队列中获取任务对象。获取到任务对象后,执行任务对象中的任务接口。

​🌙 线程池代码

Thread.hpp的简单封装线程,下面我们进行简单的验证:

Thread类主要成员变量是线程名,函数,线程参数,参数ID以及对应编号。Thread类提供了一个无参构造,完成对成员变量name的赋值。同时,对外主要提供了start接口和join接口,对于join接口就是线程等待,而对于start接口就是创建线程的接口,在外部如果调用的话我们需要传入对应的函数以及线程对应的参数。

代码如下:

<code>pragma once

#include <iostream>

#include <string>

#include <functional>

#include <cstring>

#include <cassert>

#include <pthread.h>

namespace ThreadNs

{

typedef std::function<void *(void *)> func_t;

const int num = 1024;

class Thread

{

private:

static void *start_routine(void *args)

{

Thread *td = static_cast<Thread *>(args);

return td->callback();

}

public:

Thread()

{

char buffer[num];

snprintf(buffer, sizeof buffer, "thread-%d", threadnum++);

_name = buffer;

}

void start(func_t func, void *args)

{

_func = func;

_args = args;

int n = pthread_create(&_tid, nullptr, start_routine, this);

}

void join()

{

int n = pthread_join(_tid, nullptr);

assert(n == 0);

(void)n;

}

std::string threadname()

{

return _name;

}

void *callback()

{

return _func(_args);

}

~Thread()

{

}

private:

std::string _name;

void *_args;

func_t _func;

pthread_t _tid;

static int threadnum;

};

int Thread::threadnum = 1;

}

对于任务队列,可以由多个线程进行访问,我们就需要加锁保护了,把之前写过的锁的小组件引入进来:

LockGuard.hpp:

#include <iostream>

#include <mutex>

class Mutex

{

public:

Mutex(pthread_mutex_t*lock_p=nullptr)

:lock_p_(lock_p)

{}

void lock()

{

if(lock_p_) pthread_mutex_lock(lock_p_);

}

void unlock()

{

if(lock_p_) pthread_mutex_unlock(lock_p_);

}

~Mutex()

{}

private:

pthread_mutex_t * lock_p_;

};

class LockGuard

{

public:

LockGuard(pthread_mutex_t*mutex)

:mutex_(mutex)

{

mutex_.lock();

}

~LockGuard()

{

mutex_.unlock();

}

private:

Mutex mutex_;

};

线程池代码如下:创建一批线程时,我们需要实现线程的运行函数static void*handlerTask,之所以是静态的,是因为我们要把这个运行函数传递给Thread类中的func_,不能有this指针,所以是静态成员函数。而没有this指针,我们无法访问ThreadPool里面的成员变量,所以需要封装接口供其调用。

ThreadPool.hpp:

#pragma once

#include "Thread.hpp"

#include "LockGuard.hpp"

#include <vector>

#include <queue>

#include <pthread.h>

#include <mutex>

#include <unistd.h>

using namespace ThreadNs;

const int gnum = 3;

template <class T>

class ThreadPool;

template <class T>

class ThreadData

{

public:

ThreadPool<T> *threadpool;

std::string name;

public:

ThreadData(ThreadPool<T> *tp, const std::string &n) : threadpool(tp), name(n)

{ }

};

template <class T>

class ThreadPool

{

private:

static void *handlerTask(void *args)

{

ThreadData<T> *td = (ThreadData<T> *)args;

ThreadPool<T> *threadpool = static_cast<ThreadPool<T> *>(args);

while (true)

{

T t;

{

LockGuard lockguard(td->threadpool->mutex());

while(td->threadpool->isQueueEmpty())

{

td->threadpool->threadWait();

}

t = td->threadpool->pop();

}

std::cout << td->name << " 获取了一个任务" << t.toTaskString() << "并处理完成,结果是: " << t() << std::endl;

}

delete td;

return nullptr;

}

public:

void lockQueue() { pthread_mutex_lock(&_mutex); }

void unlockQueue() { pthread_mutex_unlock(&_mutex); }

bool isQueueEmpty() { return _task_queue.empty(); }

void threadWait() { pthread_cond_wait(&_cond, &_mutex); }

T pop()

{

T t = _task_queue.front();

_task_queue.pop();

return t;

}

void Push(const T &in)

{

LockGuard lockguard(&_mutex);

_task_queue.push(in);

pthread_cond_signal(&_cond);

}

pthread_mutex_t *mutex()

{

return &_mutex;

}

public:

ThreadPool(const int &num = gnum) : _num(num)

{

pthread_mutex_init(&_mutex, nullptr);

pthread_cond_init(&_cond, nullptr);

for (int i = 0; i < _num; i++)

{

_threads.push_back(new Thread());

}

}

void run()

{

for (const auto &t : _threads)

{

ThreadData<T> *td = new ThreadData<T>(this, t->threadname());

t->start(handlerTask, td);

std::cout << t->threadname() << "start..." << std::endl;

}

}

~ThreadPool()

{

pthread_mutex_destroy(&_mutex);

pthread_cond_destroy(&_cond);

for (const auto &t : _threads)

delete t;

}

private:

int _num;

std::vector<Thread *> _threads;

std::queue<T> _task_queue;

pthread_mutex_t _mutex;

pthread_cond_t _cond;

};

我们将线程池进行了模板化,因此线程池当中存储的任务类型可以是任意的;现在我们想像之前处理各种数据的计算,那么先引入任务组件:

Task.hpp:

#pragma once

#include <iostream>

#include <functional>

class Task

{

using func_t = std::function<int(int,int ,char)>;

public:

Task(){}

Task(int x,int y,char op,func_t func)

:_x(x),_y(y),_op(op),_callback(func)

{}

std::string operator()()

{

int result = _callback(_x,_y,_op);

char buffer[1024];

snprintf(buffer,sizeof buffer,"%d %c %d = %d",_x,_op,_y,result);

return buffer;

}

std::string toTaskString()

{

char buffer[1024];

snprintf(buffer,sizeof buffer,"%d %c %d = ?",_x,_op,_y);

return buffer;

}

private:

int _x;

int _y;

char _op;

func_t _callback;

};

const std::string oper = "+-*/%";

int mymath(int x,int y,char op)

{

int result = 0;

switch(op)

{

case '+':

result = x+y;

break;

case '-':

result = x-y;

break;

case '*':

result = x*y;

break;

case '/':

if(y==0)

{

std::cerr<<"div zero error!"<<std::endl;

result = -1;

}

else

{

result = x/y;

}

break;

case '%':

if(y==0)

{

std::cerr<<"mod zero error!"<<std::endl;

result = -1;

}

else

{

result = x%y;

}

break;

default:

break;

}

return result;

}

main.cc:

#include "ThreadPool.hpp"

#include "Thread.hpp"

#include "Task.hpp"

#include <unistd.h>

#include <ctime>

int main()

{

ThreadPool<Task>* tp = new ThreadPool<Task>();

tp->run();

srand(time(0));

int x,y;

char op;

while(true)

{

x = rand()%10+1;

y = rand()%20+1;

op =oper[rand()%oper.size()];

Task t(x,y,op,mymath);

tp->Push(t);

sleep(1);

}

return 0;

}

​🌙 线程池单列模式

分析:

IT行业这么火, 涌入的人很多. 俗话说林子大了啥鸟都有. 大佬和菜鸡们两极分化的越来越严重. 为了让菜鸡们不太拖大佬的后腿, 于是大佬们针对一些经典的常见的场景, 给定了一些对应的解决方案, 这个就是 设计模式.

单例模式的特点:

某些类, 只应该具有一个对象(实例), 就称之为单例.例如一个男人只能有一个媳妇.在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中. 此时往往要用一个单例的类来管理这些数据.

饿汉实现方式和懒汉实现方式:(我们以洗碗的例子来说明懒汉模式和饿汉模式

吃完饭, 立刻洗碗, 这种就是饿汉方式. 因为下一顿吃的时候可以立刻拿着碗就能吃饭。吃完饭, 先把碗放下, 然后下一顿饭用到这个碗了再洗碗, 就是懒汉方式。懒汉方式最核心的思想是 “延时加载”. 从而能够优化服务器的启动速度。

饿汉方式实现单例模式:

<code>template <class T>

class Singleton

{

static T data;

public:

static T* GetInstance()

{

return &data;

}

};

只要通过 Singleton 这个包装类来使用 T 对象, 则一个进程中只有一个 T 对象的实例。

懒汉方式实现单例模式:

template <class T>

class Singleton

{

static T* inst;

public:

static T* GetInstance()

{

if (inst == NULL) {

inst = new T();

}

return inst;

}

};

存在一个严重的问题, 线程不安全。第一次调用 GetInstance 的时候, 如果两个线程同时调用, 可能会创建出两份 T 对象的实例,但是后续再次调用, 就没有问题了。

懒汉方式实现单例模式(线程安全版本):

// 懒汉模式, 线程安全

template <class T>

class Singleton

{

volatile static T* inst; // 需要设置 volatile 关键字, 否则可能被编译器优化.

static std::mutex lock;

public:

static T* GetInstance()

{

if (inst == NULL)

{

// 双重判定空指针, 降低锁冲突的概率, 提高性能.

lock.lock(); // 使用互斥锁, 保证多线程情况下也只调用一次 new.

if (inst == NULL)

{

inst = new T();

}

lock.unlock();

}

return inst;

}

};

注意事项:

加锁解锁的位置。双重 if 判定, 避免不必要的锁竞争。volatile关键字防止过度优化。

​🌙 STL,智能指针和线程安全(拓展)

STL中的容器是否是线程安全的?

不是。原因是, STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响。而且对于不同的容器, 加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶).因此 STL 默认不是线程安全. 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全。

智能指针是否是线程安全的?

对于 unique_ptr, 由于只是在当前代码块范围内生效, 因此不涉及线程安全问题。对于 shared_ptr, 多个对象需要共用一个引用计数变量, 所以会存在线程安全问题. 但是标准库实现的时候考虑到了这个问题, 基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数。

​🌙 常见的各种锁

悲观锁:

在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。

乐观锁:

每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。

CAS操作:

当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试

自旋锁:

使用自旋锁时,当多线程发生竞争锁的情况时,加锁失败的线程会忙等待(这里的忙等待可以用 while 循环等待实现),直到它拿到锁。而互斥锁加锁失败后,线程会让出 CPU 资源给其他线程使用,然后该线程会被阻塞挂起。如果成功申请临界资源的线程,临界区代码执行时间过长,自旋的线程会长时间占用 CPU 资源,所以自旋的时间和临界区代码执行的时间是成正比的关系。如果临界区代码执行的时间很短,就不应该使用互斥锁,而应该选用自旋锁。因为互斥锁加锁失败,是需要发生上下文切换的,如果临界区执行的时间比较短,那可能上下文切换的时间会比临界区代码执行的时间还要长。

​🌙 读者写者问题

读写锁:

在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少。相比较改写,它们读的机会反而高的多。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地降低我们程序的效率。那么有没有一种方法,可以专门处理这种多读少写的情况呢? 有,那就是读写锁。

读写锁接口:

//初始化读写锁

pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);

//销毁读写锁

pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

//读加锁

pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);

//写加锁

pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);

//解锁

pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

🌟结束语 

       今天内容就到这里啦,时间过得很快,大家沉下心来好好学习,会有一定的收获的,大家多多坚持,嘻嘻,成功路上注定孤独,因为坚持的人不多。那请大家举起自己的小手给博主一键三连,有你们的支持是我最大的动力💞💞💞,回见。

​​​ 



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。