admin管理员组

文章数量:1037775

【Linux网络】多路转接:select、poll、epoll

1、select

在Linux中,常见的多路转接/复用有 selectpollepoll

多路转接的核心作用就是:对多个文件描述符进行等待(手段),通知上层哪些文件描述符已经就绪,本质是一种对IO事件就绪的通知机制。

select 函数原型:

代码语言:javascript代码运行次数:0运行复制
#include <sys/select.h>

int select(int nfds, fd_set *readfds, fd_set *writefds,
		   fd_set *exceptfds, struct timeval *timeout);

参数:

  • nfds 是需要监视的最大文件描述符值+1
  • readfds、writefds、exceptfds分别对应需要检测的可读、可写、异常文件描述符的集合(这三个参数是输入输出型参数,每次调用select,都要对输入参数重新设置。
  • 参数 timeout 为结构 timeval, 用来设置 select()的等待时间

timeval 结构:

代码语言:javascript代码运行次数:0运行复制
struct timeval {
    time_t tv_sec;    // 秒
    suseconds_t tv_usec; // 微秒
};

timeval 结构用于描述一段时间长度,如果在这个时间内,需要监视的描述符没有事件发生则函数返回, 返回值为 0。

参数 timeout 取值:

  • NULL: 表示 select() 没有 timeoutselect 将一直被阻塞,直到某个文件描述符上发生了事件
  • {0,0} (非阻塞等待): 仅检测描述符集合的状态, 然后立即返回, 并不等待外部事件的发生
  • 特定的时间值: 如果在指定的时间段里没有事件发生, select 将超时返回,否则返回还剩余多少时间

函数返回值:

  • n > 0:就绪了多少个fd
  • n == 0:底层没有fd就绪,超时了,没有返回
  • n < 0:select等待失败了

关于select

  • fd_set 就是一个 struct+数组 的位图
  • fd_set 是一种具体的数据类型,大小是固定的,所以 fd_set 能够包含的fd的个数是有上限的,即 select 能管理的fd个数是有上限的
  • fd_set 中的每一个比特位对应一个文件描述符 fd,则 1 字节长的 fd_set 最大可以对应 8 个文件描述符

操作 fd_set 的接口:

代码语言:javascript代码运行次数:0运行复制
void FD_CLR(int fd, fd_set *set); // 清除描述词组set中相关fd的位
int FD_ISSET(int fd, fd_set *set); // 测试描述词组set中相关fd的位是否为真
void FD_SET(int fd, fd_set *set); // 设置描述词组set中相关fd的位
void FD_ZERO(fd_set *set); // 清除描述词组set的全部位

select 核心代码:

代码语言:javascript代码运行次数:0运行复制
fd_set rfds;
FD_SET(fd, &rfds);
select(fd + 1, &rfds, nullptr, nullptr, nullptr);
if(FD_ISSET(fd, rfds))
{...}

select 执行过程:

  1. 执行 fd_set rfdsFD_ZERO(&rfdst),则rfds用位表示是 0000 0000
  2. 若 fd= 5,执行 FD_SET(fd, &rfds),后rfds变为 0001 0000
  3. 若再加入 fd= 2, fd=1,则rfds变为 0001 0011
  4. 执行 select(6, &rfds, 0, 0, 0)阻塞等待
  5. 若 fd=1,fd=2 上都发生可读事件, 则 select 返回, 此时rfds变为0000 0011,而没有事件发生的 fd=5 被清空

select 的特点

  • select 可管理的fd有上限,可监控的文件描述符个数取决于 sizeof(fd_set) 的值。
  • 将 fd 加入 select 监控集的同时,还要再使用一个数据结构 array (辅助数组)保存到 select 监控集中的 fd:
    1. 一是用于在 select 返回后,array 作为源数据和 fd_set 进行 FD_ISSET 判断
    2. 二是 select 返回后会把以前加入的但并无事件发生的 fd 清空, 则每次开始select 前都要重新从 array 取得 fd 逐一加入(FD_ZERO 最先), 扫描 array 的同时 取得 fd 最大值 maxfd, 用于 select 的第一个参数。

select 的缺点:

  • 每次调用 select 都需要手动设置 fd 集合,不方便
  • 每次调用 select 都需要在内核遍历传递进来的所有 fd,把 fd 集合从用户态拷贝到内核态, fd很多时开销很大
  • select 支持的文件描述符数量太小

select 使用示例:

代码语言:javascript代码运行次数:0运行复制
#pragma once

#include <iostream>
#include <string>
#include <memory>
#include "Log.hpp"
#include "Socket.hpp"

#define NUM sizeof(fd_set) * 8
using namespace SocketModule;
using namespace LogModule;

const int gdefaultfd = -1;

class SelectServer
{
public:
    SelectServer(uint16_t port)
        : _port(port), _listen_socket(std::make_unique<TcpSocket>()), _isrunning(false)
    {
    }

    void Init()
    {
        _listen_socket->BuildTcpSocketMethod(_port);
        // 初始化辅助数组
        for (int i = 0; i < NUM; i++)
        {
            _fd_array[i] = gdefaultfd;
        }

        // 将listensockfd添加到数组中
        _fd_array[0] = _listen_socket->Fd();
    }

    void Loop()
    {
        fd_set rfds; // 读文件描述符集
        _isrunning = true;
        while (_isrunning)
        {
            // 1.清空rfds
            FD_ZERO(&rfds);
            int maxfd = gdefaultfd;

            // // 2.将listensockfd添加到rfds中
            // FD_SET(_listen_socket->Fd(), &rfds);
            for (int i = 0; i < NUM; i++)
            {
                if (_fd_array[i] == gdefaultfd)
                    continue;
                // 将合法的fd设置进入集合中
                FD_SET(_fd_array[i], &rfds);
                if (_fd_array[i] > maxfd)
                {
                    maxfd = _fd_array[i];
                }
            }

            // InetAddr client;
            // // accept 是阻塞式的
            // // 从listensockfd获取新链接,本质也是一种IO
            // int newsockfd = _listen_socket->Accepter(&client);

            struct timeval timeout = {10, 0};
            // 我们不能让accept来阻塞检测链接到来,而应该让select负责进行就绪事件的检测
            int n = select(maxfd + 1, &rfds, nullptr, nullptr, &timeout);
            switch (n)
            {
            case 0:
                std::cout << "time out..." << std::endl;
                break;
            case -1:
                perror("select");
                break;
            default:
                std::cout << "有事件就绪了..., timeout: " << timeout.tv_sec << std::endl;
                Dispatcher(rfds); // 把已经就绪的fd,派发给指定的模块
                break;
            }
        }
        _isrunning = false;
    }

    void Accepter()
    {
        InetAddr client;
        // 不会阻塞了,也就是不用等了,只用拷贝就行
        int newfd = _listen_socket->Accepter(&client);
        if (newfd < 0)
            return;
        else
        {
            std::cout << "获得了一个新链接: " << newfd << ", client info: " << client.Addr() << std::endl;
            // 把新的newfd添加到辅助数组中
            int pos = -1;
            for (int i = 0; i < NUM; i++)
            {
                if (_fd_array[i] == gdefaultfd)
                {
                    pos = i;
                    break;
                }
            }
            if (pos == -1)
            {
                LOG(LogLevel::ERROR) << "服务器已经满了...";
                close(newfd);
            }
            else
            {
                _fd_array[pos] = newfd;
            }
        }
    }

    void Recver(int who)
    {
        char buffer[1024];
        ssize_t n = recv(_fd_array[who], buffer, sizeof(buffer) - 1, 0);
        if (n > 0)
        {
            buffer[n] = 0;
            std::cout << "client# " << buffer << std::endl;

            std::string message = "echo# ";
            message += buffer;
            send(_fd_array[who], message.c_str(), message.size(), 0);
        }
        else if (n == 0)
        {
            LOG(LogLevel::DEBUG) << "客户端退出..., sockfd: " << _fd_array[who];
            close(_fd_array[who]);
            _fd_array[who] = gdefaultfd;
        }
        else
        {
            LOG(LogLevel::DEBUG) << "客户端读取错误..., sockfd: " << _fd_array[who];
            close(_fd_array[who]);
            _fd_array[who] = gdefaultfd;
        }
    }

    void Dispatcher(fd_set &rfds)
    {
        for (int i = 0; i < NUM; i++)
        {
            if (_fd_array[i] == gdefaultfd)
                continue;
            if (_fd_array[i] == _listen_socket->Fd())
            {
                // 是listensockfd,并且已经添加的集合中了
                if (FD_ISSET(_fd_array[i], &rfds))
                {
                    Accepter();
                }
            }
            else
            {
                // 就绪的普通fd
                if (FD_ISSET(_fd_array[i], &rfds))
                {
                    Recver(i);
                }
            }
        }
    }
    ~SelectServer()
    {
    }

private:
    uint16_t _port;
    std::unique_ptr<Socket> _listen_socket;
    bool _isrunning;
    int _fd_array[NUM]; // 辅助数组
};

select 是一种通用、简便的I/O多路复用机制,适用于小规模连接和处理简单的并发场景。


2、poll

select 一样,poll的定位也是对多个 fd IO事件的等待机制,达到事件派发的目的。

  • 调用poll(fd + events):用户告诉内核,你要帮我关心哪个fd上的哪些事件
  • poll返回(fd + revents):内核告诉用户,你要关心的哪个fd上的哪些事件已经就绪了

参数说明:

  • fds 是一个 poll 函数监听的结构列表,每一个元素中,包含了三部分内容:文件描 述符,监听的事件集合,返回的事件集合
  • nfds 表示 fds 数组的长度(也就是说poll支持的文件描述符理论上没有上限)
  • timeout 表示 poll 函数的超时时间,单位是毫秒。>0:timeout 时间内阻塞式,==0非阻塞,<0阻塞式。

返回值:

  • < 0:出错
  • == 0:等待超时
  • > 0:有n个 fd 就绪

poll 解决了 select 两个最主要的问题:1、支持的文件描述符有限,2、每次调用都 要重新设置参数。

poll 使用示例:

代码语言:javascript代码运行次数:0运行复制
#pragma once

#include <iostream>
#include <string>
#include <memory>
#include <poll.h>
#include "Log.hpp"
#include "Socket.hpp"

#define MAX 4096
using namespace SocketModule;
using namespace LogModule;

const int gdefaultfd = -1;

class PollServer
{
public:
    PollServer(uint16_t port)
        : _port(port), _listen_socket(std::make_unique<TcpSocket>()), _isrunning(false)
    {
    }

    void Init()
    {
        _listen_socket->BuildTcpSocketMethod(_port);
        // 初始化结构体数组
        for (int i = 0; i < MAX; i++)
        {
            _fds[i].fd = gdefaultfd;
            _fds[i].events = 0;
            _fds[i].revents = 0;
        }
        // 将listensockfd添加到数组中
        _fds[0].fd = _listen_socket->Fd();
        _fds[0].events |= POLLIN;
    }

    void Loop()
    {
        _isrunning = true;
        while (_isrunning)
        {
            int timeout = 1000;
            int n = poll(_fds, MAX, timeout);
            switch (n)
            {
            case 0:
                std::cout << "time out..." << std::endl;
                break;
            case -1:
                perror("select");
                break;
            default:
                std::cout << "有事件就绪了..." << std::endl;
                Dispatcher(); // 把已经就绪的fd,派发给指定的模块
                break;
            }
        }
        _isrunning = false;
    }

    void Accepter()
    {
        InetAddr client;
        // 不会阻塞了,也就是不用等了,只用拷贝就行
        int newfd = _listen_socket->Accepter(&client);
        if (newfd < 0)
            return;
        else
        {
            std::cout << "获得了一个新链接: " << newfd << ", client info: " << client.Addr() << std::endl;
            // 把新的newfd添加到辅助数组中
            int pos = -1;
            for (int i = 0; i < MAX; i++)
            {
                if (_fds[i].fd == gdefaultfd)
                {
                    pos = i;
                    break;
                }
            }
            if (pos == -1)
            {
                // 可以对数组扩容
                LOG(LogLevel::ERROR) << "服务器已经满了...";
                close(newfd);
            }
            else
            {
                _fds[pos].fd = newfd;
                _fds[pos].events |= POLLIN;
            }
        }
    }

    void Recver(int who)
    {
        char buffer[1024];
        ssize_t n = recv(_fds[who].fd, buffer, sizeof(buffer) - 1, 0);
        if (n > 0)
        {
            buffer[n] = 0;
            std::cout << "client# " << buffer << std::endl;

            std::string message = "echo# ";
            message += buffer;
            send(_fds[who].fd, message.c_str(), message.size(), 0);
        }
        else if (n == 0)
        {
            LOG(LogLevel::DEBUG) << "客户端退出..., sockfd: " << _fds[who].fd;
            close(_fds[who].fd);
            _fds[who].fd = gdefaultfd;
            _fds[who].events = _fds[who].revents = 0;
        }
        else
        {
            LOG(LogLevel::DEBUG) << "客户端读取错误..., sockfd: " << _fds[who].fd;
            close(_fds[who].fd);
            _fds[who].fd = gdefaultfd;
            _fds[who].events = _fds[who].revents = 0;
        }
    }

    void Dispatcher()
    {
        for (int i = 0; i < MAX; i++)
        {
            if (_fds[i].fd == gdefaultfd)
                continue;
            if (_fds[i].fd == _listen_socket->Fd())
            {
                // 是listensockfd,并且已经添加的集合中了
                if (_fds[i].revents & POLLIN)
                {
                    Accepter();
                }
            }
            else
            {
                // 就绪的普通fd
                if (_fds[i].revents & POLLIN)
                {
                    Recver(i);
                }
            }
        }
    }
    ~PollServer()
    {
    }

private:
    uint16_t _port;
    std::unique_ptr<Socket> _listen_socket;
    bool _isrunning;
    struct pollfd _fds[MAX];
};

| poll 的缺点: 虽然 poll 已经解决了 select 的两个主要问题,但 poll 中监听的文件描述符数目增多时也会和 select 函数一样,函数返回后需要轮询 pollfd 来获取就绪的描述符,每次调用 poll 都需要把大量的 pollfd 结构从用户态拷贝到内核中,这种情况下效率也不是很高。同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。


3、epoll

epoll 是 Linux 系统中一种高效的 I/O 事件通知机制,常用于处理大量文件描述符的 I/O 事件,特别适合高并发场景。它是 selectpoll 的改进版本,几乎解决了它们的所有问题。对于多连接,且多连接中只有一部分连接比较活跃时,比较适合使用 epoll

3.1 epoll_create
代码语言:javascript代码运行次数:0运行复制
int epoll_create(int size);
  • 在内核中创建一个 epoll 模型,并返回文件描述符用于后续的操作;
  • 自从 linux2.6.8 之后,size 参数是被忽略的,可以随意设置,但是必须大于0
  • 用完之后, 必须调用 close() 关闭。

epoll 模型:

  • 红黑树:存储需要监听的文件描述符及其对应的事件,红黑树的特点使得 epoll能够高效地管理大量的文件描述符;
  • 就绪队列:存储已经就绪的文件描述符及其事件,当某个文件描述符上的事件就绪时,内核会将其从红黑树中移动到就绪队列中,用户态的程序通过调用 epoll_wait 函数来等待这些就绪的文件描述符,并从就绪队列中获取;
  • 回调机制:内核与底层驱动程序之间建立的一种回调关系。当底层驱动程序检测到某个文件描述符上的事件就绪时,它会通过回调机制通知内核,内核将这个文件描述符移动到就绪队列中,以便用户态的程序可以获取到它。

3.2 epoll_ctl

epoll 的事件注册函数。

代码语言:javascript代码运行次数:0运行复制
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
  • epfdepoll_create() 的返回值
  • op:操作,用三个宏表示:
    • EPOLL_CTL_ADD:注册新的 fd 到 epfd 中
    • EPOLL_CTL_MOD:修改已经注册的 fd 的监听事件
    • EPOLL_CTL_DEL:从 epfd 中删除一个 fd
  • fd:需要监听的 fd
  • event:需要监听的事件

epoll_event 的结构:

代码语言:javascript代码运行次数:0运行复制
typedef union epoll_data {
    void        *ptr;
    int          fd;
    uint32_t     u32;
    uint64_t     u64;
} epoll_data_t;

struct epoll_event {
    uint32_t     events;      /* Epoll events */
    epoll_data_t data;        /* User data variable */
};

events 可以是以下几个宏的集合:

说明

EPOLLIN

文件描述符可以读 (包括对端 SOCKET 正常关闭)

EPOLLOUT

文件描述符可以写

EPOLLPRI

文件描述符有紧急的数据可读 (这里应该表示有带外数据到来)

EPOLLERR

文件描述符发生错误

EPOLLHUP

文件描述符被挂断

EPOLLET

将 epoll 设为边缘触发(Edge Triggered)模式

EPOLLONESHOT

只监听一次事件,当监听完这次事件之后如果还需要继续监听这个 socket 的话,需要再次把这个 socket 加入到 epoll 队列里


3.3 epoll_wait
代码语言:javascript代码运行次数:0运行复制
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, 
			   int timeout);
  • epoll 将会把发生的事件赋值到 events 数组中 (events 不可以是空指针,内核 只负责把数据复制到这个 events 数组中,不会去帮助我们在用户态中分配内存)
  • maxevents 告之内核这个 events 有多大, maxevents 的值不能大于创建epoll_create()时的 size
  • 参数 timeout 是超时时间 (毫秒,0 是非阻塞,< 0 是永久阻塞)
  • 如果函数调用成功,返回对应 I/O 上已准备好的文件描述符数目,如返回 0 表 示已超时,返回小于 0 表示函数失败

