最近在做websocket ?porting的工作中,需要實現最底層socket讀和寫,基于同步讀,libevent, libuv和android Looper都寫了一套,從中體會不少。
1)同步阻塞讀寫
最開始采用同步阻塞讀寫,主要是為了快速實現來驗證上層websocket協議的完備性。優點僅僅是實現起來簡單,缺點就是效率不高,不能很好利用線程的資源,建立連接這一塊方法都是類似的,主要的區別是在如何讀寫數據,先看幾種方法共用的一塊:
?
int n = 0;
struct sockaddr_in serv_addr;
event_init();
if((mSockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
//TODO error
return;
}
memset(&serv_addr, '0', sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(url.port());
if(inet_pton(AF_INET, url.host().utf8().data(), &serv_addr.sin_addr)<=0){
return;
}
if( connect(mSockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0){
return;
}
這里由于是client,所以比較簡單,當然缺失了DNS解析這一塊。然后,就是要監視讀數據,由于是同步阻塞讀,所以需要在循環里不斷地去read/recv:
?
?
while (1) {
ssize_t result = recv(fd, buf, sizeof(buf), 0);
if (result == 0) {
break;
} else if (result < 0) {
perror("recv");
close(fd);
return 1;
}
fwrite(buf, 1, result, stdout);
}
缺點就顯而易見,此線程需要不斷輪詢。當然,這里是個例子程序,正式代碼中不會處理這么草率。
?
2)libevent
對上面的改進方法就是基于異步非阻塞的方式來處理讀數據,在linux上一般是通過epoll來做異步事件偵聽,而libevent是一個封裝了epoll或其他平臺上異步事件的c庫,所以基于libevent來做異步非阻塞讀寫會更簡單,也能跨平臺。重構的第一個步是設置socketFD為非阻塞:
?
static int setnonblock(int fd)
{
int flags;
flags = fcntl(fd, F_GETFL);
if (flags < 0){
return flags;
}
flags |= O_NONBLOCK;
if (fcntl(fd, F_SETFL, flags) < 0){
return -1;
}
return 0;
}
然后需要在單獨的線程中維護event loop,并添加read事件偵聽:
?
?
static void* loopListen(void *arg)
{
SocketStreamHandle *handle = (SocketStreamHandle *)arg;
struct event_base* base = event_base_new();
struct event ev_read;
handle->setReadEvent(&ev_read);
setnonblock(handle->getSocketFD());
event_set(&ev_read, handle->getSocketFD(), EV_READ|EV_PERSIST, onRead, handle);
event_base_set(base, &ev_read);
event_add(&ev_read, NULL);
event_base_dispatch(base);
}
? ? pthread_t pid;
? ? pthread_create(&pid, 0, loopListen, this);
然后在onRead方法中處理數據讀取:
?
static void onRead(int fd, short ev, void *arg)
{
while(true){
char *buf = new char[1024];
memset(buf, 0, 1024);
int len = read(fd, buf, 1024);
SocketStreamHandle *handle = (SocketStreamHandle *)arg;
if(len > 0){
SocketContext *context = new SocketContext;
context->buf = buf;
context->readLen = len;
context->handle = handle;
WTF::callOnMainThread(onReadMainThread, context);
if(len == 1024){
continue;
}else{
break;
}
}else{
if(errno == EAGAIN || errno == EWOULDBLOCK){
return;
}else if(errno == EINTR){
continue;
}
__android_log_print(ANDROID_LOG_INFO, LOG_TAG, "onCloseMainThread, len:%d, errno:%d", len, errno);
WTF::callOnMainThread(onCloseMainThread, handle);
event_del(handle->getReadEvent());
}
}
}
這里比較有講究的是:
?
1)當一次buf讀不完,需要在循環里再次讀一次
2)當read到0時,表示socket被關閉,這時需要刪除事件偵聽,不然會導致cpu 100%
3)當read到-1時,不完全是錯誤情況,比如errno == EAGAIN || errno == EWOULDBLOCK表示暫時不可讀,歇一會后面再讀。errno == EINTR表示被系統中斷,應重讀一遍
4)onRead是被libevent中專門做事件偵聽的線程調用的,所以有的時候需要回到主線程,比如:?WTF::callOnMainThread(onReadMainThread, context);這里就需要注意多線程間的同步問題。
3)libuv
libuv在libevent更進一步,它不但有event loop,并且把socket的各種操作也覆蓋了,所以代碼會更簡潔,比如最開始的創建連接和創建loop:
?
uv_loop_t *loop = uv_default_loop();
uv_tcp_t client;
uv_tcp_init(loop, &client);
struct sockaddr_in req_addr = uv_ip4_addr(url.host().utf8().data(), url.port());
uv_connect_t *connect_req;
connect_req->data = this;
uv_tcp_connect(connect_req, &client, req_addr, on_connect);
uv_run(loop);
在on_connect中創建對read的監聽:
?
?
static void* on_connect(uv_connect_t *req, int status)
{
SocketStreamHandle *handle = (SocketStreamHandle *)arg;
uv_read_start(req->handle, alloc_buffer, on_read);
}
on_read就和前面類似了。所以libuv是最強大的,極大的省略了socket相關的開發。
?
4)Android Looper
Android提供一套event loop的機制,并且可以對FD進行監聽,所以如果基于Android Looper,就可以省去對第三方lib的依賴。并且Android也是對epoll的封裝,既然如此,值得試一試用Android原生的looper來做這塊的event looper。socket連接這塊和最開始是一樣的,關鍵是在創建looper的地方:
?
static void* loopListen(void *arg)
{
SocketStreamHandle *handle = (SocketStreamHandle *)arg;
setnonblock(handle->getSocketFD());
Looper *looper = new Looper(true);
looper->addFd(handle->getSocketFD(), 0, ALOOPER_EVENT_INPUT, onRead, handle);
while(true){
if(looper->pollOnce(100) == ALOOPER_POLL_ERROR){
__android_log_print(ANDROID_LOG_INFO, LOG_TAG, "ALOOPER_POLL_ERROR");
break;
}
}
}
代碼比較簡單就不多說,詳細使用方法可以查看<utils/Looper.h>的API。
?
綜上所述,如果是在Android上做,可以直接基于原生的Looper,如果需要跨平臺可以基于libuv。總之,要避免同步阻塞,因為這樣會導致線程設計上的復雜和低效。
在Java里也有類似的概念,可以參見以前的博文:
?
從Jetty、Tomcat和Mina中提煉
NIO
構架網絡服務器的經典模式(一)
從Jetty、Tomcat和Mina中提煉
NIO
構架網絡服務器的經典模式(二)
從Jetty、Tomcat和Mina中提煉
NIO
構架網絡服務器的經典模式(三)
?
?
?
?
?
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061
微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元

