您现在的位置是:网站首页> 编程资料编程资料
Redis源码设计剖析之事件处理示例详解_Redis_
2023-05-27
481人已围观
简介 Redis源码设计剖析之事件处理示例详解_Redis_
1. Redis事件介绍
Redis服务器是一个事件驱动程序,所谓事件驱动就是输入一条命令并且按下回车,然后消息被组装成Redis协议的格式发送给Redis服务器,这个时候就会产生一个事件,Redis服务器会接收改命令,处理该命令和发送回复,而当我们没有与服务器进行交互时,服务器就会处于阻塞等待状态,它会让出CPU然后进入睡眠状态,当事件触发时,就会被操作系统唤醒.
而Redis服务器需要处理以下两类事件:
文件事件:Redis 服务器通过套接字与客户端(或者其他Redis服务器)进行连接,而文件事件就是服务器对套接字操作的抽象. 服务器与客户端(或者其他服务器)的通信会产生相应的文件事件,而服务器则通过监听并处理这些事件来完成一系列网络通信操作.
时间事件:Redis 服务器中的一些操作(比如serverCron函数)需要在给定的时间点执行,而时间事件就是服务器对这类定时操作的抽象.
2. 事件的抽象
Redis把文件事件和时间事件分别抽象成一个数据结构来管理.
2.1 文件事件结构
typedef struct aeFileEvent { // 文件时间类型:AE_NONE,AE_READABLE,AE_WRITABLE int mask; // 可读处理函数 aeFileProc *rfileProc; // 可写处理函数 aeFileProc *wfileProc; // 客户端传入的数据 void *clientData; } aeFileEvent; //文件事件 其中rfileProc和wfileProc成员分别为两个函数指针,他们的原型为:
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
该函数是回调函数,如果当前文件事件所指定的事件类型发生时,则会调用对应的回调函数来处理该事件.
当事件就绪的时候,我们需要知道文件事件的文件描述符还有事件类型才能对于锁定该事件,因此定义了aeFiredEvent结构统一管理:
typedef struct aeFiredEvent { // 就绪事件的文件描述符 int fd; // 就绪事件类型:AE_NONE,AE_READABLE,AE_WRITABLE int mask; } aeFiredEvent; //就绪事件 文件事件的类型:
#define AE_NONE 0 //未设置 #define AE_READABLE 1 //事件可读 #define AE_WRITABLE 2 //事件可写
2.2 时间事件结构
typedef struct aeTimeEvent { // 时间事件的id long long id; // 时间事件到达的时间的秒数 long when_sec; /* seconds */ // 时间事件到达的时间的毫秒数 long when_ms; /* milliseconds */ // 时间事件处理函数 aeTimeProc *timeProc; // 时间事件终结函数 aeEventFinalizerProc *finalizerProc; // 客户端传入的数据 void *clientData; // 指向下一个时间事件 struct aeTimeEvent *next; } aeTimeEvent; //时间事件 可以看出,时间事件的结构就是一个链表的节点,因为struct aeTimeEvent *next是指向下一个时间事件的指针.
和文件事件一样,当时间事件所指定的事件发生时,也会调用对应的回调函数,结构成员timeProc和finalizerProc都是回调函数,函数原型如下:
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData); typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
虽然对文件事件和时间事件都做了抽象,Redis仍然需要对事件做一个整体的抽象,用来描述一个事件的状态. 也就是下面要介绍的事件状态结构:aeEventLoop.
2.3 事件状态结构
typedef struct aeEventLoop { // 当前已注册的最大的文件描述符 int maxfd; /* highest file descriptor currently registered */ // 文件描述符监听集合的大小 int setsize; /* max number of file descriptors tracked */ // 下一个时间事件的ID long long timeEventNextId; // 最后一次执行事件的时间 time_t lastTime; /* Used to detect system clock skew */ // 注册的文件事件表 aeFileEvent *events; /* Registered events */ // 已就绪的文件事件表 aeFiredEvent *fired; /* Fired events */ // 时间事件的头节点指针 aeTimeEvent *timeEventHead; // 事件处理开关 int stop; // 多路复用库的事件状态数据 void *apidata; /* This is used for polling API specific data */ // 执行处理事件之前的函数 aeBeforeSleepProc *beforesleep; } aeEventLoop; //事件轮询的状态结构 aeEventLoop结构保存了一个void *类型的万能指针apidata,用来保存轮询事件的状态,也就是保存底层调用的多路复用库的事件状态.
Redis的 I/O多路复用程序的所有功能都是通过包装常见的select、epoll、evport和kqueue这些I/O多路复用函数库来实现的,每个I/O多路复用函数库在Redis源码中都对应着一个单独的文件,比如ae_select.c、ae_epoll.c等等.
他们在编译阶段,会根据不同的系统选择性能最高的一个多路复用库作为Redis的多路复用程序实现,而且所有库的API都是相同的,这就可以让Redis多路复用程序的底层可以互换.
下面是具体选择库的源码:
// IO复用的选择,性能依次下降,Linux支持 "ae_epoll.c" 和 "ae_select.c" #ifdef HAVE_EVPORT #include "ae_evport.c" #else #ifdef HAVE_EPOLL #include "ae_epoll.c" #else #ifdef HAVE_KQUEUE #include "ae_kqueue.c" #else #include "ae_select.c" #endif #endif #endif
也可以通过命令INFO server来查看当前使用的是哪个多路复用库:

可以看到Linux下默认使用的是epoll多路复用库,那么apidata保存的就是epoll模型的事件状态结构,它在ae_epoll.c源文件中:
typedef struct aeApiState { // epoll事件的文件描述符 int epfd; // 事件表 struct epoll_event *events; } aeApiState; // 事件的状态 epoll模型的struct epoll_event结构中定义着epoll事件的类型,比如EPOLLIN、EPOLLOUT等等,但是Redis的文件结构aeFileEvent中也在mask中定义了自己的事件类型,例如:AE_READABLE、AE_WRITABLE等等,于是就需要实现一个中间层将两者的事件类型相联系起来,这就是之前提到的ae_epoll.c文件中实现的相同的API:
// 创建一个epoll实例,保存到eventLoop中 static int aeApiCreate(aeEventLoop *eventLoop) // 调整事件表的大小 static int aeApiResize(aeEventLoop *eventLoop, int setsize) // 释放epoll实例和事件表空间 static void aeApiFree(aeEventLoop *eventLoop) // 在epfd标识的事件表上注册fd的事件 static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) // 在epfd标识的事件表上注删除fd的事件 static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) // 等待所监听文件描述符上有事件发生 static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) // 返回正在使用的IO多路复用库的名字 static char *aeApiName(void)
这些API会讲epoll的底层函数封装起来,Redis实现事件时,只需要调用这些接口即可.
我们以下面两个API的源码举例:
aeApiAddEvent
该函数会向Redis事件状态结构aeEventLoop的事件表event注册一个事件,对应的是epoll_ctl函数.
// 在epfd标识的事件表上注册fd的事件 static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee = {0}; // EPOLL_CTL_ADD,向epfd注册fd的上的event // EPOLL_CTL_MOD,修改fd已注册的event // #define AE_NONE 0 //未设置 // #define AE_READABLE 1 //事件可读 // #define AE_WRITABLE 2 //事件可写 // 判断fd事件的操作,如果没有设置事件,则进行关联mask类型事件,否则进行修改 int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; // struct epoll_event { // uint32_t events; /* Epoll events */ // epoll_data_t data; /* User data variable */ // }; ee.events = 0; // 如果是修改事件,合并之前的事件类型 mask |= eventLoop->events[fd].mask; /* Merge old events */ // 根据mask映射epoll的事件类型 if (mask & AE_READABLE) ee.events |= EPOLLIN; //读事件 if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; //写事件 ee.data.fd = fd; //设置事件所从属的目标文件描述符 // 将ee事件注册到epoll中 if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; return 0; } aeApiPoll
等待所监听文件描述符上有事件发生,对应着底层的epoll_wait函数.
// 等待所监听文件描述符上有事件发生 static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, numevents = 0; // 监听事件表上是否有事件发生 retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); // 至少有一个就绪的事件 if (retval > 0) { int j; numevents = retval; // 遍历就绪的事件表,将其加入到eventLoop的就绪事件表中 for (j = 0; j < numevents; j++) { int mask = 0; struct epoll_event *e = state->events+j; // 根据就绪的事件类型,设置mask if (e->events & EPOLLIN) mask |= AE_READABLE; if (e->events & EPOLLOUT) mask |= AE_WRITABLE; if (e->events & EPOLLERR) mask |= AE_WRITABLE; if (e->events & EPOLLHUP) mask |= AE_WRITABLE; // 添加到就绪事件表中 eventLoop->fired[j].fd = e->data.fd; eventLoop->fired[j].mask = mask; } } // 返回就绪的事件个数 return numevents; } 3. 事件的实现
事件的所有源码都定义在ae.c源文件中,先从aeMain函数说起.
// 事件轮询的主函数 void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; // 一直处理事件 while (!eventLoop->stop) { // 执行处理事件之前的函数 if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); //处理到时的时间事件和就绪的文件事件 aeProcessEvents(eventLoop, AE_ALL_EVENTS); } } 可以看到,如果服务器一直处理事件,那么就是一个死循环,而一个最典型的事件驱动,就是一个死循环. 在循环中,程序会调用处理事件的函数aeProcessEvents(),它的参数是一个事件状态结构aeEventLoop和AE_ALL_EVENTS.
事件类型的宏定义,在ae.h头文件中:
#define AE_FILE_EVENTS 1 //文件事件 #define AE_TIME_EVENTS 2 //时间事件 #define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS) //文件和时间事件 #define AE_DONT_WAIT 4
// 处理到时的时间事件和就绪的文件事件 // 如果flags = 0,函数什么都不做,直接返回 // 如果flags设置了 AE_ALL_EVENTS ,则执行所有类型的事件 // 如果flags设置了 AE_FILE_EVENTS ,则执行文件事件 // 如果flags设置了 AE_TIME_EVENTS ,则执行时间事件 // 如果flags设置了 AE_DONT_WAIT ,那么函数处理完事件后直接返回,不阻塞等待 // 函数返回执行的事件个数 int aeProcessEvents(aeEventLoop *eventLoop, int flags) { int processed = 0, numevents; // 如果什么事件都没有设置则直接返回 if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; // 请注意,既然我们要处理时间事件,即使没有要处理的文件事件,我们仍要调用select(),以便在下一次事件准备启动之前进行休眠 // 当前还没有要处理的文件事件,或者设置了时间事件但是没有设置不阻塞标识 if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; aeTimeEvent *shortest = NULL; struct timeval tv, *tvp; // 如果设置了时间事件而没有设置不阻塞标识 if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) // 获取最近到时的时间事件 shortest = aeSearchNearestTimer(eventLoop); // 获取到了最早到时的时间事件 if (shortest) { long now_sec, now_ms; // 获取当前时间 aeGetTime(&now_sec, &now_ms); tvp = &tv; // 等待该
点击排行
本栏推荐
