aof是redis提供的一種數據持久化機制,通過將每一條命令dump下來,保持數據和內存中的數據一致。
#include "redis.h"
#include "bio.h"

#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <signal.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/wait.h>

/* Forward declaration: aofUpdateCurrentSize() is defined further down in
 * this file, but loadAppendOnlyFile() calls it before that definition. */
void aofUpdateCurrentSize(void);

void
aof_background_fsync(
int
fd) {
15
bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(
void
*)(
long
)fd,NULL,NULL);
16
}
17
18
/*
Called when the user switches from "appendonly yes" to "appendonly no"
19
* at runtime using the CONFIG command.
*/
20
void
stopAppendOnly(
void
) {
21
flushAppendOnlyFile(
1
);
22
aof_fsync(server.appendfd);
23
close(server.appendfd);
24
25
server.appendfd = -
1
;
26
server.appendseldb = -
1
;
27
server.appendonly =
0
;
28
/*
rewrite operation in progress? kill it, wait child exit
*/
29
if
(server.bgrewritechildpid != -
1
) {
30
int
statloc;
31
32
if
(kill(server.bgrewritechildpid,SIGKILL) != -
1
)
33
wait3(&statloc,
0
,NULL);
34
/*
reset the buffer accumulating changes while the child saves
*/
35
sdsfree(server.bgrewritebuf);
36
server.bgrewritebuf =
sdsempty();
37
server.bgrewritechildpid = -
1
;
38
}
39
}
40
41
/*
Called when the user switches from "appendonly no" to "appendonly yes"
42
* at runtime using the CONFIG command.
*/
43
int
startAppendOnly(
void
) {
44
server.appendonly =
1
;
45
server.lastfsync =
time(NULL);
46
server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,
0644
);
47
if
(server.appendfd == -
1
) {
48
redisLog(REDIS_WARNING,
"
Used tried to switch on AOF via CONFIG, but I can't open the AOF file: %s
"
,strerror(errno));
49
return
REDIS_ERR;
50
}
51
if
(rewriteAppendOnlyFileBackground() ==
REDIS_ERR) {
52
server.appendonly =
0
;
53
close(server.appendfd);
54
redisLog(REDIS_WARNING,
"
Used tried to switch on AOF via CONFIG, I can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.
"
,strerror(errno));
55
return
REDIS_ERR;
56
}
57
return
REDIS_OK;
58
}
59
60
/*
Write the append only file buffer on disk.
61
*
62
* Since we are required to write the AOF before replying to the client,
63
* and the only way the client socket can get a write is entering when the
64
* the event loop, we accumulate all the AOF writes in a memory
65
* buffer and write it on disk using this function just before entering
66
* the event loop again.
67
*
68
* About the 'force' argument:
69
*
70
* When the fsync policy is set to 'everysec' we may delay the flush if there
71
* is still an fsync() going on in the background thread, since for instance
72
* on Linux write(2) will be blocked by the background fsync anyway.
73
* When this happens we remember that there is some aof buffer to be
74
* flushed ASAP, and will try to do that in the serverCron() function.
75
*
76
* However if force is set to 1 we'll write regardless of the background
77
* fsync.
*/
78
void
flushAppendOnlyFile(
int
force) {
79
ssize_t nwritten;
80
int
sync_in_progress =
0
;
81
82
if
(sdslen(server.aofbuf) ==
0
)
return
;
83
84
if
(server.appendfsync ==
APPENDFSYNC_EVERYSEC)
85
sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) !=
0
;
86
87
if
(server.appendfsync == APPENDFSYNC_EVERYSEC && !
force) {
88
/*
With this append fsync policy we do background fsyncing.
89
* If the fsync is still in progress we can try to delay
90
* the write for a couple of seconds.
*/
91
if
(sync_in_progress) {
92
if
(server.aof_flush_postponed_start ==
0
) {
93
/*
No previous write postponinig, remember that we are
94
* postponing the flush and return.
*/
95
server.aof_flush_postponed_start =
server.unixtime;
96
return
;
97
}
else
if
(server.unixtime - server.aof_flush_postponed_start <
2
) {
98
/*
We were already waiting for fsync to finish, but for less
99
* than two seconds this is still ok. Postpone again.
*/
100
return
;
101
}
102
/*
Otherwise fall trough, and go write since we can't wait
103
* over two seconds.
*/
104
redisLog(REDIS_NOTICE,
"
Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.
"
);
105
}
106
}
107
/*
If you are following this code path, then we are going to write so
108
* set reset the postponed flush sentinel to zero.
*/
109
server.aof_flush_postponed_start =
0
;
110
111
/*
We want to perform a single write. This should be guaranteed atomic
112
* at least if the filesystem we are writing is a real physical one.
113
* While this will save us against the server being killed I don't think
114
* there is much to do about the whole server stopping for power problems
115
* or alike
*/
116
nwritten =
write(server.appendfd,server.aofbuf,sdslen(server.aofbuf));
117
if
(nwritten !=
(signed)sdslen(server.aofbuf)) {
118
/*
Ooops, we are in troubles. The best thing to do for now is
119
* aborting instead of giving the illusion that everything is
120
* working as expected.
*/
121
if
(nwritten == -
1
) {
122
redisLog(REDIS_WARNING,
"
Exiting on error writing to the append-only file: %s
"
,strerror(errno));
123
}
else
{
124
redisLog(REDIS_WARNING,
"
Exiting on short write while writing to the append-only file: %s
"
,strerror(errno));
125
}
126
exit(
1
);
127
}
128
server.appendonly_current_size +=
nwritten;
129
130
/*
Re-use AOF buffer when it is small enough. The maximum comes from the
131
* arena size of 4k minus some overhead (but is otherwise arbitrary).
*/
132
if
((sdslen(server.aofbuf)+sdsavail(server.aofbuf)) <
4000
) {
133
sdsclear(server.aofbuf);
134
}
else
{
135
sdsfree(server.aofbuf);
136
server.aofbuf =
sdsempty();
137
}
138
139
/*
Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
140
* children doing I/O in the background.
*/
141
if
(server.no_appendfsync_on_rewrite &&
142
(server.bgrewritechildpid != -
1
|| server.bgsavechildpid != -
1
))
143
return
;
144
145
/*
Perform the fsync if needed.
*/
146
if
(server.appendfsync ==
APPENDFSYNC_ALWAYS) {
147
/*
aof_fsync is defined as fdatasync() for Linux in order to avoid
148
* flushing metadata.
*/
149
aof_fsync(server.appendfd);
/*
Let's try to get this data on the disk
*/
150
server.lastfsync =
server.unixtime;
151
}
else
if
((server.appendfsync == APPENDFSYNC_EVERYSEC &&
152
server.unixtime >
server.lastfsync)) {
153
if
(!
sync_in_progress) aof_background_fsync(server.appendfd);
154
server.lastfsync =
server.unixtime;
155
}
156
}
157
158
/* Append "<prefix><n>\r\n" to 'dst'. This is the multibulk count ('*') or
 * bulk length ('$') header line. Returns the possibly reallocated sds. */