3.4 epoll 工作原理
  • 当进程调用 epoll_create 方法时,Linux 内核会创建一个 eventpoll 结构体,这个结构体中有两个成员与 epoll 的使用方式密切相关。
代码语言:javascript代码运行次数:0运行复制
struct eventpoll{
	....
	/*红黑树的根节点,这颗树中存储着所有添加到 epoll 中的需要监控的事件*/
	struct rb_root rbr;
	
	/*双链表中则存放着将要通过 epoll_wait 返回给用户的满足条件的事件*/
	struct list_head rdlist;
	....
};
  • 每一个 epoll 对象都有一个独立的 eventpoll 结构体,用于存放通过 epoll_ctl 方法向 epoll 对象中添加进来的事件;
  • 这些事件都会挂载到红黑树中,能够在O(log n)时间复杂度内完成数据的增删改查操作;
  • 所有添加到 epoll 中的事件都会与设备(网卡)驱动程序建立回调关系,当响应的事件发生时会调用回调方法 ep_poll_callback,它会将发生的事件添加到 rdlist 双链表中;
  • epoll 中,对于每一个事件,都会建立一个 epitem 结构体。
代码语言:javascript代码运行次数:0运行复制
struct epitem{
	struct rb_node rbn;//红黑树节点
	struct list_head rdllink;//双向链表节点
	struct epoll_filefd ffd; //事件句柄信息
	struct eventpoll *ep; //指向其所属的 eventpoll 对象
	struct epoll_event event; //期待发生的事件类型
}
  • 当调用 epoll_wait 检查是否有事件发生时,只需要检查 eventpoll 对象中的 rdlist 双链表中是否有 epitem 即可;
  • 如果 rdlist 不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户,这个操作的时间复杂度是 O(1)。

