ae.c 是 Redis 事件框架的具體實現,本文對這份源碼進行簡單說明,其中也談到了原作者在註釋中已經標記的一些未來可能做的改進。
ae.c
1
#include <stdio.h>
2
#include <sys/time.h>
3
#include <sys/types.h>
4
#include <unistd.h>
5
#include <stdlib.h>
6
7
#include
"
ae.h
"
8
#include
"
zmalloc.h
"
9
#include
"
config.h
"
10
11
/*
Include the best multiplexing layer supported by this system.
12
* The following should be ordered by performances, descending.
*/
//為了支持不同的平臺,redis用相同的接口封裝了系統提供的多路復用層代碼。接口共提供如下函數:aeApiCreate\aeApiFree\aeApiAddEvent\aeApiDelEvent\aeApiPoll\aeApiName函數,以及一個struct aeApiState
//通過包含不同的頭文件,選擇不同的底層實現
//按如下順序,效率遞減: epoll > kqueue > select
13
#ifdef HAVE_EPOLL
14
#include
"
ae_epoll.c
"
15
#else
16
#ifdef HAVE_KQUEUE
17
#include
"
ae_kqueue.c
"
18
#else
19
#include
"
ae_select.c
"
20
#endif
21
#endif
22
//初始化函數,創建事件循環,函數內部alloc一個結構,用于表示事件狀態,供后續其他函數作為參數使用
23
aeEventLoop *aeCreateEventLoop(
void
) {
24
aeEventLoop *
eventLoop;
25
int
i;
26
27
eventLoop = zmalloc(
sizeof
(*
eventLoop));
28
if
(!eventLoop)
return
NULL;
//時間event用鏈表存儲
29
eventLoop->timeEventHead =
NULL;
30
eventLoop->timeEventNextId =
0
;
//表示是否停止事件循環
31
eventLoop->stop =
0
;
//maxfd只由ae_select.c使用,后續有些相關的處理,如果使用epoll的話,其實可以進行簡化
32
eventLoop->maxfd = -
1
;
//每次調用epoll\select前調用的函數,由框架使用者注冊
33
eventLoop->beforesleep =
NULL;
34
if
(aeApiCreate(eventLoop) == -
1
) {
35
zfree(eventLoop);
36
return
NULL;
37
}
38
/*
Events with mask == AE_NONE are not set. So let's initialize the
39
* vector with it.
*/
//將所有的文件描述符的mask設置為無效值,作為初始化
40
for
(i =
0
; i < AE_SETSIZE; i++
)
41
eventLoop->events[i].mask =
AE_NONE;
42
return
eventLoop;
43
}
44
//底層實現執行釋放操作后,釋放state的內存
45
void
aeDeleteEventLoop(aeEventLoop *
eventLoop) {
46
aeApiFree(eventLoop);
47
zfree(eventLoop);
48
}
49
//停止事件循環,redis作為一個無限循環的server,在redis.c中并沒有任何一處調用此函數
50
void
aeStop(aeEventLoop *
eventLoop) {
51
eventLoop->stop =
1
;
52
}
53
//創建文件fd事件,加入事件循環監控列表,使得后續epoll\select時將會測試這個文件描述符的可讀性(可寫性)
54
int
aeCreateFileEvent(aeEventLoop *eventLoop,
int
fd,
int
mask,
55
aeFileProc *proc,
void
*
clientData)
56
{
57
if
(fd >= AE_SETSIZE)
return
AE_ERR;
58
aeFileEvent *fe = &eventLoop->
events[fd];
59
60
if
(aeApiAddEvent(eventLoop, fd, mask) == -
1
)
61
return
AE_ERR;
62
fe->mask |=
mask;
//注冊函數
63
if
(mask & AE_READABLE) fe->rfileProc =
proc;
64
if
(mask & AE_WRITABLE) fe->wfileProc =
proc;
65
fe->clientData =
clientData;
//更新maxfd
66
if
(fd > eventLoop->
maxfd)
67
eventLoop->maxfd =
fd;
68
return
AE_OK;
69
}
70
71
/* Stop watching the event types in mask on fd.
 *
 * Does nothing when fd is outside [0, AE_SETSIZE) or when the slot has no
 * registered events. When the highest registered descriptor loses its
 * last event, maxfd is recomputed by scanning downwards (it becomes -1 if
 * nothing is left). The backend is informed last via aeApiDelEvent. */
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{
    /* Negative descriptors would index before the start of events[]. */
    if (fd < 0 || fd >= AE_SETSIZE) return;
    aeFileEvent *fe = &eventLoop->events[fd];

    if (fe->mask == AE_NONE) return;
    fe->mask = fe->mask & (~mask);

    if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
        /* Update the max fd */
        int j;

        for (j = eventLoop->maxfd-1; j >= 0; j--)
            if (eventLoop->events[j].mask != AE_NONE) break;
        eventLoop->maxfd = j;
    }
    aeApiDelEvent(eventLoop, fd, mask);
}
88
//返回值為該文件描述符關注的事件類型(可讀、可寫)
89
int
aeGetFileEvents(aeEventLoop *eventLoop,
int
fd) {
90
if
(fd >= AE_SETSIZE)
return
0
;
91
aeFileEvent *fe = &eventLoop->
events[fd];
92
93
return
fe->
mask;
94
}
95
96
/* Read the current wall-clock time via gettimeofday().
 *
 * seconds      - receives the whole-second part.
 * milliseconds - receives the sub-second part, truncated to milliseconds. */
