Linux--线程池(包含日志的解释)

诡异森林。 2024-07-30 10:37:01 阅读 62

线程系列:

Linux–线程的认识(一)

Linux–线程的分离、线程库的地址关系的理解、线程的简单封装(二)

线程的互斥:临界资源只能在同一时间被一个线程使用

生产消费模型

信号量

线程池

线程池(Thread Pool)是一种基于池化技术设计用于管理复用线程的机制

在多线程编程中,频繁地创建和销毁线程会消耗大量的系统资源,并且由于线程的创建和销毁需要时间,这也会降低程序的执行效率。线程池通过预先创建一定数量的线程并放入池中,需要时从池中取出线程执行任务,执行完毕后线程并不销毁而是重新放回池中等待下一次使用,从而避免了线程的频繁创建和销毁所带来的开销。

主要优点

降低资源消耗:通过复用已存在的线程,减少线程创建和销毁的开销。提高响应速度:当任务到达时,可以立即分配线程进行处理,减少了等待时间。提高线程的可管理性:线程池可以统一管理线程,包括线程的创建、调度、执行和销毁等。

关键参数

核心线程数:线程池维护线程的最少数量,即使这些线程处于空闲状态,线程池也不会回收它们(一般为主线程)。最大线程数:线程池中允许的最大线程数。阻塞队列:用于存放待执行的任务的队列。当线程池中的所有线程都忙时,新任务会被添加到这个队列中等待处理。非核心线程空闲存活时间:当线程池中的线程数量超过核心线程数时,如果线程空闲时间超过这个时间,多余的线程将被终止。线程工厂:用于创建新线程的工厂。拒绝策略:当线程池和队列都满了时,对于新来的任务将采取的拒绝策略。

实现原理

线程池的实现原理通常包括以下几个关键部分:

线程池管理器:负责创建并管理线程池,包括初始化线程池、创建工作线程、销毁线程池等。工作线程:线程池中的线程,负责执行具体的任务。工作线程通常会不断从任务队列中取出任务并执行,直- 到线程池被销毁或所有任务都执行完毕。任务接口:每个任务必须实现的接口,用于定义任务的执行逻辑。在Java中,这通常是通过实现Runnable或Callable接口来实现的。任务队列:用于存放待处理的任务。当线程池中的工作线程数量达到最大值时,新到达的任务会被放入任务队列中等待处理。任务队列的实现通常依赖于Java的BlockingQueue接口。

实现流程

当有新任务提交给线程池时,线程池会首先判断当前正在运行的线程数量是否小于核心线程数。如果是,则直接创建新的线程来执行任务;否则,将任务加入任务队列等待处理。如果任务队列已满,且当前正在运行的线程数量小于最大线程数(maximumPoolSize),则创建新的线程来处理任务;如果线程数量已经达到最大线程数,则根据配置的拒绝策略来处理新任务(如抛出异常、直接丢弃等)。当一个线程完成任务后,它会从任务队列中取下一个任务来执行;如果没有任务可供执行,并且线程池中的线程数量超过了核心线程数,且这些线程空闲时间超过了设定的存活时间,则这些线程会被销毁,直到线程池中的线程数量减少到核心线程数为止。

实例

Thread.hpp

<code>#ifndef __THREAD_HPP__

#define __THREAD_HPP__

#include<iostream>

#include<string>

#include<pthread.h>

#include<functional>

#include<unistd.h>

using namespace std;

namespace ThreadMdule

{

using func_t = std::function<void(string)>;

class Thread

{

public:

void Excute()

{

_func(_threadname);

}

Thread(func_t func, const std::string &name="none-name")code>

: _func(func), _threadname(name), _stop(true)

{ }

static void* threadroutine(void* args)

{

Thread* self=static_cast<Thread*>(args);

self->Excute();

return nullptr;

}

bool start()

{

int n=pthread_create(&_tid,nullptr,threadroutine,this);

if(!n)

{

_stop = false;

return true;

}

else

{

return false;

}

}

void Detach()

{

if(!_stop)

{

pthread_detach(_tid);

}

}

void Join()

{

if(!_stop)

{

pthread_join(_tid,nullptr);

}

}

string name()

{

return _threadname;

}

void Stop()

{

_stop = true;

}

~Thread() { }

private:

pthread_t _tid;

std::string _threadname;

func_t _func;

bool _stop;

};

}