3.5 epoll 优点(重点)

对比 selectpollepoll 几乎没有缺点:

  • 接口使用方便:虽然拆分成了三个函数,但是使用起来反而更方便高效,不需要每次循环都设置关注的文件描述符,也做到了输入输出参数分离;
  • 事件回调机制:避免使用遍历,而是使用回调函数的方式将就绪的文件描述符结构加入到就绪队列中,直接访问就绪队列就知道哪些文件描述符就绪,时间复杂度 O(1);
  • 性能更高:epoll将文件描述符集合维护在内核态,避免了用户态和内核态之间来回的数据拷贝,同时使用红黑树管理文件描述符集合,提高了数据增删改查的效率;
  • 扩展性更好:epoll能够处理大量的文件描述符,适用于需要同时监视大量I/O操作的应用场景;
  • 使用更灵活:epoll支持水平触发和边缘触发两种模式,用户可以根据需要选择合适的事件触发方式。

3.6 epoll 工作模式
3.6.1 LT
  • epoll 默认的状态就是 LT (Level Triggered)(水平触发)工作模式;
  • 在LT模式下,当一个文件描述符上的I/O事件就绪时,epoll 会立即通知应用程序,但是可以不立即进行处理,或者只处理一部分;
  • 只要文件描述符处于就绪状态(就绪节点一直在 rdlist 中)(例如缓冲区中有数据可读或可写),epoll 就会持续通知应用程序,直到应用程序处理完所有就绪事件并且再次进入阻塞等待状态;
  • 支持阻塞读写和非阻塞读写,适用于对性能要求不高、数据读取量不大或连接数较少的场景。

