linux网络编程之I/O多路复用学习

I/O多路复用就是通过一种机制,使一个进程可以监视多个描述符,一旦某个描述符读写就绪,能够通知程序进行相应的读写操作。I/O多路复用本质上都是同步I/O,它们都需要在读写事件就绪后自己负责读写,这个读写过程是阻塞的。异步I/O则无需自己负责读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。

与多进程和多线程技术相比,I/O多路复用技术的最大优势是系统开销小,系统不必创建进程或线程,也不必维护这些进程或线程,从而大大减小了系统的开销。

linux下实现I/O多路复用最常用的方式是select和epoll。

本文的文字主要都是抄自这篇

select、epoll简介

select和epoll都提供I/O多路复用的解决方案。epoll是linux特有,只有在2.6内核以上才支持,而select是POSIX规定的,一般操作系统均有实现。

select

select函数监视的文件描述符分3类,分别是writefds,readfds,exceptfds。调用select函数后会阻塞,直到有描述符就绪或者超时,函数返回。当select函数返回后,可以通过遍历fdset,来找到就绪的描述符。

select支持所有操作系统,具有跨平台优点。select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在linux一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但会造成效率降低。

select本质上是通过设置或检查存放fd标志位的数据结构来进行下一步处理。这样带来的缺点是:

  1. select最大的缺陷就是单个进程所打开的fd是有一定限制的,它由FD_SETSIZE设置,默认是1024。
  2. 对fd进行扫描时是线性扫描,采用轮询的方式,效率较低。
  3. 需要维护一个用来存放大量fd的数据结构,这样会使得用户空间和内核空间在传递该结构时复制开销大。

epoll

epoll更灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关心的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需要一次。

基本原理:epoll支持水平触发和边缘触发,最大的特点在于边缘触发,它只告诉进程哪些fd刚刚变为就绪态,并且只会通知一次。还有一个特点是,epoll使用“事件”的就绪通知方式,通过epoll_ctl注册fd,一旦该fd就绪,内核就会采用类似callback的回调机制来激活该fd,epoll_wait便可以收到通知。

epoll的优点:

  1. 没有最大并发连接的限制,能打开的FD的上限远大于1024(1G的内存上能监听约10万个端口)。
  2. 效率提升,不是轮询的方式,不会随着FD数目的增加效率下降。
  3. 内存拷贝,利用mmap()文件映射内存加速与内核空间的消息传递;即epoll使用mmap减少复制开销。

epoll对文件描述符的操作有两种模式:LT(level trigger)和ET(edge trigger)。LT模式是默认模式,LT模式与ET模式的区别如下:

  • LT模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll_wait时,会再次响应应用程序并通知此事件。
  • ET模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次响应应用程序并通知此事件。

注意:如果没有大量的idle-connection或者dead-connection,epoll的效率并不会比select/poll高很多,但是当遇到大量的idle-connection,就会发现epoll的效率大大高于select/poll。

select和epoll对比

x 最大连接 效率 消息传递方式
select 有限制 fd多时效率低 内核要将消息传递到用户空间,都需要内核拷贝动作
epoll 无限制 fd多时效率高 epoll通过内核和用户空间共享一块内存来实现

总结:

  1. 表面上看epoll的性能最好,但是在连接数少并且连接都十分活跃的情况下,select和poll的性能可能比epoll好,毕竟epoll的通知机制需要很多函数回调。
  2. select低效是因为每次它都需要轮询。但低效也是相对的,视情况而定,也可通过良好的设计改善。

select实例

select1.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int main(void)
{
fd_set rfds;
struct timeval tv;
int retval;
FD_ZERO(&rfds);
FD_SET(0, &rfds);
tv.tv_sec = 5;
tv.tv_usec = 500000;
retval = select(1, &rfds, 0, 0, &tv);
if (retval == -1)
perror("select()");
else if (retval > 0)
printf("data is available now.\n");
else
printf("no data within 5.5 seconds.\n");
return 0;
}

select2.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/time.h>
#include <fcntl.h>
#include <sys/ioctl.h>
#include <unistd.h>
int main()
{
char buffer[128];
int result, nread;
fd_set inputs, testfds;
struct timeval timeout;
FD_ZERO(&inputs);
FD_SET(0, &inputs);
while (1) {
testfds = inputs;
timeout.tv_sec = 2;
timeout.tv_usec = 500000;
result = select(FD_SETSIZE, &testfds, 0, 0, &timeout);
switch (result) {
case 0:
printf("timeout\n");
break;
case -1:
perror("select");
exit(1);
default:
if (FD_ISSET(0, &testfds)) {
ioctl(0, FIONREAD, &nread);
if (nread == 0) {
printf("keyboard done\n");
exit(1);
}
nread = read(0, buffer, nread);
buffer[nread] = 0;
printf("read %d from keyboard: %s", nread, buffer);
}
break;
}
}
return 0;
}

