select poll与epoll

同步I/O

在操作系统中,程序运行的空间分为内核空间和用户空间,用户空间所有对 io 操作的代码(如文件的读写、socket 的收发等)都会通过系统调用进入内核空间完成实际的操作。

而且我们都知道 CPU 的速度远远快于硬盘、网络等 I/O。在一个线程中,CPU 执行代码的速度极快,然而,一旦遇到 I/O 操作,如读写文件、发送网络数据时,就需要等待 I/O 操作完成,才能继续进行下一步操作,这种情况称为同步 I/O。

在某个应用程序运行时,假设需要读写某个文件,此时就发生了 I/O 操作,在 I/O 操作的过程中,系统会将当前线程挂起,而其他需要 CPU 执行的代码就无法被当前线程执行了,这就是同步 I/O 操作,因为一个 IO 操作就阻塞了当前线程,导致其他代码无法执行,所以我们可以使用 多线程或者多进程 来并发执行代码,当某个线程/进程被挂起后,不会影响其他线程或进程。

多线程和多进程虽然解决了这种并发的问题,但是系统不能无上限地增加线程/进程。由于系统切换线程/进程的开销也很大,所以,一旦线程/进程数量过多,CPU 的时间就花在线程/进程切换上了,真正运行代码的时间就少了,这样子的结果也导致系统性能严重下降。

多线程和多进程只是解决这一问题的一种方法,另一种解决 I/O 问题的方法是异步 I/O,当然还有其他解决的方法。

异步I/O

当程序需要对 I/O 进行操作时,它只发出 I/O 操作的指令,并不等待 I/O 操作的结果,然后就去执行其他代码了。一段时间后,当 I/O 返回结果时,再通知 CPU 进行处理。这样子用户空间中的程序不需要等待内核空间中的 I/O 完成实际操作,就可执行其他任务,提高 CPU 的利用率。

简单来说就是,用户不需要等待内核完成实际对io的读写操作就直接返回了。

阻塞I/O

Linux 中,默认情况下所有的 socket 都是阻塞的,一个典型的读操作流程大概是这样:

26_select poll与epoll.png

当用户进程调用了 read()/recvfrom() 等系统调用函数,它会进入内核空间中,当这个网络 I/O 没有数据的时候,内核就要等待数据的到来,而在用户进程这边,整个进程会被阻塞,直到内核空间返回数据。

当内核空间的数据准备好了,它就会将数据从内核空间中拷贝到用户空间,此时用户进程才解除阻塞的的状态,重新运行起来。

所以,阻塞 I/O 的特点就是在 IO 执行的两个阶段(用户空间与内核空间)都被阻塞了。

非阻塞I/O

Linux 下,可以通过设置 socket 使其变为非阻塞模式,这种情况下,当内核空间并无数据的时候,它会马上返回结果而不会阻塞,此时用户进程可以根据这个结果自由配置,比如继续请求数据,或者不再继续请求。当对一个非阻塞 socket 执行读操作时,流程是这个样子:

27_select poll与epoll.png

当用户进程调用 read()/recvfrom() 等系统调用函数时,如果内核空间中的数据还没有准备好,那么它并不会阻塞用户进程,而是立刻返回一个 error。

对于应用进程来说,它发起一个 read() 操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个 error 时,它就知道内核中的数据还没有准备好,那么它可以再次调用 read()/recvfrom() 等函数。

当内核空间的数据准备好了,它就会将数据从内核空间中拷贝到用户空间,此时用户进程也就得到了数据。

所以,非阻塞I/O的特点是用户进程需要不断的 主动询问 内核空间的数据准备好了没有。

多路复用I/O

多路复用 I/O 就是我们说的 select,poll,epoll 等操作,复用的好处就在于单个进程就可以同时处理多个网络连接的 I/O,能实现这种功能的原理就是 select、poll、epoll 等函数会不断的轮询它们所负责的所有 socket ,当某个 socket 有数据到达了,就通知用户进程。