static sds catAppendOnlyLenHeader(sds dst, char prefix, long long n) {
    char hdr[32];
    int hdrlen;

    hdr[0] = prefix;
    hdrlen = 1 + ll2string(hdr + 1, sizeof(hdr) - 1, n);
    hdr[hdrlen++] = '\r';
    hdr[hdrlen++] = '\n';
    return sdscatlen(dst, hdr, hdrlen);
}

/* Serialize argv[0..argc-1] onto 'dst' as
 *   *<argc>\r\n   then, for every argument,   $<len>\r\n<bytes>\r\n
 * Each argument goes through getDecodedObject() so its ->ptr can be read as
 * an sds string; the extra reference is dropped afterwards.
 * Returns the possibly reallocated sds. */
sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
    int i;

    dst = catAppendOnlyLenHeader(dst, '*', argc);
    for (i = 0; i < argc; i++) {
        robj *arg = getDecodedObject(argv[i]);
        size_t arglen = sdslen(arg->ptr);

        dst = catAppendOnlyLenHeader(dst, '$', (long long)arglen);
        dst = sdscatlen(dst, arg->ptr, arglen);
        dst = sdscatlen(dst, "\r\n", 2);
        decrRefCount(arg);
    }
    return dst;
}

/* Append to 'buf' an "EXPIREAT <key> <time(NULL)+seconds>" command. This is
 * how a relative expire is written to the AOF as an absolute timestamp.
 * 'seconds' is parsed with strtol(), base 10, without validation.
 * Returns the possibly reallocated sds. */
sds catAppendOnlyExpireAtCommand(sds buf, robj *key, robj *seconds) {
    robj *decoded;
    robj *argv[3];
    long when;

    /* strtol() needs the string form of the argument */
    decoded = getDecodedObject(seconds);
    when = time(NULL) + strtol(decoded->ptr, NULL, 10);
    decrRefCount(decoded);

    argv[0] = createStringObject("EXPIREAT", 8);
    argv[1] = key;
    argv[2] = createObject(REDIS_STRING,
                           sdscatprintf(sdsempty(), "%ld", when));
    buf = catAppendOnlyGenericCommand(buf, 3, argv);

    /* argv[1] is borrowed from the caller; only release what we created */
    decrRefCount(argv[2]);
    decrRefCount(argv[0]);
    return buf;
}