3.6.2 ET
  • events 设置 EPOLLET ,就可以让 epoll 进入 ET (Edge Triggered)(边缘触发)工作模式;
  • 在ET模式下,只有当文件描述符的状态发生变化时(例如从不可写变为可写,或有新的数据来了),epoll 才会通知应用程序,并且 ET 要求必须立刻处理
  • 通知仅发送一次(该节点在 rdlist 中立马移除),只有一次处理机会,如果应用程序没有及时处理完这个事件,下次等待时将会错过该事件,即使事件仍然处于就绪状态;
  • ET 比 LT 性能更高( epoll_wait 返回次数少了很多),并且 ET 只支持非阻塞的读写ET 模式下要求我们每次都要把数据取完,要保证取完就要循环读取,要循环读取就必须是非阻塞的);
  • 适用于需要高效处理大量并发连接和数据传输的场景,如高性能服务器、实时通信系统等。

ET 为什么比 LT 更高效? ET 不做重复通知,一旦通知,数据必须取完(提高IO带宽),这就倒逼着我们在 ET 模式下不得不把数据取完,另外也会间接地给对方返回一个更大的接收窗口,提高对方滑动窗口的大小(Tcp),提高IO吞吐量。

epoll 使用示例:

代码语言:javascript代码运行次数:0运行复制
#pragma once

#include <iostream>
#include <string>
#include <memory>
#include <sys/epoll.h>
#include "Log.hpp"
#include "Socket.hpp"

#define MAX 4096
using namespace SocketModule;
using namespace LogModule;

const int gdefaultfd = -1;

class EpollServer
{
    static const int revs_num = 64;
public:
    EpollServer(uint16_t port)
        : _port(port), _listen_socket(std::make_unique<TcpSocket>()), _isrunning(false), _epfd(gdefaultfd)
    {
    }

    void Init()
    {
        _listen_socket->BuildTcpSocketMethod(_port);
        // 1.创建epoll模型
        _epfd = epoll_create(256);
        if (_epfd < 0)
        {
            LOG(LogLevel::ERROR) << "epoll create error";
            Die(EPOLL_CREATE_ERROR);
        }
        LOG(LogLevel::DEBUG) << "epoll create success: " << _epfd;

        // 2.将listensockfd添加到epoll模型中
        struct epoll_event ev;
        ev.events = EPOLLIN;
        ev.data.fd = _listen_socket->Fd();
        int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, _listen_socket->Fd(), &ev);
        if (n < 0)
        {
            LOG(LogLevel::ERROR) << "epoll_ctl error";
            Die(EPOLL_CTL_ERROR);
        }
    }

    void Loop()
    {
        _isrunning = true;
        while (_isrunning)
        {
            int timeout = -1;
            int n = epoll_wait(_epfd, _revs, revs_num, timeout);
            switch (n)
            {
            case 0:
                std::cout << "time out..." << std::endl;
                break;
            case -1:
                perror("epoll_wait");
                break;
            default:
                std::cout << "有事件就绪了..." << std::endl;
                Dispatcher(n); // 把已经就绪的fd,派发给指定的模块
                break;
            }
        }
        _isrunning = false;
    }

    void Accepter()
    {
        InetAddr client;
        // 不会阻塞了,也就是不用等了,只用拷贝就行
        int newfd = _listen_socket->Accepter(&client);
        if (newfd < 0)
            return;
        else
        {
            std::cout << "获得了一个新链接: " << newfd << ", client info: " << client.Addr() << std::endl;
            struct epoll_event ev;
            ev.data.fd = newfd;
            ev.events = EPOLLIN;
            int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, newfd, &ev);
            if (n < 0)
            {
                LOG(LogLevel::ERROR) << "epoll_ctl error";
                close(newfd);
            }
        }
    }

    void Recver(int fd)
    {
        char buffer[1024];
        ssize_t n = recv(fd, buffer, sizeof(buffer) - 1, 0);
        if (n > 0)
        {
            buffer[n] = 0;
            std::cout << "client# " << buffer << std::endl;

            std::string message = "echo# ";
            message += buffer;
            send(fd, message.c_str(), message.size(), 0);
        }
        else if (n == 0)
        {
            LOG(LogLevel::DEBUG) << "客户端退出..., sockfd: " << fd;
            // close(fd);
            // 把fd从epoll中移除,必须保证fd是合法的
            // 应该先把fd从epoll中移除,在关闭fd
            int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
            if (n < 0)
            {
                LOG(LogLevel::ERROR) << "epoll_ctl error";
                Die(EPOLL_CTL_ERROR);
            }
            LOG(LogLevel::DEBUG) << "epoll_ctl success: " << fd;
            close(fd);
        }
        else
        {
            LOG(LogLevel::DEBUG) << "客户端读取错误..., sockfd: " << fd;
            // close(fd);
            // 把fd从epoll中移除
            int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
            if (n < 0)
            {
                LOG(LogLevel::ERROR) << "epoll_ctl error";
                Die(EPOLL_CTL_ERROR);
            }
            LOG(LogLevel::DEBUG) << "epoll_ctl success: " << fd;
            close(fd);
        }
    }

    void Dispatcher(int rnum)
    {
        for (int i = 0; i < rnum; i++)
        {
            if (_revs[i].data.fd == _listen_socket->Fd())
            {
                if (_revs[i].events == EPOLLIN)
                {
                    Accepter();
                }
            }
            else
            {
                if (_revs[i].events == EPOLLIN)
                {
                    Recver(_revs[i].data.fd);
                }
            }
        }
    }
    ~EpollServer()
    {
        close(_listen_socket->Fd());
        if (_epfd) close(_epfd);
    }

private:
    uint16_t _port;
    std::unique_ptr<Socket> _listen_socket;
    bool _isrunning;
    int _epfd;
    struct epoll_event _revs[revs_num];
};

本篇文章的分享就到这里了,如果您觉得在本文有所收获,还请留下您的三连支持哦~

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。 原始发表:2025-03-10,如有侵权请联系 cloudcommunity@tencent 删除网络linuxepollselect事件

