前一陣子研究了幾種可以在 .NET 善用多核心的作法之後, 最近剛好又在在 MSDN Magazine 看到這篇不錯的文章: http://msdn.microsoft.com/msdnmag/issues/08/02/NETMatters/default.aspx 裡面點出了另一種作法: 串流管線 (Stream Pipeline)...
如果你是以多核心能有效利用為目標, 那這篇對你可能沒什麼用... 不過如果你要處理的資料必需經過幾個 Stream 處理的話, 這篇就派的上用場. 這篇是以一個常見的例子為起點: 資料壓縮 + 加密...
在 .NET 要壓縮資料很簡單, 只要把資料寫到 GzipStream 就好, 建立 GzipStream 時再指定它要串接的 FileStream 就可以寫到檔案了 (串 Network 就可以透過網路傳出去... etc), 而加密則是用 CryptoStream, 也是一樣的用法, 可以串接其它的 Stream ...
正好利用這個特性, 資料要壓縮又要加密, 可以這樣作:
(single thread) [INPUT] --> GzipStream --> CryptoStream --> [OUTPUT]
不過那裡跟多核 CPU 扯上關係? 因為壓縮跟加密都是需要大量 CPU 運算, 這樣的作法等於是只用單一 thread 來負責 Gzip 跟 Crypto 的工作. 即使透過串接 stream , 動作被切成很多小段, 壓縮一點就加密一點, 但是仍然只有一個 thread... 再忙也仍然只有用到單一核心, 除非等到未來改板, 用類似上一篇 [TPL] 的方法改寫的 library 才有機會改善...
作者提出另一個觀點, 這兩個 stream 能不能分在兩個 thread 各別執行? 弄的像生產線 (pipeline) 一樣, 第一個作業員負責壓縮, 第二個作業員負責加密, 同時進行, 就可以有兩倍的處理速度... 答案當然是 "可以" ! 我佩服 Stephen Toub 的地方在於他很漂亮的解決了這個問題, 就是它寫了 BlockingStream 就把問題解掉了, 乾淨又漂亮... 真是甘拜下風.. 這也是我為什麼想多寫這篇的原因.
之前在念作業系統, 講到多工的課題, 有提過 "生產者 / 消費者" 的設計, 就是一部份模組負責丟出一堆工作, 而另一個模組則負責把工作處理掉. 如何在中間協調管控就是個重要的課題. 生產的太快工作會太多, 因此多到某個程度就要降低生產者的速度, 通常就是暫停. 消費者消化的太快也會沒事作, 就需要暫停來休息一下等待新的工作. 而消費者這端如果是用 thread pool 來實作, 就剛好是 [上篇] 提到的動態調整 thread pool 裡的 worker thread 數量的機制. 工作太多會動態增加 worker thread 來處理, 工作太少就會讓多餘的 worker thread 睡覺, 睡太久就把它砍了...
而這次 Stephen Toub 解決的方式: BlockingStream 則是處理生產者跟消費者之間的橋樑. Stream 這種類型, 正好是身兼消費及生產者的角色. 他舉了郵件處理的例子. 如果你有一堆信要寄, 你也許會找一個人把卡片折好, 放進信封貼郵票. 這些動作由一個人 (one thread) 來作, 就是原本的作法. 生產者就是那堆要寄的信件, 這個可憐的工人就是要把這些信裝好信封貼好郵票的消費者.
如果有第二個人呢? 他們可以分工, 一個裝信封, 一個黏郵票. 第一個人 ( thread 1 ) 裝好信封交給第二個人 (thread 2), 第二個人再黏郵票就搞定了. 這就是典型的生產線模式. 這樣在第一個人裝信封的同時, 第二個人可以貼上一封信的郵票... 因此除了裝第一封信, 第二個人沒事作, 及貼最後一張郵票, 第一個人沒事作之外, 其它過程中兩個人都有工作, 對應到程式就是一個 thread 負責壓縮, 壓縮好一部份後就交給後面的 thread 負責加密, 同時前面的 thread 則繼續壓縮下一塊資料... 剛好搭配雙核 cpu, 效能就能提高. 兩個 stream 之間就靠 StreamPipeline 及 BlockingStream 隔開在兩個不同的 thread. 原本的流程改為:
[INPUT] --> GzipStream (thread 1) --> BlockingStream --> CryptoStream (thread 2) --> [OUTPUT]
雖然技術細節才是精華, 但是這部份就請各位看原文就好, 我寫再多也比不過原文講的清楚... 我想要比較的是這樣作跟之前碰到的幾種平行處理 or thread pool 差在那裡? 這種作法有什麼好處?
其實最大的差異在於, pipeline 以外的作法 (TPL, ThreadPool) 都是把大量的工作分散給多個工人來處理. 每個人處理的內容其實都是獨立的. 比如之前我處理的問題, 要把一堆照片轉成縮圖. 每個動作都互獨立, 可以很容易的分給多個人處理, 也能得到明顯的加速, 更大的好處是人手越多效能越好.
但是事情不是一直這麼理想. 某些情況是另一個極端, 沒有辦法靠人海戰術解決. 比如這個寄信的例子, 折信紙裝到信封不難, 但是要貼郵票的話, 工人可能就得放下信紙, 到旁邊拿起郵票膠水貼上去. 反覆下來他會浪費時間在交換作這兩件事情上. 這時較好的分工就不是找兩個人一人各處理一半的信, 而是一人負責折信裝信封, 一人負責貼郵票.
當然還有另外一種考量, 如果這些信 *必需* 依照順序處理, 你也無法用 thread pool 這種人海戰術來處理, 一定要把工作切成不同階段, 由專人負責才可以. 這就是 pipeline 能夠應用的時機. 其實這種作法在 CPU 很普遍, 多虧之前修過 micro-processor, 也多少學到皮毛. X86 CPU 從 80386 開始稱作 "超純量" (super scaler) CPU, 就是代表它在單一 clock cycle 能執行完超過一個指令, 靠的也是 pipeline.
再講下去就離題了, 簡單的說以往平行處理都是用 multi thread 來分配工作, 就像銀行有好幾個櫃台, 每個客戶在單一櫃台就辦完他的手續. 而 pipeline 就像拿選票一樣, 第一位查證件, 第二位蓋章, 第三位給你選票, 第四位問你要不要領公投票... 咳咳 (我實在不支持公投題目這麼無聊, 無關黨派)... 這是屬於兩種不同維度的工作切割方式. 使用 pipeline 的方式切割有幾個好處:
不過呢, 缺點也不少, 文章內也列了幾點:
廢話講了很多, 主要是這篇文章提供了另一種平行處理的方式, 是較少看到的, 就順手把心得寫一寫了, 自己留個心得, 以後才不會忘掉 [:D]. 最後, 很豬頭的是, 整篇英文很辛苦的 K 完, 才發現有中文版的, 真是... 下次各位看文章前請先注意一下有沒有中文版 [H], 當然你英文很好的話就沒差... 哈哈..
看來 Google 沒幾個看的懂中文的員工... 不過我的破英文倒是看的懂, 真是不簡單... [Y]
話說前陣子我的文章被盜連, 最初只是點了 Blogger 上面的回報不恰當內容的標幟, 嗯, 不鳥我...
後來在 Google 網站找了半天, 才找到回報 Google 所有不當內容的網頁, 看它訊息都寫中文的, 我就不客氣的用中文回報了... 等了一兩個禮拜果然沒鳥我...
上禮拜就用我的破英文再回報一次, 這次只貼了兩三句鼈腳英文, 簡單說明文章是我寫的, 他盜用我文章 blah blah... 耶! 果然有效... 現在再點那篇文章就變這樣了:
看來 Google 還是有在做事, 哈哈... 特別在此感謝一下..
小熊子告訴我我才發現, 竟然有其它Blog轉貼我的文章, 還在上面擺廣告... [:@] 回報 Blogger 也不知道有沒有用, 嘖嘖...
看來這網站也不是人工轉貼的, 八成 search blog 後用 RSS 自動全貼過去, 看它的內容有上千篇, 都是 canon / nikon 相機的內容... 算了, 我還介紹它幹嘛? 應該想辦法快讓它下架才對...
貼它的網址可能會幫它打廣告, 不過想一想不貼也只是讓它活更久而以... 網址就不打馬賽克了, 大家幫我點上面的 [標幟BLOG] 吧, 讓它早日下架... [:@]
http://canon-vs-nikon.blogspot.com/2007/11/canon-raw-codec-12-net-framework-30-wpf.html
續上篇, 從眾多閒置的 worker thread 挑選一個起來接工作有兩種策略作法. 一種作法是 Thread Pool 自己決定, 最基本的就是誰等最久就叫誰起來, 或是 Thread Pool 有自己的演算法挑一個最菜的 worker thread 來做工都可以… 另一種作法就是不管它, 每個 worker thread 都靠運氣, 交給上天 (OS) 決定, 看誰搶到下一個 job. 看起來第一種好像比較好, 事實上不見得. 每個 thread 之間的排程是個學問, OS 多工的效率好不好就看這個. 舉例來說, 如果每個 worker thread 的優先順序不同, 或是某些 thread 正好碰到 GC, 或是正好被移到 virtaual memory 等等, 硬去叫它起來工作反而要花更多的時間. 而這些資訊都在 OS 的排程器裡才有足夠的資訊可以判斷, 以寫 AP 的角度很難顧級到這個層面. 這時最好的辦法就是不管它, 用齊頭式的平等, 把選擇權交給 OS 決定.
又是一個說起來比 code 多的例子. 這兩種不同的策略, 寫成 code 其實只差一行… 就是選用 AutoResetEvent 跟 ManualResetEvent 的差別而以. .NET SDK 的 Class Reference 上這樣寫著:
AutoResetEvent: Notifies a waiting thread that an event has occurred. ManualResetEvent: Notifies one or more waiting threads that an event has occurred.
真正寫成 Code 來測試一下…
static void Main(string[] args)
{
for (int count = 0; count < 5; count++)
{
Thread t = new Thread(new ThreadStart(ThreadTest));
t.Start();
}
Thread.Sleep(1000);
wait.Set();
Thread.Sleep(1000);
wait.Set();
Thread.Sleep(1000);
wait.Set();
Thread.Sleep(1000);
wait.Set();
Thread.Sleep(1000);
wait.Set();
}
private static AutoResetEvent wait = new AutoResetEvent(false);
private static void ThreadTest()
{
Console.WriteLine("Thread[{0}]: wait...", Thread.CurrentThread.ManagedThreadId);
wait.WaitOne();
Console.WriteLine("Thread[{0}]: wakeup...", Thread.CurrentThread.ManagedThreadId);
}
執行結果:
Thread[ 3 ]: wait...
Thread[ 5 ]: wait...
Thread[ 4 ]: wait...
Thread[ 6 ]: wait...
Thread[ 7 ]: wait...
Thread[ 3 ]: wakeup...
Thread[ 4 ]: wakeup...
Thread[ 6 ]: wakeup...
Thread[ 5 ]: wakeup...
Thread[ 7 ]: wakeup...
程式過程中我加了幾個 Sleep, 首先我用同一個 AutoResetEvent, 讓五個 thread 都去等待同一個 notify event 來叫醒它. 而 AutoResetEvent 一次只能叫醒一個被 WaitOne blocked 住的 thread. 就是第一種先到先贏的作法, 後面幾行 wakeup 的 message 每隔一秒會跳一行出來.
再來看一下 ManualResetEvent …
static void Main(string[] args)
{
for (int count = 0; count < 5; count++)
{
Thread t = new Thread(new ThreadStart(ThreadTest));
t.Start();
}
Thread.Sleep(1000);
wait.Set();
}
private static ManualResetEvent wait = new ManualResetEvent(false);
private static void ThreadTest()
{
Console.WriteLine("Thread[{0}]: wait...", Thread.CurrentThread.ManagedThreadId);
wait.WaitOne();
Console.WriteLine("Thread[{0}]: wakeup...", Thread.CurrentThread.ManagedThreadId);
}
執行結果:
Thread[ 3 ]: wait...
Thread[ 4 ]: wait...
Thread[ 5 ]: wait...
Thread[ 6 ]: wait...
Thread[ 7 ]: wait...
Thread[ 5 ]: wakeup...
Thread[ 4 ]: wakeup...
Thread[ 6 ]: wakeup...
Thread[ 3 ]: wakeup...
Thread[ 7 ]: wakeup...
除了把型別宣告從 AutoResetEvent 換成 ManualResetEvent 之外, 其它都沒變. 當然 line 10 一次就能叫醒所有的 thread, 所以後面四次 Set( ) 我就直接刪掉了. 程式 run 到 line 10, 後面五行 wakeup 的訊息就會一次全出現, 而出現的順序是隨機的, 每次都不大一樣.
這種作法的解釋, 是一次 Set( ), 卡在 WaitOne( ) 的五個 thread 就全被叫醒了. 而這個現象如果套用在 SimpleThreadPool 的實作上, 它的作用相當於第二種作法. 一瞬間把所有的 worker thread 從 blocked 狀態移到 waiting 狀態. 而到底是那一個 thread 有幸第一個被 OS 移到 running 狀態? 就是根據 OS 自己的排程策略而定. 第一個移到 running 狀態的 thread 通常就能搶到 job queue 裡的工作, 剩下的沒搶到, 則又會因為沒有工作好做, 再度進入閒置狀態, 等待下一次機會再一起來碰一次運氣…
就這一行, 花了最多篇幅來說明, 因為它最抽象. 說明這段的目的, 如果你的 ThreadPool 要更進階一點, 如果你想要改用先排隊先贏的策略, 把 WaitHandle 的型別改成 AutoResetEvent 就好. 如果你希望根據工作的特性來微調每個 thread 的 priority, 你就必需用 ManualResetEvent.
好, 沒想到一百行左右的 SimpleThreadPool 有這麼多東西可以寫, 完整的 code 我直接貼在底下, 歡迎引用. 好用的話記得給個回應. 要用在你的 project 也歡迎, 只要禮貌性的支會我一聲. 讓我知道我寫的 code 被用在什麼地方就好. 寫到這裡總算告一段落. 謝謝收看 [:D]
– 完整的 SimpleThreadPool.cs 原始碼:
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Diagnostics;
namespace ChickenHouse.Core.Threading
{
public class SimpleThreadPool : IDisposable
{
private List<Thread> _workerThreads = new List<Thread>();
private bool _stop_flag = false;
private bool _cancel_flag = false;
private TimeSpan _maxWorkerThreadTimeout = TimeSpan.FromMilliseconds(3000);
private int _maxWorkerThreadCount = 0;
private ThreadPriority _workerThreadPriority = ThreadPriority.Normal;
private Queue<WorkItem> _workitems = new Queue<WorkItem>();
private ManualResetEvent enqueueNotify = new ManualResetEvent(false);
public SimpleThreadPool(int threads, ThreadPriority priority)
{
this._maxWorkerThreadCount = threads;
this._workerThreadPriority = priority;
}
private void CreateWorkerThread()
{
Thread worker = new Thread(new ThreadStart(this.DoWorkerThread));
worker.Priority = this._workerThreadPriority;
this._workerThreads.Add(worker);
worker.Start();
}
public bool QueueUserWorkItem(WaitCallback callback)
{
return this.QueueUserWorkItem(callback, null);
}
public bool QueueUserWorkItem(WaitCallback callback, object state)
{
if (this._stop_flag == true) return false;
WorkItem wi = new WorkItem();
wi.callback = callback;
wi.state = state;
if (this._workitems.Count > 0 && this._workerThreads.Count < this._maxWorkerThreadCount) CreateWorkerThread();
this._workitems.Enqueue(wi);
this.enqueueNotify.Set();
return true;
}
public void EndPool()
{
this.EndPool(false);
}
public void CancelPool()
{
this.EndPool(true);
}
public void EndPool(bool cancelQueueItem)
{
if (this._workerThreads.Count == 0) return;
this._stop_flag = true;
this._cancel_flag = cancelQueueItem;
this.enqueueNotify.Set();
do
{
Thread worker = this._workerThreads[0];
worker.Join();
this._workerThreads.Remove(worker);
} while (this._workerThreads.Count > 0);
}
private void DoWorkerThread()
{
while (true)
{
while (this._workitems.Count > 0)
{
WorkItem item = null;
lock (this._workitems)
{
if (this._workitems.Count > 0) item = this._workitems.Dequeue();
}
if (item == null) continue;
try
{
item.Execute();
}
catch (Exception)
{
//
// ToDo: exception handler
//
}
if (this._cancel_flag == true) break;
}
if (this._stop_flag == true || this._cancel_flag == true) break;
if (this.enqueueNotify.WaitOne(this._maxWorkerThreadTimeout, true) == true) continue;
break;
}
this._workerThreads.Remove(Thread.CurrentThread);
}
private class WorkItem
{
public WaitCallback callback;
public object state;
public void Execute()
{
this.callback(this.state);
}
}
public void Dispose()
{
this.EndPool(false);
}
}
}
既然上一篇都把 pseudo code 寫出來了, 現在就可以開始來寫真正的 Thread Pool 了. 開始之前, 我先把目標定一下. 這次寫的 Thread Pool 必需俱備這些能力:
每次在寫這些描述, 都會覺的怎麼寫起來比 code 還多… @_@, 沒錯, code 短到我可以直接貼上來, 不需要附檔案.. 我會把完整的 code 貼在最下方. 其它說明的部份只會貼片段.
首先, 先來決定 SimpleThreadPool 的 class define 為何. 依照需求及我希望它用起來的樣子, 為:
public class SimpleThreadPool : IDisposable
{
public SimpleThreadPool(int threads, ThreadPriority priority)
{
}
public bool QueueUserWorkItem(WaitCallback callback)
{
}
public bool QueueUserWorkItem(WaitCallback callback, object state)
{
}
public void EndPool()
{
}
public void CancelPool()
{
}
public void EndPool(bool cancelQueueItem)
{
}
private void DoWorkerThread()
{
}
public void Dispose()
{
this.EndPool(false);
}
// 略...
這個 ThreadPool 我希望它用起來像這樣, 貼一段理想中的用法 sample code:
SimpleThreadPool stp = new SimpleThreadPool(2, System.Threading.ThreadPriority.BelowNormal);
for (int count = 0; count < 25; count++)
{
stp.QueueUserWorkItem(
new WaitCallback(ShowMessage),
string.Format("STP1[{0}]", count));
Thread.Sleep(new Random().Next(500));
}
Console.WriteLine("wait stop");
stp.EndPool();
把 ThreadPool
想像成一個服務櫃台, 很多人排隊等著處理. 因此整個實作會像是個工作的佇列 (job queue), 只要把你的工作放到 queue 裡 (排隊), 而服務人員 (worker thread) 就會一個一個的處理. 最後你可以決定要把所有工作做完才收攤 (呼叫 EndPool()
, 會 blocked 直到工作清光), 或是決定掛牌 “明日請早” (呼叫 CancelPool()
), 只把作到一半的工作處理掉, 剩下還在排隊的改天再來.
整個實作的關鍵部份是在 private void DoWorkerThread()
, 裡面寫的 code 就是每一個 worker thread 要執行的所有內容. 補上實作的 code:
private void DoWorkerThread()
{
while (true)
{
while (this._workitems.Count > 0)
{
WorkItem item = null;
lock (this._workitems)
{
if (this._workitems.Count > 0) item = this._workitems.Dequeue();
}
if (item == null) continue;
try
{
item.Execute();
}
catch (Exception)
{
//
// ToDo: exception handler
//
}
if (this._cancel_flag == true) break;
}
if (this._stop_flag == true || this._cancel_flag == true) break;
if (this.enqueueNotify.WaitOne(this._maxWorkerThreadTimeout, true) == true) continue;
break;
}
this._workerThreads.Remove(Thread.CurrentThread);
}
每個 worker thread 就只作很簡單的一件事, 就是進入無窮迴圈, 只要開始上班就不段的接工作來處理, 一直到下班為止. 整個最外層的 while loop 就是指這部份. 離開 loop 後就代表這個 worker thread 該下班了.
迴圈內也很簡單, 上工的第一件事就是看 job queue 裡有沒有工作要做? 有就 dequeue 一個來處理, 一直重複到 job queue 空了為止, 或是直到老闆下令關店 (_cancel_flag
為 true).
無論是要關店或是工作做完了, 流程會跳離 line 6 ~ 27 這個 while loop. 後序的關鍵在 line 30:
if (this.enqueueNotify.WaitOne(this._maxWorkerThreadTimeout, true) == true) continue;
呼叫 WaitHandle
的 WaitOne( )
method, 會讓 worker thread 進入 blocked 狀態. 直到被叫醒為止 (叫醒它的 code 寫在 add queue 裡), 或是 idle timeout 時間到了. .NET API WaitHandle.WaitOne( )
提供 option 指定 timeout 時間, 至於是被叫醒的 or 時間到了自己醒來, 就靠 return value 來判定. 以這段 code 來看, 被叫醒 (return true) 代表有新工作進來, 就執行 continue 指令, 繼續到 job queue 拿新的工作繼續努力, 如果是睡太飽自己醒的, 就執行 break, 準被收拾東西下班去…
整個 worker thread 的生命周期就是靠這段 code 來運作. 接下來看一下如何把 job 加進來:
private List<Thread> _workerThreads = new List<Thread>();
private Queue<WorkItem> _workitems = new Queue<WorkItem>();
private ManualResetEvent enqueueNotify = new ManualResetEvent(false);
public bool QueueUserWorkItem(WaitCallback callback, object state)
{
if (this._stop_flag == true) return false;
WorkItem wi = new WorkItem();
wi.callback = callback;
wi.state = state;
if (this._workitems.Count > 0 && this._workerThreads.Count < this._maxWorkerThreadCount) this.CreateWorkerThread();
this._workitems.Enqueue(wi);
this.enqueueNotify.Set();
return true;
}
扣掉一大半準備 WorkItem 的 code 之外, 剩下的就是把 workitem 加到 queue 裡了. 兩個關鍵的地方是:
if (this._workitems.Count > 0 && this._workerThreads.Count < this._maxWorkerThreadCount) this.CreateWorkerThread();
如果 job queue 堆的工作超過 0 個, 而總共的 worker thread 數量還沒超過上限, 就呼叫 CreateWorkerThread( )
再叫一個 worker thread 來幫忙.
line 14 把 work item 加到 queue 之後, line 15 就緊接著呼叫 WaitHandle.Set( )
, 通知所有正卡在 WaitOne( )
睡覺中的 worker thread 該醒來工作了. 其實到這裡, thread pool 主要結構都說明完了, 剩下的都是細部實作, 比如如何封裝 job 的物件, 如何得知共有幾個 worker thread 等等, 這些直接看 code 比較快我就不多說明了. 搭配前一篇, 提到有各種 synchrinization 機制可以使用, 這裡我用的是 ManualResetEvent
, 為什麼要挑這個? 先弄清楚觀念上的問題: 假設有五個 worker thread 都睡著等待新的工作進來, 這時只有一個新的工作進來, 到底是誰該醒來作事? 是由誰決定?
說明起來又是一大篇了… 改寫第三篇再繼續吧!