1. ThreadPool 實作 #3. AutoResetEvent / ManualResetEvent

    續上篇, 從眾多閒置的 worker thread 挑選一個起來接工作有兩種策略作法. 一種作法是 Thread Pool 自己決定, 最基本的就是誰等最久就叫誰起來, 或是 Thread Pool 有自己的演算法挑一個最菜的 worker thread 來做工都可以… 另一種作法就是不管它, 每個 worker thread 都靠運氣, 交給上天 (OS) 決定, 看誰搶到下一個 job. 看起來第一種好像比較好, 事實上不見得. 每個 thread 之間的排程是個學問, OS 多工的效率好不好就看這個. 舉例來說, 如果每個 worker thread 的優先順序不同, 或是某些 thread 正好碰到 GC, 或是正好被移到 virtaual memory 等等, 硬去叫它起來工作反而要花更多的時間. 而這些資訊都在 OS 的排程器裡才有足夠的資訊可以判斷, 以寫 AP 的角度很難顧級到這個層面. 這時最好的辦法就是不管它, 用齊頭式的平等, 把選擇權交給 OS 決定.

    又是一個說起來比 code 多的例子. 這兩種不同的策略, 寫成 code 其實只差一行… 就是選用 AutoResetEvent 跟 ManualResetEvent 的差別而以. .NET SDK 的 Class Reference 上這樣寫著:

    AutoResetEvent: Notifies a waiting thread that an event has occurred. ManualResetEvent: Notifies one or more waiting threads that an event has occurred.

    真正寫成 Code 來測試一下…

    static void Main(string[] args)
    {
        for (int count = 0; count < 5; count++)
        {
            Thread t = new Thread(new ThreadStart(ThreadTest));
            t.Start();
        }
        Thread.Sleep(1000);
        wait.Set();
        Thread.Sleep(1000);
        wait.Set();
        Thread.Sleep(1000);
        wait.Set();
        Thread.Sleep(1000);
        wait.Set();
        Thread.Sleep(1000);
        wait.Set();
    }
     
    private static AutoResetEvent wait = new AutoResetEvent(false);
    
    private static void ThreadTest()
    {
        Console.WriteLine("Thread[{0}]: wait...", Thread.CurrentThread.ManagedThreadId);
        wait.WaitOne();
        Console.WriteLine("Thread[{0}]: wakeup...", Thread.CurrentThread.ManagedThreadId);
    }
    

    執行結果:

    Thread[ 3 ]: wait...
    Thread[ 5 ]: wait...
    Thread[ 4 ]: wait...
    Thread[ 6 ]: wait...
    Thread[ 7 ]: wait...
    Thread[ 3 ]: wakeup...
    Thread[ 4 ]: wakeup...
    Thread[ 6 ]: wakeup...
    Thread[ 5 ]: wakeup...
    Thread[ 7 ]: wakeup... 
    

    程式過程中我加了幾個 Sleep, 首先我用同一個 AutoResetEvent, 讓五個 thread 都去等待同一個 notify event 來叫醒它. 而 AutoResetEvent 一次只能叫醒一個被 WaitOne blocked 住的 thread. 就是第一種先到先贏的作法, 後面幾行 wakeup 的 message 每隔一秒會跳一行出來.

    再來看一下 ManualResetEvent …

    static void Main(string[] args)
    {
        for (int count = 0; count < 5; count++)
        {
            Thread t = new Thread(new ThreadStart(ThreadTest));
            t.Start();
        }
     
        Thread.Sleep(1000);
        wait.Set();
    }
     
    private static ManualResetEvent wait = new ManualResetEvent(false);
    
    private static void ThreadTest()
    {
        Console.WriteLine("Thread[{0}]: wait...", Thread.CurrentThread.ManagedThreadId);
        wait.WaitOne();
        Console.WriteLine("Thread[{0}]: wakeup...", Thread.CurrentThread.ManagedThreadId);
    }
    

    執行結果:

    Thread[ 3 ]: wait...
    Thread[ 4 ]: wait...
    Thread[ 5 ]: wait...
    Thread[ 6 ]: wait...
    Thread[ 7 ]: wait...
    Thread[ 5 ]: wakeup...
    Thread[ 4 ]: wakeup...
    Thread[ 6 ]: wakeup...
    Thread[ 3 ]: wakeup...
    Thread[ 7 ]: wakeup... 
    

    除了把型別宣告從 AutoResetEvent 換成 ManualResetEvent 之外, 其它都沒變. 當然 line 10 一次就能叫醒所有的 thread, 所以後面四次 Set( ) 我就直接刪掉了. 程式 run 到 line 10, 後面五行 wakeup 的訊息就會一次全出現, 而出現的順序是隨機的, 每次都不大一樣.

    這種作法的解釋, 是一次 Set( ), 卡在 WaitOne( ) 的五個 thread 就全被叫醒了. 而這個現象如果套用在 SimpleThreadPool 的實作上, 它的作用相當於第二種作法. 一瞬間把所有的 worker thread 從 blocked 狀態移到 waiting 狀態. 而到底是那一個 thread 有幸第一個被 OS 移到 running 狀態? 就是根據 OS 自己的排程策略而定. 第一個移到 running 狀態的 thread 通常就能搶到 job queue 裡的工作, 剩下的沒搶到, 則又會因為沒有工作好做, 再度進入閒置狀態, 等待下一次機會再一起來碰一次運氣…

    就這一行, 花了最多篇幅來說明, 因為它最抽象. 說明這段的目的, 如果你的 ThreadPool 要更進階一點, 如果你想要改用先排隊先贏的策略, 把 WaitHandle 的型別改成 AutoResetEvent 就好. 如果你希望根據工作的特性來微調每個 thread 的 priority, 你就必需用 ManualResetEvent.

    好, 沒想到一百行左右的 SimpleThreadPool 有這麼多東西可以寫, 完整的 code 我直接貼在底下, 歡迎引用. 好用的話記得給個回應. 要用在你的 project 也歡迎, 只要禮貌性的支會我一聲. 讓我知道我寫的 code 被用在什麼地方就好. 寫到這裡總算告一段落. 謝謝收看 [:D]

    – 完整的 SimpleThreadPool.cs 原始碼:

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading;
    using System.Diagnostics;
     
    namespace ChickenHouse.Core.Threading
    {
        public class SimpleThreadPool : IDisposable
        {
            private List<Thread> _workerThreads = new List<Thread>();
     
            private bool _stop_flag = false;
            private bool _cancel_flag = false;
     
            private TimeSpan _maxWorkerThreadTimeout = TimeSpan.FromMilliseconds(3000);
            private int _maxWorkerThreadCount = 0;
            private ThreadPriority _workerThreadPriority = ThreadPriority.Normal;
     
            private Queue<WorkItem> _workitems = new Queue<WorkItem>();
            private ManualResetEvent enqueueNotify = new ManualResetEvent(false);
     
            public SimpleThreadPool(int threads, ThreadPriority priority)
            {
                this._maxWorkerThreadCount = threads;
                this._workerThreadPriority = priority;
            }
     
            private void CreateWorkerThread()
            {
                Thread worker = new Thread(new ThreadStart(this.DoWorkerThread));
                worker.Priority = this._workerThreadPriority;
                this._workerThreads.Add(worker);
                worker.Start();
            }
     
            public bool QueueUserWorkItem(WaitCallback callback)
            {
                return this.QueueUserWorkItem(callback, null);
            }
     
            public bool QueueUserWorkItem(WaitCallback callback, object state)
            {
                if (this._stop_flag == true) return false;
     
                WorkItem wi = new WorkItem();
                wi.callback = callback;
                wi.state = state;
     
                if (this._workitems.Count > 0 && this._workerThreads.Count < this._maxWorkerThreadCount) CreateWorkerThread();
     
                this._workitems.Enqueue(wi);
                this.enqueueNotify.Set();
     
                return true;
            }
     
            public void EndPool()
            {
                this.EndPool(false);
            }
     
            public void CancelPool()
            {
                this.EndPool(true);
            }
     
            public void EndPool(bool cancelQueueItem)
            {
                if (this._workerThreads.Count == 0) return;
     
                this._stop_flag = true;
                this._cancel_flag = cancelQueueItem;
                this.enqueueNotify.Set();
     
                do
                {
                    Thread worker = this._workerThreads[0];
                    worker.Join();
                    this._workerThreads.Remove(worker);
                } while (this._workerThreads.Count > 0);
            }
     
            private void DoWorkerThread()
            {
                while (true)
                {
                    while (this._workitems.Count > 0)
                    {
                        WorkItem item = null;
                        lock (this._workitems)
                        {
                            if (this._workitems.Count > 0) item = this._workitems.Dequeue();
                        }
                        if (item == null) continue;
     
                        try
                        {
                            item.Execute();
                        }
                        catch (Exception)
                        {
                            //
                            //  ToDo: exception handler
                            //
                        }
     
                        if (this._cancel_flag == true) break;
                    }
     
                    if (this._stop_flag == true || this._cancel_flag == true) break;
                    if (this.enqueueNotify.WaitOne(this._maxWorkerThreadTimeout, true) == true) continue;
                    break;
                }
     
                this._workerThreads.Remove(Thread.CurrentThread);
            }
     
            private class WorkItem
            {
                public WaitCallback callback;
                public object state;
     
                public void Execute()
                {
                    this.callback(this.state);
                }
            }
     
            public void Dispose()
            {
                this.EndPool(false);
            }
        }
    }
    

    2007/12/17 系列文章: Thread Pool 實作 .NET 作業系統 多執行緒 技術隨筆

  2. ThreadPool 實作 #2. 程式碼 (C#)

    既然上一篇都把 pseudo code 寫出來了, 現在就可以開始來寫真正的 Thread Pool 了. 開始之前, 我先把目標定一下. 這次寫的 Thread Pool 必需俱備這些能力:

    1. 要能由使用者控制 thread pool 的組態:
      • worker thread 數量上限
      • worker thread 優先權
      • thread idle timeout 時間 (超過 idle timeout, 代表 thread 是宂員, 可以下台了)
      • job queue 安全範圍 (超過代表需要找幫手 - 建立新的 worker thread)
    2. thread pool 在 job queue 超過安全範圍時, 要能動態建立新的 thread 來消化 queue 裡的工作
    3. worker thread 在 idle 時間超過 idle timeout 時, 則這個 worker thread 就要被回收
    4. 簡單的同步機制, 要能等待 thread pool 處理完所有的 job.
    5. 如果有多個 worker thread 要搶同一個 job 來執行, 要由 OS 決定, 不要由 thread pool 自己決定

    每次在寫這些描述, 都會覺的怎麼寫起來比 code 還多… @_@, 沒錯, code 短到我可以直接貼上來, 不需要附檔案.. 我會把完整的 code 貼在最下方. 其它說明的部份只會貼片段.

    首先, 先來決定 SimpleThreadPool 的 class define 為何. 依照需求及我希望它用起來的樣子, 為:

    public class SimpleThreadPool : IDisposable
    {
    	public SimpleThreadPool(int threads, ThreadPriority priority)
    	{
    	}
    
    	public bool QueueUserWorkItem(WaitCallback callback)
    	{
    	}
    
    	public bool QueueUserWorkItem(WaitCallback callback, object state)
    	{
    	}
    
    	public void EndPool()
    	{
    	}
    
    	public void CancelPool()
    	{
    	}
    
    	public void EndPool(bool cancelQueueItem)
    	{
    	}
    
    	private void DoWorkerThread()
    	{
    	}
    
    	public void Dispose()
    	{
    		this.EndPool(false);
    	}
    
    	// 略...
    
    

    這個 ThreadPool 我希望它用起來像這樣, 貼一段理想中的用法 sample code:

    SimpleThreadPool stp = new SimpleThreadPool(2, System.Threading.ThreadPriority.BelowNormal);
    
    for (int count = 0; count < 25; count++)
    {
        stp.QueueUserWorkItem(
            new WaitCallback(ShowMessage),
            string.Format("STP1[{0}]", count));
        Thread.Sleep(new Random().Next(500));
    }
    Console.WriteLine("wait stop");
    stp.EndPool();
    

    ThreadPool 想像成一個服務櫃台, 很多人排隊等著處理. 因此整個實作會像是個工作的佇列 (job queue), 只要把你的工作放到 queue 裡 (排隊), 而服務人員 (worker thread) 就會一個一個的處理. 最後你可以決定要把所有工作做完才收攤 (呼叫 EndPool(), 會 blocked 直到工作清光), 或是決定掛牌 “明日請早” (呼叫 CancelPool()), 只把作到一半的工作處理掉, 剩下還在排隊的改天再來.

    整個實作的關鍵部份是在 private void DoWorkerThread(), 裡面寫的 code 就是每一個 worker thread 要執行的所有內容. 補上實作的 code:

    private void DoWorkerThread()
    {
        while (true)
        {
            while (this._workitems.Count > 0)
            {
                WorkItem item = null;
                lock (this._workitems)
                {
                    if (this._workitems.Count > 0) item = this._workitems.Dequeue();
                }
                if (item == null) continue;
     
                try
                {
                    item.Execute();
                }
                catch (Exception)
                {
                    //
                    //  ToDo: exception handler
                    //
                }
     
                if (this._cancel_flag == true) break;
            }
     
            if (this._stop_flag == true || this._cancel_flag == true) break;
            if (this.enqueueNotify.WaitOne(this._maxWorkerThreadTimeout, true) == true) continue;
            break;
        }
     
        this._workerThreads.Remove(Thread.CurrentThread);
    }
    

    每個 worker thread 就只作很簡單的一件事, 就是進入無窮迴圈, 只要開始上班就不段的接工作來處理, 一直到下班為止. 整個最外層的 while loop 就是指這部份. 離開 loop 後就代表這個 worker thread 該下班了.

    迴圈內也很簡單, 上工的第一件事就是看 job queue 裡有沒有工作要做? 有就 dequeue 一個來處理, 一直重複到 job queue 空了為止, 或是直到老闆下令關店 (_cancel_flag 為 true).

    無論是要關店或是工作做完了, 流程會跳離 line 6 ~ 27 這個 while loop. 後序的關鍵在 line 30:

    if (this.enqueueNotify.WaitOne(this._maxWorkerThreadTimeout, true) == true) continue;
    

    呼叫 WaitHandleWaitOne( ) method, 會讓 worker thread 進入 blocked 狀態. 直到被叫醒為止 (叫醒它的 code 寫在 add queue 裡), 或是 idle timeout 時間到了. .NET API WaitHandle.WaitOne( ) 提供 option 指定 timeout 時間, 至於是被叫醒的 or 時間到了自己醒來, 就靠 return value 來判定. 以這段 code 來看, 被叫醒 (return true) 代表有新工作進來, 就執行 continue 指令, 繼續到 job queue 拿新的工作繼續努力, 如果是睡太飽自己醒的, 就執行 break, 準被收拾東西下班去…

    整個 worker thread 的生命周期就是靠這段 code 來運作. 接下來看一下如何把 job 加進來:

    private List<Thread> _workerThreads = new List<Thread>();
    private Queue<WorkItem> _workitems = new Queue<WorkItem>();
    private ManualResetEvent enqueueNotify = new ManualResetEvent(false);
     
    public bool QueueUserWorkItem(WaitCallback callback, object state)
    {
        if (this._stop_flag == true) return false;
     
        WorkItem wi = new WorkItem();
        wi.callback = callback;
        wi.state = state;
     
        if (this._workitems.Count > 0 && this._workerThreads.Count < this._maxWorkerThreadCount) this.CreateWorkerThread();
     
        this._workitems.Enqueue(wi);
        this.enqueueNotify.Set();
     
        return true;
    }
    

    扣掉一大半準備 WorkItem 的 code 之外, 剩下的就是把 workitem 加到 queue 裡了. 兩個關鍵的地方是:

    if (this._workitems.Count > 0 && this._workerThreads.Count < this._maxWorkerThreadCount) this.CreateWorkerThread();
    

    如果 job queue 堆的工作超過 0 個, 而總共的 worker thread 數量還沒超過上限, 就呼叫 CreateWorkerThread( ) 再叫一個 worker thread 來幫忙.

    line 14 把 work item 加到 queue 之後, line 15 就緊接著呼叫 WaitHandle.Set( ), 通知所有正卡在 WaitOne( ) 睡覺中的 worker thread 該醒來工作了. 其實到這裡, thread pool 主要結構都說明完了, 剩下的都是細部實作, 比如如何封裝 job 的物件, 如何得知共有幾個 worker thread 等等, 這些直接看 code 比較快我就不多說明了. 搭配前一篇, 提到有各種 synchrinization 機制可以使用, 這裡我用的是 ManualResetEvent, 為什麼要挑這個? 先弄清楚觀念上的問題: 假設有五個 worker thread 都睡著等待新的工作進來, 這時只有一個新的工作進來, 到底是誰該醒來作事? 是由誰決定?

    說明起來又是一大篇了… 改寫第三篇再繼續吧!

    2007/12/17 系列文章: Thread Pool 實作 .NET 作業系統 多執行緒 技術隨筆

  3. ThreadPool 實作 #1. 基本概念

    既然都花了力氣回憶起過去學的 ThreadPool Implementation, 而且都用 C# 寫好了, 不如就整理一下好了. 其實寫起來 code 真的都不難, 難的是人腦天生就不適合思考這種 multithreading 的東西, 想多了腦筋真的會打結. 另外一個障礙是有些東西要唸過 Operation System 才會懂, 沒這基礎的話, 光看 API 說明會一個頭兩個大…

    這篇還不會貼完整的 code, 先把必要的基礎及認知說明一下. ThreadPool 的概念其實很簡單, 這 design pattern 目的是把過去的 multi-threading programming model 簡化, 把複雜的 threads control 拆到 thread pool implementation 裡封裝起來, 使用它的人只要把你的 job 封裝成一個 job object, 丟到 pool 裡面代為執行就可以了. 然後裡面就套用 “生產者 / 消費者” 的模式, User 不斷的生出 job 給 thread pool, 而 thread pool 不斷的消化掉 (執行 job) 它. 實作這些東西要面臨到的課題, 有這幾項:

    1. 基本的 thread sync 機制
    2. thread pool 內部的 thread 管理, thread 動態建立 / 回收機制
    3. 封裝 job, job queue

    先從最抽像的 (1) 來說好了. 這是過去作業系統 (OS) 這門課, 特地花了一整章來說明的課題. 當年第一次碰到用 java 實作 thread pool 時, 我還特地把課本挖出來再看一次… @_@, 不過搬了兩次家, 課本也不曉得塞到那去了, 哈… 印像中記得裡面有 OS 管理下的 process 生命周期 state machine:

    圖片來源: http://en.wikipedia.org/wiki/Process_states

    中間三個狀態是主要的部份, running 當然就是指執行中, 而要等待別人喚醒則是進入 blocked 狀態, block 狀態時是不會被 OS 分配到任何 cpu time slice 的. 等到被喚醒後, 並不是直接跳到 running, 而是跳至 waiting, 等待 OS 把下一段 cpu time slice 分給這個 thread 時, 它就會再度進到 running 狀態.

    所謂的 synchronization, 就是指在 multi-threading 環境下, 每個 thread 各跑各的, 完全不管其它 thread 在作什麼. 但是在某些特殊情況下, 是要 threads 之間互相調整腳步的. 舉例來說, FlashGet 大家都用過吧? 它一次開好幾個 thread 下載同個檔案的不同部份. 那些情況需要 synchrinization ? 舉個 use case:

    下載到一半想停掉, UI thread 接受了 user 的指令 (按按鈕), 每個 thread 必需適當的中止目前的動作及存檔. UI thread 必需 “等待” 每個 thread 完成後才能回報 user 停止的指令已經完成.

    在 Multi-threading 的環境下, 千萬不要再去用老掉牙的同步方式 (啥? 就是用 global variable, 然後用 loop 一直去檢查), 正規的用法就是用 OS 提供的 synchronization 機制, 一邊 wait, 另一邊去叫醒它. 對照上面的圖來說, 也就是讓 thread 進到 blocked 狀態. 在 Java 裡就是 Thread 的 Notify 相關 method, 在 .NET 則是包裝成 WaitHandle 物件. 以這種最基本的 wait / notify, 在 .NET 可以用 ManualResetEvent 來達成. 簡單的寫兩段 code, 用起來像這樣:

    // thread 1: 等著被喚醒
    wait_handle.WaitOne( );
     
    // thread 2: 喚醒 thread 1
    wait_handle.Set( );
    

    更複雜的例子, 可以用其它不同型態的 WaitHandle 來達成. 在 .NET 是把所有這種用於同步機制的物件都抽像化成 System.Threading.WaitHandle, 它是 abstract class, 你要找一個合適的衍生類別來用. 這些機制原則上一定要靠 OS 的 API 才能提供, 請別異想天開自己搞一個…  列幾個常用的:

    1. AutoResetEvent: 叫醒單一個 wait thread
    2. ManualResetEvent: 叫醒多個 wait thread(s)
    3. Semaphore (旗標): 只允許有限數量的 thread 執行某段程式. 再舉 FlashGet 的例子, 如果某個網站只限最多 3 threads download, 就可以用 Sempahore.

    其它還有一些, Mutex, Monitor, SpinLock… 就不一一說明了, 直接去翻 OS 課本.. [H]

    為什麼花這麼多篇幅講這個? 因為這短短一兩行的 code, 可是控制 thread pool 運作的重要關鍵. Thread Pool 需要一個 Queue 來存放待處理的工作. ThreadPool 同時也要 “養” 數個 threads 來處理掉 Queue 裡面的工作. 正常情況下每個 thread 忙完後就到 Queue 再拿一個工作出來, 如果 Queue 空了, thread 就可以休息 (blocked). 如果 Queue 有新工作進來, 這些睡著 (blocked) 的 thread 就應該要醒來繼續處理堆在 queue 裡的工作.

    這是 Thread Pool 的基本型, 通常會用 thread pool 有幾個理由:

    1. 要處理的工作數量很多. 不可能用最古董的作法: 每個工作建一個 thread, 做完就丟掉...  (因為 thread create / delete, OS 是要花成本的, 同時 thread 太多也會造成效能及回應時間下降)
    1. 工作是不斷的持續的產生, 需要有 thread 事先在那邊等著接工作來做, 降低回應時間.
    1. 工作的性質適合以有限的 thread 來處理時
    

    最典型的例子就是 ASP.NET. ASP.NET Hosting 會養一堆 thread, 來服務前方 IIS 送來的一堆 request. 即使 CPU 只有一顆, 多個 thread 也可以有降低回應時間的好處. 記得照 MSDN 的建議, 一個 CPU 建議值是開 25 threads 的樣子… 因此會有一些變型, 以求效能更好一點. 通常 thread 的建立 / 回收, 很花時間. 養一堆 thread 也很花資源. 因此考量的重點都放在怎樣才不會重複建立/回收 thread, 怎樣才不會養太多不工作的 thread … 歸納一下:

    1. 現有 threads 不夠 (或未達上限), 而還有工作還卡在 Queue 裡沒人處理, 就建立新的 thread 加進來幫忙.
    1. 如果工作都做完了, 多餘的 thread 就可以讓它發呆. 發呆太久的 "宂員" 就可以把它 fired 了... 判定的依據一般都用 idle timeout. 當然也有不同的策略, 那就不管了.
    

    看起來很囉唆, 其實想通之後, 就像 recursive 一樣, 寫起來很簡潔, 多寫兩行都會覺的累贅… 我把流程用假的 code 整理一下:

    每個 thread 運作的 body 就像這樣:

      while (true)
      {
          //
          //  從 queue 裡找 job 來做, 直到做完為止.
          //
       
          //
          //  idle.
          //
          if (超過IDLE時間 == true) break;
      }
    

    另外, 就是把 Job 加到 Queue 裡的動作要像這樣:

      //
      //  把 Job 加到 Queue
      //
       
      if (Job太多)
      {
          //
          //  多建立一些 thread 來幫忙
          //
      }
       
      if (Idlethread)
      {
          //
          //  叫醒 thread 來工作
          //
      }
    

    上面兩段 code 關鍵就在如何讓 thread idle ? 如何判定 idle 超過某段時間? 另外就是如何叫醒 idle 的 thread? 答案其實就是用上面講的 synchronization 的機制來做. 這些 code 搞定後, 包裝在一起, thread pool 其實就完成了. 很簡單吧? 哈哈… 實際的 code 等下篇再說… 正好寫第二篇的時間, 就讓大家想一想到底該怎麼寫… [H], 敬請期待下集!

    2007/12/14 系列文章: Thread Pool 實作 .NET 多執行緒 技術隨筆

  4. 平行處理的技術演進

    現在 CPU 廠商大打 "多核心" 的口號, 讓大家都知道多核心的好處了, 不過每個評論的人也都會補上一句, "要有專為多核心設計的軟體才能發恢效能". 到底什麼叫作專為多核設計的軟體?

    簡單的說, 就是程式不能再以單一流程來思考, 必需引用平行處理的概念. 就像工作分派一樣, 有十個人幫你做事, 一定比一個人好. 不過這也是考驗你分派及管理的能力. 做的不好, 可能工作還是都只有一個人在做, 另外九個在偷懶, 更糟的是還造成溝通的問題, 比只有一個人還糟.

    在程式設計的領域裡, 實現平行處理, 它的困難有幾個:

    1. 並行的流程控制
    2. 多個程序之間的資料交換
    3. Critial Section 問題 (某些絕對不能同時執行的程式片段)
    4. 資料 Lock 及 Racing condition 的問題

    早期的 Unix 提供的 fork( ) 就是個典型的例子, 呼叫後有兩份一模一樣的程式一起執行, 你要自己想辦法分出誰是誰, 然後讓它們各自執行. 兩個程式怎麼溝通? 只能靠 IPC (Inter-Process Communication), 方式不外呼開 socket 或是 share memory, 互相等待要靠 signal( )... 總之你大概會有 80% 以上的精力是在解決這些問題, 不是在解決你要處理的問題.

    後來較新的 OS 紛紛引入了 thread 的關念, 解決了部份 IPC 問題, 其它的還是一樣困難. 直到 Java 出來, 寫 multi-threading 程式就簡單多了. 到了 .net 3.5, 又更進一步, 就是我這篇要講的主題.

    在 MSDN Magazine 看到一篇文章, 覺的還不錯, 就貼上來跟大家分享一下心得. 原文在此:
    為多重核心電腦最佳化 Managed 程式碼
    http://msdn.microsoft.com/msdnmag/issues/07/10/Futures/default.aspx?loc=zx

    細節我就不多討論了, 那篇文章裡面都說明的非常清楚, 很棒的一篇文章. 主要的重點是, 即使用了 Java / .NET 這樣的開發環境, 你仍然要面對許多 threading 的問題, 像是 thread 如何開始, 如何結束, 這個 thread 如何去通知另一個 thread 完成的問題. 但是大部份的情況下, 我只不過是有一堆工作, 想要丟給電腦處理, 最好就是所有可用 CPU 通通叫來幫忙...

    這篇文章介紹的 TPL ( Task Parallel Library ) 很巧妙的利用 delegate, 把整套 Library 包裝的像一個 for loop 一樣簡單. 基本的觀念就是, 只要用它提供的類似 for loop 寫法改寫你的程式, 原本 loop 裡要執行 100 次的工作, 現在就會自動的分配到你所有的 CPU 一同執行. 聽起來很酷, 真的用起來也是如此. 我簡單貼一下內文兩段 sample code:

       1:  // 一般的迴圈
    
       2:  for (int i = 0; i < 10; i++) {
    
       3:      a[ i ] = a[ i ] * a[ i ];
    
       4:  }
    
       5:   
    
       6:  // 改用 TPL 的迴圈
    
       7:  Parallel.For(0, 100, delegate(int i) {
    
       8:      a[ i ] = a[ i ] * a[ i ];
    
       9:  });
    

    上面兩段 code 做的事情都一樣, 就是把陣列 a 的內容, 每一筆都算平方, 再寫回來. 差別在於第一段程式會在同一個 thread 裡依序完成每一筆計算, 而第二個例子則利用 anonymous delegate, 讓 code 看起來像迴圈, 實際上每圈都是執行一次 delegate. 而這個 delegate 會自動透過 task manager 分配到合適的 thread 執行. 它跟 thread pool 有許多不同, 文章內有一些說明... 如此一來就能享用的到多核心 CPU 的好處, 你的每一個核心都會被充份的利用.

    其實這樣的作法, 並不是 Microsoft 或是 .NET 特有的創新, 早在更重視效能的 C/C++ 就有了. Intel 就大力的支持了這個 open source project: http://threadingbuildingblocks.org/ 裡面提供了大量的 C++ template, 也是用類似的方式替 C++ 加入了平行處理的支援.

    ZD Net 上面有一系列的 video (http://www.zdnet.com.tw/white_board/intel/video-8.htm), 講的相當不錯, 我很認同裡面的幾個觀念, 主講人 James Reinders 提到, 平行處理你第一個應該要想到的是函式庫, 或是編譯器等等, 讓你的程式不自覺就能自動享用到平行處理的好處, 第二是用這類 library, 針對各種的 loop 去調整, 讓他能適用平行處理, 最後也是最不建議的作法, 才是大家常聽到的 multi-threading. 原因很簡單, threading 必需是你設計軟體架構就考慮進去的東西, 相對之下開發不易, 效能也不見得比較好, 更糟的是你可能會設計用 4 threads 來處理問題, 結果就是你的程式在超過四核的系統上就沒有明顯的效能增強了.

    昨天晚上仔細的看完這幾篇文章, 果然科技的進步真是神速啊, 過去寫到翻掉的 Multi-Process 程式, 如今一個 For Loop 就解決掉, 沒走過這一段的人應該不能體會吧. 不過也因為這樣, 更能體會這些技術的價值在那裡. TPL 真的是個好東西, 強力推薦大家學一學!!

    --

    文中題到的 TPL 已經有 Tech Preview 可以下載了. 感謝 Unicorn 提供資訊.
    http://www.microsoft.com/downloads/details.aspx?FamilyID=e848dc1d-5be3-4941-8705-024bc7f180ba&DisplayLang=en

    --

    這篇是寫了貼在別的地方, 當然自己的 blog 也要貼一份... [H]

    2007/12/12 .NET 作業系統 多執行緒 技術隨筆

  5. Canon Raw Codec + WPF #2, ThreadPool

    效能問題, 就跟我自己寫的小工具一起講好了. 話說之前 Microsoft 提供了一個很讚的小工具: Resize Pictures Power Toys, 功能超簡單, 大概就是檔案總管把圖檔選一選, 按右鍵選 "Resize pictures" 就好了:

    image

    選了之後就有簡單的對話視窗:

    image

    很簡單吧, 我個人非常愛用, 而且轉出來的效果也不差, 看起來 JPEG quality 大約有 80% ~ 90% 吧... 無耐 windows xp 裡有幾個跟 image 相關的 power toys, 到了 vista 通通不能用. 看來應該都是碰到 GDI+ 要轉換到 WPF 的陣痛期吧, 這幾個小工具還真是讓我繼續撐在 XP 的主要理由之一...

    扯遠了, 所以我的目標就是寫個類似的小工具, 讓我簡單的做批次縮圖就好. 有了上一篇的基礎, 要寫這種 tools 實在是沒什麼挑戰, 大概會寫 winform 的拉兩下就可以收工了...

     

    不過, 大話說太早. 先貼一下成品的畫面, 後面說明比較清楚:

    image

    要做的東西很簡單. 選好一堆圖按右鍵選 resize pictures 後就跳這畫面, 按 Resize 就開始跑. 用的是前一篇弄好的 library. 結果碰到的障礙還不少. 雖然可以跑, 但是看了就很礙眼...

    1. 效能有點糟.
      比較好的架構一定會有額外的效能折損, 我倒可接受. 內建的 JPEG codec 還好, 比不上像 Google Picasa 那樣快速. 但是 canon raw codec 就慘不忍睹... 如果把 raw 轉成同大小的 jpg (4000x3000 pixel), 足足要 60 ~ 80 sec ...
    2. 沒針對多處理器最佳化
      簡單的說, 以我的雙核 CPU (Core2Duo E6300), 跑起來 CPU 利用率只有 5x% 而以.
    3. ThreadPool 也無法解決問題
      因為 (2), 就很直覺的聯想到, 我一次轉兩張, 同樣時間內可以完成兩張的轉檔, 單位時間的運算量還是有提升, 雖然每一張還是要花那麼久... 不過我錯了, 看來是 canon codec 的限制, 開 thread pool 跑下去, 一樣是卡在 60% cpu 使用率左右...
    4. UI thread 問題
      thread pool 也不完全沒有作用. jpeg encode / decode 的部份是可以充份利用到 thread pool 的好處的, 只是 canon raw decode 的部份用不到. 當部份時間是 canon raw decode + jpeg encode / decode 時, 剩餘的 CPU 運算能力還是吃的到. 但是 thread pool 無法控制 priority, pool 裡的 thread 就嚴重的影響到 UI thread 的作業. 常看到的現像就是進度列一直在跑, 不過預覽圖片的控制項卻一直跑不出來

     

    (1) 的問題其實沒這麼嚴重. Microsoft HD Photo 有一個 feature, 就是大檔放在網路上, 你也能夠很快的透過網路看到小圖. 有點類似早期漸進式的 jpeg 那樣. 不過看起來 codec 的設計更好一點. 實驗的結果是, Full Size .CR2 (4000x3000) 存成同大小的 JPEG 檔需要 60sec, 而存成 800x600 只要 5 sec. 但是拿對照組 .JPG (4000x3000) -> .JPG (800x600), 差距又沒這麼大. 因此推測起來, 應該在 decode 階段就已經針對這樣的需求設計過了.

    剩下的問題我試了好幾種方法, 目標都擺在如何安排這堆 thread 在合適的時間做合適的工作. canon codec 就不適合同時丟好幾個 thread 下去跑, 因為完全沒用, 反而拉長每個 .CR2 從開始到輸出的時間. jpeg 的部份就很適合, 因為時間短, 多核的好處也可以藉著多 thread 用的到. 另外 canon codec 因為限制較多, 我需要它以較高的 priority, 並且要在第一時間就開始跑, 才不會拖慢整個轉檔的處理時間... 理想的 task 安排狀況應該要像這樣:

    簡報1

     

    最後找到一個我比較滿意的解, 就是另外寫一個合用的 ThreadPool... @_@

     

    其實我是很不想做重新發明輪子這件事, 不過除此之外實在沒什麼好方法. 所幸 .net 下要自己弄出個 thread pool 也不難. 這一切都要感謝當年在 yam, 當時研發部的一位主管, 交大的學長, 現在跑到 Microsoft 去了, 我們都叫他 "旺旺" .. 他技術能力只能用 "神" 來形容... 當時他在公司內開了門課, 真是印像深刻. 就用 java 示範了如何寫 ThreadPool ... 寫起來還真沒幾行... 扣掉一堆 import (相當於 c# 的 using) 等宣告之類的 code, 整個功能 "完整" 的 thread pool 大概只有一百行左右的 code... 而且 thread 動態 create 跟回收等功能一樣不少... threads 之間同步問題也沒漏掉..

    我歸納了一下我需要的 ThreadPool 到底要什麼功能, 而內建的到底缺什麼... 要怎麼做就很清楚了... 要解決上面的問題, 我大概需要這樣的 thread pool 來支援我的想法:

    1. thread 的數量不需要是動態的, 固定的就夠了. 一次開太多 thread 效果不見得好.
    2. thread 一定要能設定 priority. 因為轉圖檔是 cpu bound 的工作, priority 設低一點對整體的回應時間比較好.
    3. 需要多組 thread pool. 我的想法是用一組專用的 thread pool 來處理 canon raw codec, 只要 1 thread 就夠 (以後 canon 真的改善的話再加大數量). 另外其它 (大部份都是 jpeg codec) 的工作就丟到有 4 threads 的 thread pool 去跑. 至少到四核的 cpu 都還能夠充份的利用到.
    4. 需要簡單的作法, 能夠 wait thread pool 裡的工作全處理完. .net 內建的也可以, 不過必需透過比較麻煩的 WaitHandle 自己去 wait ...

    這些剛好都是我需要, 但是 .NET 內建的 thread pool 做不到的需求. 因此自己寫了個簡單的 SimpleThreadPool .. 介面規格就儘量比照內建的 ThreadPool (因為 code 已經寫好不想改太多 [:P]). 用起來像這樣:

     

       1:          private static void SimpleThreadPoolTest()
    
       2:          {
    
       3:              SimpleThreadPool stp1 = new SimpleThreadPool(2, System.Threading.ThreadPriority.BelowNormal);
    
       4:              SimpleThreadPool stp2 = new SimpleThreadPool(1, System.Threading.ThreadPriority.Lowest);
    
       5:   
    
       6:              stp1.StartPool();
    
       7:              stp2.StartPool();
    
       8:   
    
       9:              for (int count = 0; count < 10; count++)
    
      10:              {
    
      11:                  stp1.QueueWorkItem(
    
      12:                      new WaitCallback(ShowMessage),
    
      13:                      string.Format("STP1[{0}]", count));
    
      14:                  stp2.QueueWorkItem(
    
      15:                      new WaitCallback(ShowMessage),
    
      16:                      string.Format("STP2[{0}]", count));
    
      17:   
    
      18:                  Thread.Sleep(13);
    
      19:              }
    
      20:   
    
      21:   
    
      22:              Console.WriteLine("wait stop");
    
      23:              stp1.EndPool();
    
      24:              stp2.EndPool();
    
      25:          }
    
      26:   
    
      27:   
    
      28:          private static void ShowMessage(object state)
    
      29:          {
    
      30:              Console.WriteLine("ThreadID: {0}, state: {1}", Thread.CurrentThread.ManagedThreadId, state);
    
      31:              Thread.Sleep((new Random()).Next(1000));
    
      32:          }
    

     

    嗯, 功力跟旺旺比差了一點, 不過也是一百出頭行就搞定這個 ThreadPool ... [:D], 接下來就是火力展示了... 因為介面跟內建的 ThreadPool 幾乎一樣. 就簡單測一下 125 JPEG + 20 G9 RAW + 2 G2 RAW files 一起做轉檔時的 CPU 使用率記錄...

    圖一. 用內建的 ThreadPool, 110 sec ( UI 回應正常, 進度列也會跑. 不過礙於 CPU loading 關係, ImageBox 的圖都沒出來)

    image

     

     

    圖二. 改用我自己寫的 SimpleThreadPool, 90 sec. 因為調整過 priority, 每張圖轉完 ImageBox 都會立即顯示出來.

    image

     

    第一張圖, 所有的 job 都依序執行, 簡單的 jpeg 都擠在前段, 那段 cpu 100% 就是這樣來的. 後面就都是 canon decoder 在跑, cpu 大約都維持在 50% 左右, 直到跑完為止.

    而第二張圖, jpeg / canon 都強迫同時一起執行. 而 canon 的 priority 略高於 jpeg. 因此排程的策略是優先執行較慢的 canon decoder, 而剩餘的 cpu 運算能力就拿來處理 jpeg 的部份. 因為 cpu 使用率的統計圖下的面積 (積分) 就是總共需要的運算量. 看的出來維持在 100% 的部份越短, 則總體完成的時間就會拉長... 後面自定的 thread pool 的作法, 不論在 UI 回應, 跟整體處理的時間都比較好. 看來適度的調整 thread 數量, 跟 thread priority 還是很有用的. 不過題外話, thread 再怎麼用, 效果還是不如 lib or compiler level 做的平行處理效果來的好. ZD Net 上有一系列 intel 提供的 video, 講的還不錯. Microsoft 也替 .NET 開發了一套 Library (download), 只要調整一點語法, 就可以把 loop 內的運算轉成平行運算, 這種效果遠比用 thread pool 來的聰明 & 有效. 不過還在 community preview 就暫時不考慮採用了.

    總算, 搞定了 thread pool, 也搞定了 metadata, 幾個主要的障礙都排除了. 兩個要開發的工具也完成了一個 ( image resizer ), 剩下的歸檔程式就剩下納入 video encoder 的部份也就大功告成了. 有力氣的話會再寫一篇吧, 敬請期待 [:D]

    2007/12/12 系列文章: Canon Raw Codec & WPF .NET WPF 作品集 多執行緒