引言
對于 Python 來說,并不缺少并發(fā)選項,其標(biāo)準(zhǔn)庫中包括了對線程、進程和異步 I/O 的支持。在許多情況下,通過創(chuàng)建諸如異步、線程和子進程之類的高層模塊,Python 簡化了各種并發(fā)方法的使用。除了標(biāo)準(zhǔn)庫之外,還有一些第三方的解決方案,例如 Twisted、Stackless 和進程模塊。本文重點關(guān)注于使用 Python 的線程,并使用了一些實際的示例進行說明。雖然有許多很好的聯(lián)機資源詳細說明了線程 API,但本文嘗試提供一些實際的示例,以說明一些常見的線程使用模式。
全局解釋器鎖 (Global Interpretor Lock) 說明 Python 解釋器并不是線程安全的。當(dāng)前線程必須持有全局鎖,以便對 Python 對象進行安全地訪問。因為只有一個線程可以獲得 Python 對象/C API,所以解釋器每經(jīng)過 100 個字節(jié)碼的指令,就有規(guī)律地釋放和重新獲得鎖。解釋器對線程切換進行檢查的頻率可以通過 sys.setcheckinterval() 函數(shù)來進行控制。
此外,還將根據(jù)潛在的阻塞 I/O 操作,釋放和重新獲得鎖。有關(guān)更詳細的信息,請參見參考資料部分中的 Gil and Threading State 和 Threading the Global Interpreter Lock。
需要說明的是,因為 GIL,CPU 受限的應(yīng)用程序?qū)o法從線程的使用中受益。使用 Python 時,建議使用進程,或者混合創(chuàng)建進程和線程。
首先弄清進程和線程之間的區(qū)別,這一點是非常重要的。線程與進程的不同之處在于,它們共享狀態(tài)、內(nèi)存和資源。對于線程來說,這個簡單的區(qū)別既是它的優(yōu)勢,又是它的缺點。一方面,線程是輕量級的,并且相互之間易于通信,但另一方面,它們也帶來了包括死鎖、爭用條件和高復(fù)雜性在內(nèi)的各種問題。幸運的是,由于 GIL 和隊列模塊,與采用其他的語言相比,采用 Python 語言在線程實現(xiàn)的復(fù)雜性上要低得多。
使用 Python 線程
要繼續(xù)學(xué)習(xí)本文中的內(nèi)容,我假定您已經(jīng)安裝了 Python 2.5 或者更高版本,因為本文中的許多示例都將使用 Python 語言的新特性,而這些特性僅出現(xiàn)于 Python2.5 之后。要開始使用 Python 語言的線程,我們將從簡單的 "Hello World" 示例開始:
hello_threads_example
import threading import datetime class ThreadClass(threading.Thread): def run(self): now = datetime.datetime.now() print "%s says Hello World at time: %s" % (self.getName(), now) for i in range(2): t = ThreadClass() t.start()
如果運行這個示例,您將得到下面的輸出:
# python hello_threads.py Thread-1 says Hello World at time: 2008-05-13 13:22:50.252069 Thread-2 says Hello World at time: 2008-05-13 13:22:50.252576
仔細觀察輸出結(jié)果,您可以看到從兩個線程都輸出了 Hello World 語句,并都帶有日期戳。如果分析實際的代碼,那么將發(fā)現(xiàn)其中包含兩個導(dǎo)入語句;一個語句導(dǎo)入了日期時間模塊,另一個語句導(dǎo)入線程模塊。類 ThreadClass 繼承自 threading.Thread,也正因為如此,您需要定義一個 run 方法,以此執(zhí)行您在該線程中要運行的代碼。在這個 run 方法中唯一要注意的是,self.getName() 是一個用于確定該線程名稱的方法。
最后三行代碼實際地調(diào)用該類,并啟動線程。如果注意的話,那么會發(fā)現(xiàn)實際啟動線程的是 t.start()。在設(shè)計線程模塊時考慮到了繼承,并且線程模塊實際上是建立在底層線程模塊的基礎(chǔ)之上的。對于大多數(shù)情況來說,從 threading.Thread 進行繼承是一種最佳實踐,因為它創(chuàng)建了用于線程編程的常規(guī) API。
使用線程隊列
如前所述,當(dāng)多個線程需要共享數(shù)據(jù)或者資源的時候,可能會使得線程的使用變得復(fù)雜。線程模塊提供了許多同步原語,包括信號量、條件變量、事件和鎖。當(dāng)這些選項存在時,最佳實踐是轉(zhuǎn)而關(guān)注于使用隊列。相比較而言,隊列更容易處理,并且可以使得線程編程更加安全,因為它們能夠有效地傳送單個線程對資源的所有訪問,并支持更加清晰的、可讀性更強的設(shè)計模式。
在下一個示例中,您將首先創(chuàng)建一個以串行方式或者依次執(zhí)行的程序,獲取網(wǎng)站的 URL,并顯示頁面的前 1024 個字節(jié)。有時使用線程可以更快地完成任務(wù),下面就是一個典型的示例。首先,讓我們使用 urllib2 模塊以獲取這些頁面(一次獲取一個頁面),并且對代碼的運行時間進行計時:
URL 獲取序列
import urllib2 import time hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com", "http://ibm.com", "http://apple.com"] start = time.time() #grabs urls of hosts and prints first 1024 bytes of page for host in hosts: url = urllib2.urlopen(host) print url.read(1024) print "Elapsed Time: %s" % (time.time() - start)
在運行以上示例時,您將在標(biāo)準(zhǔn)輸出中獲得大量的輸出結(jié)果。但最后您將得到以下內(nèi)容:
Elapsed Time: 2.40353488922
讓我們仔細分析這段代碼。您僅導(dǎo)入了兩個模塊。首先,urllib2 模塊減少了工作的復(fù)雜程度,并且獲取了 Web 頁面。然后,通過調(diào)用 time.time(),您創(chuàng)建了一個開始時間值,然后再次調(diào)用該函數(shù),并且減去開始值以確定執(zhí)行該程序花費了多長時間。最后分析一下該程序的執(zhí)行速度,雖然“2.5 秒”這個結(jié)果并不算太糟,但如果您需要檢索數(shù)百個 Web 頁面,那么按照這個平均值,就需要花費大約 50 秒的時間。研究如何創(chuàng)建一種可以提高執(zhí)行速度的線程化版本:
URL 獲取線程化
#!/usr/bin/env python import Queue import threading import urllib2 import time hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com", "http://ibm.com", "http://apple.com"] queue = Queue.Queue() class ThreadUrl(threading.Thread): """Threaded Url Grab""" def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue def run(self): while True: #grabs host from queue host = self.queue.get() #grabs urls of hosts and prints first 1024 bytes of page url = urllib2.urlopen(host) print url.read(1024) #signals to queue job is done self.queue.task_done() start = time.time() def main(): #spawn a pool of threads, and pass them queue instance for i in range(5): t = ThreadUrl(queue) t.setDaemon(True) t.start() #populate queue with data for host in hosts: queue.put(host) #wait on the queue until everything has been processed queue.join() main() print "Elapsed Time: %s" % (time.time() - start)
對于這個示例,有更多的代碼需要說明,但與第一個線程示例相比,它并沒有復(fù)雜多少,這正是因為使用了隊列模塊。在 Python 中使用線程時,這個模式是一種很常見的并且推薦使用的方式。具體工作步驟描述如下:
- ??? 創(chuàng)建一個 Queue.Queue() 的實例,然后使用數(shù)據(jù)對它進行填充。
- ??? 將經(jīng)過填充數(shù)據(jù)的實例傳遞給線程類,后者是通過繼承 threading.Thread 的方式創(chuàng)建的。
- ??? 生成守護線程池。
- ??? 每次從隊列中取出一個項目,并使用該線程中的數(shù)據(jù)和 run 方法以執(zhí)行相應(yīng)的工作。
- ??? 在完成這項工作之后,使用 queue.task_done() 函數(shù)向任務(wù)已經(jīng)完成的隊列發(fā)送一個信號。
- ??? 對隊列執(zhí)行 join 操作,實際上意味著等到隊列為空,再退出主程序。
在使用這個模式時需要注意一點:通過將守護線程設(shè)置為 true,將允許主線程或者程序僅在守護線程處于活動狀態(tài)時才能夠退出。這種方式創(chuàng)建了一種簡單的方式以控制程序流程,因為在退出之前,您可以對隊列執(zhí)行 join 操作、或者等到隊列為空。隊列模塊文檔詳細說明了實際的處理過程,請參見參考資料:
??? join()
??? 保持阻塞狀態(tài),直到處理了隊列中的所有項目為止。在將一個項目添加到該隊列時,未完成的任務(wù)的總數(shù)就會增加。當(dāng)使用者線程調(diào)用 task_done() 以表示檢索了該項目、并完成了所有的工作時,那么未完成的任務(wù)的總數(shù)就會減少。當(dāng)未完成的任務(wù)的總數(shù)減少到零時,join() 就會結(jié)束阻塞狀態(tài)。
使用多個隊列
因為上面介紹的模式非常有效,所以可以通過連接附加線程池和隊列來進行擴展,這是相當(dāng)簡單的。在上面的示例中,您僅僅輸出了 Web 頁面的開始部分。而下一個示例則將返回各線程獲取的完整 Web 頁面,然后將結(jié)果放置到另一個隊列中。然后,對加入到第二個隊列中的另一個線程池進行設(shè)置,然后對 Web 頁面執(zhí)行相應(yīng)的處理。這個示例中所進行的工作包括使用一個名為 Beautiful Soup 的第三方 Python 模塊來解析 Web 頁面。使用這個模塊,您只需要兩行代碼就可以提取所訪問的每個頁面的 title 標(biāo)記,并將其打印輸出。
多隊列數(shù)據(jù)挖掘網(wǎng)站
import Queue import threading import urllib2 import time from BeautifulSoup import BeautifulSoup hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com", "http://ibm.com", "http://apple.com"] queue = Queue.Queue() out_queue = Queue.Queue() class ThreadUrl(threading.Thread): """Threaded Url Grab""" def __init__(self, queue, out_queue): threading.Thread.__init__(self) self.queue = queue self.out_queue = out_queue def run(self): while True: #grabs host from queue host = self.queue.get() #grabs urls of hosts and then grabs chunk of webpage url = urllib2.urlopen(host) chunk = url.read() #place chunk into out queue self.out_queue.put(chunk) #signals to queue job is done self.queue.task_done() class DatamineThread(threading.Thread): """Threaded Url Grab""" def __init__(self, out_queue): threading.Thread.__init__(self) self.out_queue = out_queue def run(self): while True: #grabs host from queue chunk = self.out_queue.get() #parse the chunk soup = BeautifulSoup(chunk) print soup.findAll(['title']) #signals to queue job is done self.out_queue.task_done() start = time.time() def main(): #spawn a pool of threads, and pass them queue instance for i in range(5): t = ThreadUrl(queue, out_queue) t.setDaemon(True) t.start() #populate queue with data for host in hosts: queue.put(host) for i in range(5): dt = DatamineThread(out_queue) dt.setDaemon(True) dt.start() #wait on the queue until everything has been processed queue.join() out_queue.join() main() print "Elapsed Time: %s" % (time.time() - start)
如果運行腳本的這個版本,您將得到下面的輸出:
# python url_fetch_threaded_part2.py [ ] [ ] [ ] [ ] [ ] Elapsed Time: 3.75387597084
分析這段代碼時您可以看到,我們添加了另一個隊列實例,然后將該隊列傳遞給第一個線程池類 ThreadURL。接下來,對于另一個線程池類 DatamineThread,幾乎復(fù)制了完全相同的結(jié)構(gòu)。在這個類的 run 方法中,從隊列中的各個線程獲取 Web 頁面、文本塊,然后使用 Beautiful Soup 處理這個文本塊。在這個示例中,使用 Beautiful Soup 提取每個頁面的 title 標(biāo)記、并將其打印輸出。可以很容易地將這個示例推廣到一些更有價值的應(yīng)用場景,因為您掌握了基本搜索引擎或者數(shù)據(jù)挖掘工具的核心內(nèi)容。一種思想是使用 Beautiful Soup 從每個頁面中提取鏈接,然后按照它們進行導(dǎo)航。
總結(jié)
本文研究了 Python 的線程,并且說明了如何使用隊列來降低復(fù)雜性和減少細微的錯誤、并提高代碼可讀性的最佳實踐。盡管這個基本模式比較簡單,但可以通過將隊列和線程池連接在一起,以便將這個模式用于解決各種各樣的問題。在最后的部分中,您開始研究如何創(chuàng)建更復(fù)雜的處理管道,它可以用作未來項目的模型。參考資料部分提供了很多有關(guān)常規(guī)并發(fā)性和線程的極好的參考資料。
最后,還有很重要的一點需要指出,線程并不能解決所有的問題,對于許多情況,使用進程可能更為合適。特別是,當(dāng)您僅需要創(chuàng)建許多子進程并對響應(yīng)進行偵聽時,那么標(biāo)準(zhǔn)庫子進程模塊可能使用起來更加容易。有關(guān)更多的官方說明文檔,請參考參考資料部分。
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
