1. 該如何學好 "寫程式" #5. 善用 TRACE / ASSERT

    哈哈,這篇拖的夠久了 :P

    上篇扯太多,寫到一半寫不完就留到這篇了。寫出可靠的程式,這是軟體工程師的基本要求。上篇提到了 TRACE / ASSERT 的應用,來複習一下:

    TRACE: 原本是 C 的除錯用巨集,目的是用適合的方式輸出除錯用的訊息,用來跟一般的訊息輸出有所區別。因為用的是不同的方式輸出,可以很容易的統一關掉。隨著工具的進步,輸出的方式也越來越適合除錯,比如輸出到開發工具的除錯視窗,或是輸出成記錄檔等等。

    ASSERT: 也是除錯用巨集,它接受一個 bool 參數,輸入值為 TRUE 時一切正常,就像沒呼叫一樣,輸入 FALSE 則會中斷程式,或是輸出顯目的警告訊息。目的在於確保程式的每個步驟情況都如預料般的順利。

    這兩個東西從 C 的巨集,衍生出各種語言及環境都有各自的版本。它的目的很簡單,就是 [Writing Solid Code] 裡提到的:

    用同一套程式碼,同時維護兩個版本 (RELEASE / DEBUG),讓錯誤自動跑出來。

    2008/11/03 系列文章: 如何學好寫程式 .NET C# 作品集 專欄 技術隨筆 有的沒的 物件導向

  2. 也是 "生產者 & 消費者" ...

    IMG_3467 (Canon PowerShot G9)

    哈哈,貼一下家裡魚缸的照片... 家裡養的孔雀魚一直生就算了,無意間丟進來的一隻蝸牛,沒兩個月竟然也生了一堆,現在算算大概有四十隻吧 @_@,照片裡紅紅的都是...

    不過有了蝸牛 (消費者),把水裡的魚大便跟水藻都吃的乾乾淨淨的也不錯啦,以前每週要換一次水,現在偷懶撐久一點都無所謂了 :D

    2008/11/01 有的沒的

  3. 關不掉的 Vista UAC !?

    不知道是更新了啥 PATCH,還是那次沒正常關機,我公司 VISTA 的 UAC 突然莫名奇妙的被打開了。怪的是控制台裡看到的還是關掉的,不管怎麼改狀態也不會改變 (一直都是關的) ...。

    直覺告訴我一定是控制台的 AP 那邊出問題,設定值寫不進去造成的...,於是我就開使找其它可以修改 UAC 設定的方法...,最後找到這個,還真的成功了 :D,看來沒機會動用 ProcessMonitor 追追看問題了..

    找到的方法是: msconfig.exe

    在開始 --> 執行裡輸入 msconfig.exe 後,可以看到這一項:

    image

     

    看來是直接修改 registry, 果然有效,直接執行後 REBOOT 就一切正常了 -_-, 如果有人也碰過一樣的問題可以試看看!

    2008/10/31 Tips

  4. 該如何學好 "寫程式" #4. 你的程式夠 "可靠" 嗎?

    撐了很久,續篇來了。這次要進階一點,直接從 software engineer (軟體工程師) 的階段開始。

    所謂的軟體工程師,我對它的定義是在這個領域已經算是資深人員了。programmer 該作的是把程式寫好,要挑正確的方式及技術寫好你的程式 (如之前幾篇介紹的演算法及資料結構等等)。而軟體工程師呢? 之前介紹的那些已經不夠了,你該好好的安排你的 code 及工具,要能把你的 solution (如會用到的演算法及資料結構),跟你手上能運用的資源 (如程式語言、開發工具及函式庫) 作最佳化的搭配及整合。

    2008/10/20 系列文章: 如何學好寫程式 .NET C# Tips 作品集 專欄 技術隨筆 有的沒的

  5. 生產者 vs 消費者 - BlockQueue 實作

    過去寫了好幾篇跟執行緒相關的文章,講的都是如何精確控制執行緒的問題。不過實際上有在寫的人就知道,那些只是 "工具",最重要的還是你該怎樣安排你的程式,讓它能有效率的用到執行緒的好處,那才是重點。大部份能有效利用到多執行緒的程式,大都是大量且獨立的小動作,可以很簡單的撒下去給ThreadPool處理,不過當你的程式沒辦法這樣切,就要想點別的辦法了。

     

    開始看 code 前先講講簡單的概念。這篇要講的是另一種模式: "生產者 v.s. 消費者"。這是個很典型的供需問題,唸過作業系統 (Operation System) 的人應該都被考過這個課題吧 @_@。簡單的說如果你的程式要處理的動作可以分為 "生產者" (產生資料,載入檔案,或是第一階段的運算等等) 及消費者 (匯出資料,或是第二階段的運算等等) 這種模式,而前後兩個階段各自又適合用執行緒來加速的話,那你就值得來研究一下這種模式。第一手資料就是去看看作業系統的書,恐龍書足足有一整章在講,足夠你研究了。本篇重點會擺在怎樣用 C# / .NET 實作的部份。

     

    舉個具體一點的例子,如果你想寫個程式,從網站下載幾百個檔案,同時要把它們壓縮成一個 ZIP 檔,在過去你大概只能全部下載完之後,再開始ZIP的壓縮動作。第一階段都是 IO (網路) bound 的程式,第二階段則是 CPU bound。如果是完全獨立的兩個程式,很適合擺在一起執行,因為它們需要的資源不一樣,不會搶來搶去。但是就敗在他們要處理的資料是卡在一起的。

     

    把這個動作想像成我們有兩組人分別負責下載及壓縮的動作,下載的部份可以多執行緒同時進行沒問題,但是下載好一個檔案,就可以先丟給後面的那組人開始壓縮了,不用等期它人下載完成。如果下載的暫存目錄空間有限,我們甚至可以這樣調整: 當 TEMP 滿了的話,下載動作就暫停,等到 TEMP 裡的東西壓縮好清掉一部份後再繼續。而壓縮的部份則相反,如果 TEMP 已經空了就暫停,等到有東西進來再繼續,直到完成為止。

     

    image

    前後兩階段該如何利用多執行緒,我就跳過去了,過去那幾篇就足以應付。這種模式的關鍵在於前後兩階段的進度該如何平衡。有些範例是有照規矩的把這模式實作出來,不過... 你也知道,看起來就是像作業的那種,完全不像是可以拿來正規的用途。

     

    我認定 "好" 的實作,是像 System.Collections.Generics 之於 DataStructure 那樣,能很漂亮的把細節封裝起來,很容易重複利用的才是我認為好的實作。不能容易的使用,那就只能像作業一樣寫完就丟...。這個問題看過有人用 Semaphore 來做,也是作的很棒,不過我比較推薦的是 QUEUE 的作法。

     

    從上圖來看,生產者跟消費者都很簡單,就是過去講的多執行緒或是執行緒集區就搞定,關鍵在於中間的控制。我的想法是把庫存管理的東西實作成佇列 (QUEUE),生產者產出的東西就放到 QUEUE,而消費者就去 QUEUE 把東西拿出來。不過現成的 QUEUE 不會告訴你 QUEUE 滿了,QUEUE 空了也只會丟 EXCEPTION 而以。這次我做了個 BlockQueue 就是希望解決這個問題。

     

    我期望這個 QUEUE 能跟一般的 QUEUE 一樣使用,但是要有三個地方不一樣:

    1. 要設定大小限制,當 QUEUE 達到容量上限時 EnQueue 的動作會被暫停 (Block),而不是丟出例外。
    2. QUEUE 已經空了的時後,DeQueue 的動作會被暫停 (Block),而不是丟出例外。
    3. 要多個 QUEUE 關機的動作 (SHUTDOWN),以免生產者都不出貨了,消費者還苦苦的等下去的窘況。

     

    先看看這樣的 QUEUE 我希望它怎麼被使用。看一下簡單的範例程式 (主程式,不包含 BlockQueue):

    使用 BlockQueue 來實作的生產者/消費者範例[copy code]
            public static BlockQueue<string> queue = new BlockQueue<string>(10);        public static void Main(string[] args)        {            List<Thread> ps = new List<Thread>();            List<Thread> cs = new List<Thread>();            for (int index = 0; index < 5; index++)            {                Thread t = new Thread(Producer);                ps.Add(t);                t.Start();            }            for (int index = 0; index < 10; index++)            {                Thread t = new Thread(Consumer);                cs.Add(t);                t.Start();            }            foreach (Thread t in ps)            {                t.Join();            }            WriteLine("Producer shutdown.");            queue.Shutdown();            foreach (Thread t in cs)            {                t.Join();            }        }        public static long sn = 0;        public static void Producer()        {            for (int count = 0; count < 30; count++)            {                RandomWait();                string item = string.Format("item:{0}", Interlocked.Increment(ref sn));                WriteLine("Produce Item: {0}", item);                queue.EnQueue(item);            }            WriteLine("Producer Exit");        }        public static void Consumer()        {            try            {                while (true)                {                    RandomWait();                    string item = queue.DeQueue();                    WriteLine("Cust Item: {0}", item);                }            }            catch            {                WriteLine("Consumer Exit");            }        }        private static void RandomWait()        {            Random rnd = new Random();            Thread.Sleep((int)(rnd.NextDouble() * 300));        }        private static void WriteLine(string patterns, params object[] arguments)        {            Console.WriteLine(string.Format("[#{0:D02}] ", Thread.CurrentThread.ManagedThreadId) + patterns, arguments);        }
    
       1:  public static BlockQueue<string> queue = new BlockQueue<string>(10);
    
       2:  public static void Main(string[] args)
    
       3:  {
    
       4:      List<Thread> ps = new List<Thread>();
    
       5:      List<Thread> cs = new List<Thread>();
    
       6:      for (int index = 0; index < 5; index++)
    
       7:      {
    
       8:          Thread t = new Thread(Producer);
    
       9:          ps.Add(t);
    
      10:          t.Start();
    
      11:      }
    
      12:      for (int index = 0; index < 10; index++)
    
      13:      {
    
      14:          Thread t = new Thread(Consumer);
    
      15:          cs.Add(t);
    
      16:          t.Start();
    
      17:      }
    
      18:      foreach (Thread t in ps)
    
      19:      {
    
      20:          t.Join();
    
      21:      }
    
      22:      WriteLine("Producer shutdown.");
    
      23:      queue.Shutdown();
    
      24:      foreach (Thread t in cs)
    
      25:      {
    
      26:          t.Join();
    
      27:      }
    
      28:  }
    
      29:  public static long sn = 0;
    
      30:  public static void Producer()
    
      31:  {
    
      32:      for (int count = 0; count < 30; count++)
    
      33:      {
    
      34:          RandomWait();
    
      35:          string item = string.Format("item:{0}", Interlocked.Increment(ref sn));
    
      36:          WriteLine("Produce Item: {0}", item);
    
      37:          queue.EnQueue(item);
    
      38:      }
    
      39:      WriteLine("Producer Exit");
    
      40:  }
    
      41:  public static void Consumer()
    
      42:  {
    
      43:      try
    
      44:      {
    
      45:          while (true)
    
      46:          {
    
      47:              RandomWait();
    
      48:              string item = queue.DeQueue();
    
      49:              WriteLine("Cust Item: {0}", item);
    
      50:          }
    
      51:      }
    
      52:      catch
    
      53:      {
    
      54:          WriteLine("Consumer Exit");
    
      55:      }
    
      56:  }
    
      57:  private static void RandomWait()
    
      58:  {
    
      59:      Random rnd = new Random();
    
      60:      Thread.Sleep((int)(rnd.NextDouble() * 300));
    
      61:  }
    
      62:  private static void WriteLine(string patterns, params object[] arguments)
    
      63:  {
    
      64:      Console.WriteLine(string.Format("[#{0:D02}] ", Thread.CurrentThread.ManagedThreadId) + patterns, arguments);
    
      65:  }
    

     

     

     

     

     

    主程式很簡單,你知道怎麼寫多執行緒程式的話那麼一看就懂了。一開始替 Producer / Consumer 各建立三個執行續,而每個 Producer 只作很簡單的事,就是連續生產 30 個字串放到 BlockQueue, 當所有的 Producer thread 都執行完後,會呼叫 queue.Shutdown( ); 通知 QUEUE 已經全部生產完畢。

    Consumer 也很簡單,每個 Consumer 只是去 Queue 拿東西出來,顯示在 Console 上。直到 Dequeue 動作失敗,接到 Exception 之後就結束。

    要試試生產者/消費者模式的各種狀況,可以試著調整兩者的執行緒數量。舉例來說,調大 Producer 執行緒數量時 (P: 10 / C:5),結果是這樣:

    image

    Producer 的進度大約就是領先 Consumer 的進度 10 筆資料左右,領先的幅度就暫停了,不會無止境的成長下去。證明卡在 QUEUE 內的數量受到控制。接下來再來看看調高 Consumer 的執行緒數量的結果:

    image

    好像 iPhone 上市搶購熱潮一樣 @_@,供不應求,Producer 提供的資料馬上被搶走了...。

     

    效果不錯,看來這樣的實作有達成它的目的。最後來看的是 BlockQueue 的程式碼:

     

    BlockQueue<T> 實作的完整程式碼[copy code]
        public class BlockQueue<T>    {        public readonly int SizeLimit = 0;        private Queue<T> _inner_queue = null;        private ManualResetEvent _enqueue_wait = null;        private ManualResetEvent _dequeue_wait = null;        public BlockQueue(int sizeLimit)        {            this.SizeLimit = sizeLimit;            this._inner_queue = new Queue<T>(this.SizeLimit);            this._enqueue_wait = new ManualResetEvent(false);            this._dequeue_wait = new ManualResetEvent(false);        }        public void EnQueue(T item)        {            if (this._IsShutdown == true) throw new InvalidCastException("Queue was shutdown. Enqueue was not allowed.");            while (true)            {                lock (this._inner_queue)                {                    if (this._inner_queue.Count < this.SizeLimit)                    {                        this._inner_queue.Enqueue(item);                        this._enqueue_wait.Reset();                        this._dequeue_wait.Set();                        break;                    }                }                this._enqueue_wait.WaitOne();            }        }        public T DeQueue()        {            while (true)            {                if (this._IsShutdown == true)                {                    lock (this._inner_queue) return this._inner_queue.Dequeue();                }                lock (this._inner_queue)                {                    if (this._inner_queue.Count > 0)                    {                        T item = this._inner_queue.Dequeue();                        this._dequeue_wait.Reset();                        this._enqueue_wait.Set();                        return item;                    }                }                this._dequeue_wait.WaitOne();            }        }        private bool _IsShutdown = false;        public void Shutdown()        {            this._IsShutdown = true;            this._dequeue_wait.Set();        }    }
    
       1:  public class BlockQueue<T>
    
       2:  {
    
       3:      public readonly int SizeLimit = 0;
    
       4:      private Queue<T> _inner_queue = null;
    
       5:      private ManualResetEvent _enqueue_wait = null;
    
       6:      private ManualResetEvent _dequeue_wait = null;
    
       7:      public BlockQueue(int sizeLimit)
    
       8:      {
    
       9:          this.SizeLimit = sizeLimit;
    
      10:          this._inner_queue = new Queue<T>(this.SizeLimit);
    
      11:          this._enqueue_wait = new ManualResetEvent(false);
    
      12:          this._dequeue_wait = new ManualResetEvent(false);
    
      13:      }
    
      14:      public void EnQueue(T item)
    
      15:      {
    
      16:          if (this._IsShutdown == true) throw new InvalidCastException("Queue was shutdown. Enqueue was not allowed.");
    
      17:          while (true)
    
      18:          {
    
      19:              lock (this._inner_queue)
    
      20:              {
    
      21:                  if (this._inner_queue.Count < this.SizeLimit)
    
      22:                  {
    
      23:                      this._inner_queue.Enqueue(item);
    
      24:                      this._enqueue_wait.Reset();
    
      25:                      this._dequeue_wait.Set();
    
      26:                      break;
    
      27:                  }
    
      28:              }
    
      29:              this._enqueue_wait.WaitOne();
    
      30:          }
    
      31:      }
    
      32:      public T DeQueue()
    
      33:      {
    
      34:          while (true)
    
      35:          {
    
      36:              if (this._IsShutdown == true)
    
      37:              {
    
      38:                  lock (this._inner_queue) return this._inner_queue.Dequeue();
    
      39:              }
    
      40:              lock (this._inner_queue)
    
      41:              {
    
      42:                  if (this._inner_queue.Count > 0)
    
      43:                  {
    
      44:                      T item = this._inner_queue.Dequeue();
    
      45:                      this._dequeue_wait.Reset();
    
      46:                      this._enqueue_wait.Set();
    
      47:                      return item;
    
      48:                  }
    
      49:              }
    
      50:              this._dequeue_wait.WaitOne();
    
      51:          }
    
      52:      }
    
      53:      private bool _IsShutdown = false;
    
      54:      public void Shutdown()
    
      55:      {
    
      56:          this._IsShutdown = true;
    
      57:          this._dequeue_wait.Set();
    
      58:      }
    
      59:  }
    

     

    重點只在重新包裝 Queue 的 Enqueue / Dequeue ,及追加的 Shutdown( ) 裡做的執行緒同步機制。在 BlockQueue 尚未 Shutdown 之前,Enqueue / Dequeue 都不會引發 Exception, 取代的是用 ManualResetEvent 的 WaitOne( ) 來暫停這個動作,直到另一端資料準備好為止。

    然而當 Shutdown 被呼叫過之後,Queue 就不再接受新的東西被塞進來了,而東西拿光因為不再補貨,所以就維持原本 Queue 的設計扔出 Exception。

     

    其實真的要挖的話,這個 Queue 可以進一步的改善,以資料結構來看,這種有固定 SIZE 上限的 QUEUE,最適合用 CircleQueue 來實作了。有興趣的朋友們可以換上回介紹過的 NGenerics 改看看,我就不再示範了。其實還有其它變型,像是 Priority Queue, 進去跟出來的順序不一定一樣,意思是你地位比較高的話是可以 "插隊" 的,後加入 QUEUE 的物件,可以優先被拿出來。這些機制都是可以進一步改善 "生產者/消費者" 模式的方法,有需要的讀者們可以朝這個方向思考看看!

     

    這篇只是個開始,運用這種機制,可以進一步延伸出 Pipeline 模式 (生產線),甚至更進一步運用到串流 (Stream) 的應用。運氣好的話下個月應該看的到完整的探討跟解說吧 ...,敬請期待 :D

    2008/10/18 系列文章: 多執行緒的處理技巧 .NET C# 作品集 作業系統 多執行緒 專欄 技術隨筆 有的沒的 物件導向