Linux--多路转接之select
诡异森林。 2024-10-15 09:07:03 阅读 70
前言
多路转接(也称为多路复用)是一种用于管理多个I/O通道的技术,它能实现同时监听和处理多个I/O事件,而不是为每个I/O通道创建单独的线程或进程。其中,select是实现多路转接的一种常用方法。
select()
select函数是系统提供的一个多路转接接口,用于让我们的程序同时监视多个文件描述符(file descriptor,简称fd)的状态变化,如读就绪、写就绪或异常。其函数原型如下:
<code>#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
参数说明
nfds:是文件描述符集合中最大文件描述符值加1。这个参数实际上被忽略,因为现在的系统不再需要它来确定文件描述符的范围。readfds:指向文件描述符集合的指针,该集合中的文件描述符被检查以查看它们是否准备好被读取。writefds:指向文件描述符集合的指针,该集合中的文件描述符被检查以查看它们是否准备好被写入。exceptfds:指向文件描述符集合的指针,该集合中的文件描述符被检查以查看是否有异常条件发生(例如,带外数据到达)。timeout:是一个指向 timeval 结构的指针,该结构指定了函数等待的最大时间长度。如果 timeout 是 NULL,则 select() 会无限期地等待直到至少有一个文件描述符准备就绪。
返回值
成功时,select() 返回准备就绪的文件描述符的总数(不包括 exceptfds 中的文件描述符)。如果在调用时没有任何文件描述符准备就绪,并且 timeout 非空且指定的时间已过,则返回 0。如果出现错误,则返回 -1,并设置 errno 以指示错误原因。
timeval是一个用于表示时间的结构体:
#include <sys/time.h>
struct timeval { -- -->
time_t tv_sec; // 秒
suseconds_t tv_usec; // 微秒
};
tv_sec:秒数,从 Unix 纪元(1970 年 1 月 1 日 00:00:00 UTC)开始计算的秒数(时间戳)。
tv_usec:微秒数,范围从 0 到 999999。
fd_set
fd_set
实际上是一个位图(Bitmask)或位向量(Bitvector),它的每一位代表一个文件描述符。通过设置或清除位的方式,可以将文件描述符添加到或从 fd_set
中移除。
typedef struct {
__fd_mask fds_bits[__FD_SETSIZE/__NFDBITS];
} fd_set;
其中,__fd_mask 通常是一个整数类型(如 long),__FD_SETSIZE 定义了 fd_set 中可以包含的最大文件描述符数量(通常是 1024),__NFDBITS 是 __fd_mask 中包含的位数。
与此同时,系统还提供一些接口来操作
fd_set
:
FD_ZERO(fd_set *set)
:将 fd_set 中的所有位清零,即将所有文件描述符从集合中移除。FD_SET(int fd, fd_set *set)
:将指定的文件描述符 fd 添加到 fd_set 中。FD_CLR(int fd, fd_set *set)
:将指定的文件描述符 fd 从 fd_set 中移除。FD_ISSET(int fd, fd_set *set)
:检查指定的文件描述符 fd 是否在 fd_set 中。在调用 select() 后,使用此宏来检测哪些文件描述符已准备就绪。
select的执行过程
对于执行过程,大致分为四个步骤:
1.初始化:
创建一个或多个
fd_set
结构,用于存储需要监视的文件描述符。使用FD_ZERO
宏清空这些fd_set结构。使用FD_SET
宏将需要监视的文件描述符添加到相应的fd_set结构中。对于添加到位图中的事件, 都会在位图表示成1
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(_fd_array[i], &rfds);
2.调用select函数:
将最大的文件描述符值加1作为nfds参数。将之前创建的fd_set结构作为
readfds
、writefds
和exceptfds
参数(如果需要监视相应的事件)。设置timeout
参数以控制select的等待时间。调用select函数。
struct timeval timeout = { 0, 0};//设置等待时间
int n=select(max_fd+1,&rfds,nullptr,nullptr,&timeout);
3.处理返回结果
此时经过select函数的执行后,已经影响了
fd_set
中的之前添加进来的文件描述符,只要没有就绪的话,那么都会被清空为0如果select返回大于0的值,表示有文件描述符的事件已经就绪。利用for循环对每个事件进行检查是否就绪,使用FD_ISSET
宏检查哪些文件描述符的事件已经就绪。根据就绪的文件描述符执行相应的操作,如读取数据、写入数据或处理异常事件。
if(n>0)
{
for (int i = 0; i <1024; i++)
{
if (FD_ISSET(_fd_array[i], &rfds))
{
//处理对应的就绪事件
}
}
}
4.重新设置并继续监视:
由于select函数会修改传入的
fd_set
结构,因此在每次调用select之前都需要重新设置这些结构。根据需要更新timeout
参数。重复执行上述步骤以继续监视文件描述符的状态变化。
while(1)
{
//包含以上内容
}
注意事项:
每次调用select之前都需要重新设置fd_set结构和timeout参数。select函数只负责等待文件描述符的状态变化,并不负责数据的拷贝。数据的拷贝需要使用如read、write等函数来完成。select函数有一个限制,即它能够监视的文件描述符数量是有限的,通常取决于fd_set结构的大小(在32位系统上通常为1024个)。
利用select()建立一个Server服务器
InetAddr.hpp
包含网络地址的头文件:
#pragma once
#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
class InetAddr
{
private:
void GetAddress(std::string *ip, uint16_t *port)
{
*port = ntohs(_addr.sin_port);
*ip = inet_ntoa(_addr.sin_addr);
}
public:
InetAddr(const struct sockaddr_in &addr) : _addr(addr)
{
GetAddress(&_ip, &_port);
}
InetAddr(const std::string &ip, uint16_t port) : _ip(ip), _port(port)
{
_addr.sin_family = AF_INET;
_addr.sin_port = htons(_port);
_addr.sin_addr.s_addr = inet_addr(_ip.c_str());
}
InetAddr()
{ }
std::string Ip()
{
return _ip;
}
bool operator == (const InetAddr &addr)
{
// if(_ip == addr._ip)
if(_ip == addr._ip && _port == addr._port) // 方便测试
{
return true;
}
return false;
}
// bool operator = (const struct sockaddr_in &addr)
// {
// _addr = addr;
// }
struct sockaddr_in Addr()
{
return _addr;
}
uint16_t Port()
{
return _port;
}
~InetAddr()
{
}
private:
struct sockaddr_in _addr;
std::string _ip;
uint16_t _port;
};
Log.hpp
打印日志的头文件:
#pragma once
#include<iostream>
#include<fstream>
#include<ctime>
#include<cstdarg>
#include<string>
#include<sys/types.h>
#include<unistd.h>
#include<cstdio>
#include"LockGuard.hpp"
using namespace std;
bool gIsSave=false;//默认输出到屏幕
const string logname="log.txt";code>
//1.日志是有等级的
enum Level
{ -- -->
DEBUG=0,
INFO,
WARNING,
ERROR,
FATAL
};
void SaveFile(const string& filename,const string& messages)
{
ofstream out(filename,ios::app);
if(!out.is_open())
{
return;
}
out<<messages;
out.close();
}
//等级转化为字符串
string LevelToString(int level)
{
switch (level)
{
case DEBUG:
return "Debug";
case INFO:
return "Info";
case WARNING:
return "Warning";
case ERROR:
return "Error";
case FATAL:
return "Fatal";
default:
return "Unkonwn";
}
}
//获取当前时间
string GetTimeString()
{
time_t curr_time=time(nullptr);//时间戳
struct tm* format_time=localtime(&curr_time);//转化为时间结构
if(format_time==nullptr)
return "None";
char time_buffer[1024];
snprintf(time_buffer,sizeof(time_buffer),"%d-%d-%d %d:%d:%d",
format_time->tm_year + 1900,
format_time->tm_mon + 1,
format_time->tm_mday,
format_time->tm_hour,
format_time->tm_min,
format_time->tm_sec);
return time_buffer;
}
pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;
//获取日志信息
void LogMessage(string filename,int line,bool issave,int level,char* format,...)
{
string levelstr=LevelToString(level);
string timestr=GetTimeString();
pid_t selfid=getpid();
char buffer[1024];
va_list arg;
va_start(arg,format);
vsnprintf(buffer,sizeof(buffer),format,arg);
va_end(arg);
string message= "[" + timestr + "]" + "[" + levelstr + "]" +
"[" + std::to_string(selfid) + "]" +
"[" + filename + "]" + "[" + std::to_string(line) + "] " + buffer + "\n";
LockGuard lockguard(&lock);
if(!issave)
{
cout<<message;
}
else
{
SaveFile(logname,message);
}
}
#define LOG(level,format,...) \
do \
{ \
LogMessage(__FILE__,__LINE__,gIsSave,level,format,##__VA_ARGS__); \
} while (0)
#define EnableFile() \
do \
{ \
gIsSave=true; \
} while (0)
#define EnableScreen() \
do \
{ \
gIsSave=false; \
} while (0)
LockGuard.hpp
互斥锁的头文件:
#ifndef __LOCK_GUARD_HPP__
#define __LOCK_GUARD_HPP__
#include <iostream>
#include <pthread.h>
class LockGuard
{
public:
LockGuard(pthread_mutex_t* mutex):_mutex(mutex)
{
pthread_mutex_lock(_mutex);
}
~LockGuard()
{
pthread_mutex_unlock(_mutex);
}
private:
pthread_mutex_t *_mutex;
};
#endif
Socket.hpp
包含一系列Socket套接字的接口函数,还有TcpSocket专门的接口:
#include <iostream>
#include <string>
#include <functional>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <pthread.h>
#include <sys/types.h>
#include <memory>
#include "InetAddr.hpp"
#include "Log.hpp"
namespace socket_ns
{
class Socket;
const static int gbacklog=8;
using socket_sptr=std::shared_ptr<Socket>;//套接字指针
enum
{
SOCKET_ERROR = 1,
BIND_ERROR,
LISTEN_ERROR,
USAGE_ERROR
};
//在基类创建一系列虚函数,只要派生类能用到就在这里创建
class Socket
{
public:
virtual void CreateSocketOrDie() =0; //创建套接字
virtual void BindSocketOrDie(InetAddr& addr) =0; //绑定套接字
virtual void ListenSocketOrDie()=0; //监听套接字
virtual int Accepter(InetAddr* addr) =0; //接受客户端
virtual bool Connector(InetAddr &addr) = 0; //连接客户端
virtual void SetSocketAddrReuse() = 0; // 重启指定端口
virtual int SockFd() = 0; //获取Sockfd
virtual int Recv(std::string *out) = 0; //接收对方信息
virtual int Send(const std::string &in) = 0; //发送给对方信息
public:
//创建监听套接字,将一系列操作细分化,直接引用对应函数直接创建
void BuildListenSocket(InetAddr& addr)
{
CreateSocketOrDie();
SetSocketAddrReuse();
BindSocketOrDie(addr);
ListenSocketOrDie();
}
bool BuildClientSocket(InetAddr &addr)
{
CreateSocketOrDie();
return Connector(addr);
}
};
class TcpSocket : public Socket
{
public:
TcpSocket(int sockfd=-1)
:_sockfd(sockfd)
{ }
void CreateSocketOrDie() override //override明确的重写基类函数
{
_sockfd=socket(AF_INET,SOCK_STREAM,0);
if(_sockfd<0)
{
LOG(FATAL, "socket error");
exit(SOCKET_ERROR);
}
LOG(DEBUG, "socket create success, sockfd is : %d\n", _sockfd);
}
void BindSocketOrDie(InetAddr& addr) override
{
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(addr.Port());
local.sin_addr.s_addr = inet_addr(addr.Ip().c_str());
int n=bind(_sockfd,(struct sockaddr*)&local,sizeof(local));
if (n < 0)
{
LOG(FATAL, "bind error\n");
exit(BIND_ERROR);
}
LOG(DEBUG, "bind success, sockfd is : %d\n", _sockfd);
}
void ListenSocketOrDie() override
{
int n=listen(_sockfd,gbacklog);
if (n < 0)
{
LOG(FATAL, "listen error\n");
exit(LISTEN_ERROR);
}
LOG(DEBUG, "listen success, sockfd is : %d\n", _sockfd);
}
int Accepter(InetAddr* addr) override
{
struct sockaddr_in peer;
socklen_t len=sizeof(peer);
int sockfd = accept(_sockfd,(struct sockaddr*)&peer,&len);
if (sockfd < 0)
{
LOG(WARNING, "accept error\n");
return -1;
}
*addr=peer;
return sockfd;
}
virtual bool Connector(InetAddr& addr)
{
struct sockaddr_in server;
memset(&server,0,sizeof(server));
server.sin_family=AF_INET;
server.sin_addr.s_addr=inet_addr(addr.Ip().c_str());
server.sin_port=htons(addr.Port());
int n=connect(_sockfd,(struct sockaddr*)&server,sizeof(server));
if (n < 0)
{
std::cerr << "connect error" << std::endl;
return false;
}
return true;
}
void SetSocketAddrReuse() override
{
int opt = 1;
setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)); //快速重启端口
}
int Recv(std::string *out) override
{
char inbuffer[1024];
ssize_t n = recv(_sockfd,inbuffer,sizeof(inbuffer)-1,0);
if (n > 0)
{
inbuffer[n] = 0;
*out += inbuffer; // 接收次数可能不只一次,一般是多次的,
}
return n;
}
int Send(const std::string &in) override
{
int n = send(_sockfd,in.c_str(),in.size(),0);
return n;
}
int SockFd() override
{
return _sockfd;
}
~TcpSocket()
{ }
private:
int _sockfd;
};
}
SelectServer.hpp
#pragma once
#include <iostream>
#include <string>
#include <memory>
#include "Socket.hpp"
using namespace socket_ns;
//select服务器要正确的编写,需要借助一个第三方数组来完成,保存合法的,所有的fd到数组中,方便后期批量化统一添加
class SelectServer
{
const static int defaultfd = -1; //默认sockfd
const static int N = sizeof(fd_set) * 8; //监视文件描述符的最大值
public:
SelectServer(uint16_t port)
:_port(port),
_listensock(make_unique<TcpSocket>())
{
InetAddr addr("0", _port); //网络地址初始化
_listensock->BuildListenSocket(addr);//创建监听套接字
//初始化辅助数组
for (int i = 0; i < N; i++)
{
_fd_array[i] = defaultfd;
}
_fd_array[0] = _listensock->SockFd();
}
void AcceptClient()
{
// 我们今天只关心了读,而读有:listensock 和 normal sockfd
InetAddr clientaddr;
int sockfd = _listensock->Accepter(&clientaddr); // 这里调用accept会不会阻塞呢??不会。因为事件已经就绪了
if (sockfd < 0)
return;
LOG(DEBUG, "Get new Link, sockfd: %d, client info %s:%d\n", sockfd, clientaddr.Ip().c_str(), clientaddr.Port());
//select托管(监视状态):将新fd放入辅助数组中
int pos = 1;
for (; pos < N; pos++)
{
if (_fd_array[pos] == defaultfd)
break;
}//让pos到辅助数组的空缺位置
if (pos == N)//说明监视的文件描述符满了
{
::close(sockfd); // sockfd->Close();
LOG(WARNING, "server is full!\n");
return;
}
else
{
_fd_array[pos] = sockfd;
LOG(DEBUG, "%d add to select array!\n", sockfd);
}
LOG(DEBUG, "curr fd_array[] fd list : %s\n", RfdsToString().c_str());
}
void ServiceIO(int pos)
{
char buffer[1024];
ssize_t n = ::recv(_fd_array[pos], buffer, sizeof(buffer) - 1, 0); // 这里读取会不会被阻塞?不会
if (n > 0)//处理接收数据
{
buffer[n] = 0;
std::cout << "client say# " << buffer << std::endl;
std::string echo_string = "[server echo]# ";
echo_string += buffer;
::send(_fd_array[pos], echo_string.c_str(), echo_string.size(), 0);//返回给客户端
}
else if (n == 0)//说明对方已断开连接
{
LOG(DEBUG, "%d is closed\n", _fd_array[pos]);
::close(_fd_array[pos]);
_fd_array[pos] = defaultfd;
LOG(DEBUG, "curr fd_array[] fd list : %s\n", RfdsToString().c_str());
}
else//出现错误
{
LOG(DEBUG, "%d recv error\n", _fd_array[pos]);
::close(_fd_array[pos]);
_fd_array[pos] = defaultfd;
LOG(DEBUG, "curr fd_array[] fd list : %s\n", RfdsToString().c_str());
}
}
//处理准备就绪的事件
void HandlerEvent(fd_set &rfds)
{
for (int i = 0; i < N; i++)
{
if (_fd_array[i] == defaultfd)
continue;
if (FD_ISSET(_fd_array[i], &rfds))
{
if (_fd_array[i] == _listensock->SockFd())//新的连接
{
AcceptClient();
}
else
{
// 普通的sockfd读事件就绪
ServiceIO(i);
}
}
}
}
void Loop()
{
while(true)
{
//监听套接字在等待对方发送连接
//新的连接 == 读事件就绪
//要将listensock添加到select中!
fd_set rfds; //一个记录文件描述符状态的集合
FD_ZERO(&rfds);//将所有文件描述符移除集合
int max_fd = defaultfd;//最大的文件描述符值
for (int i = 0; i < N; i++)
{
if (_fd_array[i] == defaultfd)
continue;
FD_SET(_fd_array[i], &rfds); // 将所有合法的fd添加到rfds中
if (max_fd < _fd_array[i])
{
max_fd = _fd_array[i]; // 更新出最大的fd的值
}
}
struct timeval timeout = { 0, 0};//设置等待时间
int n=select(max_fd+1,&rfds,nullptr,nullptr,nullptr);
//timeout 是 NULL,则 select() 会无限期地等待直到至少有一个文件描述符准备就绪
switch (n)
{
case 0://指定时间内没有任何文件描述符准备就绪
LOG(INFO, "timeout, %d.%d\n", timeout.tv_sec, timeout.tv_usec);
break;
case -1://出现错误
LOG(ERROR, "select error...\n");
break;
default://成功状态
LOG(DEBUG, "Event Happen. n : %d\n", n); // 底层有一个事件就绪,select为什么会一直通知我?因为:我们没有处理!
HandlerEvent(rfds);
break;
}
}
}
//打印出已存在的fd
std::string RfdsToString()
{
std::string fdstr;
for (int i = 0; i < N; i++)
{
if (_fd_array[i] == defaultfd)
continue;
fdstr += std::to_string(_fd_array[i]);
fdstr += " ";
}
return fdstr;
}
~SelectServer()
{
}
private:
uint16_t _port; //端口号
std::unique_ptr<Socket> _listensock;//监听socket
int _fd_array[N]; // 辅助数组
};
成员:
辅助数组:由上面select执行过程可以知道,当select执行后,会对fd_set的文件描述符产生影响,所以为了能够在循环中多次调用select函数,就需要一个数组来进行对文件描述符的保存;
初始化:
对于辅助数组来说,只要没有新的fd进来,那么文件描述符将保持为负;而对于Select来说,监听fd就是第一个事件;所以要在初始化就添加进来;
Loop:
这里操作流程就是跟上面的执行过程是一样的,只是增加了一些细节:
N是1024,指的是Select能存储的最大fd的数目;我们需要让select监听我们想要监听的事件,就需要通过循环来一个一个添加到rfds中;
select()返回值:
根据<code>select()的返回值执行不同的代码;
这里所说的底层事件就绪,如果没有处理已就绪的事件,那么select就会一直监测到事件就绪,一直执行
default
语句的内容;
HandlerEvent():
在for循环里面通过FD_ISSET函数找出每个已经就绪的事件,然后再判断是不是监听事件的;
Main.cc
<code>#include "SelectSever.hpp"
#include <memory>
// ./selectserver port
int main(int argc, char *argv[])
{ -- -->
if (argc != 2)
{
std::cout << "Usage: " << argv[0] << " port" << std::endl;
return 0;
}
uint16_t port = std::stoi(argv[1]);//获取端口号
EnableScreen();
std::unique_ptr<SelectServer> svr = std::make_unique<SelectServer>(port);
svr->Loop();
return 0;
}
通过telnet进行测试:
开启Server服务:
telnet进行访问
Server的监听socket收到就绪的事件,创建一个新的IO服务客户端:
客户端任意发送内容:
客户端服务端都能通过Select的底层就绪互相接收发送:
Select()的优缺点
优点
多路复用:select()函数能够同时监视多个文件描述符,实现I/O多路复用,从而提高了程序的并发处理能力和资源利用率。简单易用:select()函数的接口相对简单,易于理解和使用,特别是对于初学者来说。灵活性:select()函数允许程序根据文件描述符的读、写、异常等事件进行灵活的处理,满足不同的I/O需求。
缺点
文件描述符数量的限制:select()函数能够监视的文件描述符数量有限,通常在Linux上默认为1024个(尽管可以通过修改宏定义或重新编译内核来提升这一限制,但这样做可能会降低效率)。这对于需要监视大量文件描述符的应用程序来说是一个显著的限制。性能瓶颈:当监视的文件描述符数量较多时,select()函数的性能可能会成为瓶颈。因为每次调用select()时,内核都需要扫描所有被监视的文件描述符,这会导致不必要的开销。内核拷贝开销:在select()调用过程中,由于每次都要事先准本<code>fd_set结构内容,内核与用户空间之间需要进行内存拷贝操作,以传递文件描述符集合和结果。这会增加额外的开销,并可能影响性能。
因此,在选择是否使用select()函数时,需要根据具体的应用场景和需求进行权衡。对于需要监视大量文件描述符或追求高性能的应用程序来说,可能需要考虑使用更高级的I/O多路复用机制,如poll()或epoll()等。
声明
本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。