#endif

ThreadPool.hpp

#pragma once

#include<iostream>

#include<vector>

#include<queue>

#include<pthread.h>

#include"Thread.hpp"

#include"Log.hpp"

#include"LockGuard.hpp"

using namespace ThreadMdule;

using namespace std;

const static int gdefaultthreadnum=3;//默认线程池的线程数

template <class T>

class ThreadPool

{

public:

ThreadPool(int threadnum=gdefaultthreadnum) :_threadnum(threadnum),_waitnum(0),_isrunning(false)

{

pthread_mutex_init(&_mutex,nullptr);

pthread_cond_init(&_cond,nullptr);

LOG(INFO,"ThreadPool COnstruct.");

}

//各个线程独立的任务函数

void HandlerTask(string name)

{

LOG(INFO,"%s is running...",name.c_str());

while(true)

{

LockQueue();//开启保护

//等到有任务时才退出循环执行下列语句

while(_task_queue.empty()&&_isrunning)

{

_waitnum++;

ThreadSleep();

_waitnum--;

}

//当任务队列空并且线程池停止时线程退出

if(_task_queue.empty()&&!_isrunning)

{

UnlockQueue();

cout<<name<<" quit "<<endl;

sleep(1);

break;

}

//1.任务队列不为空&&线程池开启

//2.任务队列不为空&&线程池关闭,直到任务队列为空

//所以,只要有任务,就要处理任务

T t=_task_queue.front();//取出对应任务

_task_queue.pop();

UnlockQueue();

LOG(DEBUG,"%s get a task",name.c_str());

//处理任务

t();

LOG(DEBUG,"%s handler a task,result is: %s",name.c_str(),t.ResultToString().c_str());

}

}

//线程池中线程的构建

void InitThreadPool()

{

for(int i=0;i<_threadnum;i++)

{

string name="thread-"+to_string(i+1);code>

_threads.emplace_back(bind(&ThreadPool::HandlerTask,this,placeholders::_1),name);

LOG(INFO,"init thread %s done",name.c_str());

}

_isrunning=true;

}

//线程池的启动

void Start()

{

for(auto& thread:_threads)

{

thread.start();

}

}

//线程池停止

void Stop()

{

LockQueue();

_isrunning=false;

ThreadWakeupAll();

UnlockQueue();

}

void Wait()

{

for(auto& thread:_threads)

{

thread.Join();

LOG(INFO,"%s is quit...",thread.name().c_str());

}

}

//将任务入队列

bool Enqueue(const T& t)

{

bool ret=false;

LockQueue();

if(_isrunning)

{

_task_queue.push(t);

//如果有空闲的线程,那么唤醒线程让其执行任务

if(_waitnum>0)

{

ThreadWakeup();

}

LOG(DEBUG,"enqueue task success");

ret=true;

}

UnlockQueue();

return ret;

}

~ThreadPool()

{

pthread_mutex_destroy(&_mutex);

pthread_cond_destroy(&_cond);

}

private:

void LockQueue()

{

pthread_mutex_lock(&_mutex);

}

void UnlockQueue()

{

pthread_mutex_unlock(&_mutex);

}

void ThreadSleep()

{

pthread_cond_wait(&_cond, &_mutex);

}

void ThreadWakeup()

{

pthread_cond_signal(&_cond);

}

void ThreadWakeupAll()

{

pthread_cond_broadcast(&_cond);

}

int _threadnum;//线程数

vector<Thread> _threads;//存储线程的vector

queue<T> _task_queue;//输入的任务队列

pthread_mutex_t _mutex;//互斥锁

pthread_cond_t _cond;//条件变量

int _waitnum;//空闲的线程数

bool _isrunning;//表示线程池是否启动

};