一般来说 I/O 复用多用于以下情况:

  1. 当客户处理多个描述符时。
  2. 服务器在高并发处理网络连接的时候。
  3. 服务器既要处理监听套接口,又要处理已连接套接口,一般也要用到 I/O 复用。
  4. 如果一个服务器即要处理 TCP,又要处理 UDP,一般要使用 I/O 复用。
  5. 如果一个服务器要处理多个服务或多个协议,一般要使用 I/O 复用。

与多进程和多线程技术相比, I/O 多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程 ,从而大大减小了系统的开销。但 select,poll,epoll 本质上都是同步 I/O,因为他们都需要 在读写事件就绪后自己负责进行读写 ,也就是说这个读写过程是阻塞的,而异步 I/O 则无需自己负责进行读写,异步 I/O 的实现会负责把数据从内核拷贝到用户空间。

select

用直白的话来介绍 select:select 机制会监听它所负责的所有 socket,当其中一个 socket 或者多个 socket 可读或者可写的时候,它就会返回,而如果所有的 socket 都是不可读或者不可写的时候,这个进程就会被阻塞,直到超时或者 socket 可读写,当 select 函数返回后,可以通过遍历 fdset,来找到就绪的描述符。

select整个处理过程如下

28_select poll与epoll.png

  1. 用户进程调用 select() 函数,如果当前没有可读写的 socket,则用户进程进入阻塞状态。
  2. 对于内核空间来说,它会从用户空间拷贝 fd_set 到内核空间,然后在内核中遍历一遍所有的 socket 描述符,如果没有满足条件的 socket 描述符,内核将进行休眠,当设备驱动发生自身资源可读写后,会唤醒其等待队列上睡眠的内核进程,即在 socket 可读写时唤醒,或者在超时后唤醒。
  3. 返回 select() 函数的调用结果给用户进程,返回就绪 socket 描述符的数目,超时返回 0,出错返回 -1。
  4. 注意,在 select() 函数返回后还是需要轮询去找到就绪的 socket 描述符的,此时用户进程才可以去操作 socket 。

select 目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。当然 select 也有很多缺点:

  1. 单个进程能够监视的文件描述符的数量存在最大限制,在 Linux 上一般为 1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样也会造成效率的降低。
  2. 需要维护一个用来存放大量 fd 的数据结构,这样会使得用户空间和内核空间在传递该结构时复制开销大。
  3. 每次在有 socket 描述符活跃的时候,都需要遍历一遍所有的 fd 找到该描述符,这会带来大量的时间消耗(时间复杂度是O(n),并且伴随着描述符越多,这开销呈线性增长)

对应内核来说,整个处理的流程如下:

29_select poll与epoll.png

select函数原型:

int select(int maxfdp1,fd_set *readset,fd_set *writeset,fd_set *exceptset,const struct timeval *timeout)

参数说明:

  • maxfdp1 指定感兴趣的 socket 描述符个数,它的值是套接字最大 socket 描述符加 1,socket 描述符 0、1、2 … maxfdp1-1 均将被设置为感兴趣(即会查看他们是否可读、可写)。
  • readset:指定这个 socket 描述符是可读的时候才返回。
  • writeset:指定这个 socket 描述符是可写的时候才返回。
  • exceptset:指定这个 socket 描述符是异常条件时候才返回。
  • timeout:指定了超时的时间,当超时了也会返回。

如果对某一个的条件不感兴趣,就可以把它设为空指针。

返回值:就绪 socket 描述符的数目,超时返回 0,出错返回 -1。

select常用函数

select 是三者当中最底层的,它的事件的轮训机制是基于比特位的。每次查询都要遍历整个事件列表。

理解 select,首先要理解 select 要处理的 fd_set 数据结构,每个 select 都要处理一个 fd_set 结构。

fd_set 简单地理解为一个长度是 1024 的比特位,每个比特位表示一个需要处理的 FD,如果是 1,那么表示这个 FD 有需要处理的 I/O 事件,否则没有。Linux 为了简化位操作,定义了一组宏函数来处理这个比特位数组。

