上一篇文章介紹了線程的使用。然而 Python 中由于
Global Interpreter Lock
(全局解釋鎖 GIL )的存在,每個線程在在執(zhí)行時需要獲取到這個 GIL ,在同一時刻中只有一個線程得到解釋鎖的執(zhí)行, Python 中的線程并沒有真正意義上的并發(fā)執(zhí)行,多線程的執(zhí)行效率也不一定比單線程的效率更高。 如果要充分利用現(xiàn)代多核 CPU 的并發(fā)能力,就要使用 multipleprocessing 模塊了。
0x01 multipleprocessing
與使用線程的 threading 模塊類似,
multipleprocessing
模塊提供許多高級 API 。最常見的是 Pool 對象了,使用它的接口能很方便地寫出并發(fā)執(zhí)行的代碼。
from multiprocessing import Pool def f(x): return x * x if __name__ == '__main__': with Pool(5) as p: # map方法的作用是將f()方法并發(fā)地映射到列表中的每個元素 print(p.map(f, [1, 2, 3])) # 執(zhí)行結(jié)果 # [1, 4, 9]
關(guān)于 Pool 下文中還會提到,這里我們先來看 Process 。
Process
要創(chuàng)建一個進(jìn)程可以使用 Process 類,使用 start() 方法啟動進(jìn)程。
from multiprocessing import Process import os def echo(text): # 父進(jìn)程ID print("Process Parent ID : ", os.getppid()) # 進(jìn)程ID print("Process PID : ", os.getpid()) print('echo : ', text) if __name__ == '__main__': p = Process(target=echo, args=('hello process',)) p.start() p.join() # 執(zhí)行結(jié)果 # Process Parent ID : 27382 # Process PID : 27383 # echo : hello process
進(jìn)程池
正如開篇提到的
multiprocessing
模塊提供了 Pool 類可以很方便地實現(xiàn)一些簡單多進(jìn)程場景。 它主要有以下接口
- apply(func[, args[, kwds]])
- 執(zhí)行 func(args,kwds) 方法,在方法結(jié)束返回前會阻塞。
- apply_async(func[, args[, kwds[, callback[, error_callback]]]])
- 異步執(zhí)行 func(args,kwds) ,會立即返回一個 result 對象,如果指定了 callback 參數(shù),結(jié)果會通過回調(diào)方法返回,還可以指定執(zhí)行出錯的回調(diào)方法 error_callback()
- map(func, iterable[, chunksize])
- 類似內(nèi)置函數(shù) map() ,可以并發(fā)執(zhí)行 func ,是同步方法
- map_async(func, iterable[, chunksize[, callback[, error_callback]]])
- 異步版本的 map
- close()
- 關(guān)閉進(jìn)程池。當(dāng)池中的所有工作進(jìn)程都執(zhí)行完畢時,進(jìn)程會退出。
- terminate()
- 終止進(jìn)程池
- join()
-
等待工作進(jìn)程執(zhí)行完,必需先調(diào)用 close() 或者 terminate()
from multiprocessing import Pool def f(x): return x * x if __name__ == '__main__': with Pool(5) as p: # map方法的作用是將f()方法并發(fā)地映射到列表中的每個元素 a = p.map(f, [1, 2, 3]) print(a) # 異步執(zhí)行map b = p.map_async(f, [3, 5, 7, 11]) # b 是一個result對象,代表方法的執(zhí)行結(jié)果 print(b) # 為了拿到結(jié)果,使用join方法等待池中工作進(jìn)程退出 p.close() # 調(diào)用join方法前,需先執(zhí)行close或terminate方法 p.join() # 獲取執(zhí)行結(jié)果 print(b.get()) # 執(zhí)行結(jié)果 # [1, 4, 9] ## [9, 25, 49, 121]
map_async() 和 apply_async() 執(zhí)行后會返回一個 class multiprocessing.pool.AsyncResult 對象,通過它的 get() 可以獲取到執(zhí)行結(jié)果, ready() 可以判斷 AsyncResult 的結(jié)果是否準(zhǔn)備好。
進(jìn)程間數(shù)據(jù)的傳輸
multiprocessing 模塊提供了兩種方式用于進(jìn)程間的數(shù)據(jù)共享:隊列( Queue )和管道( Pipe )
Queue 是線程安全,也是進(jìn)程安全的。使用 Queue 可以實現(xiàn)進(jìn)程間的數(shù)據(jù)共享,例如下面的 demo 中子進(jìn)程 put 一個對象,在主進(jìn)程中就能 get 到這個對象。 任何可以序列化的對象都可以通過 Queue 來傳輸。
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': # 使用Queue進(jìn)行數(shù)據(jù)通信 q = Queue() p = Process(target=f, args=(q,)) p.start() # 主進(jìn)程取得子進(jìn)程中的數(shù)據(jù) print(q.get()) # prints "[42, None, 'hello']" p.join() # 執(zhí)行結(jié)果 # [42, None, 'hello']
Pipe() 返回一對通過管道連接的 Connection 對象。這兩個對象可以理解為管道的兩端,它們通過 send() 和 recv() 發(fā)送和接收數(shù)據(jù)。
from multiprocessing import Process, Pipe def write(conn): # 子進(jìn)程中發(fā)送一個對象 conn.send([42, None, 'hello']) conn.close() def read(conn): # 在讀的進(jìn)程中通過recv接收對象 data = conn.recv() print(data) if __name__ == '__main__': # Pipe()方法返回一對連接對象 w_conn, r_conn = Pipe() wp = Process(target=write, args=(w_conn,)) rp = Process(target=read, args=(r_conn,)) wp.start() rp.start() # 執(zhí)行結(jié)果 # [42, None, 'hello']
需要注意的是,兩個進(jìn)程不能同時對一個連接對象進(jìn)行 send 或 recv 操作。
同步
我們知道線程間的同步是通過鎖機(jī)制來實現(xiàn)的,進(jìn)程也一樣。
from multiprocessing import Process, Lock import time def print_with_lock(l, i): l.acquire() try: time.sleep(1) print('hello world', i) finally: l.release() def print_without_lock(i): time.sleep(1) print('hello world', i) if __name__ == '__main__': lock = Lock() # 先執(zhí)行有鎖的 for num in range(5): Process(target=print_with_lock, args=(lock, num)).start() # 再執(zhí)行無鎖的 # for num in range(5): # Process(target=print_without_lock, args=(num,)).start()
有鎖的代碼將每秒依次打印
hello world 0
hello world 1
hello world 2
hello world 3
hello world 4
如果執(zhí)行無鎖的代碼,則在我的電腦上執(zhí)行結(jié)果是這樣的
hello worldhello world? 0
1
hello world 2
hello world 3
hello world 4
除了 Lock ,還包括 RLock 、 Condition 、 Semaphore 和 Event 等進(jìn)程間的同步原語。其用法也與線程間的同步原語很類似。 API 使用可以參考文末中引用的文檔鏈接。
在工程中實現(xiàn)進(jìn)程間的數(shù)據(jù)共享應(yīng)當(dāng)優(yōu)先使用 隊列或管道。
0x02 總結(jié)
本文對 multiprocessing 模塊中常見的 API 作了簡單的介紹。講述了 Process 和 Pool 的常見用法,同時介紹了進(jìn)程間的數(shù)據(jù)方式:隊列和管道。最后簡單了解了進(jìn)程間的同步原語。
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
