9/15/2009 2:26:31 AM

[設計案例] 生命遊戲#3, 時序的控制

Microsoft.NET | C# | Threading | 作業系統 | 技術隨筆 | 物件導向 Facebook Share

原本的範例,其實有些盲點,不知各位有沒看到? 一樣的起始狀態,一樣的遊戲規則,你不一定會得到一樣的結果。為什麼? 因為這會跟你程式 SCAN 的順序有關。怎麼說? 因為到目前為只,整個遊戲就好像下棋一樣,是 "回合制",我下完了換你... 一路一直輪下去。

這時先下後下就會影響結果了。現實世界的生命不是這樣的啊... 不知有沒有人玩過早期的太空戰士 (Final Fantasy) 系列遊戲? 當年 FF 有個很重要的突破,就是把 RPG 從傳統的 "回合制" 改成即時戰鬥... 每個人都有個倒數的碼錶,數到 0 你就可以發動下一次的攻擊... 這樣才接近現實世界啊。套用到我們的生命遊戲,這次我們想作的改變,就是把程式改成這種模式。

因此來調整一下規則,每個細胞每隔 1000ms 後會進到下一個狀態。不過生命總是沒有完全一樣的,因此每個細胞進到下一個狀態的時間差,都會有 10% 的誤差 (也就是 950ms ~ 1050ms 之間的時間都有可能)。其它規責則維持不變,來看看程式該怎麼改寫。

這種 "即時制",是比較合乎現實的情況的,如果未來你想發展到像 facebook 上的那些小遊戲,或是其它線上遊戲一樣的話, "回合制" 是決對行不通的... 這時,我們可以想像,每個細胞都有自己的執行緒,每換過一次狀態後就 Sleep() 一段時間,醒來再換到下一次狀態... 一直到指定的世代 (generation) 到達為止。

來看一下改版過的程式。我們先不動原本的 Cell, 只追加一個 method: WholeLife( ), 呼叫後就會一直更新這個細胞的狀態,直到它結束為止 (不是死掉喔,是 generation 到達)。而整個世界的所有細胞,都是獨立的個體,都有個專屬的執行緒在運作...。這時 Game Host 就得換個方式來讓這些細胞過日子 (執行),同時 Game Host 好像有個人造衛星一樣,不斷的在上空拍照來更新畫面,而完全不影響這些細胞的生命進行。

來看一下改寫過的 Cell 追加的 method:

Cell 追加的 method[copy code]
   1:  public void WholeLife(object state)
   2:  {
   3:      int generation = (int)state;
   4:      for (int index = 0; index < generation; index++)
   5:      {
   6:          this.OnNextStateChange();
   7:          Thread.Sleep(_rnd.Next(950, 1050));
   8:      }
   9:  }

 

改變不大,只是多個簡單的迴圈,跟 sleep 來控制時間而已。再來看看 Game Host 要怎麼改:

多執行緒版本的 Game Host[copy code]
   1:  static void Main(string[] args)
   2:  {
   3:      int worldSizeX = 30;
   4:      int worldSizeY = 30;
   5:      int maxGenerationCount = 100;
   6:      World realworld = new World(worldSizeX, worldSizeY);
   7:      // init threads for each cell
   8:      List<Thread> threads = new List<Thread>();
   9:      for (int positionX = 0; positionX < worldSizeX; positionX++)
  10:      {
  11:          for (int positionY = 0; positionY < worldSizeY; positionY++)
  12:          {
  13:              Cell cell = realworld.GetCell(positionX, positionY);
  14:              Thread t = new Thread(cell.WholeLife);
  15:              threads.Add(t);
  16:              t.Start(maxGenerationCount);
  17:          }
  18:      }
  19:      // reflesh maps
  20:      do
  21:      {
  22:          realworld.ShowMaps("");
  23:          Thread.Sleep(100);
  24:      } while (IsAllThreadStopped(threads) == false);
  25:      // wait all thread exit.
  26:      foreach (Thread t in threads) t.Join();
  27:  }
  28:  private static bool IsAllThreadStopped(List<Thread> threads)
  29:  {
  30:      foreach (Thread t in threads)
  31:      {
  32:          if (t.ThreadState != ThreadState.Stopped) return false;
  33:      }
  34:      return true;
  35:  }

 

其實這卅幾行 code, 大都花在控制執行緒上面,有興趣的讀者可以翻翻我之前寫的那系列文章,我就不多作說明了。調整之後,這個世界變的更不可測了,一樣的起始環境,連上帝 (在這模擬世界裡,我就是上帝 XD) 都無法預測下一秒會發生什麼事...

image

 

感覺就好像看電視一樣。畫面不斷的在閃動,而畫面裡的細胞會不規責的跳動,不像上一版程式一樣,每刷一次就變一次那樣的枯燥無聊。如果畫面呈現的地方再多用點心思,就可以弄的像卡通一樣,每個細胞都各自用自己的步調在活著...

到這裡,如何? 應該沒有人把作業寫到這個樣子了吧 XD (就說別抄我的程式去交作業了)。不適當的利用執行緒,也做的到類似的結果。不過,你花費的代價會很大,因為你的程式得自己來做 context switch (這些是 OS + thread scheduler 會幫你解決掉的,只要你曉得要用 thread)。

接下來下一篇,我們再繼續調整這世界的遊戲規則,加入更多元素進去,看看程式會變怎樣? 多執行緒解決時間的問題了,再來我們要用繼承及多型,讓不同的生命可以在同一個世界下共同生活...  ((待續))

 



10/18/2008 11:53:00 AM

生產者 vs 消費者 - BlockQueue 實作

543 | C# | Threading | 作業系統 | 我的作品 | 技術隨筆 | 物件導向 | Microsoft.NET | [精選文章] Facebook Share

過去寫了好幾篇跟執行緒相關的文章,講的都是如何精確控制執行緒的問題。不過實際上有在寫的人就知道,那些只是 "工具",最重要的還是你該怎樣安排你的程式,讓它能有效率的用到執行緒的好處,那才是重點。大部份能有效利用到多執行緒的程式,大都是大量且獨立的小動作,可以很簡單的撒下去給ThreadPool處理,不過當你的程式沒辦法這樣切,就要想點別的辦法了。

 

開始看 code 前先講講簡單的概念。這篇要講的是另一種模式: "生產者 v.s. 消費者"。這是個很典型的供需問題,唸過作業系統 (Operation System) 的人應該都被考過這個課題吧 @_@。簡單的說如果你的程式要處理的動作可以分為 "生產者" (產生資料,載入檔案,或是第一階段的運算等等) 及消費者 (匯出資料,或是第二階段的運算等等) 這種模式,而前後兩個階段各自又適合用執行緒來加速的話,那你就值得來研究一下這種模式。第一手資料就是去看看作業系統的書,恐龍書足足有一整章在講,足夠你研究了。本篇重點會擺在怎樣用 C# / .NET 實作的部份。

 

舉個具體一點的例子,如果你想寫個程式,從網站下載幾百個檔案,同時要把它們壓縮成一個 ZIP 檔,在過去你大概只能全部下載完之後,再開始ZIP的壓縮動作。第一階段都是 IO (網路) bound 的程式,第二階段則是 CPU bound。如果是完全獨立的兩個程式,很適合擺在一起執行,因為它們需要的資源不一樣,不會搶來搶去。但是就敗在他們要處理的資料是卡在一起的。

 

把這個動作想像成我們有兩組人分別負責下載及壓縮的動作,下載的部份可以多執行緒同時進行沒問題,但是下載好一個檔案,就可以先丟給後面的那組人開始壓縮了,不用等期它人下載完成。如果下載的暫存目錄空間有限,我們甚至可以這樣調整: 當 TEMP 滿了的話,下載動作就暫停,等到 TEMP 裡的東西壓縮好清掉一部份後再繼續。而壓縮的部份則相反,如果 TEMP 已經空了就暫停,等到有東西進來再繼續,直到完成為止。

 

image

前後兩階段該如何利用多執行緒,我就跳過去了,過去那幾篇就足以應付。這種模式的關鍵在於前後兩階段的進度該如何平衡。有些範例是有照規矩的把這模式實作出來,不過... 你也知道,看起來就是像作業的那種,完全不像是可以拿來正規的用途。

 

我認定 "好" 的實作,是像 System.Collections.Generics 之於 DataStructure 那樣,能很漂亮的把細節封裝起來,很容易重複利用的才是我認為好的實作。不能容易的使用,那就只能像作業一樣寫完就丟...。這個問題看過有人用 Semaphore 來做,也是作的很棒,不過我比較推薦的是 QUEUE 的作法。

 

從上圖來看,生產者跟消費者都很簡單,就是過去講的多執行緒或是執行緒集區就搞定,關鍵在於中間的控制。我的想法是把庫存管理的東西實作成佇列 (QUEUE),生產者產出的東西就放到 QUEUE,而消費者就去 QUEUE 把東西拿出來。不過現成的 QUEUE 不會告訴你 QUEUE 滿了,QUEUE 空了也只會丟 EXCEPTION 而以。這次我做了個 BlockQueue 就是希望解決這個問題。

 

我期望這個 QUEUE 能跟一般的 QUEUE 一樣使用,但是要有三個地方不一樣:

  1. 要設定大小限制,當 QUEUE 達到容量上限時 EnQueue 的動作會被暫停 (Block),而不是丟出例外。
  2. QUEUE 已經空了的時後,DeQueue 的動作會被暫停 (Block),而不是丟出例外。
  3. 要多個 QUEUE 關機的動作 (SHUTDOWN),以免生產者都不出貨了,消費者還苦苦的等下去的窘況。

 

先看看這樣的 QUEUE 我希望它怎麼被使用。看一下簡單的範例程式 (主程式,不包含 BlockQueue):