【Linux网络】多路转接:select、poll、epoll

1、select

在Linux中,常见的多路转接/复用有 selectpollepoll

多路转接的核心作用就是:对多个文件描述符进行等待(手段),通知上层哪些文件描述符已经就绪,本质是一种对IO事件就绪的通知机制。

select 函数原型:

代码语言:javascript代码运行次数:0运行复制
#include <sys/select.h>

int select(int nfds, fd_set *readfds, fd_set *writefds,
		   fd_set *exceptfds, struct timeval *timeout);

参数:

  • nfds 是需要监视的最大文件描述符值+1
  • readfds、writefds、exceptfds分别对应需要检测的可读、可写、异常文件描述符的集合(这三个参数是输入输出型参数,每次调用select,都要对输入参数重新设置。
  • 参数 timeout 为结构 timeval, 用来设置 select()的等待时间

timeval 结构:

代码语言:javascript代码运行次数:0运行复制
struct timeval {
    time_t tv_sec;    // 秒
    suseconds_t tv_usec; // 微秒
};

timeval 结构用于描述一段时间长度,如果在这个时间内,需要监视的描述符没有事件发生则函数返回, 返回值为 0。

参数 timeout 取值:

  • NULL: 表示 select() 没有 timeoutselect 将一直被阻塞,直到某个文件描述符上发生了事件
  • {0,0} (非阻塞等待): 仅检测描述符集合的状态, 然后立即返回, 并不等待外部事件的发生
  • 特定的时间值: 如果在指定的时间段里没有事件发生, select 将超时返回,否则返回还剩余多少时间

函数返回值:

  • n > 0:就绪了多少个fd
  • n == 0:底层没有fd就绪,超时了,没有返回
  • n < 0:select等待失败了

关于select

  • fd_set 就是一个 struct+数组 的位图
  • fd_set 是一种具体的数据类型,大小是固定的,所以 fd_set 能够包含的fd的个数是有上限的,即 select 能管理的fd个数是有上限的
  • fd_set 中的每一个比特位对应一个文件描述符 fd,则 1 字节长的 fd_set 最大可以对应 8 个文件描述符

操作 fd_set 的接口:

代码语言:javascript代码运行次数:0运行复制
void FD_CLR(int fd, fd_set *set); // 清除描述词组set中相关fd的位
int FD_ISSET(int fd, fd_set *set); // 测试描述词组set中相关fd的位是否为真
void FD_SET(int fd, fd_set *set); // 设置描述词组set中相关fd的位
void FD_ZERO(fd_set *set); // 清除描述词组set的全部位

select 核心代码:

代码语言:javascript代码运行次数:0运行复制
fd_set rfds;
FD_SET(fd, &rfds);
select(fd + 1, &rfds, nullptr, nullptr, nullptr);
if(FD_ISSET(fd, rfds))
{...}

select 执行过程:

  1. 执行 fd_set rfdsFD_ZERO(&rfdst),则rfds用位表示是 0000 0000
  2. 若 fd= 5,执行 FD_SET(fd, &rfds),后rfds变为 0001 0000
  3. 若再加入 fd= 2, fd=1,则rfds变为 0001 0011
  4. 执行 select(6, &rfds, 0, 0, 0)阻塞等待
  5. 若 fd=1,fd=2 上都发生可读事件, 则 select 返回, 此时rfds变为0000 0011,而没有事件发生的 fd=5 被清空

select 的特点

  • select 可管理的fd有上限,可监控的文件描述符个数取决于 sizeof(fd_set) 的值。
  • 将 fd 加入 select 监控集的同时,还要再使用一个数据结构 array (辅助数组)保存到 select 监控集中的 fd:
    1. 一是用于在 select 返回后,array 作为源数据和 fd_set 进行 FD_ISSET 判断
    2. 二是 select 返回后会把以前加入的但并无事件发生的 fd 清空, 则每次开始select 前都要重新从 array 取得 fd 逐一加入(FD_ZERO 最先), 扫描 array 的同时 取得 fd 最大值 maxfd, 用于 select 的第一个参数。

select 的缺点:

  • 每次调用 select 都需要手动设置 fd 集合,不方便
  • 每次调用 select 都需要在内核遍历传递进来的所有 fd,把 fd 集合从用户态拷贝到内核态, fd很多时开销很大
  • select 支持的文件描述符数量太小

select 使用示例:

代码语言:javascript代码运行次数:0运行复制
#pragma once

#include <iostream>
#include <string>
#include <memory>
#include "Log.hpp"
#include "Socket.hpp"

#define NUM sizeof(fd_set) * 8
using namespace SocketModule;
using namespace LogModule;

const int gdefaultfd = -1;

class SelectServer
{
public:
    SelectServer(uint16_t port)
        : _port(port), _listen_socket(std::make_unique<TcpSocket>()), _isrunning(false)
    {
    }

    void Init()
    {
        _listen_socket->BuildTcpSocketMethod(_port);
        // 初始化辅助数组
        for (int i = 0; i < NUM; i++)
        {
            _fd_array[i] = gdefaultfd;
        }

        // 将listensockfd添加到数组中
        _fd_array[0] = _listen_socket->Fd();
    }

    void Loop()
    {
        fd_set rfds; // 读文件描述符集
        _isrunning = true;
        while (_isrunning)
        {
            // 1.清空rfds
            FD_ZERO(&rfds);
            int maxfd = gdefaultfd;

            // // 2.将listensockfd添加到rfds中
            // FD_SET(_listen_socket->Fd(), &rfds);
            for (int i = 0; i < NUM; i++)
            {
                if (_fd_array[i] == gdefaultfd)
                    continue;
                // 将合法的fd设置进入集合中
                FD_SET(_fd_array[i], &rfds);
                if (_fd_array[i] > maxfd)
                {
                    maxfd = _fd_array[i];
                }
            }

            // InetAddr client;
            // // accept 是阻塞式的
            // // 从listensockfd获取新链接,本质也是一种IO
            // int newsockfd = _listen_socket->Accepter(&client);

            struct timeval timeout = {10, 0};
            // 我们不能让accept来阻塞检测链接到来,而应该让select负责进行就绪事件的检测
            int n = select(maxfd + 1, &rfds, nullptr, nullptr, &timeout);
            switch (n)
            {
            case 0:
                std::cout << "time out..." << std::endl;
                break;
            case -1:
                perror("select");
                break;
            default:
                std::cout << "有事件就绪了..., timeout: " << timeout.tv_sec << std::endl;
                Dispatcher(rfds); // 把已经就绪的fd,派发给指定的模块
                break;
            }
        }
        _isrunning = false;
    }

    void Accepter()
    {
        InetAddr client;
        // 不会阻塞了,也就是不用等了,只用拷贝就行
        int newfd = _listen_socket->Accepter(&client);
        if (newfd < 0)
            return;
        else
        {
            std::cout << "获得了一个新链接: " << newfd << ", client info: " << client.Addr() << std::endl;
            // 把新的newfd添加到辅助数组中
            int pos = -1;
            for (int i = 0; i < NUM; i++)
            {
                if (_fd_array[i] == gdefaultfd)
                {
                    pos = i;
                    break;
                }
            }
            if (pos == -1)
            {
                LOG(LogLevel::ERROR) << "服务器已经满了...";
                close(newfd);
            }
            else
            {
                _fd_array[pos] = newfd;
            }
        }
    }

    void Recver(int who)
    {
        char buffer[1024];
        ssize_t n = recv(_fd_array[who], buffer, sizeof(buffer) - 1, 0);
        if (n > 0)
        {
            buffer[n] = 0;
            std::cout << "client# " << buffer << std::endl;

            std::string message = "echo# ";
            message += buffer;
            send(_fd_array[who], message.c_str(), message.size(), 0);
        }
        else if (n == 0)
        {
            LOG(LogLevel::DEBUG) << "客户端退出..., sockfd: " << _fd_array[who];
            close(_fd_array[who]);
            _fd_array[who] = gdefaultfd;
        }
        else
        {
            LOG(LogLevel::DEBUG) << "客户端读取错误..., sockfd: " << _fd_array[who];
            close(_fd_array[who]);
            _fd_array[who] = gdefaultfd;
        }
    }

    void Dispatcher(fd_set &rfds)
    {
        for (int i = 0; i < NUM; i++)
        {
            if (_fd_array[i] == gdefaultfd)
                continue;
            if (_fd_array[i] == _listen_socket->Fd())
            {
                // 是listensockfd,并且已经添加的集合中了
                if (FD_ISSET(_fd_array[i], &rfds))
                {
                    Accepter();
                }
            }
            else
            {
                // 就绪的普通fd
                if (FD_ISSET(_fd_array[i], &rfds))
                {
                    Recver(i);
                }
            }
        }
    }
    ~SelectServer()
    {
    }

private:
    uint16_t _port;
    std::unique_ptr<Socket> _listen_socket;
    bool _isrunning;
    int _fd_array[NUM]; // 辅助数组
};

select 是一种通用、简便的I/O多路复用机制,适用于小规模连接和处理简单的并发场景。


2、poll

select 一样,poll的定位也是对多个 fd IO事件的等待机制,达到事件派发的目的。

  • 调用poll(fd + events):用户告诉内核,你要帮我关心哪个fd上的哪些事件
  • poll返回(fd + revents):内核告诉用户,你要关心的哪个fd上的哪些事件已经就绪了

参数说明:

  • fds 是一个 poll 函数监听的结构列表,每一个元素中,包含了三部分内容:文件描 述符,监听的事件集合,返回的事件集合
  • nfds 表示 fds 数组的长度(也就是说poll支持的文件描述符理论上没有上限)
  • timeout 表示 poll 函数的超时时间,单位是毫秒。>0:timeout 时间内阻塞式,==0非阻塞,<0阻塞式。

返回值:

  • < 0:出错
  • == 0:等待超时
  • > 0:有n个 fd 就绪

poll 解决了 select 两个最主要的问题:1、支持的文件描述符有限,2、每次调用都 要重新设置参数。

poll 使用示例:

代码语言:javascript代码运行次数:0运行复制
#pragma once

#include <iostream>
#include <string>
#include <memory>
#include <poll.h>
#include "Log.hpp"
#include "Socket.hpp"

#define MAX 4096
using namespace SocketModule;
using namespace LogModule;

const int gdefaultfd = -1;

class PollServer
{
public:
    PollServer(uint16_t port)
        : _port(port), _listen_socket(std::make_unique<TcpSocket>()), _isrunning(false)
    {
    }

    void Init()
    {
        _listen_socket->BuildTcpSocketMethod(_port);
        // 初始化结构体数组
        for (int i = 0; i < MAX; i++)
        {
            _fds[i].fd = gdefaultfd;
            _fds[i].events = 0;
            _fds[i].revents = 0;
        }
        // 将listensockfd添加到数组中
        _fds[0].fd = _listen_socket->Fd();
        _fds[0].events |= POLLIN;
    }

    void Loop()
    {
        _isrunning = true;
        while (_isrunning)
        {
            int timeout = 1000;
            int n = poll(_fds, MAX, timeout);
            switch (n)
            {
            case 0:
                std::cout << "time out..." << std::endl;
                break;
            case -1:
                perror("select");
                break;
            default:
                std::cout << "有事件就绪了..." << std::endl;
                Dispatcher(); // 把已经就绪的fd,派发给指定的模块
                break;
            }
        }
        _isrunning = false;
    }

    void Accepter()
    {
        InetAddr client;
        // 不会阻塞了,也就是不用等了,只用拷贝就行
        int newfd = _listen_socket->Accepter(&client);
        if (newfd < 0)
            return;
        else
        {
            std::cout << "获得了一个新链接: " << newfd << ", client info: " << client.Addr() << std::endl;
            // 把新的newfd添加到辅助数组中
            int pos = -1;
            for (int i = 0; i < MAX; i++)
            {
                if (_fds[i].fd == gdefaultfd)
                {
                    pos = i;
                    break;
                }
            }
            if (pos == -1)
            {
                // 可以对数组扩容
                LOG(LogLevel::ERROR) << "服务器已经满了...";
                close(newfd);
            }
            else
            {
                _fds[pos].fd = newfd;
                _fds[pos].events |= POLLIN;
            }
        }
    }

    void Recver(int who)
    {
        char buffer[1024];
        ssize_t n = recv(_fds[who].fd, buffer, sizeof(buffer) - 1, 0);
        if (n > 0)
        {
            buffer[n] = 0;
            std::cout << "client# " << buffer << std::endl;

            std::string message = "echo# ";
            message += buffer;
            send(_fds[who].fd, message.c_str(), message.size(), 0);
        }
        else if (n == 0)
        {
            LOG(LogLevel::DEBUG) << "客户端退出..., sockfd: " << _fds[who].fd;
            close(_fds[who].fd);
            _fds[who].fd = gdefaultfd;
            _fds[who].events = _fds[who].revents = 0;
        }
        else
        {
            LOG(LogLevel::DEBUG) << "客户端读取错误..., sockfd: " << _fds[who].fd;
            close(_fds[who].fd);
            _fds[who].fd = gdefaultfd;
            _fds[who].events = _fds[who].revents = 0;
        }
    }

    void Dispatcher()
    {
        for (int i = 0; i < MAX; i++)
        {
            if (_fds[i].fd == gdefaultfd)
                continue;
            if (_fds[i].fd == _listen_socket->Fd())
            {
                // 是listensockfd,并且已经添加的集合中了
                if (_fds[i].revents & POLLIN)
                {
                    Accepter();
                }
            }
            else
            {
                // 就绪的普通fd
                if (_fds[i].revents & POLLIN)
                {
                    Recver(i);
                }
            }
        }
    }
    ~PollServer()
    {
    }

private:
    uint16_t _port;
    std::unique_ptr<Socket> _listen_socket;
    bool _isrunning;
    struct pollfd _fds[MAX];
};

| poll 的缺点: 虽然 poll 已经解决了 select 的两个主要问题,但 poll 中监听的文件描述符数目增多时也会和 select 函数一样,函数返回后需要轮询 pollfd 来获取就绪的描述符,每次调用 poll 都需要把大量的 pollfd 结构从用户态拷贝到内核中,这种情况下效率也不是很高。同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。


3、epoll

epoll 是 Linux 系统中一种高效的 I/O 事件通知机制,常用于处理大量文件描述符的 I/O 事件,特别适合高并发场景。它是 selectpoll 的改进版本,几乎解决了它们的所有问题。对于多连接,且多连接中只有一部分连接比较活跃时,比较适合使用 epoll

3.1 epoll_create
代码语言:javascript代码运行次数:0运行复制
int epoll_create(int size);
  • 在内核中创建一个 epoll 模型,并返回文件描述符用于后续的操作;
  • 自从 linux2.6.8 之后,size 参数是被忽略的,可以随意设置,但是必须大于0
  • 用完之后, 必须调用 close() 关闭。

epoll 模型:

  • 红黑树:存储需要监听的文件描述符及其对应的事件,红黑树的特点使得 epoll能够高效地管理大量的文件描述符;
  • 就绪队列:存储已经就绪的文件描述符及其事件,当某个文件描述符上的事件就绪时,内核会将其从红黑树中移动到就绪队列中,用户态的程序通过调用 epoll_wait 函数来等待这些就绪的文件描述符,并从就绪队列中获取;
  • 回调机制:内核与底层驱动程序之间建立的一种回调关系。当底层驱动程序检测到某个文件描述符上的事件就绪时,它会通过回调机制通知内核,内核将这个文件描述符移动到就绪队列中,以便用户态的程序可以获取到它。

3.2 epoll_ctl

epoll 的事件注册函数。

代码语言:javascript代码运行次数:0运行复制
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
  • epfdepoll_create() 的返回值
  • op:操作,用三个宏表示:
    • EPOLL_CTL_ADD:注册新的 fd 到 epfd 中
    • EPOLL_CTL_MOD:修改已经注册的 fd 的监听事件
    • EPOLL_CTL_DEL:从 epfd 中删除一个 fd
  • fd:需要监听的 fd
  • event:需要监听的事件

epoll_event 的结构:

代码语言:javascript代码运行次数:0运行复制
typedef union epoll_data {
    void        *ptr;
    int          fd;
    uint32_t     u32;
    uint64_t     u64;
} epoll_data_t;

struct epoll_event {
    uint32_t     events;      /* Epoll events */
    epoll_data_t data;        /* User data variable */
};

events 可以是以下几个宏的集合:

说明

EPOLLIN

文件描述符可以读 (包括对端 SOCKET 正常关闭)

EPOLLOUT

文件描述符可以写

EPOLLPRI

文件描述符有紧急的数据可读 (这里应该表示有带外数据到来)

EPOLLERR

文件描述符发生错误

EPOLLHUP

文件描述符被挂断

EPOLLET

将 epoll 设为边缘触发(Edge Triggered)模式

EPOLLONESHOT

只监听一次事件,当监听完这次事件之后如果还需要继续监听这个 socket 的话,需要再次把这个 socket 加入到 epoll 队列里


3.3 epoll_wait
代码语言:javascript代码运行次数:0运行复制
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, 
			   int timeout);
  • epoll 将会把发生的事件赋值到 events 数组中 (events 不可以是空指针,内核 只负责把数据复制到这个 events 数组中,不会去帮助我们在用户态中分配内存)
  • maxevents 告之内核这个 events 有多大, maxevents 的值不能大于创建epoll_create()时的 size
  • 参数 timeout 是超时时间 (毫秒,0 是非阻塞,< 0 是永久阻塞)
  • 如果函数调用成功,返回对应 I/O 上已准备好的文件描述符数目,如返回 0 表 示已超时,返回小于 0 表示函数失败

