C++中IO多路复用(select、poll、epoll)的实现
什么是IO多路复用
I/O多路复用(IO multiplexing)是一种并发处理多个I/O操作的机制。它允许一个进程或线程同时监听多个文件描述符(如套接字、管道、标准输入等)的I/O事件,并在有事件发生时进行处理。
传统的I/O模型中,通常使用阻塞I/O和非阻塞I/O来处理单个I/O操作。如果需要同时处理多个I/O操作,那么需要使用多个线程或多个进程来管理和执行这些I/O操作。这种方式会导致系统资源的浪费,且编程复杂度较高。
而I/O多路复用通过提供一个统一的接口,如select
、poll
、epoll
等,来同时监听多个文件描述符的I/O事件。它们会在任意一个文件描述符上有I/O事件发生时立即返回,并告知应用程序哪些文件描述符有事件发生。应用程序可以根据返回的结果来针对有事件发生的文件描述符进行读取、写入或其他操作。
I/O多路复用的优点包括:
- 单个进程或线程可以同时处理多个I/O操作,提高了系统的并发性。
- 避免了大量的进程或线程切换,节约了系统资源
- 使用较少的线程或进程,简化了编程模型和维护工作。
IO多路复用的方式简介
主要的 I/O 多路复用方式有以下几种:
select
:select
是最早的一种 I/O 多路复用方式,可以同时监听多个文件描述符的可读、可写和异常事件。通过在调用select
时传递关注的文件描述符集合,及时返回有事件发生的文件描述符,然后应用程序可以对这些文件描述符进行读写操作。poll
:poll
是select
的一种改进版,也能够同时监听多个文件描述符的可读、可写和异常事件。通过调用poll
时传递关注的文件描述符数组,返回有事件发生的文件描述符,应用程序执行对应的读写操作。epoll
:epoll
是 Linux 特有的一种 I/O 多路复用机制,相较于select
和poll
具有更高的性能,适用于高并发环境。epoll
使用了回调机制来通知应用程序文件描述符上的事件发生,并且支持水平触发(LT,level triggered)和边缘触发(ET,edge triggered)两种模式。
select方式
select
是一种 I/O 多路复用的机制,用于同时监听多个文件描述符的可读、可写和异常事件。它是最早的一种实现,适用于多平台。select几乎在所有的操作系统上都可用,并且拥有相似的接口和语义。这使得应用程序在多个平台上能够以相似的方式使用 select
。
select运行原理
select
函数在阻塞过程中,主要依赖于一个名为 fd_set
的数据结构来表示文件描述符集合。通过向 select
函数传递待检测的 fd_set
集合,可以指定需要检测哪些文件描述符。fd_set
结构一般是通过使用宏函数以及相关操作进行初始化和处理。
fd_set
结构可以用于传递三种不同类型的文件描述符集合,包括读缓冲区、写缓冲区和异常状态。通过将文件描述符放入相应的集合中,程序员可以选择性地检查特定类型的事件或操作。通过使用传出变量,程序员可以获取与就绪状态对应的文件描述符集合,并相应地处理与就绪内容相关的操作。
下面两张图展示了select函数在运行时的逻辑(读缓冲区为例)
select函数使用方法
select函数原型如下:
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
nfds
:需要监视的最大文件描述符加1,即待监视的文件描述符的最大值加1。readfds
:可读性检查的文件描述符集合。writefds
:可写性检查的文件描述符集合。exceptfds
:异常条件的文件描述符集合。timeout
:最长等待时间,也可以设置为 NULL,表示一直阻塞直到有事件发生。
函数返回值如下:
- 大于 0:返回值为有事件发生的文件描述符的总数。
- 0:表示超时,没有事件发生。
- -1:出错,可以通过查看全局变量
errno
来获取错误码。
一些值得注意的小细节:
nfds
的值必须是所有待监视文件描述符中最大的值加1。- 在某些平台上,
select
的文件描述符集大小有可能有限制。 - 调用
select
会阻塞等待,直到有事件发生,这会导致效率问题。 - 在多个线程中使用
select
可能需要使用互斥锁来保护传递的文件描述符集。
操作fd_set的API:
void FD_CLR(int fd, fd_set *set); int FD_ISSET(int fd, fd_set *set); void FD_SET(int fd, fd_set *set); void FD_ZERO(fd_set *set);
1. FD_ZERO(fd_set *set):清空指定的文件描述符集合 set,将其所有位都置为0。
2. FD_SET(int fd, fd_set *set):将指定的文件描述符 fd 添加到文件描述符集合 set 中,相应的位将被置为1。
3. FD_CLR(int fd, fd_set *set):将指定的文件描述符 fd 从文件描述符集合 set 中移除,相应的位将被清零(置为0)。
4. FD_ISSET(int fd, fd_set *set):检查指定的文件描述符 fd 是否在文件描述符集合 set 中,如果存在,则返回非零值(true);否则,返回零值(false)。
实例
下面是一个利用select实现的客户端与服务器端相互传输的简单示例:
服务器端:
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <arpa/inet.h> #include <iostream> using namespace std; int main() // 基于多路复用select函数实现的并行服务器 { // 1 创建监听的fd int lfd = socket(AF_INET, SOCK_STREAM, 0); // 2 绑定 struct sockaddr_in addr; // struct sockaddr_in是用于表示IPv4地址的结构体,它是基于struct sockaddr的扩展。 addr.sin_family = AF_INET; addr.sin_port = htons(9997); addr.sin_addr.s_addr = INADDR_ANY; bind(lfd, (struct sockaddr *)&addr, sizeof(addr)); // 3 设置监听 listen(lfd, 128); // 将监听的fd的状态交给内核检测 int maxfd = lfd; // 初始化检测的读集合 fd_set rdset; fd_set rdtemp; // 清零 FD_ZERO(&rdset); // 将监听的lfd设置到集合当中 FD_SET(lfd, &rdset); // 通过select委托内核检测读集合中的文件描述符状态, 检测read缓冲区有没有数据 // 如果有数据, select解除阻塞返回 while (1) { rdtemp = rdset; int num = select(maxfd + 1, &rdtemp, NULL, NULL, NULL); // 判断连接请求还在不在里面,如果在,则运行accept if (FD_ISSET(lfd, &rdtemp)) { struct sockaddr_in cliaddr; int cliaddrLen = sizeof(cliaddr); int cfd = accept(lfd, (struct sockaddr *)&cliaddr, (socklen_t *)&cliaddrLen); // 得到了有效的客户端文件描述符,将这个文件描述符放入读集合当中,并更新最大值 FD_SET(cfd, &rdset); maxfd = cfd > maxfd ? cfd : maxfd; } // 如果没有建立新的连接,那么就直接通信 for (int i = 0; i < maxfd + 1; i++) { if (i != lfd && FD_ISSET(i, &rdtemp)) { // 接收数据,一次接收10个字节,客户端每次发送100个字节,下一轮select检测的时候, 内核还会标记这个文件描述符缓冲区有数据 -> 再读一次 // 循环会一直持续, 知道缓冲区数据被读完位置 char buf[10] = {0}; int len = read(i, buf, sizeof(buf)); cout << "len=" <<len<< endl; if (len == 0) // 客户端关闭了连接,,因为如果正好读完,会在select过程中删除 { printf("客户端关闭了连接.....\n"); // 将该文件描述符从集合中删除 FD_CLR(i, &rdset); close(i); } else if (len > 0) // 收到了数据 { // 发送数据 if (len > 2) { write(i, buf, strlen(buf) + 1); cout << "写了一次" << endl; sleep(0.1); } } else { // 异常 perror("read"); FD_CLR(i, &rdset); } } } } return 0; }
客户端:
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <arpa/inet.h> #include <iostream> using namespace std; int main() //网络通信的客户端 { // 1 创建用于通信的套接字 int fd=socket(AF_INET,SOCK_STREAM,0); if(fd==-1) { perror("socket"); exit(0); } // 2 连接服务器 struct sockaddr_in addr; addr.sin_family=AF_INET; //ipv4 addr.sin_port=htons(9997);// 服务器监听的端口, 字节序应该是网络字节序 inet_pton(AF_INET,"127.0.0.1",&addr.sin_addr.s_addr); int ret=connect(fd,(struct sockaddr*)&addr,sizeof(addr)); if(ret==-1) { perror("connect"); exit(0); } //通信 while (1) { //读数据 char recvBuf[1024]; //写数据 fgets(recvBuf,sizeof(recvBuf),stdin); write(fd,recvBuf,strlen(recvBuf)+1); int oriLen=strlen(recvBuf)-1; cout<<"strlen(recvBuf)="<<oriLen<<endl; int total_get=0; while (read(fd,recvBuf,sizeof(recvBuf))) { total_get+=10; cout<<"total_get="<<total_get<<" strlen(recvBuf)="<<oriLen<<endl; printf("recv buf: %s\n", recvBuf); if (total_get>=oriLen) { cout<<"out"<<endl; break; } } sleep(1); } close(fd); return 0; }
注意的点
在服务器端中,调用select函数时,因为select函数会将检测的结果写回fd_set,所以如果不做其他操作的话,写回的数据会覆盖掉最初的fd_set,造成错误。所以我们在调用select函数之前可以将fd_set暂时先赋给一个临时变量,如下:
fd_set rdset; fd_set rdtemp; rdtemp = rdset; int num = select(maxfd + 1, &rdtemp, NULL, NULL, NULL);
代码整体工程、在以上内容中加入线程和线程池实现通信的版本可参考:GitHub - BanLi-Official/CppSelect
poll方式
poll方式运行原理
poll
函数是一种 I/O 多路复用机制,类似于 select
函数,但相比 select
更加高效和灵活。poll
通过轮询方式,在用户空间和内核空间之间进行交互。与 select
不同的是,poll
可以支持更大的文件描述符集合,且不会有文件描述符数量限制的问题。同时poll与select不同,select有跨平台的特点,而poll只能在Linux上使用。
poll函数使用方法
poll函数原型如下:
#include <poll.h> struct pollfd { int fd; /* File descriptor to poll. */ short int events; /* Types of events poller cares about. */ short int revents; /* Types of events that actually occurred. */ }; int poll(struct pollfd *fds, nfds_t nfds, int timeout);
fds
:一个指向struct pollfd
结构体数组的指针,用于指定待监视的文件描述符及其感兴趣的事件。每个struct pollfd
结构包含一个文件描述符fd
和一个短整型events
,用于指定关注的事件类型。revents
字段在poll
返回时被内核修改,用于指示发生的事件类型。nfds
:表示fds
数组的大小,即待监视的文件描述符数量。timeout
:指定阻塞等待的时间(以毫秒为单位)
poll
函数会阻塞,直到以下三种情况之一发生:
- 有一个或多个文件描述符准备好监听的事件。
- 指定的超时时间到达。
- 发生一个错误。
函数返回值如下:
poll
函数返回一个正整数表示就绪的文件描述符数量,或者返回以下几种特定的值:
- 返回大于 0 的整数:表示有文件描述符就绪的数量。可以通过遍历监视的文件描述符集合,检查
revents
字段来确定哪些文件描述符具体就绪。 - 返回 0:表示在无限等待模式下超时,即指定的超时时间到达,但没有文件描述符就绪。
- 返回 -1:表示发生错误,可以使用
errno
变量获取具体的错误代码。
值得注意的一些小细节:
poll
函数返回后,struct pollfd
结构中的 revents
字段会被修改,以指示每个文件描述符发生的事件类型。可以通过遍历 struct pollfd
数组,在 revents
字段中检查位来判断每个文件描述符的具体就绪事件。在处理 poll
的返回值时,通常的做法是使用 if
或 switch
语句根据每个文件描述符的 revents
值来执行相应的操作,例如读取数据、写入数据、处理异常等。
实例
下面是一个利用poll实现的客户端与服务器端相互传输的简单示例:
服务器端:
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <arpa/inet.h> #include <iostream> #include <poll.h> using namespace std; int main() // 基于多路复用select函数实现的并行服务器 { // 1 创建监听的fd int lfd = socket(AF_INET, SOCK_STREAM, 0); // 2 绑定 struct sockaddr_in addr; // struct sockaddr_in是用于表示IPv4地址的结构体,它是基于struct sockaddr的扩展。 addr.sin_family = AF_INET; addr.sin_port = htons(9995); addr.sin_addr.s_addr = INADDR_ANY; bind(lfd, (struct sockaddr *)&addr, sizeof(addr)); // 3 设置监听 listen(lfd, 128); // 将监听的fd的状态交给内核检测 int maxfd = lfd; //创建文件描述符的队列 struct pollfd myfd[100]; for(int i=0;i<100;i++) { myfd[i].fd=-1; myfd[i].events=POLLIN; } myfd[0].fd=lfd; while (1) { //sleep(5); cout<<"poll等待开始"<<endl; int num=poll(myfd,maxfd+1,-1); cout<<"poll等待结束~"<<endl; // 判断连接请求还在不在里面,如果在,则运行accept if(myfd[0].fd && myfd[0].revents==POLLIN) { struct sockaddr_in cliaddr; int cliaddrLen = sizeof(cliaddr); int cfd = accept(lfd, (struct sockaddr *)&cliaddr, (socklen_t *)&cliaddrLen); // 得到了有效的客户端文件描述符,将这个文件描述符放入读集合当中,并更新最大值 for(int i=0 ; i<1024 ;i++)//找到空的位置 { if(myfd[i].fd==-1 && myfd[i].events==POLLIN) { myfd[i].fd=cfd; cout<<"连接成功! fd放在了"<<i<<endl; break; } } maxfd = cfd > maxfd ? cfd : maxfd; } // 如果没有建立新的连接,那么就直接通信 for (int i = 0; i < maxfd + 1; i++) { if (myfd[i].fd && myfd[i].revents==POLLIN && i!=0) { // 接收数据,一次接收10个字节,客户端每次发送100个字节,下一轮select检测的时候, 内核还会标记这个文件描述符缓冲区有数据 -> 再读一次 // 循环会一直持续, 知道缓冲区数据被读完位置 char buf[10] = {0}; cout<<" 外读"<<endl; int len = read(myfd[i].fd, buf, sizeof(buf)); cout<<"len="<<len<<" i="<<i<<endl; if(len==0) //外部中斷導致的連接中斷 { printf("客户端关闭了连接.....\n"); // 将该文件描述符从集合中删除 myfd[i].fd=-1; break; } cout<<"Get read len="<<len<<endl; if (len == 0) // 客户端关闭了连接,,因为如果正好读完,会在select过程中删除 { printf("客户端关闭了连接.....\n"); // 将该文件描述符从集合中删除 myfd[i].fd=-1; break; } else if (len > 0) // 收到了数据 { // 发送数据 if(len<=2) { cout<<" out!!"<<endl; break; } write(myfd[i].fd, buf, strlen(buf) + 1); if(len<10) { cout<<" out!!"<<endl; break; } sleep(0.1); cout<<"写了一次 寫的內容是:"<<string(buf)<<"###"<<endl; } else { // 异常 perror("read"); myfd[i].fd=-1; break; } } } } return 0; }
客户端:
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <arpa/inet.h> #include <iostream> using namespace std; int main() //网络通信的客户端 { // 1 创建用于通信的套接字 int fd=socket(AF_INET,SOCK_STREAM,0); if(fd==-1) { perror("socket"); exit(0); } // 2 连接服务器 struct sockaddr_in addr; addr.sin_family=AF_INET; //ipv4 addr.sin_port=htons(9995);// 服务器监听的端口, 字节序应该是网络字节序 inet_pton(AF_INET,"127.0.0.1",&addr.sin_addr.s_addr); int ret=connect(fd,(struct sockaddr*)&addr,sizeof(addr)); if(ret==-1) { perror("connect"); exit(0); } //通信 while (1) { //读数据 char recvBuf[1024]; //写数据 fgets(recvBuf,sizeof(recvBuf),stdin); write(fd,recvBuf,strlen(recvBuf)+1); int oriLen=strlen(recvBuf)-1; cout<<"strlen(recvBuf)="<<oriLen<<endl; int total_get=0; while (total_get<oriLen) { //cout<<"開始讀"<<endl; read(fd,recvBuf,sizeof(recvBuf)); total_get+=10; //cout<<"total_get="<<total_get<<" strlen(recvBuf)="<<oriLen<<endl; printf("recv buf: %s\n", recvBuf); if (total_get>=oriLen) { cout<<"out"<<endl; break; } } sleep(1); } close(fd); return 0; }
整体工程与线程池版本可以参考:GitHub - BanLi-Official/CppPoll: C++ Network Programming: Linux Operating System Poll Example
epoll方式
epoll运行原理
epoll是Linux下的一种I/O 多路复用机制,可以高效地处理大量的并发连接。
epoll模型使用一个文件描述符(epoll fd)来管理多个其他文件描述符(event fd)。在epoll fd上注册了感兴趣的事件,当有感兴趣的事件发生时,epoll会通知应用程序。相比于传统的select和poll模型,epoll模型有以下几个优势:
高效:在大规模并发连接的场景下,epoll模型可以显著提高效率。使用一个文件描述符来管理多个连接,避免了遍历所有连接的开销。并且epoll使用了“事件通知”的方式,只有在有事件发生时才会通知应用程序,避免了无效轮询。
更快的响应速度:由于epoll是基于事件驱动的模型,在有事件发生时立即通知应用程序,可以更快地响应客户端的请求。
可扩展性好:epoll模型采用了无锁设计,将连接集合的管理交给内核处理,并利用回调函数机制处理连接的读写事件,减少了锁竞争,提高了系统的可扩展性。
epoll使用红黑树来存储和管理注册的事件。红黑树是一种自平衡的二叉搜索树,具有以下特点:
二叉搜索树的性质:红黑树是一棵二叉搜索树,即对于任意一个节点,其左子树的值都小于该节点的值,右子树的值都大于该节点的值。
自平衡性:红黑树通过对节点进行一系列旋转和重新着色操作来保持树的平衡。具体来说,红黑树通过五个性质来保持平衡:根节点是黑色的、叶子节点(NIL节点)是黑色的、红色节点的两个子节点都是黑色的、从任一节点到其叶子节点的所有路径都包含相同数目的黑色节点、新插入的节点是红色的。
红黑树介绍可以参考百度百科:红黑树_百度百科
在epoll模型中,当应用程序调用epoll_ctl函数注册事件时,epoll将会将文件描述符和其对应的事件信息存储到红黑树中,这样可以方便地查询和管理事件。红黑树的高效查询特性可以快速找到特定文件描述符对应的事件信息,并且可以保持事件信息的有序性。
当有事件发生时,epoll调用epoll_wait函数去查询红黑树上已注册的事件,如果有匹配的事件发生,就会通知应用程序进行处理。红黑树是epoll实现高效I/O多路复用的关键技术之一。通过使用红黑树,epoll可以将事件的查询、插入和删除等操作的时间复杂度降低到O(log n),使得在大规模并发连接的场景下也能够高效地处理事件。
epoll函数使用方法
在Linux下,epoll函数主要包括以下几个:
#include <sys/epoll.h> //头文件 int epoll_create(int size); //创建一个epoll实例 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); //控制epoll上的事件 int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); //阻塞等待事件发生
同时在这些参数中,有一个重要的数据结构epoll_event。epoll_event结构体用于描述事件,包括文件描述符、事件类型和事件数据。其中的定义如下:
typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t; struct epoll_event { uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ } __EPOLL_PACKED;
其中,events
是事件类型,包括以下几种:
EPOLLIN
:可读事件,表示连接上有数据可读。EPOLLOUT
:可写事件,表示连接上可以写入数据。EPOLLPRI
:紧急事件,表示连接上有紧急数据可读。EPOLLRDHUP
:连接关闭事件,表示连接已关闭。EPOLLERR
:错误事件,表示连接上发生错误。EPOLLHUP
:挂起事件,表示连接被挂起。
结构体中的epoll_data
是一个联合体,用于在epoll_event
结构体中传递事件数据。它有四个成员变量,可以根据具体的需求选择使用其中的一个。通常可以选择int类型的fd,用于存储发生对应事件的文件描述符
epoll_create函数:创建一个epoll fd,返回一个新的epoll文件描述符。参数size
用于指定监听的文件描述符个数,但是在Linux 2.6.8之后的版本,该参数已经没有实际意义。传入一个大于0的值即可。
int epfd=epoll_create(1);
epoll_ctl函数:用于控制epoll事件的函数之一。它用于向epoll实例中添加、修改或删除关注的文件描述符和对应事件。函数原型如下:
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
函数参数:
epfd
:epoll文件描述符,通过epoll_create
函数创建获得。op
:操作类型,可以是以下三种取值之一:EPOLL_CTL_ADD
:将文件描述符添加到epoll实例中。EPOLL_CTL_MOD
:修改已添加到epoll实例中的文件描述符的关注事件。EPOLL_CTL_DEL
:从epoll实例中删除文件描述符。
fd
:要控制的文件描述符。event
:指向epoll_event
结构体的指针,用于指定要添加、修改或删除的事件。
函数返回值:
- 成功时返回0,表示操作成功。
- 失败时返回-1,并设置errno错误码来指示具体错误原因。
epoll_wait函数:用于等待事件的发生。它会一直阻塞直到有事件发生或超时。函数原型如下:
#include <sys/epoll.h> int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
函数参数:
epfd
:epoll文件描述符,通过epoll_create
函数创建获得。events
:用于接收事件的epoll_event
结构体数组。maxevents
:events
数组的大小,表示最多可以接收多少个事件。timeout
:超时时间,单位为毫秒,表示epoll_wait函数阻塞的最长时间。常用的取值有以下三种:-1
:表示一直阻塞,直到有事件发生。0
:表示立即返回,不管有没有事件发生。> 0
:表示等待指定的时间(以毫秒为单位),如果在指定时间内没有事件发生,则返回。
函数返回值:
- 成功时返回接收到的事件的数量。如果超时时间为0并且没有事件发生,则返回0。
- 失败时返回-1,并设置errno错误码来指示具体错误原因。
一些要注意的点:
在epoll_wait函数中用于接收事件的epoll_event
结构体数组是一个传出参数,需要定义一个epoll_event的数组,比如:
struct epoll_event evens[100];//用于接取传出的内容 int len=sizeof(evens)/sizeof(struct epoll_event);
工作模式
epoll
的工作模式可以分为两种:边缘触发(Edge Triggered, ET)模式和水平触发(Level Triggered, LT)模式。一般epoll运行的模式默认是水平触发模式。
水平模式
有事件就一直不断通知(默认就是这个)
- 当被监控的文件描述符上的状态发生变化时,
epoll
会不断通知应用程序,直到应用程序处理完事件并返回。 - 如果应用程序没有处理完事件,而文件描述符上的状态再次发生变化,
epoll
会再次通知应用程序。 - 应用程序可以使用阻塞或非阻塞I/O来处理事件。
- 水平触发模式适合处理低并发的I/O场景。
实例
服务器端:
// server.c #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <arpa/inet.h> #include <sys/epoll.h> #include <iostream> using namespace std; int main() { // 1. 创建监听的套接字 int lfd = socket(AF_INET, SOCK_STREAM, 0); if(lfd == -1) { perror("socket"); exit(0); } // 2. 将socket()返回值和本地的IP端口绑定到一起 struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(9996); // 大端端口 // INADDR_ANY代表本机的所有IP, 假设有三个网卡就有三个IP地址 // 这个宏可以代表任意一个IP地址 // 这个宏一般用于本地的绑定操作 addr.sin_addr.s_addr = INADDR_ANY; // 这个宏的值为0 == 0.0.0.0 // inet_pton(AF_INET, "192.168.8.161", &addr.sin_addr.s_addr); int ret = bind(lfd, (struct sockaddr*)&addr, sizeof(addr)); if(ret == -1) { perror("bind"); exit(0); } // 3. 设置监听 ret = listen(lfd, 128); if(ret == -1) { perror("listen"); exit(0); } int epfd=epoll_create(1); struct epoll_event even; even.events=EPOLLIN; //用水平触发模式来检测 even.data.fd=lfd; ret=epoll_ctl(epfd,EPOLL_CTL_ADD,lfd,&even); struct epoll_event evens[100];//用于接取传出的内容 int len=sizeof(evens)/sizeof(struct epoll_event); while (1) { cout<<" 開始等待!!!"<<endl; int num=epoll_wait(epfd,evens,len,-1); cout<<" 等待結束!!!"<<" num="<<num<<endl; for(int i=0;i<num;i++)//取出所有的检测到的事件 { int curfd = evens[i].data.fd; if(evens[i].data.fd==lfd) { struct sockaddr_in *add; int len=sizeof(struct sockaddr_in); int cfd=accept(evens[i].data.fd,NULL,NULL); struct epoll_event even; even.events=EPOLLIN; even.data.fd=cfd; //将接收到的cfd放入epoll检测的红黑树当中 ret=epoll_ctl(epfd,EPOLL_CTL_ADD,cfd,&even); if(ret==-1) { cout<<"登錄失敗"<<endl; } else { cout<<"登陸成功,已加入紅黑樹"<<endl; } } else { // 接收数据 char buf[10]; memset(buf, 0, sizeof(buf)); cout<<"正在讀!!!!"<<endl; int len = read(evens[i].data.fd, buf, sizeof(buf)); if(len > 0) { // 发送数据 if(len<=2) { cout<<" out!!"<<endl; break; } printf("客户端say: %s\n", buf); write(evens[i].data.fd, buf, len); sleep(0.1); } else if(len == 0) { printf("客户端断开了连接...\n"); ret=epoll_ctl(epfd,EPOLL_CTL_DEL,evens[i].data.fd,NULL); close(curfd); //break; } else { perror("read"); ret=epoll_ctl(epfd,EPOLL_CTL_DEL,evens[i].data.fd,NULL); close(curfd); //break; } } } } return 0; }
客户端:
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <arpa/inet.h> #include <iostream> using namespace std; int main() //网络通信的客户端 { // 1 创建用于通信的套接字 int fd=socket(AF_INET,SOCK_STREAM,0); if(fd==-1) { perror("socket"); exit(0); } // 2 连接服务器 struct sockaddr_in addr; addr.sin_family=AF_INET; //ipv4 addr.sin_port=htons(9996);// 服务器监听的端口, 字节序应该是网络字节序 inet_pton(AF_INET,"127.0.0.1",&addr.sin_addr.s_addr); int ret=connect(fd,(struct sockaddr*)&addr,sizeof(addr)); if(ret==-1) { perror("connect"); exit(0); } //通信 while (1) { //读数据 char recvBuf[1024]; //写数据 fgets(recvBuf,sizeof(recvBuf),stdin); write(fd,recvBuf,strlen(recvBuf)+1); int oriLen=strlen(recvBuf)-1; cout<<"strlen(recvBuf)="<<oriLen<<endl; int total_get=0; while (total_get<oriLen) { //cout<<"開始讀"<<endl; char recvBuf2[1024]; read(fd,recvBuf2,sizeof(recvBuf2)); total_get+=10; cout<<"total_get="<<total_get<<" strlen(recvBuf)="<<oriLen<<endl; printf("recv buf: %s\n", recvBuf2); if (total_get>=oriLen) { cout<<"out"<<endl; break; } } sleep(1); } close(fd); return 0; }
边沿模式
有事件只通知一次,后续一次处理没解决玩的内容需要程序员自己解决
- 仅当被监控的文件描述符上的状态发生变化时,
epoll
才会通知应用程序。 - 当文件描述符上有数据可读或可写时,
epoll
会立即通知应用程序,并且保证应用程序能够全部读取或写入数据,直到读写缓冲区为空。 - 应用程序需要使用非阻塞I/O来处理事件,以避免阻塞其他文件描述符的事件通知。
- 边缘触发模式适合处理高并发的网络通信场景。
实例
服务器端:
// server.c #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <arpa/inet.h> #include <sys/epoll.h> #include <iostream> #include <fcntl.h> #include <errno.h> using namespace std; int main() { // 1. 创建监听的套接字 int lfd = socket(AF_INET, SOCK_STREAM, 0); if (lfd == -1) { perror("socket"); exit(0); } // 2. 将socket()返回值和本地的IP端口绑定到一起 struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(9996); // 大端端口 // INADDR_ANY代表本机的所有IP, 假设有三个网卡就有三个IP地址 // 这个宏可以代表任意一个IP地址 // 这个宏一般用于本地的绑定操作 addr.sin_addr.s_addr = INADDR_ANY; // 这个宏的值为0 == 0.0.0.0 // inet_pton(AF_INET, "192.168.8.161", &addr.sin_addr.s_addr); int ret = bind(lfd, (struct sockaddr *)&addr, sizeof(addr)); if (ret == -1) { perror("bind"); exit(0); } // 3. 设置监听 ret = listen(lfd, 128); if (ret == -1) { perror("listen"); exit(0); } int epfd = epoll_create(1); struct epoll_event even; even.events = EPOLLIN | EPOLLET; //使用边沿触发模式检测 even.data.fd = lfd; ret = epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &even); struct epoll_event evens[100]; // 用于接取传出的内容 int len = sizeof(evens) / sizeof(struct epoll_event); while (1) { cout << " 開始等待!!!" << endl; int num = epoll_wait(epfd, evens, len, -1); cout << " 等待結束!!!" << " num=" << num << endl; for (int i = 0; i < num; i++) // 取出所有的检测到的事件 { int curfd = evens[i].data.fd; if (evens[i].data.fd == lfd) { struct sockaddr_in *add; int len = sizeof(struct sockaddr_in); int cfd = accept(evens[i].data.fd, NULL, NULL); // 将这个文件标识符改为非阻塞模式 int flag = fcntl(cfd, F_GETFL); // 获取该文件描述符的状态标志 flag = O_NONBLOCK; // 设置为 O_NONBLOCK,即非阻塞模式。 fcntl(cfd, F_SETFL, flag); // 将新的状态标志设置为非阻塞模式。 struct epoll_event even; even.events = EPOLLIN | EPOLLET;//使用边沿触发模式检测 even.data.fd = cfd; // 将接收到的cfd放入epoll检测的红黑树当中 ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &even); if (ret == -1) { cout << "登錄失敗" << endl; } else { cout << "登陸成功,已加入紅黑樹" << endl; } } else { // 接收数据 char buf[10]; memset(buf, 0, sizeof(buf)); cout << "正在讀!!!!" << endl; while (1) // 应对Epoll的ET模式而用的循环read,read要将文件标识符改为非阻塞版本 { int len = read(evens[i].data.fd, buf, sizeof(buf)); if (len > 0) { // 发送数据 if (len <= 2) { cout << " out!!" << endl; break; } printf("客户端say: %s\n", buf); write(evens[i].data.fd, buf, len); sleep(0.1); } else if (len == 0) { printf("客户端断开了连接...\n"); ret = epoll_ctl(epfd, EPOLL_CTL_DEL, evens[i].data.fd, NULL); close(curfd); break; } else { perror("read"); //ret = epoll_ctl(epfd, EPOLL_CTL_DEL, evens[i].data.fd, NULL); //close(curfd); if (errno == EAGAIN) { cout << "接收完毕!" << endl; break; } // break; } } } } } return 0; }
客户端:
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <arpa/inet.h> #include <iostream> using namespace std; int main() //网络通信的客户端 { // 1 创建用于通信的套接字 int fd=socket(AF_INET,SOCK_STREAM,0); if(fd==-1) { perror("socket"); exit(0); } // 2 连接服务器 struct sockaddr_in addr; addr.sin_family=AF_INET; //ipv4 addr.sin_port=htons(9996);// 服务器监听的端口, 字节序应该是网络字节序 inet_pton(AF_INET,"127.0.0.1",&addr.sin_addr.s_addr); int ret=connect(fd,(struct sockaddr*)&addr,sizeof(addr)); if(ret==-1) { perror("connect"); exit(0); } //通信 while (1) { //读数据 char recvBuf[1024]; //写数据 fgets(recvBuf,sizeof(recvBuf),stdin); write(fd,recvBuf,strlen(recvBuf)+1); int oriLen=strlen(recvBuf)-1; cout<<"strlen(recvBuf)="<<oriLen<<endl; int total_get=0; while (total_get<oriLen) { //cout<<"開始讀"<<endl; char recvBuf2[1024]; read(fd,recvBuf2,sizeof(recvBuf2)); total_get+=10; cout<<"total_get="<<total_get<<" strlen(recvBuf)="<<oriLen<<endl; printf("recv buf: %s\n", recvBuf2); if (total_get>=oriLen) { cout<<"out"<<endl; break; } } sleep(1); } close(fd); return 0; }
边沿模式需要注意的点
由于边沿模式只通知一次事件发生,所以当我们服务器端接收来自客户端的较为长的内容时,可能会出现,一次无法完全接收的情况。而边沿模式又只通知一次,所以此时没读取完的内容可能无法及时读取。为了应对这个问题,我们可以采取循环接收的方法,如:
while (1) // 应对Epoll的ET模式而用的循环read, { int len = read(evens[i].data.fd, buf, sizeof(buf)); if (len > 0) { // 发送数据 } else if (len == 0) { printf("客户端断开了连接...\n"); break; } else { perror("read"); break; } }
应用程序在处理事件时需要使用非阻塞I/O,确保能够立即处理事件并避免阻塞其他事件的通知。需要注意将被监控的文件描述符设置为非阻塞状态,以确保事件的及时处理。可以使用fcntl
函数的O_NONBLOCK
标志来将文件描述符设置为非阻塞模式。(因为如果不设置为非阻塞模式的话,服务器端在循环读取客户端发来的内容时,如果读完了内容,应用程序就会阻塞在read函数部分)将其设置为非阻塞模式后,我们在读取完内容之后,就可以根据read返回的EAGAIN错误(接收缓冲区为空时会报)来跳出循环。设置方式如下:
int cfd = accept(evens[i].data.fd, NULL, NULL); // 将这个文件标识符改为非阻塞模式 int flag = fcntl(cfd, F_GETFL); // 获取该文件描述符的状态标志 flag = O_NONBLOCK; // 设置为 O_NONBLOCK,即非阻塞模式。 fcntl(cfd, F_SETFL, flag); // 将新的状态标志设置为非阻塞模式。 struct epoll_event even; even.events = EPOLLIN | EPOLLET; //使用边沿触发模式检测读缓冲区 even.data.fd = cfd; // 将接收到的cfd放入epoll检测的红黑树当中 ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &even);
退出时判断的EAGAIN错误,存在erron.h库中。errno(error number)是C语言标准库(C Standard Library)提供的一个全局变量,用于表示上一次发生的错误代码。errno库提供了一些宏定义和函数,用于获取和处理错误代码。需要注意的是,errno是全局变量,在多线程环境下需要注意线程安全。如:
while (1) // 应对Epoll的ET模式而用的循环read, { int len = read(evens[i].data.fd, buf, sizeof(buf)); if (len > 0) { // 发送数据 } else if (len == 0) { printf("客户端断开了连接...\n"); break; } else { perror("read"); if (errno == EAGAIN) //判断是否读取完毕 { cout << "接收完毕!" << endl; break; } } }
整体工程与线程池版本可以参考:https://github.com/BanLi-Official/CppEpoll
参考资料
感谢苏丙榅大佬的教程
(C++通讯架构学习笔记):epoll介绍及原理详解_c++ epoll-CSDN博客
C++网络编程select函数原理详解_c++ select-CSDN博客
到此这篇关于C++中IO多路复用(select、poll、epoll)的实现的文章就介绍到这了,更多相关C++ IO多路复用内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
最新评论