一、操作系統中相關進程的知識
??Unix/Linux操作系統提供了一個fork()系統調用,它非常特殊。普通的函數調用,調用一次,返回一次,但是fork()調用一次,返回兩次,因為操作系統自動把當前進程(稱為父進程)復制了一份(稱為子進程),然后,分別在父進程和子進程內返回。 ??子進程永遠返回0,而父進程返回子進程的ID。這樣做的理由是,一個父進程可以fork出很多子進程,所以,父進程要記下每個子進程的ID,而子進程只需要調用getppid()就可以拿到父進程的ID。 ??Python的os模塊封裝了常見的系統調用,其中就包括fork,可以在Python程序中輕松創建子進程。
示例如下
import os
pid=os.fork()
if pid==0:
print('I am child process %s my parents is %s'%(os.getpid(),os.getppid()))
else:
print('I (%s) just created a child process (%s).'%(os.getpid(),pid))
輸出如下
I (64225) just created a child process (64226).
I am child process 64226 my parents is 64225
二、跨平臺模塊multiprocessing
multiprocessing
模塊提供了一個Process類來代表一個進程對象。
示例1
from multiprocessing import Process
import os
# 子進程要執行的代碼
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('Parent process %s.' % os.getppid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')
#join()方法可以等待子進程結束后再繼續往下運行,通常用于進程間的同步。
示例2
from multiprocessing import Process
import time
import os
class P(Process):
def run(self):
print('Run child process %s (%s)...'%(self.name,os.getpid())) # 默認函數對象有name方法 ,結果為:P-1
time.sleep(3)
print('%s is done' % self.name)
if __name__ == '__main__':
print('Parent process %s.' % os.getppid())
p=P()
p.start()
p.join()
三、進程數據隔離
多個進程間的數據是隔離的,也就是說多個進程修改全局變量互不影響
驗證示例
from multiprocessing import Process
import time
x=100
def task():
global x
print('子進程開啟,當前x的值為%d'%x)
time.sleep(3)
x=10
print('子進程結束,當前x的值為%d'%x)
if __name__ == '__main__':
print('當前為父進程,準備開啟子進程,x的值為%d' % x)
p1=Process(target=task)
p1.start()
p1.join()
print('當前為父進程,準備結束父進程,x的值為%d' % x)
輸出
當前為父進程,準備開啟子進程,x的值為100
子進程開啟,當前x的值為100
子進程結束,當前x的值為10
當前為父進程,準備結束父進程,x的值為100
== 注意:有些情況是需要加鎖的情況,如文件讀寫問題 ==
四、多進程并行執行
示例如下
import time
from multiprocessing import Process
def task(name,n):
print('%s is running'%name)
time.sleep(n)
print('%s is done'%name)
if __name__ == '__main__':
p1=Process(target=task,args=("進程1",1)) #用時1s
p2=Process(target=task,args=("進程2",2)) #用時1s
p3=Process(target=task,args=("進程3",3)) #用時1s
start_time=time.time()
p1.start()
p2.start()
p3.start()
# 當第一秒在運行p1時,其實p2、p3也已經在運行,當1s后到p2時只需要再運行1s就到p3了,到p3也是一樣。
p1.join()
p2.join()
p3.join()
stop_time=time.time()
print(stop_time-start_time) #3.2848567962646484
五、進程池
1、線性執行( pool.apply() )
from multiprocessing import Pool # 導入進程池模塊pool
import time,os
def foo(i):
time.sleep(2)
print("in process", os.getpid()) # 打印進程號
if __name__ == "__main__":
pool = Pool(processes=5) # 設置允許進程池同時放入5個進程
for i in range(10):
pool.apply(func=foo, args=(i,)) # 同步執行掛起進程
print('end')
pool.close() # 關閉進程池,不再接受新進程
pool.join() # 進程池中進程執行完畢后再關閉,如果注釋掉,那么程序直接關閉。
2、并發執行( pool.apply_async() )
from multiprocessing import Pool # 導入進程池模塊pool
import time,os
def foo(i):
time.sleep(2)
print("in process", os.getpid()) # 打印進程號
if __name__ == "__main__":
pool = Pool(processes=5) # 設置允許進程池同時放入5個進程,并且將這5個進程交給cpu去運行
for i in range(10):
pool.apply_async(func=foo, args=(i,)) # 采用異步方式執行foo函數
print('end')
pool.close()
pool.join() # 進程池中進程執行完畢后再關閉,如果注釋掉,那么程序直接關閉。
3、設置回調
from multiprocessing import Process,Pool
import time,os
def foo(i):
time.sleep(2)
print("in process", os.getpid()) # 打印子進程的進程號
def bar(arg):#注意arg參數是必須要有的
print('-->exec done:', arg, os.getpid()) # 打印進程號
if __name__ == "__main__":
pool = Pool(processes=2)
print("主進程", os.getpid()) # 主進程的進程號
for i in range(3):
pool.apply_async(func=foo, args=(i,), callback=bar) # 執行回調函數callback=Bar
print('end')
pool.close()
pool.join() # 進程池中進程執行完畢后再關閉,如果注釋掉,那么程序直接關閉。
執行結果
主進程 752
end
in process 2348
-->exec done: None 752
in process 8364
-->exec done: None 752
in process 2348
-->exec done: None 752
#回調函數說明fun=Foo干不完就不執行bar函數,等Foo執行完就去執行Bar
#這個回調函數是主進程去調用的,而不是每個子進程去調用的。
六、子進程
1、 很多時候子進程是一個外部進程,如執行一條命令,這和命令行執行效果是一樣的 ?? 示例如下
import subprocess
print('$nslookup https://www.baidu.com')
r = subprocess.call(['nslookup','https://www.baidu.com'])
print('Exit code',r)
2、
有時候子進程還需要進行輸入,可以通過
communicate
方法來輸入 ??
示例如下
import subprocess
print('$ nslookup https://www.baidu.com')
p = subprocess.Popen(['nslookup'],stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
output,err = p.communicate(b'set q=mx\nbaidu.com\nexit\n')
print(output.decode('gbk'))
print('Exit code:',p.returncode)
輸出如下
$ nslookup https://www.baidu.com
默認服務器: bogon
Address: 192.168.111.1
> > 服務器: bogon
Address: 192.168.111.1
baidu.com MX preference = 10, mail exchanger = mx.maillb.baidu.com
baidu.com MX preference = 20, mail exchanger = jpmx.baidu.com
baidu.com MX preference = 15, mail exchanger = mx.n.shifen.com
baidu.com MX preference = 20, mail exchanger = mx50.baidu.com
baidu.com MX preference = 20, mail exchanger = mx1.baidu.com
>
Exit code: 0
七、守護進程
守護進程在主進程代碼執行完畢時立刻掛掉,然后主進程等待非守護進程執行完畢后回收子進程的資源(避免產生僵尸進程),整體才算結束。
示例
from multiprocessing import Process
import os
import time
def task(x):
print('%s is running ' %x)
time.sleep(3)
print('%s is done' %x)
if __name__ == '__main__':
p1=Process(target=task,args=('守護進程',))
p2=Process(target=task,args=('子進程',))
p2.start()
p1.daemon=True # 設置p1為守護進程
p1.start()
print('主進程代碼執行完畢')
>>:主進程代碼執行完畢
>>:子進程 is running
>>:子進程 is done
== 可以從結果看出,主進程代碼執行完,守護進程立即掛掉,主進程在等待子進程執行完畢后退出 ==
八、進程間通信
??如果想要進程間通信可以使用
Queue
或
Pipe
來實現 ??
使用Queue示例
from multiprocessing import Queue,Process
def put_id(q):
q.put([1,2,3,4])
if __name__ == '__main__':
q=Queue()
p=Process(target=put_id,args=(q,))
p.start()
print(q.get())
p.join()
# 輸出
[1,2,3,4]
== 注意:在這需要從multiprocessing導入Queue模塊 ==
使用Pipe示例
from multiprocessing import Process,Pipe
def put_id(conn):
conn.send([1,2,3])
conn.send([4,5,6])
conn.close()
if __name__ == '__main__':
## 生成管道。 生成時會產生兩個返回對象,這兩個對象相當于兩端的電話,通過管道線路連接。
## 兩個對象分別交給兩個變量。
parent_conn,child_conn=Pipe()
p=Process(target=put_id,args=(child_conn,))#child_conn需要傳給對端,用于send數據給parent_conn
p.start()
print(parent_conn.recv()) # parent_conn在這斷用于接收數據>>>>[1,2,3]
print(parent_conn.recv()) # parent_conn在這斷用于接收數據>>>>[4,5,6]
p.join()
== 注意兩端要發送次數和接受次數要對等,不然會卡住直到對等 ==
九、進程間數據共享(字典和列表型)
??前面說過,進程間數據是隔離的,如果想要進程間數據共享可以通過
Manager
來實現 ??
示例如下
from multiprocessing import Manager,Process
from random import randint
import os
def run(d,l):
d[randint(1,50)]=randint(51,100)#生成一個可在多個進程之間傳遞和共享的字典
l.append(os.getpid())
print(l)
if __name__ == '__main__':
with Manager() as manage: #做一個別名,此時manager就相當于Manager()
d=manage.dict()#生成一個可在多個進程之間傳遞和共享的字典
l=manage.list(range(5))#生成一個可在多個進程之間傳遞和共享的列表
p_list=[]
for i in range(10):#生成10個進程
p=Process(target=run,args=(d,l))
p_list.append(p)# 將每個進程放入空列表中
p.start()
for i in p_list:
i.join()
print(d)#所有進程都執行完畢后打印字典
print(l)#所有進程都執行完畢后打印列表
十、分布式進程
??在做分布式計算時顯然進程比線程各合適,一來進程更穩定,二來線程最多只能在同一臺機器的多個cpu上運行; ??
multiprocessing
的
managers
子模塊支持把多進程分布到多個機器上,一個服務進程用作調度者,依靠網絡將任務分布到其它多個進程中。 ??假設有一個需求,擁有兩臺機器,一臺機器用來做發送任務的服務進程,一臺用來做處理任務的服務進程; ??
示例如下
# task_master.py
from multiprocessing.managers import BaseManager
from queue import Queue
import random
import time
task_queue = Queue()
result_queue = Queue()
class QueueManager(BaseManager):
pass
def get_task_queue():
global task_queue
return task_queue
def get_result_queue():
global result_queue
return result_queue
if __name__ == '__main__':
# 將兩個隊列注冊到網絡上,calltable參數關聯Queue對象
QueueManager.register('get_task_queue', callable=get_task_queue)
QueueManager.register('get_result_queue', callable=get_result_queue)
# 創建一個隊列管理器,綁定端口5000,設定密碼為abc
manager = QueueManager(address=('127.0.0.1',5000),authkey=b'abc')
manager.start()
# 通過網絡獲取Queue對象
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放任務進去
for i in range(10):
n = random.randint(0,1000)
print('Put Task %d'%n)
task.put(n)
# 從結果隊列獲取結果
print('Try get results')
for i in range(10):
r = result.get()
print('Result: %s' % r)
manager.shutdown()
print('master exit')
==
注意:一定要用注冊過的Queue對象,另外在linux/unix/mac等系統上注冊可直接使用
QueueManager.register('get_result_queue', callable=lambda : result_queue)
==
# task_worker.py
from multiprocessing.managers import BaseManager
from queue import Queue
from queue import Empty
import time
class QueueManager(BaseManager):
pass
if __name__ == '__main__':
# 從服務器上獲取,所以注冊時只需要提供名字,也就是接口名字
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 連接到服務器,也就是task_master.py的機器
server_addr = '127.0.0.1'
manager = QueueManager(address=(server_addr,5000),authkey=b'abc')
manager.connect()
# 獲取Queue對象
task = manager.get_task_queue()
result = manager.get_result_queue()
# 從隊列提取任務,將處理結果插入result隊列
for i in range(10):
try:
n = task.get(timeout=1)
print('run task %d*%d'%(n,n))
r = '%d * %d = %d'%(n,n,n*n)
time.sleep(1)
result.put(r)
except Empty:
print('task queue is empty')
print('worker exit')
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
