非阻塞IO(non-blocking IO)
Linux下,可以通過(guò)設(shè)置socket使其變?yōu)閚on-blocking。當(dāng)對(duì)一個(gè)non-blocking socket執(zhí)行讀操作時(shí),流程是這個(gè)樣子:
從圖中可以看出,當(dāng)用戶進(jìn)程發(fā)出read操作時(shí),如果kernel中的數(shù)據(jù)還沒有準(zhǔn)備好,那么它并不會(huì)block用戶進(jìn)程,而是立刻返回一個(gè)error。從用戶進(jìn)程角度講 ,它發(fā)起一個(gè)read操作后,并不需要等待,而是馬上就得到了一個(gè)結(jié)果。用戶進(jìn)程判斷結(jié)果是一個(gè)error時(shí),它就知道數(shù)據(jù)還沒有準(zhǔn)備好,于是用戶就可以在本次到下次再發(fā)起read詢問的時(shí)間間隔內(nèi)做其他事情,或者直接再次發(fā)送read操作。一旦kernel中的數(shù)據(jù)準(zhǔn)備好了,并且又再次收到了用戶進(jìn)程的system call,那么它馬上就將數(shù)據(jù)拷貝到了用戶內(nèi)存(這一階段仍然是阻塞的,這段是本地拷貝,copy data ),然后返回。
也就是說(shuō)非阻塞的recvform系統(tǒng)調(diào)用調(diào)用之后,進(jìn)程并沒有被阻塞,內(nèi)核馬上返回給進(jìn)程,如果數(shù)據(jù)還沒準(zhǔn)備好,
此時(shí)會(huì)返回一個(gè)error。進(jìn)程在返回之后,可以干點(diǎn)別的事情,然后再發(fā)起recvform系統(tǒng)調(diào)用。重復(fù)上面的過(guò)程,
循環(huán)往復(fù)的進(jìn)行recvform系統(tǒng)調(diào)用。這個(gè)過(guò)程通常被稱之為輪詢。輪詢檢查內(nèi)核數(shù)據(jù),直到數(shù)據(jù)準(zhǔn)備好,再拷貝數(shù)據(jù)到進(jìn)程,
進(jìn)行數(shù)據(jù)處理。需要注意,拷貝數(shù)據(jù)整個(gè)過(guò)程,進(jìn)程仍然是屬于阻塞的狀態(tài)。
所以,在非阻塞式IO中,用戶進(jìn)程其實(shí)是需要不斷的主動(dòng)詢問kernel操作系統(tǒng)內(nèi)存 數(shù)據(jù)準(zhǔn)備好了沒有。
非阻塞IO示例
- 設(shè)置socket接口為 非阻塞IO接口
- 默認(rèn)是True 為阻塞
- server.setblocking(False)
- 處理一下這個(gè)異常
BlockingIOError: [WinError 10035] 無(wú)法立即完成一個(gè)非阻止性套接字操作。
from socket import * server = socket(AF_INET,SOCK_STREAM) server.bind(('127.0.0.1',8000)) server.listen(5) # 設(shè)置socket接口為 非阻塞IO接口 # 默認(rèn)是True 為阻塞 server.setblocking(False) print("starting...") while True: try: conn,addr = server.accept() print(addr) except BlockingIOError: print("干其他的工作") server.close()
執(zhí)行結(jié)果,如上面的圖,一直返回error消息
starting... 干其他的工作 干其他的工作 干其他的工作 干其他的工作
服務(wù)端 可以與 多個(gè)客戶端建立連接,實(shí)現(xiàn)服務(wù)端可以不停的建立連接
from socket import * server = socket(AF_INET,SOCK_STREAM) server.bind(('127.0.0.1',8000)) server.listen(5) # 設(shè)置socket接口為 非阻塞IO接口 # 默認(rèn)是True 為阻塞 server.setblocking(False) r_list = [] print("starting...") while True: try: conn,addr = server.accept() r_list.append(conn) print(r_list) except BlockingIOError: pass server.close()
起三個(gè)客戶端與服務(wù)端建立連接
r_list 存著所有建立的連接
有連接來(lái),就建立連接,沒有連接來(lái),就拋出異常
實(shí)現(xiàn)IO非阻塞 并發(fā) 多個(gè)連接
from socket import * server = socket(AF_INET,SOCK_STREAM) server.bind(('127.0.0.1',8000)) server.listen(5) # 設(shè)置socket接口為 非阻塞IO接口 # 默認(rèn)是True 為阻塞 server.setblocking(False) r_list = [] print("starting...") while True: try: conn,addr = server.accept() r_list.append(conn) print(r_list) except BlockingIOError: # 定義刪除連接列表 del_rlist = [] for conn in r_list: try: data = conn.recv(1024) # 收空數(shù)據(jù)時(shí)候 if not data: del_rlist.append(conn) continue conn.send(data.upper()) # 沒有連接,拋出異常,就結(jié)束這次循環(huán),繼續(xù) except BlockingIOError: continue # 套接字出現(xiàn)異常,客戶端單方面連接斷開 except Exception: conn.close() del_rlist.append(conn) break # 結(jié)束上面循環(huán)之后,循環(huán)del_list 連接元素 刪除連接 for conn in del_rlist: del_rlist.remove(conn) server.close()
BUG:send也是IO阻塞接口
當(dāng)send在數(shù)據(jù)量過(guò)大時(shí)候,也會(huì)阻塞。
send操作是,把應(yīng)用程序把數(shù)據(jù)發(fā)送到操作系統(tǒng)緩存區(qū)里,而操作系統(tǒng)緩存區(qū)空間也是有限的。緩存區(qū)也會(huì)滿了,后面還有數(shù)據(jù)需要發(fā)送,那只能等緩存區(qū)清掉數(shù)據(jù),有空間了,才能發(fā)送數(shù)據(jù)。所以在這里緩存區(qū)滿了,就阻塞。
修改后服務(wù)端的代碼 可以自己檢測(cè)IO,遇到IO切換單個(gè)線程的其他任務(wù),去運(yùn)行,實(shí)現(xiàn)單線程并發(fā)
from socket import * server = socket(AF_INET,SOCK_STREAM) server.bind(('127.0.0.1',8000)) server.listen(5) # 設(shè)置socket接口為 非阻塞IO接口 # 默認(rèn)是True 為阻塞 server.setblocking(False) r_list = [] w_list = [] print("starting...") while True: try: conn,addr = server.accept() r_list.append(conn) print(r_list) except BlockingIOError: # 收消息 # 定義刪除連接列表 del_rlist = [] for conn in r_list: try: data = conn.recv(1024) # 收空數(shù)據(jù)時(shí)候 if not data: del_rlist.append(conn) continue '''加入元祖 元祖有兩個(gè)元素 1.存放套接字連接 2.準(zhǔn)備要發(fā)送的的數(shù)據(jù) ''' w_list.append((conn, data.upper())) # 沒有連接,拋出異常,就結(jié)束這次循環(huán),繼續(xù) except BlockingIOError: continue # 套接字出現(xiàn)異常,客戶端單方面連接斷開 except Exception: conn.close() del_rlist.append(conn) break # 發(fā)消息 # 用于 發(fā)成功數(shù)據(jù)后,刪除套接字連接的列表 del_wlist = [] for item in w_list: try: conn = item[0] data = item[1] conn.send(data) # 發(fā)成功后,從列表刪除連接 del_wlist.append(item) # send 有可能出現(xiàn)異常 沒發(fā)完情況 except BlockingIOError: pass # 結(jié)束上面循環(huán)之后,循環(huán)del_wlist 連接元素 刪除連接 for item in del_wlist: del_wlist.remove(item) # 結(jié)束上面循環(huán)之后,循環(huán)del_rlist 連接元素 刪除連接 for conn in del_rlist: del_rlist.remove(conn) server.close()
這就是非阻塞IO
但是非阻塞IO模型絕不被推薦。
我們不能否則其優(yōu)點(diǎn):能夠在等待任務(wù)完成的時(shí)間里干其他活了(包括提交其他任務(wù),也就是 “后臺(tái)” 可以有多個(gè)任務(wù)在“”同時(shí)“”執(zhí)行)。
干其他活時(shí)候,有可能來(lái)新的連接,新的連接來(lái)了,不能及時(shí)響應(yīng)與該新的連接,建立連接。所以會(huì)導(dǎo)致問題:數(shù)據(jù)不會(huì)及時(shí)響應(yīng)
但是也難掩其缺點(diǎn):
1. 循環(huán)調(diào)用recv()將大幅度推高CPU占用率;這也是我們?cè)诖a中留一句time.sleep(2)的原因,否則在低配主機(jī)下極容易出現(xiàn)卡機(jī)情況
2. 任務(wù)完成的響應(yīng)延遲增大了,因?yàn)槊窟^(guò)一段時(shí)間才去輪詢一次read操作,而任務(wù)可能在兩次輪詢之間的任意時(shí)間完成。
這會(huì)導(dǎo)致整體數(shù)據(jù)吞吐量的降低。
3.死循環(huán)While True會(huì)導(dǎo)致CPU的無(wú)用的耗用、占用
此外,在這個(gè)方案中recv()更多的是起到檢測(cè)“操作是否完成”的作用,實(shí)際操作系統(tǒng)提供了更為高效的檢測(cè)“操作是否完成“作用的接口,例如select()多路復(fù)用模式,可以一次檢測(cè)多個(gè)連接是否活躍
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號(hào)聯(lián)系: 360901061
您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對(duì)您有幫助就好】元
