假設我們的系統在運行的過程中,源源不斷的有新的任務需要處理(比如訂單處理),而且這些任務的處理是相互獨立的,沒有前后順序依賴性(順序依賴性是指,必須在任務
A
處理結束后才可開始
B
任務),那么我們就可以使用多個線程來同時處理多個任務。每個處理任務的線程稱為“工作者(線程)”。
我設計了
ESBasic.Threading.Engines.IWorkerEngine
工作者引擎,其目的就是使用多個線程來并行處理任務,提高系統的吞吐能力。

2. 適用場合:
設計工作者引擎 ESBasic.Threading.Engines.IWorkerEngine 的主要目的是為了解決類似下面的問題:
(1) 充分利用多 CPU 、多核計算資源。
(2) 減少因高速設備與低速設備之間速度差而產生計算資源浪費。
(3) 對于突發的大批量的任務(比如訂單系統經常在其它時段接受的訂單很少,但在某高峰期會有突發性的大量的訂單進來)進行緩沖處理,并最大限度地利用現有資源進行處理。
3 .設計思想與實現
IWorkerEngine 的設計思路是這樣的:我們使用一個隊列來存放需要處理的任務,新來的任務都會排隊到這個隊列中,然后有 N 個工作者線程不斷地從隊列中取出任務去處理,每個線程處理完當前任務后,又從隊列中取出下一個任務 …… ,如此循環。
IWorkerEngine 接口的源碼對應如下:
{
/// <summary>
/// IdleSpanInMSecs當沒有工作要處理時,工作者線程休息的時間間隔。默認為10ms
/// </summary>
int IdleSpanInMSecs{ get ; set ;}
/// <summary>
/// WorkerThreadCount工作者線程的數量。默認值為1。
/// </summary>
int WorkerThreadCount{ get ; set ;}
/// <summary>
/// WorkProcesser用于處理任務的處理器。
/// </summary>
IWorkProcesser < T > WorkProcesser{ set ;}
/// <summary>
/// WorkCount當前任務隊列中的任務數。
/// </summary>
int WorkCount{ get ;}
/// <summary>
/// MaxWaitWorkCount歷史中最大的處于等待狀態的任務數量。
/// </summary>
int MaxWaitWorkCount{ get ;}
void Initialize();
void Start();
void Stop();
/// <summary>
/// AddWork添加任務。
/// </summary>
void AddWork(Twork);
}
由于任務的類型不是固定的,所以我們使用的泛型參數 T 來表示要處理任務的類型。
所有的任務的具體執行都是由 IWorkProcesser 完成的:
{
void Process(Twork);
}
實現這個
IWorkerEngine
接口的時候要注意以下幾點:
(1) AddWork 方法會在多線程的環境中被調用,所以必須保證其是線程安全的。
(2) 每個工作者線程實際上就是一個我們前面介紹的循環引擎 ICycleEngine ,只不過將其 DetectSpanInSecs 設為 0 即可,表示不間斷地執行任務。 WorkerEngine 便是使用了 N 個 AgileCycleEngine 實例來作為工作者的。這些 AgileCycleEngine 實例在 Initialize 方法中被實例化。
(3) 所有的工作者最終都是執行私有的 DoWork 方法,這個方法就是從任務隊列中取出任務并且調用 IWorkProcesser 來處理任務,如果任務隊列為空,則等待 IdleSpanInMSecs 秒鐘后再重試。
(4) MaxWaitWorkCount 屬性用于記錄自從引擎運行以來最大的等待任務的數量,通過這個屬性我們可以推測任務量與任務處理速度之間的差距。
(5)
通過
Start
、
Stop
方法我們可以隨時停止、啟動工作者引擎,并可重復調用。
4. 使用時的注意事項
(1) 當引擎已經啟動并正在運行時,如果要修改 WorkerThreadCount 的值并使其生效,則必須先調用 Stop 方法停止引擎,然后重新調用 Initialize 方法初始化引擎,再調用 Start 方法啟動引擎。
(2)
關于工作者線程的個數
N
的設置的問題。這個數字不是越大越好,因為使用的線程越多,而
CPU
跟不上的話,那么消耗在線程切換上的浪費就越嚴重。所以,為了達到最好的性能,需要為工作者線程個數設置一個合適的值。
通常,這個值跟
CPU
的個數、
CPU
核的個數、任務的復雜度、慢速設備與快速設備之間的速度差以及它們的吞吐量有關。我們可以通過足夠的測試來發現適合我們系統的
N
值。
一般情況下的推薦值為:
CPU
個數
*
單個
CPU
的核數
*2 + 1
。
5. 擴展
( 1 )“一次性”的工作者引擎: BriefWorkerEngine
假設我們的系統可能會偶爾有一批任務要處理(也許永遠也不會有這樣的任務出現),我們希望只有當任務到來時,才使用一個工作者引擎實例來多線程處理它,處理完后,該引擎就可以釋放掉。
ESBasic.Threading.Engines.BriefWorkerEngine ,精簡的工作者引擎,便是為這一目的而設計的。它使用多線程處理一批任務,當這批任務處理結束后,工作者線程會被自動釋放,而該引擎實例也就可以被結束了。
為了方便使用,我將 BriefWorkerEngine 設計為從構造函數注入引擎運行所需要的參數,包括任務處理器、工作者線程個數、以及要處理的任務集合。在引擎實例被構造成功的同時,內部的循環引擎已經準備好了。注意, BriefWorkerEngine 實現了 IDisposable 接口,這表明當引擎被釋放時,內部所有的循環引擎都會停止運行,從而不再占有后臺線程池中的線程。
我們可以這樣來使用 BriefWorkerEngine :
IList < MyTask > taskList = ... ;
BriefWorkerEngine < MyTask > engine = new BriefWorkerEngine < MyTask > (processer, 5 ,taskList);
engine.Start();
while ( ! engine.IsFinished())
{
System.Threading.Thread.Sleep( 100 );
}
engine.Dispose();
// 執行到這里,表示所有任務已經處理完畢,引擎實例即將被釋放。
我們可以通過它的
IsFinished
方法來檢測執行是否已經完成。當
IsFinished
方法返回
true
時,引擎實例就可以被銷毀了。
(
2
)永不停止的工作者引擎
我們同樣可以考慮一個類似于循環引擎的擴展的情況,假設我們的系統要求在啟動時就將工作者引擎運行起來,而且在整個運行的生命周期中,都不需要停止引擎,那么我們就不想將 Start 方法、 Stop 方法暴露出來以免意外的調用 Stop 方法而導致引擎停止運行,那這個時候我們可以使用相同的技巧來做到:
{
private IWorkerEngine < MyTask > workerEngine;
public void Initialize()
{
this .workerEngine = new WorkerEngine < MyTask > ();
this .workerEngine.WorkerThreadCount = 5 ;
// this.workerEngine.WorkProcesser=

this .workerEngine.Initialize();
this .workerEngine.Start();
}
}
public class MyTask {}
其道理與循環引擎的擴展是一樣的。
注:ESBasic源碼可到
http://esbasic.codeplex.com/
下載。
ESBasic討論:37677395
ESBasic開源前言
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