Task.hpp

#include<iostream>

#include<string>

#include<functional>

using namespace std;

class Task

{

public:

Task(){ }

Task(int a,int b): _a(a),_b(b),_result(0)

{ }

void Excute()

{

_result=_a+_b;

}

string ResultToString()

{

return to_string(_a) + "+"+to_string(_b)+"="+to_string(_result);

}

string DebugToString()

{

return to_string(_a) + "+" + to_string(_b) + "= ?";

}

void operator()()

{

Excute();

}

private:

int _a;

int _b;

int _result;

};

main.cc

int main()

{

srand(time(nullptr)^getpid()^pthread_self());

//EnableScreen();

EnableFile();

unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>(5));

tp->InitThreadPool();

tp->Start();

int tasknum=10;

while(tasknum)

{

sleep(1);

int a=rand()%10+1;

usleep(1024);

int b=rand()%20+1;

Task t(a,b);

LOG(INFO,"main thread push task: %s",t.DebugToString().c_str());

tp->Enqueue(t);

tasknum--;

}

tp->Stop();

tp->Wait();

}

Loh.hpp

#pragma once

#include<iostream>

#include<fstream>

#include<ctime>

#include<cstdarg>

#include<string>

#include<sys/types.h>

#include<unistd.h>

#include<cstdio>

#include"LockGuard.hpp"

using namespace std;

bool gIsSave=false;//默认输出到屏幕

const string logname="log.txt";code>

//1.日志是有等级的

enum Level

{

DEBUG=0,

INFO,

WARNING,

ERROR,

FATAL

};

void SaveFile(const string& filename,const string& messages)

{

ofstream out(filename,ios::app);

if(!out.is_open())

{

return;

}

out<<messages;

out.close();

}

//等级转化为字符串

string LevelToString(int level)

{

switch (level)

{

case DEBUG:

return "Debug";

case INFO:

return "Info";

case WARNING:

return "Warning";

case ERROR:

return "Error";

case FATAL:

return "Fatal";

default:

return "Unkonwn";

}

}

//获取当前时间

string GetTimeString()

{

time_t curr_time=time(nullptr);//时间戳

struct tm* format_time=localtime(&curr_time);//转化为时间结构

if(format_time==nullptr)

return "None";

char time_buffer[1024];

snprintf(time_buffer,sizeof(time_buffer),"%d-%d-%d %d:%d:%d",

format_time->tm_year + 1900,

format_time->tm_mon + 1,

format_time->tm_mday,

format_time->tm_hour,

format_time->tm_min,

format_time->tm_sec);

return time_buffer;

}

pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;

//获取日志信息

void LogMessage(string filename,int line,bool issave,int level,char* format,...)

{

string levelstr=LevelToString(level);

string timestr=GetTimeString();

pid_t selfid=getpid();

char buffer[1024];

va_list arg;

va_start(arg,format);

vsnprintf(buffer,sizeof(buffer),format,arg);

va_end(arg);

string message= "[" + timestr + "]" + "[" + levelstr + "]" +

"[" + std::to_string(selfid) + "]" +

"[" + filename + "]" + "[" + std::to_string(line) + "] " + buffer + "\n";

LockGuard lockguard(&lock);

if(!issave)

{

cout<<message;

}

else

{

SaveFile(logname,message);

}

}

#define LOG(level,format,...) \

do \

{ \

LogMessage(__FILE__,__LINE__,gIsSave,level,format,##__VA_ARGS__); \

} while (0)

#define EnableFile() \

do \

{ \

gIsSave=true; \

} while (0)

#define EnableScreen() \

do \

{ \

gIsSave=false; \

} while (0)

线程池解释(代码)

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

日志解释(代码)

日志是系统或程序记录所有发生事件的文件:

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。