[计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构)
努力的青菜 2024-09-08 11:07:06 阅读 92
文章目录
一.网络层与传输层协议sockaddr结构体继承体系(Linux体系)贯穿计算机系统的网络通信架构图示:
二.实现并部署多线程并发Tcp服务器框架线程池模块序列化反序列化工具模块通信信道建立模块服务器主体模块任务回调模块(根据具体应用场景可重构)Tips:DebugC++代码过程中遇到的问题记录
一.网络层与传输层协议
网络层与传输层内置于操作系统的内核中,网络层一般使用<code>ip协议,传输层常用协议为Tcp
协议和Udp
协议,Tcp
协议和Udp
协议拥有各自的特点和应用场景:
sockaddr结构体继承体系(Linux体系)
<code>sockaddr_in结构体用于存储网络通信主机进程的ip
和端口号等信息
贯穿计算机系统的网络通信架构图示:
二.实现并部署多线程并发Tcp服务器框架
小项目的完整文件的gittee链接
<code>Tcp服务器架构:
线程池模块
<code>#pragma once
#include <iostream>
#include <pthread.h>
#include "log.hpp"
#include <semaphore.h>
#include <vector>
#include <cstdio>
template<class T>
class RingQueue{
private:
pthread_mutex_t Clock_;
pthread_mutex_t Plock_;
sem_t Psem_;
sem_t Csem_;
std::vector<T> Queue_;
int Pptr_;
int Cptr_;
int capacity_;
public:
RingQueue(int capacity = 10) : Queue_(capacity),Pptr_(0),Cptr_(0),capacity_(capacity){
sem_init(&Psem_,0,capacity);
sem_init(&Csem_,0,0);
pthread_mutex_init(&Clock_,nullptr);
pthread_mutex_init(&Plock_,nullptr);
}
~RingQueue(){
sem_destroy(&Psem_);
sem_destroy(&Csem_);
pthread_mutex_destroy(&Clock_);
pthread_mutex_destroy(&Plock_);
}
T Pop(){
sem_wait(&Csem_);
pthread_mutex_lock(&Clock_);
T tem = Queue_[Cptr_];
Cptr_++;
Cptr_ %= capacity_;
pthread_mutex_unlock(&Clock_);
sem_post(&Psem_);
return tem;
}
void Push(T t){
sem_wait(&Psem_);
pthread_mutex_lock(&Plock_);
Queue_[Pptr_] = t;
Pptr_++;
Pptr_%= capacity_;
pthread_mutex_unlock(&Plock_);
sem_post(&Csem_);
}
};
#pragma once
#include "sem_cp.cpp"
#include <pthread.h>
#include <iostream>
#include <string>
#include <mutex>
#include "CalTask.cpp"
template<class Task>
class Thread_Pool{
struct Thread_Data{
int Thread_num;
pthread_t tid;
};
private:
RingQueue<Task> Queue_; //线程安全的环形队列
std::vector<Thread_Data> thread_arr; //管理线程的容器
static std::mutex lock_; //单例锁
static Thread_Pool<Task> * ptr_; //单例指针
private:
Thread_Pool(int capacity_Of_queue = 20) : Queue_(capacity_Of_queue){ }
Thread_Pool(const Thread_Pool<Task>& Tp) = delete;
Thread_Pool<Task>& operator=(const Thread_Pool<Task> & Tp) = delete;
public:
~Thread_Pool(){ }
//获取线程池单例-->注意C++的类模板静态成员函数需要在类体外进行定义
static Thread_Pool<Task> * Getinstance();
//创建多线程
void Create_thread(int thread_num = 10){
Thread_Data T_data;
for(int i = 0 ; i < thread_num ; ++i){
//注意线程池对象的this指针传递给线程
pthread_create(&T_data.tid,nullptr,Routine,this);
T_data.Thread_num = i + 1;
thread_arr.push_back(T_data);
}
}
//线程等待
void Thread_join(){
for(int i = 0 ;i < thread_arr.size() ; ++i){
pthread_join(thread_arr[i].tid,nullptr);
}
}
//向线程池中加入任务
void Push(Task T){
Queue_.Push(T);
}
void Push(Task && T){
Queue_.Push(std::forward<Task>(T));
}
private:
//线程函数-->该函数没有在类外调用,所以无须在类体外定义
static void* Routine(void * args){
Thread_Pool<Task> * Pool = static_cast<Thread_Pool<Task> *>(args);
while(true){
std::cout << "Thread prepare to work\n" << std::endl;
Task Thread_Task = Pool->Queue_.Pop();
//要求Task类重载()-->用于执行具体任务
Thread_Task();
}
return nullptr;
}
};
//初始化静态指针
template<class Task>
Thread_Pool<Task> * Thread_Pool<Task>::ptr_ = nullptr;
template<class Task>
std::mutex Thread_Pool<Task>::lock_;
//注意C++的类模板静态成员函数需要在类体外进行定义
template<class Task>
Thread_Pool<Task> * Thread_Pool<Task>::Getinstance(){
if(ptr_ == nullptr){
lock_.lock();
if(ptr_ == nullptr){
ptr_ = new Thread_Pool<Task>;
}
lock_.unlock();
}
return ptr_;
}
序列化反序列化工具模块
序列反序列化是保证通信过程中数据完整性的关键步骤,保证数据语义完整,结构完整
<code>#pragma once
#include <iostream>
#include <string>
// 自定义序列化反序列化协议
const std::string blank_space_sep = " ";
const std::string protocol_sep = "\n";
//封装报文
std::string Encode(std::string &content){
//报文正文字节数
std::string package = std::to_string(content.size());
package += protocol_sep;
package += content; //用分隔符封装正文
package += protocol_sep;
return package;
}
//解析报文package-->"正文长度"\n"正文"\n
bool Decode(std::string &package, std::string& content){
size_t pos = package.find(protocol_sep);
if(pos == std::string::npos) return false;
//解析报文正文长度
size_t Len = std::atoi(package.substr(0,pos).c_str());
//确定报文是否完整
size_t total_Len = pos + Len + 2;
if(package.size() != total_Len) return false;
//获取正文内容
content = package.substr(pos+1,Len);
package.erase(0,total_Len);
return true;
}
//用户层协议请求结构体
class Request{
public:
int x;
int y;
char op;
public:
Request(int data1 , int data2 , char op)
: x(data1),y(data2),op(op){ }
Request(){ }
public:
//请求结构体 序列化 成报文正文字符串 "x op y"
bool Serialize(std::string& out){
std::string content = std::to_string(x);
content += blank_space_sep;
content += op;
content += blank_space_sep;
content += std::to_string(y);
out = content;
return true;
// 等价的jason代码
// Json::Value root;
// root["x"] = x;
// root["y"] = y;
// root["op"] = op;
// // Json::FastWriter w;
// Json::StyledWriter w;
// out = w.write(root);
// return true;
}
//报文正文字符串 反序列化 成请求结构体
// "x op y"
bool Deserialize(const std::string &in) {
size_t left = in.find(blank_space_sep);
if(left == std::string::npos)return false;
x = std::stoi(in.substr(0,left).c_str());
std::size_t right = in.rfind(blank_space_sep);
if (right == std::string::npos)return false;
y = std::atoi(in.substr(right + 1).c_str());
if(left + 2 != right) return false;
op = in[left+1];
return true;
// 等价的jason代码
// Json::Value root;
// Json::Reader r;
// r.parse(in, root);
// x = root["x"].asInt();
// y = root["y"].asInt();
// op = root["op"].asInt();
// return true;
}
void DebugPrint()
{
std::cout << "新请求构建完成: " << x << op << y << "=?" << std::endl;
}
};
//用户层协议请求回应结构体
class Response{
public:
int result;
int code;
public:
Response(int res , int c)
: result(res),code(c){ }
Response(){ }
public:
//请求回应结构体 序列化 成报文正文字符串 "result code"
bool Serialize(std::string& out){
std::string s = std::to_string(result);
s += blank_space_sep;
s += std::to_string(code);
out = s;
return true;
// 等价的jason代码
// Json::Value root;
// root["result"] = result;
// root["code"] = code;
// // Json::FastWriter w;
// Json::StyledWriter w;
// out = w.write(root);
// return true;
}
//"result code"
//报文正文字符串 反序列化 成请求回应结构体
bool Deserialize(const std::string &in)
{
std::size_t pos = in.find(blank_space_sep);
if (pos == std::string::npos)return false;
if(pos == 0 || pos == in.size() - 1) return false;
result = std::stoi(in.substr(0, pos).c_str());
code = std::stoi(in.substr(pos+1).c_str());
return true;
// 等价的jason代码
// Json::Value root;
// Json::Reader r;
// r.parse(in, root);
// result = root["result"].asInt();
// code = root["code"].asInt();
// return true;
}
void DebugPrint()
{
std::cout << "结果响应完成, result: " << result << ", code: "<< code << std::endl;
}
};
通信信道建立模块
#pragma once
#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include "log.hpp"
#include <memory.h>
#include <arpa/inet.h>
#include <netinet/in.h>
namespace MySocket{
//Tcp通讯构建器
class TcpServer{
enum{
UsageError = 1,
SocketError,
BindError,
ListenError,
};
private:
int socketfd_ = -1;
std :: string ip_;
uint16_t port_;
int backlog_ = 10;
public:
TcpServer(const std::string& ip = "172.19.29.44", uint16_t port = 8081) : ip_(ip) , port_(port){ }
~TcpServer(){ if(socketfd_ > 0) close(socketfd_);}
public:
//确定通信协议,建立文件描述符
void BuildSocket(){
socketfd_ = socket(AF_INET,SOCK_STREAM,0);
if(socketfd_ < 0){
lg(Fatal,"socket error,%s\n",strerror(errno));
exit(SocketError);
}
}
//文件描述符与服务器ip : 端口号绑定
void SocketBind(){
struct sockaddr_in addr;
memset(&addr,0,sizeof(addr));
addr.sin_port = htons(port_);
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr(ip_.c_str());
if(bind(socketfd_,(const sockaddr*)&addr,sizeof(addr)) < 0){
lg(Fatal,"socket bind error,%s\n",strerror(errno));
exit(BindError);
}
lg(Info,"socket bind success\n");
}
//启动服务监听,等待客户端的连接
void Socklisten(){
if(socketfd_ <= 0){
lg(Fatal,"socket error,%s\n",strerror(errno));
exit(SocketError);
}
if(listen(socketfd_,backlog_) < 0){
lg(Fatal, "listen error, %s: %d", strerror(errno), errno);
exit(ListenError);
}
}
//服务器接收客户端的连接-->并创建用于通信的文件描述符-->一个客户端连接对应一个文件描述符
int SockAccept(std::string& cilent_ip, uint16_t& cilent_port){
struct sockaddr_in client_addr; // 输出型参数,用于获取用户的ip : 端口号
memset(&client_addr,0,sizeof(client_addr));
socklen_t Len = sizeof(client_addr);
int newfd = accept(socketfd_,(struct sockaddr*)&client_addr,&Len);
if(newfd < 0){
lg(Warning, "accept error, %s: %d", strerror(errno), errno);
return -1;
}
//提取客户端信息-->输出参数
char ipstr[64];
cilent_ip = inet_ntop(AF_INET,&client_addr.sin_addr,ipstr,sizeof(ipstr));
cilent_ip = ipstr;
cilent_port = ntohs(client_addr.sin_port);
return newfd;
}
public:
int Get_Server_fd(){
return socketfd_;
}
void Close_fd(){
if(socketfd_ > 0){
close(socketfd_);
socketfd_ = -1;
}
}
};
};
服务器主体模块
<code>#pragma once
#include "ThreadPool.cpp"
#include "TcpServer.cpp"
#include "CalTask.cpp"
#include "log.hpp"
#include <signal.h>
//构建计算器服务器
class CalServer{
const int size = 2048;
private:
Thread_Pool<CalTask> * Pool_ptr_;
MySocket::TcpServer Socket_;
int Socket_fd_ = -1;
public:
CalServer(const std::string& de_ip = "172.19.29.44",uint16_t de_port = 8081)
: Socket_(de_ip,de_port)
{
Pool_ptr_ = Thread_Pool<CalTask>::Getinstance();
if(Pool_ptr_ == nullptr){
lg(Fatal,"Pool_ptr_ is nullptr\n");
return;
}
Pool_ptr_->Create_thread();
}
~CalServer(){ }
public:
//建立Tcp连接条件
bool Init(){
Socket_.BuildSocket();
Socket_fd_ = Socket_.Get_Server_fd();
if(Socket_fd_ < 0){
lg(Fatal,"BuildSocket failed\n");
return true;
}
Socket_.SocketBind();
Socket_.Socklisten();
lg(Info, "init server .... done");
return true;
}
//启动服务器
void Start(){
signal(SIGCHLD, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
char ReadBuffer[size];
while(true){
//接受用户请求
std::string client_ip;
uint16_t client_port;
int client_fd = Socket_.SockAccept(client_ip,client_port);
if(client_fd < 0){
lg(Warning,"SockAccept error\n");
continue;
}
lg(Info, "accept a new link, sockfd: %d, clientip: %s, clientport: %d", client_fd, client_ip.c_str(), client_port);
int n = read(client_fd,ReadBuffer,sizeof(ReadBuffer));
ReadBuffer[n] = 0;
std::string TaskStr(ReadBuffer);
printf("receives mess from client : %s",ReadBuffer);
if(n < 0){
lg(Warning,"read error\n");
break;
}
CalTask task(client_fd,client_ip,client_port,TaskStr);
Pool_ptr_->Push(task);
}
}
};
任务回调模块(根据具体应用场景可重构)
#pragma once
#include <string>
#include "ThreadPool.cpp"
#include "Protocol.cpp"
enum{
Div_Zero = 1,
Mod_Zero,
Other_Oper
};
class CalTask{
private:
int socketfd_; //网络通信文件描述符
std :: string ip_; //客户端ip
uint16_t port_; //客户端端口号
std::string package_; //客户请求字符串
public:
CalTask(int socketfd,const std::string& ip , uint16_t & port,std::string & str)
: socketfd_(socketfd),ip_(ip),port_(port),package_(str){ }
CalTask(){ }//类一定要有默认构造函数
~CalTask(){ }
public:
//执行计算任务并将结果发送给用户
void operator() (){
std::cout << "Task Running ... \n" << std::endl;
std::string content;
//将用户发送的报文进行解包获取正文
bool r = Decode(package_, content);
if (!r)return;
//将报文正文进行反序列化
Request req;
r = req.Deserialize(content);
if (!r)return ;
req.DebugPrint();
content = "";
//构建计算结果
Response resp = CalculatorHelper(req);
resp.DebugPrint();
//计算结果序列化成字符串
resp.Serialize(content);
//字符串正文封装成报文发送给用户
std::string ResStr = Encode(content);
write(socketfd_,ResStr.c_str(),ResStr.size());
if(socketfd_ > 0)close(socketfd_);
}
private:
Response CalculatorHelper(const Request &req){
//构建请求回应结构体
Response resp(0, 0);
switch (req.op){
case '+':
resp.result = req.x + req.y;
break;
case '-':
resp.result = req.x - req.y;
break;
case '*':
resp.result = req.x * req.y;
break;
case '/':{
if (req.y == 0)
resp.code = Div_Zero;
else
resp.result = req.x / req.y;
}
break;
case '%':{
if (req.y == 0)
resp.code = Mod_Zero;
else
resp.result = req.x % req.y;
}
break;
default:
resp.code = Other_Oper;
break;
}
return resp;
}
};
Tips:DebugC++代码过程中遇到的问题记录
使用C++
类模板时,若在类模板中定义了静态成员函数,且该静态成员函数在类外被调用,则该静态成员函数必须定义在类外,不然链接器无法找到函数体.注意类模板静态成员的声明格式需要加关键字temlpate<>
声明类模板静态成员时无需特化模版类型参数跨主机并发通信测试:
声明
本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。