【Linux】线程池项目详解

CSDN 2024-09-18 13:07:03 阅读 77

在这里插入图片描述

回避、躲闪、辗转腾挪都毫无作用,

既然来的总是要来,

迎着刀锋而上恐怕是最好的选择,

起码节约时间。

--- 廖一梅 《像我这样笨拙地生活》---


线程池项目

1 线程基础2 什么是线程池3 线程池工作原理4 构建线程池4.1 框架搭建4.3 HandlerTask函数4.3 基础函数4.4 单例模式改造4.5 测试运行

5 总结

1 线程基础

线程我们已经学习的差不多了,从线程的概念:

线程的概念我们先从虚拟内存和物理内存之间的页表开始谈起

虚拟内存和物理内存的映射是通过一个二维数组进行的映射,每个元素指向物理内存的 4KB内存块。在通过虚拟地址的后12位来进行4096字节的对应!类型 + 起始地址就能从内存中找到对应的数据!

了解的虚拟内存和物理内存,我们就能知道虚拟内存本质是一种资源可以进行分配。

线程:在进程内部运行,是CPU调度的基本单位。

Linux中是直接套用的进程模块,实现的一种轻量级进程,与主线程共享地址空间!调用成本比多进程低很多!!!

线程 = 线程库里的属性集 + LWP(轻量级进程)线程的本质是代码块!只使用函数的对应代码,即拿页表的一部分来执行!!!

对于线程创建的接口我们也足够熟悉了!不再赘述

线程的管理是在共享区完成的,编译时,动态链接线程库,映射在地址空间的共享区中。在这个共享区中储存着线程的属性内存块(包含线程单独的栈结构),通过tid我们可以找到线程的所有属性。每个线程都对应一个LWP的pid,这是系统层线程调度的单位!

需要特别注意的是线程互斥的场景,在多线程的场景下,对于全局资源的处理有且只能用一个线程进行操作,否则就会出现意想不到的后果!对于多线程的场景使用互斥锁来对全局资源进行保护,可以通过RAII规则的锁守卫完成只能加减锁!

2 什么是线程池

池化技术是一种广泛应用于系统开发中的优化策略,旨在通过复用资源来提高性能和效率。池化技术的核心思想是预先分配一组资源,并在需要时进行复用,而不是每次都重新创建和销毁资源。

池化技术(Pooling)涉及创建和管理一组预先分配的资源,这些资源可以是进程、线程、数据库连接或对象实例。在池化系统中,当请求到达时,它会从池中获取一个空闲资源,使用完毕后将其归还池中。这种方法避免了频繁的创建和销毁操作,从而显著减少了系统开销。

之前我们实现过进程池:

进程池就是通过预先创建若干个进程与管道,在需要进行任务时,选择一个进程,通过管道发送信息,让其完成工作。不同的进程因为不能共享地址空间,所以想要协同工作就需要进行进程间通信,这里使用管道来实现进程

间的通信。而对于线程池来说,多线程之间是共享地址空间的,所以不需要进行额外的通信,直接调用线程来执行任务就可以!

线程池完成的工作就是在程序运行时,自动创建出若干个线程等待主线程发送任务进行执行,这样不再需要每次再创建线程来完成一个任务,只需要向任务队列中压入任务,线程池就会自动唤醒一个线程来执行任务,执行完就会继续等待任务的到来!

在这里插入图片描述

线程池的应用场景:

需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误

3 线程池工作原理

线程池的关键部分可以分为:

线程容器:用来管理创建的线程,方便统一初始化。任务队列:用来储存任务消息,需要支持压入与取出的操作。线程函数:线程都需要执行这个函数模块,在这个函数模块中进行任务的等待和执行。线程唤醒机制:需要一个线程换取机制,通过条件变量个互斥锁完成对线程的保护与唤醒。单例模式:线程池不需要创建多个,一个程序只需要一个线程池,通过单例模式进行优化。

这样,通过主线程对线程池中进行的入队列操作就可以传入任务,然后线程池中会自动检测队列中是否有任务,有任务就调用休眠的线程来执行任务。

4 构建线程池

4.1 框架搭建

首先针对线程池的关键组件进行一个框架的构建:

线程池的成员变量:

线程计数 int _thread_num内部容器 vector _threads任务队列 queue _tast_queue 这个任务队列会被多线程访问时临界资源!需要所保护运行判断 bool _isrunning; 判断是否结束互斥锁 pthread_mutex_t _mtx条件变量 pthread_cond_t _cond 条件不满足(任务队列无任务)时线程阻塞休眠线程计数器:_sleep_num 用来判断是否需要唤醒阻塞的线程(判断方法自定义)

