我的新浪微博: http://weibo.com/freshairbrucewoo 。
歡迎大家相互交流,共同提高技術(shù)。
?
第三節(jié)、 rpc 通信過程分析
前面兩個小節(jié)分別對rpc服務(wù)端和客戶端的建立流程做了詳細(xì)的分析,也就是說rpc客戶端和服務(wù)器端已經(jīng)能夠進(jìn)行正常的通信了(rpc客戶端已經(jīng)通過connect鏈接上rpc服務(wù)器了),那么這一小節(jié)主要根據(jù)一個實際的例子來分析一個完整的rpc通信過程。
下面以客戶端創(chuàng)建邏輯卷(volume)為例來分析rpc的通信過程,就以下面這個客戶端的命令開始:
gluster?volume?create?test-volume?server3:/exp3?server4:/exp4
先簡單看看glusterfs的客戶端是怎樣開始提交rpc請求的,提交準(zhǔn)備過程流程圖如下:
?
從上面的流程圖可以看出真正開始提交 rpc 請求調(diào)用還是從具體命令的回調(diào)函數(shù)開始發(fā)起的,上面的流程圖主要展示的是準(zhǔn)備工作,下面從具體命令的回調(diào)函數(shù)開始分析,這里分析的實例是創(chuàng)建邏輯卷的命令,執(zhí)行的函數(shù)是 cli_cmd_volume_create_cbk ,主要實現(xiàn)代碼如下:
?
1
proc = &cli_rpc_prog->proctable[GLUSTER_CLI_CREATE_VOLUME];
//
從rpc程序表中選擇對應(yīng)函數(shù)
2
3
frame = create_frame (THIS, THIS->ctx->pool);
//
創(chuàng)建幀
4
5
ret = cli_cmd_volume_create_parse (words, wordcount, &options);
//
創(chuàng)建邏輯卷的命令解析
6
7
if
(proc->
fn) {
8
9
ret = proc->fn (frame, THIS, options);
//
執(zhí)行命令的回調(diào)函數(shù)
10
11
}
12
13
if
(ret) {
14
15
cli_cmd_sent_status_get (&sent);
//
得到命令發(fā)送狀態(tài)
16
17
if
((sent ==
0
) && (parse_error ==
0
))
18
19
cli_out (
"
Volume create failed
"
);
//
如果失敗,錯誤提示
20
21
}
?
首先選擇對應(yīng)命令的 rpc 客戶端創(chuàng)建邏輯卷的命令函數(shù),然后解析命令以后執(zhí)行相應(yīng)的創(chuàng)建邏輯卷的 rpc 函數(shù),下面是對應(yīng)的函數(shù)存放表項:
1
[GLUSTER_CLI_CREATE_VOLUME] = {
"
CREATE_VOLUME
"
, gf_cli3_1_create_volume}
?
所以真正的提交函數(shù)是 gf_cli3_1_create_volume 函數(shù),繼續(xù)分析這個函數(shù),主要實現(xiàn)代碼如下:
1
ret = cli_cmd_submit (&
req, frame, cli_rpc_prog, GLUSTER_CLI_CREATE_VOLUME, NULL,
2
3
gf_xdr_from_cli_create_vol_req,
this
, gf_cli3_1_create_volume_cbk);
?
主要代碼也只有一行,其余代碼就是為了這一行的函數(shù)調(diào)用做相應(yīng)參數(shù)準(zhǔn)備的,這一行的這個函數(shù)就是所有客戶端命令提交 rpc 請求服務(wù)的實現(xiàn)函數(shù),只是提交的數(shù)據(jù)不同而已!下面重點分析這個函數(shù),還是先看看主要代碼:
1
cli_cmd_lock ();
//
命令對象加鎖
2
3
cmd_sent =
0
;
//
初始化命令發(fā)送狀態(tài)標(biāo)示為0
4
5
ret = cli_submit_request (req, frame, prog, procnum, NULL, sfunc,
this
, cbkfn);
//
提交請求
6
7
if
(!
ret) {
8
9
cmd_sent =
1
;
//
標(biāo)示已經(jīng)發(fā)送
10
11
ret = cli_cmd_await_response ();
//
等待響應(yīng)
12
13
}
else
14
15
cli_cmd_unlock ();
//
不成功解鎖
?
在發(fā)送具體的 rpc 請求以前先鎖住命令對象,然后調(diào)用函數(shù) cli_submit_request? 把 rpc 請求發(fā)送出去(應(yīng)該是異步的),然后設(shè)置命令以發(fā)送標(biāo)志,并調(diào)用函數(shù) cli_cmd_await_response 等待響應(yīng)。繼續(xù)看提交 rpc 請求的函數(shù):
1
iobuf = iobuf_get (
this
->ctx->iobuf_pool);
//
從緩沖池取一個io緩存
2
3
if
(!
iobref) {
4
5
iobref = iobref_new ();
//
新建一個iobuf引用池
6
7
new_iobref =
1
;
//
標(biāo)志
8
9
}
10
11
iobref_add (iobref, iobuf);
//
把io緩存加入io緩存引用池
12
13
iov.iov_base = iobuf->ptr;
//
io向量基地址(供用戶使用的內(nèi)存)
14
15
iov.iov_len =
128
* GF_UNIT_KB;
//
大小
16
17
if
(req &&
sfunc) {
18
19
ret = sfunc (iov, req);
//
序列化為xdr格式數(shù)據(jù)(表示層數(shù)據(jù)格式)
20
21
iov.iov_len = ret;
//
序列化以后的長度
22
23
count =
1
;
//
計數(shù)初始化為1
24
25
}
26
27
ret = rpc_clnt_submit (global_rpc, prog, procnum, cbkfn, &iov, count,
//
提交客戶端rpc請求
28
29
NULL,
0
, iobref, frame, NULL,
0
, NULL,
0
, NULL);
?
Xdr 數(shù)據(jù)格式的轉(zhuǎn)換是調(diào)用函數(shù)庫實現(xiàn)的,不具體分析,需要明白的是經(jīng)過 sfunc? 函數(shù)調(diào)用以后就是 xdr 格式的數(shù)據(jù)了,最后根據(jù)轉(zhuǎn)化后的數(shù)據(jù)調(diào)用 rpc_clnt_submit 提交客戶端的 rpc 請求。繼續(xù)深入函數(shù):
1
rpcreq = mem_get (rpc->reqpool);
//
重rpc對象的請求對象池得到一個請求對象
2
3
if
(!
iobref) {
4
5
iobref = iobref_new ();
//
如果io緩存引用池為null就新建一個
6
7
new_iobref =
1
;
//
新建標(biāo)志
8
9
}
10
11
callid = rpc_clnt_new_callid (rpc);
//
新建一個rpc調(diào)用的id號
12
13
conn = &rpc->conn;
//
從rpc對象中取得鏈接對象
14
15
rpcreq->prog = prog;
//
賦值rpc請求對象的程序
16
17
rpcreq->procnum = procnum;
//
程序號
18
19
rpcreq->conn = conn;
//
鏈接對象
20
21
rpcreq->xid = callid;
//
調(diào)用id號
22
23
rpcreq->cbkfn = cbkfn;
//
回調(diào)函數(shù)
24
25
if
(proghdr) {
//
程序頭不為空
26
27
proglen += iov_length (proghdr, proghdrcount);
//
計算頭部長度加入程序消息總長度
28
29
}
30
31
if
(progpayload) {
32
33
proglen += iov_length (progpayload, progpayloadcount);
//
計算io向量的長度加入總長度
34
35
}
36
37
request_iob = rpc_clnt_record (rpc, frame, prog, procnum, proglen, &rpchdr, callid);
//
建立rpc記錄
38
39
iobref_add (iobref, request_iob);
//
添加rpc記錄的io緩存區(qū)到io緩存引用池
40
41
req.msg.rpchdr = &rpchdr;
//
rpc請求消息頭部
42
43
req.msg.rpchdrcount =
1
;
//
頭部數(shù)量
44
45
req.msg.proghdr = proghdr;
//
程序頭部
46
47
req.msg.proghdrcount = proghdrcount;
//
程序頭部數(shù)量
48
49
req.msg.progpayload = progpayload;
//
xdr格式數(shù)據(jù)
50
51
req.msg.progpayloadcount = progpayloadcount;
//
數(shù)量
52
53
req.msg.iobref = iobref;
//
io緩存引用池
54
55
req.rsp.rsphdr = rsphdr;
//
響應(yīng)頭部
56
57
req.rsp.rsphdr_count = rsphdr_count;
//
數(shù)量
58
59
req.rsp.rsp_payload = rsp_payload;
//
負(fù)載
60
61
req.rsp.rsp_payload_count = rsp_payload_count;
//
數(shù)量
62
63
req.rsp.rsp_iobref = rsp_iobref;
//
響應(yīng)緩存引用池
64
65
req.rpc_req = rpcreq;
//
rpc請求
66
67
pthread_mutex_lock (&conn->
lock
);
//
加鎖
68
69
{
70
71
if
(conn->connected ==
0
) {
//
還沒有建立連接
72
73
rpc_transport_connect (conn->trans, conn->config.remote_port);
//
建立連接
74
75
}
76
77
ret = rpc_transport_submit_request (rpc->conn.trans, &req);
//
提交傳輸層rpc請求
78
79
if
((ret >=
0
) &&
frame) {
80
81
gettimeofday (&conn->last_sent, NULL);
//
設(shè)置最后發(fā)送時間
82
83
__save_frame (rpc, frame, rpcreq);
//
保存幀到隊列
84
85
}
86
87
}
88
89
pthread_mutex_unlock (&conn->
lock
);
//
解鎖
?
經(jīng)過上面的代碼,現(xiàn)在數(shù)據(jù)已經(jīng)到達(dá)傳輸層,所以現(xiàn)在就開始調(diào)用傳輸層的 rpc 請求發(fā)送函數(shù) rpc_transport_submit_request ,代碼如下:
1
ret =
this
->ops->submit_request (
this
, req);
?
這里采用函數(shù)指針的方式進(jìn)行調(diào)用的,具體的傳輸協(xié)議調(diào)用具體的傳輸函數(shù),這些函數(shù)都是在裝載協(xié)議庫的時候已經(jīng)賦值具體函數(shù)的實現(xiàn)了,分析的是 tcp ,所以看看 tcp 的發(fā)送函數(shù):
1
struct
rpc_transport_ops tops =
{
2
3
......
4
5
.submit_request =
socket_submit_request,
6
7
......
8
9
};
?
從上面可以看出 tcp 的發(fā)送函數(shù)是 socket_submit_request ,主要實現(xiàn)代碼如下:
1
pthread_mutex_lock (&priv->
lock
);
//
加鎖
2
3
{
4
5
priv->submit_log =
0
;
//
提交標(biāo)志初始化為0
6
7
entry = __socket_ioq_new (
this
, &req->msg);
//
根據(jù)請求對象的消息新建一個io請求隊列
8
9
if
(list_empty (&priv->ioq)) {
//
判斷提交io請求隊列是否為空
10
11
ret = __socket_ioq_churn_entry (
this
, entry);
//
開始依次提交傳輸層的io請求
12
13
if
(ret ==
0
)
14
15
need_append =
0
;
//
需要添加到entry鏈表
16
17
if
(ret >
0
)
18
19
need_poll_out =
1
;
//
需要注冊可寫事件
20
21
}
22
23
if
(need_append) {
24
25
list_add_tail (&entry->list, &priv->ioq);
//
添加到entry的鏈表
26
27
}
28
29
if
(need_poll_out) {
//
注冊可寫事件
30
31
priv->idx = event_select_on (ctx->event_pool, priv->sock, priv->idx, -
1
,
1
);
32
33
}
34
35
}
36
37
pthread_mutex_unlock (&priv->
lock
);
//
解鎖
?
這段加鎖的代碼就是完成整個 rpc 請求信息的發(fā)送,如果沒有發(fā)送完畢就在注冊一個可寫事件啟動下一次請求,到此客戶端的 rpc 請求已經(jīng)發(fā)送完畢,就開始等待服務(wù)器的響應(yīng)。
下面就看看 rpc 服務(wù)器端怎么響應(yīng)客戶端的請求,并根據(jù)相應(yīng)的請求命令做怎樣的處理。在分析 rpc 服務(wù)啟動的時候知道注冊了監(jiān)聽事件,監(jiān)聽事件的處理函數(shù)是 socket_server_event_handler ,它的主要實現(xiàn)代碼如下:
?
1
pthread_mutex_lock (&priv->
lock
);
2
3
{
4
5
if
(poll_in) {
//
連接到來是可讀事件
6
7
new_sock = accept (priv->sock, SA (&new_sockaddr), &addrlen);
//
接收客戶端連接
8
9
if
(!priv->bio) {
//
設(shè)置非阻塞
10
11
ret =
__socket_nonblock (new_sock);
12
13
}
14
15
if
(priv->nodelay) {
//
設(shè)置無延遲發(fā)送
16
17
ret =
__socket_nodelay (new_sock);
18
19
}
20
21
if
(priv->keepalive) {
//
設(shè)置保持連接
22
23
ret = __socket_keepalive (new_sock, priv->keepaliveintvl, priv->
keepaliveidle);
24
25
}
26
27
//
為連接對象
28
29
new_trans = GF_CALLOC (
1
,
sizeof
(*
new_trans), gf_common_mt_rpc_trans_t);
30
31
new_trans->name = gf_strdup (
this
->name);
//
賦值名稱
32
33
memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr, addrlen);
//
賦值地址信息
34
35
new_trans->peerinfo.sockaddr_len = addrlen;
//
長度
36
37
new_trans->myinfo.sockaddr_len =
sizeof
(new_trans->
myinfo.sockaddr);
38
39
ret = getsockname (new_sock, SA (&new_trans->
myinfo.sockaddr),
40
41
&new_trans->myinfo.sockaddr_len);
//
得到新socket的地址信息
42
43
get_transport_identifiers (new_trans);
44
45
socket_init (new_trans);
//
初始化新的傳輸層對象(新的socket)
46
47
pthread_mutex_lock (&new_priv->
lock
);
48
49
{
50
51
new_priv->connected =
1
;
//
連接已經(jīng)建立
52
53
rpc_transport_ref (new_trans);
//
傳輸對象引用計數(shù)加1
54
55
new_priv->idx = event_register (ctx->event_pool, new_sock,
//
注冊可讀事件
56
57
socket_event_handler, new_trans,
1
,
0
);
58
59
}
60
61
pthread_mutex_unlock (&new_priv->
lock
);
62
63
//
執(zhí)行傳輸對象注冊的通知函數(shù),通知已經(jīng)接受客戶端連接請求
64
65
ret = rpc_transport_notify (
this
, RPC_TRANSPORT_ACCEPT, new_trans);
66
67
}
68
69
}
70
71
pthread_mutex_unlock (&priv->
lock
);
?
上面的代碼主要就是處理客戶端的連接請求,然后在新的 socket 上注冊可讀事件(準(zhǔn)備讀取客戶端發(fā)送來的 rpc 請求信息),并且執(zhí)行通知函數(shù)做相應(yīng)的處理。注冊的可讀事件的處理函數(shù)是 socket_event_handler ,主要是實現(xiàn)代碼如下:
?
1
if
(!priv->connected) {
//
如果連接還沒有完成就繼續(xù)完成連接,因為連接是異步的可能沒有立即完成
2
3
ret = socket_connect_finish (
this
);
4
5
}
6
7
if
(!ret && poll_out) {
//
處理可寫事件
8
9
ret = socket_event_poll_out (
this
);
10
11
}
12
13
if
(!ret && poll_in) {
//
處理可讀事件
14
15
ret = socket_event_poll_in (
this
);
16
17
}
?
客戶端的連接請求對于服務(wù)器來說是可讀事件,所以執(zhí)行的 socket_event_poll_in 函數(shù),當(dāng)服務(wù)器需要發(fā)送響應(yīng)信息到 rpc 客戶端的時候就會執(zhí)行可寫事件處理函數(shù)。繼續(xù)分析接收客戶端請求信息的處理函數(shù) socket_event_poll_in 主要代碼如下:
??
1
ret = socket_proto_state_machine (
this
, &pollin);
//
根據(jù)rpc服務(wù)記錄的狀態(tài)做相應(yīng)處理
2
3
if
(pollin !=
NULL) {
4
5
ret = rpc_transport_notify (
this
, RPC_TRANSPORT_MSG_RECEIVED, pollin);
//
執(zhí)行通知函數(shù)
6
7
rpc_transport_pollin_destroy (pollin);
//
完成處理就銷毀資源
8
9
}
?
上面的代碼主要還是調(diào)用其它函數(shù)繼續(xù)處理 rpc 客戶端的請求信息,然后執(zhí)行通知函數(shù)通知傳輸對象消息已經(jīng)被接收,最后銷毀傳輸層相關(guān)不在需要的資源。處理具體請求信息的實現(xiàn)是在函數(shù) socket_proto_state_machine ,而這個函數(shù)又調(diào)用 __socket_proto_state_machine 來處理,所以看看這個函數(shù)實現(xiàn)功能的主要代碼:
1
while
(priv->incoming.record_state != SP_STATE_COMPLETE) {
//
直到rpc服務(wù)記錄狀態(tài)完成為止
2
3
switch
(priv->incoming.record_state) {
//
根據(jù)現(xiàn)在rpc服務(wù)記錄的狀態(tài)做相應(yīng)處理
4
5
case
SP_STATE_NADA:
//
開始狀態(tài)
6
7
iobuf = iobuf_get (
this
->ctx->iobuf_pool);
//
取得一個io緩存
8
9
priv->incoming.record_state = SP_STATE_READING_FRAGHDR;
//
改變狀態(tài)為讀取頭部
10
11
case
SP_STATE_READING_FRAGHDR:
//
讀取頭部信息
12
13
ret = __socket_readv (
this
, priv->incoming.pending_vector,
1
,
//
讀取信息
14
15
&priv->
incoming.pending_vector,
16
17
&priv->
incoming.pending_count, NULL);
18
19
if
(ret >
0
) {
//
讀取了部分頭部信息
20
21
}
22
23
if
(ret ==
0
) {
//
讀取了所有頭部信息,繼續(xù)下一步的處理
24
25
priv->incoming.record_state = SP_STATE_READ_FRAGHDR;
//
改變?yōu)橄乱徊?
26
27
}
28
29
case
SP_STATE_READ_FRAGHDR:
//
處理已經(jīng)讀取的頭部信息
30
31
priv->incoming.fraghdr = ntoh32 (priv->incoming.fraghdr);
//
轉(zhuǎn)換頭部信息為主機字節(jié)
32
33
priv->incoming.record_state = SP_STATE_READING_FRAG;
//
轉(zhuǎn)化為讀取幀數(shù)據(jù)狀態(tài)
34
35
priv->incoming.total_bytes_read += RPC_FRAGSIZE(priv->incoming.fraghdr);
//
字節(jié)數(shù)
36
37
case
SP_STATE_READING_FRAG:
//
讀取所有的數(shù)據(jù)
38
39
ret = __socket_read_frag (
this
);
//
讀取所有幀數(shù)據(jù)
40
41
priv->incoming.frag.bytes_read =
0
;
42
43
if
(!RPC_LASTFRAG (priv->incoming.fraghdr)) {
//
是否為最后一幀數(shù)據(jù)
44
45
priv->incoming.record_state = SP_STATE_READING_FRAGHDR;
//
不是
46
47
break
;
//
退出循環(huán),從新讀取頭部信息
48
49
}
50
51
if
(pollin !=
NULL) {
52
53
int
count =
0
;
//
計數(shù)
54
55
priv->incoming.iobuf_size = priv->
incoming.total_bytes_read
56
57
- priv->incoming.payload_vector.iov_len;
//
計算io緩存大小
58
59
memset (vector,
0
,
sizeof
(vector));
//
io向量清零
60
61
if
(priv->incoming.iobref == NULL) {
//
io緩存引用池為null就新建一個
62
63
priv->incoming.iobref =
iobref_new ();
64
65
}
66
67
vector[count].iov_base = iobuf_ptr (priv->incoming.iobuf);
//
取io緩存基地址
68
69
vector[count].iov_len = priv->incoming.iobuf_size;
//
io緩存長度
70
71
iobref = priv->incoming.iobref;
//
io緩存引用池
72
73
count++;
//
計數(shù)加1
74
75
if
(priv->incoming.payload_vector.iov_base != NULL) {
//
負(fù)載向量不為null
76
77
vector[count] = priv->incoming.payload_vector;
//
保存負(fù)載io向量
78
79
count++;
//
計數(shù)加1
80
81
}
82
83
//
新建一個傳輸層可取對象
84
85
*pollin = rpc_transport_pollin_alloc (
this
, vector, count, priv->
incoming.iobuf,
86
87
iobref, priv->
incoming.request_info);
88
89
iobuf_unref (priv->incoming.iobuf);
//
io緩存引用計算減1
90
91
priv->incoming.iobuf = NULL;
//
清零
92
93
if
(priv->incoming.msg_type == REPLY)
//
消息類型是回復(fù)
94
95
(*pollin)->is_reply =
1
;
//
設(shè)置回復(fù)標(biāo)志
96
97
priv->incoming.request_info = NULL;
//
請求信息清零
98
99
}
100
101
priv->incoming.record_state = SP_STATE_COMPLETE;
//
設(shè)置為完成狀態(tài)
102
103
break
;
104
105
}
106
107
}
108
109
if
(priv->incoming.record_state == SP_STATE_COMPLETE) {
//
如果rpc請求記錄為完成狀態(tài)
110
111
priv->incoming.record_state = SP_STATE_NADA;
//
重新初始化為開始狀態(tài)
112
113
__socket_reset_priv (priv);
//
復(fù)位私有數(shù)據(jù)對象
114
115
}
?
整個處理過程分為了幾個階段,而且每一個階段只處理相應(yīng)的事情,然后就進(jìn)入下一個階段,因為前幾個階段 case 語言都是不帶 break 的,所以直接進(jìn)入下一個階段,最終達(dá)到完成狀態(tài)就退出循環(huán),一個完成的處理過程其實就只需要一次循環(huán)就解決了。當(dāng)所有 rpc 請求消息都已經(jīng)接收以后就調(diào)用通知函數(shù)(在傳輸對象上注冊的通知函數(shù))通知傳輸對象消息已經(jīng)接收,由 rpc 服務(wù)器的初始化過程我們知道注冊的傳輸對象通知函數(shù)是 rpcsvc_notify? ,這個函數(shù)主要實現(xiàn)代碼如下:
?
1
switch
(
event
) {
2
3
case
RPC_TRANSPORT_ACCEPT:
//
rpc請求已經(jīng)被接收處理
4
5
new_trans =
data;
6
7
ret = rpcsvc_accept (svc, trans, new_trans);
//
處理函數(shù)
8
9
break
;
10
11
case
RPC_TRANSPORT_DISCONNECT:
//
斷開連接消息
12
13
ret = rpcsvc_handle_disconnect (svc, trans);
//
處理函數(shù)
14
15
break
;
16
17
case
RPC_TRANSPORT_MSG_RECEIVED:
//
消息已經(jīng)接收
18
19
msg =
data;
20
21
ret = rpcsvc_handle_rpc_call (svc, trans, msg);
//
rpc調(diào)用處理函數(shù)
22
23
break
;
24
25
case
RPC_TRANSPORT_MSG_SENT:
//
消息已經(jīng)發(fā)生,不需要處理
26
27
break
;
28
29
case
RPC_TRANSPORT_CONNECT:
//
已經(jīng)連接
30
31
break
;
32
33
case
RPC_TRANSPORT_CLEANUP:
//
清零消息
34
35
listener = rpcsvc_get_listener (svc, -
1
, trans->listener);
//
得到對應(yīng)的監(jiān)聽器對象
36
37
rpcsvc_program_notify (listener, RPCSVC_EVENT_TRANSPORT_DESTROY, trans);
//
通知上層
38
39
break
;
40
41
case
RPC_TRANSPORT_MAP_XID_REQUEST:
42
43
break
;
44
45
}
?
傳輸對象注冊的通知函數(shù)會根據(jù)傳遞過來的信息類型做相應(yīng)的處理,這里傳遞過來的消息是消息已經(jīng)接收,它的處理就是開始執(zhí)行 rpc 調(diào)用了,執(zhí)行的函數(shù)是 rpcsvc_handle_rpc_call ,它的主要實現(xiàn)代碼如下:
?
1
req = rpcsvc_request_create (svc, trans, msg);
//
創(chuàng)建一個rpc服務(wù)請求對象
2
3
if
(!rpcsvc_request_accepted (req))
//
判斷rpc請求是否被接受
4
5
;
6
7
actor = rpcsvc_program_actor (req);
//
根據(jù)請求對象取得rpc過程調(diào)用對象
8
9
if
(actor && (req->rpc_err == SUCCESS)) {
//
rpc過程調(diào)用對象不為null并且請求信息是成功的
10
11
THIS = svc->mydata;
//
取得xlator對象
12
13
if
(req->count ==
2
) {
//
請求的數(shù)量等于2
14
15
if
(actor->vector_actor) {
//
向量過程不為null,就執(zhí)行向量處理函數(shù)
16
17
ret = actor->vector_actor (req, &req->msg[
1
],
1
, req->
iobref);
18
19
}
else
{
20
21
rpcsvc_request_seterr (req, PROC_UNAVAIL);
//
出錯,不可用的函數(shù)
22
23
ret = RPCSVC_ACTOR_ERROR;
//
調(diào)用過程出錯
24
25
}
26
27
}
else
if
(actor->
actor) {
28
29
ret = actor->actor (req);
//
調(diào)用rpc請求函數(shù)
30
31
}
32
33
}
34
35
if
(ret == RPCSVC_ACTOR_ERROR) {
//
出錯
36
37
ret = rpcsvc_error_reply (req);
//
回復(fù)客戶端rpc請求處理出錯
38
39
}
?
上面代碼首先根據(jù)接收到的信息建立一個請求對象,然后根據(jù)建立的請求對象判斷是都已經(jīng)成功接納此次 rpc 請求調(diào)用,如果是就繼續(xù)執(zhí)行函數(shù) rpcsvc_program_actor ,這個函數(shù)會根據(jù)程序號、函數(shù)號等信息查找對應(yīng)的 rpc 請求的遠(yuǎn)程過程調(diào)用,如果找到就執(zhí)行相應(yīng)的函數(shù)調(diào)用。我們分析的是客戶端發(fā)送一條創(chuàng)建邏輯卷的命令道服務(wù)器端,根據(jù)服務(wù)器端在啟動初始化的過程中注冊的程序集中我們能夠找到如下一條對應(yīng)的函數(shù)信息:
1
[GLUSTER_CLI_CREATE_VOLUME] = {
"
CLI_CREATE_VOLUME
"
, GLUSTER_CLI_CREATE_VOLUME, glusterd_handle_create_volume, NULL,NULL},
?
所以服務(wù)器端就會調(diào)用函數(shù) glusterd_handle_create_volume ,如果在處理 rpc 請求的過程中遇到錯誤就會向客戶端發(fā)送一個錯誤信息的相應(yīng)消息。當(dāng)然如果調(diào)用成功的話也同樣會返回給客戶端一個相應(yīng)的結(jié)果信息。客戶端會接收服務(wù)器端的回復(fù),然后根據(jù)消息內(nèi)容做相應(yīng)的處理,如:創(chuàng)建成功等提示信息。這樣一次完整的 rpc 通信就完成了。
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061
微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元

