一、簡介
我們將一個正在運行的程序稱為進程。每個進程都有它自己的系統狀態,包含內存狀態、打開文件列表、追蹤指令執行情況的程序指針以及一個保存局部變量的調用棧。通常情況下,一個進程依照一個單序列控制流順序執行,這個控制流被稱為該進程的主線程。在任何給定的時刻,一個程序只做一件事情。
一個程序可以通過Python庫函數中的os或subprocess模塊創建新進程(例如os.fork()或是subprocess.Popen())。然而,這些被稱為子進程的進程卻是獨立運行的,它們有各自獨立的系統狀態以及主線程。因為進程之間是相互獨立的,因此它們同原有的進程并發執行。這是指原進程可以在創建子進程后去執行其它工作。
雖然進程之間是相互獨立的,但是它們能夠通過名為進程間通信(IPC)的機制進行相互通信。一個典型的模式是基于消息傳遞,可以將其簡單地理解為一個純字節的緩沖區,而send()或recv()操作原語可以通過諸如管道(pipe)或是網絡套接字(network socket)等I/O通道傳輸或接收消息。還有一些IPC模式可以通過內存映射(memory-mapped)機制完成(例如mmap模塊),通過內存映射,進程可以在內存中創建共享區域,而對這些區域的修改對所有的進程可見。
多進程能夠被用于需要同時執行多個任務的場景,由不同的進程負責任務的不同部分。然而,另一種將工作細分到任務的方法是使用線程。同進程類似,線程也有其自己的控制流以及執行棧,但線程在創建它的進程之內運行,分享其父進程的所有數據和系統資源。當應用需要完成并發任務的時候線程是很有用的,但是潛在的問題是任務間必須分享大量的系統狀態。
當使用多進程或多線程時,操作系統負責調度。這是通過給每個進程(或線程)一個很小的時間片并且在所有活動任務之間快速循環切換來實現的,這個過程將CPU時間分割為小片段分給各個任務。例如,如果你的系統中有10個活躍的進程正在執行,操作系統將會適當的將十分之一的CPU時間分配給每個進程并且循環地在十個進程之間切換。當系統不止有一個CPU核時,操作系統能夠將進程調度到不同的CPU核上,保持系統負載平均以實現并行執行。
利用并發執行機制寫的程序需要考慮一些復雜的問題。復雜性的主要來源是關于同步和共享數據的問題。通常情況下,多個任務同時試圖更新同一個數據結構會造成臟數據和程序狀態不一致的問題(正式的說法是資源競爭的問題)。為了解決這個問題,需要使用互斥鎖或是其他相似的同步原語來標識并保護程序中的關鍵部分。舉個例子,如果多個不同的線程正在試圖同時向同一個文件寫入數據,那么你需要一個互斥鎖使這些寫操作依次執行,當一個線程在寫入時,其他線程必須等待直到當前線程釋放這個資源。
Python中的并發編程
Python長久以來一直支持不同方式的并發編程,包括線程、子進程以及其他利用生成器(generator function)的并發實現。
Python在大部分系統上同時支持消息傳遞和基于線程的并發編程機制。雖然大部分程序員對線程接口更為熟悉,但是Python的線程機制卻有著諸多的限制。Python使用了內部全局解釋器鎖(GIL)來保證線程安全,GIL同時只允許一個線程執行。這使得Python程序就算在多核系統上也只能在單個處理器上運行。Python界關于GIL的爭論盡管很多,但在可預見的未來卻沒有將其移除的可能。
Python提供了一些很精巧的工具用于管理基于線程和進程的并發操作。即使是簡單地程序也能夠使用這些工具使得任務并發進行從而加快運行速度。subprocess模塊為子進程的創建和通信提供了API。這特別適合運行與文本相關的程序,因為這些API支持通過新進程的標準輸入輸出通道傳送數據。signal模塊將UNIX系統的信號量機制暴露給用戶,用以在進程之間傳遞事件信息。信號是異步處理的,通常有信號到來時會中斷程序當前的工作。信號機制能夠實現粗粒度的消息傳遞系統,但是有其他更可靠的進程內通訊技術能夠傳遞更復雜的消息。threading模塊為并發操作提供了一系列高級的,面向對象的API。Thread對象們在一個進程內并發地運行,分享內存資源。使用線程能夠更好地擴展I/O密集型的任務。multiprocessing模塊同threading模塊類似,不過它提供了對于進程的操作。每個進程類是真實的操作系統進程,并且沒有共享內存資源,但multiprocessing模塊提供了進程間共享數據以及傳遞消息的機制。通常情況下,將基于線程的程序改為基于進程的很簡單,只需要修改一些import聲明即可。
Threading模塊示例
以threading模塊為例,思考這樣一個簡單的問題:如何使用分段并行的方式完成一個大數的累加。
import threading
class SummingThread(threading.Thread):
def __init__(self, low, high):
super(SummingThread, self).__init__()
self.low = low
self.high = high
self.total = 0
def run(self):
for i in range(self.low, self.high):
self.total += i
thread1 = SummingThread(0, 500000)
thread2 = SummingThread(500000, 1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join() # This waits until the thread has completed
thread2.join()
# At this point, both threads have completed
result = thread1.total + thread2.total
print(result)
自定義Threading類庫
我寫了一個易于使用threads的小型Python類庫,包含了一些有用的類和函數。
關鍵參數:
* do_threaded_work ?C 該函數將一系列給定的任務分配給對應的處理函數(分配順序不確定)
* ThreadedWorker ?C 該類創建一個線程,它將從一個同步的工作隊列中拉取工作任務并將處理結果寫入同步結果隊列
* start_logging_with_thread_info ?C 將線程id寫入所有日志消息。(依賴日志環境)
* stop_logging_with_thread_info ?C 用于將線程id從所有的日志消息中移除。(依賴日志環境)
import threading
import logging
def do_threaded_work(work_items, work_func, num_threads=None, per_sync_timeout=1, preserve_result_ordering=True):
""" Executes work_func on each work_item. Note: Execution order is not preserved, but output ordering is (optionally).
Parameters:
- num_threads Default: len(work_items) --- Number of threads to use process items in work_items.
- per_sync_timeout Default: 1 --- Each synchronized operation can optionally timeout.
- preserve_result_ordering Default: True --- Reorders result_item to match original work_items ordering.
Return:
--- list of results from applying work_func to each work_item. Order is optionally preserved.
Example:
def process_url(url):
# TODO: Do some work with the url
return url
urls_to_process = ["http://url1.com", "http://url2.com", "http://site1.com", "http://site2.com"]
# process urls in parallel
result_items = do_threaded_work(urls_to_process, process_url)
# print(results)
print(repr(result_items))
"""
global wrapped_work_func
if not num_threads:
num_threads = len(work_items)
work_queue = Queue.Queue()
result_queue = Queue.Queue()
index = 0
for work_item in work_items:
if preserve_result_ordering:
work_queue.put((index, work_item))
else:
work_queue.put(work_item)
index += 1
if preserve_result_ordering:
wrapped_work_func = lambda work_item: (work_item[0], work_func(work_item[1]))
start_logging_with_thread_info()
#spawn a pool of threads, and pass them queue instance
for _ in range(num_threads):
if preserve_result_ordering:
t = ThreadedWorker(work_queue, result_queue, work_func=wrapped_work_func, queue_timeout=per_sync_timeout)
else:
t = ThreadedWorker(work_queue, result_queue, work_func=work_func, queue_timeout=per_sync_timeout)
t.setDaemon(True)
t.start()
work_queue.join()
stop_logging_with_thread_info()
logging.info('work_queue joined')
result_items = []
while not result_queue.empty():
result = result_queue.get(timeout=per_sync_timeout)
logging.info('found result[:500]: ' + repr(result)[:500])
if result:
result_items.append(result)
if preserve_result_ordering:
result_items = [work_item for index, work_item in result_items]
return result_items
class ThreadedWorker(threading.Thread):
""" Generic Threaded Worker
Input to work_func: item from work_queue
Example usage:
import Queue
urls_to_process = ["http://url1.com", "http://url2.com", "http://site1.com", "http://site2.com"]
work_queue = Queue.Queue()
result_queue = Queue.Queue()
def process_url(url):
# TODO: Do some work with the url
return url
def main():
# spawn a pool of threads, and pass them queue instance
for i in range(3):
t = ThreadedWorker(work_queue, result_queue, work_func=process_url)
t.setDaemon(True)
t.start()
# populate queue with data
for url in urls_to_process:
work_queue.put(url)
# wait on the queue until everything has been processed
work_queue.join()
# print results
print repr(result_queue)
main()
"""
def __init__(self, work_queue, result_queue, work_func, stop_when_work_queue_empty=True, queue_timeout=1):
threading.Thread.__init__(self)
self.work_queue = work_queue
self.result_queue = result_queue
self.work_func = work_func
self.stop_when_work_queue_empty = stop_when_work_queue_empty
self.queue_timeout = queue_timeout
def should_continue_running(self):
if self.stop_when_work_queue_empty:
return not self.work_queue.empty()
else:
return True
def run(self):
while self.should_continue_running():
try:
# grabs item from work_queue
work_item = self.work_queue.get(timeout=self.queue_timeout)
# works on item
work_result = self.work_func(work_item)
#place work_result into result_queue
self.result_queue.put(work_result, timeout=self.queue_timeout)
except Queue.Empty:
logging.warning('ThreadedWorker Queue was empty or Queue.get() timed out')
except Queue.Full:
logging.warning('ThreadedWorker Queue was full or Queue.put() timed out')
except:
logging.exception('Error in ThreadedWorker')
finally:
#signals to work_queue that item is done
self.work_queue.task_done()
def start_logging_with_thread_info():
try:
formatter = logging.Formatter('[thread %(thread)-3s] %(message)s')
logging.getLogger().handlers[0].setFormatter(formatter)
except:
logging.exception('Failed to start logging with thread info')
def stop_logging_with_thread_info():
try:
formatter = logging.Formatter('%(message)s')
logging.getLogger().handlers[0].setFormatter(formatter)
except:
logging.exception('Failed to stop logging with thread info')
使用示例
from test import ThreadedWorker
from queue import Queue
urls_to_process = ["http://facebook.com", "http://pypix.com"]
work_queue = Queue()
result_queue = Queue()
def process_url(url):
# TODO: Do some work with the url
return url
def main():
# spawn a pool of threads, and pass them queue instance
for i in range(5):
t = ThreadedWorker(work_queue, result_queue, work_func=process_url)
t.setDaemon(True)
t.start()
# populate queue with data
for url in urls_to_process:
work_queue.put(url)
# wait on the queue until everything has been processed
work_queue.join()
# print results
print(repr(result_queue))
main()
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061
微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元

