Linux--多路转接之select

诡异森林。 2024-10-15 09:07:03 阅读 70

前言

多路转接(也称为多路复用)是一种用于管理多个I/O通道的技术,它能实现同时监听和处理多个I/O事件,而不是为每个I/O通道创建单独的线程或进程。其中,select是实现多路转接的一种常用方法。

select()

select函数是系统提供的一个多路转接接口,用于让我们的程序同时监视多个文件描述符(file descriptor,简称fd)的状态变化,如读就绪、写就绪或异常。其函数原型如下:

<code>#include <sys/select.h>

#include <sys/time.h>

#include <unistd.h>

int select(int nfds, fd_set *readfds, fd_set *writefds,

fd_set *exceptfds, struct timeval *timeout);

参数说明

nfds:是文件描述符集合中最大文件描述符值加1。这个参数实际上被忽略,因为现在的系统不再需要它来确定文件描述符的范围。readfds:指向文件描述符集合的指针,该集合中的文件描述符被检查以查看它们是否准备好被读取。writefds:指向文件描述符集合的指针,该集合中的文件描述符被检查以查看它们是否准备好被写入。exceptfds:指向文件描述符集合的指针,该集合中的文件描述符被检查以查看是否有异常条件发生(例如,带外数据到达)。timeout:是一个指向 timeval 结构的指针,该结构指定了函数等待的最大时间长度。如果 timeout 是 NULL,则 select() 会无限期地等待直到至少有一个文件描述符准备就绪。

返回值

成功时,select() 返回准备就绪的文件描述符的总数(不包括 exceptfds 中的文件描述符)。如果在调用时没有任何文件描述符准备就绪,并且 timeout 非空且指定的时间已过,则返回 0。如果出现错误,则返回 -1,并设置 errno 以指示错误原因。

timeval是一个用于表示时间的结构体:

#include <sys/time.h>

struct timeval { -- -->

time_t tv_sec; // 秒

suseconds_t tv_usec; // 微秒

};

tv_sec:秒数,从 Unix 纪元(1970 年 1 月 1 日 00:00:00 UTC)开始计算的秒数(时间戳)。

tv_usec:微秒数,范围从 0 到 999999。

fd_set

fd_set 实际上是一个位图(Bitmask)或位向量(Bitvector),它的每一位代表一个文件描述符。通过设置或清除位的方式,可以将文件描述符添加到或从 fd_set 中移除。

typedef struct {

__fd_mask fds_bits[__FD_SETSIZE/__NFDBITS];

} fd_set;

其中,__fd_mask 通常是一个整数类型(如 long),__FD_SETSIZE 定义了 fd_set 中可以包含的最大文件描述符数量(通常是 1024),__NFDBITS 是 __fd_mask 中包含的位数。

与此同时,系统还提供一些接口来操作fd_set:

FD_ZERO(fd_set *set):将 fd_set 中的所有位清零,即将所有文件描述符从集合中移除。FD_SET(int fd, fd_set *set):将指定的文件描述符 fd 添加到 fd_set 中。FD_CLR(int fd, fd_set *set):将指定的文件描述符 fd 从 fd_set 中移除。FD_ISSET(int fd, fd_set *set):检查指定的文件描述符 fd 是否在 fd_set 中。在调用 select() 后,使用此宏来检测哪些文件描述符已准备就绪。

select的执行过程

对于执行过程,大致分为四个步骤:

1.初始化

创建一个或多个fd_set结构,用于存储需要监视的文件描述符。使用FD_ZERO宏清空这些fd_set结构。使用FD_SET宏将需要监视的文件描述符添加到相应的fd_set结构中。对于添加到位图中的事件, 都会在位图表示成1

fd_set rfds;

FD_ZERO(&rfds);

FD_SET(_fd_array[i], &rfds);

2.调用select函数

