目录
一、高级IO
1.1 概念
了解了网络通信相关的知识后,我们也许能直到,通信的本质就是IO,通信的核心是在两个或多个设备之间传输数据,这与计算机系统中的输入输出操作类似。
当通信时使用接收端口例如 recv 时,系统等待网络数据包从远程服务器通过网络传输到本地机器,数据包从网卡的硬件缓冲区复制到系统内存中的应用程序缓冲区;当文件读取时,系统等待磁盘将所请求的数据读取到磁盘缓冲区中,数据从磁盘缓冲区复制到系统内存中的用户空间。
所以换种说法,IO = 等待 + 拷贝
那么如何提高IO的效率呢?
当缩小了等待的时间后,IO的效率就会提高。
1.2 五种IO模型
然后,从一个钓鱼的实例引入今天的主题:
将上面的例子抽象成通信IO:
水池:OS内部缓冲区
水桶:用户缓冲区
鱼:数据
鱼竿:文件描述符
上面的五个人物分别对应了五种IO模型:
其中,前四个人都属于同步IO,即只要参与了IO的过程,那就是同步IO。田七将钓鱼的工作交给了另一个人,并没有参与IO,所以是异步IO。
阻塞 IO:在内核将数据准备好之前,系统调用会一直等待。所有的套接字,默认都是阻塞方式。阻塞 IO 是最常见的 IO 模型。
非阻塞 IO:如果内核还未将数据准备好,系统调用仍然会直接返回,并且返回 EWOULDBLOCK 错误码。非阻塞 IO 往往需要程序员循环的方式反复尝试读写文件描述符,这个过程称为轮询。这对 CPU 来说是较大的浪费,一般只有特定场景下才使用。
信号驱动 IO:内核将数据准备好的时候,使用 SIGIO 信号通知应用程序进行 IO 操作。
IO 多路转接: 虽然从流程图上看起来和阻塞 IO 类似。实际上最核心在于 IO 多路转接能够同时等待多个文件描述符的就绪状态。
异步 IO: 由内核在数据拷贝完成时,通知应用程序(而信号驱动是告诉应用程序何时可以开始拷贝数据)。
1.3 小结
任何 IO 过程中,都包含两个步骤。第一是等待,第二是拷贝。而且在实际的应用场景中,等待消耗的时间往往都远远高于拷贝的时间。让 IO 更高效,最核心的办法就是让等待的时间尽量少。
二、多路转接的老派
上面介绍的五人中,只有赵六真正实现了减少等待的时间,所以在IO中可以使用多路转接以达到高校IO,这里我们要介绍的就是多路转接中的老派—— select ,即使它有很多缺点,但是以为其出现的时间比较早,所以基本很多程序都会兼容它,一些比较古早的程序中也仍使用它。
2.1 select 的作用
select的主要作用是监视一组文件描述符,以查看其中哪些文件描述符处于可读、可写或有错误状态。当有时间就绪,就进行任务的派发。
2.2 select 的接口
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
其中,除了第一个参数,后三个均为输出型参数。
用户通过传入 fd_set 来告诉系统需要帮用户关注哪些文件描述符,系统则通过修改比特位告诉用户哪些文件描述符已经满足用户条件。在位图中,位数表示fd的值,0或1表示是否就绪。
用户通过传入 timeout 来表示等待的规则,struct timeval 是一个结构体,由一个表示秒的变量与一个表示微妙的变量组合,为 select 设置等待规则,如果用户设置等待 5s ,实际花费了 3s,则返回的是剩余时间 2s
- 如果
timeout
为NULL
,select
将会无限等待,直到至少一个文件描述符变得可用。 - 如果
timeout
为一个有效的timeval
结构,select
会等待指定的时间长度。如果超时,select
将返回0。 - 如果
timeout
为一个零时间的timeval
结构(即tv_sec
和tv_usec
都为0),select
会立即返回,不会等待。
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);
-
FD_CLR(int fd, fd_set *set)
:- 作用:从文件描述符集合
set
中移除文件描述符fd
。将fd
在集合中的对应位清零。 - 使用场景:当不再需要监视某个文件描述符时,使用该函数将其从集合中移除。
- 作用:从文件描述符集合
-
FD_ISSET(int fd, fd_set *set)
:- 作用:检查文件描述符
fd
是否在集合set
中。如果fd
在集合中,则返回非零值;否则返回零。 - 使用场景:在
select
返回后,用于检测哪个文件描述符有事件发生。
- 作用:检查文件描述符
-
FD_SET(int fd, fd_set *set)
:- 作用:将文件描述符
fd
添加到集合set
中。将fd
在集合中的对应位置为1。 - 使用场景:在调用
select
之前,将需要监视的文件描述符添加到集合中。
- 作用:将文件描述符
-
FD_ZERO(fd_set *set)
:- 作用:初始化文件描述符集合
set
,将集合中所有的位清零。 - 使用场景:在使用
fd_set
之前,首先需要使用该函数初始化集合,以确保集合中的所有位都是零。
- 作用:初始化文件描述符集合
三、select 的编写
这里以 select_echo_server 入手,来认识熟悉 select 。
因为本篇博客是关于网络的信息,所以以下所说的文件描述符与套接字都是一个意思,即 sockfd
3.1 类的预先准备
select 需要有端口号与套接字,在套接字这里,我们选择使用TCP套接字,同时将之前编写过的TCP服务端进行进一步封装,这里使用自己封装过的类,可以省去了在程序中直接对套接字的创建、初始化与监听等工作。
下面先来看一下我们封装的Socket类,
Socket.hpp:
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <pthread.h>
#include <sys/types.h>
#include <memory>
#include "InetAddr.hpp"
#include "Log.hpp"
namespace socket_ns
{
class Socket;
const static int gbacklog = 8;
using socket_sptr = std::shared_ptr<Socket>;
enum
{
SOCKET_ERROR = 1,
BIND_ERROR,
LISTEN_ERROR,
USAGE_ERROR
};
class Socket
{
public:
virtual void CreateSocketOrDie() = 0;
virtual void BindSocketOrDie(InetAddr &addr) = 0;
virtual void ListenSocketOrDie() = 0;
virtual int Accepter(InetAddr *addr) = 0;
virtual bool Connetcor(InetAddr &addr) = 0;
virtual void SetSocketAddrReuse() = 0;
virtual int SockFd() = 0;
virtual int Recv(std::string *out) = 0;
virtual int Send(const std::string &in) = 0;
virtual void Close() = 0;
public:
void BuildListenSocket(InetAddr &addr)
{
CreateSocketOrDie();
SetSocketAddrReuse();
BindSocketOrDie(addr);
ListenSocketOrDie();
}
bool BuildClientSocket(InetAddr &addr)
{
CreateSocketOrDie();
return Connetcor(addr);
}
};
class TcpSocket : public Socket
{
public:
TcpSocket(int fd = -1) : _sockfd(fd)
{
}
void CreateSocketOrDie() override
{
// 1. 创建流式套接字
_sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
if (_sockfd < 0)
{
LOG(FATAL, "socket error");
exit(SOCKET_ERROR);
}
LOG(DEBUG, "socket create success, sockfd is : %d\n", _sockfd);
}
void BindSocketOrDie(InetAddr &addr) override
{
// 2. bind
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(addr.Port());
local.sin_addr.s_addr = inet_addr(addr.Ip().c_str());
int n = ::bind(_sockfd, (struct sockaddr *)&local, sizeof(local));
if (n < 0)
{
LOG(FATAL, "bind error\n");
exit(BIND_ERROR);
}
LOG(DEBUG, "bind success, sockfd is : %d\n", _sockfd);
}
void ListenSocketOrDie() override
{
int n = ::listen(_sockfd, gbacklog);
if (n < 0)
{
LOG(FATAL, "listen error\n");
exit(LISTEN_ERROR);
}
LOG(DEBUG, "listen success, sockfd is : %d\n", _sockfd);
}
int Accepter(InetAddr *addr) override
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int sockfd = ::accept(_sockfd, (struct sockaddr *)&peer, &len);
if (sockfd < 0)
{
LOG(WARNING, "accept error\n");
return -1;
}
*addr = peer;
return sockfd;
}
virtual bool Connetcor(InetAddr &addr)
{
struct sockaddr_in server;
// 构建目标主机的socket信息
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(addr.Port());
server.sin_addr.s_addr = inet_addr(addr.Ip().c_str());
int n = connect(_sockfd, (struct sockaddr *)&server, sizeof(server));
if (n < 0)
{
std::cerr << "connect error" << std::endl;
return false;
}
return true;
}
void SetSocketAddrReuse() override
{
int opt = 1;
::setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
}
int Recv(std::string *out) override
{
char inbuffer[4096];
ssize_t n = ::recv(_sockfd, inbuffer, sizeof(inbuffer) - 1, 0);
if (n > 0)
{
inbuffer[n] = 0;
*out = inbuffer; // ??? +=
}
return n;
}
int Send(const std::string &in) override
{
int n = ::send(_sockfd, in.c_str(), in.size(), 0);
return n;
}
int SockFd() override
{
return _sockfd;
}
void Close() override
{
if (_sockfd > -1)
::close(_sockfd);
}
private:
int _sockfd;
};
}
这里还用到了之前封装的 InetAddr 类与日志宏,详情可以看下面的博客:
Linux网络——套接字与UdpServer-CSDN博客
Linux网络——TcpServer-CSDN博客
日志宏的编写与线程池的结合-CSDN博客
Log.hpp:
#pragma once
#include <iostream>
#include <fstream>
#include <cstdio>
#include <string>
#include <ctime>
#include <cstdarg>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#include "LockGuard.hpp"
bool gIsSave = false;
const std::string logname = "log.txt";
// 1. 日志是由等级的
enum Level
{
DEBUG = 0,
INFO,
WARNING,
ERROR,
FATAL
};
void SaveFile(const std::string &filename, const std::string &message)
{
std::ofstream out(filename, std::ios::app);
if (!out.is_open())
{
return;
}
out << message;
out.close();
}
std::string LevelToString(int level)
{
switch (level)
{
case DEBUG:
return "Debug";
case INFO:
return "Info";
case WARNING:
return "Warning";
case ERROR:
return "Error";
case FATAL:
return "Fatal";
default:
return "Unknown";
}
}
std::string GetTimeString()
{
time_t curr_time = time(nullptr);
struct tm *format_time = localtime(&curr_time);
if (format_time == nullptr)
return "None";
char time_buffer[1024];
snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d",
format_time->tm_year + 1900,
format_time->tm_mon + 1,
format_time->tm_mday,
format_time->tm_hour,
format_time->tm_min,
format_time->tm_sec);
return time_buffer;
}
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
void LogMessage(std::string filename, int line, bool issave, int level, const char *format, ...)
{
std::string levelstr = LevelToString(level);
std::string timestr = GetTimeString();
pid_t selfid = getpid();
char buffer[1024];
va_list arg;
va_start(arg, format);
vsnprintf(buffer, sizeof(buffer), format, arg);
va_end(arg);
std::string message = "[" + timestr + "]" + "[" + levelstr + "]" +
"[" + std::to_string(selfid) + "]" +
"[" + filename + "]" + "[" + std::to_string(line) + "] " + buffer;
LockGuard lockguard(&lock);
if (!issave)
{
std::cout << message;
}
else
{
SaveFile(logname, message);
}
}
#define LOG(level, format, ...) \
do \
{ \
LogMessage(__FILE__, __LINE__, gIsSave, level, format, ##__VA_ARGS__); \
} while (0)
#define EnableFile() \
do \
{ \
gIsSave = true; \
} while (0)
#define EnableScreen() \
do \
{ \
gIsSave = false; \
} while (0)
InetAddr:
#pragma once
#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
class InetAddr
{
private:
void GetAddress(std::string *ip, uint16_t *port)
{
*port = ntohs(_addr.sin_port);
*ip = inet_ntoa(_addr.sin_addr);
}
public:
InetAddr(const struct sockaddr_in &addr) : _addr(addr)
{
GetAddress(&_ip, &_port);
}
InetAddr(const std::string &ip, uint16_t port) : _ip(ip), _port(port)
{
_addr.sin_family = AF_INET;
_addr.sin_port = htons(_port);
_addr.sin_addr.s_addr = inet_addr(_ip.c_str());
}
InetAddr()
{}
std::string Ip()
{
return _ip;
}
bool operator == (const InetAddr &addr)
{
if(_ip == addr._ip && _port == addr._port) // 方便测试
{
return true;
}
return false;
}
struct sockaddr_in Addr()
{
return _addr;
}
uint16_t Port()
{
return _port;
}
~InetAddr()
{
}
private:
struct sockaddr_in _addr;
std::string _ip;
uint16_t _port;
};
为了保护日志宏的线程安全,我们又使用到了之前封装的锁:
LockGuard.hpp:
#ifndef __LOCK_GUARD_HPP__
#define __LOCK_GUARD_HPP__
#include <iostream>
#include <pthread.h>
class LockGuard
{
public:
LockGuard(pthread_mutex_t *mutex):_mutex(mutex)
{
pthread_mutex_lock(_mutex); // 构造加锁
}
~LockGuard()
{
pthread_mutex_unlock(_mutex);
}
private:
pthread_mutex_t *_mutex;
};
#endif
3.2 类的整体框架
我们知道 select 可以监视一组套接字,所以类的内部就需要一个数组来辅助,同时包括上面说的端口号与Tcp套接字。
#pragma once
#include <iostream>
#include <string>
#include <memory>
#include "Socket.hpp"
using namespace socket_ns;
class SelectServer
{
const static int N = sizeof(fd_set) * 8;
const static int defaultfd = -1;
public:
SelectServer(uint16_t port)
: _port(port),
_listensock(std::make_unique<TcpSocket>())
{
InetAddr addr("0", _port);
_listensock->BuildListenSocket(addr);
for (int i = 0; i < N; i++)
{
_fd_array[i] = defaultfd;
}
_fd_array[0] = _listensock->SockFd();
}
~SelectServer()
{
}
private:
uint16_t _port;
std::unique_ptr<Socket> _listensock;
int _fd_array[N]; // 辅助数组
};
在初始化时,初始化端口号是必须的,紧接着根据端口号初始化InetAddr,再根据端口号创建套接字并执行监听:
随后,要对辅助数组中的元素进行初始化,因为文件标识符不小于0,所以使用 -1 进行初始化,以后的代码只要判断数组中的该位置是否 <0 ,即可判断是否为有效的 fd 。
3.3 类的执行 Loop
在使用main函数时,只需要调用该函数就可以完成相关的操作。
这里就根据 select 的返回值进行相应的操作,比如成功、出错或超时。
int n = select(max_fd + 1, &rfds, nullptr, nullptr, nullptr);
switch (n)
{
case 0:
LOG(INFO, "timeout, %d.%d\n", timeout.tv_sec, timeout.tv_usec);
break;
case -1:
LOG(ERROR, "select error...\n");
break;
default:
LOG(DEBUG, "Event Happen. n : %d\n", n);
HandlerEvent(rfds);
break;
}
HandlerEvent是我们后续要写的一个回调函数,select 的参数中,max_fd 与 rfds 就是我们提前要进行的工作,其中,每次 select 每次都会将已就绪的套接字添加到 rfds 中。
接下来,根据 select 的传参,我们要进行两个变量的定义:
fd_set rfds;
FD_ZERO(&rfds);
int max_fd = defaultfd;
for (int i = 0; i < N; i++)
{
if (_fd_array[i] == defaultfd)
continue;
FD_SET(_fd_array[i], &rfds); // 将所有合法的fd添加到rfds中
if (max_fd < _fd_array[i])
{
max_fd = _fd_array[i]; // 更新出最大的fd的值
}
}
首先是定义了一个 fd_set 的集合 rfds ,在select 中传入表示我们只在意读就绪的套接字。同时,当 select 不断更新已就绪的套接字,我们每次也要重新进行更新,要知道在 rfds 中保存的可能不是连续的数字,而是会自动分配当前最小的文件描述符,比如文件描述符 10 已经分配了但是用户未退,而 5 已经退了,此时再进来一个新的连接,会分配 5 而不是 11。这一步基本是使用 select 时必做的一个操作。
同时,上述的操作我们需要一直进行,每次一有新连接,价于对方给我发送数据!我们作为读事件同一处理,也就是说新连接到来等价于读事件就绪!所以我们要一直重复,把它放在 while 中:
void Loop()
{
while (true)
{
fd_set rfds;
FD_ZERO(&rfds);
int max_fd = defaultfd;
for (int i = 0; i < N; i++)
{
if (_fd_array[i] == defaultfd)
continue;
FD_SET(_fd_array[i], &rfds); // 将所有合法的fd添加到rfds中
if (max_fd < _fd_array[i])
{
max_fd = _fd_array[i]; // 更新出最大的fd的值
}
}
struct timeval timeout = {0, 0};
int n = select(max_fd + 1, &rfds, nullptr, nullptr, nullptr);
switch (n)
{
case 0:
LOG(INFO, "timeout, %d.%d\n", timeout.tv_sec, timeout.tv_usec);
break;
case -1:
LOG(ERROR, "select error...\n");
break;
default:
LOG(DEBUG, "Event Happen. n : %d\n", n);
HandlerEvent(rfds);
break;
}
}
}
四、Loop 中的回调函数
4.1 HandlerEvent
下面根据回调函数的逻辑画了一张简略的流程图:
首先,遍历整个类成员——存放 sockfd 的数组;其次,使用 FD_ISSET 函数来确保该sockfd已就绪;随后,判断该文件描述符是否是用户的套接字,即判断是否是TCP中的 sockfd ,若不是,才会去执行最后的回调函数。
根据 FD_ISSET 的参数,很显然我们设计的该回调函数应该有 fd_set 的集合,故需要传入 Loop() 中的 fd_set 。
void HandlerEvent(fd_set &rfds)
{
for (int i = 0; i < N; i++)
{
if (_fd_array[i] == defaultfd)
continue;
if (FD_ISSET(_fd_array[i], &rfds))
{
if (_fd_array[i] == _listensock->SockFd())
{
AcceptClient();
}
else
{
ServiceIO(i);
}
}
}
}
4.2 AcceptClient
如果该 sockfd 是 listen 监听到的套接字,那么服务端就需要对其进行 accept 的处理,表示服务端已经收到了来自客户端的第三次握手请求,此时的返回值就是以后要使用 select 处理的返回值。
关于 accept 的介绍可以参考一下博客:Linux网络——TcpServer-CSDN博客
也就是说,使用 accept 后,返回的套接字信息,才是以后真正要进行处理的。所以这时候,又需要一次遍历,来为其返回值找到一个合适的位置。最后,还要判断该位置是否为合法位置,若合法才能进入数组,否则,添加失败。
void AcceptClient()
{
InetAddr clientaddr;
int sockfd = _listensock->Accepter(&clientaddr);
if (sockfd < 0)
return;
LOG(DEBUG, "Get new Link, sockfd: %d, client info %s:%d\n", sockfd, clientaddr.Ip().c_str(), clientaddr.Port());
int pos = 1;
for (; pos < N; pos++)
{
if (_fd_array[pos] == defaultfd)
break;
}
if (pos == N)
{
::close(sockfd);
LOG(WARNING, "server is full!\n");
return;
}
else
{
_fd_array[pos] = sockfd;
LOG(DEBUG, "%d add to select array!\n", sockfd);
}
LOG(DEBUG, "curr fd_array[] fd list : %s\n", RfdsToString().c_str());
}
4.3 ServiceIO
当程序执行到这里的时候,基本可以判断该文件描述符是 select 要进行处理的 sockfd 了,这时定义的回调函数就可以根据要求任意定义了:
void ServiceIO(int pos)
{
char buffer[1024];
ssize_t n = ::recv(_fd_array[pos], buffer, sizeof(buffer) - 1, 0);
if (n > 0)
{
buffer[n] = 0;
std::cout << "client say# " << buffer << std::endl;
std::string echo_string = "[server echo]# ";
echo_string += buffer;
::send(_fd_array[pos], echo_string.c_str(), echo_string.size(), 0);
}
else if (n == 0)
{
LOG(DEBUG, "%d is closed\n", _fd_array[pos]);
::close(_fd_array[pos]);
_fd_array[pos] = defaultfd;
LOG(DEBUG, "curr fd_array[] fd list : %s\n", RfdsToString().c_str());
}
else
{
LOG(DEBUG, "%d recv error\n", _fd_array[pos]);
::close(_fd_array[pos]);
_fd_array[pos] = defaultfd;
LOG(DEBUG, "curr fd_array[] fd list : %s\n", RfdsToString().c_str());
}
}
其中,为了方便阅读,特意写了一个 ToString 的函数:
std::string RfdsToString()
{
std::string fdstr;
for (int i = 0; i < N; i++)
{
if (_fd_array[i] == defaultfd)
continue;
fdstr += std::to_string(_fd_array[i]);
fdstr += " ";
}
return fdstr;
}
五、select 的优缺点
5.1 优点
可以同时等待多个 sockfd ,提高了IO的效率
5.2 缺点
1. 等待的 sockfd 数量有限,一般为1024个。
2. select 调用中,输入输出参数混合,每次使用前后都要进行更新。
3. 内核与用户之间的数据拷贝。
4. select 底层监视多个 sockfd 的时候,操作系统会在内部遍历检测所有的 sockfd ,在监视大量文件描述符时,性能可能会下降。