Python3 Quick Start (9): Concurrent Programming in Python 3

I. Python Threading Modules

1. Introduction to Threads

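A standard thread consists of a thread ID, the current instruction pointer (PC), a register set, and a stack. A thread is an entity within a process and is the basic unit that the system schedules and dispatches independently. A thread owns no system resources of its own; it shares all of the process's resources with the other threads in the same process. Every process has at least one thread, which serves as the program's entry point and is called the main thread; the other threads are called worker threads.
Multithreading is the technique of executing multiple threads concurrently, implemented in software or in hardware. Computers with hardware support for multithreading can execute several threads at the same time, which improves overall processing performance.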

2. Thread States

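A thread has three basic states: ready, running, and blocked. A ready thread has everything it needs to run and is waiting for the CPU; a running thread currently occupies the CPU and is executing; a blocked thread is waiting for an event (for example, an I/O operation to complete) and logically cannot execute.
The three states transition into one another as shown in the figure below: the scheduler moves a ready thread to running; a running thread goes back to ready when it is preempted (for example, when its time slice expires) or becomes blocked when it must wait for an event; a blocked thread becomes ready again when that event occurs.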
[Figure 1: transitions among the ready, running, and blocked thread states]

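3. The threading Module

Python 3 supports threads through two modules, _thread and threading.
_thread provides low-level, primitive threads and a simple lock, and its functionality is limited compared with threading. It is the Python 3 name of the Python 2 thread module, kept mainly for compatibility.
Besides the functionality of _thread, the threading module provides the following functions:
threading.current_thread(): returns the Thread object of the calling thread (the camelCase alias currentThread() is deprecated since Python 3.10).
threading.enumerate(): returns a list of all threads that are currently alive, i.e. started and not yet finished; threads that have not been started or have already terminated are excluded.
threading.active_count(): returns the number of threads that are currently alive, the same as len(threading.enumerate()) (the alias activeCount() is deprecated since Python 3.10).
The Thread class provides the following methods:
run(): the method that represents the thread's activity.
start(): starts the thread's activity; run() is then invoked in a separate thread of control.
join([timeout]): waits until the thread terminates. It blocks the calling thread until the thread whose join() is called terminates, either normally or through an unhandled exception, or until the optional timeout expires.
is_alive(): returns whether the thread is alive (the old name isAlive() was removed in Python 3.9).
getName(): returns the thread's name (deprecated since Python 3.10; read the name attribute instead).
setName(): sets the thread's name (deprecated since Python 3.10; assign to the name attribute instead).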
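
The sketch below exercises the functions and methods listed above using their current snake_case names; the worker function, the Event that keeps it alive, and the thread name "worker-1" are illustrative choices, not taken from the original text.

```python
import threading

def worker(stop_event):
    # stay alive until the main thread sets the event
    stop_event.wait()

stop = threading.Event()
t = threading.Thread(target=worker, args=(stop,), name="worker-1")

print(threading.current_thread().name)  # MainThread
print(t.is_alive())                     # False: not started yet
t.start()
print(t.is_alive())                     # True: started and not yet finished
print([th.name for th in threading.enumerate()])  # includes MainThread and worker-1
print(threading.active_count() == len(threading.enumerate()))
stop.set()
t.join()
print(t.is_alive())                     # False: terminated
```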

4. The multiprocessing Module

multiprocessing is a cross-platform multiprocessing module. It provides a Process class that represents a process object. To create a child process, you only need to create a Process instance, passing in the function to execute and its arguments.
Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
group is unused and should always be None.
target is the callable object to invoke, i.e. the task the child process runs.
name gives the child process a name.
args is the tuple of positional arguments passed to target; a one-element tuple needs a trailing comma, e.g. args=('monicx',)
kwargs is a dict of keyword arguments passed to target, e.g. kwargs={'name': 'monicx', 'age': 18}
Process provides the following methods:
start(): starts the process; the child process then calls its run() method.
run(): the method executed once the process has started; by default it calls the function given as target. A Process subclass that does not use target must override run().
terminate(): forcibly terminates the process without any cleanup. Child processes of the terminated process are not terminated and become orphans; if the process holds a lock, the lock is not released, which can cause deadlock.
is_alive(): returns whether the process is still alive.
join([timeout]): makes the calling (main) process wait until the child process has finished before it continues. timeout is an optional limit in seconds; once it expires, the main process stops waiting.
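
A minimal sketch of the Process parameters and methods above; the greet function and its arguments are illustrative. The __main__ guard matters: on platforms that start children with the spawn method (Windows, and macOS by default), the child re-imports the main module.

```python
import os
from multiprocessing import Process

def greet(name, age=0):
    # runs inside the child process
    print(f"child pid={os.getpid()} name={name} age={age}")

def run_demo():
    # args is a tuple (note the trailing comma), kwargs is a dict
    p = Process(target=greet, name="greeter", args=("monicx",), kwargs={"age": 18})
    print(p.name, p.is_alive())   # greeter False: not started yet
    p.start()
    p.join(timeout=10)            # wait at most 10 seconds for the child
    # exitcode is 0 after a normal exit, None if the child is still running
    print(p.is_alive(), p.exitcode)
    return p.exitcode

if __name__ == "__main__":
    run_demo()
```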

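5. The Global Interpreter Lock (GIL)

In CPython, Python threads cannot execute Python code truly in parallel. Python provides threading modules, but if the goal is to make CPU-bound code faster, multithreading is not recommended. CPython has a Global Interpreter Lock (GIL), which ensures that at any moment only one of a process's threads is executing Python bytecode. Threads switch so quickly that they may seem to run in parallel, but in fact they take turns, and handing the GIL back and forth adds overhead to thread execution.
The GIL is not a feature of the Python language; it was introduced in the implementation of the CPython interpreter. Python has several implementations, such as CPython, PyPy, and Jython; Jython has no GIL. CPython is the default implementation in most environments, but because the GIL is not part of the language, Python as such does not have to depend on it.
The GIL limits execution to one thread at a time, so a single Python process cannot take advantage of multiple CPU cores for Python code. The GIL is essentially a mutex: like any mutex, it turns concurrent execution into serial execution so that shared data can be modified by only one task at a time, which keeps the data safe. A Python process contains not only the main thread and the threads it starts, but also interpreter-level work such as garbage collection. All data in a process is shared, and code, being a kind of data, is shared by all threads as well. To run, a thread must first get access to the interpreter's code, i.e. obtain the right to execute, and then hand its target code to the interpreter to execute. Because the interpreter is shared by all threads, the garbage collector could also be running interpreter code at the same time, so a lock is needed to keep the interpreter's data safe; that lock is the GIL.
Because of the GIL, only one thread in a process executes Python code at any moment. Multiple cores can perform computations in parallel, so more cores improve compute performance; but whenever the CPU hits blocking I/O it still has to wait, so multiple cores help little with I/O-bound tasks. Choose the approach according to whether the work is CPU-bound or I/O-bound: for CPU-bound tasks, multiple processes have the advantage; for I/O-bound tasks, multiple threads have the advantage, because CPython releases the GIL while a thread is blocked on I/O or sleeping.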
CPU-bound task, multiprocess version:

# -*- coding:utf-8 -*-
from multiprocessing import Process
import os
import time

def work():
    result = 0
    for x in range(100000000):
        result *= x

if __name__ == "__main__":
    processes = []
    print("CPU: ", os.cpu_count())
    start = time.time()
    for i in range(4):
        p = Process(target=work)
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    end = time.time()
    print("CPU-bound task, multiprocessing took %s s" % (end - start))

# output:
# CPU:  4
# CPU-bound task, multiprocessing took 9.485123872756958 s

CPU-bound task, multithreaded version:

# -*- coding:utf-8 -*-
from threading import Thread
import os, time

def work():
    res = 0
    for x in range(100000000):
        res *= x

if __name__ == "__main__":
    threads = []
    print("CPU: ",os.cpu_count())
    start = time.time()
    for i in range(4):
        thread = Thread(target=work)  # one thread per task
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()
    end = time.time()
    print("CPU-bound task, multithreading took %s s" % (end - start))

# output:
# CPU:  4
# CPU-bound task, multithreading took 18.434288501739502 s

I/O-bound task, multiprocess version:

# -*- coding:utf-8 -*-
from multiprocessing import Process
import os, time

def work():
    time.sleep(2)
    # a with-block closes the file after writing
    with open("tmp.txt", "w") as f:
        print("hello,Python----------------------------------------------------", file=f)

if __name__ == "__main__":
    processes = []
    print("CPU: ", os.cpu_count())
    start = time.time()
    for i in range(400):
        p = Process(target=work)  # one process per task
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    stop = time.time()
    print("I/O-bound task, multiprocessing took %s s" % (stop - start))

# output:
# CPU:  4
# I/O-bound task, multiprocessing took 2.8894519805908203 s

I/O-bound task, multithreaded version:

# -*- coding:utf-8 -*-
from threading import Thread
import os, time

def work():
    time.sleep(2)
    # a with-block closes the file after writing
    with open("tmp.txt", "w") as f:
        print("hello,Python----------------------------------------------------", file=f)

if __name__ == "__main__":
    threads = []
    print("CPU: ", os.cpu_count())
    start = time.time()

    for x in range(400):
        thread = Thread(target=work)
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()
    end = time.time()
    print("I/O-bound task, multithreading took %s s" % (end - start))

# output:
# CPU:  4
# I/O-bound task, multithreading took 2.044438362121582 s

II. Creating Threads

1. Instantiating threading.Thread

The threading.Thread constructor is defined as follows:

def __init__(self, group=None, target=None, name=None,
             args=(), kwargs=None, *, daemon=None):

To run a function in a new thread, create a threading.Thread instance and call its start() method:

# -*- coding:utf-8 -*-
import time
import threading

def work_task(counter):
    print("%s %s" % (threading.current_thread().name, time.ctime(time.time())))
    n = counter
    while n > 0:
        time.sleep(1)
        n -= 1

if __name__ == "__main__":
    print("main thread start:", time.strftime("%Y-%m-%d %H:%M:%S"))

    threads = []
    for x in range(10):
        thread = threading.Thread(target=work_task, args=(x, ))
        threads.append(thread)

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

    print("main thread end:", time.strftime("%Y-%m-%d %H:%M:%S"))

# output:
# main thread start: 2019-07-03 21:49:58
# Thread-1 Wed Jul  3 21:49:58 2019
# Thread-2 Wed Jul  3 21:49:58 2019
# Thread-3 Wed Jul  3 21:49:58 2019
# Thread-4 Wed Jul  3 21:49:58 2019
# Thread-5 Wed Jul  3 21:49:58 2019
# Thread-6 Wed Jul  3 21:49:58 2019
# Thread-7 Wed Jul  3 21:49:58 2019
# Thread-8 Wed Jul  3 21:49:58 2019
# Thread-9 Wed Jul  3 21:49:58 2019
# Thread-10 Wed Jul  3 21:49:58 2019
# main thread end: 2019-07-03 21:50:07
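
The constructor's name, kwargs, and keyword-only daemon parameters are not used in the example above. A small sketch, with an illustrative work_task that records which thread handled each job:

```python
import threading

results = {}

def work_task(counter, label="task"):
    # record which thread handled which job
    results[label] = (threading.current_thread().name, counter)

threads = [
    threading.Thread(
        target=work_task,
        name=f"worker-{i}",            # custom name instead of Thread-N
        args=(i,),                     # positional arguments: a tuple
        kwargs={"label": f"job-{i}"},  # keyword arguments: a dict
        daemon=False,                  # keyword-only parameter
    )
    for i in range(3)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(results.items()))
# [('job-0', ('worker-0', 0)), ('job-1', ('worker-1', 1)), ('job-2', ('worker-2', 2))]
```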

2. Subclassing threading.Thread

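You can also create a new subclass by inheriting directly from threading.Thread and overriding its run() and __init__() methods (an overridden __init__() must call the base class __init__() first). After instantiating the subclass, call start() to launch the new thread; start() arranges for the thread's run() method to be called in that new thread.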

# -*- coding:utf-8 -*-
import threading
import time

class WorkThread(threading.Thread):
    def __init__(self, thread_id, name):
        threading.Thread.__init__(self)
        self.thread_id = thread_id
        self.name = name

    def run(self):
        print("start thread: ", self.name)
        work(self.name, self.thread_id)
        print("end thread: ", self.name)

def work(thread_name, thread_id):
    print("%s %s %s" % (thread_name, thread_id, time.ctime(time.time())))
    i = 0
    while i < 2:
        i += 1
        time.sleep(1)

if __name__ == '__main__':
    thread1 = WorkThread(1, "Thread1")
    thread2 = WorkThread(2, "Thread2")

    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()

    print("exit main thread")

# output:
# start thread:  Thread1
# Thread1 1 Tue Jul  2 20:39:42 2019
# start thread:  Thread2
# Thread2 2 Tue Jul  2 20:39:42 2019
# end thread:  end thread: Thread1
#  Thread2
# exit main thread
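
Thread discards whatever run() returns, so a common pattern is to store a result on the instance and read it after join(). A sketch with a hypothetical SumThread subclass; it uses super().__init__(), which is equivalent to the threading.Thread.__init__(self) call above:

```python
import threading

class SumThread(threading.Thread):
    """Hypothetical subclass that sums numbers and keeps the result."""

    def __init__(self, numbers, name=None):
        super().__init__(name=name)   # the base class must be initialised first
        self.numbers = numbers
        self.result = None

    def run(self):
        # executed in the new thread once start() is called
        self.result = sum(self.numbers)

t1 = SumThread(range(0, 50), name="low")
t2 = SumThread(range(50, 100), name="high")
t1.start()
t2.start()
t1.join()
t2.join()   # after join(), the result attribute is safe to read
print(t1.result, t2.result, t1.result + t2.result)  # 1225 3725 4950
```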

If the function to run should be supplied from outside, store it and its arguments as attributes of the subclass instance and call them inside the run() method:
self.target = target
self.args = args

# -*- coding:utf-8 -*-
import threading
import time

class WorkThread(threading.Thread):
    def __init__(self, target, args):
        threading.Thread.__init__(self)
        # keep the externally supplied function and its arguments
        self.target = target
        self.args = args

    def run(self):
        print("start thread: ", self.name)
        self.target(*self.args)
        print("end thread: ", self.name)

def work_task(counter):
    time.sleep(1)
    print("%s %s" % (threading.current_thread().name, time.ctime(time.time())))
    i = counter
    while i > 0:
        i -= 1

if __name__ == '__main__':
    print("main thread start:", time.strftime("%Y-%m-%d %H:%M:%S"))

    threads = []
    for x in range(10):
        # use the WorkThread subclass defined above
        thread = WorkThread(target=work_task, args=(x,))
        threads.append(thread)

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

    print("main thread end:", time.strftime("%Y-%m-%d %H:%M:%S"))

# output (abridged; the interleaving differs from run to run):
# main thread start: <timestamp>
# start thread:  Thread-1
# start thread:  Thread-2
# ...
# Thread-1 <timestamp>
# end thread:  Thread-1
# ...
# main thread end: <timestamp>

3. start() vs. run()

import threading
import time

def work_task(counter):
    n = counter
    while n > 0:
        n -= 1
        print("thread name: %s, id: %s" % (threading.current_thread().name, threading.current_thread().ident))

if __name__ == "__main__":
    print("main thread start")
    thread1 = threading.Thread(target=work_task, args=(5,))
    thread2 = threading.Thread(target=work_task, args=(5,))
    thread1.start()
    thread2.start()

    print("main thread end")

# output:
# main thread start
# thread name: Thread-1, id: 139926959064832thread name: Thread-2, id: 139926880384768main thread end
#
#
# thread name: Thread-1, id: 139926959064832
# thread name: Thread-2, id: 139926880384768thread name: Thread-1, id: 139926959064832
#
# thread name: Thread-1, id: 139926959064832
# thread name: Thread-2, id: 139926880384768thread name: Thread-1, id: 139926959064832
#
# thread name: Thread-2, id: 139926880384768
# thread name: Thread-2, id: 139926880384768

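start() launched two new child threads, which ran alternately, each with its own thread ID. The printed thread name is the value given as name="xxxx" when the Thread object is created; if no name is given, the system-assigned name Thread-N is printed (since Python 3.10 the target function's name is appended, e.g. Thread-1 (work_task)).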

import threading
import time

def work_task(counter):
    n = counter
    while n > 0:
        n -= 1
        print("thread name: %s, id: %s" % (threading.current_thread().name, threading.current_thread().ident))

if __name__ == "__main__":
    print("main thread start")
    thread1 = threading.Thread(target=work_task, args=(5,))
    thread2 = threading.Thread(target=work_task, args=(5,))
    thread1.run()
    thread2.run()

    print("main thread end")

# output:
# main thread start
# thread name: MainThread, id: 140683421988672
# thread name: MainThread, id: 140683421988672
# thread name: MainThread, id: 140683421988672
# thread name: MainThread, id: 140683421988672
# thread name: MainThread, id: 140683421988672
# thread name: MainThread, id: 140683421988672
# thread name: MainThread, id: 140683421988672
# thread name: MainThread, id: 140683421988672
# thread name: MainThread, id: 140683421988672
# thread name: MainThread, id: 140683421988672
# main thread end

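When the threads are launched with run(), the printed thread name is MainThread, i.e. the main thread. Both threads call run(), but thread1.run() runs to completion first and only then does thread2.run() run. Both execute in the main thread and no new thread is started, so run() is just an ordinary function call.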
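
The difference can be checked directly by comparing thread identifiers. In this sketch the record function is illustrative; threading.get_ident() returns the identifier of the thread that calls it.

```python
import threading

seen = []

def record():
    # remember which thread actually executed this function
    seen.append(threading.get_ident())

t1 = threading.Thread(target=record)
t1.run()     # ordinary call: runs in the current (main) thread
t2 = threading.Thread(target=record)
t2.start()   # runs in a newly created thread
t2.join()

main_id = threading.get_ident()
print(seen[0] == main_id)  # True
print(seen[1] == main_id)  # False
```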

4. The join() Method

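After a process starts, it has a main thread by default, since the thread is the smallest unit of program execution; in a multithreaded program, the main thread creates the child threads. By default in Python, the main thread does not wait for the child threads once it has finished its own work: the child threads continue their tasks until they are done, and the program exits only after all of these (non-daemon) threads have finished. In the output below, "main thread end" is printed while the child threads are still running.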

import threading
import time

def work_task(counter):
    n = counter
    while n > 0:
        n -= 1
        print("thread name: %s, id: %s" % (threading.current_thread().name, threading.current_thread().ident))

if __name__ == "__main__":
    print("main thread start")
    threads = []
    for x in range(5):
        thread = threading.Thread(target=work_task, args=(5,))
        threads.append(thread)
    for thread in threads:
        thread.start()

    print("main thread end")

# output:
# main thread start
# thread name: Thread-1, id: 140306042726144thread name: Thread-2, id: 140306034333440
# thread name: Thread-2, id: 140306034333440
# thread name: Thread-2, id: 140306034333440
# thread name: Thread-2, id: 140306034333440
# thread name: Thread-2, id: 140306034333440
# thread name: Thread-3, id: 140306025940736
#
# thread name: Thread-3, id: 140306025940736
# thread name: Thread-3, id: 140306025940736
# thread name: Thread-1, id: 140306042726144thread name: Thread-3, id: 140306025940736
# thread name: Thread-3, id: 140306025940736
#
# thread name: Thread-1, id: 140306042726144
# thread name: Thread-1, id: 140306042726144
# thread name: Thread-1, id: 140306042726144
# thread name: Thread-4, id: 140306034333440
# thread name: Thread-4, id: 140306034333440
# thread name: Thread-5, id: 140306042726144thread name: Thread-4, id: 140306034333440
# main thread endthread name: Thread-4, id: 140306034333440
# thread name: Thread-4, id: 140306034333440
#
# thread name: Thread-5, id: 140306042726144
#
# thread name: Thread-5, id: 140306042726144
# thread name: Thread-5, id: 140306042726144
# thread name: Thread-5, id: 140306042726144

If you make the child threads daemon threads with setDaemon(True), all of them are terminated as soon as the main thread finishes, so a child thread may be stopped before its task is complete. The daemon flag must be set before the thread is started.

import threading
import time

def work_task(counter):
    n = counter
    time.sleep(1)
    while n > 0:
        n -= 1
        print("thread name: %s, id: %s" % (threading.current_thread().name, threading.current_thread().ident))

if __name__ == "__main__":
    print("main thread start")
    threads = []
    for x in range(5):
        thread = threading.Thread(target=work_task, args=(5,))
        threads.append(thread)
    for thread in threads:
        thread.daemon = True  # same as the deprecated thread.setDaemon(True)
        thread.start()

    print("main thread end")

# output:
# main thread start
# main thread end
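
Since Python 3.10, setDaemon() is deprecated in favour of the daemon attribute, which can also be passed to the constructor. A sketch; the background function is illustrative:

```python
import threading
import time

def background():
    # an endless loop that the program should not wait for
    while True:
        time.sleep(0.1)

t1 = threading.Thread(target=background, daemon=True)  # via the constructor
t2 = threading.Thread(target=background)
t2.daemon = True                                       # via the attribute, before start()
t1.start()
t2.start()

try:
    t2.daemon = False   # changing the flag of a running thread is not allowed
except RuntimeError as exc:
    print("RuntimeError:", exc)

print(t1.daemon, t2.daemon)  # True True
# the interpreter may exit here without joining t1 and t2
```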

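The join() method makes the main thread wait until a child thread has finished before executing the rest of its code; as long as the child thread has not finished, the main thread stays blocked. In the example below, join() is called right after each start(), so the five threads run one after another, and the daemon setting has no visible effect because every thread is joined.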

import threading
import time

def work_task(counter):
    n = counter
    time.sleep(1)
    while n > 0:
        n -= 1
        print("thread name: %s" % threading.current_thread().name)

if __name__ == "__main__":
    print("main thread start")
    threads = []
    for x in range(5):
        thread = threading.Thread(target=work_task, args=(5,))
        threads.append(thread)
    for thread in threads:
        thread.daemon = True  # same as the deprecated thread.setDaemon(True)
        thread.start()
        thread.join()

    print("main thread end")

# output:
# main thread start
# thread name: Thread-1
# thread name: Thread-1
# thread name: Thread-1
# thread name: Thread-1
# thread name: Thread-1
# thread name: Thread-2
# thread name: Thread-2
# thread name: Thread-2
# thread name: Thread-2
# thread name: Thread-2
# thread name: Thread-3
# thread name: Thread-3
# thread name: Thread-3
# thread name: Thread-3
# thread name: Thread-3
# thread name: Thread-4
# thread name: Thread-4
# thread name: Thread-4
# thread name: Thread-4
# thread name: Thread-4
# thread name: Thread-5
# thread name: Thread-5
# thread name: Thread-5
# thread name: Thread-5
# thread name: Thread-5
# main thread end
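
Calling join() right after each start(), as above, runs the threads one after another. To let them overlap, start all of them first and join afterwards. A timing sketch; the 0.2-second sleep stands in for real work:

```python
import threading
import time

def work_task(seconds):
    time.sleep(seconds)   # stand-in for real (I/O-bound) work

def run(join_inside_loop):
    threads = [threading.Thread(target=work_task, args=(0.2,)) for _ in range(5)]
    start = time.perf_counter()
    for t in threads:
        t.start()
        if join_inside_loop:
            t.join()      # waits here, so the next thread starts only afterwards
    for t in threads:
        t.join()          # returns immediately for threads that already finished
    return time.perf_counter() - start

serial = run(join_inside_loop=True)       # roughly 5 x 0.2 s
overlapped = run(join_inside_loop=False)  # roughly 0.2 s
print(f"join inside loop: {serial:.2f}s, start all then join: {overlapped:.2f}s")
```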

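join() takes a timeout argument. With daemon threads, join(timeout) makes the main thread wait up to timeout seconds for each child thread in turn. When the time is up, join() simply returns, whether or not the task has finished; it does not kill the thread itself. Daemon threads that are still running are killed only when the main thread finishes and the interpreter exits, which is why Thread-1 is the only thread that reports its end in the output below. With several child threads joined one after another, the total waiting time is at most the sum of their timeouts.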

import threading
import time

def work_task(counter):
    print("thread name: %s work task start" % threading.current_thread().name)
    n = counter
    time.sleep(4)
    while n > 0:
        n -= 1
    else:
        print("thread name: %s work task end" % threading.current_thread().name)

if __name__ == "__main__":
    print("main thread start")
    threads = []
    for x in range(5):
        thread = threading.Thread(target=work_task, args=(5,))
        threads.append(thread)

    for x in range(5):
        threads[x].daemon = True  # same as the deprecated setDaemon(True)
        threads[x].start()
        threads[x].join(1)

    print("main thread end")

# output:
# main thread start
# thread name: Thread-1 work task start
# thread name: Thread-2 work task start
# thread name: Thread-3 work task start
# thread name: Thread-4 work task start
# thread name: Thread-5 work task start
# thread name: Thread-1 work task end
# main thread end

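When the threads are not daemon threads, the main thread again waits for at most the sum of the timeouts and then reaches its end, but the child threads are not killed: they keep running until all of them have finished, and only then does the program exit.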

import threading
import time

def work_task(counter):
    print("thread name: %s work task start" % threading.current_thread().name)
    n = counter
    time.sleep(4)
    while n > 0:
        n -= 1
    else:
        print("thread name: %s work task end" % threading.current_thread().name)

if __name__ == "__main__":
    print("main thread start")
    threads = []
    for x in range(5):
        thread = threading.Thread(target=work_task, args=(5,))
        threads.append(thread)

    for x in range(5):
        threads[x].start()
        threads[x].join(1)

    print("main thread end")

# output:
# main thread start
# thread name: Thread-1 work task start
# thread name: Thread-2 work task start
# thread name: Thread-3 work task start
# thread name: Thread-4 work task start
# thread name: Thread-5 work task start
# thread name: Thread-1 work task end
# main thread end
# thread name: Thread-2 work task end
# thread name: Thread-3 work task end
# thread name: Thread-4 work task end
# thread name: Thread-5 work task end
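
Because join(timeout) always returns None, the only way to tell whether it returned because the thread finished or because the timeout expired is to call is_alive() afterwards. A sketch with an illustrative slow_task:

```python
import threading
import time

def slow_task():
    time.sleep(1.0)   # takes longer than the timeout below

t = threading.Thread(target=slow_task)
t.start()
t.join(timeout=0.1)        # returns after about 0.1 s; the thread keeps running
timed_out = t.is_alive()   # True: join() returned because of the timeout
print("still running after join(0.1):", timed_out)
t.join()                   # now wait until the thread really finishes
print("alive after full join:", t.is_alive())  # False
```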

III. Thread Synchronization

If multiple threads modify the same data, the result can be unpredictable. To keep the data correct, the threads must be synchronized.

1. Mutex Locks

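The threading module's Lock and RLock classes provide simple thread synchronization. Both have acquire() and release() methods; an operation on data that only one thread at a time may access should be placed between acquire() and release(). The difference is that an RLock (reentrant lock) can be acquired again by the thread that already holds it, whereas a second acquire() of a Lock by the same thread blocks forever.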

# -*- coding:utf-8 -*-
import threading
import time

class WorkThread(threading.Thread):
    def __init__(self, thread_id, name):
        threading.Thread.__init__(self)
        self.thread_id = thread_id
        self.name = name

    def run(self):
        thread_locker.acquire()
        print("start thread: ", self.name)
        work(self.name, self.thread_id)
        print("end thread: ", self.name)
        thread_locker.release()

def work(thread_name, thread_id):
    print("%s %s %s" % (thread_name, thread_id, time.ctime(time.time())))
    i = 0
    while i < 2:
        i += 1
        time.sleep(1)

thread_locker = threading.Lock()
threads = []

if __name__ == '__main__':
    thread1 = WorkThread(1, "Thread1")
    thread2 = WorkThread(2, "Thread2")

    thread1.start()
    thread2.start()

    threads.append(thread1)
    threads.append(thread2)

    for t in threads:
        t.join()

    print("exit main thread")

# output:
# start thread:  Thread1
# Thread1 1 Tue Jul  2 20:48:05 2019
# end thread:  Thread1
# start thread:  Thread2
# Thread2 2 Tue Jul  2 20:48:07 2019
# end thread:  Thread2
# exit main thread
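
Lock and RLock also support the with statement, which acquires the lock on entry and releases it on exit even if the block raises. A sketch in which four threads increment a shared counter; the counter and the thread count are illustrative:

```python
import threading

counter = 0
counter_lock = threading.Lock()

def increment(times):
    global counter
    for _ in range(times):
        with counter_lock:    # acquire() on entry, release() on exit
            counter += 1      # read-modify-write done by one thread at a time

threads = [threading.Thread(target=increment, args=(100_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 400000

# an RLock may be re-acquired by the thread that already holds it;
# nesting a plain Lock like this would block forever
rlock = threading.RLock()
with rlock:
    with rlock:
        print("re-entered RLock")
```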

2. Semaphores

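A mutex lets only one thread at a time access shared data, whereas a semaphore lets up to a fixed number of threads access it at once. For example, if a bank has 5 counters, 5 people can be served at the same time; everyone else has to wait until someone at a counter finishes before they can step up.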

# -*- coding:utf-8 -*-
import threading
import time

semaphore = threading.BoundedSemaphore(5)
threads = []

def do_work(name):
    semaphore.acquire()
    time.sleep(2)
    print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} {threading.current_thread().name} is carrying on business")
    semaphore.release()

if __name__ == '__main__':
    print("main thread start:", time.strftime("%Y-%m-%d %H:%M:%S"))

    for i in range(10):
        t = threading.Thread(target=do_work, args=(i,))
        threads.append(t)

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

    print("main thread end:", time.strftime("%Y-%m-%d %H:%M:%S"))

# output:
# main thread start: 2019-07-03 22:31:06
# 2019-07-03 22:31:08 Thread-1 is carrying on business
# 2019-07-03 22:31:08 Thread-3 is carrying on business
# 2019-07-03 22:31:08 Thread-2 is carrying on business
# 2019-07-03 22:31:08 Thread-4 is carrying on business
# 2019-07-03 22:31:08 Thread-5 is carrying on business
# 2019-07-03 22:31:10 Thread-6 is carrying on business
# 2019-07-03 22:31:10 Thread-7 is carrying on business
# 2019-07-03 22:31:10 Thread-9 is carrying on business
# 2019-07-03 22:31:10 Thread-8 is carrying on business
# 2019-07-03 22:31:10 Thread-10 is carrying on business
# main thread end: 2019-07-03 22:31:10
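
The same pattern written with the with statement, plus a check of the concurrency limit; the active and peak counters are illustrative bookkeeping, not part of the semaphore API. BoundedSemaphore also raises ValueError if it is released more times than it was acquired:

```python
import threading
import time

MAX_COUNTERS = 5
semaphore = threading.BoundedSemaphore(MAX_COUNTERS)
state_lock = threading.Lock()   # protects the illustrative counters below
active = 0
peak = 0

def do_work():
    global active, peak
    with semaphore:             # acquire() on entry, release() on exit
        with state_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.1)         # simulated business at the counter
        with state_lock:
            active -= 1

threads = [threading.Thread(target=do_work) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("peak concurrency:", peak)   # never more than MAX_COUNTERS

try:
    semaphore.release()            # one release too many
except ValueError as exc:
    print("ValueError:", exc)
```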

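3. Condition Variables

A condition variable lets a thread A pause and wait for another thread B; once B has made some condition true, it notifies thread A to continue. A thread first acquires the condition variable's lock. If the condition is not met, the thread waits (wait()), which releases the lock; if it is met, the thread proceeds, and it can also notify other threads that are waiting. A waiting thread that receives a notification reacquires the lock and re-checks the condition.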

import threading
import time

class ThreadA(threading.Thread):
    def __init__(self, con, name):
        super(ThreadA, self).__init__()
        self.cond = con
        self.name = name

    def run(self):
        self.cond.acquire()
        print(self.name + ": What can I do for you?")
        self.cond.notify()
        self.cond.wait()
        print(self.name + ": Five yuan.")
        self.cond.notify()
        self.cond.wait()
        print(self.name + ": You are welcome.")
        self.cond.release()

class ThreadB(threading.Thread):
    def __init__(self, con, name):
        super(ThreadB, self).__init__()
        self.cond = con
        self.name = name

    def run(self):
        self.cond.acquire()
        time.sleep(1)
        print(self.name + ": A hot dog, please!")
        self.cond.notify()
        self.cond.wait()
        print(self.name + ": Thanks.")
        self.cond.notify()
        self.cond.release()

if __name__ == "__main__":
    cond = threading.Condition()
    thread1 = ThreadA(cond, "ThreadA")
    thread2 = ThreadB(cond, "ThreadB")
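    thread1.start()
    # the dialogue assumes ThreadA acquires cond first; starting it first makes this likely, not guaranteed
    thread2.start()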

# output:
# ThreadA: What can I do for you?
# ThreadB: A hot dog, please!
# ThreadA: Five yuan.
# ThreadB: Thanks.
# ThreadA: You are welcome.
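
The example above relies on the threads reaching wait() in a particular order. A more robust pattern waits on a predicate over shared state: Condition.wait_for() re-checks the predicate after every wake-up. A producer/consumer sketch; the items list and the function names are illustrative:

```python
import threading

cond = threading.Condition()
items = []       # shared state protected by cond's lock
received = []

def consumer(n):
    for _ in range(n):
        with cond:
            # returns at once if the predicate is already true,
            # otherwise waits and re-checks after each notify()
            cond.wait_for(lambda: len(items) > 0)
            received.append(items.pop(0))

def producer(values):
    for v in values:
        with cond:
            items.append(v)
            cond.notify()   # wake one waiting consumer

c = threading.Thread(target=consumer, args=(3,))
p = threading.Thread(target=producer, args=(["a", "b", "c"],))
c.start()
p.start()
p.join()
c.join()
print(received)   # ['a', 'b', 'c']
```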

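4. Events

Events are used for communication between threads: one thread sends a signal while one or more other threads wait for it. A thread that calls the event object's wait() method blocks until another thread calls set() and wakes it up; clear() resets the event so that later wait() calls block again.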

import threading
import time

class ThreadA(threading.Thread):
    def __init__(self, _event, name):
        super(ThreadA, self).__init__()
        self.event = _event
        self.name = name

    def run(self):
        print(self.name + ": What can I do for you?")
        self.event.set()
        time.sleep(0.5)
        self.event.wait()
        print(self.name + ": Five yuan.")
        self.event.set()
        time.sleep(0.5)
        self.event.wait()
        self.event.clear()
        print(self.name + ": You are welcome!")

class ThreadB(threading.Thread):
    def __init__(self, _event, name):
        super(ThreadB, self).__init__()
        self.event = _event
        self.name = name

    def run(self):
        self.event.wait()
        self.event.clear()
        print(self.name + ": A hot dog, please!")
        self.event.set()
        time.sleep(0.5)
        self.event.wait()
        print(self.name + ": Thanks!")
        self.event.set()

if __name__ == "__main__":
    event = threading.Event()
    thread1 = ThreadA(event, "ThreadA")
    thread2 = ThreadB(event, "ThreadB")
    thread1.start()
    thread2.start()

# output:
# ThreadA: What can I do for you?
# ThreadB: A hot dog, please!
# ThreadA: Five yuan.
# ThreadB: Thanks!
# ThreadA: You are welcome!
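
Event.wait() also accepts a timeout and returns the flag's value, so a thread can stop waiting instead of blocking forever. A sketch; the two-second timeout is an arbitrary choice:

```python
import threading
import time

ready = threading.Event()
log = []

def waiter():
    # block until the flag is set, or give up after 2 seconds
    got_signal = ready.wait(timeout=2)
    log.append(("waiter", got_signal))

t = threading.Thread(target=waiter)
t.start()
time.sleep(0.1)
log.append(("setter", ready.is_set()))   # False: not set yet
ready.set()      # wakes every thread blocked in wait()
t.join()
ready.clear()    # reset the flag so later wait() calls block again
print(log)       # [('setter', False), ('waiter', True)]
```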

5. Thread Priority Queues

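Python's queue module (named Queue in Python 2) provides synchronized, thread-safe queue classes: the FIFO (first-in, first-out) queue Queue, the LIFO (last-in, first-out) queue LifoQueue, and the priority queue PriorityQueue.
Queue, LifoQueue, and PriorityQueue all implement the necessary locking internally, so they can be used directly from multiple threads, and a queue can be used to synchronize threads.
Commonly used methods in the queue module:
Queue.qsize(): returns the approximate size of the queue
Queue.empty(): returns True if the queue is empty, otherwise False
Queue.full(): returns True if the queue is full, otherwise False
Queue.get([block[, timeout]]): removes and returns an item; timeout is how long to wait
Queue.get_nowait(): equivalent to Queue.get(False)
Queue.put(item[, block[, timeout]]): puts an item into the queue; timeout is how long to wait
Queue.put_nowait(item): equivalent to Queue.put(item, False)
Queue.task_done(): called after finishing a piece of work taken from the queue, to tell the queue that the task is complete
Queue.join(): blocks until every item put into the queue has been retrieved and marked done with task_done()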
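
A sketch of the retrieval order of the three queue classes, each drained with get_nowait() until queue.Empty is raised; the items and priorities are illustrative:

```python
import queue

def drain(q):
    # take items until the queue is empty
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items

fifo, lifo, prio = queue.Queue(), queue.LifoQueue(), queue.PriorityQueue()
for q in (fifo, lifo):
    for word in ("one", "two", "three"):
        q.put(word)
# PriorityQueue returns the smallest entry first; (priority, data) tuples are a common convention
for entry in [(3, "low"), (1, "high"), (2, "medium")]:
    prio.put(entry)

fifo_out, lifo_out, prio_out = drain(fifo), drain(lifo), drain(prio)
print(fifo_out)  # ['one', 'two', 'three']
print(lifo_out)  # ['three', 'two', 'one']
print(prio_out)  # [(1, 'high'), (2, 'medium'), (3, 'low')]
```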

# -*- coding:utf-8 -*-
import threading
import time
import queue

exitFlag = 0

class WorkThread(threading.Thread):
    def __init__(self, id, name, q):
        threading.Thread.__init__(self)
        self.thread_id = id
        self.name = name
        self.queue = q

    def run(self):
        work(self.name, self.queue)

def work(thread_name, q):
    while not exitFlag:
        thread_locker.acquire()
        if not q.empty():
            data = q.get()
            print("%s processing %s" % (thread_name, data))
            thread_locker.release()
        else:
            thread_locker.release()
        time.sleep(1)

thread_locker = threading.Lock()
thread_list = ["Thread1", "Thread2", "Thread3"]
work_queue = queue.Queue(10)
messages = ["one", "two", "three", "four", "five"]
threads = []
thread_id = 1

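if __name__ == '__main__':
    # create the worker threads
    for name in thread_list:
        thread = WorkThread(thread_id, name, work_queue)
        thread.start()
        threads.append(thread)
        thread_id += 1

    # fill the queue
    thread_locker.acquire()
    for word in messages:
        work_queue.put(word)
    thread_locker.release()

    # wait for the queue to empty, sleeping briefly instead of spinning the CPU
    while not work_queue.empty():
        time.sleep(0.1)

    # tell the threads it is time to exit
    # (this if-block runs at module level, so the assignment updates the global exitFlag)
    exitFlag = 1

    # wait for all threads to finish
    for t in threads:
        t.join()
    print("exit main thread")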

# output:
# Thread1 processing one
# Thread3 processing two
# Thread2 processing three
# Thread1 processing four
# Thread3 processing five
# exit main thread
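
The exitFlag and the polling loop above can be avoided by letting the workers block in get() and using task_done()/join() to detect completion. A sketch; the None sentinel used to stop each worker is a convention chosen for this example, not part of the queue API:

```python
import queue
import threading

work_queue = queue.Queue(10)
processed = []
processed_lock = threading.Lock()
SENTINEL = None   # illustrative "stop" marker: one is queued per worker

def worker(name, q):
    while True:
        item = q.get()            # blocks until an item is available
        try:
            if item is SENTINEL:
                return            # the finally clause still calls task_done()
            with processed_lock:
                processed.append((name, item))
            print("%s processing %s" % (name, item))
        finally:
            q.task_done()         # exactly one task_done() per get()

threads = [threading.Thread(target=worker, args=("Thread%d" % i, work_queue))
           for i in range(1, 4)]
for t in threads:
    t.start()
for word in ["one", "two", "three", "four", "five"]:
    work_queue.put(word)

work_queue.join()                 # returns once every item has been marked done
for _ in threads:
    work_queue.put(SENTINEL)      # ask each worker to exit
for t in threads:
    t.join()
print("exit main thread")
```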

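6. Thread Deadlock

A deadlock is a situation in which two or more processes or threads, competing for resources during execution, end up waiting for each other indefinitely. When threads share several resources and each holds part of them while waiting for a resource held by another, they deadlock. For example, in a database operation, thread A needs the result of thread B to proceed while thread B needs the result of thread A; if A and B are both in progress and neither has produced a result yet, each waits forever for the other to finish. In the example below, each thread first locks its own account and then waits for the other account's lock, so neither transfer ever completes.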

import time
import threading

class Account:
    def __init__(self, _id, balance, lock):
        self.id = _id
        self.balance = balance
        self.lock = lock

    def withdraw(self, amount):
        self.balance -= amount

    def deposit(self, amount):
        self.balance += amount

def transfer(_from, to, amount):
    if _from.lock.acquire():
        _from.withdraw(amount)
        time.sleep(1)
        print('wait for lock...')
        if to.lock.acquire():
            to.deposit(amount)
            to.lock.release()
        _from.lock.release()
    print('finish...')

if __name__ == "__main__":
    a = Account('a', 1000, threading.Lock())
    b = Account('b', 1000, threading.Lock())
    threading.Thread(target=transfer, args=(a, b, 100)).start()
    threading.Thread(target=transfer, args=(b, a, 200)).start()

One way to avoid deadlock is to assign every lock in the program a unique id and to allow multiple locks to be acquired only in ascending order of id.
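
Applied to the transfer example, the rule means both threads lock the two accounts in the same order. A sketch using a variant of the Account class that creates its own lock; it uses each account's id as the ordering key (assuming ids are unique and comparable) and acquires the locks with the with statement:

```python
import threading

class Account:
    def __init__(self, _id, balance):
        self.id = _id
        self.balance = balance
        self.lock = threading.Lock()

def transfer(_from, to, amount):
    # lock the account with the smaller id first, so opposite transfers
    # request the two locks in the same order and cannot wait on each other
    first, second = sorted((_from, to), key=lambda acc: acc.id)
    with first.lock:
        with second.lock:
            _from.balance -= amount
            to.balance += amount

a = Account("a", 1000)
b = Account("b", 1000)
t1 = threading.Thread(target=transfer, args=(a, b, 100))
t2 = threading.Thread(target=transfer, args=(b, a, 200))
t1.start()
t2.start()
t1.join()
t2.join()
print(a.balance, b.balance)  # 1100 900
```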

IV. Thread Pools

1. Introduction to Thread Pools

A thread pool keeps a set of reusable threads (ThreadPoolExecutor creates them on demand, up to its maximum). The program simply submits a task function to the pool, and the pool runs it on an idle thread. When the task function finishes, the thread does not die; it returns to the pool as an idle thread and waits for the next task function.
A thread pool also makes it easy to limit the number of concurrent threads in the system. A large number of concurrent threads can sharply degrade system performance or even crash the Python interpreter; the pool's maximum-worker parameter keeps the number of concurrent threads at or below that limit.
In the concurrent.futures module, Executor is the abstract base class for pools. It has two subclasses: ThreadPoolExecutor, used to create thread pools, and ProcessPoolExecutor, used to create process pools.
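Executor provides the following commonly used methods:
submit(fn, *args, **kwargs): submits fn to the pool for execution. args are the positional arguments for fn (a tuple) and kwargs are its keyword arguments (a dict). submit() returns a Future object.
map(func, *iterables, timeout=None, chunksize=1): like the built-in map(), except that the iterables are collected immediately and func is executed asynchronously by the pool's workers; the results are yielded in input order. chunksize only has an effect with ProcessPoolExecutor.
shutdown(wait=True): shuts down the pool.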
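Future provides the following methods:
cancel(): attempts to cancel the task represented by the Future. If the task is currently running or has already finished, it cannot be cancelled and the method returns False; otherwise the task is cancelled and the method returns True.
cancelled(): returns True if the task was successfully cancelled.
running(): returns True if the task is currently executing and therefore cannot be cancelled.
done(): returns True if the task was successfully cancelled or has finished running.
result(timeout=None): returns the task's return value. If the task has not finished yet, result() blocks the current thread; timeout is the maximum number of seconds to block, after which a TimeoutError is raised.
exception(timeout=None): returns the exception raised by the task; if the task completed without raising, it returns None.
add_done_callback(fn): registers a callback on the Future; when the task finishes or is cancelled, fn is called with the Future as its only argument.
After you are done with a thread pool, call its shutdown() method to start the pool's shutdown sequence. A pool on which shutdown() has been called accepts no new tasks but still finishes all tasks already submitted; once they have all completed, all threads in the pool exit. Using the pool in a with statement calls shutdown(wait=True) automatically.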
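
A sketch of submit(), add_done_callback(), as_completed(), and map(), using the executor as a context manager so that shutdown(wait=True) is called on exit; the square function and its short sleep are illustrative:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def square(n):
    time.sleep(0.05)   # simulated I/O wait
    return n * n

callback_results = []

with ThreadPoolExecutor(max_workers=3) as pool:   # calls shutdown(wait=True) on exit
    futures = [pool.submit(square, n) for n in range(5)]
    for f in futures:
        # the callback receives the finished Future as its only argument
        f.add_done_callback(lambda fut: callback_results.append(fut.result()))
    # as_completed() yields futures as they finish, not in submission order
    done_results = sorted(f.result() for f in as_completed(futures))
    # map() yields results in input order
    mapped = list(pool.map(square, range(5)))

print(done_results)              # [0, 1, 4, 9, 16]
print(mapped)                    # [0, 1, 4, 9, 16]
print(sorted(callback_results))  # [0, 1, 4, 9, 16]
```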

2. Thread Pools

ThreadPoolExecutor(max_workers): if max_workers is not given, the default was the number of CPUs × 5 in Python 3.5 to 3.7; since Python 3.8 it is min(32, os.cpu_count() + 4).

# -*- coding:utf-8 -*-
from concurrent.futures import ThreadPoolExecutor
import threading
import time
import os

class WorkThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        print('Process[%s]:%s start and run task' % (os.getpid(), threading.current_thread().name))
        time.sleep(2)
        return "Process[{}]:{} end".format(os.getpid(), threading.current_thread().name)

def work_task(thread_name):
    print('Process[%s]:%s start and run task' % (os.getpid(), threading.current_thread().name))
    time.sleep(5)
    return "Process[{}]:{} end".format(os.getpid(), threading.current_thread().name)

def get_call_back(future):
    print(future.result())

if __name__ == '__main__':
    print('main thread start')

    # create thread pool
    thread_pool = ThreadPoolExecutor(5)
    futures = []
    for i in range(5):
        # only the bound run method is submitted; the WorkThread object itself is never started
        thread = WorkThread()
        future = thread_pool.submit(thread.run)
        futures.append(future)

    for i in range(5):
        future = thread_pool.submit(work_task, i)
        futures.append(future)

    for future in futures:
        future.add_done_callback(get_call_back)

    # thread_pool.map(work_task, (2, 3, 4))
    thread_pool.shutdown()

# output:
# main thread start
# Process[718]:ThreadPoolExecutor-0_0 start and run task
# Process[718]:ThreadPoolExecutor-0_1 start and run task
# Process[718]:ThreadPoolExecutor-0_2 start and run task
# Process[718]:ThreadPoolExecutor-0_3 start and run task
# Process[718]:ThreadPoolExecutor-0_4 start and run task
# Process[718]:ThreadPoolExecutor-0_3 end
# Process[718]:ThreadPoolExecutor-0_3 start and run task
# Process[718]:ThreadPoolExecutor-0_1 end
# Process[718]:ThreadPoolExecutor-0_1 start and run task
# Process[718]:ThreadPoolExecutor-0_2 end
# Process[718]:ThreadPoolExecutor-0_2 start and run task
# Process[718]:ThreadPoolExecutor-0_0 end
# Process[718]:ThreadPoolExecutor-0_0 start and run task
# Process[718]:ThreadPoolExecutor-0_4 end
# Process[718]:ThreadPoolExecutor-0_4 start and run task
# Process[718]:ThreadPoolExecutor-0_2 end
# Process[718]:ThreadPoolExecutor-0_3 end
# Process[718]:ThreadPoolExecutor-0_1 end
# Process[718]:ThreadPoolExecutor-0_4 end
# Process[718]:ThreadPoolExecutor-0_0 end
            
          
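Besides add_done_callback(), results can be collected with Executor.map() (the call left commented out in the example above) or with concurrent.futures.as_completed(). A minimal sketch, assuming a hypothetical scaled_task() whose sleep time shrinks as n grows; max_workers is passed explicitly so that the pool size does not depend on the CPU count or the Python version.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def scaled_task(n):
    # hypothetical task: larger n sleeps less, so later tasks tend to finish first
    time.sleep(0.05 * (4 - n))
    return n * 10

def run_with_map():
    with ThreadPoolExecutor(max_workers=4) as pool:
        # map() yields results in input order, regardless of completion order
        return list(pool.map(scaled_task, range(4)))

def run_with_as_completed():
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = {pool.submit(scaled_task, n): n for n in range(4)}
        # as_completed() yields each Future as soon as it finishes
        return [(futures[f], f.result()) for f in as_completed(futures)]

if __name__ == "__main__":
    print(run_with_map())           # [0, 10, 20, 30]
    print(run_with_as_completed())  # pairs in completion order
```

map() returns results in input order, while as_completed() follows completion order, so the order printed by the second function can differ between runs.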

3. Process Pools

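ProcessPoolExecutor(max_workers): if max_workers is not specified explicitly, the pool creates as many worker processes as there are CPUs on the machine (os.cpu_count()).
Synchronous use of the process pool (each result is fetched right after its task is submitted):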

```python
from concurrent.futures import ProcessPoolExecutor
import os
import time
import random

def work_task(n):
    print('Process[%s] is running' % os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':
    start = time.time()
    pool = ProcessPoolExecutor()
    for i in range(5):
        # result() blocks until this task has finished,
        # so the five tasks run one after another
        obj = pool.submit(work_task, i).result()
    pool.shutdown()
    print('='*30)
    print("time: ", time.time() - start)

# output (sample; PIDs and timings vary between runs):
# Process[7372] is running
# Process[7373] is running
# Process[7374] is running
# Process[7375] is running
# Process[7372] is running
# ==============================
# time:  10.023026466369629
```

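Asynchronous use of the process pool (all tasks are submitted first and the results are collected afterwards, so the tasks run in parallel):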

```python
from concurrent.futures import ProcessPoolExecutor
import os
import time
import random

def work_task(n):
    print('Process[%s] is running' % os.getpid())
    time.sleep(random.randint(1, 3))
    return n**2

if __name__ == '__main__':
    start = time.time()
    pool = ProcessPoolExecutor()
    objs = []
    for i in range(5):
        # submit() returns a Future immediately, without waiting for the task
        obj = pool.submit(work_task, i)
        objs.append(obj)
    # shutdown() waits for all tasks, so every result is available afterwards
    pool.shutdown()
    print('='*30)
    print([obj.result() for obj in objs])
    print("time: ", time.time() - start)

# output (sample; PIDs and timings vary between runs):
# Process[8268] is running
# Process[8269] is running
# Process[8270] is running
# Process[8271] is running
# Process[8270] is running
# ==============================
# [0, 1, 4, 9, 16]
# time:  2.0124566555023193
```
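The asynchronous pattern can also be written with ProcessPoolExecutor.map() inside a with statement, which submits every task, waits for all of them on exit, and returns the results in input order. A sketch assuming a hypothetical top-level square() function; functions sent to worker processes must be picklable, so they cannot be lambdas or nested functions.

```python
from concurrent.futures import ProcessPoolExecutor

def square(n):
    # runs in a worker process; must be defined at module top level to be picklable
    return n ** 2

def parallel_squares(numbers, workers=None):
    # with max_workers=None the pool uses os.cpu_count() worker processes
    with ProcessPoolExecutor(max_workers=workers) as pool:
        # map() submits all tasks up front and yields results in input order
        return list(pool.map(square, numbers))

if __name__ == "__main__":
    print(parallel_squares(range(5)))  # [0, 1, 4, 9, 16]
```

The `if __name__ == "__main__"` guard matters here: with the spawn start method (the default on Windows and macOS), each worker process re-imports the main module, and an unguarded pool creation would run again in every worker.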

V. Producer-Consumer Model

```python
import threading
from queue import Queue
from urllib.request import urlopen

ips = ["www.baidu.com",
       "www.taobao.com",
       "www.huawei.com",
       "www.alibaba.com",
       "www.meituan.com",
       "www.xiaomi.com"]
ports = [80, 443]

class Producer(threading.Thread):
    def __init__(self, _queue):
        super(Producer, self).__init__()
        self.queue = _queue

    def run(self):
        # put every ip:port combination into the shared queue
        urls = ["http://%s:%s" % (ip, port) for ip in ips for port in ports]
        for url in urls:
            self.queue.put(url)

class Consumer(threading.Thread):
    def __init__(self, _queue):
        super(Consumer, self).__init__()
        self.queue = _queue

    def run(self):
        # keep consuming until the None sentinel arrives, so that no consumer
        # blocks forever on an empty queue (there are more consumers than URLs)
        while True:
            url = self.queue.get()
            if url is None:
                break
            try:
                # an assumed 5-second timeout keeps an unreachable host from blocking the thread
                with urlopen(url, timeout=5):
                    pass
            except Exception:
                print("%s is unknown url" % url)
            else:
                print("%s is ok" % url)

if __name__ == "__main__":
    # create a shared queue
    queue = Queue()

    producers = [Producer(queue) for _ in range(2)]
    consumers = [Consumer(queue) for _ in range(30)]
    for t in producers + consumers:
        t.start()

    # wait until all URLs are enqueued, then send one sentinel per consumer
    for producer in producers:
        producer.join()
    for _ in consumers:
        queue.put(None)
    for consumer in consumers:
        consumer.join()

# output (sample; ordering varies, and lines printed by different threads can interleave):
# http://www.taobao.com:443 is unknown url
# http://www.huawei.com:443 is unknown url
# http://www.huawei.com:443 is unknown urlhttp://www.taobao.com:443 is unknown url
#
# http://www.baidu.com:80 is ok
# http://www.baidu.com:443 is unknown url
# http://www.xiaomi.com:443 is unknown url
# http://www.baidu.com:80 is ok
# http://www.baidu.com:443 is unknown url
# http://www.xiaomi.com:443 is unknown url
# http://www.alibaba.com:443 is unknown url
# http://www.alibaba.com:443 is unknown url
# http://www.meituan.com:443 is unknown urlhttp://www.meituan.com:443 is unknown url
#
# http://www.huawei.com:80 is ok
# http://www.huawei.com:80 is ok
# http://www.xiaomi.com:80 is ok
# http://www.xiaomi.com:80 is ok
# http://www.taobao.com:80 is ok
# http://www.meituan.com:80 is ok
# http://www.meituan.com:80 is ok
# http://www.taobao.com:80 is ok
# http://www.alibaba.com:80 is ok
# http://www.alibaba.com:80 is ok
```
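The crawler above depends on network access. The following offline sketch shows the same producer-consumer mechanism with an assumed workload (squaring integers): a bounded Queue makes the producer wait when consumers fall behind, task_done()/join() track when every item has been processed, and one None sentinel per consumer lets every consumer thread exit. The function names producer(), consumer() and run() are hypothetical.

```python
import threading
from queue import Queue

SENTINEL = None  # end-of-stream marker (an assumption of this sketch)

def producer(q, items):
    # put every item into the shared queue; put() blocks while the queue is full
    for item in items:
        q.put(item)

def consumer(q, results, lock):
    while True:
        item = q.get()
        try:
            if item is SENTINEL:
                return
            with lock:
                results.append(item * item)
        finally:
            # mark the item as handled so that q.join() can return
            q.task_done()

def run(items, n_consumers=3):
    q = Queue(maxsize=4)  # bounded queue: the producer waits when consumers fall behind
    results, lock = [], threading.Lock()
    consumers = [threading.Thread(target=consumer, args=(q, results, lock))
                 for _ in range(n_consumers)]
    for t in consumers:
        t.start()

    prod = threading.Thread(target=producer, args=(q, items))
    prod.start()
    prod.join()

    q.join()  # block until every produced item has been processed
    for _ in consumers:
        q.put(SENTINEL)  # one sentinel per consumer
    for t in consumers:
        t.join()
    return sorted(results)

if __name__ == "__main__":
    print(run(range(10)))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```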