将最大的文件描述符值加1作为nfds参数。将之前创建的fd_set结构作为readfdswritefdsexceptfds参数(如果需要监视相应的事件)。设置timeout参数以控制select的等待时间。调用select函数。

struct timeval timeout = { 0, 0};//设置等待时间

int n=select(max_fd+1,&rfds,nullptr,nullptr,&timeout);

3.处理返回结果

此时经过select函数的执行后,已经影响了fd_set 中的之前添加进来的文件描述符,只要没有就绪的话,那么都会被清空为0如果select返回大于0的值,表示有文件描述符的事件已经就绪。利用for循环对每个事件进行检查是否就绪,使用FD_ISSET宏检查哪些文件描述符的事件已经就绪。根据就绪的文件描述符执行相应的操作,如读取数据、写入数据或处理异常事件。

if(n>0)

{

for (int i = 0; i <1024; i++)

{

if (FD_ISSET(_fd_array[i], &rfds))

{

//处理对应的就绪事件

}

}

}

4.重新设置并继续监视

由于select函数会修改传入的fd_set结构,因此在每次调用select之前都需要重新设置这些结构。根据需要更新timeout参数。重复执行上述步骤以继续监视文件描述符的状态变化。

while(1)

{

//包含以上内容

}

注意事项

每次调用select之前都需要重新设置fd_set结构和timeout参数。select函数只负责等待文件描述符的状态变化,并不负责数据的拷贝。数据的拷贝需要使用如read、write等函数来完成。select函数有一个限制,即它能够监视的文件描述符数量是有限的,通常取决于fd_set结构的大小(在32位系统上通常为1024个)。

利用select()建立一个Server服务器

InetAddr.hpp

包含网络地址的头文件:

#pragma once

#include <iostream>

#include <sys/types.h>

#include <sys/socket.h>

#include <arpa/inet.h>

#include <netinet/in.h>

class InetAddr

{

private:

void GetAddress(std::string *ip, uint16_t *port)

{

*port = ntohs(_addr.sin_port);

*ip = inet_ntoa(_addr.sin_addr);

}

public:

InetAddr(const struct sockaddr_in &addr) : _addr(addr)

{

GetAddress(&_ip, &_port);

}

InetAddr(const std::string &ip, uint16_t port) : _ip(ip), _port(port)

{

_addr.sin_family = AF_INET;

_addr.sin_port = htons(_port);

_addr.sin_addr.s_addr = inet_addr(_ip.c_str());

}

InetAddr()

{ }

std::string Ip()

{

return _ip;

}

bool operator == (const InetAddr &addr)

{

// if(_ip == addr._ip)

if(_ip == addr._ip && _port == addr._port) // 方便测试

{

return true;

}

return false;

}

// bool operator = (const struct sockaddr_in &addr)

// {

// _addr = addr;

// }

struct sockaddr_in Addr()

{

return _addr;

}

uint16_t Port()

{

return _port;

}

~InetAddr()

{

}

private:

struct sockaddr_in _addr;

std::string _ip;

uint16_t _port;

};

Log.hpp

打印日志的头文件:

#pragma once

#include<iostream>

#include<fstream>

#include<ctime>

#include<cstdarg>

#include<string>

#include<sys/types.h>

#include<unistd.h>

#include<cstdio>

#include"LockGuard.hpp"

using namespace std;

bool gIsSave=false;//默认输出到屏幕

const string logname="log.txt";code>

//1.日志是有等级的

enum Level

{ -- -->

DEBUG=0,

INFO,

WARNING,

ERROR,

FATAL

};

void SaveFile(const string& filename,const string& messages)

{

ofstream out(filename,ios::app);

if(!out.is_open())

{

return;

}

out<<messages;

out.close();

}

//等级转化为字符串

string LevelToString(int level)

{

switch (level)

{

case DEBUG:

return "Debug";

case INFO:

return "Info";

case WARNING:

return "Warning";

case ERROR:

return "Error";

case FATAL:

return "Fatal";

default:

return "Unkonwn";

}

}

