最近要對一個用libevent寫的C/C++項目進行修改,要改成多線程的,故做了一些學習和研究。
libevent是一個用C語言寫的開源的一個庫。它對socket編程里的epoll/select等功能進行了封裝,并且使用了一些設計模式(比如反應堆模式),用事件機制來簡化了socket編程。libevent的好處網上有很多,但是初學者往往都看不懂。我打個比方吧, 1) 假設有N個客戶端同時往服務端通過socket寫數據,用了libevent之后,你的server程序里就不用再使用epoll或是select來判斷都哪些socket的緩沖區里已經收到了客戶端寫來的數據。當某個socket的緩沖區里有可讀數據時,libevent會自動觸發一個“讀事件”,通過這個“讀事件”來調用相應的代碼來讀取socket緩沖區里的數據即可。換句話說,libevent自己調用select()或是epoll的函數來判斷哪個緩沖區可讀了,只要可讀了,就自動調用相應的處理程序。 2) 對于“寫事件”,libevent會監控某個socket的緩沖區是否可寫(一般情況下,只要緩沖區沒滿就可寫),只要可寫,就會觸發“寫事件”,通過“寫事件”來調用相應的函數,將數據寫到socket里。
以上兩個例子分別從“讀”和“寫”兩方面簡介了一下,可能不十分準確(但十分準確的描述往往會讓人看不懂)。
以下兩個鏈接關于libevent的剖析比較詳細,想學習libevent最好看一下。
1) sparkliang的專欄 ? ? ? ? 2) 魚思故淵的專欄
=========關于libevent使用多線程的討論=========================
網上很多資料說libevent不支持多線程,也有很多人說libevent可以支持多線程。究竟值不支持呢?我的答案是: 得看你的多線程是怎么寫的,如何跟libevent結合的。
1)可以肯定的是,libevent的 信號事件 是不支持多線程的(因為源碼里用了個全局變量)。可以看這篇文章(http://blog.csdn.net/sparkliang/article/details/5306809)。(注:libevent里有“超時事件”,“IO事件”,“信號事件”。)
2)對于不同的線程,使用不同的base,是可以的。
3)如果不同的線程使用相同的base呢?——如果在不同的線程里的事件都注冊到同一個base上,會有問題嗎?
(http://www.cnblogs.com/zzyoucan/p/3970578.html)這篇博客里提到說,不行!即使加鎖也不行。我最近稍微看了部分源碼,我的答案是:不加鎖會有并發問題,但如果對每個event_add(),event_del()等這些操作event的動作都用同一個臨界變量來加鎖,應該是沒問題的。——貌似也有點問題,如果某個事件沒有用event_set()設置為EV_PERSIST,當事件發生時,會被自動刪除。有可能線程a在刪除事件的時候,線程b卻在添加事件,這樣還是會出現并發問題。 最后的結論是——不行! 。
========本次實驗代碼邏輯的說明==========================
我采取的方案是對于不同的線程,使用不同的base。——即每個線程對應一個base,將線程里的事件注冊到線程的base上,而不是所有線程里的事件都用同一個base。
一 實驗需求描述:
1)寫一個client和server程序。多個client可以同時連接一個server;
2)client接收用戶在標準輸入的字符,發往server端;
3)server端收到后,再把收到的數據處理一下,返回給client;
4)client收到server返回的數據后,將其打印在終端上。
二 設計方案:
1. client:
1) client采用兩個線程,主線程接收用戶在終端上的輸入,并通過socket將用戶的輸入發往server。
2) 派生一個子線程,接收server返回來的數據,如果收到數據,就打印出來。
2. server:
在主線程里監聽client有沒有連接連過來,如果有,立馬accept出一個socket,并創建一個子線程,在子線程里接收client傳過來的數據,并對數據進行一些修改,然后將修改后的數據寫回到client端。
三 代碼實現
1. client代碼如下:
1
#include <iostream>
2
#include <sys/
select
.h>
3
#include <sys/socket.h>
4
#include <unistd.h>
5
#include <pthread.h>
6
#include <stdio.h>
7
#include <stdlib.h>
8
#include <sys/types.h>
9
#include <netinet/
in
.h>
10
#include <arpa/inet.h>
11
#include <
string
>
12
#include <
string
.h>
13
#include <
event
.h>
14
using
namespace
std;
15
16
#define
BUF_SIZE 1024
17
18
/*
*
19
* 連接到server端,如果成功,返回fd,如果失敗返回-1
20
*/
21
int
connectServer(
char
* ip,
int
port){
22
int
fd = socket( AF_INET, SOCK_STREAM,
0
);
23
cout<<
"
fd=
"
<<fd<<
endl;
24
if
(-
1
==
fd){
25
cout<<
"
Error, connectServer() quit
"
<<
endl;
26
return
-
1
;
27
}
28
struct
sockaddr_in remote_addr;
//
服務器端網絡地址結構體
29
memset(&remote_addr,
0
,
sizeof
(remote_addr));
//
數據初始化--清零
30
remote_addr.sin_family=AF_INET;
//
設置為IP通信
31
remote_addr.sin_addr.s_addr=inet_addr(ip);
//
服務器IP地址
32
remote_addr.sin_port=htons(port);
//
服務器端口號
33
int
con_result = connect(fd, (
struct
sockaddr*) &remote_addr,
sizeof
(
struct
sockaddr));
34
if
(con_result <
0
){
35
cout<<
"
Connect Error!
"
<<
endl;
36
close(fd);
37
return
-
1
;
38
}
39
cout<<
"
con_result=
"
<<con_result<<
endl;
40
return
fd;
41
}
42
43
void
on_read(
int
sock,
short
event
,
void
*
arg)
44
{
45
char
* buffer =
new
char
[BUF_SIZE];
46
memset(buffer,
0
,
sizeof
(
char
)*
BUF_SIZE);
47
//
--本來應該用while一直循環,但由于用了libevent,只在可以讀的時候才觸發on_read(),故不必用while了
48
int
size =
read(sock, buffer, BUF_SIZE);
49
if
(
0
== size){
//
說明socket關閉
50
cout<<
"
read size is 0 for socket:
"
<<sock<<
endl;
51
struct
event
* read_ev = (
struct
event
*
)arg;
52
if
(NULL !=
read_ev){
53
event_del(read_ev);
54
free(read_ev);
55
}
56
close(sock);
57
return
;
58
}
59
cout<<
"
Received from server---
"
<<buffer<<
endl;
60
delete[]buffer;
61
}
62
63
void
* init_read_event(
void
*
arg){
64
long
long_sock = (
long
)arg;
65
int
sock = (
int
)long_sock;
66
//
-----初始化libevent,設置回調函數on_read()------------
67
struct
event_base*
base
=
event_base_new();
68
struct
event
* read_ev = (
struct
event
*)malloc(
sizeof
(
struct
event
));
//
發生讀事件后,從socket中取出數據
69
event_set(read_ev, sock, EV_READ|
EV_PERSIST, on_read, read_ev);
70
event_base_set(
base
, read_ev);
71
event_add(read_ev, NULL);
72
event_base_dispatch(
base
);
73
//
--------------
74
event_base_free(
base
);
75
}
76
/*
*
77
* 創建一個新線程,在新線程里初始化libevent讀事件的相關設置,并開啟event_base_dispatch
78
*/
79
void
init_read_event_thread(
int
sock){
80
pthread_t thread;
81
pthread_create(&thread,NULL,init_read_event,(
void
*
)sock);
82
pthread_detach(thread);
83
}
84
int
main() {
85
cout <<
"
main started
"
<< endl;
//
prints Hello World!!!
86
cout <<
"
Please input server IP:
"
<<
endl;
87
char
ip[
16
];
88
cin >>
ip;
89
cout <<
"
Please input port:
"
<<
endl;
90
int
port;
91
cin >>
port;
92
cout <<
"
ServerIP is
"
<<ip<<
"
,port=
"
<<port<<
endl;
93
int
socket_fd =
connectServer(ip, port);
94
cout <<
"
socket_fd=
"
<<socket_fd<<
endl;
95
init_read_event_thread(socket_fd);
96
//
--------------------------
97
char
buffer[BUF_SIZE];
98
bool
isBreak =
false
;
99
while
(!
isBreak){
100
cout <<
"
Input your data to server(\'q\' or \"quit\" to exit)
"
<<
endl;
101
cin >>
buffer;
102
if
(strcmp(
"
q
"
, buffer)==
0
|| strcmp(
"
quit
"
, buffer)==
0
){
103
isBreak=
true
;
104
close(socket_fd);
105
break
;
106
}
107
cout <<
"
Your input is
"
<<buffer<<
endl;
108
int
write_num =
write(socket_fd, buffer, strlen(buffer));
109
cout << write_num <<
"
characters written
"
<<
endl;
110
sleep(
2
);
111
}
112
cout<<
"
main finished
"
<<
endl;
113
return
0
;
114
}
1)在main()里先調用init_read_event_thread()來生成一個子線程,子線程里調用init_read_event()來將socket的讀事件注冊到libevent的base上,并調用libevent的event_base_dispatch()不斷地進行輪詢。一旦socket可讀,libevent就調用“讀事件”上綁定的on_read()函數來讀取數據。
2)在main()的主線程里,通過一個while循環來接收用戶從終端的輸入,并通過socket將用戶的輸入寫到server端。
-------------------------------------------------------------
2. server端代碼如下:
1
#include <iostream>
2
#include <sys/
select
.h>
3
#include <sys/socket.h>
4
#include <stdio.h>
5
#include <unistd.h>
6
#include <pthread.h>
7
#include <stdio.h>
8
#include <sys/types.h>
9
#include <netinet/
in
.h>
10
#include <arpa/inet.h>
11
#include <
string
>
12
#include <
string
.h>
13
#include <
event
.h>
14
#include <stdlib.h>
15
using
namespace
std;
16
17
#define
SERVER_IP "127.0.0.1"
18
#define
SERVER_PORT 9090
19
#define
BUF_SIZE 1024
20
21
struct
sock_ev_write{
//
用戶寫事件完成后的銷毀,在on_write()中執行
22
struct
event
*
write_ev;
23
char
*
buffer;
24
};
25
struct
sock_ev {
//
用于讀事件終止(socket斷開)后的銷毀
26
struct
event_base*
base
;
//
因為socket斷掉后,讀事件的loop要終止,所以要有base指針
27
struct
event
*
read_ev;
28
};
29
30
/*
*
31
* 銷毀寫事件用到的結構體
32
*/
33
void
destroy_sock_ev_write(
struct
sock_ev_write*
sock_ev_write_struct){
34
if
(NULL !=
sock_ev_write_struct){
35
//
event_del(sock_ev_write_struct->write_ev);
//
因為寫事件沒用EV_PERSIST,故不用event_del
36
if
(NULL != sock_ev_write_struct->
write_ev){
37
free(sock_ev_write_struct->
write_ev);
38
}
39
if
(NULL != sock_ev_write_struct->
buffer){
40
delete[]sock_ev_write_struct->
buffer;
41
}
42
free(sock_ev_write_struct);
43
}
44
}
45
46
47
/*
*
48
* 讀事件結束后,用于銷毀相應的資源
49
*/
50
void
destroy_sock_ev(
struct
sock_ev*
sock_ev_struct){
51
if
(NULL ==
sock_ev_struct){
52
return
;
53
}
54
event_del(sock_ev_struct->
read_ev);
55
event_base_loopexit(sock_ev_struct->
base
, NULL);
//
停止loop循環
56
if
(NULL != sock_ev_struct->
read_ev){
57
free(sock_ev_struct->
read_ev);
58
}
59
event_base_free(sock_ev_struct->
base
);
60
//
destroy_sock_ev_write(sock_ev_struct->sock_ev_write_struct);
61
free(sock_ev_struct);
62
}
63
int
getSocket(){
64
int
fd =socket( AF_INET, SOCK_STREAM,
0
);
65
if
(-
1
==
fd){
66
cout<<
"
Error, fd is -1
"
<<
endl;
67
}
68
return
fd;
69
}
70
71
void
on_write(
int
sock,
short
event
,
void
*
arg)
72
{
73
cout<<
"
on_write() called, sock=
"
<<sock<<
endl;
74
if
(NULL ==
arg){
75
cout<<
"
Error! void* arg is NULL in on_write()
"
<<
endl;
76
return
;
77
}
78
struct
sock_ev_write* sock_ev_write_struct = (
struct
sock_ev_write*
)arg;
79
80
char
buffer[BUF_SIZE];
81
sprintf(buffer,
"
fd=%d, received[%s]
"
, sock, sock_ev_write_struct->
buffer);
82
//
int write_num0 = write(sock, sock_ev_write_struct->buffer, strlen(sock_ev_write_struct->buffer));
83
//
int write_num = write(sock, sock_ev_write_struct->buffer, strlen(sock_ev_write_struct->buffer));
84
int
write_num =
write(sock, buffer, strlen(buffer));
85
destroy_sock_ev_write(sock_ev_write_struct);
86
cout<<
"
on_write() finished, sock=
"
<<sock<<
endl;
87
}
88
89
void
on_read(
int
sock,
short
event
,
void
*
arg)
90
{
91
cout<<
"
on_read() called, sock=
"
<<sock<<
endl;
92
if
(NULL ==
arg){
93
return
;
94
}
95
struct
sock_ev* event_struct = (
struct
sock_ev*) arg;
//
獲取傳進來的參數
96
char
* buffer =
new
char
[BUF_SIZE];
97
memset(buffer,
0
,
sizeof
(
char
)*
BUF_SIZE);
98
//
--本來應該用while一直循環,但由于用了libevent,只在可以讀的時候才觸發on_read(),故不必用while了
99
int
size =
read(sock, buffer, BUF_SIZE);
100
if
(
0
== size){
//
說明socket關閉
101
cout<<
"
read size is 0 for socket:
"
<<sock<<
endl;
102
destroy_sock_ev(event_struct);
103
close(sock);
104
return
;
105
}
106
struct
sock_ev_write* sock_ev_write_struct = (
struct
sock_ev_write*)malloc(
sizeof
(
struct
sock_ev_write));
107
sock_ev_write_struct->buffer =
buffer;
108
struct
event
* write_ev = (
struct
event
*)malloc(
sizeof
(
struct
event
));
//
發生寫事件(也就是只要socket緩沖區可寫)時,就將反饋數據通過socket寫回客戶端
109
sock_ev_write_struct->write_ev =
write_ev;
110
event_set(write_ev, sock, EV_WRITE, on_write, sock_ev_write_struct);
111
event_base_set(event_struct->
base
, write_ev);
112
event_add(write_ev, NULL);
113
cout<<
"
on_read() finished, sock=
"
<<sock<<
endl;
114
}
115
116
117
/*
*
118
* main執行accept()得到新socket_fd的時候,執行這個方法
119
* 創建一個新線程,在新線程里反饋給client收到的信息
120
*/
121
void
* process_in_new_thread_when_accepted(
void
*
arg){
122
long
long_fd = (
long
)arg;
123
int
fd = (
int
)long_fd;
124
if
(fd<
0
){
125
cout<<
"
process_in_new_thread_when_accepted() quit!
"
<<
endl;
126
return
0
;
127
}
128
//
-------初始化base,寫事件和讀事件--------
129
struct
event_base*
base
=
event_base_new();
130
struct
event
* read_ev = (
struct
event
*)malloc(
sizeof
(
struct
event
));
//
發生讀事件后,從socket中取出數據
131
132
//
-------將base,read_ev,write_ev封裝到一個event_struct對象里,便于銷毀---------
133
struct
sock_ev* event_struct = (
struct
sock_ev*)malloc(
sizeof
(
struct
sock_ev));
134
event_struct->
base
=
base
;
135
event_struct->read_ev =
read_ev;
136
//
-----對讀事件進行相應的設置------------
137
event_set(read_ev, fd, EV_READ|
EV_PERSIST, on_read, event_struct);
138
event_base_set(
base
, read_ev);
139
event_add(read_ev, NULL);
140
//
--------開始libevent的loop循環-----------
141
event_base_dispatch(
base
);
142
cout<<
"
event_base_dispatch() stopped for sock(
"
<<fd<<
"
)
"
<<
"
in process_in_new_thread_when_accepted()
"
<<
endl;
143
return
0
;
144
}
145
146
/*
*
147
* 每當accept出一個新的socket_fd時,調用這個方法。
148
* 創建一個新線程,在新線程里與client做交互
149
*/
150
void
accept_new_thread(
int
sock){
151
pthread_t thread;
152
pthread_create(&thread,NULL,process_in_new_thread_when_accepted,(
void
*
)sock);
153
pthread_detach(thread);
154
}
155
156
/*
*
157
* 每當有新連接連到server時,就通過libevent調用此函數。
158
* 每個連接對應一個新線程
159
*/
160
void
on_accept(
int
sock,
short
event
,
void
*
arg)
161
{
162
struct
sockaddr_in remote_addr;
163
int
sin_size=
sizeof
(
struct
sockaddr_in);
164
int
new_fd = accept(sock, (
struct
sockaddr*) &remote_addr, (socklen_t*)&
sin_size);
165
if
(new_fd <
0
){
166
cout<<
"
Accept error in on_accept()
"
<<
endl;
167
return
;
168
}
169
cout<<
"
new_fd accepted is
"
<<new_fd<<
endl;
170
accept_new_thread(new_fd);
171
cout<<
"
on_accept() finished for fd=
"
<<new_fd<<
endl;
172
}
173
174
int
main(){
175
int
fd =
getSocket();
176
if
(fd<
0
){
177
cout<<
"
Error in main(), fd<0
"
<<
endl;
178
}
179
cout<<
"
main() fd=
"
<<fd<<
endl;
180
//
----為服務器主線程綁定ip和port------------------------------
181
struct
sockaddr_in local_addr;
//
服務器端網絡地址結構體
182
memset(&local_addr,
0
,
sizeof
(local_addr));
//
數據初始化--清零
183
local_addr.sin_family=AF_INET;
//
設置為IP通信
184
local_addr.sin_addr.s_addr=inet_addr(SERVER_IP);
//
服務器IP地址
185
local_addr.sin_port=htons(SERVER_PORT);
//
服務器端口號
186
int
bind_result = bind(fd, (
struct
sockaddr*) &local_addr,
sizeof
(
struct
sockaddr));
187
if
(bind_result <
0
){
188
cout<<
"
Bind Error in main()
"
<<
endl;
189
return
-
1
;
190
}
191
cout<<
"
bind_result=
"
<<bind_result<<
endl;
192
listen(fd,
10
);
193
//
-----設置libevent事件,每當socket出現可讀事件,就調用on_accept()------------
194
struct
event_base*
base
=
event_base_new();
195
struct
event
listen_ev;
196
event_set(&listen_ev, fd, EV_READ|
EV_PERSIST, on_accept, NULL);
197
event_base_set(
base
, &
listen_ev);
198
event_add(&
listen_ev, NULL);
199
event_base_dispatch(
base
);
200
//
------以下語句理論上是不會走到的---------------------------
201
cout<<
"
event_base_dispatch() in main() finished
"
<<
endl;
202
//
----銷毀資源-------------
203
event_del(&
listen_ev);
204
event_base_free(
base
);
205
cout<<
"
main() finished
"
<<
endl;
206
}
1)在main()里(運行在主線程中),先設置服務端的socket,然后為主線程生成一個libevent的base,并將一個“讀事件”注冊到base上。“讀事件”綁定了一個on_accept(),每當client有新連接連過來時,就會觸發這個“讀事件”,進而調用on_accept()方法。
2)在on_accept()里(運行在主線程中),每當有新連接連過來時,就會accept出一個新的new_fd,并調用accept_new_thread()來創建一個新的子線程。子線程里會調用process_in_new_thread_when_accepted()方法。
3)process_in_new_thread_when_accepted()方法里(運行在子線程中),創建一個子線程的base,并創建一個“讀事件”,注冊到“子線程的base”上。并調用event_base_dispatch(base)進入libevent的loop中。當發現new_fd的socket緩沖區中有數據可讀時,就觸發了這個“讀事件”,繼而調用on_read()方法。
4)on_read()方法里(運行在子線程中),從socket緩沖區里讀取數據。讀完數據之后,將一個“寫事件”注冊到“子線程的base”上。一旦socket可寫,就調用on_write()函數。
5)on_write()方法(運行在子線程中),對數據進行修改,然后通過socket寫回到client端。
注:其實可以不用注冊“寫事件”——在on_read()方法中直接修改數據,然后寫回到client端也是可以的——但這有個問題。就是如果socket的寫緩沖區是滿的,那么這時候?write(sock, buffer, strlen(buffer))會阻塞的。這會導致整個on_read()方法阻塞掉,而無法讀到接下來client傳過來的數據了。而用了libevent的”寫事件“之后,雖然?write(sock, buffer, strlen(buffer))仍然會阻塞,但是只要socket緩沖區不可以寫就不會觸發這個“寫事件”,所以程序就不會阻塞,也就不會影響on_read()函數里的流程了。
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061
微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元

