一、進程和線程
進程
假如有兩個程序A和B,程序A在執(zhí)行到一半的過程中,需要讀取大量的數(shù)據(jù)輸入(I/
O操作),
而此時CPU只能靜靜地等待任務(wù)A讀取完數(shù)據(jù)才能繼續(xù)執(zhí)行,這樣就白白浪費了CPU資源。
是不是在程序A讀取數(shù)據(jù)的過程中,讓程序B去執(zhí)行,當程序A讀取完數(shù)據(jù)之后,讓
程序B暫停,然后讓程序A繼續(xù)執(zhí)行?
當然沒問題,但這里有一個關(guān)鍵詞:切換
既然是切換,那么這就涉及到了狀態(tài)的保存,狀態(tài)的恢復(fù),加上程序A與程序B所需要的系統(tǒng)資
源(內(nèi)存,硬盤,鍵盤等等)是不一樣的。自然而然的就需要有一個東西去記錄程序A和程序B
分別需要什么資源,怎樣去識別程序A和程序B等等,所以就有了一個叫進程的抽象
進程定義:
進程就是一個程序在一個數(shù)據(jù)集上的一次動態(tài)執(zhí)行過程。
進程一般由程序、數(shù)據(jù)集、進程控制塊三部分組成。
我們編寫的程序用來描述進程要完成哪些功能以及如何完成;
數(shù)據(jù)集則是程序在執(zhí)行過程中所需要使用的資源;
進程控制塊用來記錄進程的外部特征,描述進程的執(zhí)行變化過程,系統(tǒng)可以利用它來控制和管理進程,它是系
統(tǒng)感知進程存在的唯一標志。
舉一例說明進程:
想象一位有一手好廚藝的計算機科學(xué)家正在為他的女兒烘制生日蛋糕。他有做生日蛋糕的食譜,廚房里有所需
的原料:面粉、雞蛋、糖、香草汁等。在這個比喻中,做蛋糕的食譜就是程序(即用適當形式描述的算法)計算機科學(xué)家就是處理器(cpu),
而做蛋糕的各種原料就是輸入數(shù)據(jù)。進程就是廚師閱讀食譜、取來各種原料以及烘制蛋糕等一系列動作的總和。
現(xiàn)在假設(shè)計算機科學(xué)家的兒子哭著跑了進來,說他的頭被一只蜜蜂蟄了。計算機科學(xué)家就記錄下他
照著食譜做到哪兒了(保存進程的當前狀態(tài)),然后拿出一本急救手冊,按照其中的指示處理蟄傷。這
里,我們看到處理機從一個進程(做蛋糕)切換到另一個高優(yōu)先級的進程(實施醫(yī)療救治),每個進程
擁有各自的程序(食譜和急救手冊)。當蜜蜂蟄傷處理完之后,這位計算機科學(xué)家又回來做蛋糕,從他
離開時的那一步繼續(xù)做下去。
線程
線程的出現(xiàn)是為了降低上下文切換的消耗,提高系統(tǒng)的并發(fā)性,并突破一個進程只能干一樣事的缺陷,
使到進程內(nèi)并發(fā)成為可能。
假設(shè),一個文本程序,需要接受鍵盤輸入,將內(nèi)容顯示在屏幕上,還需要保存信息到硬盤中。若只有
一個進程,勢必造成同一時間只能干一樣事的尷尬(當保存時,就不能通過鍵盤輸入內(nèi)容)。若有多
個進程,每個進程負責一個任務(wù),進程A負責接收鍵盤輸入的任務(wù),進程B負責將內(nèi)容顯示在屏幕上的
任務(wù),進程C負責保存內(nèi)容到硬盤中的任務(wù)。這里進程A,B,C間的協(xié)作涉及到了進程通信問題,而且
有共同都需要擁有的東西-------
文本內(nèi)容,不停的切換造成性能上的損失。若有一種機制,可以使
任務(wù)A,B,C共享資源,這樣上下文切換所需要保存和恢復(fù)的內(nèi)容就少了,同時又可以減少通信所帶
來的性能損耗,那就好了。是的,這種機制就是線程。
線程也叫輕量級進程,它是一個基本的CPU執(zhí)行單元,也是程序執(zhí)行過程中的最小單元,由線程ID、程序
計數(shù)器、寄存器集合和堆棧共同組成。線程的引入減小了程序并發(fā)執(zhí)行時的開銷,提高了操作系統(tǒng)的并發(fā)
性能。線程沒有自己的系統(tǒng)資源。
線程進程的關(guān)系區(qū)別:
1、一個程序至少有一個進程,一個進程至少有一個線程.(進程可以理解成線程的容器)
2、進程在執(zhí)行過程中擁有獨立的內(nèi)存單元,而多個線程共享內(nèi)存,從而極大地提高了程序的運行效率。
3、線程在執(zhí)行過程中與進程還是有區(qū)別的。每個獨立的線程有一個程序運行的入口、順序執(zhí)行序列和
程序的出口。但是線程不能夠獨立執(zhí)行,必須依存在應(yīng)用程序中,由應(yīng)用程序提供多個線程執(zhí)行控制。?
4、進程是具有一定獨立功能的程序關(guān)于某個數(shù)據(jù)集合上的一次運行活動,進程是系統(tǒng)進行資源分配和調(diào)
度的一個獨立單位.?
線程是進程的一個實體,是CPU調(diào)度和分派的基本單位,它是比進程更小的能獨立運行的基本單位.線程
自己基本上不擁有系統(tǒng)資源,只擁有一點在運行中必不可少的資源(如程序計數(shù)器,一組寄存器和棧)但是
它可與同屬一個進程的其他的線程共享進程所擁有的全部資源.?
一個線程可以創(chuàng)建和撤銷另一個線程;同一個進程中的多個線程之間可以并發(fā)執(zhí)行.
python的GIL
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-
safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
上面的核心意思就是,無論你啟多少個線程,你有多少個cpu, Python在執(zhí)行的時候會淡定的在同一時刻只允許一個線程運行
?
二、python的線程與threading模塊
1、線程的兩種調(diào)用方式
threading 模塊建立在thread模塊之上。thread模塊以低級、原始的方式來處理和控制線程,而threading 模塊通過對thread進行二次封裝,提供了更方便的api來處理線程。
?
直接調(diào)用:
import
threading
import
time
def sayhi(num): #定義每個線程要運行的函數(shù)
print("running on number:%s" %
num)
time.sleep(3
)
if __name__ == '__main__'
:
t1 = threading.Thread(target=sayhi,args=(1,)) #生成一個線程實例
t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一個線程實例
t1.start() #啟動線程 t2.start() #啟動另一個線程 print(t1.getName()) #獲取線程名 print(t2.getName())
?
繼承式調(diào)用:
import
threading
import
time
class
MyThread(threading.Thread):
def __init__
(self,num):
threading.Thread.__init__
(self)
self.num =
num def run(self):#定義每個線程要運行的函數(shù) print("running on number:%s" %
self.num) time.sleep(3
) if __name__ == '__main__'
: t1 = MyThread(1
) t2 = MyThread(2
) t1.start() t2.start() print("ending......")? ? ? ? ? ? ? ? ?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??
三、threading.thread的實例方法
join&Daemon方法
import
threading
from time import
ctime,sleep
import
time
def
ListenMusic(name):
print ("Begin listening to %s. %s" %
(name,ctime()))
sleep(3
) print("end listening %s"%
ctime()) def
RecordBlog(title): print ("Begin recording the %s! %s" %
(title,ctime())) sleep(5
) print('end recording %s'%
ctime()) threads =
[] t1 = threading.Thread(target=ListenMusic,args=('殺手'
,)) t2 = threading.Thread(target=RecordBlog,args=('python線程'
,)) threads.append(t1) threads.append(t2) if __name__ == '__main__'
: for t in
threads: #t.setDaemon(True) #注意:一定在start之前設(shè)置
t.start() # t.join() # t1.join()
t1.setDaemon(True) #t2.join()########考慮這三種join位置下的結(jié)果? print ("all over %s" %ctime())
join():在子線程完成運行之前,這個子線程的父線程將一直被阻塞。
setDaemon(True):
將線程聲明為守護線程,必須在start() 方法調(diào)用之前設(shè)置, 如果不設(shè)置為守護線程程序會被無限掛起。這個方法基本和join是相反的。
當我們 在程序運行中,執(zhí)行一個主線程,如果主線程又創(chuàng)建一個子線程,主線程和子線程 就分兵兩路,分別運行,那么當主線程完成
想退出時,會檢驗子線程是否完成。如 果子線程未完成,則主線程會等待子線程完成后再退出。但是有時候我們需要的是 只要主線程
完成了,不管子線程是否完成,都要和主線程一起退出,這時就可以 用setDaemon方法啦
其他方法:
?
# run(): 線程被cpu調(diào)度后自動執(zhí)行線程對象的run方法
# start():啟動線程活動。
# isAlive(): 返回線程是否活動的。
# getName(): 返回線程名。
# setName(): 設(shè)置線程名。
threading模塊提供的一些方法:
# threading.currentThread(): 返回當前的線程變量。
# threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結(jié)束前,不包括啟動前和終止后的線程。
# threading.activeCount(): 返回正在運行的線程數(shù)量,與len(threading.enumerate())有相同的結(jié)果。
四、同步鎖(Lock)
import
threading
import
time
def
addNum():
global num #在每個線程中都獲取這個全局變量
#num-=1
temp=
num
#print('--get num:',num )
time.sleep(0.1
) num =temp-1 #對此公共變量進行-1操作
num = 1000 #設(shè)定一個共享變量 thread_list =
[] for i in range(1000
): t = threading.Thread(target=
addNum) t.start() thread_list.append(t) for t in thread_list: #等待所有線程執(zhí)行完畢
t.join() print('final num:', num )
觀察:time.sleep(0.1)? /0.001/0.0000001 結(jié)果分別是多少?
多個線程都在同時操作同一個共享資源,所以造成了資源破壞,怎么辦呢?(join會造成串行,失去所線程的意義)
我們可以通過同步鎖來解決這種問題
R=
threading.Lock()
####
def
sub():
global
num
R.acquire()
temp=num-1
time.sleep(0.1
)
num=
temp R.release()
五、遞歸鎖
在線程間共享多個資源的時候,如果兩個線程分別占有一部分資源并且同時等待對方的資源,就會造成死鎖,因為系統(tǒng)判斷這部分資源都正在使用,所以這兩個線程在無外力作用下將一直等待下去。死鎖的例子
import
threading,time
class
myThread(threading.Thread):
def
doA(self):
lockA.acquire()
print(self.name,"gotlockA"
,time.ctime())
time.sleep(3
)
lockB.acquire()
print(self.name,"gotlockB"
,time.ctime()) lockB.release() lockA.release() def
doB(self): lockB.acquire() print(self.name,"gotlockB"
,time.ctime()) time.sleep(2
) lockA.acquire() print(self.name,"gotlockA"
,time.ctime()) lockA.release() lockB.release() def
run(self): self.doA() self.doB() if __name__=="__main__"
: lockA=
threading.Lock() lockB=
threading.Lock() threads=
[] for i in range(5
): threads.append(myThread()) for t in
threads: t.start() for t in
threads: t.join()#等待線程結(jié)束,后面再講。
解決辦法:使用遞歸鎖,將
lockA=
threading.Lock()
lockB=threading.Lock()
#--------------
lock=threading.RLock()
為了支持在同一線程中多次請求同一資源,python提供了“可重入鎖”:threading.RLock。RLock內(nèi)部維護著一個Lock和一個counter變量,counter記錄了acquire的次數(shù),從而使得資源可以被多次acquire。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。
應(yīng)用
import
time
import
threading
class
Account:
def __init__
(self, _id, balance):
self.id =
_id
self.balance =
balance self.lock =
threading.RLock() def
withdraw(self, amount): with self.lock: self.balance -=
amount def
deposit(self, amount): with self.lock: self.balance +=
amount def drawcash(self, amount):#lock.acquire中嵌套lock.acquire的場景
with self.lock: interest=0.05
count=amount+amount*
interest self.withdraw(count) def
transfer(_from, to, amount): #鎖不可以加在這里 因為其他的其它線程執(zhí)行的其它方法在不加鎖的情況下數(shù)據(jù)同樣是不安全的
_from.withdraw(amount) to.deposit(amount) alex = Account('alex',1000
) yuan = Account('yuan',1000
) t1=threading.Thread(target = transfer, args = (alex,yuan, 100
)) t1.start() t2=threading.Thread(target = transfer, args = (yuan,alex, 200
)) t2.start() t1.join() t2.join() print('>>>'
,alex.balance) print('>>>',yuan.balance)
六、同步條件(Event)
import
threading,time
class
Boss(threading.Thread):
def
run(self):
print("BOSS:今晚大家都要加班到23:00。"
)
print
(event.isSet())
event.set()
time.sleep(5
) print("BOSS:<23:00>可以下班了。"
) print
(event.isSet()) event.set() class
Worker(threading.Thread): def
run(self): event.wait() print("Worker:哎……苦逼!"
) time.sleep(1
) event.clear() event.wait() print("Worker:OhYeah!"
) if __name__=="__main__"
: event=
threading.Event() threads=
[] for i in range(5
): threads.append(Worker()) threads.append(Boss()) for t in
threads: t.start() for t in
threads: t.join()
?
?七、信號量(Semaphore)
信號量用來控制線程并發(fā)數(shù)的,BoundedSemaphore或Semaphore管理一個內(nèi)置的計數(shù) 器,每當調(diào)用acquire()時-1,調(diào)用release()時+1。
計數(shù)器不能小于0,當計數(shù)器為 0時,acquire()將阻塞線程至同步鎖定狀態(tài),直到其他線程調(diào)用release()。(類似于停車位的概念)
BoundedSemaphore與Semaphore的唯一區(qū)別在于前者將在調(diào)用release()時檢查計數(shù) 器的值是否超過了計數(shù)器的初始值,如果超過了將拋出一個異常。
import
threading,time
class
myThread(threading.Thread):
def
run(self):
if
semaphore.acquire():
print
(self.name)
time.sleep(3
) semaphore.release() if __name__=="__main__"
: semaphore=threading.Semaphore(5
) thrs=
[] for i in range(100
): thrs.append(myThread()) for t in
thrs: t.start() ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
八、多線程利器-----隊列(queue)
?
1、列表是不安全的數(shù)據(jù)結(jié)構(gòu)
import
threading,time
li=[1,2,3,4,5
]
def
pri():
while
li:
a=li[-1
]
print
(a) time.sleep(1
) try
: li.remove(a) except
Exception as e: print('----'
,a,e) t1=threading.Thread(target=pri,args=
()) t1.start() t2=threading.Thread(target=pri,args=
()) t2.start()
思考:如何通過對列來完成上述功能?
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
2、queue隊列類的方法
創(chuàng)建一個“隊列”對象
import
Queue
q = Queue.Queue(maxsize = 10
)
Queue.Queue類即是一個隊列的同步實現(xiàn)。隊列長度可為無限或者有限。可通過Queue的構(gòu)造函數(shù)的可選參數(shù)maxsize來設(shè)定隊列長度。如果maxsize小于1就表示隊列長度無限。
將一個值放入隊列中
q.put(10
)
調(diào)用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數(shù),第一個item為必需的,為插入項目的值;第二個block為可選參數(shù),默認為
1
。如果隊列當前為空且block為1,put()方法就使調(diào)用線程暫停,直到空出一個數(shù)據(jù)單元。如果block為0,put方法將引發(fā)Full異常。
將一個值從隊列中取出
q.get()
調(diào)用隊列對象的get()方法從隊頭刪除并返回一個項目。可選參數(shù)為block,默認為True。如果隊列為空且block為True,
get()就使調(diào)用線程暫停,直至有項目可用。如果隊列為空且block為False,隊列將引發(fā)Empty異常。
Python Queue模塊有三種隊列及構(gòu)造函數(shù):
1、Python Queue模塊的FIFO隊列先進先出。 class
queue.Queue(maxsize) 2、LIFO類似于堆,即先進后出。 class
queue.LifoQueue(maxsize) 3、還有一種是優(yōu)先級隊列級別越低越先出來。 class
queue.PriorityQueue(maxsize) 此包中的常用方法(q =
Queue.Queue()): q.qsize() 返回隊列的大小 q.empty() 如果隊列為空,返回True,反之False q.full() 如果隊列滿了,返回True,反之False q.full 與 maxsize 大小對應(yīng) q.get([block[, timeout]]) 獲取隊列,timeout等待時間 q.get_nowait() 相當q.get(False) 非阻塞 q.put(item) 寫入隊列,timeout等待時間 q.put_nowait(item) 相當q.put(item, False) q.task_done() 在完成一項工作之后,q.task_done() 函數(shù)向任務(wù)已經(jīng)完成的隊列發(fā)送一個信號 q.join() 實際上意味著等到隊列為空,再執(zhí)行別的操作
3、other mode:
import
queue
#先進后出
q=
queue.LifoQueue()
q.put(34
)
q.put(56
)
q.put(12
) #優(yōu)先級 # q=queue.PriorityQueue() # q.put([5,100]) # q.put([7,200]) # q.put([3,"zhurui"]) # q.put([4,{"name":"simon"}]) while 1
: data=
q.get() print(data)
生產(chǎn)者消費者模型:
為什么要使用生產(chǎn)者和消費者模式
在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費者就是消費數(shù)據(jù)的線程。在多線程開發(fā)當中,如果生產(chǎn)者處理速度很快,而消費者處理速度很慢,那么生產(chǎn)者就必須等待消費者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費者的處理能力大于生產(chǎn)者,那么消費者就必須等待生產(chǎn)者。為了解決這個問題于是引入了生產(chǎn)者和消費者模式。
什么是生產(chǎn)者消費者模式
生產(chǎn)者消費者模式是通過一個容器來解決生產(chǎn)者和消費者的強耦合問題。生產(chǎn)者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區(qū),平衡了生產(chǎn)者和消費者的處理能力。
這就像,在餐廳,廚師做好菜,不需要直接和客戶交流,而是交給前臺,而客戶去飯菜也不需要不找廚師,直接去前臺領(lǐng)取即可,這也是一個結(jié)耦的過程。
import
time,random
import
queue,threading
q =
queue.Queue()
def
Producer(name):
count =
0
while count <10
: print("making........"
) time.sleep(random.randrange(3
)) q.put(count) print('Producer %s has produced %s baozi..' %
(name, count)) count +=1 #q.task_done() #q.join() print("ok......"
) def
Consumer(name): count =
0 while count <10
: time.sleep(random.randrange(4
)) if not
q.empty(): data =
q.get() #q.task_done() #q.join() print
(data) print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %
(name, data)) else
: print("-----no baozi anymore----"
) count +=1
p1 = threading.Thread(target=Producer, args=('A'
,)) c1 = threading.Thread(target=Consumer, args=('B'
,)) # c2 = threading.Thread(target=Consumer, args=('C',)) # c3 = threading.Thread(target=Consumer, args=('D',))
p1.start() c1.start() # c2.start() # c3.start()
?
九、多進程模塊 multiprocessing
由于GIL的存在,python中的多線程其實并不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進程。
multiprocessing包是Python中的多進程管理包。與threading.Thread類似,它可以利用multiprocessing.Process對象來創(chuàng)建一個進程。該進程可以運行在Python程序內(nèi)部編寫的函數(shù)。該Process對象與Thread對象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類 (這些對象可以像多線程那樣,通過參數(shù)傳遞給各個進程),用以同步進程,其用法與threading包中的同名類一致。所以,multiprocessing的很大一部份與threading使用同一套API,只不過換到了多進程的情境
1、進程的調(diào)用
調(diào)用方式1
from multiprocessing import
Process
import
time
def
f(name):
time.sleep(1
)
print('hello'
, name,time.ctime())
if __name__ == '__main__'
: p_list=
[] for i in range(3
): p = Process(target=f, args=('alvin'
,)) p_list.append(p) p.start() for i in
p_list: p.join() print('end')
調(diào)用方式2
from multiprocessing import
Process
import
time
class
MyProcess(Process):
def __init__
(self):
super(MyProcess, self).__init__
()
#self.name = name
def
run(self): time.sleep(1
) print ('hello'
, self.name,time.ctime()) if __name__ == '__main__'
: p_list=
[] for i in range(3
): p =
MyProcess() p.start() p_list.append(p) for p in
p_list: p.join() print('end')
例子3:
from multiprocessing import
Process
import
os
import
time
def
info(title):
print("title:"
,title)
print('parent process:'
, os.getppid()) print('process id:'
, os.getpid()) def
f(name): info('function f'
) print('hello'
, name) if __name__ == '__main__'
: info('main process line'
) time.sleep(1
) print("------------------"
) p = Process(target=info, args=('yuan'
,)) p.start() p.join()
?
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061
微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元