为了使用线程方便,我们直接使用之前实现的线程类!

然后我们还需要一下功能函数来支持主线程传入任务,主线程停止工作,线程池读取任务,线程池删除旧任务:

功能函数:

初始化Init:构建线程,并储存在容器中。线程进入线程等待函数 void work()开始运行Start:遍历进入线程运行 , _isrunning = true停止运行Stop: _isrunning = false! 一切的功能只有true时才可以运行加入任务Equeue:对临界资源进行操作,先上锁,在插入数据,插入之后可以唤醒休眠的线程来执行任务,如果没有就不需要处理,只有运行状态才可以进行插入!

为了方便进行加减锁的操作,我们可以完成一些线程池内部函数:

内部函数

加锁 LockQueue()解锁 UnlockQueue()唤醒线程 WakeUp() : 通过条件变量来唤醒一个线程 — WakeUpAll:唤醒全部队列是否为空 IsEmpty()线程休眠 Sleep() 等待条件变量响应线程任务 HandlerTask(): 启动时所有线程都来执行该函数,有任务就执行任务,没有就阻塞等待,需要注意的是,该函数时成员函数,存在隐藏参数this,不满足条件,可以使用bind绑定来解决,有任务就执行任务

主要框架

<code>#pragma once

#include "Thread.hpp"

#include <vector>

#include <queue>

#include <string>

#include "Log.hpp"

using namespace ThreadMouble;

using namespace log_ns;

const int default_num = 5;

// 测试代码

void test()

{

std::cout << "这是一个测试程序!" << std::endl;

}

template <class T>

class ThreadPool

{

private:

// 加锁 解锁

void LockQueue()

{

pthread_mutex_lock(&_mtx);

}

void UnlockQueue()

{

pthread_mutex_unlock(&_mtx);

}

// 休眠等待

void Sleep()

{

pthread_cond_wait(&_cond, &_mtx);

}

// 唤醒一个线程

void WakeUp()

{

pthread_cond_signal(&_cond);

}

// 全部唤醒

void WakeUpAll()

{

pthread_cond_broadcast(&_cond);

}

// 队列为空

bool IsEmpty()

{

return _tasks.empty();

}

void HandlerTask(std::string &name)

{

}

public:

ThreadPool(int num = default_num) : _thread_num(num), _sleep_num(0), _isrunning(false)

{

pthread_mutex_init(&_mtx, nullptr);

pthread_cond_init(&_cond, nullptr);

}

public:

static ThreadPool<T> *GetInstance()

{

}

// 初始化

void Init()

{

}

// 开始运行

void Start()

{

}

// 停止运行

void Stop()

{

}

// 加入任务

void Equeue(T &in)

{

}

~ThreadPool()

{

pthread_mutex_destroy(&_mtx);

pthread_cond_destroy(&_cond);

}

private:

// 线程容器

std::vector<Thread> _threads;

// 任务队列

std::queue<T> _tasks;

// 线程数量

int _thread_num;

// 休眠数量

int _sleep_num;

// 运行判断

bool _isrunning;

// 加锁保护队列

pthread_mutex_t _mtx;

// 条件变量

pthread_cond_t _cond;

};

4.3 HandlerTask函数

我们首先先来完成每个线程创建的新线程都会进行的函数:

首先这个函数需要不断的执行,所以使用while(true)使其不断地轮询然后就是对队列任务的读取,如果队列为空并且线程池还在运行,那么就进入进行等待条件变量唤醒,需要注意的是休眠数需要进行处理如果队列为空了,并且停止运行了,就直接退出!退出前进行解锁!如果队列不为空,并且还在运行,那么就从队列中取出一个任务进行执行!

void HandlerTask(std::string &name)

{

// 运行任务

while (true)

{

LockQueue();

// 队列为空并且正在运行

while (IsEmpty() && _isrunning)

{

// 进行阻塞

_sleep_num++;

LOG(INFO, "%s sleep begin!\n", name.c_str());

Sleep();

LOG(INFO, "%s wakeup!\n", name.c_str());

_sleep_num--;

}

// 如果队列为空 停止运行了

if (IsEmpty() && !_isrunning)

{

// 直接解锁退出

UnlockQueue();

// std::cout << name << " stop 退出!" << std::endl;

LOG(INFO, "%s quit !\n", name.c_str());

break;

}

// 取出一个任务

T t = _tasks.front();

_tasks.pop();

// 解锁

UnlockQueue();

// 临界区之外执行任务

t();

// std::cout << name << " " << t.result() << std::endl;

LOG(DEBUG, "HandlerTask Done, task is : %s\n", t.result().c_str());

}

}

