多任務編程
意義 :充分利用計算機的資源提高程序的運行效率
定義 :通過應用程序利用計算機多個核心,達到同時執行多個任務的目的
實施方案
:
多進程
、
多線程
并行 :多個計算機核心 并行的 同時處理 多個任務
并發 :內核在 多個任務間 不斷 切換 ,達到好像內核在同時處理多個任務的運行效果
進程 :程序在計算機中運行一次的過程
程序 :是一個可執行文件,是靜態的,占有磁盤,不占有計算機運行資源
進程 :進程是一個動態的過程描述,占有CPU內存等計算機資源的,有一定的生命周期
* 同一個程序的不同執行過程是不同的進程,因為分配的計算機資源等均不同
父子進程 : 系統中每一個進程(除了系統初始化進程)都有唯一的父進程,可以有0個或多個子進
程。父子進程關系便于進程管理。
進程
CPU時間片: 如果一個進程在某個時間點被計算機分配了內核,我們稱為該進程在CPU時間片上。
PCB( 進程控制塊):存放進程消息的空間
進程ID(PID):進程在操作系統中的唯一編號,由系統自動分配
進程信息包括:進程PID,進程占有的內存位置,創建時間,創建用戶. . . . . . . .??
進程特征:
- 進程是操作系統分配計算機資源的最小單位
- 每一個進程都有自己單獨的 虛擬 內存空間
- 進程間的執行相互獨立,互不影響
進程的狀態
1、 三態
- 就緒態: 進程具備執行條件, 等待系統分配CPU
- 運行態: 進程 占有CPU處理器 ,處于運行狀態
- 等待態: 進程 暫時不具備運行條件 ,需要 阻塞 等待,讓出CPU
2、 五態(增加新建態和終止態)
- 新建態: 創建一個 新的進程 ,獲取資源的過程
- 終止態: 進程結束 釋放資源的過程
查看進程樹: pstree
查看父進程PID: ? ps -ajx
linux查看進程命令: ps -aux
有一列為STAT為進程的狀態
D 等待態 (不可中斷等待)(
阻塞
)
S 等待態 (可中斷等待)(
睡眠
)
T 等待態 (
暫停
狀態)
R 運行態 (就緒態運行態)
Z 僵尸態
+ 前臺進程(能在終端顯示出現象的)
< 高優先級
N 低優先級
l 有多線程的
s 會話組組長
?
os.fork創建進程
pid = os.fork()
功能:創建一個子進程
返回值:創建成功在原有的進程中返回子進程的PID,在子進程中返回0;創建失敗返回一個負數
父子進程通常會根據fork返回值的差異選擇執行不同的代碼(使用if結構)
import os from time import sleep pid = os.fork() if pid < 0: print ( " 創建進程失敗 " ) # 子進程執行部分 elif pid == 0: print ( " 新進程創建成功 " ) # 父進程執行部分 else : sleep( 1 ) print ( " 原來的進程 " ) print ( " 程序執行完畢 " ) # 新進程創建成功 # 原來的進程 # 程序執行完畢
- 子進程會復制父進程全部代碼段 (包括fork前的代碼) 但是子進程僅從fork的下一句開始執行
- 父進程不一定先執行 (進程之間相互獨立,互不影響)
- 父子進程各有自己的屬性特征,比如:PID號PCB內存空間
- 父進程fork之前開辟的空間子進程同樣擁有,但是進程之間相互獨立,互不影響.
父子進程的變量域
import os from time import sleep a = 1 pid = os.fork() if pid < 0: print ( " 創建進程失敗 " ) elif pid == 0: print ( " 子進程 " ) print ( " a = " ,a) a = 10000 print ( " a = " ,a) else : sleep( 1 ) print ( " 父進程 " ) print ( " parent a : " ,a) # a = 1 # 子進程 # a = 1 # a = 10000 # 父進程 # parent a : 1
進程ID和退出函數
os.getpid() ? 獲取當前進程的PID號
返回值:返回PID號
os.getppid()? 獲取父類進程的進程號
返回值:返回PID號
import os pid = os.fork() if pid < 0: print ( " Error " ) elif pid == 0: print ( " Child PID: " , os.getpid()) # 26537 print ( " Get parent PID: " , os.getppid()) # 26536 else : print ( " Get child PID: " , pid) # 26537 print ( " Parent PID: " , os.getpid()) # 26536
os._exit(status) ?? 退出進程
參數:進程的退出狀態 整數
sys.exit([status]) ?? 退出進程
參數:默認為0 整數則表示退出狀態;符串則表示退出時打印內容
sys.exit([status])可以通過 捕獲 SystemExit 異常 阻止退出
import os,sys # os._exit(0) # 退出進程 try : sys.exit( " 退出 " ) except SystemExit as e: print ( " 退出原因: " ,e) # 退出原因: 退出
孤兒和僵尸
孤兒進程
父進程先于子進程退出,此時子進程就會變成孤兒進程
孤兒進程會被系統指定的進程收養,即系統進程會成為該孤兒進程新的父進程。孤兒進程退出時該父進程會處理退出狀態
僵尸進程
子進程先與父進程退出,父進程沒有處理子進程退出狀態,此時子進程成為僵尸進程
僵尸進程已經結束,但是會滯留部分PCB信息在內存,大量的僵尸會消耗系統資源,應該盡量避免
如何避免僵尸進程的產生
父進程處理子進程退出狀態
pid, status = os.wait()
功能:在父進程中 阻塞等待 處理子進程的退出
返回值: pid? ? ?退出的子進程的PID號
status 子進程的退出狀態
import os, sys pid = os.fork() if pid < 0: print ( " Error " ) elif pid == 0: print ( " Child process " , os.getpid()) # Child process 27248 sys.exit(1 ) else : pid, status = os.wait() # 阻塞等待子進程退出 print ( " pid : " , pid) # pid : 27248 # 還原退出狀態 print ( " status: " , os.WEXITSTATUS(status)) # status: 1 while True: pass
創建二級子進程
- 父進程創建子進程等待子進程退出
- 子進程創建二級子進程,然后馬上退出
- 二級子進程成為孤兒,處理具體事件
import os from time import sleep def fun1(): sleep( 3 ) print ( " 第一件事情 " ) def fun2(): sleep( 4 ) print ( " 第二件事情 " ) pid = os.fork() if pid < 0: print ( " Create process error " ) elif pid == 0: # 子進程 pid0 = os.fork() # 創建二級進程 if pid0 < 0: print ( " 創建二級進程失敗 " ) elif pid0 == 0: # 二級子進程 fun2() # 做第二件事 else : # 二級進程 os._exit(0) # 二級進程退出 else : os.wait() fun1() # 做第一件事 # 第一件事情 # 第二件事情
通過信號處理子進程退出
原理: 子進程退出時會發送信號給父進程,如果父進程忽略子進程信號, 則系統就會自動處 理子進程退出。
方法: 使用signal模塊在父進程創建子進程前寫如下語句 :
import signal
signal.signal(signal.SIGCHLD,signal.SIG_IGN)
特點 : 非阻塞,不會影響父進程運行。可以處理所有子進程退出
Multiprocessing創建進程
步驟:
- 需要將要做的事情封裝成函數
- multiprocessing.Process創建進程,并綁定函數
- start啟動進程
- join回收進程 ?
p = multiprocessing.Process(target, [name], [args], [kwargs]) ?
創建進程對象
參數:
- target : 要綁定的函數名
- name : 給進程起的名稱 (默認Process-1)
- args:?元組?用來給target函數傳參
- kwargs :?字典?用來給target函數鍵值傳參
p.start()
功能
?: 啟動進程 自動運行terget綁定函數。此時進程被創建
p.join([timeout])
功能
: 阻塞等待子進程退出,最后回收進程
參數
: 超時時間
multiprocessing的注意事項:
- 使用multiprocessing創建進程子進程同樣復制父進程的全部內存空間,之后有自己獨立的空間,執行上互不干擾
- 如果不使用join回收可能會產生僵尸進程
- 一般父進程功能就是創建子進程回收子進程,所有事件交給子進程完成
- multiprocessing創建的子進程無法使用ptint
import multiprocessing as mp from time import sleep import os a = 1 def fun(): sleep( 2 ) print ( " 子進程事件 " ,os.getpid()) global a a = 10000 print ( " a = " ,a) p = mp.Process(target = fun) # 創建進程對象 p.start() # 啟動進程 sleep(3 ) print ( " 這是父進程 " ) p.join() # 回收進程 print ( " parent a: " ,a) # 子進程事件 5434 # a = 10000 # 這是父進程 # parent a: 1 Process(target)
multiprocessing進程屬性
p.name ? ? 進程名稱
p.pid 對應子進程的PID號
p.is_alive() ? 查看子進程是否在生命周期
p.daemon? ? ? ? ?設置父子進程的退出關系
如果等于True則子進程會隨父進程的退出而結束,就不用使用 join(),必須要求在start()前設置
進程池
引言:如果有大量的任務需要多進程完成,而任務周期又比較短且需要頻繁創建。此時可能產生大量進程頻繁創建銷毀的情況,消耗計算機資源較大,這個時候就需要進程池技術
進程池的原理:創建一定數量的進程來處理事件,事件處理完進程不退出而是繼續處理其他事件,直到所有事件全都處理完畢統一銷毀。增加進程的重復利用,降低資源消耗。 ?
1.創建進程池,在池內放入適當數量的進程
from multiprocessing import Pool
Pool(processes) 創建進程池對象
- 參數:進程數量
- 返回 :?指定進程數量,默認根據系統自動判定 ?
2.將事件封裝函數,放入到進程池
pool.apply_async(fun,args,kwds) 將事件放入進程池執行
參數:
- fun 要執行的事件函數
- args 以元組為fun傳參
- kwds 以字典為fun傳參
返回值 : ?
- 返回一個事件對象 通過get()屬性函數可以獲取fun的返回值?
3.關閉進程池
?pool.close() 關閉進程池,無法再加入事件
4.回收進程
pool.join() 回收進程池 ?
from multiprocessing import Pool from time import sleep,ctime pool = Pool(4) # 創建進程池 # 進程池事件 def worker(msg): sleep( 2 ) print (msg) return ctime() # 向進程池添加執行事件 for i in range(4 ): msg = " Hello %d " % i # r 代表func事件的一個對象 r = pool.apply_async(func=worker,args= (msg,)) pool.close() # 關閉進程池 pool.join() # 回收進程池 # Hello 3 # Hello 2 # Hello 0 # Hello 1
進程間通信(IPC)
由于進程間空間獨立,資源無法共享,此時在進程間通信就需要專門的通信方法。
進程間通信方法 : 管道 消息隊列 共享內存 信號信號量 套接字
管道通信(Pipe)
通信原理:在內存中開辟管道空間,生成管道操作對象,多個進程使用同一個管道對象進行讀寫即可實現通信
from multiprocessing import Pipe
fd1, fd2 = Pipe(duplex = True)
- 功能:創建管道
- 參數:默認表示雙向管道,如果為False 表示單向管道
- 返回值:表示管道兩端的讀寫對象;如果是雙向管道均可讀寫;如果是單向管道fd1只讀 fd2只寫
fd.recv()
- 功能 : 從管道獲取內容
- 返回值:獲取到的數據,當管道為空則阻塞
fd.send(data)
- 功能: 向管道寫入內容
- 參數: 要寫入的數據?
注意:
- multiprocessing中管道通信只能用于父子關系進程中?
- 管道對象在父進程中創建,子進程通過父進程獲取 ?
from multiprocessing import Pipe, Process fd1, fd2 = Pipe() # 創建管道,默認雙向管道 def fun1(): data = fd1.recv() # 從管道獲取消息 print ( " 管道2傳給管道1的數據 " , data) inpu = " 跟你說句悄悄話 " fd1.send(inpu) def fun2(): fd2.send( " 肥水不流外人天 " ) data = fd2.recv() print ( " 管道1傳給管道2的數據 " , data) p1 = Process(target= fun1) P2 = Process(target= fun2) p1.start() P2.start() p1.join() P2.join() # 管道2傳給管道1的數據 肥水不流外人天 # 管道1傳給管道2的數據 跟你說句悄悄話
消息隊列
從內存中開辟隊列結構空間,多個進程可以向隊列投放消息,在取出來的時候按照 先進先出 順序取出 ?
q = Queue(maxsize = 0)
創建隊列對象
- maxsize :默認表示系統自動分配隊列空間;如果傳入正整數則表示最多存放多少條消息
- 返回值 : 隊列對象
q.put(data,[block,timeout])
向隊列中存入消息
- data: 存放消息( python 數據類型)
- block: 默認為 True 表示當前隊列滿的時候阻塞,設置為 False 則表示非阻塞
- timeout: 當 block 為 True 表示超時時間
返回值:返回獲取的消息
q.get([block,timeout])
從隊列取出消息
- 參數:block 設置是否阻塞 False為非阻塞;timeout 超時檢測
- 返回值: 返回獲取到的內容
q.full() 判斷隊列是否為滿
q.empty() 判斷隊列是否為空
q.qsize() 判斷當前隊列有多少消息?
q.close() 關閉隊列
from multiprocessing import Process, Queue from time import sleep from random import randint # 創建消息隊列 q = Queue(3 ) # 請求進程 def request(): for i in range(2 ): x = randint(0, 100 ) y = randint(0, 100 ) q.put((x, y)) # 處理進程 def handle(): while True: sleep( 1 ) try : x, y = q.get(timeout=2 ) except : break else : print ( " %d + %d = %d " % (x, y, x + y)) p1 = Process(target= request) p2 = Process(target= handle) p1.start() p2.start() p1.join() p2.join() # 12 + 61 = 73 # 69 + 48 = 117
共享內存?
在內存中開辟一段空間,存儲數據,對多個進程可見,每次寫入共享內存中的數據會覆蓋之前的內容,效率高,速度快
from multiprocessing import Value, Array
obj = Value(ctype,obj)
功能 :開辟共享內存空間?
參數 :ctype 字符串 要轉變的c的數據類型,對比類型對照表
obj 共享內存的初始化數據
返回:共享內存對象
from multiprocessing import Process,Value import time from random import randint # 創建共享內存 money = Value( ' i ' , 5000 ) # 修改共享內存 def man(): for i in range(30 ): time.sleep( 0.2 ) money.value += randint(1, 1000 ) def girl(): for i in range(30 ): time.sleep( 0.15 ) money.value -= randint(100, 800 ) m = Process(target= man) g = Process(target= girl) m.start() g.start() m.join() g.join() print ( " 一月余額: " , money.value) # 獲取共享內存值 # 一月余額: 4264
obj = Array(ctype,obj)
功能 :開辟共享內存
參數 :ctype 要轉化的c的類型
obj 要存入共享的數據
如果是列表 將列表存入共享內存,要求數據類型一致
如果是正整數 表示開辟幾個數據空間
from multiprocessing import Process, Array # 創建共享內存 # shm = Array('i',[1,2,3]) # shm = Array('i',3) # 表示開辟三個空間的列表 shm = Array( ' c ' ,b " hello " ) # 字節串 def fun(): # 共享內存對象可迭代 for i in shm: print (i) shm[0] = b ' H ' p = Process(target= fun) p.start() p.join() for i in shm: # 子進程修改,父進程中也跟著修改 print (i) print (shm.value) # 打印字節串 b'Hello'
信號量(信號燈集)
通信原理:給定一個數量對多個進程可見。多個進程都可以操作該數量增減,并根據數量值決定自己的行為。
from multiprocessing import Semaphore
sem = Semaphore(num)
創建信號量對象
- 參數 : 信號量的初始值
- 返回值 : 信號量對象
sem.acquire()?
將信號量減1 當信號量為0時阻塞
sem.release()?
將信號量加1
sem.get_value()
?獲取信號量數量
from multiprocessing import Process, Semaphore sem = Semaphore(3) # 創建信號量,最多允許3個任務同時執行 def rnewu(): sem.acquire() # 每執行一次減少一個信號量 print ( " 執行任務.....執行完成 " ) sem.release() # 執行完成后增加信號量 for i in range(3): # 有3個人想要執行任務 p = Process(target= rnewu) p.start() p.join()
?
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