//获取当前时间

string GetTimeString()

{

time_t curr_time=time(nullptr);//时间戳

struct tm* format_time=localtime(&curr_time);//转化为时间结构

if(format_time==nullptr)

return "None";

char time_buffer[1024];

snprintf(time_buffer,sizeof(time_buffer),"%d-%d-%d %d:%d:%d",

format_time->tm_year + 1900,

format_time->tm_mon + 1,

format_time->tm_mday,

format_time->tm_hour,

format_time->tm_min,

format_time->tm_sec);

return time_buffer;

}

pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;

//获取日志信息

void LogMessage(string filename,int line,bool issave,int level,char* format,...)

{

string levelstr=LevelToString(level);

string timestr=GetTimeString();

pid_t selfid=getpid();

char buffer[1024];

va_list arg;

va_start(arg,format);

vsnprintf(buffer,sizeof(buffer),format,arg);

va_end(arg);

string message= "[" + timestr + "]" + "[" + levelstr + "]" +

"[" + std::to_string(selfid) + "]" +

"[" + filename + "]" + "[" + std::to_string(line) + "] " + buffer + "\n";

LockGuard lockguard(&lock);

if(!issave)

{

cout<<message;

}

else

{

SaveFile(logname,message);

}

}

#define LOG(level,format,...) \

do \