/* Serialize one executed command for the AOF.
 *
 * If 'dictid' differs from the DB of the last appended command, a SELECT is
 * emitted first and server.appendseldb is updated. EXPIRE is written as
 * EXPIREAT and SETEX as SET followed by EXPIREAT. Every other command is
 * written as-is. The bytes are appended to server.aofbuf. While a
 * BGREWRITEAOF child exists (bgrewritechildpid != -1) they are also
 * appended to server.bgrewritebuf. */
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds cmdbuf = sdsempty();

    /* The DB this command targets is not the one of the last command we
     * appended, so a SELECT has to be issued first. */
    if (dictid != server.appendseldb) {
        char dbstr[64];

        snprintf(dbstr, sizeof(dbstr), "%d", dictid);
        cmdbuf = sdscatprintf(cmdbuf, "*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
                              (unsigned long)strlen(dbstr), dbstr);
        server.appendseldb = dictid;
    }

    if (cmd->proc == setexCommand) {
        /* SETEX key seconds value  ->  SET key value  +  EXPIREAT key when */
        robj *setargv[3];

        setargv[0] = createStringObject("SET", 3);
        setargv[1] = argv[1];
        setargv[2] = argv[3];
        cmdbuf = catAppendOnlyGenericCommand(cmdbuf, 3, setargv);
        decrRefCount(setargv[0]);
        cmdbuf = catAppendOnlyExpireAtCommand(cmdbuf, argv[1], argv[2]);
    } else if (cmd->proc == expireCommand) {
        /* EXPIRE key seconds  ->  EXPIREAT key when */
        cmdbuf = catAppendOnlyExpireAtCommand(cmdbuf, argv[1], argv[2]);
    } else {
        cmdbuf = catAppendOnlyGenericCommand(cmdbuf, argc, argv);
    }

    /* Append to the AOF buffer. This will be flushed on disk just before
     * re-entering the event loop, so before the client will get a
     * positive reply about the operation performed. */
    server.aofbuf = sdscatlen(server.aofbuf, cmdbuf, sdslen(cmdbuf));

    /* While a background rewrite runs, also record the difference between
     * the child's snapshot and the current dataset, so it can be appended
     * to the rewritten file once the child is done. */
    if (server.bgrewritechildpid != -1)
        server.bgrewritebuf = sdscatlen(server.bgrewritebuf, cmdbuf, sdslen(cmdbuf));

    sdsfree(cmdbuf);
}