3.4 epoll 工作原理
  • 当进程调用 epoll_create 方法时,Linux 内核会创建一个 eventpoll 结构体,这个结构体中有两个成员与 epoll 的使用方式密切相关。
代码语言:javascript代码运行次数:0运行复制
struct eventpoll{
	....
	/*红黑树的根节点,这颗树中存储着所有添加到 epoll 中的需要监控的事件*/
	struct rb_root rbr;
	
	/*双链表中则存放着将要通过 epoll_wait 返回给用户的满足条件的事件*/
	struct list_head rdlist;
	....
};
  • 每一个 epoll 对象都有一个独立的 eventpoll 结构体,用于存放通过 epoll_ctl 方法向 epoll 对象中添加进来的事件;
  • 这些事件都会挂载到红黑树中,能够在O(log n)时间复杂度内完成数据的增删改查操作;
  • 所有添加到 epoll 中的事件都会与设备(网卡)驱动程序建立回调关系,当响应的事件发生时会调用回调方法 ep_poll_callback,它会将发生的事件添加到 rdlist 双链表中;
  • epoll 中,对于每一个事件,都会建立一个 epitem 结构体。
代码语言:javascript代码运行次数:0运行复制
struct epitem{
	struct rb_node rbn;//红黑树节点
	struct list_head rdllink;//双向链表节点
	struct epoll_filefd ffd; //事件句柄信息
	struct eventpoll *ep; //指向其所属的 eventpoll 对象
	struct epoll_event event; //期待发生的事件类型
}
  • 当调用 epoll_wait 检查是否有事件发生时,只需要检查 eventpoll 对象中的 rdlist 双链表中是否有 epitem 即可;
  • 如果 rdlist 不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户,这个操作的时间复杂度是 O(1)。