{ \

LogMessage(__FILE__,__LINE__,gIsSave,level,format,##__VA_ARGS__); \

} while (0)

#define EnableFile() \

do \

{ \

gIsSave=true; \

} while (0)

#define EnableScreen() \

do \

{ \

gIsSave=false; \

} while (0)

LockGuard.hpp

互斥锁的头文件:

#ifndef __LOCK_GUARD_HPP__

#define __LOCK_GUARD_HPP__

#include <iostream>

#include <pthread.h>

class LockGuard

{

public:

LockGuard(pthread_mutex_t* mutex):_mutex(mutex)

{

pthread_mutex_lock(_mutex);

}

~LockGuard()

{

pthread_mutex_unlock(_mutex);

}

private:

pthread_mutex_t *_mutex;

};

#endif

Socket.hpp

包含一系列Socket套接字的接口函数,还有TcpSocket专门的接口:

#include <iostream>

#include <string>

#include <functional>

#include <sys/types.h> /* See NOTES */

#include <sys/socket.h>

#include <netinet/in.h>

#include <arpa/inet.h>

#include <unistd.h>

#include <cstring>

#include <pthread.h>

#include <sys/types.h>

#include <memory>

#include "InetAddr.hpp"

#include "Log.hpp"

namespace socket_ns

{

class Socket;

const static int gbacklog=8;

using socket_sptr=std::shared_ptr<Socket>;//套接字指针

enum

{

SOCKET_ERROR = 1,

BIND_ERROR,

LISTEN_ERROR,

USAGE_ERROR

};

//在基类创建一系列虚函数,只要派生类能用到就在这里创建

class Socket

{

public:

virtual void CreateSocketOrDie() =0; //创建套接字

virtual void BindSocketOrDie(InetAddr& addr) =0; //绑定套接字

virtual void ListenSocketOrDie()=0; //监听套接字

virtual int Accepter(InetAddr* addr) =0; //接受客户端

virtual bool Connector(InetAddr &addr) = 0; //连接客户端

virtual void SetSocketAddrReuse() = 0; // 重启指定端口

virtual int SockFd() = 0; //获取Sockfd

virtual int Recv(std::string *out) = 0; //接收对方信息

virtual int Send(const std::string &in) = 0; //发送给对方信息

public:

//创建监听套接字,将一系列操作细分化,直接引用对应函数直接创建

void BuildListenSocket(InetAddr& addr)

{

CreateSocketOrDie();

SetSocketAddrReuse();

BindSocketOrDie(addr);

ListenSocketOrDie();

}

bool BuildClientSocket(InetAddr &addr)

{

CreateSocketOrDie();

return Connector(addr);

}

};

class TcpSocket : public Socket

{

public:

TcpSocket(int sockfd=-1)

:_sockfd(sockfd)

{ }

void CreateSocketOrDie() override //override明确的重写基类函数

{

_sockfd=socket(AF_INET,SOCK_STREAM,0);

if(_sockfd<0)

{

LOG(FATAL, "socket error");

exit(SOCKET_ERROR);

}

LOG(DEBUG, "socket create success, sockfd is : %d\n", _sockfd);

}

void BindSocketOrDie(InetAddr& addr) override

{

struct sockaddr_in local;

memset(&local, 0, sizeof(local));

local.sin_family = AF_INET;

local.sin_port = htons(addr.Port());

local.sin_addr.s_addr = inet_addr(addr.Ip().c_str());

int n=bind(_sockfd,(struct sockaddr*)&local,sizeof(local));

if (n < 0)

{

LOG(FATAL, "bind error\n");

exit(BIND_ERROR);

}

LOG(DEBUG, "bind success, sockfd is : %d\n", _sockfd);

}

void ListenSocketOrDie() override

{

int n=listen(_sockfd,gbacklog);

if (n < 0)

{

LOG(FATAL, "listen error\n");

exit(LISTEN_ERROR);

}

LOG(DEBUG, "listen success, sockfd is : %d\n", _sockfd);

}

int Accepter(InetAddr* addr) override

{

struct sockaddr_in peer;

socklen_t len=sizeof(peer);

int sockfd = accept(_sockfd,(struct sockaddr*)&peer,&len);

if (sockfd < 0)

{

LOG(WARNING, "accept error\n");

return -1;

}

*addr=peer;

return sockfd;

}

virtual bool Connector(InetAddr& addr)

{

struct sockaddr_in server;

memset(&server,0,sizeof(server));

server.sin_family=AF_INET;

server.sin_addr.s_addr=inet_addr(addr.Ip().c_str());

server.sin_port=htons(addr.Port());

int n=connect(_sockfd,(struct sockaddr*)&server,sizeof(server));

if (n < 0)

{

std::cerr << "connect error" << std::endl;

return false;

}

return true;

}

void SetSocketAddrReuse() override

{

int opt = 1;

setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)); //快速重启端口

}

int Recv(std::string *out) override

{

char inbuffer[1024];

ssize_t n = recv(_sockfd,inbuffer,sizeof(inbuffer)-1,0);

if (n > 0)

{

inbuffer[n] = 0;

*out += inbuffer; // 接收次数可能不只一次,一般是多次的,

}

return n;

}

int Send(const std::string &in) override

{

int n = send(_sockfd,in.c_str(),in.size(),0);

return n;

}

int SockFd() override

{

return _sockfd;

}

~TcpSocket()

{ }

private:

int _sockfd;

};

}

SelectServer.hpp

#pragma once

#include <iostream>

#include <string>

#include <memory>

#include "Socket.hpp"

using namespace socket_ns;

//select服务器要正确的编写,需要借助一个第三方数组来完成,保存合法的,所有的fd到数组中,方便后期批量化统一添加

class SelectServer