void FD_CLR(int fd, fd_set *set); // 清空fd在fd_set上的映射,说明select不在处理该fd int FD_ISSET(int fd, fd_set *set); // 查询fd指示的fd_set是否是有事件请求 void FD_SET(int fd, fd_set *set); // 把fd指示的fd_set置1 void FD_ZERO(fd_set *set); // 清空整个fd_set,一般用于初始化

select案例

int main() { fd_set read_fs, write_fs; struct timeval timeout; int max_sd = 0; // 用于记录最大的fd,在轮询中时刻更新即可 /* * 这里进行一些初始化的设置, * 包括socket建立,地址的设置等, * 同时记得初始化max_sd */ // 初始化比特位 FD_ZERO(&read_fs); FD_ZERO(&write_fs); int rc = 0; int desc_ready = 0; // 记录就绪的事件,可以减少遍历的次数 while (1) { // 这里进行阻塞 rc = select(max_sd + 1, &read_fd, &write_fd, NULL, &timeout); if (rc < 0) { // 这里进行错误处理机制 } if (rc == 0) { // 这里进行超时处理机制 } desc_ready = rc; // 遍历所有的比特位,轮询事件 for (int i = 0; i <= max_sd && desc_ready; ++i) { if (FD_ISSET(i, &read_fd)) { --desc_ready; // 这里处理read事件,别忘了更新max_sd } if (FD_ISSET(i, &write_fd)) { // 这里处理write事件,别忘了更新max_sd } } } }

select的缺点

  1. 每次调用 select ,都需要把 fd 集合从用户态拷贝到内核态,这个开销在 fd 很多时会很大。
  2. 同时每次调用 select 都需要在内核遍历传递进来的所有 fd ,这个开销在 fd 很多时也很大。
  3. 每次在 select() 函数返回后,都要通过遍历文件描述符来获取已经就绪的 socket 。
  4. select 支持的文件描述符数量太小了,默认是 1024。

poll

poll 的实现和 select 非常相似,只是描述 fd 集合的方式不同, poll 使用 pollfd 结构而不是 select 的 fd_set 结构,poll 不限制 socket 描述符的个数,因为它是使用链表维护这些 socket 描述符的,其他的都差不多和 select() 函数一样。

poll() 函数返回后,需要轮询 pollfd 来获取就绪的描述符,根据描述符的状态进行处理,但是 poll 没有最大文件描述符数量的限制。 poll 和 select 同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

函数原型

还是先了解 poll 底层操作的数据结构 pollfd:

struct pollfd { int fd; // 需要监视的文件描述符 short events; // 需要内核监视的事件 short revents; // 实际发生的事件 };

在使用该结构的时候,不用进行比特位的操作,而是对事件本身进行操作就行。同时还可以自定义事件的类型。具体可以参考手册。

同样的,事件默认初始化全部都是 0,通过 bzero 或者 memset 统一初始化即可,之后在 events 上注册感兴趣的事件,监听的时候在 revents 上监听即可。注册事件使用 | 操作,查询事件使用 & 操作。比如想要注册 POLLIN 数据到来的事件,需要 pfd.events |= POLLIN,注册多个事件进行多次 | 操作即可。取消事件进行 ~ 操作,比如 pfd.events ~= POLLIN。查询事件:pfd.revents & POLLIN。

使用 poll 函数进行操作:

int poll (struct pollfd *fds, unsigned int nfds, int timeout);

参数说明:

  • fds:一个 pollfd 队列的队头指针,我们先把需要监视的文件描述符和他们上面的事件放到这个队列中
  • nfds:队列的长度
  • timeout:事件操作,设置指定正数的阻塞事件,0 表示非阻塞模式,-1 表示永久阻塞。

案例

// 先宏定义长度 #define MAX_POLLFD_LEN 200 int main() { /* * 在这里进行一些初始化的操作, * 比如初始化数据和socket等。 */ int rc = 0; pollfd fds[MAX_POLL_LEN]; memset(fds, 0, sizeof(fds)); int ndfs = 1; // 队列的实际长度,是一个随时更新的,也可以自定义其他的 int timeout = 0; /* * 在这里进行一些感兴趣事件的注册, * 每个pollfd可以注册多个类型的事件, * 使用 | 操作即可,就行博文提到的那样。 * 根据需要设置阻塞时间 */ int current_size = ndfs; int compress_array = 0; // 压缩队列的标记 while (1) { rc = poll(fds, nfds, timeout); if (rc < 0) { // 这里进行错误处理 } if (rc == 0) { // 这里进行超时处理 } for (int i = 0; i < current_size; ++i) { if (fds[i].revents == 0){ // 没有事件可以处理 continue; } if (fds[i].revents & POLLIN) { // 简单的例子,比如处理写事件 } /* * current_size 是为了降低复杂度的,可以随时进行更新 * ndfs如果要更新,应该是最后统一进行 */ } if (compress_array) { // 如果需要压缩队列 compress_array = 0; for (int i = 0; i < ndfs; ++i) { for (int j = i; j < ndfs; ++j) { fds[i].fd = fds[j + i].fd; } --i; --ndfs; } } } }

epoll

epoll的原理

其实相对于 select 和 poll 来说, epoll 更加灵活,但是核心的原理都是当 socket 描述符就绪(可读、可写、出现异常),就会通知应用进程,告诉他哪个 socket 描述符就绪,只是通知处理的方式不同而已。

epoll 使用一个 epfd (epoll 文件描述符)管理多个 socket 描述符,epoll 不限制 socket 描述符的个数, 将用户空间的 socket 描述符的事件存放到内核的一个事件表中 ,这样在用户空间和内核空间的 copy 只需一次。当 epoll 记录的 socket 产生就绪的时候,epoll 会通过 callback 的方式来激活这个 fd,这样子在 epoll_wait 便可以收到通知,告知应用层哪个 socket 就绪了,这种通知的方式是可以直接得到那个 socket 就绪的,因此相比于 select 和 poll ,它不需要遍历 socket 列表,时间复杂度是 O(1),不会因为记录的 socket 增多而导致开销变大。

epoll的操作模式

epoll 对 socket 描述符的操作有两种模式: LT(level trigger)和ET(edge trigger) 。LT 模式是默认模式,LT 模式与 ET 模式的区别如下:

  • LT模式:即水平出发模式,当 epoll_wait 检测到 socket 描述符处于就绪时就通知应用程序,应用程序可以不立即处理它。下次调用 epoll_wait 时,还会再次产生通知。
  • ET模式:即边缘触发模式,当 epoll_wait 检测到 socket 描述符处于就绪时就通知应用程序,应用程序必须立即处理它。如果不处理,下次调用 epoll_wait 时,不会再次产生通知。

ET 模式在很大程度上减少了 epoll 事件被重复触发的次数,因此效率要比 LT 模式高 。epoll 工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。

epoll函数

epoll 只有 epoll_create()、epoll_ctl()、epoll_wait() 3 个系统调用函数。

epoll_create()

int epoll_create(int size);

创建一个 epoll 的 epfd (epoll 文件描述符,或者称之为句柄),当创建好 epoll 句柄后,它就是会占用一个 fd 值,必须调用 close() 关闭,否则可能导致 fd 被耗尽,这也是为什么我们前面所讲的是: epoll 使用一个 epfd 管理多个 socket 描述符 。

size 参数用来告诉内核这个监听的数目一共有多大,它其实是在内核申请一空间,用来存放用户想监听的 socket fd 上是否可读可行或者其他异常,只要有足够的内存空间,size 可以随意设置大小,1G 的内存上能监听约 10 万个端口。

epoll_ctl()

该函数用于控制某个 epoll 文件描述符上的事件,可以注册事件,修改事件,以及删除事件。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

参数:

  • epdf:由 epoll_create() 函数返回的 epoll 文件描述符(句柄)。
  • op:op 是操作的选项,目前有以下三个选项:
    • EPOLL_CTL_ADD:注册要监听的目标 socket 描述符 fd 到 epoll 句柄中。
    • EPOLL_CTL_MOD:修改 epoll 句柄已经注册的 fd 的监听事件。
    • EPOLL_CTL_DEL:从 epoll 句柄删除已经注册的 socket 描述符。
  • fd:指定监听的 socket 描述符。
  • event:event 结构如下:
typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t; struct epoll_event { uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ };
  • events 可以是以下几个宏的集合:
  • EPOLLIN:表示对应的文件描述符可以读(包括对端SOCKET正常关闭)。
  • EPOLLOUT:表示对应的文件描述符可以写。
  • EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)。
  • EPOLLERR:表示对应的文件描述符发生错误。
  • EPOLLHUP:表示对应的文件描述符被挂断。
  • EPOLLET: 将 EPOLL 设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
  • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个 socket 的话,需要再次把这个 socket 加入到 EPOLL 队列里。

epoll_wait()

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

epoll_wait() 函数的作用就是等待监听的事件的发生,类似于调用 select() 函数。

参数:

  • events:用来从内核得到事件的集合。
  • maxevents:告知内核这个 events 有多大,这个 maxevents 的值不能大于创建 epoll_create() 时的指定的 size。
  • timeout:超时时间。

函数的返回值表示需要处理的事件数目,如返回 0 表示已超时。

案例

#include <sys/socket.h> #include <sys/epoll.h> #include <netinet/in.h> #include <arpa/inet.h> #include <fcntl.h> #include <unistd.h> #include <stdio.h> #include <errno.h> #include <iostream> using namespace std; #define MAX_EVENTS 500 struct myevent_s { int fd; void (*call_back)(int fd, int events, void *arg); int events; void *arg; int status; // 1: in epoll wait list, 0 not in char buff[128]; // recv data buffer int len, s_offset; long last_active; // last active time }; // set event void EventSet(myevent_s *ev, int fd, void (*call_back)(int, int, void*), void *arg) { ev->fd = fd; ev->call_back = call_back; ev->events = 0; ev->arg = arg; ev->status = 0; bzero(ev->buff, sizeof(ev->buff)); ev->s_offset = 0; ev->len = 0; ev->last_active = time(NULL); } // add/mod an event to epoll void EventAdd(int epollFd, int events, myevent_s *ev) { struct epoll_event epv = {0, {0}}; int op; epv.data.ptr = ev; epv.events = ev->events = events; if(ev->status == 1){ op = EPOLL_CTL_MOD; } else{ op = EPOLL_CTL_ADD; ev->status = 1; } if(epoll_ctl(epollFd, op, ev->fd, &epv) < 0) printf("Event Add failed[fd=%d], evnets[%d]\n", ev->fd, events); else printf("Event Add OK[fd=%d], op=%d, evnets[%0X]\n", ev->fd, op, events); } // delete an event from epoll void EventDel(int epollFd, myevent_s *ev) { struct epoll_event epv = {0, {0}}; if(ev->status != 1) return; epv.data.ptr = ev; ev->status = 0; epoll_ctl(epollFd, EPOLL_CTL_DEL, ev->fd, &epv); } int g_epollFd; myevent_s g_Events[MAX_EVENTS+1]; // g_Events[MAX_EVENTS] is used by listen fd void RecvData(int fd, int events, void *arg); void SendData(int fd, int events, void *arg); // accept new connections from clients void AcceptConn(int fd, int events, void *arg) { struct sockaddr_in sin; socklen_t len = sizeof(struct sockaddr_in); int nfd, i; // accept if((nfd = accept(fd, (struct sockaddr*)&sin, &len)) == -1) { if(errno != EAGAIN && errno != EINTR) { } printf("%s: accept, %d", __func__, errno); return; } do { for(i = 0; i < MAX_EVENTS; i++) { if(g_Events[i].status == 0) { break; } } if(i == MAX_EVENTS) { printf("%s:max connection limit[%d].", __func__, MAX_EVENTS); break; } // set nonblocking int iret = 0; if((iret = fcntl(nfd, F_SETFL, O_NONBLOCK)) < 0) { printf("%s: fcntl nonblocking failed:%d", __func__, iret); break; } // add a read event for receive data EventSet(&g_Events[i], nfd, RecvData, &g_Events[i]); EventAdd(g_epollFd, EPOLLIN, &g_Events[i]); }while(0); printf("new conn[%s:%d][time:%d], pos[%d]\n", inet_ntoa(sin.sin_addr), ntohs(sin.sin_port), g_Events[i].last_active, i); } // receive data void RecvData(int fd, int events, void *arg) { struct myevent_s *ev = (struct myevent_s*)arg; int len; // receive data len = recv(fd, ev->buff+ev->len, sizeof(ev->buff)-1-ev->len, 0); EventDel(g_epollFd, ev); if(len > 0) { ev->len += len; ev->buff[len] = '\0'; printf("C[%d]:%s\n", fd, ev->buff); // change to send event EventSet(ev, fd, SendData, ev); EventAdd(g_epollFd, EPOLLOUT, ev); } else if(len == 0) { close(ev->fd); printf("[fd=%d] pos[%d], closed gracefully.\n", fd, ev-g_Events); } else { close(ev->fd); printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno)); } } // send data void SendData(int fd, int events, void *arg) { struct myevent_s *ev = (struct myevent_s*)arg; int len; // send data len = send(fd, ev->buff + ev->s_offset, ev->len - ev->s_offset, 0); if(len > 0) { printf("send[fd=%d], [%d<->%d]%s\n", fd, len, ev->len, ev->buff); ev->s_offset += len; if(ev->s_offset == ev->len) { // change to receive event EventDel(g_epollFd, ev); EventSet(ev, fd, RecvData, ev); EventAdd(g_epollFd, EPOLLIN, ev); } } else { close(ev->fd); EventDel(g_epollFd, ev); printf("send[fd=%d] error[%d]\n", fd, errno); } } void InitListenSocket(int epollFd, short port) { int listenFd = socket(AF_INET, SOCK_STREAM, 0); fcntl(listenFd, F_SETFL, O_NONBLOCK); // set non-blocking printf("server listen fd=%d\n", listenFd); EventSet(&g_Events[MAX_EVENTS], listenFd, AcceptConn, &g_Events[MAX_EVENTS]); // add listen socket EventAdd(epollFd, EPOLLIN, &g_Events[MAX_EVENTS]); // bind & listen sockaddr_in sin; bzero(&sin, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_addr.s_addr = INADDR_ANY; sin.sin_port = htons(port); bind(listenFd, (const sockaddr*)&sin, sizeof(sin)); listen(listenFd, 5); } int main(int argc, char **argv) { unsigned short port = 12345; // default port if(argc == 2){ port = atoi(argv[1]); } // create epoll g_epollFd = epoll_create(MAX_EVENTS); if(g_epollFd <= 0) printf("create epoll failed.%d\n", g_epollFd); // create & bind listen socket, and add to epoll, set non-blocking InitListenSocket(g_epollFd, port); // event loop struct epoll_event events[MAX_EVENTS]; printf("server running:port[%d]\n", port); int checkPos = 0; while(1){ // a simple timeout check here, every time 100, better to use a mini-heap, and add timer event long now = time(NULL); for(int i = 0; i < 100; i++, checkPos++) // doesn't check listen fd { if(checkPos == MAX_EVENTS) checkPos = 0; // recycle if(g_Events[checkPos].status != 1) continue; long duration = now - g_Events[checkPos].last_active; if(duration >= 60) // 60s timeout { close(g_Events[checkPos].fd); printf("[fd=%d] timeout[%d--%d].\n", g_Events[checkPos].fd, g_Events[checkPos].last_active, now); EventDel(g_epollFd, &g_Events[checkPos]); } } // wait for events to happen int fds = epoll_wait(g_epollFd, events, MAX_EVENTS, 1000); if(fds < 0){ printf("epoll_wait error, exit\n"); break; } for(int i = 0; i < fds; i++){ myevent_s *ev = (struct myevent_s*)events[i].data.ptr; if((events[i].events&EPOLLIN)&&(ev->events&EPOLLIN)) // read event { ev->call_back(ev->fd, events[i].events, ev->arg); } if((events[i].events&EPOLLOUT)&&(ev->events&EPOLLOUT)) // write event { ev->call_back(ev->fd, events[i].events, ev->arg); } } } // free resource return 0; }

epoll为什么更高效

  1. 当我们调用 epoll_wait() 函数返回的不是实际的描述符,而是一个代表就绪描述符数量的值,这个时候需要去 epoll 指定的一个数组中依次取得相应数量的socket描述符即可,而不需要遍历扫描所有的socket描述符,因此这里的时间复杂度是O(1)。
  2. 此外还使用了内存映射( mmap )技术,这样便彻底省掉了这些socket描述符在系统调用时拷贝的开销(因为从用户空间到内核空间需要拷贝操作)。mmap将用户空间的一块地址和内核空间的一块地址同时映射到相同的一块物理内存地址(不管是用户空间还是内核空间都是虚拟地址,最终要通过地址映射映射到物理地址),使得这块物理内存对内核和对用户均可见,减少用户态和内核态之间的数据交换,不需要依赖拷贝,这样子内核可以直接看到epoll监听的socket描述符,效率极高。
  3. 另一个本质的改进在于 epoll 采用基于事件的就绪通知方式。在 select/poll 中,进程只有在调用一定的方法后,内核才对所有监视的socket描述符进行扫描,而 epoll 事先通过 epoll_ctl() 来注册一个socket描述符,一旦检测到epoll管理的socket描述符就绪时,内核会采用类似 callback 的回调机制,迅速激活这个socket描述符,当进程调用 epoll_wait() 时便可以得到通知,也就是说epoll最大的优点就在于它 只管就绪的socket描述符,而跟socket描述符的总数无关 。

select poll与epoll总结

select,poll,epoll 都是 IO 多路复用的机制。I/O 多路复用就通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。

但 select,poll,epoll 本质上都是同步 I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步 I/O 则无需自己负责进行读写,异步 I/O 的实现会负责把数据从内核拷贝到用户空间。

select的几大缺点

  1. 每次调用 select,都需要把 fd 集合从用户态拷贝到内核态,这个开销在 fd 很多时会很大。
  2. 同时每次调用 select 都需要在内核遍历传递进来的所有 fd,这个开销在 fd 很多时也很大。
  3. select 支持的文件描述符数量太小了,默认是 1024。

poll实现

poll 的实现和 select 非常相似,只是描述 fd 集合的方式不同,poll 使用 pollfd 结构而不是 select 的 fd_set 结构,其他的都差不多。

epoll

epoll 既然是对 select 和 poll 的改进,就应该能避免上述的三个缺点。那 epoll 都是怎么解决的呢?在此之前,我们先看一下 epoll 和 select 和 poll 的调用接口上的不同,select 和 poll 都只提供了一个函数—— select 或者 poll 函数。而 epoll 提供了三个函数,epoll_create,epoll_ctl 和 epoll_wait,epoll_create 是创建一个 epoll 句柄;epoll_ctl 是注册要监听的事件类型;epoll_wait 则是等待事件的产生。

对于第一个缺点,epoll 的解决方案在 epoll_ctl 函数中。每次注册新的事件到 epoll 句柄中时(在 epoll_ctl 中指定 EPOLL_CTL_ADD),会把所有的 fd 拷贝进内核,而不是在 epoll_wait 的时候重复拷贝。epoll 保证了每个 fd 在整个过程中只会拷贝一次。

对于第二个缺点,epoll 的解决方案不像 select 或 poll 一样每次都把 current 轮流加入 fd 对应的设备等待队列中,而只在 epoll_ctl 时把 current 挂一遍(这一遍必不可少)并为每个 fd 指定一个回调函数,当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调函数,而这个回调函数会把就绪的 fd 加入一个就绪链表)。epoll_wait 的工作实际上就是在这个就绪链表中查看有没有就绪的 fd(利用 schedule_timeout()实 现睡一会,判断一会的效果,和 select 实现中的第 7 步是类似的)。

对于第三个缺点,epoll 没有这个限制,它所支持的 FD 上限是最大可以打开文件的数目,这个数字一般远大于 2048,举个例子,在 1GB 内存的机器上大约是 10 万左右,具体数目可以 cat /proc/sys/fs/file-max 察看,一般来说这个数目和系统内存关系很大。