12. I/O 复用
2025/12/16大约 8 分钟
12. I/O 复用
12.1 基于I/O 复用的服务器端
复用的概念
- “在一个通信频道中传递多个数据(信号)的技术”
- “为了提高物理设备的效率,用最少的物理要素传递最多数据时使用的技术”
- 纸杯电话系统模型:多个纸杯,一条串联起来的连接线
在服务端中的应用:一个服务端进程同时连接多个客户端
复用的特点:
- 不能同时进行通讯,即虽然同时连接了多个客户端,但是需要按顺序逐一处理
12.2 理解select函数并实现服务器端
NAME
select, pselect, FD_CLR, FD_ISSET, FD_SET, FD_ZERO - synchronous I/O multiplexing
SYNOPSIS
/* According to POSIX.1-2001, POSIX.1-2008 */
#include <sys/select.h>
/* According to earlier standards */
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);
The time structures involved are defined in <sys/time.h> and look like
struct timeval {
long tv_sec; /* seconds */
long tv_usec; /* microseconds */
};| 参数/函数 | 值 | 用途 | 说明 |
|---|---|---|---|
| readfds | be watched to see if characters become available for reading | 在readfds中的文件描述符,被监视是否可读 对应于服务端socket是否受到数据 | |
| writefds | be watched to see if space is available for write | 是否可写 | |
| exceptfds | be watched for exceptions. | 是否发生异常 | |
| timeout | The timeout argument specifies the interval that select() should block waiting for a file descriptor to become ready. | select()会阻塞,直到以下三种中的任意一种情况发生: * a file descriptor becomes ready; * the call is interrupted by a signal handler; or * the timeout expires. select() 都会返回 | |
| nfds | is the highest-numbered file descriptor in any of the three sets, plus 1. | 比三种监视的文件描述符中的最大编号fd 大1 | |
| FD_CLR | remove a given file descriptor from a set. | ||
| FD_ZERO | clears a set. | ||
| FD_SET | add a given file descriptor from a set | ||
| FD_ISSET | tests to see if a file descriptor is part of the set; | 判断文件描述符对应的bit位值是否为1,在select()返回后判断文件描述符是否变化 | |
| tv_sec | 超时时间中的秒数 | ||
| tv_usec | 超时时间中的微秒数 | ||
| RETURN | On success, select() and pselect() return the number of file descriptors contained in the three returned descriptor sets 0: if the timeout expires before anything interesting happens. |
根据select的作用,需要在调用前设置好监视的fd,以及超时时间
设置文件按描述符
fd_set 的定义:
/* The fd_set member is required to be an array of longs. */
typedef long int __fd_mask;
#define __NFDBITS (8 * (int) sizeof (__fd_mask))
/* Number of descriptors that can fit in an `fd_set'. */
#define __FD_SETSIZE 1024
/* fd_set for select and pselect. */
typedef struct
{
/* XPG4.2 requires this member name. Otherwise avoid the name
from the global namespace. */
#ifdef __USE_XOPEN
__fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->fds_bits)
#else
__fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->__fds_bits)
#endif
} fd_set;fd_set中bit 位序号对应文件描述符,如bit0对应文件描述符0,bit1 对应文件描述符1。如果文件描述符在监视访问内,则对应的bit位的值为1.
使用宏设置fd_set
| 代码 | fd0 | fd1 | fd2 | fd[0] |
|---|---|---|---|---|
| fd_set set; FD_ZERO(&set); | 0 | 0 | 0 | 0 |
| FD_SET(1, &set); | 0 | 1 | 0 | 1 |
| FD_SET(2, &set); | 0 | 1 | 1 | 6 |
| FD_CLR(2, &set); | 0 | 1 | 0 | 2 |
设置检查范围及超时
timeval timeout;
timeout.tv_sec = 2;
timeout.tv_usec = 0;调用select 函数后查看结果
| fd0 | fd1 | fd2 | 说明 | |
|---|---|---|---|---|
| 初始 | 0 | 1 | 1 | 监视fd1和fd2 |
| 第一次调用 | 0 | 1 | 0 | fd1满足条件,fd2不满足 |
| 第二次调用 | 0 | 0 | 1 | fd2满足条件,fd1不满足 |
相关信息
可以认为值仍为1的位置上的文件描述符发生了变化
应该是满足监视条件的的文件描述符对应的fd_set中的bit位值为1.
示例
从标准输入读取数据
实现:
int main(int argc, char* argv[])
{
fd_set set, temp_set;
FD_ZERO(&set);
FD_SET(0,&set);
while(1) {
temp_set = set;
timeval timeout;
timeout.tv_sec = 5;
timeout.tv_usec = 0;
int result = select(1, &temp_set, NULL, NULL, &timeout);
if(result == -1) {
error_handling("select() error!");
}
else if(result == 0) {
printf("Time-out occurred! No data after 5 seconds.\n");
}
else {
printf("time sec:%d, usec:%d\n", (int)timeout.tv_sec, (int)timeout.tv_usec);
if(FD_ISSET(0, &temp_set)) {
char buf[BUF_SIZE];
fgets(buf, BUF_SIZE, stdin);
printf("Data received: %s", buf);
}
}
}
return 0;
}| 行号 | 功能 | 说明 |
|---|---|---|
| 3-5 | 设置标准输入为要监视的文件 | |
| 8 | 重置要监视的文件描述符集合 | 因为select()会修改监视的文件描述符集合,将状态没有变化的文件描述符位置位0,所以每次select()前均需重置为初始集合 |
| 8-11 | 设置超时时间 | 每次select()前均需重新设置,因为调用select()后timeval 的成员值被替换为超时前剩余时间。如果不修改,那么超时时间会越来越短,导致最后select基本不会等待 |
| 19-26 | 读取stdin数据并打印 |
效果:
ming@ubuntu:/media/sf_share/Network/build$ ./Network
abc
time sec:3, usec:814954
Data received: abc
123
time sec:3, usec:588714
Data received: 123
Time-out occurred! No data after 5 seconds.实现 I/O 复用服务器端
参考
int main(int argc, char *argv[])
{
int serv_sock, clnt_sock;
struct sockaddr_in serv_adr, clnt_adr;
struct timeval timeout;
fd_set reads, cpy_reads;
socklen_t adr_sz;
int fd_max, str_len, fd_num, i;
char buf[BUF_SIZE];
if(argc!=2) {
printf("Usage : %s <port>\n", argv[0]);
exit(1);
}
serv_sock=socket(PF_INET, SOCK_STREAM, 0);
memset(&serv_adr, 0, sizeof(serv_adr));
serv_adr.sin_family=AF_INET;
serv_adr.sin_addr.s_addr=htonl(INADDR_ANY);
serv_adr.sin_port=htons(atoi(argv[1]));
if(bind(serv_sock, (struct sockaddr*) &serv_adr, sizeof(serv_adr))==-1)
error_handling("bind() error");
if(listen(serv_sock, 5)==-1)
error_handling("listen() error");
FD_ZERO(&reads);
FD_SET(serv_sock, &reads);
fd_max=serv_sock;
while(1)
{
cpy_reads=reads;
timeout.tv_sec=5;
timeout.tv_usec=5000;
if((fd_num=select(fd_max+1, &cpy_reads, 0, 0, &timeout))==-1)
break;
if(fd_num==0)
continue;
for(i=0; i<fd_max+1; i++)
{
if(FD_ISSET(i, &cpy_reads))
{
if(i==serv_sock) // connection request!
{
adr_sz=sizeof(clnt_adr);
clnt_sock=
accept(serv_sock, (struct sockaddr*)&clnt_adr, &adr_sz);
FD_SET(clnt_sock, &reads);
if(fd_max<clnt_sock)
fd_max=clnt_sock;
printf("connected client: %d \n", clnt_sock);
}
else // read message!
{
str_len=read(i, buf, BUF_SIZE);
if(str_len==0) // close request!
{
FD_CLR(i, &reads);
close(i);
printf("closed client: %d \n", i);
}
else
{
write(i, buf, str_len); // echo!
}
}
}
}
}
close(serv_sock);
return 0;
}| 行号 | 功能 | 说明 |
|---|---|---|
| 27-29 | 监听socket可以读表示有连接请求 | |
| 40、41 | 如果是因为超时导致select返回,则继续等待连接 | |
| 47-56 | 建立连接,添加fd,更新fd_max | |
| 57-70 | 已连接socket处理 |
我的
每次接收客户端连接时将其加入到监视的fd_set 中,在连接断开时将其从fd_set 中清除。
实现:
int main(int argc, char* argv[])
{
if(argc!=3){
printf("Usage : %s <IP> <port>\n", argv[0]);
exit(1);
}
int sock;
struct sockaddr_in serv_addr;
char message[BUF_SIZE];
int str_len;
sock = socket(PF_INET, SOCK_STREAM, 0);
if(sock==-1)
error_handling("socket() error");
int option = true;
socklen_t optlen = sizeof(option);
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void*)&option, optlen);
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family=AF_INET;
serv_addr.sin_addr.s_addr=inet_addr(argv[1]);
serv_addr.sin_port=htons(atoi(argv[2]));
if(bind(sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr))==-1)
error_handling("bind() error");
if(listen(sock, 5)==-1)
error_handling("listen() error");
int flags = fcntl(sock, F_GETFL, 0);
fcntl(sock, F_SETFL, flags | O_NONBLOCK);
fd_set set, temp_set;
FD_ZERO(&set);
int fd_max = -1;
while(1)
{
int clnt_sock;
struct sockaddr_in clnt_addr;
socklen_t clnt_addr_size = sizeof(clnt_addr);
clnt_sock = accept(sock, (struct sockaddr*)&clnt_addr, &clnt_addr_size);
if(clnt_sock == -1) {
if(errno != EWOULDBLOCK && errno != EAGAIN) {
error_handling("accept() error");
}
}
else {
FD_SET(clnt_sock, &set);
if(clnt_sock > fd_max) {
fd_max = clnt_sock;
}
printf("New client connected addr:%s, port:%d, fd_max updated to: %d\n", inet_ntoa(clnt_addr.sin_addr), ntohs(clnt_addr.sin_port), fd_max);
}
temp_set = set;
timeval timeout;
timeout.tv_sec = 5;
timeout.tv_usec = 0;
int result = select(fd_max + 1, &temp_set, NULL, NULL, &timeout);
printf("result of select(): %d\n", result);
if(result < 0) {
error_handling("select() error");
} else if(result == 0) {
printf("Timeout occurred! No data within 5 seconds.\n");
continue;
} else {
for(int i=0; i <= fd_max; i++)
{
if(FD_ISSET(i, &temp_set))
{
int readlen = read(i, message, BUF_SIZE);
if(readlen < 0) {
error_handling("read() error");
}
else if(readlen == 0) {
close(i);
FD_CLR(i, &set);
} else {
message[readlen] = '\0';
printf("Received from client %d: %s\n", i, message);
}
}
}
}
}
return 0;
}| 行号 | 功能 | 说明 |
|---|---|---|
| 8-29 | 创建并设置服务端socket | 设置其选项为 SO_REUSEADDR 便于测试 |
| 31、32 | 设置socket 为非阻塞类型 | 设置flag为 O_NONBLOCK 后accept就不会阻塞等待有连后才返回 |
| 45、46 | 判断accept出错类型,对于非阻塞的accept,在没有连接时返回-1,errno 被设置为 EWOULDBLOCK 或 EAGAIN,这里需要排除掉这两种 | |
| 50 | 将新的连接条件到监视集合中 | |
| 51-53 | 更新最大的监视fd值 | 首先正常的fd值是自然数,大于fd_max的初始。其次如果新的连接的fd 大于已连接的fd的最大值,则更新fd_max |
| 57 | 初始临时的set | |
| 70 | 遍历set查看发生变化的fd | 问:这里能够利用select()的返回值吗?难道必须遍历整个fd_set直到fd_max? |
| 79、80 | 如果read 返回0,表示断开连接。关闭socket,并从fd_set中清除fd(不再监视其状态) | |
| 82、83 | 读取数据并打印 |
问:新建的连接socket其fd 一定大于之前连接socket的fd?
效果:
ming@ubuntu:/media/sf_share/Network/build$ ./Network 192.168.56.101 10086
result of select(): 0
Timeout occurred! No data within 5 seconds.
New client connected addr:192.168.56.1, port:7640, fd_max updated to: 4
result of select(): 1
Received from client 4: 123
New client connected addr:192.168.56.1, port:7641, fd_max updated to: 5
result of select(): 1
Received from client 5: 789
result of select(): 1
Client 4 disconnected.
result of select(): 1
Received from client 5: abc
result of select(): 1
Client 5 disconnected.
result of select(): 0
Timeout occurred! No data within 5 seconds.| 行号 | 说明 |
|---|---|
| 5 | 连接到客户端0,对应的socket fd为4 |
| 7 | 接收到客户端0的消息:123 |
| 8 | 连接到客户端1,对应的socket fd为5 |
| 10 | 受到客户端1的消息:789 |
| 12 | 客户端0断开连接 |
| 16 | 客户端1断开连接 |
对比参考:
- 监听socket 本身可以被监视,其可读表示有新的连接请求
- 不同类型的fd 可以放到一个fd_set 中进行处理,但是在处理时要进行区分