作者在bio.c的頭注釋中對設計進行了詳細的介紹
/*
Background I/O service for Redis.
這個文件是redis后臺IO服務的實現
*
* This file implements operations that we need to perform in the background.
* Currently there is only a single operation, that is a background close(2)
* system call. This is needed as when the process is the last owner of a
* reference to a file closing it means unlinking it, and the deletion of the
* file is slow, blocking the server.
這個文件負責我們需要在后臺執行的操作。現在redis的版本中只有一類的操作,后臺的close 系統調用。
為了避免一個文件最后的owner在執行close操作帶來的unlink使得阻塞server,將這類操作用單獨的后臺線程來執行
*
* In the future we'll either continue implementing new things we need or
* we'll switch to libeio. However there are probably long term uses for this
* file as we may want to put here Redis specific background tasks (for instance
* it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL
* implementation).
*
* DESIGN
* ------
*
* The design is trivial, we have a structure representing a job to perform
* and a different thread and job queue for every job type.
* Every thread wait for new jobs in its queue, and process every job
* sequentially.
每種作業類型一個queue。每個線程在它的queue里等待新的job到來。并且按照FIFO的順序處理作業。
*
* Jobs of the same type are guaranteed to be processed from the least
* recently inserted to the most recently inserted (older jobs processed
* first).
*
* Currently there is no way for the creator of the job to be notified about
* the completion of the operation, this will only be added when/if needed.
作業完成后,其creator無法得到通知。
*/
現在的兩類作業類型:1.close 2.aof_fsync
/* Background job opcodes. Each opcode identifies one job type and is used
 * as the index into the per-type arrays of this file. */
#define REDIS_BIO_CLOSE_FILE    0 /* Deferred close(2) syscall. */
#define REDIS_BIO_AOF_FSYNC     1 /* Deferred AOF fsync. */
#include "redis.h"
#include "bio.h"
//使用互斥量+條件變量,作為線程的保護(hù)條件
4
static
pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS];
5
static
pthread_cond_t bio_condvar[REDIS_BIO_NUM_OPS];
//兩類(lèi)作業(yè)的隊(duì)列
6
static
list *
bio_jobs[REDIS_BIO_NUM_OPS];
7
/*
The following array is used to hold the number of pending jobs for every
8
* OP type. This allows us to export the bioPendingJobsOfType() API that is
9
* useful when the main thread wants to perform some operation that may involve
10
* objects shared with the background thread. The main thread will just wait
11
* that there are no longer jobs of this type to be executed before performing
12
* the sensible operation. This data is also useful for reporting.
*/
13
static
unsigned
long
long
bio_pending[REDIS_BIO_NUM_OPS];
/* This structure represents a background Job. It is only used locally to this
 * file as the API does not expose the internals at all. */
struct bio_job {
    time_t time; /* Time at which the job was created. */
    /* Job specific arguments pointers. If we need to pass more than three
     * arguments we can just pass a pointer to a structure or alike. */
    void *arg1, *arg2, *arg3;
};
/* Worker thread entry point, declared here because bioInit() references it
 * before its definition. */
void *bioProcessBackgroundJobs(void *arg);
/* Make sure we have enough stack to perform all the things we do in the
 * main thread. The value is the minimum stack size, in bytes, requested
 * for the background threads (4 MB). */
#define REDIS_THREAD_STACK_SIZE (1024*1024*4)
29
30
/*
Initialize the background system, spawning the thread.
*/
31
void
bioInit(
void
) {
32
pthread_attr_t attr;
33
pthread_t thread;
34
size_t stacksize;
35
int
j;
36
37
/*
Initialization of state vars and objects
*/
38
for
(j =
0
; j < REDIS_BIO_NUM_OPS; j++
) {
39
pthread_mutex_init(&
bio_mutex[j],NULL);
40
pthread_cond_init(&
bio_condvar[j],NULL);
41
bio_jobs[j] =
listCreate();
42
bio_pending[j] =
0
;
43
}
44
45
/*
Set the stack size as by default it may be small in some system
*/
46
pthread_attr_init(&
attr);
47
pthread_attr_getstacksize(&attr,&
stacksize);
48
if
(!stacksize) stacksize =
1
;
/*
The world is full of Solaris Fixes
*/
49
while
(stacksize < REDIS_THREAD_STACK_SIZE) stacksize *=
2
;
50
pthread_attr_setstacksize(&
attr, stacksize);
51
52
/*
Ready to spawn our threads. We use the single argument the thread
53
* function accepts in order to pass the job ID the thread is
54
* responsible of.
*/
55
for
(j =
0
; j < REDIS_BIO_NUM_OPS; j++
) {
56
void
*arg = (
void
*)(unsigned
long
) j;
57
if
(pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) !=
0
) {
58
redisLog(REDIS_WARNING,
"
Fatal: Can't initialize Background Jobs.
"
);
59
exit(
1
);
60
}
61
}
62
}
63
64
void
bioCreateBackgroundJob(
int
type,
void
*arg1,
void
*arg2,
void
*
arg3) {
65
struct
bio_job *job = zmalloc(
sizeof
(*
job));
66
67
job->time =
time(NULL);
68
job->arg1 =
arg1;
69
job->arg2 =
arg2;
70
job->arg3 =
arg3;
71
pthread_mutex_lock(&
bio_mutex[type]);
72
listAddNodeTail(bio_jobs[type],job);
73
bio_pending[type]++
;
74
pthread_cond_signal(&
bio_condvar[type]);
75
pthread_mutex_unlock(&
bio_mutex[type]);
76
}
77
78
void
*bioProcessBackgroundJobs(
void
*
arg) {
79
struct
bio_job *
job;
80
unsigned
long
type = (unsigned
long
) arg;
81
82
pthread_detach(pthread_self());
83
pthread_mutex_lock(&
bio_mutex[type]);
84
while
(
1
) {
85
listNode *
ln;
86
87
/*
The loop always starts with the lock hold.
*/
88
if
(listLength(bio_jobs[type]) ==
0
) {
89
pthread_cond_wait(&bio_condvar[type],&
bio_mutex[type]);
90
continue
;
91
}
92
/*
Pop the job from the queue.
*/
93
ln =
listFirst(bio_jobs[type]);
94
job = ln->
value;
95
/*
It is now possible to unlock the background system as we know have
96
* a stand alone job structure to process.
*/
97
pthread_mutex_unlock(&
bio_mutex[type]);
98
99
/*
Process the job accordingly to its type.
*/
100
if
(type ==
REDIS_BIO_CLOSE_FILE) {
101
close((
long
)job->
arg1);
102
}
else
if
(type ==
REDIS_BIO_AOF_FSYNC) {
103
aof_fsync((
long
)job->
arg1);
104
}
else
{
105
redisPanic(
"
Wrong job type in bioProcessBackgroundJobs().
"
);
106
}
107
zfree(job);
108
109
/*
Lock again before reiterating the loop, if there are no longer
110
* jobs to process we'll block again in pthread_cond_wait().
*/
111
pthread_mutex_lock(&
bio_mutex[type]);
112
listDelNode(bio_jobs[type],ln);
113
bio_pending[type]--
;
114
}
115
}
116
117
/*
Return the number of pending jobs of the specified type.
*/
118
unsigned
long
long
bioPendingJobsOfType(
int
type) {
119
unsigned
long
long
val;
120
pthread_mutex_lock(&
bio_mutex[type]);
121
val =
bio_pending[type];
122
pthread_mutex_unlock(&
bio_mutex[type]);
123
return
val;
124
}
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061
微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元