3.5 epoll 优点(重点)

对比 selectpollepoll 几乎没有缺点:

  • 接口使用方便:虽然拆分成了三个函数,但是使用起来反而更方便高效,不需要每次循环都设置关注的文件描述符,也做到了输入输出参数分离;
  • 事件回调机制:避免使用遍历,而是使用回调函数的方式将就绪的文件描述符结构加入到就绪队列中,直接访问就绪队列就知道哪些文件描述符就绪,时间复杂度 O(1);
  • 性能更高:epoll将文件描述符集合维护在内核态,避免了用户态和内核态之间来回的数据拷贝,同时使用红黑树管理文件描述符集合,提高了数据增删改查的效率;
  • 扩展性更好:epoll能够处理大量的文件描述符,适用于需要同时监视大量I/O操作的应用场景;
  • 使用更灵活:epoll支持水平触发和边缘触发两种模式,用户可以根据需要选择合适的事件触发方式。

3.6 epoll 工作模式
3.6.1 LT
  • epoll 默认的状态就是 LT (Level Triggered)(水平触发)工作模式;
  • 在LT模式下,当一个文件描述符上的I/O事件就绪时,epoll 会立即通知应用程序,但是可以不立即进行处理,或者只处理一部分;
  • 只要文件描述符处于就绪状态(就绪节点一直在 rdlist 中)(例如缓冲区中有数据可读或可写),epoll 就会持续通知应用程序,直到应用程序处理完所有就绪事件并且再次进入阻塞等待状态;
  • 支持阻塞读写和非阻塞读写,适用于对性能要求不高、数据读取量不大或连接数较少的场景。