使用 BlockQueue 來實作的生產者/消費者範例[copy code]
        public static BlockQueue<string> queue = new BlockQueue<string>(10);        public static void Main(string[] args)        {            List<Thread> ps = new List<Thread>();            List<Thread> cs = new List<Thread>();            for (int index = 0; index < 5; index++)            {                Thread t = new Thread(Producer);                ps.Add(t);                t.Start();            }            for (int index = 0; index < 10; index++)            {                Thread t = new Thread(Consumer);                cs.Add(t);                t.Start();            }            foreach (Thread t in ps)            {                t.Join();            }            WriteLine("Producer shutdown.");            queue.Shutdown();            foreach (Thread t in cs)            {                t.Join();            }        }        public static long sn = 0;        public static void Producer()        {            for (int count = 0; count < 30; count++)            {                RandomWait();                string item = string.Format("item:{0}", Interlocked.Increment(ref sn));                WriteLine("Produce Item: {0}", item);                queue.EnQueue(item);            }            WriteLine("Producer Exit");        }        public static void Consumer()        {            try            {                while (true)                {                    RandomWait();                    string item = queue.DeQueue();                    WriteLine("Cust Item: {0}", item);                }            }            catch            {                WriteLine("Consumer Exit");            }        }        private static void RandomWait()        {            Random rnd = new Random();            Thread.Sleep((int)(rnd.NextDouble() * 300));        }        private static void WriteLine(string patterns, params object[] arguments)        {            Console.WriteLine(string.Format("[#{0:D02}] ", Thread.CurrentThread.ManagedThreadId) + patterns, arguments);        }
   1:  public static BlockQueue<string> queue = new BlockQueue<string>(10);
   2:  public static void Main(string[] args)
   3:  {
   4:      List<Thread> ps = new List<Thread>();
   5:      List<Thread> cs = new List<Thread>();
   6:      for (int index = 0; index < 5; index++)
   7:      {
   8:          Thread t = new Thread(Producer);
   9:          ps.Add(t);
  10:          t.Start();
  11:      }
  12:      for (int index = 0; index < 10; index++)
  13:      {
  14:          Thread t = new Thread(Consumer);
  15:          cs.Add(t);
  16:          t.Start();
  17:      }
  18:      foreach (Thread t in ps)
  19:      {
  20:          t.Join();
  21:      }
  22:      WriteLine("Producer shutdown.");
  23:      queue.Shutdown();
  24:      foreach (Thread t in cs)
  25:      {
  26:          t.Join();
  27:      }
  28:  }
  29:  public static long sn = 0;
  30:  public static void Producer()
  31:  {
  32:      for (int count = 0; count < 30; count++)
  33:      {
  34:          RandomWait();
  35:          string item = string.Format("item:{0}", Interlocked.Increment(ref sn));
  36:          WriteLine("Produce Item: {0}", item);
  37:          queue.EnQueue(item);
  38:      }
  39:      WriteLine("Producer Exit");
  40:  }
  41:  public static void Consumer()
  42:  {
  43:      try
  44:      {
  45:          while (true)
  46:          {
  47:              RandomWait();
  48:              string item = queue.DeQueue();
  49:              WriteLine("Cust Item: {0}", item);
  50:          }
  51:      }
  52:      catch
  53:      {
  54:          WriteLine("Consumer Exit");
  55:      }
  56:  }
  57:  private static void RandomWait()
  58:  {
  59:      Random rnd = new Random();
  60:      Thread.Sleep((int)(rnd.NextDouble() * 300));
  61:  }
  62:  private static void WriteLine(string patterns, params object[] arguments)
  63:  {
  64:      Console.WriteLine(string.Format("[#{0:D02}] ", Thread.CurrentThread.ManagedThreadId) + patterns, arguments);
  65:  }

 

 

 

 

 

主程式很簡單,你知道怎麼寫多執行緒程式的話那麼一看就懂了。一開始替 Producer / Consumer 各建立三個執行續,而每個 Producer 只作很簡單的事,就是連續生產 30 個字串放到 BlockQueue, 當所有的 Producer thread 都執行完後,會呼叫 queue.Shutdown( ); 通知 QUEUE 已經全部生產完畢。

Consumer 也很簡單,每個 Consumer 只是去 Queue 拿東西出來,顯示在 Console 上。直到 Dequeue 動作失敗,接到 Exception 之後就結束。

要試試生產者/消費者模式的各種狀況,可以試著調整兩者的執行緒數量。舉例來說,調大 Producer 執行緒數量時 (P: 10 / C:5),結果是這樣:

image

Producer 的進度大約就是領先 Consumer 的進度 10 筆資料左右,領先的幅度就暫停了,不會無止境的成長下去。證明卡在 QUEUE 內的數量受到控制。接下來再來看看調高 Consumer 的執行緒數量的結果:

image

好像 iPhone 上市搶購熱潮一樣 @_@,供不應求,Producer 提供的資料馬上被搶走了...。

 

效果不錯,看來這樣的實作有達成它的目的。最後來看的是 BlockQueue 的程式碼:

 

BlockQueue<T> 實作的完整程式碼[copy code]
    public class BlockQueue<T>    {        public readonly int SizeLimit = 0;        private Queue<T> _inner_queue = null;        private ManualResetEvent _enqueue_wait = null;        private ManualResetEvent _dequeue_wait = null;        public BlockQueue(int sizeLimit)        {            this.SizeLimit = sizeLimit;            this._inner_queue = new Queue<T>(this.SizeLimit);            this._enqueue_wait = new ManualResetEvent(false);            this._dequeue_wait = new ManualResetEvent(false);        }        public void EnQueue(T item)        {            if (this._IsShutdown == true) throw new InvalidCastException("Queue was shutdown. Enqueue was not allowed.");            while (true)            {                lock (this._inner_queue)                {                    if (this._inner_queue.Count < this.SizeLimit)                    {                        this._inner_queue.Enqueue(item);                        this._enqueue_wait.Reset();                        this._dequeue_wait.Set();                        break;                    }                }                this._enqueue_wait.WaitOne();            }        }        public T DeQueue()        {            while (true)            {                if (this._IsShutdown == true)                {                    lock (this._inner_queue) return this._inner_queue.Dequeue();                }                lock (this._inner_queue)                {                    if (this._inner_queue.Count > 0)                    {                        T item = this._inner_queue.Dequeue();                        this._dequeue_wait.Reset();                        this._enqueue_wait.Set();                        return item;                    }                }                this._dequeue_wait.WaitOne();            }        }        private bool _IsShutdown = false;        public void Shutdown()        {            this._IsShutdown = true;            this._dequeue_wait.Set();        }    }
   1:  public class BlockQueue<T>
   2:  {
   3:      public readonly int SizeLimit = 0;
   4:      private Queue<T> _inner_queue = null;
   5:      private ManualResetEvent _enqueue_wait = null;
   6:      private ManualResetEvent _dequeue_wait = null;
   7:      public BlockQueue(int sizeLimit)
   8:      {
   9:          this.SizeLimit = sizeLimit;
  10:          this._inner_queue = new Queue<T>(this.SizeLimit);
  11:          this._enqueue_wait = new ManualResetEvent(false);
  12:          this._dequeue_wait = new ManualResetEvent(false);
  13:      }
  14:      public void EnQueue(T item)
  15:      {
  16:          if (this._IsShutdown == true) throw new InvalidCastException("Queue was shutdown. Enqueue was not allowed.");
  17:          while (true)
  18:          {
  19:              lock (this._inner_queue)
  20:              {
  21:                  if (this._inner_queue.Count < this.SizeLimit)
  22:                  {
  23:                      this._inner_queue.Enqueue(item);
  24:                      this._enqueue_wait.Reset();
  25:                      this._dequeue_wait.Set();
  26:                      break;
  27:                  }
  28:              }
  29:              this._enqueue_wait.WaitOne();
  30:          }
  31:      }
  32:      public T DeQueue()
  33:      {
  34:          while (true)
  35:          {
  36:              if (this._IsShutdown == true)
  37:              {
  38:                  lock (this._inner_queue) return this._inner_queue.Dequeue();
  39:              }
  40:              lock (this._inner_queue)
  41:              {
  42:                  if (this._inner_queue.Count > 0)
  43:                  {
  44:                      T item = this._inner_queue.Dequeue();
  45:                      this._dequeue_wait.Reset();
  46:                      this._enqueue_wait.Set();
  47:                      return item;
  48:                  }
  49:              }
  50:              this._dequeue_wait.WaitOne();
  51:          }
  52:      }
  53:      private bool _IsShutdown = false;
  54:      public void Shutdown()
  55:      {
  56:          this._IsShutdown = true;
  57:          this._dequeue_wait.Set();
  58:      }
  59:  }

 

重點只在重新包裝 Queue 的 Enqueue / Dequeue ,及追加的 Shutdown( ) 裡做的執行緒同步機制。在 BlockQueue 尚未 Shutdown 之前,Enqueue / Dequeue 都不會引發 Exception, 取代的是用 ManualResetEvent 的 WaitOne( ) 來暫停這個動作,直到另一端資料準備好為止。

然而當 Shutdown 被呼叫過之後,Queue 就不再接受新的東西被塞進來了,而東西拿光因為不再補貨,所以就維持原本 Queue 的設計扔出 Exception。

 

其實真的要挖的話,這個 Queue 可以進一步的改善,以資料結構來看,這種有固定 SIZE 上限的 QUEUE,最適合用 CircleQueue 來實作了。有興趣的朋友們可以換上回介紹過的 NGenerics 改看看,我就不再示範了。其實還有其它變型,像是 Priority Queue, 進去跟出來的順序不一定一樣,意思是你地位比較高的話是可以 "插隊" 的,後加入 QUEUE 的物件,可以優先被拿出來。這些機制都是可以進一步改善 "生產者/消費者" 模式的方法,有需要的讀者們可以朝這個方向思考看看!

 

這篇只是個開始,運用這種機制,可以進一步延伸出 Pipeline 模式 (生產線),甚至更進一步運用到串流 (Stream) 的應用。運氣好的話下個月應該看的到完整的探討跟解說吧 ...,敬請期待 :D



8/15/2008 2:51:00 AM

Thread Sync #2. 實作篇 - 互相等待的兩個執行緒

Microsoft.NET | Threading | 小技巧 | 作業系統 | 技術隨筆 | 物件導向 Facebook Share

上篇有人跟我講太深奧了... Orz, 其實不會,只是還沒看到 Code 而以...。就先來幫黑暗魔人賽說明一下程式碼...。首先來看的是黑暗大魔王: GameHost..

 

