一、進程和線程
進程
假如有兩個程序A和B,程序A在執行到一半的過程中,需要讀取大量的數據輸入(I/
O操作),
而此時CPU只能靜靜地等待任務A讀取完數據才能繼續執行,這樣就白白浪費了CPU資源。
是不是在程序A讀取數據的過程中,讓程序B去執行,當程序A讀取完數據之后,讓
程序B暫停,然后讓程序A繼續執行?
當然沒問題,但這里有一個關鍵詞:切換
既然是切換,那么這就涉及到了狀態的保存,狀態的恢復,加上程序A與程序B所需要的系統資
源(內存,硬盤,鍵盤等等)是不一樣的。自然而然的就需要有一個東西去記錄程序A和程序B
分別需要什么資源,怎樣去識別程序A和程序B等等,所以就有了一個叫進程的抽象
進程定義:
進程就是一個程序在一個數據集上的一次動態執行過程。
進程一般由程序、數據集、進程控制塊三部分組成。
我們編寫的程序用來描述進程要完成哪些功能以及如何完成;
數據集則是程序在執行過程中所需要使用的資源;
進程控制塊用來記錄進程的外部特征,描述進程的執行變化過程,系統可以利用它來控制和管理進程,它是系
統感知進程存在的唯一標志。
舉一例說明進程:
想象一位有一手好廚藝的計算機科學家正在為他的女兒烘制生日蛋糕。他有做生日蛋糕的食譜,廚房里有所需
的原料:面粉、雞蛋、糖、香草汁等。在這個比喻中,做蛋糕的食譜就是程序(即用適當形式描述的算法)計算機科學家就是處理器(cpu),
而做蛋糕的各種原料就是輸入數據。進程就是廚師閱讀食譜、取來各種原料以及烘制蛋糕等一系列動作的總和。
現在假設計算機科學家的兒子哭著跑了進來,說他的頭被一只蜜蜂蟄了。計算機科學家就記錄下他
照著食譜做到哪兒了(保存進程的當前狀態),然后拿出一本急救手冊,按照其中的指示處理蟄傷。這
里,我們看到處理機從一個進程(做蛋糕)切換到另一個高優先級的進程(實施醫療救治),每個進程
擁有各自的程序(食譜和急救手冊)。當蜜蜂蟄傷處理完之后,這位計算機科學家又回來做蛋糕,從他
離開時的那一步繼續做下去。
線程
線程的出現是為了降低上下文切換的消耗,提高系統的并發性,并突破一個進程只能干一樣事的缺陷,
使到進程內并發成為可能。
假設,一個文本程序,需要接受鍵盤輸入,將內容顯示在屏幕上,還需要保存信息到硬盤中。若只有
一個進程,勢必造成同一時間只能干一樣事的尷尬(當保存時,就不能通過鍵盤輸入內容)。若有多
個進程,每個進程負責一個任務,進程A負責接收鍵盤輸入的任務,進程B負責將內容顯示在屏幕上的
任務,進程C負責保存內容到硬盤中的任務。這里進程A,B,C間的協作涉及到了進程通信問題,而且
有共同都需要擁有的東西-------
文本內容,不停的切換造成性能上的損失。若有一種機制,可以使
任務A,B,C共享資源,這樣上下文切換所需要保存和恢復的內容就少了,同時又可以減少通信所帶
來的性能損耗,那就好了。是的,這種機制就是線程。
線程也叫輕量級進程,它是一個基本的CPU執行單元,也是程序執行過程中的最小單元,由線程ID、程序
計數器、寄存器集合和堆棧共同組成。線程的引入減小了程序并發執行時的開銷,提高了操作系統的并發
性能。線程沒有自己的系統資源。
線程進程的關系區別:
1、一個程序至少有一個進程,一個進程至少有一個線程.(進程可以理解成線程的容器)
2、進程在執行過程中擁有獨立的內存單元,而多個線程共享內存,從而極大地提高了程序的運行效率。
3、線程在執行過程中與進程還是有區別的。每個獨立的線程有一個程序運行的入口、順序執行序列和
程序的出口。但是線程不能夠獨立執行,必須依存在應用程序中,由應用程序提供多個線程執行控制。?
4、進程是具有一定獨立功能的程序關于某個數據集合上的一次運行活動,進程是系統進行資源分配和調
度的一個獨立單位.?
線程是進程的一個實體,是CPU調度和分派的基本單位,它是比進程更小的能獨立運行的基本單位.線程
自己基本上不擁有系統資源,只擁有一點在運行中必不可少的資源(如程序計數器,一組寄存器和棧)但是
它可與同屬一個進程的其他的線程共享進程所擁有的全部資源.?
一個線程可以創建和撤銷另一個線程;同一個進程中的多個線程之間可以并發執行.
python的GIL
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-
safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
上面的核心意思就是,無論你啟多少個線程,你有多少個cpu, Python在執行的時候會淡定的在同一時刻只允許一個線程運行
?
二、python的線程與threading模塊
1、線程的兩種調用方式
threading 模塊建立在thread模塊之上。thread模塊以低級、原始的方式來處理和控制線程,而threading 模塊通過對thread進行二次封裝,提供了更方便的api來處理線程。
?
直接調用:
import
threading
import
time
def sayhi(num): #定義每個線程要運行的函數
print("running on number:%s" %
num)
time.sleep(3
)
if __name__ == '__main__'
:
t1 = threading.Thread(target=sayhi,args=(1,)) #生成一個線程實例
t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一個線程實例
t1.start() #啟動線程 t2.start() #啟動另一個線程 print(t1.getName()) #獲取線程名 print(t2.getName())
?
繼承式調用:
import
threading
import
time
class
MyThread(threading.Thread):
def __init__
(self,num):
threading.Thread.__init__
(self)
self.num =
num def run(self):#定義每個線程要運行的函數 print("running on number:%s" %
self.num) time.sleep(3
) if __name__ == '__main__'
: t1 = MyThread(1
) t2 = MyThread(2
) t1.start() t2.start() print("ending......")? ? ? ? ? ? ? ? ?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??
三、threading.thread的實例方法
join&Daemon方法
import
threading
from time import
ctime,sleep
import
time
def
ListenMusic(name):
print ("Begin listening to %s. %s" %
(name,ctime()))
sleep(3
) print("end listening %s"%
ctime()) def
RecordBlog(title): print ("Begin recording the %s! %s" %
(title,ctime())) sleep(5
) print('end recording %s'%
ctime()) threads =
[] t1 = threading.Thread(target=ListenMusic,args=('殺手'
,)) t2 = threading.Thread(target=RecordBlog,args=('python線程'
,)) threads.append(t1) threads.append(t2) if __name__ == '__main__'
: for t in
threads: #t.setDaemon(True) #注意:一定在start之前設置
t.start() # t.join() # t1.join()
t1.setDaemon(True) #t2.join()########考慮這三種join位置下的結果? print ("all over %s" %ctime())
join():在子線程完成運行之前,這個子線程的父線程將一直被阻塞。
setDaemon(True):
將線程聲明為守護線程,必須在start() 方法調用之前設置, 如果不設置為守護線程程序會被無限掛起。這個方法基本和join是相反的。
當我們 在程序運行中,執行一個主線程,如果主線程又創建一個子線程,主線程和子線程 就分兵兩路,分別運行,那么當主線程完成
想退出時,會檢驗子線程是否完成。如 果子線程未完成,則主線程會等待子線程完成后再退出。但是有時候我們需要的是 只要主線程
完成了,不管子線程是否完成,都要和主線程一起退出,這時就可以 用setDaemon方法啦
其他方法:
?
# run(): 線程被cpu調度后自動執行線程對象的run方法
# start():啟動線程活動。
# isAlive(): 返回線程是否活動的。
# getName(): 返回線程名。
# setName(): 設置線程名。
threading模塊提供的一些方法:
# threading.currentThread(): 返回當前的線程變量。
# threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。
# threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
四、同步鎖(Lock)
import
threading
import
time
def
addNum():
global num #在每個線程中都獲取這個全局變量
#num-=1
temp=
num
#print('--get num:',num )
time.sleep(0.1
) num =temp-1 #對此公共變量進行-1操作
num = 1000 #設定一個共享變量 thread_list =
[] for i in range(1000
): t = threading.Thread(target=
addNum) t.start() thread_list.append(t) for t in thread_list: #等待所有線程執行完畢
t.join() print('final num:', num )
觀察:time.sleep(0.1)? /0.001/0.0000001 結果分別是多少?
多個線程都在同時操作同一個共享資源,所以造成了資源破壞,怎么辦呢?(join會造成串行,失去所線程的意義)
我們可以通過同步鎖來解決這種問題
R=
threading.Lock()
####
def
sub():
global
num
R.acquire()
temp=num-1
time.sleep(0.1
)
num=
temp R.release()
五、遞歸鎖
在線程間共享多個資源的時候,如果兩個線程分別占有一部分資源并且同時等待對方的資源,就會造成死鎖,因為系統判斷這部分資源都正在使用,所以這兩個線程在無外力作用下將一直等待下去。死鎖的例子
import
threading,time
class
myThread(threading.Thread):
def
doA(self):
lockA.acquire()
print(self.name,"gotlockA"
,time.ctime())
time.sleep(3
)
lockB.acquire()
print(self.name,"gotlockB"
,time.ctime()) lockB.release() lockA.release() def
doB(self): lockB.acquire() print(self.name,"gotlockB"
,time.ctime()) time.sleep(2
) lockA.acquire() print(self.name,"gotlockA"
,time.ctime()) lockA.release() lockB.release() def
run(self): self.doA() self.doB() if __name__=="__main__"
: lockA=
threading.Lock() lockB=
threading.Lock() threads=
[] for i in range(5
): threads.append(myThread()) for t in
threads: t.start() for t in
threads: t.join()#等待線程結束,后面再講。
解決辦法:使用遞歸鎖,將
lockA=
threading.Lock()
lockB=threading.Lock()
#--------------
lock=threading.RLock()
為了支持在同一線程中多次請求同一資源,python提供了“可重入鎖”:threading.RLock。RLock內部維護著一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次acquire。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。
應用
import
time
import
threading
class
Account:
def __init__
(self, _id, balance):
self.id =
_id
self.balance =
balance self.lock =
threading.RLock() def
withdraw(self, amount): with self.lock: self.balance -=
amount def
deposit(self, amount): with self.lock: self.balance +=
amount def drawcash(self, amount):#lock.acquire中嵌套lock.acquire的場景
with self.lock: interest=0.05
count=amount+amount*
interest self.withdraw(count) def
transfer(_from, to, amount): #鎖不可以加在這里 因為其他的其它線程執行的其它方法在不加鎖的情況下數據同樣是不安全的
_from.withdraw(amount) to.deposit(amount) alex = Account('alex',1000
) yuan = Account('yuan',1000
) t1=threading.Thread(target = transfer, args = (alex,yuan, 100
)) t1.start() t2=threading.Thread(target = transfer, args = (yuan,alex, 200
)) t2.start() t1.join() t2.join() print('>>>'
,alex.balance) print('>>>',yuan.balance)
六、同步條件(Event)
import
threading,time
class
Boss(threading.Thread):
def
run(self):
print("BOSS:今晚大家都要加班到23:00。"
)
print
(event.isSet())
event.set()
time.sleep(5
) print("BOSS:<23:00>可以下班了。"
) print
(event.isSet()) event.set() class
Worker(threading.Thread): def
run(self): event.wait() print("Worker:哎……苦逼!"
) time.sleep(1
) event.clear() event.wait() print("Worker:OhYeah!"
) if __name__=="__main__"
: event=
threading.Event() threads=
[] for i in range(5
): threads.append(Worker()) threads.append(Boss()) for t in
threads: t.start() for t in
threads: t.join()
?
?七、信號量(Semaphore)
信號量用來控制線程并發數的,BoundedSemaphore或Semaphore管理一個內置的計數 器,每當調用acquire()時-1,調用release()時+1。
計數器不能小于0,當計數器為 0時,acquire()將阻塞線程至同步鎖定狀態,直到其他線程調用release()。(類似于停車位的概念)
BoundedSemaphore與Semaphore的唯一區別在于前者將在調用release()時檢查計數 器的值是否超過了計數器的初始值,如果超過了將拋出一個異常。
import
threading,time
class
myThread(threading.Thread):
def
run(self):
if
semaphore.acquire():
print
(self.name)
time.sleep(3
) semaphore.release() if __name__=="__main__"
: semaphore=threading.Semaphore(5
) thrs=
[] for i in range(100
): thrs.append(myThread()) for t in
thrs: t.start() ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
八、多線程利器-----隊列(queue)
?
1、列表是不安全的數據結構
import
threading,time
li=[1,2,3,4,5
]
def
pri():
while
li:
a=li[-1
]
print
(a) time.sleep(1
) try
: li.remove(a) except
Exception as e: print('----'
,a,e) t1=threading.Thread(target=pri,args=
()) t1.start() t2=threading.Thread(target=pri,args=
()) t2.start()
思考:如何通過對列來完成上述功能?
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
2、queue隊列類的方法
創建一個“隊列”對象
import
Queue
q = Queue.Queue(maxsize = 10
)
Queue.Queue類即是一個隊列的同步實現。隊列長度可為無限或者有限。可通過Queue的構造函數的可選參數maxsize來設定隊列長度。如果maxsize小于1就表示隊列長度無限。
將一個值放入隊列中
q.put(10
)
調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item為必需的,為插入項目的值;第二個block為可選參數,默認為
1
。如果隊列當前為空且block為1,put()方法就使調用線程暫停,直到空出一個數據單元。如果block為0,put方法將引發Full異常。
將一個值從隊列中取出
q.get()
調用隊列對象的get()方法從隊頭刪除并返回一個項目。可選參數為block,默認為True。如果隊列為空且block為True,
get()就使調用線程暫停,直至有項目可用。如果隊列為空且block為False,隊列將引發Empty異常。
Python Queue模塊有三種隊列及構造函數:
1、Python Queue模塊的FIFO隊列先進先出。 class
queue.Queue(maxsize) 2、LIFO類似于堆,即先進后出。 class
queue.LifoQueue(maxsize) 3、還有一種是優先級隊列級別越低越先出來。 class
queue.PriorityQueue(maxsize) 此包中的常用方法(q =
Queue.Queue()): q.qsize() 返回隊列的大小 q.empty() 如果隊列為空,返回True,反之False q.full() 如果隊列滿了,返回True,反之False q.full 與 maxsize 大小對應 q.get([block[, timeout]]) 獲取隊列,timeout等待時間 q.get_nowait() 相當q.get(False) 非阻塞 q.put(item) 寫入隊列,timeout等待時間 q.put_nowait(item) 相當q.put(item, False) q.task_done() 在完成一項工作之后,q.task_done() 函數向任務已經完成的隊列發送一個信號 q.join() 實際上意味著等到隊列為空,再執行別的操作
3、other mode:
import
queue
#先進后出
q=
queue.LifoQueue()
q.put(34
)
q.put(56
)
q.put(12
) #優先級 # q=queue.PriorityQueue() # q.put([5,100]) # q.put([7,200]) # q.put([3,"zhurui"]) # q.put([4,{"name":"simon"}]) while 1
: data=
q.get() print(data)
生產者消費者模型:
為什么要使用生產者和消費者模式
在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大于生產者,那么消費者就必須等待生產者。為了解決這個問題于是引入了生產者和消費者模式。
什么是生產者消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力。
這就像,在餐廳,廚師做好菜,不需要直接和客戶交流,而是交給前臺,而客戶去飯菜也不需要不找廚師,直接去前臺領取即可,這也是一個結耦的過程。
import
time,random
import
queue,threading
q =
queue.Queue()
def
Producer(name):
count =
0
while count <10
: print("making........"
) time.sleep(random.randrange(3
)) q.put(count) print('Producer %s has produced %s baozi..' %
(name, count)) count +=1 #q.task_done() #q.join() print("ok......"
) def
Consumer(name): count =
0 while count <10
: time.sleep(random.randrange(4
)) if not
q.empty(): data =
q.get() #q.task_done() #q.join() print
(data) print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %
(name, data)) else
: print("-----no baozi anymore----"
) count +=1
p1 = threading.Thread(target=Producer, args=('A'
,)) c1 = threading.Thread(target=Consumer, args=('B'
,)) # c2 = threading.Thread(target=Consumer, args=('C',)) # c3 = threading.Thread(target=Consumer, args=('D',))
p1.start() c1.start() # c2.start() # c3.start()
?
九、多進程模塊 multiprocessing
由于GIL的存在,python中的多線程其實并不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進程。
multiprocessing包是Python中的多進程管理包。與threading.Thread類似,它可以利用multiprocessing.Process對象來創建一個進程。該進程可以運行在Python程序內部編寫的函數。該Process對象與Thread對象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類 (這些對象可以像多線程那樣,通過參數傳遞給各個進程),用以同步進程,其用法與threading包中的同名類一致。所以,multiprocessing的很大一部份與threading使用同一套API,只不過換到了多進程的情境
1、進程的調用
調用方式1
from multiprocessing import
Process
import
time
def
f(name):
time.sleep(1
)
print('hello'
, name,time.ctime())
if __name__ == '__main__'
: p_list=
[] for i in range(3
): p = Process(target=f, args=('alvin'
,)) p_list.append(p) p.start() for i in
p_list: p.join() print('end')
調用方式2
from multiprocessing import
Process
import
time
class
MyProcess(Process):
def __init__
(self):
super(MyProcess, self).__init__
()
#self.name = name
def
run(self): time.sleep(1
) print ('hello'
, self.name,time.ctime()) if __name__ == '__main__'
: p_list=
[] for i in range(3
): p =
MyProcess() p.start() p_list.append(p) for p in
p_list: p.join() print('end')
例子3:
from multiprocessing import
Process
import
os
import
time
def
info(title):
print("title:"
,title)
print('parent process:'
, os.getppid()) print('process id:'
, os.getpid()) def
f(name): info('function f'
) print('hello'
, name) if __name__ == '__main__'
: info('main process line'
) time.sleep(1
) print("------------------"
) p = Process(target=info, args=('yuan'
,)) p.start() p.join()
?
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