3.6.2 ET
  • events 设置 EPOLLET ,就可以让 epoll 进入 ET (Edge Triggered)(边缘触发)工作模式;
  • 在ET模式下,只有当文件描述符的状态发生变化时(例如从不可写变为可写,或有新的数据来了),epoll 才会通知应用程序,并且 ET 要求必须立刻处理
  • 通知仅发送一次(该节点在 rdlist 中立马移除),只有一次处理机会,如果应用程序没有及时处理完这个事件,下次等待时将会错过该事件,即使事件仍然处于就绪状态;
  • ET 比 LT 性能更高( epoll_wait 返回次数少了很多),并且 ET 只支持非阻塞的读写ET 模式下要求我们每次都要把数据取完,要保证取完就要循环读取,要循环读取就必须是非阻塞的);
  • 适用于需要高效处理大量并发连接和数据传输的场景,如高性能服务器、实时通信系统等。

ET 为什么比 LT 更高效? ET 不做重复通知,一旦通知,数据必须取完(提高IO带宽),这就倒逼着我们在 ET 模式下不得不把数据取完,另外也会间接地给对方返回一个更大的接收窗口,提高对方滑动窗口的大小(Tcp),提高IO吞吐量。

epoll 使用示例:

代码语言:javascript代码运行次数:0运行复制
#pragma once

#include <iostream>
#include <string>
#include <memory>
#include <sys/epoll.h>
#include "Log.hpp"
#include "Socket.hpp"

#define MAX 4096
using namespace SocketModule;
using namespace LogModule;

const int gdefaultfd = -1;

class EpollServer
{
    static const int revs_num = 64;
public:
    EpollServer(uint16_t port)
        : _port(port), _listen_socket(std::make_unique<TcpSocket>()), _isrunning(false), _epfd(gdefaultfd)
    {
    }

    void Init()
    {
        _listen_socket->BuildTcpSocketMethod(_port);
        // 1.创建epoll模型
        _epfd = epoll_create(256);
        if (_epfd < 0)
        {
            LOG(LogLevel::ERROR) << "epoll create error";
            Die(EPOLL_CREATE_ERROR);
        }
        LOG(LogLevel::DEBUG) << "epoll create success: " << _epfd;

        // 2.将listensockfd添加到epoll模型中
        struct epoll_event ev;
        ev.events = EPOLLIN;
        ev.data.fd = _listen_socket->Fd();
        int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, _listen_socket->Fd(), &ev);
        if (n < 0)
        {
            LOG(LogLevel::ERROR) << "epoll_ctl error";
            Die(EPOLL_CTL_ERROR);
        }
    }

    void Loop()
    {
        _isrunning = true;
        while (_isrunning)
        {
            int timeout = -1;
            int n = epoll_wait(_epfd, _revs, revs_num, timeout);
            switch (n)
            {
            case 0:
                std::cout << "time out..." << std::endl;
                break;
            case -1:
                perror("epoll_wait");
                break;
            default:
                std::cout << "有事件就绪了..." << std::endl;
                Dispatcher(n); // 把已经就绪的fd,派发给指定的模块
                break;
            }
        }
        _isrunning = false;
    }

    void Accepter()
    {
        InetAddr client;
        // 不会阻塞了,也就是不用等了,只用拷贝就行
        int newfd = _listen_socket->Accepter(&client);
        if (newfd < 0)
            return;
        else
        {
            std::cout << "获得了一个新链接: " << newfd << ", client info: " << client.Addr() << std::endl;
            struct epoll_event ev;
            ev.data.fd = newfd;
            ev.events = EPOLLIN;
            int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, newfd, &ev);
            if (n < 0)
            {
                LOG(LogLevel::ERROR) << "epoll_ctl error";
                close(newfd);
            }
        }
    }

    void Recver(int fd)
    {
        char buffer[1024];
        ssize_t n = recv(fd, buffer, sizeof(buffer) - 1, 0);
        if (n > 0)
        {
            buffer[n] = 0;
            std::cout << "client# " << buffer << std::endl;

            std::string message = "echo# ";
            message += buffer;
            send(fd, message.c_str(), message.size(), 0);
        }
        else if (n == 0)
        {
            LOG(LogLevel::DEBUG) << "客户端退出..., sockfd: " << fd;
            // close(fd);
            // 把fd从epoll中移除,必须保证fd是合法的
            // 应该先把fd从epoll中移除,在关闭fd
            int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
            if (n < 0)
            {
                LOG(LogLevel::ERROR) << "epoll_ctl error";
                Die(EPOLL_CTL_ERROR);
            }
            LOG(LogLevel::DEBUG) << "epoll_ctl success: " << fd;
            close(fd);
        }
        else
        {
            LOG(LogLevel::DEBUG) << "客户端读取错误..., sockfd: " << fd;
            // close(fd);
            // 把fd从epoll中移除
            int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
            if (n < 0)
            {
                LOG(LogLevel::ERROR) << "epoll_ctl error";
                Die(EPOLL_CTL_ERROR);
            }
            LOG(LogLevel::DEBUG) << "epoll_ctl success: " << fd;
            close(fd);
        }
    }

    void Dispatcher(int rnum)
    {
        for (int i = 0; i < rnum; i++)
        {
            if (_revs[i].data.fd == _listen_socket->Fd())
            {
                if (_revs[i].events == EPOLLIN)
                {
                    Accepter();
                }
            }
            else
            {
                if (_revs[i].events == EPOLLIN)
                {
                    Recver(_revs[i].data.fd);
                }
            }
        }
    }
    ~EpollServer()
    {
        close(_listen_socket->Fd());
        if (_epfd) close(_epfd);
    }

private:
    uint16_t _port;
    std::unique_ptr<Socket> _listen_socket;
    bool _isrunning;
    int _epfd;
    struct epoll_event _revs[revs_num];
};

本篇文章的分享就到这里了,如果您觉得在本文有所收获,还请留下您的三连支持哦~

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。 原始发表:2025-03-10,如有侵权请联系 cloudcommunity@tencent 删除网络linuxepollselect事件

本文标签: Linux网络多路转接selectpollepoll