/*
In Redis commands are always executed in the context of a client, so in
249
* order to load the append only file we need to create a fake client.
*/
250
struct
redisClient *createFakeClient(
void
) {
251
struct
redisClient *c = zmalloc(
sizeof
(*
c));
252
253
selectDb(c,
0
);
254
c->fd = -
1
;
255
c->querybuf =
sdsempty();
256
c->argc =
0
;
257
c->argv =
NULL;
258
c->bufpos =
0
;
259
c->flags =
0
;
260
/*
We set the fake client as a slave waiting for the synchronization
261
* so that Redis will not try to send replies to this client.
*/
262
c->replstate =
REDIS_REPL_WAIT_BGSAVE_START;
263
c->reply =
listCreate();
264
c->reply_bytes =
0
;
265
c->watched_keys =
listCreate();
266
listSetFreeMethod(c->
reply,decrRefCount);
267
listSetDupMethod(c->
reply,dupClientReplyValue);
268
initClientMultiState(c);
269
return
c;
270
}
271
272
void
freeFakeClient(
struct
redisClient *
c) {
273
sdsfree(c->
querybuf);
274
listRelease(c->
reply);
275
listRelease(c->
watched_keys);
276
freeClientMultiState(c);
277
zfree(c);
278
}
279
280
/*
Replay the append log file. On error REDIS_OK is returned. On non fatal
281
* error (the append only file is zero-length) REDIS_ERR is returned. On
282
* fatal error an error message is logged and the program exists.
*/
283
int
loadAppendOnlyFile(
char
*
filename) {
284
struct
redisClient *
fakeClient;
285
FILE *fp = fopen(filename,
"
r
"
);
286
struct
redis_stat sb;
287
int
appendonly =
server.appendonly;
288
long
loops =
0
;
289
290
if
(fp && redis_fstat(fileno(fp),&sb) != -
1
&& sb.st_size ==
0
) {
291
server.appendonly_current_size =
0
;
292
fclose(fp);
293
return
REDIS_ERR;
294
}
295
296
if
(fp ==
NULL) {
297
redisLog(REDIS_WARNING,
"
Fatal error: can't open the append log file for reading: %s
"
,strerror(errno));
298
exit(
1
);
299
}
300
301
/*
Temporarily disable AOF, to prevent EXEC from feeding a MULTI
302
* to the same file we're about to read.
*/
303
server.appendonly =
0
;
304
305
fakeClient =
createFakeClient();
306
startLoading(fp);
307
308
while
(
1
) {
309
int
argc, j;
310
unsigned
long
len;
311
robj **
argv;
312
char
buf[
128
];
313
sds argsds;
314
struct
redisCommand *
cmd;
315
int
force_swapout;
316
317
/*
Serve the clients from time to time
*/
318
if
(!(loops++ %
1000
)) {
319
loadingProgress(ftello(fp));
320
aeProcessEvents(server.el, AE_FILE_EVENTS|
AE_DONT_WAIT);
321
}
322
323
if
(fgets(buf,
sizeof
(buf),fp) ==
NULL) {
324
if
(feof(fp))
325
break
;
326
else
327
goto
readerr;
328
}
329
if
(buf[
0
] !=
'
*
'
)
goto
fmterr;
330
argc = atoi(buf+
1
);
331
if
(argc <
1
)
goto
fmterr;
332
333
argv = zmalloc(
sizeof
(robj*)*
argc);
334
for
(j =
0
; j < argc; j++
) {
335
if
(fgets(buf,
sizeof
(buf),fp) == NULL)
goto
readerr;
336
if
(buf[
0
] !=
'
$
'
)
goto
fmterr;
337
len = strtol(buf+
1
,NULL,
10
);
338
argsds =
sdsnewlen(NULL,len);
339
if
(len && fread(argsds,len,
1
,fp) ==
0
)
goto
fmterr;
340
argv[j] =
createObject(REDIS_STRING,argsds);
341
if
(fread(buf,
2
,
1
,fp) ==
0
)
goto
fmterr;
/*
discard CRLF
*/
342
}
343
344
/*
Command lookup
*/
345
cmd = lookupCommand(argv[
0
]->
ptr);
346
if
(!
cmd) {
347
redisLog(REDIS_WARNING,
"
Unknown command '%s' reading the append only file
"
, argv[
0
]->
ptr);
348
exit(
1
);
349
}
350
/*
Run the command in the context of a fake client
*/
351
fakeClient->argc =
argc;
352
fakeClient->argv =
argv;
353
cmd->
proc(fakeClient);
354
355
/*
The fake client should not have a reply
*/
356
redisAssert(fakeClient->bufpos ==
0
&& listLength(fakeClient->reply) ==
0
);
357
/*
The fake client should never get blocked
*/
358
redisAssert((fakeClient->flags & REDIS_BLOCKED) ==
0
);
359
360
/*
Clean up. Command code may have changed argv/argc so we use the
361
* argv/argc of the client instead of the local variables.
*/
362
for
(j =
0
; j < fakeClient->argc; j++
)
363
decrRefCount(fakeClient->
argv[j]);
364
zfree(fakeClient->
argv);
365
366
/*
Handle swapping while loading big datasets when VM is on
*/
367
force_swapout =
0
;
368
if
((zmalloc_used_memory() - server.vm_max_memory) >
1024
*
1024
*
32
)
369
force_swapout =
1
;
370
371
if
(server.vm_enabled &&
force_swapout) {
372
while
(zmalloc_used_memory() >
server.vm_max_memory) {
373
if
(vmSwapOneObjectBlocking() == REDIS_ERR)
break
;
374
}
375
}
376
}
377
378
/*
This point can only be reached when EOF is reached without errors.
379
* If the client is in the middle of a MULTI/EXEC, log error and quit.
*/
380
if
(fakeClient->flags & REDIS_MULTI)
goto
readerr;
381
382
fclose(fp);
383
freeFakeClient(fakeClient);
384
server.appendonly =
appendonly;
385
stopLoading();
386
aofUpdateCurrentSize();
387
server.auto_aofrewrite_base_size =
server.appendonly_current_size;
388
return
REDIS_OK;
389
390
readerr:
391
if
(feof(fp)) {
392
redisLog(REDIS_WARNING,
"
Unexpected end of file reading the append only file
"
);
393
}
else
{
394
redisLog(REDIS_WARNING,
"
Unrecoverable error reading the append only file: %s
"
, strerror(errno));
395
}
396
exit(
1
);
397
fmterr:
398
redisLog(REDIS_WARNING,
"
Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>
"
);
399
exit(
1
);
400
}
401
402
/*
Write a sequence of commands able to fully rebuild the dataset into
403
* "filename". Used both by REWRITEAOF and BGREWRITEAOF.
*/
404
int
rewriteAppendOnlyFile(
char
*
filename) {
405
dictIterator *di =
NULL;
406
dictEntry *
de;
407
FILE *
fp;
408
char
tmpfile[
256
];
409
int
j;
410
time_t now =
time(NULL);
411
412
/*
Note that we have to use a different temp name here compared to the
413
* one used by rewriteAppendOnlyFileBackground() function.
*/
414
snprintf(tmpfile,
256
,
"
temp-rewriteaof-%d.aof
"
, (
int
) getpid());
415
fp = fopen(tmpfile,
"
w
"
);
416
if
(!
fp) {
417
redisLog(REDIS_WARNING,
"
Failed rewriting the append only file: %s
"
, strerror(errno));
418
return
REDIS_ERR;
419
}
420
for
(j =
0
; j < server.dbnum; j++
) {
421
char
selectcmd[] =
"
*2\r\n$6\r\nSELECT\r\n
"
;
422
redisDb *db = server.db+
j;
423
dict *d = db->
dict;
424
if
(dictSize(d) ==
0
)
continue
;
425
di =
dictGetSafeIterator(d);
426
if
(!
di) {
427
fclose(fp);
428
return
REDIS_ERR;
429
}
430
431
/*
SELECT the new DB
*/
432
if
(fwrite(selectcmd,
sizeof
(selectcmd)-
1
,
1
,fp) ==
0
)
goto
werr;
433
if
(fwriteBulkLongLong(fp,j) ==
0
)
goto
werr;
434
435
/*
Iterate this DB writing every entry
*/
436
while
((de = dictNext(di)) !=
NULL) {
437
sds keystr =
dictGetEntryKey(de);
438
robj key, *
o;
439
time_t expiretime;
440
int
swapped;
441
442
keystr =
dictGetEntryKey(de);
443
o =
dictGetEntryVal(de);
444
initStaticStringObject(key,keystr);
445
/*
If the value for this key is swapped, load a preview in memory.
446
* We use a "swapped" flag to remember if we need to free the
447
* value object instead to just increment the ref count anyway
448
* in order to avoid copy-on-write of pages if we are forked()
*/
449
if
(!server.vm_enabled || o->storage == REDIS_VM_MEMORY ||
450
o->storage ==
REDIS_VM_SWAPPING) {
451
swapped =
0
;
452
}
else
{
453
o =
vmPreviewObject(o);
454
swapped =
1
;
455
}
456
expiretime = getExpire(db,&
key);
457
458
/*
Save the key and associated value
*/
459
if
(o->type ==
REDIS_STRING) {
460
/*
Emit a SET command
*/
461
char
cmd[]=
"
*3\r\n$3\r\nSET\r\n
"
;
462
if
(fwrite(cmd,
sizeof
(cmd)-
1
,
1
,fp) ==
0
)
goto
werr;
463
/*
Key and value
*/
464
if
(fwriteBulkObject(fp,&key) ==
0
)
goto
werr;
465
if
(fwriteBulkObject(fp,o) ==
0
)
goto
werr;
466
}
else
if
(o->type ==
REDIS_LIST) {
467
/*
Emit the RPUSHes needed to rebuild the list
*/
468
char
cmd[]=
"
*3\r\n$5\r\nRPUSH\r\n
"
;
469
if
(o->encoding ==
REDIS_ENCODING_ZIPLIST) {
470
unsigned
char
*zl = o->
ptr;
471
unsigned
char
*p = ziplistIndex(zl,
0
);
472
unsigned
char
*
vstr;
473
unsigned
int
vlen;
474
long
long
vlong;
475
476
while
(ziplistGet(p,&vstr,&vlen,&
vlong)) {
477
if
(fwrite(cmd,
sizeof
(cmd)-
1
,
1
,fp) ==
0
)
goto
werr;
478
if
(fwriteBulkObject(fp,&key) ==
0
)
goto
werr;
479
if
(vstr) {
480
if
(fwriteBulkString(fp,(
char
*)vstr,vlen) ==
0
)
481
goto
werr;
482
}
else
{
483
if
(fwriteBulkLongLong(fp,vlong) ==
0
)
484
goto
werr;
485
}
486
p =
ziplistNext(zl,p);
487
}
488
}
else
if
(o->encoding ==
REDIS_ENCODING_LINKEDLIST) {
489
list *list = o->
ptr;
490
listNode *
ln;
491
listIter li;
492
493
listRewind(list,&
li);
494
while
((ln = listNext(&
li))) {
495
robj *eleobj =
listNodeValue(ln);
496
497
if
(fwrite(cmd,
sizeof
(cmd)-
1
,
1
,fp) ==
0
)
goto
werr;
498
if
(fwriteBulkObject(fp,&key) ==
0
)
goto
werr;
499
if
(fwriteBulkObject(fp,eleobj) ==
0
)
goto
werr;
500
}
501
}
else
{
502
redisPanic(
"
Unknown list encoding
"
);
503
}
504
}
else
if
(o->type ==
REDIS_SET) {
505
char
cmd[]=
"
*3\r\n$4\r\nSADD\r\n
"
;
506
507
/*
Emit the SADDs needed to rebuild the set
*/
508
if
(o->encoding ==
REDIS_ENCODING_INTSET) {
509
int
ii =
0
;
510
int64_t llval;
511
while
(intsetGet(o->ptr,ii++,&
llval)) {
512
if
(fwrite(cmd,
sizeof
(cmd)-
1
,
1
,fp) ==
0
)
goto
werr;
513
if
(fwriteBulkObject(fp,&key) ==
0
)
goto
werr;
514
if
(fwriteBulkLongLong(fp,llval) ==
0
)
goto
werr;
515
}
516
}
else
if
(o->encoding ==
REDIS_ENCODING_HT) {
517
dictIterator *di = dictGetIterator(o->
ptr);
518
dictEntry *
de;
519
while
((de = dictNext(di)) !=
NULL) {
520
robj *eleobj =
dictGetEntryKey(de);
521
if
(fwrite(cmd,
sizeof
(cmd)-
1
,
1
,fp) ==
0
)
goto
werr;
522
if
(fwriteBulkObject(fp,&key) ==
0
)
goto
werr;
523
if
(fwriteBulkObject(fp,eleobj) ==
0
)
goto
werr;
524
}
525
dictReleaseIterator(di);
526
}
else
{
527
redisPanic(
"
Unknown set encoding
"
);
528
}
529
}
else
if
(o->type ==
REDIS_ZSET) {
530
/*
Emit the ZADDs needed to rebuild the sorted set
*/
531
char
cmd[]=
"
*4\r\n$4\r\nZADD\r\n
"
;
532
533
if
(o->encoding ==
REDIS_ENCODING_ZIPLIST) {
534
unsigned
char
*zl = o->
ptr;
535
unsigned
char
*eptr, *
sptr;
536
unsigned
char
*
vstr;
537
unsigned
int
vlen;
538
long
long
vll;
539
double
score;
540
541
eptr = ziplistIndex(zl,
0
);
542
redisAssert(eptr !=
NULL);
543
sptr =
ziplistNext(zl,eptr);
544
redisAssert(sptr !=
NULL);
545
546
while
(eptr !=
NULL) {
547
redisAssert(ziplistGet(eptr,&vstr,&vlen,&
vll));
548
score =
zzlGetScore(sptr);
549
550
if
(fwrite(cmd,
sizeof
(cmd)-
1
,
1
,fp) ==
0
)
goto
werr;
551
if
(fwriteBulkObject(fp,&key) ==
0
)
goto
werr;
552
if
(fwriteBulkDouble(fp,score) ==
0
)
goto
werr;
553
if
(vstr !=
NULL) {
554
if
(fwriteBulkString(fp,(
char
*)vstr,vlen) ==
0
)
555
goto
werr;
556
}
else
{
557
if
(fwriteBulkLongLong(fp,vll) ==
0
)
558
goto
werr;
559
}
560
zzlNext(zl,&eptr,&
sptr);
561
}
562
}
else
if
(o->encoding ==
REDIS_ENCODING_SKIPLIST) {
563
zset *zs = o->
ptr;
564
dictIterator *di = dictGetIterator(zs->
dict);
565
dictEntry *
de;
566
567
while
((de = dictNext(di)) !=
NULL) {
568
robj *eleobj =
dictGetEntryKey(de);
569
double
*score =
dictGetEntryVal(de);
570
571
if
(fwrite(cmd,
sizeof
(cmd)-
1
,
1
,fp) ==
0
)
goto
werr;
572
if
(fwriteBulkObject(fp,&key) ==
0
)
goto
werr;
573
if
(fwriteBulkDouble(fp,*score) ==
0
)
goto
werr;
574
if
(fwriteBulkObject(fp,eleobj) ==
0
)
goto
werr;
575
}
576
dictReleaseIterator(di);
577
}
else
{
578
redisPanic(
"
Unknown sorted set encoding
"
);
579
}
580
}
else
if
(o->type ==
REDIS_HASH) {
581
char
cmd[]=
"
*4\r\n$4\r\nHSET\r\n
"
;
582
583
/*
Emit the HSETs needed to rebuild the hash
*/
584
if
(o->encoding ==
REDIS_ENCODING_ZIPMAP) {
585
unsigned
char
*p = zipmapRewind(o->
ptr);
586
unsigned
char
*field, *
val;
587
unsigned
int
flen, vlen;
588
589
while
((p = zipmapNext(p,&field,&flen,&val,&vlen)) !=
NULL) {
590
if
(fwrite(cmd,
sizeof
(cmd)-
1
,
1
,fp) ==
0
)
goto
werr;
591
if
(fwriteBulkObject(fp,&key) ==
0
)
goto
werr;
592
if
(fwriteBulkString(fp,(
char
*)field,flen) ==
0
)
593
goto
werr;
594
if
(fwriteBulkString(fp,(
char
*)val,vlen) ==
0
)
595
goto
werr;
596
}
597
}
else
{
598
dictIterator *di = dictGetIterator(o->
ptr);
599
dictEntry *
de;
600
601
while
((de = dictNext(di)) !=
NULL) {
602
robj *field =
dictGetEntryKey(de);
603
robj *val =
dictGetEntryVal(de);
604
605
if
(fwrite(cmd,
sizeof
(cmd)-
1
,
1
,fp) ==
0
)
goto
werr;
606
if
(fwriteBulkObject(fp,&key) ==
0
)
goto
werr;
607
if
(fwriteBulkObject(fp,field) ==
0
)
goto
werr;
608
if
(fwriteBulkObject(fp,val) ==
0
)
goto
werr;
609
}
610
dictReleaseIterator(di);
611
}
612
}
else
{
613
redisPanic(
"
Unknown object type
"
);
614
}
615
/*
Save the expire time
*/
616
if
(expiretime != -
1
) {
617
char
cmd[]=
"
*3\r\n$8\r\nEXPIREAT\r\n
"
;
618
/*
If this key is already expired skip it
*/
619
if
(expiretime < now)
continue
;
620
if
(fwrite(cmd,
sizeof
(cmd)-
1
,
1
,fp) ==
0
)
goto
werr;
621
if
(fwriteBulkObject(fp,&key) ==
0
)
goto
werr;
622
if
(fwriteBulkLongLong(fp,expiretime) ==
0
)
goto
werr;
623
}
624
if
(swapped) decrRefCount(o);
625
}
626
dictReleaseIterator(di);
627
}
628
629
/*
Make sure data will not remain on the OS's output buffers
*/
630
fflush(fp);
631
aof_fsync(fileno(fp));
632
fclose(fp);
633
634
/*
Use RENAME to make sure the DB file is changed atomically only
635
* if the generate DB file is ok.
*/
636
if
(rename(tmpfile,filename) == -
1
) {
637
redisLog(REDIS_WARNING,
"
Error moving temp append only file on the final destination: %s
"
, strerror(errno));
638
unlink(tmpfile);
639
return
REDIS_ERR;
640
}
641
redisLog(REDIS_NOTICE,
"
SYNC append only file rewrite performed
"
);
642
return
REDIS_OK;
643
644
werr:
645
fclose(fp);
646
unlink(tmpfile);
647
redisLog(REDIS_WARNING,
"
Write error writing append only file on disk: %s
"
, strerror(errno));
648
if
(di) dictReleaseIterator(di);
649
return
REDIS_ERR;
650
}
651
652
/*
This is how rewriting of the append only file in background works:
653
*
654
* 1) The user calls BGREWRITEAOF
655
* 2) Redis calls this function, that forks():
656
* 2a) the child rewrite the append only file in a temp file.
657
* 2b) the parent accumulates differences in server.bgrewritebuf.
658
* 3) When the child finished '2a' exists.
659
* 4) The parent will trap the exit code, if it's OK, will append the
660
* data accumulated into server.bgrewritebuf into the temp file, and
661
* finally will rename(2) the temp file in the actual file name.
662
* The the new file is reopened as the new append only file. Profit!
663
*/
664
int
rewriteAppendOnlyFileBackground(
void
) {
665
pid_t childpid;
666
long
long
start;
667
668
if
(server.bgrewritechildpid != -
1
)
return
REDIS_ERR;
669
if
(server.vm_enabled) waitEmptyIOJobsQueue();
670
start =
ustime();
671
if
((childpid = fork()) ==
0
) {
672
char
tmpfile[
256
];
673
674
/*
Child
*/
675
if
(server.vm_enabled) vmReopenSwapFile();
676
if
(server.ipfd >
0
) close(server.ipfd);
677
if
(server.sofd >
0
) close(server.sofd);
678
snprintf(tmpfile,
256
,
"
temp-rewriteaof-bg-%d.aof
"
, (
int
) getpid());
679
if
(rewriteAppendOnlyFile(tmpfile) ==
REDIS_OK) {
680
_exit(
0
);
681
}
else
{
682
_exit(
1
);
683
}
684
}
else
{
685
/*
Parent
*/
686
server.stat_fork_time = ustime()-
start;
687
if
(childpid == -
1
) {
688
redisLog(REDIS_WARNING,
689
"
Can't rewrite append only file in background: fork: %s
"
,
690
strerror(errno));
691
return
REDIS_ERR;
692
}
693
redisLog(REDIS_NOTICE,
694
"
Background append only file rewriting started by pid %d
"
,childpid);
695
server.aofrewrite_scheduled =
0
;
696
server.bgrewritechildpid =
childpid;
697
updateDictResizePolicy();
698
/*
We set appendseldb to -1 in order to force the next call to the
699
* feedAppendOnlyFile() to issue a SELECT command, so the differences
700
* accumulated by the parent into server.bgrewritebuf will start
701
* with a SELECT statement and it will be safe to merge.
*/
702
server.appendseldb = -
1
;
703
return
REDIS_OK;
704
}
705
return
REDIS_OK;
/*
unreached
*/
706
}
707
708
/* BGREWRITEAOF command.
 * - Replies with an error if a rewrite child is already running.
 * - Only schedules the rewrite (server.aofrewrite_scheduled) while a BGSAVE
 *   child is active.
 * - Otherwise starts the background rewrite right away. */