{

const static int defaultfd = -1; //默认sockfd

const static int N = sizeof(fd_set) * 8; //监视文件描述符的最大值

public:

SelectServer(uint16_t port)

:_port(port),

_listensock(make_unique<TcpSocket>())

{

InetAddr addr("0", _port); //网络地址初始化

_listensock->BuildListenSocket(addr);//创建监听套接字

//初始化辅助数组

for (int i = 0; i < N; i++)

{

_fd_array[i] = defaultfd;

}

_fd_array[0] = _listensock->SockFd();

}

void AcceptClient()

{

// 我们今天只关心了读,而读有:listensock 和 normal sockfd

InetAddr clientaddr;

int sockfd = _listensock->Accepter(&clientaddr); // 这里调用accept会不会阻塞呢??不会。因为事件已经就绪了

if (sockfd < 0)

return;

LOG(DEBUG, "Get new Link, sockfd: %d, client info %s:%d\n", sockfd, clientaddr.Ip().c_str(), clientaddr.Port());

//select托管(监视状态):将新fd放入辅助数组中

int pos = 1;

for (; pos < N; pos++)

{

if (_fd_array[pos] == defaultfd)

break;

}//让pos到辅助数组的空缺位置

if (pos == N)//说明监视的文件描述符满了

{

::close(sockfd); // sockfd->Close();

LOG(WARNING, "server is full!\n");

return;

}

else

{

_fd_array[pos] = sockfd;

LOG(DEBUG, "%d add to select array!\n", sockfd);

}

LOG(DEBUG, "curr fd_array[] fd list : %s\n", RfdsToString().c_str());

}

void ServiceIO(int pos)

{

char buffer[1024];

ssize_t n = ::recv(_fd_array[pos], buffer, sizeof(buffer) - 1, 0); // 这里读取会不会被阻塞?不会

if (n > 0)//处理接收数据

{

buffer[n] = 0;

std::cout << "client say# " << buffer << std::endl;

std::string echo_string = "[server echo]# ";

echo_string += buffer;

::send(_fd_array[pos], echo_string.c_str(), echo_string.size(), 0);//返回给客户端

}

else if (n == 0)//说明对方已断开连接

{

LOG(DEBUG, "%d is closed\n", _fd_array[pos]);

::close(_fd_array[pos]);

_fd_array[pos] = defaultfd;

LOG(DEBUG, "curr fd_array[] fd list : %s\n", RfdsToString().c_str());

}

else//出现错误

{

LOG(DEBUG, "%d recv error\n", _fd_array[pos]);

::close(_fd_array[pos]);

_fd_array[pos] = defaultfd;

LOG(DEBUG, "curr fd_array[] fd list : %s\n", RfdsToString().c_str());

}

}

//处理准备就绪的事件

void HandlerEvent(fd_set &rfds)

{

for (int i = 0; i < N; i++)

{

if (_fd_array[i] == defaultfd)

continue;

if (FD_ISSET(_fd_array[i], &rfds))

{

if (_fd_array[i] == _listensock->SockFd())//新的连接

{

AcceptClient();

}

else

{

// 普通的sockfd读事件就绪

ServiceIO(i);

}

}

}

}

void Loop()

{

while(true)

{

//监听套接字在等待对方发送连接

//新的连接 == 读事件就绪

//要将listensock添加到select中!

fd_set rfds; //一个记录文件描述符状态的集合

FD_ZERO(&rfds);//将所有文件描述符移除集合

int max_fd = defaultfd;//最大的文件描述符值

for (int i = 0; i < N; i++)

{

if (_fd_array[i] == defaultfd)

continue;

FD_SET(_fd_array[i], &rfds); // 将所有合法的fd添加到rfds中

if (max_fd < _fd_array[i])

{

max_fd = _fd_array[i]; // 更新出最大的fd的值

}

}

struct timeval timeout = { 0, 0};//设置等待时间

int n=select(max_fd+1,&rfds,nullptr,nullptr,nullptr);

//timeout 是 NULL,则 select() 会无限期地等待直到至少有一个文件描述符准备就绪

switch (n)

{

case 0://指定时间内没有任何文件描述符准备就绪

LOG(INFO, "timeout, %d.%d\n", timeout.tv_sec, timeout.tv_usec);

break;

case -1://出现错误

LOG(ERROR, "select error...\n");

break;

default://成功状态

LOG(DEBUG, "Event Happen. n : %d\n", n); // 底层有一个事件就绪,select为什么会一直通知我?因为:我们没有处理!

HandlerEvent(rfds);

break;

}

}

}

//打印出已存在的fd

std::string RfdsToString()

{

std::string fdstr;

for (int i = 0; i < N; i++)

{

if (_fd_array[i] == defaultfd)

continue;

fdstr += std::to_string(_fd_array[i]);

fdstr += " ";

}

return fdstr;

}

~SelectServer()

{

}

private:

uint16_t _port; //端口号

std::unique_ptr<Socket> _listensock;//监听socket

int _fd_array[N]; // 辅助数组

};

