本文實例講述了python threading和multiprocessing模塊基本用法。分享給大家供大家參考,具體如下:
前言
這兩天為了做一個小項目,研究了一下python的并發編程,所謂并發無非多線程和多進程,最初找到的是threading模塊,因為印象中線程“輕量...”,“切換快...”,“可共享進程資源...”等等,但是沒想到這里水很深,進而找到了更好的替代品multiprocessing模塊。下面會講一些使用中的經驗。
后面出現的代碼都在ubuntu10.04 + python2.6.5的環境下測試通過。
一、使用threading模塊創建線程
1、三種線程創建方式
(1)傳入一個函數
這種方式是最基本的,即調用threading中的Thread類的構造函數,然后指定參數target=func,再使用返回的Thread的實例調用
start()
方法,即開始運行該線程,該線程將執行函數func,當然,如果func需要參數,可以在Thread的構造函數中傳入參數
args=(...)
。示例代碼如下:
#!/usr/bin/python #-*-coding:utf-8-*- import threading #用于線程執行的函數 def counter(n): cnt = 0; for i in xrange(n): for j in xrange(i): cnt += j; print cnt; if __name__ == '__main__': #初始化一個線程對象,傳入函數counter,及其參數1000 th = threading.Thread(target=counter, args=(1000,)); #啟動線程 th.start(); #主線程阻塞等待子線程結束 th.join();
這段代碼很直觀,counter函數是一個很無聊的雙重循環,需要注意的是
th.join()
這句,這句的意思是主線程將自我阻塞,然后等待th表示的線程執行完畢再結束,如果沒有這句,運行代碼會立即結束。join的意思比較晦澀,其實將這句理解成這樣會好理解些“while th.is_alive(): time.sleep(1)”。雖然意思相同,但是后面將看到,使用join也有陷阱。
(2)傳入一個可調用的對象
許多的python 對象都是我們所說的可調用的,即是任何能通過函數操作符“()”來調用的對象(見《python核心編程》第14章)。類的對象也是可以調用的,當被調用時會自動調用對象的內建方法
__call__()
,因此這種新建線程的方法就是給線程指定一個__call__方法被重載了的對象。示例代碼如下:
#!/usr/bin/python #-*-coding:utf-8-*- import threading #可調用的類 class Callable(object): def __init__(self, func, args): self.func = func; self.args = args; def __call__(self): apply(self.func, self.args); #用于線程執行的函數 def counter(n): cnt = 0; for i in xrange(n): for j in xrange(i): cnt += j; print cnt; if __name__ == '__main__': #初始化一個線程對象,傳入可調用的Callable對象,并用函數counter及其參數1000初始化這個對象 th = threading.Thread(target=Callable(counter, (1000,))); #啟動線程 th.start(); #主線程阻塞等待子線程結束 th.join();
這個例子關鍵的一句是
apply(self.func, self.args);
這里使用初始化時傳入的函數對象及其參數來進行一次調用。
(3)繼承Thread類
這種方式通過繼承Thread類,并重載其run方法,來實現自定義的線程行為,示例代碼如下:
#!/usr/bin/python #-*-coding:utf-8-*- import threading, time, random def counter(): cnt = 0; for i in xrange(10000): for j in xrange(i): cnt += j; class SubThread(threading.Thread): def __init__(self, name): threading.Thread.__init__(self, name=name); def run(self): i = 0; while i < 4: print self.name,'counting...\n'; counter(); print self.name,'finish\n'; i += 1; if __name__ == '__main__': th = SubThread('thread-1'); th.start(); th.join(); print 'all done';
這個例子定義了一個SubThread類,它繼承了Thread類,并重載了run方法,在方法中調用counter4次并打印一些信息,可以看到這種方式比較直觀。在構造函數中要記得先調用父類的構造函數進行初始化。
2、python多線程的限制
python多線程有個討厭的限制,全局解釋器鎖(global interpreter lock),這個鎖的意思是任一時間只能有一個線程使用解釋器,跟單cpu跑多個程序一個意思,大家都是輪著用的,這叫“并發”,不是“并行”。手冊上的解釋是為了保證對象模型的正確性!這個鎖造成的困擾是如果有一個計算密集型的線程占著cpu,其他的線程都得等著....,試想你的多個線程中有這么一個線程,得多悲劇,多線程生生被搞成串行;當然這個模塊也不是毫無用處,手冊上又說了:當用于IO密集型任務時,IO期間線程會釋放解釋器,這樣別的線程就有機會使用解釋器了!所以是否使用這個模塊需要考慮面對的任務類型。
二、使用multiprocessing創建進程
1、三種創建方式
進程的創建方式跟線程完全一致,只不過要將threading.Thread換成
multiprocessing.Process
。multiprocessing模塊盡力保持了與threading模塊在方法名上的一致性,示例代碼可參考上面線程部分的。這里只給出第一種使用函數的方式:
#!/usr/bin/python #-*-coding:utf-8-*- import multiprocessing, time def run(): i = 0; while i<10000: print 'running'; time.sleep(2); i += 1; if __name__ == '__main__': p = multiprocessing.Process(target=run); p.start(); #p.join(); print p.pid; print 'master gone';
2、創建進程池
該模塊還允許一次創建一組進程,然后再給他們分配任務。詳細內容可參考手冊,這部分研究不多,不敢亂寫。
pool = multiprocessing.Pool(processes=4) pool.apply_async(func, args...)
3、使用進程的好處
完全并行,無GIL的限制,可充分利用多cpu多核的環境;可以接受linux信號,后面將看到,這個功能非常好用。
三、實例研究
該實例假想的任務是:一個主進程會啟動多個子進程分別處理不同的任務,各個子進程可能又有自己的線程用于不同的IO處理(前面說過,線程在IO方面還是不錯的),要實現的功能是,對這些子進程發送信號,能被正確的處理,例如發生SIGTERM,子進程能通知其線程收工,然后“優雅”的退出。現在要解決的問題有:(1)在子類化的Process對象中如何捕捉信號;(2)如何“優雅的退出”。下面分別說明。
1、子類化Process并捕捉信號
如果是使用第一種進程創建方式(傳入函數),那么捕捉信號很容易,假設給進程運行的函數叫func,代碼示例如下:
#!/usr/bin/python #-*-coding:utf-8-*- import multiprocessing, signal,time def handler(signum, frame): print 'signal', signum; def run(): signal.signal(signal.SIGTERM, handler); signal.signal(signal.SIGINT, handler); i = 0; while i<10000: print 'running'; time.sleep(2); i += 1; if __name__ == '__main__': p = multiprocessing.Process(target=run); p.start(); #p.join(); print p.pid; print 'master gone';
這段代碼是在第一種創建方式的基礎上修改而來的,增加了兩行
signal.signal(...)
調用,這是說這個函數要捕捉SIGTERM和SIGINT兩個信號,另外增加了一個handler函數,該函數用于捕捉到信號時進行相應的處理,我們這里只是簡單的打印出信號值。
注意
p.join()
被注釋掉了,這里跟線程的情況有點區別,新的進程啟動后就開始運行了,主進程也不用等待它運行完,可以該干嘛干嘛去。這段代碼運行后會打印出子進程的進程id,根據這個id,在另一個終端輸入
kill -TERM id
,會發現剛才的終端打印出了"signal 15"。
但是使用傳入函數的方式有一點不好的是封裝性太差,如果功能稍微復雜點,將會有很多的全局變量暴露在外,最好還是將功能封裝成類,那么使用類又怎么注冊信號相應函數呢?上面的例子貌似只能使用一個全局的函數,手冊也沒有給出在類中處理信號的例子,其實解決方法大同小異,也很容易,這個帖子 http://stackoverflow.com/questions/6204443/python-signal-reading-return-from-signal-handler-function 給了我靈感:
class Master(multiprocessing.Process): def __init__(self): super(Master,self).__init__(); signal.signal(signal.SIGTERM, self.handler); #注冊信號處理函數 self.live = 1; #信號處理函數 def handler(self, signum, frame): print 'signal:',signum; self.live = 0; def run(self): print 'PID:',self.pid; while self.live: print 'living...' time.sleep(2);
方法很直觀,首先在構造函數中注冊信號處理函數,然后定義了一個方法handler作為處理函數。這個進程類會每隔2秒打印一個“living...”,當接收到SIGTERM后,改變self.live的值,run方法的循環檢測到這個值為0后就結束了,進程也結束了。
2、讓進程優雅的退出
下面放出這次的假想任務的全部代碼,我在主進程中啟動了一個子進程(通過子類化Process類),然后子進程啟動后又產生兩個子線程,用來模擬“生產者-消費者”模型,兩個線程通過一個隊列進行交流,為了互斥訪問這個隊列,自然要加一把鎖(condition對象跟Lock對象差不多,不過多了等待和通知的功能);生產者每次產生一個隨機數并扔進隊列,然后休息一個隨機時間,消費者每次從隊列取一個數;而子進程中的主線程要負責接收信號,以便讓整個過程優雅的結束。代碼如下:
#!/usr/bin/python #-*-coding:utf-8-*- import time, multiprocessing, signal, threading, random, time, Queue class Master(multiprocessing.Process): def __init__(self): super(Master,self).__init__(); signal.signal(signal.SIGTERM, self.handler); #這個變量要傳入線程用于控制線程運行,為什么用dict?充分利用線程間共享資源的特點 #因為可變對象按引用傳遞,標量是傳值的,不信寫成self.live = true試試 self.live = {'stat':True}; def handler(self, signum, frame): print 'signal:',signum; self.live['stat'] = 0; #置這個變量為0,通知子線程可以“收工”了 def run(self): print 'PID:',self.pid; cond = threading.Condition(threading.Lock()); #創建一個condition對象,用于子線程交互 q = Queue.Queue(); #一個隊列 sender = Sender(cond, self.live, q); #傳入共享資源 geter = Geter(cond, self.live, q); sender.start(); #啟動線程 geter.start(); signal.pause(); #主線程睡眠并等待信號 while threading.activeCount()-1: #主線程收到信號并被喚醒后,檢查還有多少線程活著(除掉自己) time.sleep(2); #再睡眠等待,確保子線程都安全的結束 print 'checking live', threading.activeCount(); print 'mater gone'; class Sender(threading.Thread): def __init__(self, cond, live, queue): super(Sender, self).__init__(name='sender'); self.cond = cond; self.queue = queue; self.live = live def run(self): cond = self.cond; while self.live['stat']: #檢查這個進程內的“全局”變量,為真就繼續運行 cond.acquire(); #獲得鎖,以便控制隊列 i = random.randint(0,100); self.queue.put(i,False); if not self.queue.full(): print 'sender add:',i; cond.notify(); #喚醒等待鎖的其他線程 cond.release(); #釋放鎖 time.sleep(random.randint(1,3)); print 'sender done' class Geter(threading.Thread): def __init__(self, cond, live, queue): super(Geter, self).__init__(name='geter'); self.cond = cond; self.queue = queue; self.live = live def run(self): cond = self.cond; while self.live['stat']: cond.acquire(); if not self.queue.empty(): i = self.queue.get(); print 'geter get:',i; cond.wait(3); cond.release(); time.sleep(random.randint(1,3)); print 'geter done' if __name__ == '__main__': master = Master(); master.start(); #啟動子進程
需要注意的地方是,在Master的run方法中
sender.start()
和
geter.start()
之后,按常理應該接著調用
sender.join()
和
geter.join()
,讓主線程等待子線程結束,前面說的join的陷阱就在這里,join將主線程阻塞(blocking)住了,主線程無法再捕捉信號,剛開始研究這塊時還以為信號處理函數寫錯了。網上討論比較少,這里說的比較清楚
http://stackoverflow.com/questions/631441/interruptible-thread-join-in-python
,
http://www.gossamer-threads.com/lists/python/python/541403
參考:
《python核心編程》
《python manual》
更多關于Python相關內容感興趣的讀者可查看本站專題:《Python進程與線程操作技巧總結》、《Python數據結構與算法教程》、《Python函數使用技巧總結》、《Python字符串操作技巧匯總》、《Python入門與進階經典教程》、《Python+MySQL數據庫程序設計入門教程》及《Python常見數據庫操作技巧匯總》
希望本文所述對大家Python程序設計有所幫助。
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