void bgrewriteaofCommand(redisClient *c) {
    if (server.bgrewritechildpid != -1) {
        addReplyError(c,"Background append only file rewriting already in progress");
        return;
    }
    if (server.bgsavechildpid != -1) {
        server.aofrewrite_scheduled = 1;
        addReplyStatus(c,"Background append only file rewriting scheduled");
        return;
    }
    if (rewriteAppendOnlyFileBackground() != REDIS_OK) {
        addReply(c,shared.err);
        return;
    }
    addReplyStatus(c,"Background append only file rewriting started");
}

void
aofRemoveTempFile(pid_t childpid) {
722
char
tmpfile[
256
];
723
724
snprintf(tmpfile,
256
,
"
temp-rewriteaof-bg-%d.aof
"
, (
int
) childpid);
725
unlink(tmpfile);
726
}
727
728
/*
Update the server.appendonly_current_size filed explicitly using stat(2)
729
* to check the size of the file. This is useful after a rewrite or after
730
* a restart, normally the size is updated just adding the write length
731
* to the current lenght, that is much faster.
*/
732
void
aofUpdateCurrentSize(
void
) {
733
struct
redis_stat sb;
734
735
if
(redis_fstat(server.appendfd,&sb) == -
1
) {
736
redisLog(REDIS_WARNING,
"
Unable to check the AOF length: %s
"
,
737
strerror(errno));
738
}
else
{
739
server.appendonly_current_size =
sb.st_size;
740
}
741
}
742
743
/*
A background append only file rewriting (BGREWRITEAOF) terminated its work.
744
* Handle this.
*/
745
void
backgroundRewriteDoneHandler(
int
statloc) {
746
int
exitcode =
WEXITSTATUS(statloc);
747
int
bysignal =
WIFSIGNALED(statloc);
748
749
if
(!bysignal && exitcode ==
0
) {
750
int
newfd, oldfd;
751
int
nwritten;
752
char
tmpfile[
256
];
753
long
long
now =
ustime();
754
755
redisLog(REDIS_NOTICE,
756
"
Background AOF rewrite terminated with success
"
);
757
758
/*
Flush the differences accumulated by the parent to the
759
* rewritten AOF.
*/
760
snprintf(tmpfile,
256
,
"
temp-rewriteaof-bg-%d.aof
"
,
761
(
int
)server.bgrewritechildpid);
762
newfd = open(tmpfile,O_WRONLY|
O_APPEND);
763
if
(newfd == -
1
) {
764
redisLog(REDIS_WARNING,
765
"
Unable to open the temporary AOF produced by the child: %s
"
, strerror(errno));
766
goto
cleanup;
767
}
768
769
nwritten =
write(newfd,server.bgrewritebuf,sdslen(server.bgrewritebuf));
770
if
(nwritten !=
(signed)sdslen(server.bgrewritebuf)) {
771
if
(nwritten == -
1
) {
772
redisLog(REDIS_WARNING,
773
"
Error trying to flush the parent diff to the rewritten AOF: %s
"
, strerror(errno));
774
}
else
{
775
redisLog(REDIS_WARNING,
776
"
Short write trying to flush the parent diff to the rewritten AOF: %s
"
, strerror(errno));
777
}
778
close(newfd);
779
goto
cleanup;
780
}
781
782
redisLog(REDIS_NOTICE,
783
"
Parent diff successfully flushed to the rewritten AOF (%lu bytes)
"
, nwritten);
784
785
/*
The only remaining thing to do is to rename the temporary file to
786
* the configured file and switch the file descriptor used to do AOF
787
* writes. We don't want close(2) or rename(2) calls to block the
788
* server on old file deletion.
789
*
790
* There are two possible scenarios:
791
*
792
* 1) AOF is DISABLED and this was a one time rewrite. The temporary
793
* file will be renamed to the configured file. When this file already
794
* exists, it will be unlinked, which may block the server.
795
*
796
* 2) AOF is ENABLED and the rewritten AOF will immediately start
797
* receiving writes. After the temporary file is renamed to the
798
* configured file, the original AOF file descriptor will be closed.
799
* Since this will be the last reference to that file, closing it
800
* causes the underlying file to be unlinked, which may block the
801
* server.
802
*
803
* To mitigate the blocking effect of the unlink operation (either
804
* caused by rename(2) in scenario 1, or by close(2) in scenario 2), we
805
* use a background thread to take care of this. First, we
806
* make scenario 1 identical to scenario 2 by opening the target file
807
* when it exists. The unlink operation after the rename(2) will then
808
* be executed upon calling close(2) for its descriptor. Everything to
809
* guarantee atomicity for this switch has already happened by then, so
810
* we don't care what the outcome or duration of that close operation
811
* is, as long as the file descriptor is released again.
*/
812
if
(server.appendfd == -
1
) {
813
/*
AOF disabled
*/
814
815
/*
Don't care if this fails: oldfd will be -1 and we handle that.
816
* One notable case of -1 return is if the old file does
817
* not exist.
*/
818
oldfd = open(server.appendfilename,O_RDONLY|
O_NONBLOCK);
819
}
else
{
820
/*
AOF enabled
*/
821
oldfd = -
1
;
/*
We'll set this to the current AOF filedes later.
*/
822
}
823
824
/*
Rename the temporary file. This will not unlink the target file if
825
* it exists, because we reference it with "oldfd".
*/
826
if
(rename(tmpfile,server.appendfilename) == -
1
) {
827
redisLog(REDIS_WARNING,
828
"
Error trying to rename the temporary AOF: %s
"
, strerror(errno));
829
close(newfd);
830
if
(oldfd != -
1
) close(oldfd);
831
goto
cleanup;
832
}
833
834
if
(server.appendfd == -
1
) {
835
/*
AOF disabled, we don't need to set the AOF file descriptor
836
* to this new file, so we can close it.
*/
837
close(newfd);
838
}
else
{
839
/*
AOF enabled, replace the old fd with the new one.
*/
840
oldfd =
server.appendfd;
841
server.appendfd =
newfd;
842
if
(server.appendfsync ==
APPENDFSYNC_ALWAYS)
843
aof_fsync(newfd);
844
else
if
(server.appendfsync ==
APPENDFSYNC_EVERYSEC)
845
aof_background_fsync(newfd);
846
server.appendseldb = -
1
;
/*
Make sure SELECT is re-issued
*/
847
aofUpdateCurrentSize();
848
server.auto_aofrewrite_base_size =
server.appendonly_current_size;
849
850
/*
Clear regular AOF buffer since its contents was just written to
851
* the new AOF from the background rewrite buffer.
*/
852
sdsfree(server.aofbuf);
853
server.aofbuf =
sdsempty();
854
}
855
856
redisLog(REDIS_NOTICE,
"
Background AOF rewrite successful
"
);
857
858
/*
Asynchronously close the overwritten AOF.
*/
859
if
(oldfd != -
1
) bioCreateBackgroundJob(REDIS_BIO_CLOSE_FILE,(
void
*)(
long
)oldfd,NULL,NULL);
860
861
redisLog(REDIS_VERBOSE,
862
"
Background AOF rewrite signal handler took %lldus
"
, ustime()-
now);
863
}
else
if
(!bysignal && exitcode !=
0
) {
864
redisLog(REDIS_WARNING,
865
"
Background AOF rewrite terminated with error
"
);
866
}
else
{
867
redisLog(REDIS_WARNING,
868
"
Background AOF rewrite terminated by signal %d
"
,
869
WTERMSIG(statloc));
870
}
871
872
cleanup:
873
sdsfree(server.bgrewritebuf);
874
server.bgrewritebuf =
sdsempty();
875
aofRemoveTempFile(server.bgrewritechildpid);
876
server.bgrewritechildpid = -
1
;
877
}
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061
微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元