GameHost 呼叫 Player 的片段[copy code]
    public void Start(Player p)    {        // 略...        int[] guess = p.StartGuess(_maxNum, _digits);        // 略...        Hint hint = compare(guess);        // 略...        while (hint.A != _digits)        {            // 略...            guess = p.GuessNext(hint);            // 略...            hint = compare(guess);        }        p.Stop();        // 略...    }
   1:  public void Start(Player p)
   2:  {
   3:      // 略...
   4:      int[] guess = p.StartGuess(_maxNum, _digits);
   5:      // 略...
   6:      Hint hint = compare(guess);
   7:      // 略...
   8:      while (hint.A != _digits)
   9:      {
  10:          // 略...
  11:          guess = p.GuessNext(hint);
  12:          // 略...
  13:          hint = compare(guess);
  14:      }
  15:      p.Stop();
  16:      // 略...
  17:  }

 

這段程式完全是老闆的角度在思考。抓到 PLAYER 後就叫它開始猜 StartGuess(),然後拼命的叫 PLAYER 再猜 GuessNext(), 直到猜中才可以休息 Stop()

很典型的多型 ( Polymorphism ) 應用,實際上會 RUN 什麼 CODE,就看繼承 PLAYER 的人是怎麼寫的...。這次我們再從弱勢勞工的角度來看看 PLAYER 該怎麼實作 (以 darkthread 附的 DummyPlayer 為例):

 

Player 實作的範例 ( DummyPlayer )[copy code]
public class DummyPlayer : Player{    private int[] _currAnswer = null;    private Random _rnd = new Random();    private void randomGuess()    {        List<int> lst = new List<int>();        for (int i = 0; i < _digits; i++)        {            int r = _rnd.Next(_maxNum);            while (lst.Contains(r))                r = _rnd.Next(_maxNum);            lst.Add(r);            _currAnswer[i] = r;        }    }    public override int[] StartGuess(int maxNum, int digits)    {        base.StartGuess(maxNum, digits);        _currAnswer = new int[digits];        randomGuess();        return _currAnswer;    }    public override int[] GuessNext(Hint lastHint)    {        randomGuess();        return _currAnswer;    }}
   1:  public class DummyPlayer : Player
   2:  {
   3:      private int[] _currAnswer = null;
   4:      private Random _rnd = new Random();
   5:   
   6:      private void randomGuess()
   7:      {
   8:          List<int> lst = new List<int>();
   9:          for (int i = 0; i < _digits; i++)
  10:          {
  11:              int r = _rnd.Next(_maxNum);
  12:              while (lst.Contains(r))
  13:                  r = _rnd.Next(_maxNum);
  14:              lst.Add(r);
  15:              _currAnswer[i] = r;
  16:          }
  17:      }
  18:   
  19:      public override int[] StartGuess(int maxNum, int digits)
  20:      {
  21:          base.StartGuess(maxNum, digits);
  22:          _currAnswer = new int[digits];
  23:          randomGuess();
  24:          return _currAnswer;
  25:      }
  26:      public override int[] GuessNext(Hint lastHint)
  27:      {
  28:          randomGuess();
  29:          return _currAnswer;
  30:      }
  31:  }

 

因為 CODE 不多,我就不刪了,全文照貼。另一個原因是我想讓各位看看拆成好幾段的 CODE 是不是還能夠一眼就還原成原來的邏輯?  如果只看這段 CODE 十秒鐘,沒有看註解或說明,誰能馬上回答這段 CODE 解題的邏輯是什麼?

 

別誤會,不是指這 CODE 不易讀,而是因為呼叫的方式邏輯被迫配合 GameHost 而被切散了,你得再重新把它拼湊起來。它的邏輯很簡單,甚至簡單到連問題的答案都被忽略掉了,不過就每次都隨機丟個數字回去,在 StartGuess( ) 及 GuessNext( ) 都是。

 

 

可憐的勞動階級要站起來啊~ 先幻想一下,如果勞工 (PLAYER) 才是老闆,那麼程式可以改成怎麼樣? 這也才是我們本篇的主角。先來看看成果再回頭來看怎麼實作。這次看的是修改後的版本: AsyncDummyPlayer.

 

換 PLAYER 的角度思考的邏輯: AsyncDummyPlayer[copy code]
    public class AsyncDummyPlayer : AsyncPlayer    {        private int[] _currAnswer = null;        private Random _rnd = new Random();        private void randomGuess()        {            List<int> lst = new List<int>();            for (int i = 0; i < _digits; i++)            {                int r = _rnd.Next(_maxNum);                while (lst.Contains(r))                    r = _rnd.Next(_maxNum);                lst.Add(r);                _currAnswer[i] = r;            }        }        protected override void Init(int maxNum, int digits)        {            _currAnswer = new int[digits];        }        protected override void Think()        {            while (true)            {                this.randomGuess();                Hint h = this.GameHost_AskQuestion(this._currAnswer);                if (h.A == this._digits) break;            }        }    }
   1:  public class AsyncDummyPlayer : AsyncPlayer
   2:  {
   3:      private int[] _currAnswer = null;
   4:      private Random _rnd = new Random();
   5:      private void randomGuess()
   6:      {
   7:          List<int> lst = new List<int>();
   8:          for (int i = 0; i < _digits; i++)
   9:          {
  10:              int r = _rnd.Next(_maxNum);
  11:              while (lst.Contains(r))
  12:                  r = _rnd.Next(_maxNum);
  13:              lst.Add(r);
  14:              _currAnswer[i] = r;
  15:          }
  16:      }
  17:      protected override void Init(int maxNum, int digits)
  18:      {
  19:          _currAnswer = new int[digits];
  20:      }
  21:      protected override void Think()
  22:      {
  23:          while (true)
  24:          {
  25:              this.randomGuess();
  26:              Hint h = this.GameHost_AskQuestion(this._currAnswer);
  27:              if (h.A == this._digits) break;
  28:          }
  29:      }
  30:  }

 

程式碼也沒比較少,都差不多。不過是那堆 CODE 換個地方擺而以。但是仔細看看,這個版本的邏輯清楚多了,PLAYER 一開始就是執行 Init( ) 的部份,而 GameHost 叫 Player 開始解題時, Player 就開始思考 (Think),而這個無腦的 Player 也很直接,就一直執行 while (true) { .... } 這個無窮迴圈,直到亂猜猜中為止。

如果 Player 在思考的時,不管在那裡它都可以適時的呼叫 GameHost_AskQuestion( .... ) 來跟 GameHost 問答案。什麼時後該猜數字? 該猜什麼數字? 這正是整個 Player 的核心,也就是 "怎麼猜" 這件事。以人的思考方式一定會分階段,比如一開始先把所有數字猜一輪,有個概念後再想想怎麼猜能更逼近答案,最後才是致命的一擊,找出正確答案送出去,贏得比賽。

這樣的作法,如果套在 DummyPlayer (原版本),每個階段都要塞在一個大的 switch case, 放在 GuessNext( ) 裡。而現在是那個階段? 只能靠 instance variable 了,先存著等到下次又被呼叫時再拿出來回想一下,上回作到那...。

而第二個版本,則完全沒這個問題,就把它當作一般程式思考就夠了,第一階段就是一個 LOOP,有它自己用的一些變數。第一階段處理完畢就離開 LOOP 繼續執行後面的 CODE... 直到最後離開 Think( ) 這個 method (認輸) 或是猜中答案光榮返鄉...。

 

 

兩者的差別看出來了嗎? DummyPlayer 像是被動的勞工,老闆說一動他就作一動。第一動作完就拿個筆計記下來,等著下次老闆再叫他,他就翻翻筆記看看之前做到那,這次繼續...。

而 AsyncDummyPlayer 這個主動的勞工呢? 老闆交待給他一件任務後,他就自己思考起該怎麼做了。中間都不需要老闆下令。反而是過程中勞工需要老闆的協助時,老闆再適時伸出援手就可以了,一切雜務都由這位主動優秀的勞工自己處理掉。

 

 

 

有沒有差這麼多? 這麼神奇? 是怎麼辦到的? 先來看看類別關系圖:

ThreadSync

 

上圖中,AsyncPlayer 就是改變這種型態的關鍵類別。AsyncPlayer 會用我們在上一篇講到的關念,化被動為主動,轉換這兩種呼叫模式。先來看看這個類別的程式碼到底變了什麼把戲,可以讓弱勢的勞工也有自主的權力?

 

 

 

 

AsyncPlayer 實作: 化被動為主動[copy code]
    public abstract class AsyncPlayer : Player    {        public override int[] StartGuess(int maxNum, int digits)        {            base.StartGuess(maxNum, digits);            Thread thinkThread = new Thread(this.ThinkCaller);            thinkThread.Start();                        this._host_return.WaitOne();            return this._temp_number;        }        public override int[] GuessNext(Hint lastHint)        {            this._temp_hint = lastHint;            this._host_call.Set();            this._host_return.WaitOne();            return this._temp_number;        }        public override void Stop()        {            base.Stop();            this._temp_hint = new Hint(this._digits, 0);            this._host_call.Set();            this._host_end.WaitOne();            this._host_complete = true;        }        private void ThinkCaller()        {            try            {                this.Init(this._maxNum, this._digits);                this.Think();            }            catch (Exception ex)            {                Console.WriteLine("Player Exception: {0}", ex);            }            finally            {                this._host_end.Set();            }        }        protected abstract void Init(int maxNum, int digits);        protected abstract void Think();        private AutoResetEvent _host_call = new AutoResetEvent(false);        private AutoResetEvent _host_return = new AutoResetEvent(false);        private AutoResetEvent _host_end = new AutoResetEvent(false);        private bool _host_complete = false;        private int[] _temp_number;        private Hint _temp_hint;        protected Hint GameHost_AskQuestion(int[] number)        {            if (this._host_complete == true) throw new InvalidOperationException("GameHost stopped!");            lock (this)            {                try                {                    this._temp_number = number;                    this._host_return.Set();                    this._host_call.WaitOne();                    return this._temp_hint;                }                finally {                    this._temp_number = null;                    this._temp_hint = new Hint(-1, -1);                }            }        }    }
   1:  public abstract class AsyncPlayer : Player
   2:  {
   3:      public override int[] StartGuess(int maxNum, int digits)
   4:      {
   5:          base.StartGuess(maxNum, digits);
   6:          Thread thinkThread = new Thread(this.ThinkCaller);
   7:          thinkThread.Start();
   8:          this._host_return.WaitOne();
   9:          return this._temp_number;
  10:      }
  11:      public override int[] GuessNext(Hint lastHint)
  12:      {
  13:          this._temp_hint = lastHint;
  14:          this._host_call.Set();
  15:          this._host_return.WaitOne();
  16:          return this._temp_number;
  17:      }
  18:      public override void Stop()
  19:      {
  20:          base.Stop();
  21:          this._temp_hint = new Hint(this._digits, 0);
  22:          this._host_call.Set();
  23:          this._host_end.WaitOne();
  24:          this._host_complete = true;
  25:      }
  26:      private void ThinkCaller()
  27:      {
  28:          try
  29:          {
  30:              this.Init(this._maxNum, this._digits);
  31:              this.Think();
  32:          }
  33:          catch (Exception ex)
  34:          {
  35:              Console.WriteLine("Player Exception: {0}", ex);
  36:          }
  37:          finally
  38:          {
  39:              this._host_end.Set();
  40:          }
  41:      }
  42:      protected abstract void Init(int maxNum, int digits);
  43:      protected abstract void Think();
  44:      private AutoResetEvent _host_call = new AutoResetEvent(false);
  45:      private AutoResetEvent _host_return = new AutoResetEvent(false);
  46:      private AutoResetEvent _host_end = new AutoResetEvent(false);
  47:      private bool _host_complete = false;
  48:      private int[] _temp_number;
  49:      private Hint _temp_hint;
  50:      protected Hint GameHost_AskQuestion(int[] number)
  51:      {
  52:          if (this._host_complete == true) throw new InvalidOperationException("GameHost stopped!");
  53:          lock (this)
  54:          {
  55:              try
  56:              {
  57:                  this._temp_number = number;
  58:                  this._host_return.Set();
  59:                  this._host_call.WaitOne();
  60:                  return this._temp_hint;
  61:              }
  62:              finally {
  63:                  this._temp_number = null;
  64:                  this._temp_hint = new Hint(-1, -1);
  65:              }
  66:          }
  67:      }
  68:  }

 

這段程式碼長了一點,內容也都刪不得,各位請耐心點看。上一篇我畫了張概念性的時序圖,這次我們再拿同一張圖,不過這次會標上程式碼:

 ThreadSync2

 

 

請注意一下各個箭頭的上下順序。由上往下代表時間的進行,如果應該在後面執行的 CODE 不巧先被呼叫了,則動作較快的那個 THREAD 會被迫暫停,等待另一邊的進度跟上。先來看看 StartGuess( ) 怎麼跟 Think( ) 互動:

StartGuess(...)[copy code]
        public override int[] StartGuess(int maxNum, int digits)        {            base.StartGuess(maxNum, digits);            Thread thinkThread = new Thread(this.ThinkCaller);            thinkThread.Start();                        this._host_return.WaitOne();            return this._temp_number;        }
   1:  public override int[] StartGuess(int maxNum, int digits)
   2:  {
   3:      base.StartGuess(maxNum, digits);
   4:      Thread thinkThread = new Thread(this.ThinkCaller);
   5:      thinkThread.Start();
   6:      this._host_return.WaitOne();
   7:      return this._temp_number;
   8:  }

 

GameHost 呼叫 Player.StartGuess( ) 有兩個目的,一個是給 Player 題目範圍,讓 Player 做好準備動作。另一個則是準備好之後 GameHost 要取得 Player 傳回的第一個問題。

程式碼很忠實的做了一樣的事,只不過 StartGuess( ) 建立了新的執行緒來負責。新的執行緒會執行 ThinkCaller( ),啟動之後 GameHost 這邊就什麼都不作,等待 _host_return 這個 WaitHandle 被叫醒,代表另一邊已經做好了,可以從共用變數 _temp_number 取得問題傳回去。

既然 GameHost 在等待某人通知它,我們就來看看是誰會通知他題目已經準備好了:

 

GameHost_AskQuestion(...)[copy code]
        protected Hint GameHost_AskQuestion(int[] number)        {            if (this._host_complete == true) throw new InvalidOperationException("GameHost stopped!");            lock (this)            {                try                {                    this._temp_number = number;                    this._host_return.Set();                    this._host_call.WaitOne();                    return this._temp_hint;                }                finally {                    this._temp_number = null;                    this._temp_hint = new Hint(-1, -1);                }            }        }
   1:  protected Hint GameHost_AskQuestion(int[] number)
   2:  {
   3:      if (this._host_complete == true) throw new InvalidOperationException("GameHost stopped!");
   4:      lock (this)
   5:      {
   6:          try
   7:          {
   8:              this._temp_number = number;
   9:              this._host_return.Set();
  10:              this._host_call.WaitOne();
  11:              return this._temp_hint;
  12:          }
  13:          finally {
  14:              this._temp_number = null;
  15:              this._temp_hint = new Hint(-1, -1);
  16:          }
  17:      }
  18:  }

 

就在 GameHost 正在等題目的時後,另一個執行緒正在進行 "思考" 的動作,直到有結論後會呼叫 GameHost_AskQuestion( ... ) 送出問題。這時這個問題會被放到 _temp_number, 而下一步就是 _host_return.Set( ), 通知另一個執行緒,正在等這個結果的人: "喂! 東西已經準備好了,可以來取貨了!!"

整個機制就這樣串起來了。GameHost 那邊怎麼把答案傳回來? 同樣的作法,反過來而以。GameHost 會藉著呼叫 Player.GuessNext(...) 把答案傳回來,而這時就觸動一樣的機制,讓另一邊 Player Thread 呼叫的 GameHost_AskQuestion( ... ) 醒過來,把答案拿走, RETURN 回去。

這樣一直重複下去,剩下最後一個同步的點,就是結束遊戲的地方。說穿了也是一樣的把戲,只是這次是藉著 GameHost 呼叫 Player.Stop( ),而另一邊 Player Thread 執行完 Think( ) 後,兩邊就一起結束遊戲了。

 

總算講完了。其實 thread 能解決的問題還真是五花八門。每次當我想出這些方法來簡化問題時,我就會覺的很有成就感。雖然寫出這個不會讓我贏得比賽,反而因為同步的關係,AsyncDummyPlayer 執行的速度還遠遠落後 DummyPlayer (我的機器跑起來,大概差了四~五倍 ...) 。不過我知道我簡單的頭腦,不先把問題簡化的話,我大概解決不了太複雜的問題...。也許是缺了這種能力,才讓我更有動力去想簡化問題的方式吧?

 

最後,為什麼每次講到 thread 的文章,都是 code 一點點,文章跟圖一大堆? 咳咳... 難道我也到了靠一張嘴混日子的地步了嘛? Orz... 本系列到此結束,以後還會有什麼主題? 想到再說啦~~ 今天各位記得去拜拜~~ 下回見!



3/3/2008 3:56:00 AM

Memory Management (III) - .NET CLR ?

Microsoft.NET | 作業系統 | 技術隨筆 Facebook Share

續 [上篇] & [上上篇],同樣的問題,我改用 .NET 開發是不是就搞定了? 其實這篇才是我要寫的重點,只不過引言寫太高興,就是兩篇文章了,咳咳... 有人在問,為什麼我老是寫些冷門的文章? 沒辦法... 大家都在寫的東西我就沒興趣寫了,文筆沒別人好,網站沒別人漂亮,連範例程式都沒別人炫,只好挑些沒人寫的內容...

大部份討論這主題的文章,講的都是 GC, GC 的 generation,IDisposable,還有 Heap 等等,不過這些知識都無法直接回答這次問題。底下的例子你會發現,預設的 GC 也無法解決 memory fragment 的問題,不過實際上是有解的,只是還要動用到秘技...

 

回題,先來看看之前的問題為什麼會是個問題? 萬惡之首都在: 指標 (POINTER)。

因為有 pointer,因此 C 絕對不能 *自動* 幫你調整記憶體位置,也就一定會有這種問題。看到為何我在上篇提到的程式碼要把 pointer 的值印出來? 因為這代表我可以輕易拿的到實際的位址,因此任何重新定址 (relocation) 的動作一定會影響到程式的執行。所以最根本的解決辦法就是把 pointer 這東西拿掉。

年紀較輕的程式語言,如我常提到的 Java 跟 C#,都完完全全的把 pointer 從語言內移掉了,只留下 reference 這類差不多的東西。除了拿不到絕對的 address 之外,其它功能一個都不缺。但是這樣帶來的好處是很明顯的,除了一般書上講到爛的理由: "更安全,更簡易" 之外,很重要的一點就是,像 CLR or JavaVM 這種環境,開始有機會去搬移記憶體配置的區塊,徹底的由系統層面解決這種問題了。

.NET / Java 回收記憶體的動作是自動的,就是常聽到的 Garbage Collection,而上面提到的 relocation,就是指在回收時順便把剩下已配置的空間排在一起,搬移記憶體區塊所需要的重新定址動作。這種類型的 GC 有個特別的名辭,叫作 compact collection。理論上,.NET 已經具備這樣的條件了,應該要有能力可以解決這樣的問題。

不過 "可以解決" 跟 "已經解決" 仍然有一段差距,那麼現在的 .NET CLR 到底行不行? 一樣的範例程式用 C# 改寫一下,同樣的試看看,不過這次懶的再放好幾種版本試試看了,除了最大可用記憶體可能有差別之外,其它應該都統一了。我只針對 .NET 2.0 (x86) 一種版本測試,一樣,鐵齒的讀者們,有興趣就抓回去試一試...。

整段程式碼跟之前 C 版本大同小異,就是照順序配置 64mb 的 byte[],直到丟出 OutOfMemoryException,然後跳著釋放,接著再配置 72mb 的 byte[],看看能不能配置成功? 直到再丟出 OutOfMemoryException 為止,能配置多少記憶體? 這邊為了方便,我直接在 vista x86 系統上測試:

 

測試的結果令我想殺人,竟然是 FAIL ? 放掉的空間拿不回來...

without GC 

 

後來想到,程式移除 reference,不見得會立刻釋放記憶體,總得等垃圾車 (Garbage Collect) 來收拾一下... 手動呼叫了 GC,也強迫指定要回收所有的 Generation 了 (呼叫: GC.Collect(GC.MaxGeneraion) ) 再試一次:

with GC

 

結果好不到那裡去,難到我沒用市政府的垃圾袋嘛? [:@] 查了一下 MSDN,常見的 generation 問題也試過,沒有用。90% 講 CLR GC 的問題都在探討 generation 的問題...  查到某 Java 名人的文章,提到了 compact collection 比較接近,不過沒有講怎麼明確的啟動這樣的 GC 啊... 後來去翻 .NET runtime 裡關於 garbage collection 的設定,發現還有這玩意... gcConcurrent / gcServer:

gcConcurrent: Specifies whether the common language runtime runs garbage collection on a separate thread.

gcServer: Specifies whether the common language runtime runs server garbage collection.

講的很清楚,不過對我沒啥用。gcConcurrent可能的影響是,也許呼叫後系統還在GC,我的程式就先跑下去了? 因此這東西關掉也許有幫助,再來試一次:

gcConcurrent Disabled

 

真慘,一點幫助都沒有... 放掉的 768MB,只撈回 72MB,再來看一下最後一個 gcServer,看它的 HELP 看不大出來什麼是 "server garbage collection" ? 算了,試一下比較快:

gcServer Enabled

 

Bingo,看來這個參數下下去才是我預期的結果,放掉了 576MB,後面撈了 648MB 回來。這樣的作法,已經完全不會受到 memory fragment 問題的影響,証實了 compact collection 是有發恢它的效用的,只不過這個參數實際的作用,翻遍了 Google / MSDN,得到的都是很模菱兩可的答案,不外乎是你的程式如果是 blah blah blah 的話就要用 gcServer,這樣會比較好之類的,不過實際的差別則看不大出來。沒有任何一篇文件明確提到 server gc 會做 compact collection (如果這篇不算的話,哈哈),而 workstation gc 不會,也許前面的方式也會觸發 compact collection也說不定,只是時機不成熟...

抱著不可能的希望,用 Reflector追看看,果然不出所料,Reflector也看不到細節,因為全都呼叫 native code 去了。不過這次的測試,至少確定了,在啟用 gcServer option 之後,CLR 的 GC 是會進行 compact collection 的。

寫到這裡,本系列文章結束,只是為了在新的平台驗證古早的問題而以,果然時代在進步,以前耽心的問題現在都不再是問題了。這一連串試下來,學到了一課,原來 gcServer 有這個差別,算是值回票價了。最後把我的測試程式碼貼一下,一樣,歡迎拿去各種平台試一下,有不一樣的結果也記得通知我一聲!

 

 

[Program.cs]

using System;using System.Collections.Generic;using System.Linq;using System.Text;namespace ClrMemMgmt{    class Program    {        static void Main(string[] args)        {            List<byte[]> buffer1 = new List<byte[]>();            List<byte[]> buffer2 = new List<byte[]>();            List<byte[]> buffer3 = new List<byte[]>();            //            //    allocate             //            Console.WriteLine();            Console.WriteLine();            Console.WriteLine("1. Allocate 64mb block(s) as more as possible...");            try            {                while (true)                {                    buffer1.Add(new byte[64 * 1024 * 1024]);                    Console.Write("#");                    buffer2.Add(new byte[64 * 1024 * 1024]);                    Console.Write("#");                }            }            catch (OutOfMemoryException)            {            }            Console.WriteLine();            Console.WriteLine("   Total {0} blocks were allocated ( {1} MB).", (buffer1.Count + buffer2.Count), (buffer1.Count + buffer2.Count) * 64);            //            //    free            //            Console.WriteLine();            Console.WriteLine();            Console.WriteLine("2. Free Blocks...");            buffer2.Clear();            Console.WriteLine("   Total: {0} blocks ({1} MB)", buffer1.Count, buffer1.Count * 64);            //            //  GC            //            GC.Collect(GC.MaxGeneration);            //            //    allocate            //            Console.WriteLine();            Console.WriteLine();            Console.WriteLine("3. Allocate 72mb block(s) as more as possible...");            try            {                while (true)                {                    buffer3.Add(new byte[72 * 1024 * 1024]);                    Console.Write("#");                }            }            catch (OutOfMemoryException)            {            }            Console.WriteLine();            Console.WriteLine("   Total: 64mb x {0}, 72mb x {1} blocks allocated( {2} MB).\n", buffer1.Count, buffer3.Count, buffer1.Count * 64 + buffer3.Count * 72);            Console.ReadLine();        }    }}

 

 

 

[configuration file]

<?xml version="1.0" encoding="utf-8" ?><configuration>  <runtime>    <!--<gcConcurrent enabled="false" />-->    <!--<gcServer enabled="true" />-->  </runtime></configuration>


3/3/2008 3:14:00 AM

Memory Management (II) - Test Result

Microsoft.NET | 作業系統 | 技術隨筆 Facebook Share

該來揭曉謎底了,在人氣不怎麼高的地方拋這些冷門的問題,看的我都覺的 "這版主真是不自量力" .. 咳咳.. 為了把之前的心得,在現在的主流作業系統及平台再驗證一次,只好自己花了點小工夫,把 C 挖出來寫個測試程式,不過 C / C++ 實在太久沒寫了,已經忘到連語法都得翻 HELP 的程度 :~ 花了些時間才搞定。

不過也因為這樣,連帶的查了一下如何編譯 x64 的程式碼,還有一些相關的設定項目。這次測試只測了 windows 的環境,沒辦法,這把年紀實在沒力氣再去摸第二種 OS 了,如果有善心人事要幫我測的話,我倒是很樂意把你測的結果一起放上來! 程式碼請看 [註3]

不多說廢話,測試主要會針對三種環境來測試:

  1. 一般的 x86 (win32)
  2. 在 x64 下的 32 位元執行環境 (wow64)
  3. 原生的 x64 程式

原本還想加上 /3GB options 來測的,不過這樣跟 (2) 幾乎是一樣的,只差在 3GB 跟 4GB 的可用空間而以,差異不大,當然就省下來了 [H]

測試結果就直接抓畫面附在底下了。程式碼當然都是同一份。原始 project 放在 [這裡]。其實這次問題的關鍵,跟 windows / linux,32/64,都沒有直接關係,唯一有關的是,你的 memory address space 到底有沒有用完... 怎麼說? 先來看結果:

  1. x86 build (run under windows 2003 standard x64):
  2. x86 build + /LARGEADDRESSAWARE option (under 2003 x64)
  3. x64 build
     
     

看不出個所以然? 簡單畫個表格整理一下測試結果:

測試環境統一為 Windows 2003 x64 版本,可用記憶體為 2GB,分頁檔有 4GB。

測試項目 測試 #01 測試 #02 測試 #03
執行環境 32Bits (WOW64) 32Bits (WOW64) 64Bits (Native)
Build Options x86 x86 + LargeAddressAware x64
可定址空間 / 實際可用空間 2048MB / 1920MB 4096MB / 3904MB 8TB / 4032MB
問題的測試結果 / 可以配置的 72mb 區塊 NO / 2 NO / 2 YES / 27

這結果大概會跌破一堆人的眼鏡,雖然板上回應的很冷清,不過私下 MSN 問了幾個人,有很肯定的,也有不確定的,當然也有亂猜猜的很肯定的 (S*M,就是你...),不過答案都很兩極,不是都不行,不然就是都可以...

就理論上來說,分頁的記憶體管理方式,的確是不能解決在 virtual memory address space 過度分散 (fragment) 的問題,如果程式或作業系統沒有作適度的 defrag,那麼你要挖一塊記憶體的要求一定會失敗,但是為什麼測試 #03 會通過? 因為實際可用的 Memory ( Physical RAM + Swap File ) 也不過 6GB,你的程式需索無度,要求超過的怎麼樣也生不出來。6GB 扣掉其它 OS / AP 用掉的,跟理論上能用的 8TB 實在是差太多,造成你的 virtual memory address space 跟本不可能用完。當然 *不可能* 這字眼別用太滿,十年前也是認為 4GB 跟本不可能用完,不過我現在的 PC 就已經裝了 6GB ... [:$] 上表中列了一些較特別的參數,像是明明是 32 位元,怎麼可定址空間是 2048MB ? 還有 LargeAddressAware 是什麼? 這些屬 windows 較特別的規矩,我在文章最後面的 [註1] 說明一下。

現在的 PC,隨便都是 1GB / 2GB RAM,加上分頁檔,超過 4GB 跟本是件很普通的事,意思是寫不好的程式,的確是已經會面臨到這樣的困境了,明明記憶體還夠用,但是系統卻回報 Out Of Memory ...。只可惜這樣的問題,OS一點忙都幫不上。因為 OS 沒有辦法替你做 Memory Defragment 的動作 [註2]。上一篇我有畫張記憶體配置的圖,只能用來說明 #01 / #02 的情況,換成 #03 就不大適用了。只要可定址空間夠大,基本上你只需要考慮你配置的記憶體總量有沒有超過可用的記憶體就好,是不大需要在意是不是有 fragment 的問題,除非你的可配置空間跟可用空間很接近,這問題才會浮現出來。就像積木買來,積木的體積跟盒子差不多大,你要全擺進去就得花點工夫安排一下,否則一定會有一些是收不進去的一樣 (幫吳小皮收玩具的感想... -_-)。不過如果你換一個大盒子,或是把整個房間當成大盒子來看,隨便丟都沒問題,連會不會撞在一起都不用耽心,一定不會有空間夠卻放不進去的問題,這就是差別。

這測試結果看起來很可怕,感覺起來只要是 32 位元的程式,加上是 server or services,會長時間運作的,好像都有可能碰到這種問題。這不算是 Memory Leak (因為記憶體是真的有成功的被釋放),那麼 Java / .NET 宣稱的 Garbage Collection 回收機制到底會不會碰到這個問題? 別耽心,等著看下一篇就知道了 [H]

 

 

 

--

註 1. /LARGEADDRESSAWARE:

32 位元系統可定址空間應該是 2^32 = 4GB 沒錯,不過 Microsoft 把它一分為二,規劃一半是給 Kernal,另一半才是給 APP 使用。意思是你的程式再怎麼用只能用到 2GB。不過這種問題幾年前就浮現出來了,Microsoft 特地在 OS 上加了 /3GB 這種參數,可以把 OS : AP = 2GB : 2GB 的規劃,調整為 1GB : 3GB。不過相對的程式在編譯時也要加上 /LARGEADDRESSAWARE 參數,才有可能讓 AP 取得 2GB 以上的記憶體。

所以即使在 x64 下執行的 x86 應用程式,跟本沒有 OS 那 2GB 的需求,一般 x86 APP 在 64 位元作業系統下仍然只能用到 2GB,但是不同參數編譯出來的程式碼,就能用到 4GB (如果是在加上 /3GB 的 x86 OS,則大約是 3GB)

 

註 2. 為什麼 OS 不能替 Memory 執行 Defragment 的動作?

原因很簡單,測試的程式是用 C / C++ 這類可以直接存取 pointer 的程式語言寫的。試想一下 OS 如果替你把已配置的記憶體區塊挪一挪會發生什麼事? 你拿到的 pointer 全都成了廢物,因為它指向的記憶體,可能已經不是當時你認識的資料了... 因為資料有可能被強迫搬家,你的通訊錄再也沒有用,老朋友就失聯了...

因此,別的程式語言不一定,但是允許你直接使用 pointer 的語言,這類的問題你閃不掉,一定得自己想辦法...

 

註 3. 測試程式碼

這段 code 我做了點改變,因為 4kb block size 實在太小了 (相對於 4GB 上限),印出訊息一大堆,執行又慢,因此我自己把問題的參數調整一下,把問題的 4kb 改成 64mb,而最後要配置的 5kb 則改成 72mb。若對這段 code 有興趣的人,可以直接 copy 去用。我直接把 source code 貼在底下 ( C++ 語法忘了一大半 :P,因此用的都是單純的 C ... 除了 conio.h 之外,應該隨便都能成功編譯吧,看誰有興趣可以拿到 Linux 下去試看看....):

 

 

#include <stdio.h>#include <stdlib.h>#include <conio.h>#include <malloc.h>void init_buffer(void *ptr, unsigned int size) {    if (ptr == NULL) return;    //for (int count = 0; count < size / sizeof(long); count++) {    //    ((long *)ptr)[count] = 0;    //}}int main(const int& args) {    void *buffer1[4096];    void *buffer2[4096];    void *buffer3[4096];    //    //    allocate     //    printf("\n\n");    printf("1. Allocate 64mb block(s) as more as possible...\n");    int total = 0;    for (int count = 0; count < 4096; count++) {        buffer1[count] = buffer2[count] = NULL;        buffer1[count] = malloc(64 * 1024 * 1024);        if (buffer1[count] == NULL) break;        init_buffer(buffer1[count], 64 * 1024 * 1024);        printf("#");        total++;        buffer2[count] = malloc(64 * 1024 * 1024);        if (buffer2[count] == NULL) break;        init_buffer(buffer2[count], 64 * 1024 * 1024);        printf("#");        total++;    }    printf("\n   Total %d blocks were allocated ( %d MB).\n", total, total * 64);    //    //    free    //    printf("\n\n");    printf("2. Free Blocks...\n");    for (int count = 0; count < 4096; count++) {        if (buffer2[count] == NULL) break;        free(buffer2[count]);        buffer2[count] = NULL;        total--;        printf("#");    }    printf("\n   Total: %d blocks (%d MB)\n", total, total * 64);        //    //    allocate    //    printf("\n\n");    printf("3. Allocate 72mb block(s) as more as possible...\n");    int total2 = 0;    for (int count = 0; count < 4096; count++) {        buffer3[count] = malloc(72 * 1024 * 1024);        if (buffer3[count] == NULL) break;        printf("#");        total2++;    }    printf("\n   Total: 64mb x %d, 72mb x %d blocks allocated( %d MB).\n", total, total2, total * 64 + total2 * 72);    printf("\nDump Blocks Address:\n");    for (int count = 0; count < 4096; count++) {        if (buffer1[count] == NULL) break;        printf("  64mb block ponter: [%08p] ~ [%08p] \n", buffer1[count], (long)buffer1[count] + 64 * 1024 * 1024);    }    for (int count = 0; count < 4096; count++) {        if (buffer3[count] == NULL) break;        printf("  72mb block ponter: [%08p] ~ [%08p] \n", buffer3[count], (long)buffer3[count] + 72 * 1024 * 1024);    }    _getch();    return 0;}


2/26/2008 9:07:00 PM

Memory Management - (I). Fragment ?

Microsoft.NET | 543 | 作業系統 | 技術隨筆 Facebook Share

程式越寫, 越覺的課本教的東西很重要... 最近碰到一些記憶體管理的問題, 想到以前學 C 跟 OS 時, 大家都有個理想..

"只要 OS 支援虛擬記憶體, 以後寫 code 都不用耽心 Memory 不夠..."

聽起來很合理, 虛擬記憶體本來就是讓開發人員省事的機制啊... 當然前提不超過硬體限制, 像是 32 位元的程式就不能超過 4GB. Virtual Memory 也帶來很多好處. 除了可以以硬碟空間換取記憶體空間之外, 因為 swap 需要的 paging 機制, 也間接的解決了 "PHYSICAL" Memory Fragment 的問題.

怎麼說? 邏輯的記憶體位址, 對應到實體的記憶體位址, 不一定是連續的. 有點像是硬碟的一個大檔案, 實際上可能是散在硬碟的好幾個不連續區塊. 除了效能問題之外, 是沒有任何不同的.

因為這堆 "便民" 的機制, 現在的程式設計師還會考慮這種問題的人, 少之又少. 有的還聽不懂你在問啥... 以前在 BBS 討論版看到一個問題, 印像很深刻, 算一算十來年了還記得... 把這問題貼一下, 主題就是 programmer 到底該不該耽心 memory fragment 的問題? 實驗的方式很有趣:

  1. 連續以固定 size (ex: 4KB) allocate memory, 直到沒有記憶體為止
  2. 開始 free memory. 不過是跳著釋放. 比如 (1) 取得的一連串記憶體, 只放奇數位子 1st, 3rd, 5th, 7th ....
  3. 挑戰來了, 這時應該清出一半空間了. 如果我再 allocate 5KB 的記憶體, OS 會成功清給我? 還是會失敗?

簡單畫張圖說明,就像這樣:

image

(1) 就是在可用的定址空間內盡量塞,因為虛擬記憶體的關係,不管實體記憶體夠不夠,都能夠使用。

(2) 就是跳著釋放記憶體後的分佈情況。

(3) 圖上看來已經沒有能夠容納 "大一點" 區塊的空間了,那麼 [?] 這個區塊到底還放不放的下?

來來來, 大挑戰... 這種程式千萬別在自己家裡亂玩... 也別在你按不到 reset 開關的電腦亂玩 (ex: 遠端連到機房的 server) ... 前題是用 C / C++ 這類可以直接操作 pointer 的 language. OS 不限, 覺的 Linux 強就用 Linux, 喜歡 Gates 的就用 windows... 32 / 64 位元都可以...

先賣個關子, 結果會是怎麼樣? 不同的 OS 會有不同的結果嗎? 64位元會有不同嗎? 有興趣可以試看看,懶的寫 code 也可以猜看看!



12/17/2007 2:21:00 AM

ThreadPool 實作 #3. AutoResetEvent / ManualResetEvent

Microsoft.NET | Threading | 作業系統 | 技術隨筆 Facebook Share

續上篇, 從眾多閒置的 worker thread 挑選一個起來接工作有兩種策略作法. 一種作法是 Thread Pool 自己決定, 最基本的就是誰等最久就叫誰起來, 或是 Thread Pool 有自己的演算法挑一個最菜的 worker thread 來做工都可以... 另一種作法就是不管它, 每個 worker thread 都靠運氣, 交給上天 (OS) 決定, 看誰搶到下一個 job. 看起來第一種好像比較好, 事實上不見得. 每個 thread 之間的排程是個學問, OS 多工的效率好不好就看這個. 舉例來說, 如果每個 worker thread 的優先順序不同, 或是某些 thread 正好碰到 GC, 或是正好被移到 virtaual memory 等等, 硬去叫它起來工作反而要花更多的時間. 而這些資訊都在 OS 的排程器裡才有足夠的資訊可以判斷, 以寫 AP 的角度很難顧級到這個層面. 這時最好的辦法就是不管它, 用齊頭式的平等, 把選擇權交給 OS 決定.

又是一個說起來比 code 多的例子. 這兩種不同的策略, 寫成 code 其實只差一行... 就是選用 AutoResetEvent 跟 ManualResetEvent 的差別而以. .NET SDK 的 Class Reference 上這樣寫著:

AutoResetEvent: Notifies a waiting thread that an event has occurred.

ManualResetEvent: Notifies one or more waiting threads that an event has occurred.

真正寫成 Code 來測試一下...

   1:  static void Main(string[] args)
   2:  {
   3:      for (int count = 0; count < 5; count++)
   4:      {
   5:          Thread t = new Thread(new ThreadStart(ThreadTest));
   6:          t.Start();
   7:      }
   8:      Thread.Sleep(1000);
   9:      wait.Set();
  10:      Thread.Sleep(1000);
  11:      wait.Set();
  12:      Thread.Sleep(1000);
  13:      wait.Set();
  14:      Thread.Sleep(1000);
  15:      wait.Set();
  16:      Thread.Sleep(1000);
  17:      wait.Set();
  18:  }
  19:   
  20:  private static AutoResetEvent wait = new AutoResetEvent(false);
  21:  private static void ThreadTest()
  22:  {
  23:      Console.WriteLine("Thread[{0}]: wait...", Thread.CurrentThread.ManagedThreadId);
  24:      wait.WaitOne();
  25:      Console.WriteLine("Thread[{0}]: wakeup...", Thread.CurrentThread.ManagedThreadId);
  26:  }

執行結果:

Thread[ 3 ]: wait...
Thread[ 5 ]: wait...
Thread[ 4 ]: wait...
Thread[ 6 ]: wait...
Thread[ 7 ]: wait...
Thread[ 3 ]: wakeup...
Thread[ 4 ]: wakeup...
Thread[ 6 ]: wakeup...
Thread[ 5 ]: wakeup...
Thread[ 7 ]: wakeup...

程式過程中我加了幾個 Sleep, 首先我用同一個 AutoResetEvent, 讓五個 thread 都去等待同一個 notify event 來叫醒它. 而 AutoResetEvent 一次只能叫醒一個被 WaitOne blocked 住的 thread. 就是第一種先到先贏的作法, 後面幾行 wakeup 的 message 每隔一秒會跳一行出來.

 

再來看一下 ManualResetEvent ...

   1:  static void Main(string[] args)
   2:  {
   3:      for (int count = 0; count < 5; count++)
   4:      {
   5:          Thread t = new Thread(new ThreadStart(ThreadTest));
   6:          t.Start();
   7:      }
   8:   
   9:      Thread.Sleep(1000);
  10:      wait.Set();
  11:  }
  12:   
  13:  private static ManualResetEvent wait = new ManualResetEvent(false);
  14:  private static void ThreadTest()
  15:  {
  16:      Console.WriteLine("Thread[{0}]: wait...", Thread.CurrentThread.ManagedThreadId);
  17:      wait.WaitOne();
  18:      Console.WriteLine("Thread[{0}]: wakeup...", Thread.CurrentThread.ManagedThreadId);
  19:  }

執行結果:

Thread[ 3 ]: wait...
Thread[ 4 ]: wait...
Thread[ 5 ]: wait...
Thread[ 6 ]: wait...
Thread[ 7 ]: wait...
Thread[ 5 ]: wakeup...
Thread[ 4 ]: wakeup...
Thread[ 6 ]: wakeup...
Thread[ 3 ]: wakeup...
Thread[ 7 ]: wakeup...

除了把型別宣告從 AutoResetEvent 換成 ManualResetEvent 之外, 其它都沒變. 當然 line 10 一次就能叫醒所有的 thread, 所以後面四次 Set( ) 我就直接刪掉了. 程式 run 到 line 10, 後面五行 wakeup 的訊息就會一次全出現, 而出現的順序是隨機的, 每次都不大一樣.

這種作法的解釋, 是一次 Set( ), 卡在 WaitOne( ) 的五個 thread 就全被叫醒了. 而這個現象如果套用在 SimpleThreadPool 的實作上, 它的作用相當於第二種作法. 一瞬間把所有的 worker thread 從 blocked 狀態移到 waiting 狀態. 而到底是那一個 thread 有幸第一個被 OS 移到 running 狀態? 就是根據 OS 自己的排程策略而定. 第一個移到 running 狀態的 thread 通常就能搶到 job queue 裡的工作, 剩下的沒搶到, 則又會因為沒有工作好做, 再度進入閒置狀態, 等待下一次機會再一起來碰一次運氣...

就這一行, 花了最多篇幅來說明, 因為它最抽象. 說明這段的目的, 如果你的 ThreadPool 要更進階一點, 如果你想要改用先排隊先贏的策略, 把 WaitHandle 的型別改成 AutoResetEvent 就好. 如果你希望根據工作的特性來微調每個 thread 的 priority, 你就必需用 ManualResetEvent.

好, 沒想到一百行左右的 SimpleThreadPool 有這麼多東西可以寫, 完整的 code 我直接貼在底下, 歡迎引用. 好用的話記得給個回應. 要用在你的 project 也歡迎, 只要禮貌性的支會我一聲. 讓我知道我寫的 code 被用在什麼地方就好. 寫到這裡總算告一段落. 謝謝收看 [:D]

 

--
完整的 SimpleThreadPool.cs 原始碼:

 

   1:  using System;
   2:  using System.Collections.Generic;
   3:  using System.Text;
   4:  using System.Threading;
   5:  using System.Diagnostics;
   6:   
   7:  namespace ChickenHouse.Core.Threading
   8:  {
   9:      public class SimpleThreadPool : IDisposable
  10:      {
  11:          private List<Thread> _workerThreads = new List<Thread>();
  12:   
  13:          private bool _stop_flag = false;
  14:          private bool _cancel_flag = false;
  15:   
  16:          private TimeSpan _maxWorkerThreadTimeout = TimeSpan.FromMilliseconds(3000);
  17:          private int _maxWorkerThreadCount = 0;
  18:          private ThreadPriority _workerThreadPriority = ThreadPriority.Normal;
  19:   
  20:          private Queue<WorkItem> _workitems = new Queue<WorkItem>();
  21:          private ManualResetEvent enqueueNotify = new ManualResetEvent(false);
  22:   
  23:          public SimpleThreadPool(int threads, ThreadPriority priority)
  24:          {
  25:              this._maxWorkerThreadCount = threads;
  26:              this._workerThreadPriority = priority;
  27:          }
  28:   
  29:          private void CreateWorkerThread()
  30:          {
  31:              Thread worker = new Thread(new ThreadStart(this.DoWorkerThread));
  32:              worker.Priority = this._workerThreadPriority;
  33:              this._workerThreads.Add(worker);
  34:              worker.Start();
  35:          }
  36:   
  37:          public bool QueueUserWorkItem(WaitCallback callback)
  38:          {
  39:              return this.QueueUserWorkItem(callback, null);
  40:          }
  41:   
  42:          public bool QueueUserWorkItem(WaitCallback callback, object state)
  43:          {
  44:              if (this._stop_flag == true) return false;
  45:   
  46:              WorkItem wi = new WorkItem();
  47:              wi.callback = callback;
  48:              wi.state = state;
  49:   
  50:              if (this._workitems.Count > 0 && this._workerThreads.Count < this._maxWorkerThreadCount) this.CreateWorkerThread();
  51:   
  52:              this._workitems.Enqueue(wi);
  53:              this.enqueueNotify.Set();
  54:   
  55:              return true;
  56:          }
  57:   
  58:          public void EndPool()
  59:          {
  60:              this.EndPool(false);
  61:          }
  62:   
  63:          public void CancelPool()
  64:          {
  65:              this.EndPool(true);
  66:          }
  67:   
  68:          public void EndPool(bool cancelQueueItem)
  69:          {
  70:              if (this._workerThreads.Count == 0) return;
  71:   
  72:              this._stop_flag = true;
  73:              this._cancel_flag = cancelQueueItem;
  74:              this.enqueueNotify.Set();
  75:   
  76:              do
  77:              {
  78:                  Thread worker = this._workerThreads[0];
  79:                  worker.Join();
  80:                  this._workerThreads.Remove(worker);
  81:              } while (this._workerThreads.Count > 0);
  82:          }
  83:   
  84:          private void DoWorkerThread()
  85:          {
  86:              while (true)
  87:              {
  88:                  while (this._workitems.Count > 0)
  89:                  {
  90:                      WorkItem item = null;
  91:                      lock (this._workitems)
  92:                      {
  93:                          if (this._workitems.Count > 0) item = this._workitems.Dequeue();
  94:                      }
  95:                      if (item == null) continue;
  96:   
  97:                      try
  98:                      {
  99:                          item.Execute();
 100:                      }
 101:                      catch (Exception)
 102:                      {
 103:                          //
 104:                          //  ToDo: exception handler
 105:                          //
 106:                      }
 107:   
 108:                      if (this._cancel_flag == true) break;
 109:                  }
 110:   
 111:                  if (this._stop_flag == true || this._cancel_flag == true) break;
 112:                  if (this.enqueueNotify.WaitOne(this._maxWorkerThreadTimeout, true) == true) continue;
 113:                  break;
 114:              }
 115:   
 116:              this._workerThreads.Remove(Thread.CurrentThread);
 117:          }
 118:   
 119:          private class WorkItem
 120:          {
 121:              public WaitCallback callback;
 122:              public object state;
 123:   
 124:              public void Execute()
 125:              {
 126:                  this.callback(this.state);
 127:              }
 128:          }
 129:   
 130:          public void Dispose()
 131:          {
 132:              this.EndPool(false);
 133:          }
 134:      }
 135:  }


12/17/2007 2:12:00 AM

ThreadPool 實作 #2. 程式碼 (C#)

Microsoft.NET | Threading | 作業系統 | 技術隨筆 Facebook Share

既然上一篇都把 pseudo code 寫出來了, 現在就可以開始來寫真正的 Thread Pool 了. 開始之前, 我先把目標定一下. 這次寫的 Thread Pool 必需俱備這些能力:

  1. 要能由使用者控制 thread pool 的組態:
    - worker thread 數量上限
    - worker thread 優先權
    - thread idle timeout 時間 (超過 idle timeout, 代表 thread 是宂員, 可以下台了)
    - job queue 安全範圍 (超過代表需要找幫手 - 建立新的 worker thread)
  2. thread pool 在 job queue 超過安全範圍時, 要能動態建立新的 thread 來消化 queue 裡的工作
  3. worker thread 在 idle 時間超過 idle timeout 時, 則這個 worker thread 就要被回收
  4. 簡單的同步機制, 要能等待 thread pool 處理完所有的 job.
  5. 如果有多個 worker thread 要搶同一個 job 來執行, 要由 OS 決定, 不要由 thread pool 自己決定

每次在寫這些描述, 都會覺的怎麼寫起來比 code 還多... @_@, 沒錯, code 短到我可以直接貼上來, 不需要附檔案.. 我會把完整的 code 貼在最下方. 其它說明的部份只會貼片段.

首先, 先來決定 SimpleThreadPool 的 class define 為何. 依照需求及我希望它用起來的樣子, 為:

   1:      public class SimpleThreadPool : IDisposable
   2:      {
   3:          public SimpleThreadPool(int threads, ThreadPriority priority)
   4:          {
   5:          }
   6:   
   7:          public bool QueueUserWorkItem(WaitCallback callback)
   8:          {
   9:          }
  10:   
  11:          public bool QueueUserWorkItem(WaitCallback callback, object state)
  12:          {
  13:          }
  14:   
  15:          public void EndPool()
  16:          {
  17:          }
  18:   
  19:          public void CancelPool()
  20:          {
  21:          }
  22:   
  23:          public void EndPool(bool cancelQueueItem)
  24:          {
  25:          }
  26:   
  27:          private void DoWorkerThread()
  28:          {
  29:          }
  30:   
  31:          public void Dispose()
  32:          {
  33:              this.EndPool(false);
  34:          }

這個 ThreadPool 我希望它用起來像這樣, 貼一段理想中的用法 sample code:

   1:              SimpleThreadPool stp = new SimpleThreadPool(2, System.Threading.ThreadPriority.BelowNormal);
   2:   
   3:              for (int count = 0; count < 25; count++)
   4:              {
   5:                  stp.QueueUserWorkItem(
   6:                      new WaitCallback(ShowMessage),
   7:                      string.Format("STP1[{0}]", count));
   8:                  Thread.Sleep(new Random().Next(500));
   9:              }
  10:              Console.WriteLine("wait stop");
  11:              stp.EndPool();
把 ThreadPool 想像成一個服務櫃台, 很多人排隊等著處理. 因此整個實作會像是個工作的佇列 (job queue), 只要把你的工作放到 queue 裡 (排隊), 而服務人員 (worker thread) 就會一個一個的處理. 最後你可以決定要把所有工作做完才收攤 (呼叫 EndPool( ), 會 blocked 直到工作清光), 或是決定掛牌 "明日請早" (呼叫 CancelPool( )), 只把作到一半的工作處理掉, 剩下還在排隊的改天再來.

整個實作的關鍵部份是在 private void DoWorkerThread(), 裡面寫的 code 就是每一個 worker thread 要執行的所有內容. 補上實作的 code:

   1:   
   2:  private void DoWorkerThread()
   3:  {
   4:      while (true)
   5:      {
   6:          while (this._workitems.Count > 0)
   7:          {
   8:              WorkItem item = null;
   9:              lock (this._workitems)
  10:              {
  11:                  if (this._workitems.Count > 0) item = this._workitems.Dequeue();
  12:              }
  13:              if (item == null) continue;
  14:   
  15:              try
  16:              {
  17:                  item.Execute();
  18:              }
  19:              catch (Exception)
  20:              {
  21:                  //
  22:                  //  ToDo: exception handler
  23:                  //
  24:              }
  25:   
  26:              if (this._cancel_flag == true) break;
  27:          }
  28:   
  29:          if (this._stop_flag == true || this._cancel_flag == true) break;
  30:          if (this.enqueueNotify.WaitOne(this._maxWorkerThreadTimeout, true) == true) continue;
  31:          break;
  32:      }
  33:   
  34:      this._workerThreads.Remove(Thread.CurrentThread);
  35:  }

每個 worker thread 就只作很簡單的一件事, 就是進入無窮迴圈, 只要開始上班就不段的接工作來處理, 一直到下班為止. 整個最外層的 while loop 就是指這部份. 離開 loop 後就代表這個 worker thread 該下班了.

迴圈內也很簡單, 上工的第一件事就是看 job queue 裡有沒有工作要做? 有就 dequeue 一個來處理, 一直重複到 job queue 空了為止, 或是直到老闆下令關店 (_cancel_flag 為 true).

無論是要關店或是工作做完了, 流程會跳離 line 6 ~ 27 這個 while loop. 後序的關鍵在 line 30:

   1:  if (this.enqueueNotify.WaitOne(this._maxWorkerThreadTimeout, true) == true) continue;

呼叫 WaitHandle 的 WaitOne( ) method, 會讓 worker thread 進入 blocked 狀態. 直到被叫醒為止 (叫醒它的 code 寫在 add queue 裡), 或是 idle timeout 時間到了. .NET API WaitHandle.WaitOne( ) 提供 option 指定 timeout 時間, 至於是被叫醒的 or 時間到了自己醒來, 就靠 return value 來判定. 以這段 code 來看, 被叫醒 (return true) 代表有新工作進來, 就執行 continue 指令, 繼續到 job queue 拿新的工作繼續努力, 如果是睡太飽自己醒的, 就執行 break, 準被收拾東西下班去...

整個 worker thread 的生命周期就是靠這段 code 來運作. 接下來看一下如何把 job 加進來:

 
   1:  private List<Thread> _workerThreads = new List<Thread>();
   2:  private Queue<WorkItem> _workitems = new Queue<WorkItem>();
   3:  private ManualResetEvent enqueueNotify = new ManualResetEvent(false);
   4:   
   5:  public bool QueueUserWorkItem(WaitCallback callback, object state)
   6:  {
   7:      if (this._stop_flag == true) return false;
   8:   
   9:      WorkItem wi = new WorkItem();
  10:      wi.callback = callback;
  11:      wi.state = state;
  12:   
  13:      if (this._workitems.Count > 0 && this._workerThreads.Count < this._maxWorkerThreadCount) this.CreateWorkerThread();
  14:   
  15:      this._workitems.Enqueue(wi);
  16:      this.enqueueNotify.Set();
  17:   
  18:      return true;
  19:  }
 
扣掉一大半準備 WorkItem 的 code 之外, 剩下的就是把 workitem 加到 queue 裡了. 兩個關鍵的地方是:
   1:  if (this._workitems.Count > 0 && this._workerThreads.Count < this._maxWorkerThreadCount) this.CreateWorkerThread();

如果 job queue 堆的工作超過 0 個, 而總共的 worker thread 數量還沒超過上限, 就呼叫 CreateWorkerThread( ) 再叫一個 worker thread 來幫忙.

line 14 把 work item 加到 queue 之後, line 15 就緊接著呼叫 WaitHandle.Set( ), 通知所有正卡在 WaitOne( ) 睡覺中的 worker thread 該醒來工作了. 其實到這裡, thread pool 主要結構都說明完了, 剩下的都是細部實作, 比如如何封裝 job 的物件, 如何得知共有幾個 worker thread 等等, 這些直接看 code 比較快我就不多說明了. 搭配前一篇, 提到有各種 synchrinization 機制可以使用, 這裡我用的是 ManualResetEvent, 為什麼要挑這個? 先弄清楚觀念上的問題: 假設有五個 worker thread 都睡著等待新的工作進來, 這時只有一個新的工作進來, 到底是誰該醒來作事? 是由誰決定?

說明起來又是一大篇了... 改寫第三篇再繼續吧!



12/12/2007 11:17:00 AM

平行處理的技術演進

Microsoft.NET | Threading | 作業系統 | 技術隨筆 Facebook Share

現在 CPU 廠商大打 "多核心" 的口號, 讓大家都知道多核心的好處了, 不過每個評論的人也都會補上一句, "要有專為多核心設計的軟體才能發恢效能". 到底什麼叫作專為多核設計的軟體?

簡單的說, 就是程式不能再以單一流程來思考, 必需引用平行處理的概念. 就像工作分派一樣, 有十個人幫你做事, 一定比一個人好. 不過這也是考驗你分派及管理的能力. 做的不好, 可能工作還是都只有一個人在做, 另外九個在偷懶, 更糟的是還造成溝通的問題, 比只有一個人還糟.

在程式設計的領域裡, 實現平行處理, 它的困難有幾個:

  1. 並行的流程控制
  2. 多個程序之間的資料交換
  3. Critial Section 問題 (某些絕對不能同時執行的程式片段)
  4. 資料 Lock 及 Racing condition 的問題

早期的 Unix 提供的 fork( ) 就是個典型的例子, 呼叫後有兩份一模一樣的程式一起執行, 你要自己想辦法分出誰是誰, 然後讓它們各自執行. 兩個程式怎麼溝通? 只能靠 IPC (Inter-Process Communication), 方式不外呼開 socket 或是 share memory, 互相等待要靠 signal( )... 總之你大概會有 80% 以上的精力是在解決這些問題, 不是在解決你要處理的問題.

後來較新的 OS 紛紛引入了 thread 的關念, 解決了部份 IPC 問題, 其它的還是一樣困難. 直到 Java 出來, 寫 multi-threading 程式就簡單多了. 到了 .net 3.5, 又更進一步, 就是我這篇要講的主題.

在 MSDN Magazine 看到一篇文章, 覺的還不錯, 就貼上來跟大家分享一下心得. 原文在此:
為多重核心電腦最佳化 Managed 程式碼
http://msdn.microsoft.com/msdnmag/issues/07/10/Futures/default.aspx?loc=zx

細節我就不多討論了, 那篇文章裡面都說明的非常清楚, 很棒的一篇文章. 主要的重點是, 即使用了 Java / .NET 這樣的開發環境, 你仍然要面對許多 threading 的問題, 像是 thread 如何開始, 如何結束, 這個 thread 如何去通知另一個 thread 完成的問題. 但是大部份的情況下, 我只不過是有一堆工作, 想要丟給電腦處理, 最好就是所有可用 CPU 通通叫來幫忙...

這篇文章介紹的 TPL ( Task Parallel Library ) 很巧妙的利用 delegate, 把整套 Library 包裝的像一個 for loop 一樣簡單. 基本的觀念就是, 只要用它提供的類似 for loop 寫法改寫你的程式, 原本 loop 裡要執行 100 次的工作, 現在就會自動的分配到你所有的 CPU 一同執行. 聽起來很酷, 真的用起來也是如此. 我簡單貼一下內文兩段 sample code:

   1:  // 一般的迴圈
   2:  for (int i = 0; i < 10; i++) {
   3:      a[ i ] = a[ i ] * a[ i ];
   4:  }
   5:   
   6:  // 改用 TPL 的迴圈
   7:  Parallel.For(0, 100, delegate(int i) {
   8:      a[ i ] = a[ i ] * a[ i ];
   9:  });

上面兩段 code 做的事情都一樣, 就是把陣列 a 的內容, 每一筆都算平方, 再寫回來. 差別在於第一段程式會在同一個 thread 裡依序完成每一筆計算, 而第二個例子則利用 anonymous delegate, 讓 code 看起來像迴圈, 實際上每圈都是執行一次 delegate. 而這個 delegate 會自動透過 task manager 分配到合適的 thread 執行. 它跟 thread pool 有許多不同, 文章內有一些說明... 如此一來就能享用的到多核心 CPU 的好處, 你的每一個核心都會被充份的利用.

其實這樣的作法, 並不是 Microsoft 或是 .NET 特有的創新, 早在更重視效能的 C/C++ 就有了. Intel 就大力的支持了這個 open source project: http://threadingbuildingblocks.org/ 裡面提供了大量的 C++ template, 也是用類似的方式替 C++ 加入了平行處理的支援.

ZD Net 上面有一系列的 video (http://www.zdnet.com.tw/white_board/intel/video-8.htm), 講的相當不錯, 我很認同裡面的幾個觀念, 主講人 James Reinders 提到, 平行處理你第一個應該要想到的是函式庫, 或是編譯器等等, 讓你的程式不自覺就能自動享用到平行處理的好處, 第二是用這類 library, 針對各種的 loop 去調整, 讓他能適用平行處理, 最後也是最不建議的作法, 才是大家常聽到的 multi-threading. 原因很簡單, threading 必需是你設計軟體架構就考慮進去的東西, 相對之下開發不易, 效能也不見得比較好, 更糟的是你可能會設計用 4 threads 來處理問題, 結果就是你的程式在超過四核的系統上就沒有明顯的效能增強了.

昨天晚上仔細的看完這幾篇文章, 果然科技的進步真是神速啊, 過去寫到翻掉的 Multi-Process 程式, 如今一個 For Loop 就解決掉, 沒走過這一段的人應該不能體會吧. 不過也因為這樣, 更能體會這些技術的價值在那裡. TPL 真的是個好東西, 強力推薦大家學一學!!

--

文中題到的 TPL 已經有 Tech Preview 可以下載了. 感謝 Unicorn 提供資訊.
http://www.microsoft.com/downloads/details.aspx?FamilyID=e848dc1d-5be3-4941-8705-024bc7f180ba&DisplayLang=en

--

這篇是寫了貼在別的地方, 當然自己的 blog 也要貼一份... [H]






精選文章

RUN! PC 文章及範例下載
2008/11. 生產線模式的多執行緒應用
2008/09. 用ThreadPool發揮CPU運算能力
2008/06. SEMAPHORE在ASP.NET的應用
2008/04. 以ASP.NET開發同步WEB應用程式

如何學好 "寫程式" 系列
#1. 該如何學好 "寫程式" ??
#2. 為什麼 programmer 該學資料結構 ??
#3. 進階應用 - 資料結構 + 問題分析
#4. 你的程式夠 "可靠" 嗎?

#5. 善用 TRACE / ASSERT

安德魯是誰?

Andrew Wu | Create Your Badge

我喜歡鑽研物件導向、軟體工程及作業系統等相關技術。我會在這裡發表我的研究心得,也當作我自己的學習筆記。


Recent comments

Comment RSS