'''
進程間的通信
'''
"""
multiprocessing模塊支持進程間通信的兩種主要形式:管道和隊列
都是基于消息傳遞實現(xiàn)的,
"""
from multiprocessing import Queue
q = Queue(3)
#put, get, put_nowait, get_nowait, full, empty
q.put(1)
q.put(2)
q.put(3)
# q.put(1)#隊列已經(jīng)滿了,再加程序就會一直停在這里,等待數(shù)據(jù)被別人取走,不取走就一直停在這
## q.get(1)#可以先取出一個,然后再加就可以了
## q.put(1)
# try:
# q.put_nowait(1) #使用這個綁定方法,隊列滿了不會阻塞,但是會因為隊列滿了而報錯。
# except:
# print('隊列已經(jīng)滿了') #加了try之后,不阻塞,但是消息會丟
print(q.full())#查看隊列是否滿了
print(q.get())
print(q.get())
print(q.get())
# q.get(1)#同put 方法一樣,隊列空了繼續(xù)取就會出現(xiàn)阻塞。
#和上面方法類似
try:
q.get_nowait()
except:
print('隊列已經(jīng)空了')
print(q.empty())
# 子進程數(shù)據(jù)給父進程
import time
from multiprocessing import Process, Queue
def f(q):
q.put([time.asctime(), 'from Eva', 'hello'])
if __name__ == '__main__':
q = Queue() #創(chuàng)建一個Queue對象
p = Process(target=f, args=(q, )) #創(chuàng)建一個進程
p.start()
print(q.get())
p.join()
#
# #批量生產(chǎn)數(shù)據(jù)放入隊列再批量獲取結(jié)果
import os
import time
import multiprocessing
#向queue中輸入數(shù)據(jù)的函數(shù)
def inputQ(queue):
info = str(os.getpid()) + '(put): ' + str(time.asctime())
queue.put(info)
#向queue中輸出數(shù)據(jù)的函數(shù)
def outputQ(queue):
info = queue.get()
print(f'{str(os.getpid())} (get): {info}')
#Main
if __name__ == '__main__':
multiprocessing.freeze_support()
record1 = [] #store input processes
record2 = [] #store output processes
queue = multiprocessing.Queue(3)
#輸入進程
for i in range(10):
process = multiprocessing.Process(target=inputQ, args=(queue, ))
process.start()
record1.append(process)
#輸出進程
for i in range(10):
process = multiprocessing.Process(target=outputQ, args = (queue, ))
process.start()
record2.append(process)
for p in record1:
p.join()
for p in record2:
p.join()
import os
import time
import multiprocessing
#向隊列中輸入數(shù)據(jù)
def inputQ(queue):
info = str(os.getpid()) + '(put):' + str(time.asctime())
queue.put(info)
#向隊列中輸出數(shù)據(jù)
def outputQ(queue):
info = queue.get()
print(f'{str(os.getpid())} (get: ) {info}')
if __name__ == '__main__':
multiprocessing.freeze_support()
p1_list = []
p2_list = []
q = multiprocessing.Queue(3)
for i in range(10):
p = multiprocessing.Process(target=inputQ, args=(q, ))
p.start()
p1_list.append(p)
for i in range(10):
p = multiprocessing.Process(target=outputQ, args=(q, ))
p.start()
p2_list.append(p)
for k in p1_list:
k.join()
for j in p2_list:
j.join()
#基于隊列實現(xiàn)生產(chǎn)者和消費者模型(生產(chǎn)者生產(chǎn)出來放到阻塞隊列里,消費者直接從阻塞隊列中取需要的東西)
from multiprocessing import Process, Queue #導入進程和隊列模塊
import time, random,os#導入時間、隨機數(shù)和os模塊
def consume(q):#定義消費者函數(shù)
while True: #循環(huán)消費
res = q.get() #從隊列中取東西
time.sleep(random.randint(1, 3)) #隨機睡幾秒
print(f'{str(os.getpid())} 吃 {res}') #打印出來
def producer(q): #定義生產(chǎn)者函數(shù)
for i in range(10): #生產(chǎn)10個包子
time.sleep(random.randint(1, 3)) #隨機睡幾秒
res = f'包子{i}' #生產(chǎn)包子標記下來
q.put(res) #把包子放到阻塞隊列里面
print(f'生產(chǎn)了{(os.getpid(), res)}') #打印東西
if __name__ == '__main__':
q = Queue() #阻塞隊列
#生產(chǎn)者們:即廚師們
p1 = Process(target=producer, args=(q, ))
#消費者們:即吃貨們
c1 = Process(target=consume, args=(q, ))
#開始
p1.start()
c1.start()
print('主')
#生產(chǎn)者和消費者(改良版)
'''
上面的版本主進程永遠不會結(jié)束,原因是:生產(chǎn)者p在生產(chǎn)完后就結(jié)束了,
但是消費者c在取空了q之后,一直處于死循環(huán)卡在q.get這一步
解決方法是生產(chǎn)者生產(chǎn)完成之后,網(wǎng)隊列中組發(fā)一個結(jié)束信號,這樣消費者在接收到結(jié)束
信號后就可以break出死循環(huán)
注意:結(jié)束信號None,不一定要由生產(chǎn)者發(fā),主進程里面同樣可以發(fā),但主進程需要
等生產(chǎn)者結(jié)束后才應該發(fā)送該信號
'''
import os, random, time
from multiprocessing import Process, Queue
#定義消費者函數(shù)
def consume(queue):
while True:
res = queue.get()
if res is None:break #收到結(jié)束信號則結(jié)束
time.sleep(random.randint(1, 3))
print(f'{str(os.getpid())} 吃了 {res}')
#定義生產(chǎn)者函數(shù)
def producer(queue):
for i in range(10):
time.sleep(random.randint(1, 3))
res = f'包子{i}'
queue.put(res)
print(f'{os.getpid()}生產(chǎn)了{res}')
# queue.put(None) #生產(chǎn)者發(fā)送結(jié)束信號
if __name__ == '__main__':
q = Queue(3)
#生產(chǎn)者
p = Process(target=producer, args=(q, ))
#消費者
q1 = Process(target=consume, args=(q, ))
p.start()
q1.start()
p.join()
q.put(None) #主進程里面發(fā)送結(jié)束信號
q1.join() #可加可不加
print('主線程')
# 多個消費者就需要發(fā)送多個None
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
while True:
res=q.get()
if res is None:break #收到結(jié)束信號則結(jié)束
time.sleep(random.randint(1,3))
print('%s 吃 %s' %(os.getpid(),res))
def producer(name,q):
for i in range(2):
time.sleep(random.randint(1,3))
res='%s%s' %(name,i)
q.put(res)
print('生產(chǎn)了 %s %s' %(os.getpid(),res))
if __name__ == '__main__':
q=Queue()
#生產(chǎn)者們:即廚師們
p1=Process(target=producer,args=('包子',q))
p2=Process(target=producer,args=('骨頭',q))
p3=Process(target=producer,args=('泔水',q))
#消費者們:即吃貨們
c1=Process(target=consumer,args=(q,))
c2=Process(target=consumer,args=(q,))
#開始
p1.start()
p2.start()
p3.start()
c1.start()
# 必須保證生產(chǎn)者全部生產(chǎn)完畢,才應該發(fā)送結(jié)束信號
p1.join()
p2.join()
p3.join()
# 有幾個消費者就應該發(fā)送幾次結(jié)束信號None
q.put(None)
q.put(None) #發(fā)送結(jié)束信號
print('主')
#JoinableQueue隊列實現(xiàn)消費者與生產(chǎn)者模型
from multiprocessing import Process,JoinableQueue
import time,random,os
def consumer(q):
while True:
res=q.get()
time.sleep(random.randint(1,3))
print('%s 吃 %s' %(os.getpid(),res))
#向q.join()發(fā)送一次信號,證明一個數(shù)據(jù)已經(jīng)被取走了
q.task_done()
def producer(name,q):
for i in range(2):
time.sleep(random.randint(1,3))
res='%s%s' %(name,i)
q.put(res)
print('生產(chǎn)了 %s %s' %(os.getpid(),res))
q.join() #生產(chǎn)完畢,使用此方法進行阻塞,直到隊列中所有項目均被處理
if __name__ == '__main__':
q=JoinableQueue()
#生產(chǎn)者們:即廚師們
p1=Process(target=producer,args=('包子',q))
p2=Process(target=producer,args=('骨頭',q))
p3=Process(target=producer,args=('泔水',q))
#消費者們:即吃貨們
c1=Process(target=consumer,args=(q,))
c2=Process(target=consumer,args=(q,))
c1.daemon=True #設置成守護進程
c2.daemon=True #設置成守護進程
#開始
p_1 = [p1, p2, p3, c1, c2]
for p in p_1:
p.start()
# 必須保證生產(chǎn)者全部生產(chǎn)完畢,才應該發(fā)送結(jié)束信號
p1.join()
p2.join()
p3.join()
print('主')
'''
主進程等--->p1, p2, p3等--->c1, c2
p1, p2, p3結(jié)束了,證明c1, c2肯定全部收完了p1, p2, p3發(fā)到隊列的數(shù)據(jù)
因而c1, c2也沒有存在的價值了,不需要繼續(xù)阻塞在進程中影響主進程了,
應該隨著主進程的結(jié)束而結(jié)束,所以設置成守護進程就可以了
'''
更多文章、技術交流、商務合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