成员:

在这里插入图片描述

辅助数组:由上面select执行过程可以知道,当select执行后,会对fd_set的文件描述符产生影响,所以为了能够在循环中多次调用select函数,就需要一个数组来进行对文件描述符的保存;

初始化:

在这里插入图片描述

对于辅助数组来说,只要没有新的fd进来,那么文件描述符将保持为负;而对于Select来说,监听fd就是第一个事件;所以要在初始化就添加进来;

Loop:

这里操作流程就是跟上面的执行过程是一样的,只是增加了一些细节:

在这里插入图片描述

N是1024,指的是Select能存储的最大fd的数目;我们需要让select监听我们想要监听的事件,就需要通过循环来一个一个添加到rfds中;

select()返回值:

在这里插入图片描述

根据<code>select()的返回值执行不同的代码;

这里所说的底层事件就绪,如果没有处理已就绪的事件,那么select就会一直监测到事件就绪,一直执行default语句的内容;

HandlerEvent():

在这里插入图片描述

在for循环里面通过FD_ISSET函数找出每个已经就绪的事件,然后再判断是不是监听事件的;

Main.cc

<code>#include "SelectSever.hpp"

#include <memory>

// ./selectserver port

int main(int argc, char *argv[])

{ -- -->

if (argc != 2)

{

std::cout << "Usage: " << argv[0] << " port" << std::endl;

return 0;

}

uint16_t port = std::stoi(argv[1]);//获取端口号

EnableScreen();

std::unique_ptr<SelectServer> svr = std::make_unique<SelectServer>(port);

svr->Loop();

return 0;

}

通过telnet进行测试:

开启Server服务:

在这里插入图片描述

telnet进行访问

在这里插入图片描述

Server的监听socket收到就绪的事件,创建一个新的IO服务客户端:

在这里插入图片描述

客户端任意发送内容:

在这里插入图片描述

客户端服务端都能通过Select的底层就绪互相接收发送:

在这里插入图片描述

Select()的优缺点

优点

多路复用:select()函数能够同时监视多个文件描述符,实现I/O多路复用,从而提高了程序的并发处理能力和资源利用率。简单易用:select()函数的接口相对简单,易于理解和使用,特别是对于初学者来说。灵活性:select()函数允许程序根据文件描述符的读、写、异常等事件进行灵活的处理,满足不同的I/O需求。

缺点

文件描述符数量的限制:select()函数能够监视的文件描述符数量有限,通常在Linux上默认为1024个(尽管可以通过修改宏定义或重新编译内核来提升这一限制,但这样做可能会降低效率)。这对于需要监视大量文件描述符的应用程序来说是一个显著的限制。性能瓶颈:当监视的文件描述符数量较多时,select()函数的性能可能会成为瓶颈。因为每次调用select()时,内核都需要扫描所有被监视的文件描述符,这会导致不必要的开销。内核拷贝开销:在select()调用过程中,由于每次都要事先准本<code>fd_set结构内容,内核与用户空间之间需要进行内存拷贝操作,以传递文件描述符集合和结果。这会增加额外的开销,并可能影响性能。

因此,在选择是否使用select()函数时,需要根据具体的应用场景和需求进行权衡。对于需要监视大量文件描述符或追求高性能的应用程序来说,可能需要考虑使用更高级的I/O多路复用机制,如poll()或epoll()等。



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。