在操作系统中,程序运行的空间分为内核空间和用户空间,用户空间所有对 io 操作的代码(如文件的读写、socket 的收发等)都会通过系统调用进入内核空间完成实际的操作。
而且我们都知道 CPU 的速度远远快于硬盘、网络等 I/O。在一个线程中,CPU 执行代码的速度极快,然而,一旦遇到 I/O 操作,如读写文件、发送网络数据时,就需要等待 I/O 操作完成,才能继续进行下一步操作,这种情况称为同步 I/O。
在某个应用程序运行时,假设需要读写某个文件,此时就发生了 I/O 操作,在 I/O 操作的过程中,系统会将当前线程挂起,而其他需要 CPU 执行的代码就无法被当前线程执行了,这就是同步 I/O 操作,因为一个 IO 操作就阻塞了当前线程,导致其他代码无法执行,所以我们可以使用 多线程或者多进程 来并发执行代码,当某个线程/进程被挂起后,不会影响其他线程或进程。
多线程和多进程虽然解决了这种并发的问题,但是系统不能无上限地增加线程/进程。由于系统切换线程/进程的开销也很大,所以,一旦线程/进程数量过多,CPU 的时间就花在线程/进程切换上了,真正运行代码的时间就少了,这样子的结果也导致系统性能严重下降。
多线程和多进程只是解决这一问题的一种方法,另一种解决 I/O 问题的方法是异步 I/O,当然还有其他解决的方法。
当程序需要对 I/O 进行操作时,它只发出 I/O 操作的指令,并不等待 I/O 操作的结果,然后就去执行其他代码了。一段时间后,当 I/O 返回结果时,再通知 CPU 进行处理。这样子用户空间中的程序不需要等待内核空间中的 I/O 完成实际操作,就可执行其他任务,提高 CPU 的利用率。
简单来说就是,用户不需要等待内核完成实际对io的读写操作就直接返回了。
在 Linux 中,默认情况下所有的 socket 都是阻塞的,一个典型的读操作流程大概是这样:
当用户进程调用了 read()/recvfrom() 等系统调用函数,它会进入内核空间中,当这个网络 I/O 没有数据的时候,内核就要等待数据的到来,而在用户进程这边,整个进程会被阻塞,直到内核空间返回数据。
当内核空间的数据准备好了,它就会将数据从内核空间中拷贝到用户空间,此时用户进程才解除阻塞的的状态,重新运行起来。
所以,阻塞 I/O 的特点就是在 IO 执行的两个阶段(用户空间与内核空间)都被阻塞了。
Linux 下,可以通过设置 socket 使其变为非阻塞模式,这种情况下,当内核空间并无数据的时候,它会马上返回结果而不会阻塞,此时用户进程可以根据这个结果自由配置,比如继续请求数据,或者不再继续请求。当对一个非阻塞 socket 执行读操作时,流程是这个样子:
当用户进程调用 read()/recvfrom() 等系统调用函数时,如果内核空间中的数据还没有准备好,那么它并不会阻塞用户进程,而是立刻返回一个 error。
对于应用进程来说,它发起一个 read() 操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个 error 时,它就知道内核中的数据还没有准备好,那么它可以再次调用 read()/recvfrom() 等函数。
当内核空间的数据准备好了,它就会将数据从内核空间中拷贝到用户空间,此时用户进程也就得到了数据。
所以,非阻塞I/O的特点是用户进程需要不断的 主动询问 内核空间的数据准备好了没有。
多路复用 I/O 就是我们说的 select,poll,epoll 等操作,复用的好处就在于单个进程就可以同时处理多个网络连接的 I/O,能实现这种功能的原理就是 select、poll、epoll 等函数会不断的轮询它们所负责的所有 socket ,当某个 socket 有数据到达了,就通知用户进程。
一般来说 I/O 复用多用于以下情况:
与多进程和多线程技术相比, I/O 多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程 ,从而大大减小了系统的开销。但 select,poll,epoll 本质上都是同步 I/O,因为他们都需要 在读写事件就绪后自己负责进行读写 ,也就是说这个读写过程是阻塞的,而异步 I/O 则无需自己负责进行读写,异步 I/O 的实现会负责把数据从内核拷贝到用户空间。
用直白的话来介绍 select:select 机制会监听它所负责的所有 socket,当其中一个 socket 或者多个 socket 可读或者可写的时候,它就会返回,而如果所有的 socket 都是不可读或者不可写的时候,这个进程就会被阻塞,直到超时或者 socket 可读写,当 select 函数返回后,可以通过遍历 fdset,来找到就绪的描述符。
select整个处理过程如下
select 目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。当然 select 也有很多缺点:
对应内核来说,整个处理的流程如下:
select函数原型:
int select(int maxfdp1,fd_set *readset,fd_set *writeset,fd_set *exceptset,const struct timeval *timeout)
参数说明:
如果对某一个的条件不感兴趣,就可以把它设为空指针。
返回值:就绪 socket 描述符的数目,超时返回 0,出错返回 -1。
select常用函数
select 是三者当中最底层的,它的事件的轮训机制是基于比特位的。每次查询都要遍历整个事件列表。
理解 select,首先要理解 select 要处理的 fd_set 数据结构,每个 select 都要处理一个 fd_set 结构。
fd_set 简单地理解为一个长度是 1024 的比特位,每个比特位表示一个需要处理的 FD,如果是 1,那么表示这个 FD 有需要处理的 I/O 事件,否则没有。Linux 为了简化位操作,定义了一组宏函数来处理这个比特位数组。
void FD_CLR(int fd, fd_set *set); // 清空fd在fd_set上的映射,说明select不在处理该fd
int FD_ISSET(int fd, fd_set *set); // 查询fd指示的fd_set是否是有事件请求
void FD_SET(int fd, fd_set *set); // 把fd指示的fd_set置1
void FD_ZERO(fd_set *set); // 清空整个fd_set,一般用于初始化
select案例
int main() {
fd_set read_fs, write_fs;
struct timeval timeout;
int max_sd = 0; // 用于记录最大的fd,在轮询中时刻更新即可
/*
* 这里进行一些初始化的设置,
* 包括socket建立,地址的设置等,
* 同时记得初始化max_sd
*/
// 初始化比特位
FD_ZERO(&read_fs);
FD_ZERO(&write_fs);
int rc = 0;
int desc_ready = 0; // 记录就绪的事件,可以减少遍历的次数
while (1) {
// 这里进行阻塞
rc = select(max_sd + 1, &read_fd, &write_fd, NULL, &timeout);
if (rc < 0) {
// 这里进行错误处理机制
}
if (rc == 0) {
// 这里进行超时处理机制
}
desc_ready = rc;
// 遍历所有的比特位,轮询事件
for (int i = 0; i <= max_sd && desc_ready; ++i) {
if (FD_ISSET(i, &read_fd)) {
--desc_ready;
// 这里处理read事件,别忘了更新max_sd
}
if (FD_ISSET(i, &write_fd)) {
// 这里处理write事件,别忘了更新max_sd
}
}
}
}
select的缺点
poll 的实现和 select 非常相似,只是描述 fd 集合的方式不同, poll 使用 pollfd 结构而不是 select 的 fd_set 结构,poll 不限制 socket 描述符的个数,因为它是使用链表维护这些 socket 描述符的,其他的都差不多和 select() 函数一样。
poll() 函数返回后,需要轮询 pollfd 来获取就绪的描述符,根据描述符的状态进行处理,但是 poll 没有最大文件描述符数量的限制。 poll 和 select 同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。
函数原型
还是先了解 poll 底层操作的数据结构 pollfd:
struct pollfd {
int fd; // 需要监视的文件描述符
short events; // 需要内核监视的事件
short revents; // 实际发生的事件
};
在使用该结构的时候,不用进行比特位的操作,而是对事件本身进行操作就行。同时还可以自定义事件的类型。具体可以参考手册。
同样的,事件默认初始化全部都是 0,通过 bzero 或者 memset 统一初始化即可,之后在 events 上注册感兴趣的事件,监听的时候在 revents 上监听即可。注册事件使用 | 操作,查询事件使用 & 操作。比如想要注册 POLLIN 数据到来的事件,需要 pfd.events |= POLLIN,注册多个事件进行多次 | 操作即可。取消事件进行 ~ 操作,比如 pfd.events ~= POLLIN。查询事件:pfd.revents & POLLIN。
使用 poll 函数进行操作:
int poll (struct pollfd *fds, unsigned int nfds, int timeout);
参数说明:
案例
// 先宏定义长度
#define MAX_POLLFD_LEN 200
int main() {
/*
* 在这里进行一些初始化的操作,
* 比如初始化数据和socket等。
*/
int rc = 0;
pollfd fds[MAX_POLL_LEN];
memset(fds, 0, sizeof(fds));
int ndfs = 1; // 队列的实际长度,是一个随时更新的,也可以自定义其他的
int timeout = 0;
/*
* 在这里进行一些感兴趣事件的注册,
* 每个pollfd可以注册多个类型的事件,
* 使用 | 操作即可,就行博文提到的那样。
* 根据需要设置阻塞时间
*/
int current_size = ndfs;
int compress_array = 0; // 压缩队列的标记
while (1) {
rc = poll(fds, nfds, timeout);
if (rc < 0) {
// 这里进行错误处理
}
if (rc == 0) {
// 这里进行超时处理
}
for (int i = 0; i < current_size; ++i) {
if (fds[i].revents == 0){ // 没有事件可以处理
continue;
}
if (fds[i].revents & POLLIN) { // 简单的例子,比如处理写事件
}
/*
* current_size 是为了降低复杂度的,可以随时进行更新
* ndfs如果要更新,应该是最后统一进行
*/
}
if (compress_array) { // 如果需要压缩队列
compress_array = 0;
for (int i = 0; i < ndfs; ++i) {
for (int j = i; j < ndfs; ++j) {
fds[i].fd = fds[j + i].fd;
}
--i;
--ndfs;
}
}
}
}
epoll的原理
其实相对于 select 和 poll 来说, epoll 更加灵活,但是核心的原理都是当 socket 描述符就绪(可读、可写、出现异常),就会通知应用进程,告诉他哪个 socket 描述符就绪,只是通知处理的方式不同而已。
epoll 使用一个 epfd (epoll 文件描述符)管理多个 socket 描述符,epoll 不限制 socket 描述符的个数, 将用户空间的 socket 描述符的事件存放到内核的一个事件表中 ,这样在用户空间和内核空间的 copy 只需一次。当 epoll 记录的 socket 产生就绪的时候,epoll 会通过 callback 的方式来激活这个 fd,这样子在 epoll_wait 便可以收到通知,告知应用层哪个 socket 就绪了,这种通知的方式是可以直接得到那个 socket 就绪的,因此相比于 select 和 poll ,它不需要遍历 socket 列表,时间复杂度是 O(1),不会因为记录的 socket 增多而导致开销变大。
epoll的操作模式
epoll 对 socket 描述符的操作有两种模式: LT(level trigger)和ET(edge trigger) 。LT 模式是默认模式,LT 模式与 ET 模式的区别如下:
ET 模式在很大程度上减少了 epoll 事件被重复触发的次数,因此效率要比 LT 模式高 。epoll 工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。
epoll函数
epoll 只有 epoll_create()、epoll_ctl()、epoll_wait() 3 个系统调用函数。
epoll_create()
int epoll_create(int size);
创建一个 epoll 的 epfd (epoll 文件描述符,或者称之为句柄),当创建好 epoll 句柄后,它就是会占用一个 fd 值,必须调用 close() 关闭,否则可能导致 fd 被耗尽,这也是为什么我们前面所讲的是: epoll 使用一个 epfd 管理多个 socket 描述符 。
size 参数用来告诉内核这个监听的数目一共有多大,它其实是在内核申请一空间,用来存放用户想监听的 socket fd 上是否可读可行或者其他异常,只要有足够的内存空间,size 可以随意设置大小,1G 的内存上能监听约 10 万个端口。
epoll_ctl()
该函数用于控制某个 epoll 文件描述符上的事件,可以注册事件,修改事件,以及删除事件。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
参数:
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
epoll_wait()
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);
epoll_wait() 函数的作用就是等待监听的事件的发生,类似于调用 select() 函数。
参数:
函数的返回值表示需要处理的事件数目,如返回 0 表示已超时。
案例
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <iostream>
using namespace std;
#define MAX_EVENTS 500
struct myevent_s
{
int fd;
void (*call_back)(int fd, int events, void *arg);
int events;
void *arg;
int status; // 1: in epoll wait list, 0 not in
char buff[128]; // recv data buffer
int len, s_offset;
long last_active; // last active time
};
// set event
void EventSet(myevent_s *ev, int fd, void (*call_back)(int, int, void*), void *arg)
{
ev->fd = fd;
ev->call_back = call_back;
ev->events = 0;
ev->arg = arg;
ev->status = 0;
bzero(ev->buff, sizeof(ev->buff));
ev->s_offset = 0;
ev->len = 0;
ev->last_active = time(NULL);
}
// add/mod an event to epoll
void EventAdd(int epollFd, int events, myevent_s *ev)
{
struct epoll_event epv = {0, {0}};
int op;
epv.data.ptr = ev;
epv.events = ev->events = events;
if(ev->status == 1){
op = EPOLL_CTL_MOD;
}
else{
op = EPOLL_CTL_ADD;
ev->status = 1;
}
if(epoll_ctl(epollFd, op, ev->fd, &epv) < 0)
printf("Event Add failed[fd=%d], evnets[%d]\n", ev->fd, events);
else
printf("Event Add OK[fd=%d], op=%d, evnets[%0X]\n", ev->fd, op, events);
}
// delete an event from epoll
void EventDel(int epollFd, myevent_s *ev)
{
struct epoll_event epv = {0, {0}};
if(ev->status != 1) return;
epv.data.ptr = ev;
ev->status = 0;
epoll_ctl(epollFd, EPOLL_CTL_DEL, ev->fd, &epv);
}
int g_epollFd;
myevent_s g_Events[MAX_EVENTS+1]; // g_Events[MAX_EVENTS] is used by listen fd
void RecvData(int fd, int events, void *arg);
void SendData(int fd, int events, void *arg);
// accept new connections from clients
void AcceptConn(int fd, int events, void *arg)
{
struct sockaddr_in sin;
socklen_t len = sizeof(struct sockaddr_in);
int nfd, i;
// accept
if((nfd = accept(fd, (struct sockaddr*)&sin, &len)) == -1)
{
if(errno != EAGAIN && errno != EINTR)
{
}
printf("%s: accept, %d", __func__, errno);
return;
}
do
{
for(i = 0; i < MAX_EVENTS; i++)
{
if(g_Events[i].status == 0)
{
break;
}
}
if(i == MAX_EVENTS)
{
printf("%s:max connection limit[%d].", __func__, MAX_EVENTS);
break;
}
// set nonblocking
int iret = 0;
if((iret = fcntl(nfd, F_SETFL, O_NONBLOCK)) < 0)
{
printf("%s: fcntl nonblocking failed:%d", __func__, iret);
break;
}
// add a read event for receive data
EventSet(&g_Events[i], nfd, RecvData, &g_Events[i]);
EventAdd(g_epollFd, EPOLLIN, &g_Events[i]);
}while(0);
printf("new conn[%s:%d][time:%d], pos[%d]\n", inet_ntoa(sin.sin_addr),
ntohs(sin.sin_port), g_Events[i].last_active, i);
}
// receive data
void RecvData(int fd, int events, void *arg)
{
struct myevent_s *ev = (struct myevent_s*)arg;
int len;
// receive data
len = recv(fd, ev->buff+ev->len, sizeof(ev->buff)-1-ev->len, 0);
EventDel(g_epollFd, ev);
if(len > 0)
{
ev->len += len;
ev->buff[len] = '\0';
printf("C[%d]:%s\n", fd, ev->buff);
// change to send event
EventSet(ev, fd, SendData, ev);
EventAdd(g_epollFd, EPOLLOUT, ev);
}
else if(len == 0)
{
close(ev->fd);
printf("[fd=%d] pos[%d], closed gracefully.\n", fd, ev-g_Events);
}
else
{
close(ev->fd);
printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
}
}
// send data
void SendData(int fd, int events, void *arg)
{
struct myevent_s *ev = (struct myevent_s*)arg;
int len;
// send data
len = send(fd, ev->buff + ev->s_offset, ev->len - ev->s_offset, 0);
if(len > 0)
{
printf("send[fd=%d], [%d<->%d]%s\n", fd, len, ev->len, ev->buff);
ev->s_offset += len;
if(ev->s_offset == ev->len)
{
// change to receive event
EventDel(g_epollFd, ev);
EventSet(ev, fd, RecvData, ev);
EventAdd(g_epollFd, EPOLLIN, ev);
}
}
else
{
close(ev->fd);
EventDel(g_epollFd, ev);
printf("send[fd=%d] error[%d]\n", fd, errno);
}
}
void InitListenSocket(int epollFd, short port)
{
int listenFd = socket(AF_INET, SOCK_STREAM, 0);
fcntl(listenFd, F_SETFL, O_NONBLOCK); // set non-blocking
printf("server listen fd=%d\n", listenFd);
EventSet(&g_Events[MAX_EVENTS], listenFd, AcceptConn, &g_Events[MAX_EVENTS]);
// add listen socket
EventAdd(epollFd, EPOLLIN, &g_Events[MAX_EVENTS]);
// bind & listen
sockaddr_in sin;
bzero(&sin, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = INADDR_ANY;
sin.sin_port = htons(port);
bind(listenFd, (const sockaddr*)&sin, sizeof(sin));
listen(listenFd, 5);
}
int main(int argc, char **argv)
{
unsigned short port = 12345; // default port
if(argc == 2){
port = atoi(argv[1]);
}
// create epoll
g_epollFd = epoll_create(MAX_EVENTS);
if(g_epollFd <= 0) printf("create epoll failed.%d\n", g_epollFd);
// create & bind listen socket, and add to epoll, set non-blocking
InitListenSocket(g_epollFd, port);
// event loop
struct epoll_event events[MAX_EVENTS];
printf("server running:port[%d]\n", port);
int checkPos = 0;
while(1){
// a simple timeout check here, every time 100, better to use a mini-heap, and add timer event
long now = time(NULL);
for(int i = 0; i < 100; i++, checkPos++) // doesn't check listen fd
{
if(checkPos == MAX_EVENTS) checkPos = 0; // recycle
if(g_Events[checkPos].status != 1) continue;
long duration = now - g_Events[checkPos].last_active;
if(duration >= 60) // 60s timeout
{
close(g_Events[checkPos].fd);
printf("[fd=%d] timeout[%d--%d].\n", g_Events[checkPos].fd, g_Events[checkPos].last_active, now);
EventDel(g_epollFd, &g_Events[checkPos]);
}
}
// wait for events to happen
int fds = epoll_wait(g_epollFd, events, MAX_EVENTS, 1000);
if(fds < 0){
printf("epoll_wait error, exit\n");
break;
}
for(int i = 0; i < fds; i++){
myevent_s *ev = (struct myevent_s*)events[i].data.ptr;
if((events[i].events&EPOLLIN)&&(ev->events&EPOLLIN)) // read event
{
ev->call_back(ev->fd, events[i].events, ev->arg);
}
if((events[i].events&EPOLLOUT)&&(ev->events&EPOLLOUT)) // write event
{
ev->call_back(ev->fd, events[i].events, ev->arg);
}
}
}
// free resource
return 0;
}
epoll为什么更高效
select,poll,epoll 都是 IO 多路复用的机制。I/O 多路复用就通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。
但 select,poll,epoll 本质上都是同步 I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步 I/O 则无需自己负责进行读写,异步 I/O 的实现会负责把数据从内核拷贝到用户空间。
poll 的实现和 select 非常相似,只是描述 fd 集合的方式不同,poll 使用 pollfd 结构而不是 select 的 fd_set 结构,其他的都差不多。
epoll 既然是对 select 和 poll 的改进,就应该能避免上述的三个缺点。那 epoll 都是怎么解决的呢?在此之前,我们先看一下 epoll 和 select 和 poll 的调用接口上的不同,select 和 poll 都只提供了一个函数—— select 或者 poll 函数。而 epoll 提供了三个函数,epoll_create,epoll_ctl 和 epoll_wait,epoll_create 是创建一个 epoll 句柄;epoll_ctl 是注册要监听的事件类型;epoll_wait 则是等待事件的产生。
对于第一个缺点,epoll 的解决方案在 epoll_ctl 函数中。每次注册新的事件到 epoll 句柄中时(在 epoll_ctl 中指定 EPOLL_CTL_ADD),会把所有的 fd 拷贝进内核,而不是在 epoll_wait 的时候重复拷贝。epoll 保证了每个 fd 在整个过程中只会拷贝一次。
对于第二个缺点,epoll 的解决方案不像 select 或 poll 一样每次都把 current 轮流加入 fd 对应的设备等待队列中,而只在 epoll_ctl 时把 current 挂一遍(这一遍必不可少)并为每个 fd 指定一个回调函数,当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调函数,而这个回调函数会把就绪的 fd 加入一个就绪链表)。epoll_wait 的工作实际上就是在这个就绪链表中查看有没有就绪的 fd(利用 schedule_timeout()实 现睡一会,判断一会的效果,和 select 实现中的第 7 步是类似的)。
对于第三个缺点,epoll 没有这个限制,它所支持的 FD 上限是最大可以打开文件的数目,这个数字一般远大于 2048,举个例子,在 1GB 内存的机器上大约是 10 万左右,具体数目可以 cat /proc/sys/fs/file-max 察看,一般来说这个数目和系统内存关系很大。