static void aeGetTime(long *seconds, long *milliseconds)
{
    struct timeval now;

    gettimeofday(&now, NULL);
    *milliseconds = now.tv_usec / 1000;
    *seconds = now.tv_sec;
}
104
105
/* Compute the absolute time "now + milliseconds".
 *
 * milliseconds - offset from the current time; may be negative.
 * sec          - receives the whole-second part of the result.
 * ms           - receives the millisecond part, always in [0, 999]. */
static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
    long cur_sec, cur_ms;
    long long when_sec, when_ms;

    aeGetTime(&cur_sec, &cur_ms);
    when_sec = cur_sec + milliseconds/1000;
    when_ms = cur_ms + milliseconds%1000;

    /* cur_ms is in [0, 999] and the remainder is in [-999, 999] (C's %
     * keeps the sign of the dividend), so one carry or one borrow is
     * enough to bring when_ms back into range. */
    if (when_ms >= 1000) {
        when_sec++;
        when_ms -= 1000;
    } else if (when_ms < 0) {
        when_sec--;
        when_ms += 1000;
    }
    *sec = (long)when_sec;
    *ms = (long)when_ms;
}
118
119
long
long
aeCreateTimeEvent(aeEventLoop *eventLoop,
long
long
milliseconds,
120
aeTimeProc *proc,
void
*
clientData,
121
aeEventFinalizerProc *
finalizerProc)
122
{
123
long
long
id = eventLoop->timeEventNextId++
;
124
aeTimeEvent *
te;
125
126
te = zmalloc(
sizeof
(*
te));
127
if
(te == NULL)
return
AE_ERR;
128
te->id =
id;
//time event執行時間為絕對時間點
129
aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->
when_ms);
130
te->timeProc =
proc;
131
te->finalizerProc =
finalizerProc;
132
te->clientData =
clientData;
//time event鏈表為無序表,直接插入到鏈表頭
133
te->next = eventLoop->
timeEventHead;
134
eventLoop->timeEventHead =
te;
135
return
id;
136
}
137
//從鏈表中刪除,是以id作為key查找的(順序遍歷)
138
int
aeDeleteTimeEvent(aeEventLoop *eventLoop,
long
long
id)
139
{
140
aeTimeEvent *te, *prev =
NULL;
141
142
te = eventLoop->
timeEventHead;
143
while
(te) {
144
if
(te->id ==
id) {
145
if
(prev ==
NULL)
146
eventLoop->timeEventHead = te->
next;
147
else
148
prev->next = te->
next;
149
if
(te->
finalizerProc)
150
te->finalizerProc(eventLoop, te->
clientData);
151
zfree(te);
152
return
AE_OK;
153
}
154
prev =
te;
155
te = te->
next;
156
}
157
return
AE_ERR;
/*
NO event with the specified ID found
*/
158
}
159
160
/*
Search the first timer to fire.
161 * This operation is useful to know how many time the select can be
162 * put in sleep without to delay any event. (沒大看懂),也許是給一個阻塞時間的上限值
163
* If there are no timers NULL is returned.
164
*
165
* Note that's O(N) since time events are unsorted.
166
* Possible optimizations (not needed by Redis so far, but...):
167
* 1) Insert the event in order, so that the nearest is just the head.
168
* Much better but still insertion or deletion of timers is O(N).
169
* 2) Use a
skiplist
to have this operation as O(1) and insertion as O(log(N)).
170
*/
171
static
aeTimeEvent *aeSearchNearestTimer(aeEventLoop *
eventLoop)
172
{
173
aeTimeEvent *te = eventLoop->
timeEventHead;
174
aeTimeEvent *nearest =
NULL;
175
176
while
(te) {
177
if
(!nearest || te->when_sec < nearest->when_sec ||
178
(te->when_sec == nearest->when_sec &&
179
te->when_ms < nearest->
when_ms))
180
nearest =
te;
181
te = te->
next;
182
}
183
return
nearest;
184
}
185
//對time event進行處理
186
/*
Process time events
*/
187
static
int
processTimeEvents(aeEventLoop *
eventLoop) {
//處理的事件數
188
int
processed =
0
;
189
aeTimeEvent *
te;
190
long
long
maxId;
191
192
te = eventLoop->
timeEventHead;
193
maxId = eventLoop->timeEventNextId-
1
;
194
while
(te) {
195
long
now_sec, now_ms;
196
long
long
id;
197
198
if
(te->id >
maxId) {
199
te = te->
next;
200
continue
;
201
}
202
aeGetTime(&now_sec, &
now_ms);
203
if
(now_sec > te->when_sec ||
204
(now_sec == te->when_sec && now_ms >= te->
when_ms))
205
{
206
int
retval;
207
208
id = te->
id;
//如果執行時間到或已超出,則執行對應的時間處理函數
209
retval = te->timeProc(eventLoop, id, te->
clientData);
210
processed++
;
211
/*
After an event is processed our time event list may
212
* no longer be the same, so we restart from head. //因為時間處理函數timeProc可能改變此鏈表
213 * Still we make sure to don't process events registered
214 * by event handlers itself in order to don't loop forever.?
215
* To do so we saved the max ID we want to handle.
216
*
217
* FUTURE OPTIMIZATIONS:
218
* Note that this is NOT great algorithmically. Redis uses
219
* a single time event so it's not a problem but the right
220
* way to do this is to add the new elements on head, and
221
* to flag deleted elements in a special way for later
222
* deletion (putting references to the nodes to delete into
223
* another linked list).
*/
//如果這個時間處理函數不再繼續執行,則從time event的鏈表中刪除事件;否則,retval為繼續執行的時間間隔(單位為ms),在當前的timeevent struct的時間值上進行增加
224
if
(retval !=
AE_NOMORE) {
225
aeAddMillisecondsToNow(retval,&te->when_sec,&te->
when_ms);
226
}
else
{
227
aeDeleteTimeEvent(eventLoop, id);
228
}
229
te = eventLoop->
timeEventHead;
230
}
else
{
231
te = te->
next;
232
}
233
}
234
return
processed;
235
}
236
237
/*
Process every pending(懸而未決) time event, then every pending file event
238
* (that may be registered by time event callbacks just processed).
239
* Without special flags the function sleeps until some file event
240
* fires, or when the next time event occurrs (if any).
241
*
242
* If flags is 0, the function does nothing and returns.
243
* if flags has AE_ALL_EVENTS set, all the kind of events are processed.
244
* if flags has AE_FILE_EVENTS set, file events are processed.
245
* if flags has AE_TIME_EVENTS set, time events are processed.
246
* if flags has AE_DONT_WAIT set the function returns ASAP until all
247
* the events that's possible to process without to wait are processed.
248
*
249
* The function returns the number of events processed.
*/
250
int
aeProcessEvents(aeEventLoop *eventLoop,
int
flags)
251
{
252
int
processed =
0
, numevents;
253
254
/*
Nothing to do? return ASAP
*/
255
if
(!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS))
return
0
;
256
257
/*
Note that we want call select() even if there are no
258
* file events to process as long as we want to process time
259
* events, in order to sleep until the next time event is ready
260
* to fire.
*/
261
if
(eventLoop->maxfd != -
1
||
262
((flags & AE_TIME_EVENTS) && !(flags &
AE_DONT_WAIT))) {
263
int
j;
264
aeTimeEvent *shortest =
NULL;
265
struct
timeval tv, *
tvp;
266
267
if
(flags & AE_TIME_EVENTS && !(flags &
AE_DONT_WAIT))
268
shortest =
aeSearchNearestTimer(eventLoop);
269
if
(shortest) {
270
long
now_sec, now_ms;
271
272
/*
Calculate the time missing for the nearest
273
* timer to fire.
*/
274
aeGetTime(&now_sec, &
now_ms);
275
tvp = &
tv;
276
tvp->tv_sec = shortest->when_sec -
now_sec;
277 if (shortest->when_ms < now_ms) {
278 tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
279 tvp->tv_sec --;
280 } else {
281 tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
282 }
283 if (tvp->tv_sec < 0) tvp->tv_sec = 0;
284 if (tvp->tv_usec < 0) tvp->tv_usec = 0; 疑似有bug,比如當前時間為 1s +2ms ,shortest時間為0s+1ms的case
285
}
else
{
286
/*
If we have to check for events but need to return
287
* ASAP because of AE_DONT_WAIT we need to se the timeout
288
* to zero
*/
289
if
(flags &
AE_DONT_WAIT) {
290
tv.tv_sec = tv.tv_usec =
0
;
291
tvp = &
tv;
292
}
else
{
293
/*
Otherwise we can block
*/
294
tvp = NULL;
/*
wait forever
*/
295
}
296
}
297
//指定這個tvp是說這個tvp到期時,至少由time event可以執行
298
numevents =
aeApiPoll(eventLoop, tvp);
299
for
(j =
0
; j < numevents; j++
) {
300
aeFileEvent *fe = &eventLoop->events[eventLoop->
fired[j].fd];
301
int
mask = eventLoop->
fired[j].mask;
302
int
fd = eventLoop->
fired[j].fd;
303
int
rfired =
0
;
304
305
/*
note the fe->mask & mask & ... code: maybe an already processed
306
* event removed an element that fired and we still didn't
307
* processed, so we check if the event is still valid.
*/
308
if
(fe->mask & mask &
AE_READABLE) {
309
rfired =
1
;
310
fe->rfileProc(eventLoop,fd,fe->
clientData,mask);
311
}
//避免同一個注冊函數被調用兩次
312
if
(fe->mask & mask &
AE_WRITABLE) {
313
if
(!rfired || fe->wfileProc != fe->
rfileProc)
314
fe->wfileProc(eventLoop,fd,fe->
clientData,mask);
315
}
316
processed++
;
317
}
318
}
319
/*
Check time events
*/
320
if
(flags &
AE_TIME_EVENTS)
321
processed +=
processTimeEvents(eventLoop);
322
323
return
processed;
/*
return the number of processed file/time events
*/
324
}
325
//對select的一個簡單封裝,沒有太大意義
326
/*
Wait for millseconds until the given file descriptor becomes
327
* writable/readable/exception
*/
328
int
aeWait(
int
fd,
int
mask,
long
long
milliseconds) {
329
struct
timeval tv;
330
fd_set rfds, wfds, efds;
331
int
retmask =
0
, retval;
332
333
tv.tv_sec = milliseconds/
1000
;
334
tv.tv_usec = (milliseconds%
1000
)*
1000
;
335
FD_ZERO(&
rfds);
336
FD_ZERO(&
wfds);
337
FD_ZERO(&
efds);
338
339
if
(mask & AE_READABLE) FD_SET(fd,&
rfds);
340
if
(mask & AE_WRITABLE) FD_SET(fd,&
wfds);
341
if
((retval =
select
(fd+
1
, &rfds, &wfds, &efds, &tv)) >
0
) {
342
if
(FD_ISSET(fd,&rfds)) retmask |=
AE_READABLE;
343
if
(FD_ISSET(fd,&wfds)) retmask |=
AE_WRITABLE;
344
return
retmask;
345
}
else
{
346
return
retval;
347
}
348
}
349
350
/* Run the event loop until its stop flag becomes non-zero.
 *
 * The flag is cleared on entry. Each iteration first calls the
 * beforesleep hook, if one is installed, and then processes all event
 * kinds with aeProcessEvents(). */
void aeMain(aeEventLoop *eventLoop)
{
    for (eventLoop->stop = 0; !eventLoop->stop; ) {
        if (eventLoop->beforesleep != NULL) {
            eventLoop->beforesleep(eventLoop);
        }
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}
358
359
/* Return the name reported by the multiplexing backend that was compiled
 * in (whatever aeApiName() returns). */
char *aeGetApiName(void)
{
    return aeApiName();
}
362
363
/* Install the hook that aeMain() calls at the start of every iteration,
 * before aeProcessEvents(). Passing NULL removes the hook. */
void aeSetBeforeSleepProc(aeEventLoop *eventLoop,
        aeBeforeSleepProc *beforesleep)
{
    eventLoop->beforesleep = beforesleep;
}
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061
微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元

