微線程領域(至少在 Python 中)一直都是 Stackless Python 才能涉及的特殊增強部分。關于 Stackless 的話題以及最近它經歷的變化,可能本身就值得開辟一個專欄了。但其中簡單的道理就是,在“新的 Stackless”下,延續(continuation)顯然是不合時宜的,但微線程還是這個項目 存在的理由。這一點很復雜……
剛開始,我們還是先來回顧一些內容。那么,什么是微線程呢? 微線程基本上可以說是只需要很少的內部資源就可以運行的進程 ?D 并且是在 Python 解釋器的單個實例中(在公共內存空間中,等等)運行的進程。有了微線程,我們就可能在目前中等性能的 PC 機上運行數以萬計的并行進程,還可以每秒鐘幾十萬次地在上下文之間切換。對 fork() 的調用或標準的 OS 線程調用根本不能達到這個程度!甚至所謂的“輕量級”線程庫中的線程也比這里提出的微線程“重”好幾個數量級。
我在本專欄中介紹的輕便線程的含義與 OS 線程的含義有一點不同。就這點而言,它們與 Stackless 所提供的也不盡相同。在很多方面,輕便線程比大多數變體都簡單得多;大多數關于信號、鎖定及諸如此類的問題都不存在了。簡單性的代價就是,我提出了一種“協作多線程”的形式;我覺得在標準 Python 框架中加入搶占并不可行(至少在非 Stackless 的 Python 2.2 中 ― 沒有人知道 __future__ 會帶來什么)。
輕便線程在某種意義上會令人回想起較早的 Windows 和 MacOS 版本的協作多任務(不過是在單個應用程序中)。然而,在另一種意義上,輕便線程只不過是在程序中表達流的另一種方式;輕便線程所做的一切(至少在原則上)都可以用“真正龐大的 if/elif 塊”技術來完成(蠻干的程序員的黔驢之計)。
一種用簡單的生成器模擬協同程序的機制。這個機制的核心部分非常簡單。 scheduler() 函數中包裝了一組生成器對象,這個函數控制將控制流委托給合適的分支的過程。這些協同程序并不是 真正的協同程序,因為它們只控制到 scheduler() 函數和來自該函數的分支。不過出于實用的目的,您可以用非常少的額外代碼來完成同樣的事情。 scheduler() 就是類似于下面這樣的代碼:
清單 1. 模擬協同程序的 Scheduler()
def scheduler(gendct, start): global cargo coroutine = start while 1: (coroutine, cargo) = gendct[coroutine].next()
關于這個包裝器要注意的一點是,每個生成器/協同程序都會生成一個包含它的預期分支目標的元組。生成器/協同程序基本上都在 GOTO 目標處退出。為了方便起見,我還讓生成器生成了一個標準的 cargo 容器,作為形式化在協同程序之間傳送的數據的方法 ― 不過您也可以只用已經達成一致的全局變量或回調 setter/getter 函數來傳送數據。Raymond Hettinger 撰寫了一個 Python 增強倡議(Python Enhancement Proposal,PEP),旨在使傳送的數據能被更好地封裝;可能今后的 Python 將包括這個倡議。
新的調度程序
對于輕便線程來說,它們的需求與協同程序的需求稍有不同。不過我們還是可以在它的核心處使用 scheduler() 函數。不同之處在于,調度程序本身應該決定分支目標,而不是從生成器/協同程序接收分支目標。下面讓我向您展示一個完整的測試程序和樣本:
清單 2. microthreads.py 示例腳本
from __future__ import generators import sys, time threads = [] TOTALSWITCHES = 10**6 NUMTHREADS = 10**5def null_factory(): def empty(): while1: yield None return empty() def quitter(): for n in xrange(TOTALSWITCHES/NUMTHREADS): yield None def scheduler(): global threads try : while1: for thread in threads: thread.next() except StopIteration: passif __name__ == "__main__" : for i in range(NUMTHREADS): threads.append(null_factory()) threads.append(quitter()) starttime = time.clock() scheduler() print"TOTAL TIME: " , time.clock()-starttime print"TOTAL SWITCHES:" , TOTALSWITCHES print"TOTAL THREADS: " , NUMTHREADS
這大概就是您能夠選擇的最簡單的輕便線程調度程序了。每個線程都按固定順序進入,而且每個線程都有同樣的優先級。接下來,讓我們來看看如何處理細節問題。和前面部分所講的協同程序一樣,編寫輕便線程時應該遵守一些約定。
處理細節
大多數情況下,輕便線程的生成器都應該包括在 while 1: 循環中。這里設置調度程序的方法將導致在其中一個線程停止時整個調度程序停止。這在某種意義上“健壯性”不如 OS 線程 ?D 不過在 scheduler() 的循環 內捕獲異常不會比在循環外需要更多的機器資源。而且,我們可以從 threads 列表刪除線程,而不必終止(由它本身或其它線程終止)。我們其實并沒有提供讓刪除更加容易的詳細方法;不過比較常用的擴展方法可能是將線程存儲在字典或某種其它的結構中,而不是列表中。
該示例說明了最后終止調度程序循環的一種合理的方法。 quitter() 是一種特殊的生成器/線程,它監視某種條件(在本示例中只是一個上下文切換的計數),并在條件滿足時拋出 StopIteration (本示例中不捕獲其它異常)。請注意,在終止之后,其它所有生成器還是完整的,如果需要,還可以在今后恢復(在微線程調度程序或其它程序中)。顯然,如果需要,您可以 delete 這些生成器/線程。
這里討論的示例使用了特殊的無意義線程。它們什么也不做,而且以一種可能性最小的形式實現這一點。我們這樣建立該示例是為了說明一點 ?D 輕便線程的內在開銷是非常低的。在一臺比較老的只有 64 MB 內存的 Windows 98 Pentium II 膝上型電腦上創建 100,000 個輕便線程是輕而易舉的(如果達到了一百萬個線程,就會出現長時間的磁盤“猛轉”)。請用 OS 線程試試看! 而且,在這個比較慢的 366 MHz 芯片上可以在大約 10 秒內執行一百萬次上下文切換(所涉及的線程數對耗時并無重大影響)。顯然,真正的輕便線程應該 做一些事情,而這將根據任務使用更多的資源。不過線程本身卻贏得了“輕便”的名聲。
切換開銷
在輕便線程之間切換開銷很小,但還不是完全沒有開銷。為了測試這種情況,我構建了一個執行 某種工作(不過大約是您在線程中按道理可以完成的最少量)的示例。因為線程調度程序 真的等同于“執行 A,接著執行 B,然后執行 C,等等”的指令,所以要在主函數中創建一個完全并行的情況也不困難。
清單 3. overhead.py 示例腳本
from __future__ import generators import time TIMES = 100000 def stringops(): for n in xrange(TIMES): s = "Mary had a little lamb" s = s.upper() s = "Mary had a little lamb" s = s.lower() s = "Mary had a little lamb" s = s.replace('a','A') def scheduler(): for n in xrange(TIMES): for thread in threads: thread.next() def upper(): while1: s = "Mary had a little lamb" s = s.upper() yield None def lower(): while1: s = "Mary had a little lamb" s = s.lower() yield None def replace(): while1: s = "Mary had a little lamb" s = s.replace( 'a' , 'A' ) yield None if __name__== '__main__': start = time.clock() stringops() looptime = time.clock()-start print"LOOP TIME:" , looptime global threads threads.append(upper()) threads.append(lower()) threads.append(replace()) start = time.clock() scheduler() threadtime = time.clock()-start print"THREAD TIME:" , threadtime
結果表明,在直接循環的版本運行一次的時間內,輕便線程的版本運行了兩次還多一點點 ?D 也就相當于在上面提到的機器上,輕便線程運行了不到 3 秒,而直接循環運行了超過 6 秒。顯然,如果每個工作單元都相當于單個字符串方法調用的兩倍、十倍或一百倍,那么所花費的線程開銷比例就相應地更小了。
設計線程
輕便線程可以(而且通常應該)比單獨的概念性操作規模更大。無論是何種線程,都是用來表示描述一個特定 任務或 活動所需的流上下文的量。但是,任務花費的時間/大小可能比我們希望在單獨線程上下文中使用的要多/大。搶占將自動處理這種問題,不需要應用程序開發者作出任何特定干涉。不幸的是,輕便線程用戶需要注意“好好地處理”其它線程。
至少,輕便線程應該設計得足夠周全,在完成概念性操作時應該能夠 yield 。調度程序將回到這里以進行下一步。舉例來說:
清單 4. 偽碼友好的輕便線程
def nicethread():
??? while 1:
??????? ...operation A...
??????? yield None
??????? ...operation B...
??????? yield None
??????? ...operation C...
??????? yield None
多數情況下,好的設計將比在基本操作之間的邊界 yield 更多次。雖然如此,通常在概念上“基本”的東西都涉及對一個大集合的循環。如果情況如此(根據循環體耗費時間的程度),在循環體中加入一到兩個 yield (可能在特定數量的循環迭代執行過后再次發生)可能會有所幫助。和優先權線程的情況不同,一個行為不良的輕便線程會獲取無限量的獨占處理器時間。
調度的其它部分
迄今為止,上面的示例只展示了形式最基本的幾個線程調度程序。可能實現的還有很多(這個問題與設計一個好的生成器/線程沒什么關系)。讓我來順便向您展示幾個傳送中可能出現的增強。
更好的線程管理
一個簡單的 threads 列表就可以使添加調度程序要處理的生成器/線程非常容易。但是這種數據結構并不能使刪除或暫掛不再相關的線程變得容易。字典或類可能是線程管理中更好的數據結構。下面是一個快捷的示例,這個類能夠(幾乎能夠)順便訪問示例中的 threads 列表:
清單 5. 線程管理的 Python 類示例
class ThreadPool: """Enhanced threads list as class threads = ThreadPool() threads.append(threadfunc) # not generator object if threads.query(num) <>: threads.remove(num) """def __init__(self): self.threadlist = [] self.threaddict = {} self.avail = 1def __getitem__(self, n): return self.threadlist[n] def append(self, threadfunc, docstring=None): # Argument is the generator func, not the gen object # Every threadfunc should contain a docstring docstring = docstring or threadfunc.__doc__ self.threaddict[self.avail] = (docstring, threadfunc()) self.avail += 1 self.threadlist = [p[ 1] for p in self.threaddict.values()] return self.avail- 1# return the threadIDdef remove(self, threadID): del self.threaddict[threadID] self.threadlist = [p[ 1] for p in self.threaddict.values()] def query(self, threadID): " Information on thread, if it exists (otherwise None) return self.threaddict.get(threadID,[None])[0]
您可以實現更多內容,而這是個好的起點。
線程優先級
在簡單的示例中,所有線程都獲得調度程序同等的關注。至少有兩種普通方法可以實現調優程度更好的線程優先級系統。一個優先級系統可以只對“高優先級”線程投入比低優先級線程更多的注意力。我們可以用一種直接的方式實現它,就是創建一個新類 PriorityThreadPool(ThreadPool) ,這個類在線程迭代期間更頻繁地返回更重要的線程。最簡單的方法可能會在 .__getitem__() 方法中連續多次返回某些線程。那么,高優先級線程就可能接收到兩個,或多個,或一百個連續的“時間片”,而不只是原來的一個。這里的一個(非常弱的)“實時”變量最多可能返回散落在線程列表中各處的重要線程的多個副本。這將增加服務于高優先級線程的實際頻率,而不只是它們受到的所有關注。
在純 Python 中使用更復雜的線程優先級方法可能不是很容易(不過它是使用某種第三方特定于 OS/處理器的庫來實現的)。調度程序不是只給高優先級線程一個時間片的整型數,它還可以測量每個輕便線程中實際花費的時間,然后動態調整線程調度,使其對等待處理的線程更加“公平”(也許公平性和線程優先級是相關的)。不幸的是,Python 的 time.clock() 和它的系列都不是精度足夠高的計時器,不足以使這種方式有效。另一方面,沒有什么可以阻止“多時間片”方法中處理不足的線程去動態提高它自己的優先級。
將微線程和協作程序結合在一起
為了創建一個輕便線程(微線程)調度程序,我刪除了協作程序邏輯“please branch to here”。這樣做其實并不必要。示例中的輕便線程生成的通常都是 None ,而不是跳轉目標。我們完全可以把這兩個概念結合在一起:如果協同程序/線程生成了跳轉目標,調度程序就可以跳轉到被請求的地方(也許,除非被線程優先級覆蓋)。然而,如果協同程序/線程只生成 None ,調度程序就可以自己決定下一步要關注哪個適當的線程。決定(以及編寫)一個任意的跳轉究竟會如何與線性線程隊列交互將涉及到不少工作,不過這些工作中沒有什么特別神秘的地方。
快速而廉價 ― 為什么不喜歡它呢?
微線程模式(或者“輕便線程”)基本上可以歸結為 Python 中流控制的另一種奇怪的風格。本專欄的前面幾個部分已經談到了另外幾種風格。有各種控制機制的引人之處在于,它讓開發者將代碼功能性隔離在其邏輯組件內,并最大化代碼的上下文相關性。
其實,要實現做任何可能做到的事的 可能性并不復雜(只要用一個“loop”和一個“if”就可以了)。對于輕易地分解為很多細小的“代理”、“服務器”或“進程”的一類問題來說,輕便線程可能是表達應用程序的底層“業務邏輯”的最清楚的模型。當然,輕便線程與一些大家更熟知的流機制相比速度可能非常快,就這點而言并無大礙。
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