select-server.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <arpa/inet.h>
int main()
{
int server_sockfd, client_sockfd;
int server_len, client_len;
struct sockaddr_in server_address, client_address;
int result;
fd_set readfds, testfds;
server_sockfd = socket(AF_INET, SOCK_STREAM, 0);
server_len = sizeof(server_address);
memset(&server_address, 0, server_len);
server_address.sin_family = AF_INET;
server_address.sin_addr.s_addr = htonl(INADDR_ANY);
server_address.sin_port = htons(8888);
bind(server_sockfd, (struct sockaddr *)&server_address, server_len);
listen(server_sockfd, 10);
FD_ZERO(&readfds);
FD_SET(server_sockfd, &readfds);
while (1) {
char ch;
int fd;
int nread;
testfds = readfds;
printf("server waiting\n");
result = select(FD_SETSIZE, &testfds, 0, 0, 0);
if (result < 1) {
perror("select");
exit(1);
}
for (fd = 0; fd < FD_SETSIZE; fd++) {
if (FD_ISSET(fd, &testfds)) {
if (fd == server_sockfd) {
client_len = sizeof(client_address);
client_sockfd = accept(server_sockfd,
(struct sockaddr *)&client_address, &client_len);
FD_SET(client_sockfd, &readfds);
printf("add client on fd %d\n", client_sockfd);
} else {
ioctl(fd, FIONREAD, &nread);
if (nread == 0) {
close(fd);
FD_CLR(fd, &readfds);
printf("remove client on fd %d\n", fd);
} else {
read(fd, &ch, 1);
sleep(5);
printf("serving client on fd %d\n", fd);
ch++;
write(fd, &ch, 1);
}
}
}
}
}
return 0;
}

select-client.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
int main()
{
int client_sockfd;
int len;
struct sockaddr_in address;
int result;
char ch = 'A';
client_sockfd = socket(AF_INET, SOCK_STREAM, 0);
memset(&address, 0, sizeof(address));
address.sin_family = AF_INET;
address.sin_addr.s_addr = inet_addr("127.0.0.1");
address.sin_port = htons(8888);
result = connect(client_sockfd, (struct sockaddr *)&address, sizeof(address));
write(client_sockfd, &ch, 1);
read(client_sockfd, &ch, 1);
printf("char from server: %c\n", ch);
close(client_sockfd);
return 0;
}

epoll实例

epoll-server.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/resource.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#define MAXEPOLLSIZE 10000
#define MAXLINE 10240

int handle(int connfd);
int setnonblocking(int sockfd)
{
if (fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0) | O_NONBLOCK) == -1)
return -1;
return 0;
}

int main()
{
int servPort = 8888;
int listenq = 1024;

int listenfd, connfd, kdpfd, nfds, n, nread, curfds, acceptCount = 0;
struct sockaddr_in servaddr, cliaddr;
socklen_t socklen = sizeof(struct sockaddr_in);
struct epoll_event ev;
struct epoll_event events[MAXEPOLLSIZE];
struct rlimit rt;
char buf[MAXLINE];
//设置每个进程允许打开的最大文件数
rt.rlim_max = rt.rlim_cur = MAXEPOLLSIZE;
setrlimit(RLIMIT_NOFILE, &rt);

bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(servPort);

listenfd = socket(AF_INET, SOCK_STREAM, 0);

int opt = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

setnonblocking(listenfd);

bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));

listen(listenfd, listenq);
//创建epoll句柄,把listenfd加入到epoll集合里
kdpfd = epoll_create(MAXEPOLLSIZE);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = listenfd;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, listenfd, &ev);
curfds = 1;

printf("epollserver startup, port %d, max connection is %d, backlog is %d\n",
servPort, MAXEPOLLSIZE, listenq);
while (1) {
//等待有事件发生
nfds = epoll_wait(kdpfd, events, curfds, -1);
//处理所有事件
for (n = 0; n < nfds; n++) {
if (events[n].data.fd == listenfd) {
connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &socklen);
sprintf(buf, "accept from %s:%d\n",
inet_ntoa(cliaddr.sin_addr),
cliaddr.sin_port);//ntohs
printf("%d:%s", ++acceptCount, buf);
if (curfds >= MAXEPOLLSIZE) {
fprintf(stderr, "too many connection, more than %d\n",
MAXEPOLLSIZE);
close(connfd);
continue;
}
setnonblocking(connfd);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = connfd;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, connfd, &ev);
curfds++;
continue;
}
//处理客户端请求
if (handle(events[n].data.fd) < 0) {
epoll_ctl(kdpfd, EPOLL_CTL_DEL, events[n].data.fd, &ev);
curfds--;
}
}
}
close(listenfd);
return 0;
}

int handle(int connfd)
{
int nread;
char buf[MAXLINE];
nread = read(connfd, buf, MAXLINE);
if (nread == 0) {
printf("client close the connection.\n");
close(connfd);
return -1;
}
if (nread < 0) {
perror("read");
close(connfd);
return -1;
}
write(connfd, buf, nread);
return 0;
}

总结