完成!

4.3 基础函数

我们先来实现初始化init , 开始运行 start ,停止运行stop,加入任务

初始化:首先就是创建若干个线程,再将创建的线程存入线程容器中。

// 初始化

void Init()

{

// 进行绑定

func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);

for (int i = 0; i < _thread_num; i++)

{

std::string name = "thread-" + std::to_string(i + 1);

// std::cout << name << " init!" << std::endl;

//_threads.emplace_back(name, test); //测试

_threads.emplace_back(name, func);

LOG(DEBUG, "construct thread : %s done , init success\n", name.c_str());

}

}

开始运行:直接遍历一遍进行开始运行即可!每个线程都来执行HandlerTask

// 开始运行

void Start()

{

_isrunning = true;

for (auto &e : _threads)

{

// std::cout << e.getname() << " start!" << std::endl;

LOG(DEBUG, "start thread %s done\n", e.Name().c_str());

e.Start();

}

停止运行:唤醒所有休眠的线程,并将判断符设置为false

// 停止运行

void Stop()

{

LockQueue();

// std::cout << "void stop()" << std::endl;

WakeUpAll();

_isrunning = false;

UnlockQueue();

LOG(INFO, "ThreadPool Stop success\n");

}

加入任务: 这里会对全局变量进行操作,所以先上锁。在线程池还在运行时才可以进程任务的插入,插入后,如果有休眠的线程就唤醒一个休眠的线程来执行任务!

// 加入任务

void Equeue(T &in)

{

// 临界区操作需要加锁

LockQueue();

// 只有线程池运行才可以进行插入

if (_isrunning)

{

_tasks.push(in);

// std::cout << "加入任务 : " << in.debug() << std::endl;

LOG(INFO, "push task : %s\n", in.debug().c_str());

// 唤醒一个线程

if (_sleep_num > 0)

WakeUp();

}

UnlockQueue();

}

4.4 单例模式改造

单例模式之前的文章有介绍过:设计模式 — 单例模式

单例模式:一个类只能创建一个对象,即单例模式,该模式可以保证系统中该类只有一个实例,并提供一个访问它的全局访问点,该实例被所有程序模块共享。

接下来我们通过懒汉模式进行优化:

首先我们要做的就是将构造函数私有化,让类外部不能够创建对象,并且封锁赋值重载和拷贝构造!然后类内需要一个静态类对象指针,并且使用单独一个全局锁进行保护完成一个获取唯一类对象指针的函数方法getinstance

static ThreadPool<T> *GetInstance()

{

if (_tp == nullptr)

{

LockGuard lock(&_sig_mtx);

if (_tp == nullptr)

{

LOG(INFO, "create threadpool\n");

// thread-1 thread-2 thread-3....

_tp = new ThreadPool();

_tp->Init();

_tp->Start();

}

else

{

LOG(INFO, "get threadpool\n");

}

}

return _tp;

}

// 单例模式

template <class T>

class ThreadPool

{

//...

static ThreadPool<T> *_tp;

static pthread_mutex_t _sig_mtx;

//...

};

//类外初始化

template <class T>

ThreadPool<T> *ThreadPool<T>::_tp = nullptr;

template <class T>

pthread_mutex_t ThreadPool<T>::_sig_mtx = PTHREAD_MUTEX_INITIALIZER;

这样单例模式就完成了!

4.5 测试运行

#include "ThreadPool.hpp"

#include <iostream>

#include "Task.hpp"

#include <stdlib.h>

#include <unistd.h>

#include <sys/types.h>

#include "Log.hpp"

int main()

{

srand(time(nullptr) ^ getpid());

// ThreadPool<Task> *tp = new ThreadPool<Task>();

// tp->Init();

// tp->Start();

ThreadPool<Task> *tp = ThreadPool<Task>::GetInstance();

int cnt = 5;

while (--cnt)

{

int num1 = rand() % 10;

usleep(1000);

int num2 = rand() % 10;

Task t(num1, num2);

tp->Equeue(t);

LOG(INFO , "Equeue a task , %s\n" , t.debug().c_str());

sleep(1);

}

tp->Stop();

LOG(INFO , "ThreadPool stop! \n");

return 0;

}

我们来进行测试:

在这里插入图片描述

很好的完成测试代码!!!

5 总结

线程的学习就告一段落,接下来我将会完成一个高并发内存池项目,来巩固C++的知识,并为简历增添一笔重要颜色!完成项目之后开启全新篇章 — 计算机网络,欢迎大家支持!!!

接下来还会持续更新算法相关内容,欢迎大家支持!!!



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。