4/5/2010 3:31:00 AM

[RUN! PC] 2010 四月號 - 生產者vs消費者– 執行緒的供需問題

Microsoft.NET | RUNPC | Threading | 技術隨筆 | 我的作品 | 物件導向

image 

 

隔了足足一年,趁著過年,生出第五篇了 :D,再次感謝編輯賞光啦,願意刊登我寫的題材...

這篇 [生產者/消費者] 其實是延續上一篇 [生產線模式] 而來的,生產線模式,是利用 PIPE 的方式,把工作切成各個階段同時進行。而生產者消費者,則是去探討兩個階段之間如何做好進度的協調。這篇除了講講處理的原則之外,也實作了BlockQueue來簡化這個問題。

其實更漂亮的用法,是我在 MSDN Magazine 上看到的 BlockingStream, 直接把它作成 System.IO.Stream 的衍生類別。像那些包裝成 Stream 的資料處理 (像是壓縮、加密、或是 Socket 這類),都很適合。不過一般情況下不是所有應用都能套上 Stream 的模式,因此就有了把它包裝成 Queue 的念頭,也就是這篇寫的 BlockQueue 的應用。

相關的東西其實過去也寫了幾篇,只不過 BLOG 寫的都比較瑣碎,都是針對特定主題較深入的討論,而投稿的文章就會比較完整的介紹原理及作法等等,細節就沒辦法兼顧了。有興趣的讀者可以參考一下我部落格的相關文章 :D   這期的範例程式可以在底下下載,相關的 LINK 也列在底下。

再次謝謝各位支持啦 :D


MSDN Magazine 閱讀心得: Stream Pipeline
RUN!PC 精選文章 - 生產線模式的多執行緒應用
[RUN! PC] 2008 十一月號 生產線模式的多執行緒應用
生產者 vs 消費者 - BlockQueue 實作



9/19/2009 3:58:22 PM

[設計案例] 生命遊戲 #4, 有效率的使用執行緒

Microsoft.NET | 543 | C# | Threading | 我的作品 | 技術隨筆 | 物件導向

原本這篇不講執行緒,要直接跳到 OOP 多型的應用... 不過看一看 #3 自己寫的程式,實在有點看不下去... 30x30 的大小,程式跑起來就看到 903 條執行緒在那邊跑... 而看一下 CPU usage, 只有 5% 不到... 這實在不是很好看的實作範例,如果這是線上遊戲的 SERVER 程式,裡面的每個人,每個怪物等等都用一條專用的執行緒在控制他的行為的話,我看這遊戲不用太多人玩,SERVER 就掛掉了吧! 因此要繼續更貼近實際的生命模擬遊戲前,我們先來解決效能的問題,所以多安插了這篇進來 :D

前一篇 (#3) 的主題是把生命的進行,從被動的在固定時間被喚醒 (callback) 的作法,改成主動的在指定時間執行 (execute)。 想也知道,現實世界的生物都是 "主動" 的,後面的作法比較符合 OOP 的 "模擬世界,加以處理" 的精神。但是,一個小程式就吃掉 900 條執行緒,是有點過頭了。不知道還有沒有人記得,我騙到獎品的這個程式... 很另類的用 yield return 來解決類似問題的作法... 藉著 compiler 很雞婆的把單一流程翻成數段可以切開執行的邏輯...,正好拿來利用一下,替我們把一連串連續的邏輯切段,以便利用多執行緒來處理。我的想法是這樣,原程式是用個迴圈,作完該作的事,就休息 (sleep) 一段時間。而新的寫法,我打算用 yield return new TimeSpan(…) 來取代 Thread.Sleep(…)。每個 Cell內部的程式結構修改不大,不過對於 GameHost 就是個挑戰了... 來看看修改前及修改後的程式碼:

用 yield return TimeSpan 來取代 Thread.Sleep( ) 的作法[copy code]
   1:  // 修改前
   2:  // 使用 Thread.Sleep( ) 來控制時間
   3:  public void WholeLife(object state)
   4:  {
   5:      int generation = (int)state;
   6:      for (int index = 0; index < generation; index++)
   7:      {
   8:          this.OnNextStateChange();
   9:          Thread.Sleep(_rnd.Next(950, 1050));
  10:      }
  11:  }
  12:  //
  13:  //
  14:  //
  15:  // 修改後
  16:  // 使用 yield return new TimeSpan( ) 來控制時間
  17:  public IEnumerable<TimeSpan> WholeLife(object state)
  18:  {
  19:      int generation = (int)state;
  20:      for (int index = 0; index < generation; index++)
  21:      {
  22:          this.OnNextStateChange();
  23:          yield return TimeSpan.FromMilliseconds(_rnd.Next(950, 1050));
  24:      }
  25:      yield break;
  26:  }

 

別想的太美,只改這樣,程式是不會動的... 修改過之後,麻煩的地方會在 GameHost. 因為整個 GameHost 的邏輯都反過來了。原本是 GameHost 只要放著那九百條執行緒自生自滅,它只要不斷的刷新畫面就好了。現在它則得用 foreach(…) 去詢問:

"大爺,這次您要休息多久?"

接到 yield return 傳回的 TimeSpan 物件 (代表它要休息多久後,繼續下一個動作) 後,經過這段時間,GameHost 就要再去叫醒 cell, 然後再詢問一次:

"大爺,這次您要休息多久?"

關鍵就在於 GameHost 如何能透過少量的 thread 來伺後這些大爺,而不是像 #3 的程式一樣,每個大爺都用一條專屬的 thread… 要共用執行緒,就要先想辦法把工作切碎,這是基本法則。如果你希望你的生命遊戲程式不只是作業的話,那麼效能跟即時回應的問題是必需要考慮的。在動手改寫 GameHost 程式之前,先來分析一下改寫的目標有那些:

目標是要達到像 #2 範例一樣的效果,但是要用更有效率的方式。

目標很清楚,再來就看看有什麼手段可以用了。第一個是過量的執行緒,應該要想辦法改用執行緒集區。因為 #2 用了高達 900 條執行緒,不過整體 CPU USAGE 不到 5%,大部份的執行緒都在閒置狀態。如果能想辦法把這些運算丟到執行緒集區,由集區動態管理會有效率的多。

第二,就是把原本的 Thread.Sleep(ts) 改成 yield return ts 後,原本每個 thread 自己睡覺的機制,就要改成 cell 各自回報 game host 它想要睡多久,然後由 game host 統一在時間到時叫醒它。由於一次有多個 cell 同時在運作,因此我們需要一個簡單的排程器,作法像這樣:

  1. 建立一個時間表,依照時間順序,把每個 cell 預計要被叫醒的時間標上去。
  2. 時間到了之後,就去呼叫該 cell 的 OnNextStateChangeEx(),同時取得該 cell 下次要喚醒的時間,再標到時間表上
  3. GameHost就不斷的替每個 cell 重複 (1) (2) 的動作..
  4. 同時另外用一條獨立的執行緒,作畫面更新的動作。

嗯,要處理的方式越來越清楚了。剩下的是 "時間表" 要用什麼型式來表現? 我的選擇是,我希望它是個 ToDo List, 會幫我排好時間,我只要把工作標上時間,丟進 ToDo List, 然後 ToDo List 只要能忠實的回報給我還有沒有排定的工作? 如果有,下一個要處理的工作是那一個? 什麼時後處理?

它的用法只有丟工作進去,跟拿工作出來,因此我設計它的公開介面長這個樣子:

CellToDoList 的類別設計[copy code]
   1:  public class CellToDoList
   2:  {
   3:      public void AddCell(Cell cell) {...}
   4:      public Cell GetNextCell() {...}
   5:      public Cell CheckNextCell() {...}
   6:      public int Count {get;}
   7:  }

 

裡面的實作,我就不多說了。我是把它當成 QUEUE 在設計,唯一的差別是,放進 QUEUE 的東西會先經過排序,因此不見得是 "First In First Out" 這種典型的貯列,而是會以 Cell 上標示的時間為準,依序 Out …。實作起來很簡單,用現成的 SortedList 當內部的儲存方式,加上基本的 lock 機制來確保它是 thread safe 的就夠了。

好,這些雞絲都準備好之後,就可以來打造我們的新版 GameHost 了。來看看 Code:

改用 ThreadPool / CellToDoList 的新版 GameHost:[copy code]
   1:  static CellToDoList _cq;
   2:  static void _YieldReturnGameHost(string[] args)
   3:  {
   4:      int worldSizeX = 30;
   5:      int worldSizeY = 30;
   6:      World realworld = new World(worldSizeX, worldSizeY);
   7:      _cq = new CellToDoList();
   8:      // init threads for each cell
   9:      for (int positionX = 0; positionX < worldSizeX; positionX++)
  10:      {
  11:          for (int positionY = 0; positionY < worldSizeY; positionY++)
  12:          {
  13:              Cell cell = realworld.GetCell(positionX, positionY);
  14:              cell.OnNextStateChangeEx();
  15:              _cq.AddCell(cell);
  16:          }
  17:      }
  18:      // 啟動定期更新畫面的執行緒
  19:      Thread t = new Thread(RefreshScreen);
  20:      t.Start(realworld);
  21:      while (_cq.Count > 0)
  22:      {
  23:          Cell item = _cq.GetNextCell();
  24:          if (item.NextWakeUpTime > DateTime.Now)
  25:          {
  26:              // 時間還沒到,發呆一下等到時間到為止
  27:              Thread.Sleep(item.NextWakeUpTime - DateTime.Now);
  28:          }
  29:          ThreadPool.QueueUserWorkItem(RunCellNextStateChange, item);
  30:      }
  31:  }
  32:  private static void RunCellNextStateChange(object state)
  33:  {
  34:      Cell item = state as Cell;
  35:      TimeSpan? ts = item.OnNextStateChangeEx();
  36:      if (ts != null) _cq.AddCell(item);
  37:  }
  38:  private static void RefreshScreen(object state)
  39:  {
  40:      while (true)
  41:      {
  42:          Thread.Sleep(500);
  43:          (state as World).ShowMaps("");
  44:      }
  45:  }

 

GameHost 的工作很明確,一開始 (line 18 ~ 20) 就把更新畫面的動作完全交給另一個執行緒,之後就專心處理 ToDoList 內的工作了。

接著後面的 while loop (line 21 ~ 30) 則是很單純的從 ToDoList 裡取出下一個要要動作的 Cell, 如果時間還沒到就 Sleep 等一下它。執行完後會再詢問下一次是什麼時後,同時再把他加到 ToDoList 內等待下一次輪到他時繼續。

這次的程式我沒有設定停止的條件,因此你會看到程式會不斷的執行下去。程式執行起來,結果跟 #3 沒什麼不同,畫面上的每個細胞會照著題目的規則生長或死亡,不同的是 #3 的 Game Host 需要用到 903 條執行緒,而這版的 Game Host 只要 9 條執行緒...

image

 

其實,以這樣的範例題,我大可以不用顧慮到效能的問題,不過就是示範程式怎麼寫嘛。不過,我的目標如果只是訂在怎麼寫這練習題,大可以 GOOGLE 一下就有一堆作業解答了 :D。我的目標是要展示一下,該如何開發這樣的 GameHost ? 這樣的程式,是大部份的遊戲的基礎,尤其是像線上遊戲或是 facebook 這類互動遊戲的基礎。有了像樣的 Game Host 之後,接下來就把目標放在如何建立多樣的生物,一起放在這世界裡面生活了。接下來就會大量運用到 OOP 的特點 (對,就是上一篇預告的...) 繼承及多型。

有沒有人覺的,這種程式越寫越像 Matrix (就是駭客任務裡的 "母體") 了? 裡面活著的東西其實都在我的掌控之下... =_= 哈哈... 未完待續,請期待續集 :D。

--
範例程式:



9/15/2009 2:26:31 AM

[設計案例] 生命遊戲#3, 時序的控制

Microsoft.NET | C# | Threading | 作業系統 | 技術隨筆 | 物件導向

原本的範例,其實有些盲點,不知各位有沒看到? 一樣的起始狀態,一樣的遊戲規則,你不一定會得到一樣的結果。為什麼? 因為這會跟你程式 SCAN 的順序有關。怎麼說? 因為到目前為只,整個遊戲就好像下棋一樣,是 "回合制",我下完了換你... 一路一直輪下去。

這時先下後下就會影響結果了。現實世界的生命不是這樣的啊... 不知有沒有人玩過早期的太空戰士 (Final Fantasy) 系列遊戲? 當年 FF 有個很重要的突破,就是把 RPG 從傳統的 "回合制" 改成即時戰鬥... 每個人都有個倒數的碼錶,數到 0 你就可以發動下一次的攻擊... 這樣才接近現實世界啊。套用到我們的生命遊戲,這次我們想作的改變,就是把程式改成這種模式。

因此來調整一下規則,每個細胞每隔 1000ms 後會進到下一個狀態。不過生命總是沒有完全一樣的,因此每個細胞進到下一個狀態的時間差,都會有 10% 的誤差 (也就是 950ms ~ 1050ms 之間的時間都有可能)。其它規責則維持不變,來看看程式該怎麼改寫。

這種 "即時制",是比較合乎現實的情況的,如果未來你想發展到像 facebook 上的那些小遊戲,或是其它線上遊戲一樣的話, "回合制" 是決對行不通的... 這時,我們可以想像,每個細胞都有自己的執行緒,每換過一次狀態後就 Sleep() 一段時間,醒來再換到下一次狀態... 一直到指定的世代 (generation) 到達為止。

來看一下改版過的程式。我們先不動原本的 Cell, 只追加一個 method: WholeLife( ), 呼叫後就會一直更新這個細胞的狀態,直到它結束為止 (不是死掉喔,是 generation 到達)。而整個世界的所有細胞,都是獨立的個體,都有個專屬的執行緒在運作...。這時 Game Host 就得換個方式來讓這些細胞過日子 (執行),同時 Game Host 好像有個人造衛星一樣,不斷的在上空拍照來更新畫面,而完全不影響這些細胞的生命進行。

來看一下改寫過的 Cell 追加的 method:

Cell 追加的 method[copy code]
   1:  public void WholeLife(object state)
   2:  {
   3:      int generation = (int)state;
   4:      for (int index = 0; index < generation; index++)
   5:      {
   6:          this.OnNextStateChange();
   7:          Thread.Sleep(_rnd.Next(950, 1050));
   8:      }
   9:  }

 

改變不大,只是多個簡單的迴圈,跟 sleep 來控制時間而已。再來看看 Game Host 要怎麼改:

多執行緒版本的 Game Host[copy code]
   1:  static void Main(string[] args)
   2:  {
   3:      int worldSizeX = 30;
   4:      int worldSizeY = 30;
   5:      int maxGenerationCount = 100;
   6:      World realworld = new World(worldSizeX, worldSizeY);
   7:      // init threads for each cell
   8:      List<Thread> threads = new List<Thread>();
   9:      for (int positionX = 0; positionX < worldSizeX; positionX++)
  10:      {
  11:          for (int positionY = 0; positionY < worldSizeY; positionY++)
  12:          {
  13:              Cell cell = realworld.GetCell(positionX, positionY);
  14:              Thread t = new Thread(cell.WholeLife);
  15:              threads.Add(t);
  16:              t.Start(maxGenerationCount);
  17:          }
  18:      }
  19:      // reflesh maps
  20:      do
  21:      {
  22:          realworld.ShowMaps("");
  23:          Thread.Sleep(100);
  24:      } while (IsAllThreadStopped(threads) == false);
  25:      // wait all thread exit.
  26:      foreach (Thread t in threads) t.Join();
  27:  }
  28:  private static bool IsAllThreadStopped(List<Thread> threads)
  29:  {
  30:      foreach (Thread t in threads)
  31:      {
  32:          if (t.ThreadState != ThreadState.Stopped) return false;
  33:      }
  34:      return true;
  35:  }

 

其實這卅幾行 code, 大都花在控制執行緒上面,有興趣的讀者可以翻翻我之前寫的那系列文章,我就不多作說明了。調整之後,這個世界變的更不可測了,一樣的起始環境,連上帝 (在這模擬世界裡,我就是上帝 XD) 都無法預測下一秒會發生什麼事...

image

 

感覺就好像看電視一樣。畫面不斷的在閃動,而畫面裡的細胞會不規責的跳動,不像上一版程式一樣,每刷一次就變一次那樣的枯燥無聊。如果畫面呈現的地方再多用點心思,就可以弄的像卡通一樣,每個細胞都各自用自己的步調在活著...

到這裡,如何? 應該沒有人把作業寫到這個樣子了吧 XD (就說別抄我的程式去交作業了)。不適當的利用執行緒,也做的到類似的結果。不過,你花費的代價會很大,因為你的程式得自己來做 context switch (這些是 OS + thread scheduler 會幫你解決掉的,只要你曉得要用 thread)。

接下來下一篇,我們再繼續調整這世界的遊戲規則,加入更多元素進去,看看程式會變怎樣? 多執行緒解決時間的問題了,再來我們要用繼承及多型,讓不同的生命可以在同一個世界下共同生活...  ((待續))

 



9/12/2009 4:09:30 AM

[設計案例] 生命遊戲#1, 前言

Microsoft.NET | C# | Threading | 技術隨筆 | 物件導向

[前言]

好久沒寫點自己覺的有內容的東西了... 最近 code 寫的少,實在沒有什麼了不起的新技術可以分享,而 thread 那種 "古典" 計算機科學的東西也寫的差不多了.. 就懶了起來。

雖然沒新技術好寫,不過老狗玩的把戲還是能榨出點渣的... 很多人都熟新技術,可以寫出很炫的程式,不過也常看到程式的結構真的是亂搞一通的... 所以我打算寫些 [設計案例] 的文章,舉一些我實作過的案例,說明什麼樣的問題可以用什麼方式或技術來解決。其實我想寫的就是像 design patterns 那類的東西,只不過我程度還差的遠,只能稱作 "案例" ... Orz

----------------------------------------------------------------------------------

最近 facebook 上有一些小遊戲,不知道在紅什麼... 突然間大家都在玩,就都是些模擬遊戲,像是開心農場、My FishBowl … 之類的,你要在裡面種東西或養魚,條件充足就會長大,收成等等... 然後透過 Facebook API 可以跟別人互動的遊戲。看到這類的 GAME,不禁想起過去在唸書時,幾個經典的作業題目,其中一個 [生命遊戲] (Game of Life) 就是這種 GAME 的始祖...

在 Wiki 找的到這段介紹:

http://zh.wikipedia.org/zh-hk/%E7%94%9F%E5%91%BD%E6%B8%B8%E6%88%8F

生命遊戲(Game of Life),又稱生命棋,是英國數學家約翰·何頓·康威(John Horton Conway)在1970年發明的細胞自動機(cellular automaton,也翻譯成「格狀自動機」)。

它最初於1970年10月在《科學美國人》(Scientific American)雜誌中馬丁·葛登能(Martin Gardner)的「數學遊戲」專欄出現。

1970… 我還沒出生... Orz, 不過, 這麼一個古老經典的問題,找的到一大堆範例程式,或是作業解答。清一色是用 C 這類配的上它的年紀的程式語言寫的,就算有 JAVA 版,大概也是換湯不換藥... 這四十年程式語言及軟體技術的進步,寫這種程式總該有點改變吧?

這篇我想寫的,就是這樣的問題,配合現在的 .NET / C#,能怎麼寫它? 這年代的軟體開發技術,對這種古典的程式能發揮什麼效益?

(警告: 剛好要交作業的人,可千萬別用我的方法交出去啊... 你的助教看不懂可能會給你零分...)

先找個範例來看看... 為了不讓過多的畫面處理程式碼,干擾到主程式的架構,我特地找了兩個 console based 的範例:

Java 版:
http://tw.myblog.yahoo.com/dust512/article?mid=25&prev=28&next=-1

多語言版 (C, Java, Python, Scala):
http://caterpillar.onlyfun.net/Gossip/AlgorithmGossip/LifeGame.htm

這... 這就是典型的 "Java 版 C 程式碼" 的範例... 用 Java 來寫只寫這樣,有點用牛刀的感覺... 新的開發環境強調這幾項:

  1. 物件導向 (封裝,多型,動態連結... etc)
  2. 多執行緒
  3. 其它語言特色 (這次會講到的是 yield return)

這些技術怎麼套進這程式? 先來看看這遊戲有幾個障礙要克服吧。遊戲的規則簡單明瞭,借轉貼上面第二個範例的說明:

生命遊戲(game of life)為1970年由英國數學家J. H. Conway所提出,某一細胞的鄰居包括上、下、左、右、左上、左下、右上與右下相鄰之細胞,遊戲規則如下:

  1. 孤單死亡:如果細胞的鄰居小於一個,則該細胞在下一次狀態將死亡。
  2. 擁擠死亡:如果細胞的鄰居在四個以上,則該細胞在下一次狀態將死亡。
  3. 穩定:如果細胞的鄰居為二個或三個,則下一次狀態為穩定存活。
  4. 復活:如果某位置原無細胞存活,而該位置的鄰居為三個,則該位置將復活一細胞。

以前我最討厭寫這種程式了,這種程式寫起來就跟 Regexp 一樣,是 "write only” 的 code… 怎麼說? 程式寫好後,可能自己都看不懂了,因為邏輯被切的亂七八糟... GAME 裡可能同時有好幾個細胞,每個都有獨立的規則,不過程式卻是一個主迴圈,每次執行每個細胞的一小段邏輯... 程式的流程就這樣被切碎了... 我打算用C#的 yield return, 解決這邏輯破碎的問題。

第二個障礙,就是這類程式,某種程度都是隨著時間的進行而跑的,比如上面的條件都是 "下一次狀態" … 把每次狀態改變定義一個時間 (比如一秒),這就是個 realtime 的模擬程式了。如果有的細胞是一秒改變一次狀態,有的是兩秒,有的是五秒... 那就傷腦筋了... 你的程式會被切的更破碎... 這些每種細胞特殊的部份,我打算用 OOP 的多型來解決。

最後,這種很明顯是 "並行" 的問題,照道理來說,用多執行緒是最適合的了。不過隨便也有成千上萬個 "細胞" 在成長,每個都來一個 thread 養它,再高級的 server 都撐不住吧? 這邊會來探討一下,怎麼用執行緒相關的技巧,來解決這問題。

 

--------------------------------------------------------------------------------------

寫到這裡,突然覺的這題目好大... Orz, 搞不好這幾篇要撐幾個月才寫的完... 至少有個題材好寫,等到我生出第一個 sample code, 就會有下一篇了... 如果有同好也想試試看的,也歡迎分享看看你的 code… 只不過我沒像 darkthread 有本錢提供獎品... 哈哈 :D



4/17/2009 7:51:00 PM

RUNPC 精選文章 - 運用ThreadPool發揮CPU運算能力

543 | C# | RUNPC | Threading | 我的作品 | 技術隨筆 | Microsoft.NET | [精選文章]

果然這個什麼東西都上網的年代,要三不五時的 GOOGLE 一下自己,才會知道那些網站把你的八卦跟內幕爆了出來... 不過應該沒啥週刊記者對我有興趣吧? 哈哈。在 GOOGLE 自己名字時,倒是意外發現,之前投稿的文章,又有一篇被拿來登在網站上的精選文章了 :D

特此留念一下 :D

http://www.runpc.com.tw/content/main_content.aspx?mgo=176&fid=E02

--

順便整理一下懶人包:

另一篇精選文章 [RUN!PC 精選文章 - 生產線模式的多執行緒應用]

過去投過的系列文章 (multi-threading programming using c#):

2008/11. 生產線模式的多執行緒應用
2008/09. 用ThreadPool發揮CPU運算能力
2008/06. SEMAPHORE在ASP.NET的應用
2008/04. 以ASP.NET開發同步WEB應用程式



1/16/2009 2:10:00 AM

RUN!PC 精選文章 - 生產線模式的多執行緒應用

543 | C# | RUNPC | Threading | 我的作品 | 技術隨筆 | Microsoft.NET | [精選文章]

http://www.runpc.com.tw/content/main_content.aspx?mgo=178&fid=E08

無意間 search 我自己的名字,才發現這篇文章除了投稿到 RUN! PC 之外,原來還有刊在網站上的精選文章啊...

哈哈,暗爽一下,順道貼一下 link, 讓沒看到雜誌的網友們也有機會看一看在下的作品...



11/4/2008 2:11:00 AM

[RUN! PC] 2008 十一月號

Microsoft.NET | RUNPC | Threading | 我的作品 | 技術隨筆

IMG_0208

YA! 第四篇!! :D 還是一樣要先感謝一下編輯賞光,讓我有點空間寫些不一樣的東西。

 

基本的執行緒相關的程式設計跟函式庫,講的差不多了,其實這些也沒什麼好寫的。接下來打算寫一些應用的模式,來談談有那些方法,那些設計方式才能夠有效的發揮多執行緒的優點。看了 .NET Framework 4.0 / Visual Studio 2010 的 ROADMAP,有一大部份的重點擺在平行處理,INTEL年底也要發表四核 + HT 的 CPU ( WINDOWS 會認為有八個處理器 ),軟硬體都備齊了,剩下的就是程式設計師的巧思了。

 

其實之前貼過幾篇類似主題的文章,只是這次把它統合起來介紹一下。生產線模式,如果簡化後就是 [生產者消費者] 的模式,而把它徹底一點的應用,則是上回提到 [Stream Pipeline] ..

這篇也是第一次在雜誌上嘗試說明比較偏設計概念的文章,實作比較少,很怕不合讀者的口味... 應該不會貼了就沒續篇了吧? :P 有買雜誌的記得讀者回函填一下,哈哈,也算是點鼓勵。這次範例程式也是 Console application (我不會寫太炫的程式 :P ),需要的可以點 [這裡] 下載!



10/18/2008 11:53:00 AM

生產者 vs 消費者 - BlockQueue 實作

543 | C# | Threading | 作業系統 | 我的作品 | 技術隨筆 | 物件導向 | Microsoft.NET | [精選文章]

過去寫了好幾篇跟執行緒相關的文章,講的都是如何精確控制執行緒的問題。不過實際上有在寫的人就知道,那些只是 "工具",最重要的還是你該怎樣安排你的程式,讓它能有效率的用到執行緒的好處,那才是重點。大部份能有效利用到多執行緒的程式,大都是大量且獨立的小動作,可以很簡單的撒下去給ThreadPool處理,不過當你的程式沒辦法這樣切,就要想點別的辦法了。

 

開始看 code 前先講講簡單的概念。這篇要講的是另一種模式: "生產者 v.s. 消費者"。這是個很典型的供需問題,唸過作業系統 (Operation System) 的人應該都被考過這個課題吧 @_@。簡單的說如果你的程式要處理的動作可以分為 "生產者" (產生資料,載入檔案,或是第一階段的運算等等) 及消費者 (匯出資料,或是第二階段的運算等等) 這種模式,而前後兩個階段各自又適合用執行緒來加速的話,那你就值得來研究一下這種模式。第一手資料就是去看看作業系統的書,恐龍書足足有一整章在講,足夠你研究了。本篇重點會擺在怎樣用 C# / .NET 實作的部份。

 

舉個具體一點的例子,如果你想寫個程式,從網站下載幾百個檔案,同時要把它們壓縮成一個 ZIP 檔,在過去你大概只能全部下載完之後,再開始ZIP的壓縮動作。第一階段都是 IO (網路) bound 的程式,第二階段則是 CPU bound。如果是完全獨立的兩個程式,很適合擺在一起執行,因為它們需要的資源不一樣,不會搶來搶去。但是就敗在他們要處理的資料是卡在一起的。

 

把這個動作想像成我們有兩組人分別負責下載及壓縮的動作,下載的部份可以多執行緒同時進行沒問題,但是下載好一個檔案,就可以先丟給後面的那組人開始壓縮了,不用等期它人下載完成。如果下載的暫存目錄空間有限,我們甚至可以這樣調整: 當 TEMP 滿了的話,下載動作就暫停,等到 TEMP 裡的東西壓縮好清掉一部份後再繼續。而壓縮的部份則相反,如果 TEMP 已經空了就暫停,等到有東西進來再繼續,直到完成為止。

 

image

前後兩階段該如何利用多執行緒,我就跳過去了,過去那幾篇就足以應付。這種模式的關鍵在於前後兩階段的進度該如何平衡。有些範例是有照規矩的把這模式實作出來,不過... 你也知道,看起來就是像作業的那種,完全不像是可以拿來正規的用途。

 

我認定 "好" 的實作,是像 System.Collections.Generics 之於 DataStructure 那樣,能很漂亮的把細節封裝起來,很容易重複利用的才是我認為好的實作。不能容易的使用,那就只能像作業一樣寫完就丟...。這個問題看過有人用 Semaphore 來做,也是作的很棒,不過我比較推薦的是 QUEUE 的作法。

 

從上圖來看,生產者跟消費者都很簡單,就是過去講的多執行緒或是執行緒集區就搞定,關鍵在於中間的控制。我的想法是把庫存管理的東西實作成佇列 (QUEUE),生產者產出的東西就放到 QUEUE,而消費者就去 QUEUE 把東西拿出來。不過現成的 QUEUE 不會告訴你 QUEUE 滿了,QUEUE 空了也只會丟 EXCEPTION 而以。這次我做了個 BlockQueue 就是希望解決這個問題。

 

我期望這個 QUEUE 能跟一般的 QUEUE 一樣使用,但是要有三個地方不一樣:

  1. 要設定大小限制,當 QUEUE 達到容量上限時 EnQueue 的動作會被暫停 (Block),而不是丟出例外。
  2. QUEUE 已經空了的時後,DeQueue 的動作會被暫停 (Block),而不是丟出例外。
  3. 要多個 QUEUE 關機的動作 (SHUTDOWN),以免生產者都不出貨了,消費者還苦苦的等下去的窘況。

 

先看看這樣的 QUEUE 我希望它怎麼被使用。看一下簡單的範例程式 (主程式,不包含 BlockQueue):

使用 BlockQueue 來實作的生產者/消費者範例[copy code]
        public static BlockQueue<string> queue = new BlockQueue<string>(10);        public static void Main(string[] args)        {            List<Thread> ps = new List<Thread>();            List<Thread> cs = new List<Thread>();            for (int index = 0; index < 5; index++)            {                Thread t = new Thread(Producer);                ps.Add(t);                t.Start();            }            for (int index = 0; index < 10; index++)            {                Thread t = new Thread(Consumer);                cs.Add(t);                t.Start();            }            foreach (Thread t in ps)            {                t.Join();            }            WriteLine("Producer shutdown.");            queue.Shutdown();            foreach (Thread t in cs)            {                t.Join();            }        }        public static long sn = 0;        public static void Producer()        {            for (int count = 0; count < 30; count++)            {                RandomWait();                string item = string.Format("item:{0}", Interlocked.Increment(ref sn));                WriteLine("Produce Item: {0}", item);                queue.EnQueue(item);            }            WriteLine("Producer Exit");        }        public static void Consumer()        {            try            {                while (true)                {                    RandomWait();                    string item = queue.DeQueue();                    WriteLine("Cust Item: {0}", item);                }            }            catch            {                WriteLine("Consumer Exit");            }        }        private static void RandomWait()        {            Random rnd = new Random();            Thread.Sleep((int)(rnd.NextDouble() * 300));        }        private static void WriteLine(string patterns, params object[] arguments)        {            Console.WriteLine(string.Format("[#{0:D02}] ", Thread.CurrentThread.ManagedThreadId) + patterns, arguments);        }
   1:  public static BlockQueue<string> queue = new BlockQueue<string>(10);
   2:  public static void Main(string[] args)
   3:  {
   4:      List<Thread> ps = new List<Thread>();
   5:      List<Thread> cs = new List<Thread>();
   6:      for (int index = 0; index < 5; index++)
   7:      {
   8:          Thread t = new Thread(Producer);
   9:          ps.Add(t);
  10:          t.Start();
  11:      }
  12:      for (int index = 0; index < 10; index++)
  13:      {
  14:          Thread t = new Thread(Consumer);
  15:          cs.Add(t);
  16:          t.Start();
  17:      }
  18:      foreach (Thread t in ps)
  19:      {
  20:          t.Join();
  21:      }
  22:      WriteLine("Producer shutdown.");
  23:      queue.Shutdown();
  24:      foreach (Thread t in cs)
  25:      {
  26:          t.Join();
  27:      }
  28:  }
  29:  public static long sn = 0;
  30:  public static void Producer()
  31:  {
  32:      for (int count = 0; count < 30; count++)
  33:      {
  34:          RandomWait();
  35:          string item = string.Format("item:{0}", Interlocked.Increment(ref sn));
  36:          WriteLine("Produce Item: {0}", item);
  37:          queue.EnQueue(item);
  38:      }
  39:      WriteLine("Producer Exit");
  40:  }
  41:  public static void Consumer()
  42:  {
  43:      try
  44:      {
  45:          while (true)
  46:          {
  47:              RandomWait();
  48:              string item = queue.DeQueue();
  49:              WriteLine("Cust Item: {0}", item);
  50:          }
  51:      }
  52:      catch
  53:      {
  54:          WriteLine("Consumer Exit");
  55:      }
  56:  }
  57:  private static void RandomWait()
  58:  {
  59:      Random rnd = new Random();
  60:      Thread.Sleep((int)(rnd.NextDouble() * 300));
  61:  }
  62:  private static void WriteLine(string patterns, params object[] arguments)
  63:  {
  64:      Console.WriteLine(string.Format("[#{0:D02}] ", Thread.CurrentThread.ManagedThreadId) + patterns, arguments);
  65:  }

 

 

 

 

 

主程式很簡單,你知道怎麼寫多執行緒程式的話那麼一看就懂了。一開始替 Producer / Consumer 各建立三個執行續,而每個 Producer 只作很簡單的事,就是連續生產 30 個字串放到 BlockQueue, 當所有的 Producer thread 都執行完後,會呼叫 queue.Shutdown( ); 通知 QUEUE 已經全部生產完畢。

Consumer 也很簡單,每個 Consumer 只是去 Queue 拿東西出來,顯示在 Console 上。直到 Dequeue 動作失敗,接到 Exception 之後就結束。

要試試生產者/消費者模式的各種狀況,可以試著調整兩者的執行緒數量。舉例來說,調大 Producer 執行緒數量時 (P: 10 / C:5),結果是這樣:

image

Producer 的進度大約就是領先 Consumer 的進度 10 筆資料左右,領先的幅度就暫停了,不會無止境的成長下去。證明卡在 QUEUE 內的數量受到控制。接下來再來看看調高 Consumer 的執行緒數量的結果:

image

好像 iPhone 上市搶購熱潮一樣 @_@,供不應求,Producer 提供的資料馬上被搶走了...。

 

效果不錯,看來這樣的實作有達成它的目的。最後來看的是 BlockQueue 的程式碼:

 

BlockQueue<T> 實作的完整程式碼[copy code]
    public class BlockQueue<T>    {        public readonly int SizeLimit = 0;        private Queue<T> _inner_queue = null;        private ManualResetEvent _enqueue_wait = null;        private ManualResetEvent _dequeue_wait = null;        public BlockQueue(int sizeLimit)        {            this.SizeLimit = sizeLimit;            this._inner_queue = new Queue<T>(this.SizeLimit);            this._enqueue_wait = new ManualResetEvent(false);            this._dequeue_wait = new ManualResetEvent(false);        }        public void EnQueue(T item)        {            if (this._IsShutdown == true) throw new InvalidCastException("Queue was shutdown. Enqueue was not allowed.");            while (true)            {                lock (this._inner_queue)                {                    if (this._inner_queue.Count < this.SizeLimit)                    {                        this._inner_queue.Enqueue(item);                        this._enqueue_wait.Reset();                        this._dequeue_wait.Set();                        break;                    }                }                this._enqueue_wait.WaitOne();            }        }        public T DeQueue()        {            while (true)            {                if (this._IsShutdown == true)                {                    lock (this._inner_queue) return this._inner_queue.Dequeue();                }                lock (this._inner_queue)                {                    if (this._inner_queue.Count > 0)                    {                        T item = this._inner_queue.Dequeue();                        this._dequeue_wait.Reset();                        this._enqueue_wait.Set();                        return item;                    }                }                this._dequeue_wait.WaitOne();            }        }        private bool _IsShutdown = false;        public void Shutdown()        {            this._IsShutdown = true;            this._dequeue_wait.Set();        }    }
   1:  public class BlockQueue<T>
   2:  {
   3:      public readonly int SizeLimit = 0;
   4:      private Queue<T> _inner_queue = null;
   5:      private ManualResetEvent _enqueue_wait = null;
   6:      private ManualResetEvent _dequeue_wait = null;
   7:      public BlockQueue(int sizeLimit)
   8:      {
   9:          this.SizeLimit = sizeLimit;
  10:          this._inner_queue = new Queue<T>(this.SizeLimit);
  11:          this._enqueue_wait = new ManualResetEvent(false);
  12:          this._dequeue_wait = new ManualResetEvent(false);
  13:      }
  14:      public void EnQueue(T item)
  15:      {
  16:          if (this._IsShutdown == true) throw new InvalidCastException("Queue was shutdown. Enqueue was not allowed.");
  17:          while (true)
  18:          {
  19:              lock (this._inner_queue)
  20:              {
  21:                  if (this._inner_queue.Count < this.SizeLimit)
  22:                  {
  23:                      this._inner_queue.Enqueue(item);
  24:                      this._enqueue_wait.Reset();
  25:                      this._dequeue_wait.Set();
  26:                      break;
  27:                  }
  28:              }
  29:              this._enqueue_wait.WaitOne();
  30:          }
  31:      }
  32:      public T DeQueue()
  33:      {
  34:          while (true)
  35:          {
  36:              if (this._IsShutdown == true)
  37:              {
  38:                  lock (this._inner_queue) return this._inner_queue.Dequeue();
  39:              }
  40:              lock (this._inner_queue)
  41:              {
  42:                  if (this._inner_queue.Count > 0)
  43:                  {
  44:                      T item = this._inner_queue.Dequeue();
  45:                      this._dequeue_wait.Reset();
  46:                      this._enqueue_wait.Set();
  47:                      return item;
  48:                  }
  49:              }
  50:              this._dequeue_wait.WaitOne();
  51:          }
  52:      }
  53:      private bool _IsShutdown = false;
  54:      public void Shutdown()
  55:      {
  56:          this._IsShutdown = true;
  57:          this._dequeue_wait.Set();
  58:      }
  59:  }

 

重點只在重新包裝 Queue 的 Enqueue / Dequeue ,及追加的 Shutdown( ) 裡做的執行緒同步機制。在 BlockQueue 尚未 Shutdown 之前,Enqueue / Dequeue 都不會引發 Exception, 取代的是用 ManualResetEvent 的 WaitOne( ) 來暫停這個動作,直到另一端資料準備好為止。

然而當 Shutdown 被呼叫過之後,Queue 就不再接受新的東西被塞進來了,而東西拿光因為不再補貨,所以就維持原本 Queue 的設計扔出 Exception。

 

其實真的要挖的話,這個 Queue 可以進一步的改善,以資料結構來看,這種有固定 SIZE 上限的 QUEUE,最適合用 CircleQueue 來實作了。有興趣的朋友們可以換上回介紹過的 NGenerics 改看看,我就不再示範了。其實還有其它變型,像是 Priority Queue, 進去跟出來的順序不一定一樣,意思是你地位比較高的話是可以 "插隊" 的,後加入 QUEUE 的物件,可以優先被拿出來。這些機制都是可以進一步改善 "生產者/消費者" 模式的方法,有需要的讀者們可以朝這個方向思考看看!

 

這篇只是個開始,運用這種機制,可以進一步延伸出 Pipeline 模式 (生產線),甚至更進一步運用到串流 (Stream) 的應用。運氣好的話下個月應該看的到完整的探討跟解說吧 ...,敬請期待 :D



10/1/2008 10:43:22 PM

得獎了 :D~~~

Microsoft.NET | 543 | C# | Threading | 小技巧 | 安德魯的當年勇 | 我的作品 | 技術隨筆 | 物件導向

 IMG_9142

 

雖然上禮拜就知道了,不過獎品還沒拿到,當然要忍一下再發表... 哈哈!

花了幾個晚上拼了猜數字的程式,運氣不錯,順利拼到冠軍了。除了寫程式,把心得貼到 BLOG 也花了不少時間.. 主要貼的這四篇:

  1. Thread Sync #1. 概念篇 - 如何化被動為主動?
  2. Thread Sync #2. 實作篇 - 互相等待的兩個執行緒
  3. [C#: yield return] #1. How It Work ?
  4. [C# yield return] #2. 另類的應用 - Thread Sync 替代方案

 

蠻有意思的比賽。雖然過去也參加過不少比賽,運氣不錯也騙到一些獎品...,不過這次倒是寫的最起勁,因為其它比賽都是 "廠商" 讚助,不是 Microsoft 就是 Cisco ... 都要想儘辦法把他們的技術發揮出來才能得獎,老實說寫起來跟工作差不多,總是要滿足那些 "市場" 的需求。

這次題目老實說很 "不實用",純粹是比 code 誰寫的又快又好而已,不過還蠻合我胃口的 :D。正好這次碰到誰呼叫誰這種結構上的問題,就是上面四篇文章一直在討論的 GameHost 為主還是 Player 為主的思考方式,解決這問題花的工夫還比較多。想到這兩套解決方式,我覺的收穫是蠻值得的,至少我多學到兩種不同的設計模式。

最後當然要感謝一下主辦人,下班專程騎車過來頒獎... 哈哈,獎品對我還蠻實用的,算是大獎一枚! 正好是我需要的東西,看來可以開始物色新硬碟,還有要準備來更新我的 SERVER 了 :D



9/27/2008 11:45:00 PM

MSDN Magazine 十月號竟然下架了...

Microsoft.NET | Threading | 技術隨筆 | MSDN

image

上個禮拜才看完 2008 / 10 的 MSDN Magazine,才想說這期 MSDN 的內容,有一半以上的主題是討論多核心處理器效能問題 (平行處理,多核的 CACHE 機制對程式的影響... etc),不過剛才才想貼篇心得,上網一看竟然又縮回去只剩九月號? @_@

難道我是夢到我看過十月號的嘛? 哈哈... GOOGLE 找一下還真的給我找到庫存網頁 :D 當然要貼來紀念一下。不知道重新上架後內容會不會多幾篇文章?

 

這期的主題還真的都押在多核處理器的效能問題上。光是平行處理 ( TPL, PLINQ ) 的方法就好幾篇:

  • Design Considerations For Parallel Programming
  • Improved Support For Parallelism In The Next Version Of Visual Studio
  • Build Concurrent Apps From Simple F# Expressions

 

另外在探討多核心效能問題的也有幾篇:

  • new Thread(ReadEditorsNote).Start(); yourAttention.WaitOne() //這標題還蠻有意思的...
  • .NET Matters: False Sharing
  • Exploring High-Performance Algorithms

 

不過看完只記得部份重點而以,本來想多寫點心得,不過原文網址都連不到了... 再等幾天吧 :D



9/21/2008 10:30:00 PM

[C# yield return] #2. 另類的應用 - Thread Sync 替代方案

Microsoft.NET | 543 | C# | Threading | 小技巧 | 我的作品 | 技術隨筆

上篇,講了一些 yield return 編譯後產生的 Code, 說明了 C# compiler 如何用簡單的語法替你實作了 IEnumerator 介面,而完全不會增加程式的複雜度,這是我認為 C# 提供最讚的 Syntax Sugar ...。

不過無意間我想到了 yield return 還有另一種應用方式。靈感來自之前 Darkthread 舉辦的 [黑暗盃程式魔人賽]。因為參賽題目 [xAxB猜數字遊戲] 原本就是考驗演算法,邏輯就不大簡單了,加上要配合 GameHost 的呼叫方式,難度更提高不少。因此之前貼了兩篇文章 [ThreadSync #1. 概念篇 - 如何化被動為主動?, #2. 實作篇 - 互相等待的兩個執行緒],介紹了我改寫的 AsyncPlayer,讓程式可以分別以獨立的執行緒執行 GameHost 及 Player 的程式碼。藉著這方式讓兩者都可以 "獨立思考",邏輯不會中斷,讓程式能夠簡單一些。

不過執行緒同步機制是很花時間的,因為兩方都要等來等去...。多了 Sync 的動作,就要至少 10 ms 的時間來完成這動作。跑個十幾萬次下來,額外花費的時間太多了,因此我貼了那兩篇文章後,就一直在思考這樣的作法有沒有其它效能較佳的方式?

有的,最後我找到的答案就是 yield return,不過大家看了一定很納悶...

"yield return (Iteration) 跟執行緒同步機制有什麼關聯?"

不多說,先看看之前畫的兩張時序圖:

image

先看之前 ThreadSync #1 裡提到的圖,我這次加上紅線當 "輔助線",紅線代表執行 GameHost 的主程式,這個執行序必需反反覆覆的在 GameHost / Player 兩份類別的程式碼跑來跑去,主程式是 GameHost 發起的,當然被強迫切成好幾段的就只有 Player 了。

image

這是修改過後的版本,GameHost / Player 有各自的執行緒,紅色是 GameHost,藍色是 Player。當執行緒跑到中間時代表它在等待了,等另一方也跑到中間把執行結果放到共用變數,同時叫醒對方之後才交換過來。兩方都各自照著自己的邏輯跑,不過這種等待 & 喚醒的動作,相較於一般的 function call / return 而言,實在是太慢了...。我就是從這張圖得來的靈感,這個解決方式不就跟 yield return 很像嘛? 都是為了避免多次呼叫之間,被呼叫的另一方的邏輯被破切斷的問題... 因此我就開始思考 AsyncPlayer 是不是有機會用 yield return 寫出另一個版本...。

原本的結構很直覺,透過共用變數來傳遞資訊,用 AutoResetEvent 來通知另一個等待中的執行緒可以醒來拿資料去用。而 yield return 則要換個角度來想這件事。yield return 是實作 Iterator 的一種方式,目的是讓你的程式自己決定如何把 collection 裡的 element 照什麼方式丟出去,原本的問題就要想成:

"GameHost 要跟 Player 拿所有 Player 會問的問題,而 Player 會透過 yield return 一次一次的把問題丟出去。"

看起來好像可行,不過方向只有單向,就是 Player 丟問題給 GameHost,還缺了 GameHost 把問題答案交給 Player 這段。不過這部份好解決,一樣用共用變數就搞定。細節我就不講太多,直接來看程式碼:

用 yield return 改寫過的 AsyncPlayer[copy code]
        public abstract IEnumerable<HintRecord> Think();        private HintRecord last_record = null;        public override int[] StartGuess(int maxNum, int digits)        {            base.StartGuess(maxNum, digits);            this._enum = this.Think().GetEnumerator();            this._enum.MoveNext();            return this._enum.Current.Number;        }        public override int[] GuessNext(Hint lastHint)        {            this._enum.Current.Hint = lastHint;            if (this._enum.MoveNext() == true) return this._enum.Current.Number;            throw new InvalidOperationException("Player Stopped!");        }        public override void Stop()        {            base.Stop();            this._enum.Current.Hint = new Hint(this._digits, 0);            try { this._enum.MoveNext(); }            catch {                Console.WriteLine("!!!!");            }        }        protected virtual HintRecord GameHost_AskQuestion(int[] number)        {            this.last_record = new HintRecord(                (int[])number.Clone(),                new Hint());            return this.last_record;        }        protected HintRecord GameHostAnswer        {            get            {                return this.last_record;            }        }
   1:  public abstract IEnumerable<HintRecord> Think();
   2:  private HintRecord last_record = null;
   3:  public override int[] StartGuess(int maxNum, int digits)
   4:  {
   5:      base.StartGuess(maxNum, digits);
   6:      this._enum = this.Think().GetEnumerator();
   7:      this._enum.MoveNext();
   8:      return this._enum.Current.Number;
   9:  }
  10:  public override int[] GuessNext(Hint lastHint)
  11:  {
  12:      this._enum.Current.Hint = lastHint;
  13:      if (this._enum.MoveNext() == true) return this._enum.Current.Number;
  14:      throw new InvalidOperationException("Player Stopped!");
  15:  }
  16:  public override void Stop()
  17:  {
  18:      base.Stop();
  19:      this._enum.Current.Hint = new Hint(this._digits, 0);
  20:      try { this._enum.MoveNext(); }
  21:      catch {
  22:          Console.WriteLine("!!!!");
  23:      }
  24:  }
  25:  protected virtual HintRecord GameHost_AskQuestion(int[] number)
  26:  {
  27:      this.last_record = new HintRecord(
  28:          (int[])number.Clone(),
  29:          new Hint());
  30:      return this.last_record;
  31:  }
  32:  protected HintRecord GameHostAnswer
  33:  {
  34:      get
  35:      {
  36:          return this.last_record;
  37:      }
  38:  }

程式碼一如往常,又是只有一點點 (謎之音: 你到底有沒有寫過長一點的程式碼? -_-) ...。 原本的 Think 改成會傳回 IEnumerable<HintRecord> 的型別,因此內部就可以透過一連串的 yield return xxxx; 指令來把問題交給 GameHost。而 GameHost 拿到題目就會開始計算答案,然後再呼叫 Player.GuessNext( ) 把上次的答案傳回去。透過 Player 的實作,GuessNext 會呼叫 _enum.MoveNext( ), 控制權會再交到 Think( ) 上次呼叫 yield return 的地方,直到又執行到下一個 yield return 為止。這時 GameHost 又取得下一個問題,不斷重複這樣的動作直到結束。

同樣的,我們用 DummyPlayer 改寫,看看用 yield return 的版本寫起來是怎麼樣?

DummyYieldPlayer 的程式碼[copy code]
    public class DummyYieldPlayer : YieldPlayer    {        private Random _rnd = new Random();        private int[] randomGuess()        {            int[] _currAnswer = new int[this._digits];            List<int> lst = new List<int>();            for (int i = 0; i < _digits; i++)            {                int r = _rnd.Next(_maxNum);                while (lst.Contains(r))                    r = _rnd.Next(_maxNum);                lst.Add(r);                _currAnswer[i] = r;            }            return _currAnswer;        }        public override IEnumerable<HintRecord> Think()        {            while (true)            {                yield return this.GameHost_AskQuestion(this.randomGuess());            }        }    }
   1:  public class DummyYieldPlayer : YieldPlayer
   2:  {
   3:      private Random _rnd = new Random();
   4:      private int[] randomGuess()
   5:      {
   6:          int[] _currAnswer = new int[this._digits];
   7:          List<int> lst = new List<int>();
   8:          for (int i = 0; i < _digits; i++)
   9:          {
  10:              int r = _rnd.Next(_maxNum);
  11:              while (lst.Contains(r))
  12:                  r = _rnd.Next(_maxNum);
  13:              lst.Add(r);
  14:              _currAnswer[i] = r;
  15:          }
  16:          return _currAnswer;
  17:      }
  18:      public override IEnumerable<HintRecord> Think()
  19:      {
  20:          while (true)
  21:          {
  22:              yield return this.GameHost_AskQuestion(this.randomGuess());
  23:          }
  24:      }
  25:  }

跟上次的 DummyAsyncPlayer (用 ThreadSync 的版本) 一樣,超簡單,實在沒什麼需要說明的了。唯一要特別記得的是,如果你需要取得 GameHost 傳回的答案,應該在 22 ~ 23 行之間,使用 this.GameHostAnswer( ) 來取得答案。有人問我為什麼不把它包成 function call ? 在 function 內接到參數後呼叫 yield return, 而把答案 return 回來不是很好嗎?

很無奈,除非 C# 支援像 C/C++ 那樣的 MACRO 語法,不然這個東西是不可能單靠 yield return 就做出來。你使用 yield return 的條件就是 function return type 一定要是 IEnumerable<T>,這是配對的,代表你不能任易的把 yield return 移到其它 function call 內。除非你不靠 C# yield return 來自動產生對應的 IEnumerator,一切自己來就可以。不過這樣不就又回到原點了? 咳咳... 就乖乖的寫兩行吧。

這樣的寫法執行效率就好的多,我用 DummyYieldPlayer 來測試,跟 DarkThread 提供的版本不相上下,意思是差異小到可以不理它的地步了 :D 這樣的方式不會有太大的效能損失,因為最後要執行的程式碼,跟直接手寫是差不多的,只是中間難寫的那段 code 是 C# compiler 幫我們解決掉,而不是像上回 AsyncPlayer 是用兩個執行緒來解決的。

效果很滿意,當然最後參賽的版本就改這寫法了 :D。不過寫的太晚,來不及幫到其它參賽者 :P,想到這方法算是我佔了 C# compiler 一點便宜,有幸找到方法坳 C# compiler 幫我把最難的部份寫好了,我自己則樂的輕鬆,專心研究怎樣才能少猜幾次... 這裡把我另類應用 yield return 的方法貼給各位參考一下,也算作個筆記 :D,各位高手如果還有發現 yield return 解決過你什麼樣的怪問題,也歡迎到我這留個言 :D



9/18/2008 3:25:17 AM

[C#: yield return] #1. How It Work ?

Microsoft.NET | Threading | 小技巧 | 技術隨筆 | 物件導向 | C#

C# 常常拿來跟 Java 比較,在 .NET 1.1 時常常是不相上下,而 .NET 又因為較年輕 & 頂著 M$ 的名號,往往被當成玩具一樣,不過 M$ 的確是在 .NET 及 C# 下了很多功夫,作了很多 Sun 不願意在 Java 身上作的事,這次要探討的 yield returnIEnumerable<T> 這搭配的 Interface 就是一例...。

Java 在過去的版本,往往為了跨平台,把修改 VM 規格視為大忌,連帶的連語法修改都一樣,即使不影響編譯出來的 bytecode 相容性也是一樣不肯改。而 .NET 就為了語法簡潔,編譯器往往是讓步的一方,因此 C# 有相當多的 Syntax Sugar,讓你寫起 CODE 爽一點...。你只要寫簡單的 CODE,編譯器會幫你轉成較煩雜的 CODE,有點像是文言文或是成語那樣的味道。古代文人常常用簡單的四個字,就有一大票引申的意義跑出來...,寫作文時只要套上成語,就代表了成語背後的故事,寓意。

"yield return" 算是最甜的甜頭了,因為編譯器替你翻出來的 code 整整一大串。先來看個簡單的例子,如果我想實作 IEnumerator<T> Interface, 照順序輸出 1 ~ 100 的數字,正統的 C# code 看起來要像這樣:

用 IEnumerator 依序傳回 1 ~ 100 的數字[copy code]

   1:  public class EnumSample1 : IEnumerator<int>
   2:  {
   3:      private int _start = 1;
   4:      private int _end = 100;
   5:      private int _current = 0;
   6:      public EnumSample1(int start, int end)
   7:      {
   8:          this._start = start;
   9:          this._end = end;
  10:          this.Reset();
  11:      }
  12:      public int Current
  13:      {
  14:          get { return this._current; }
  15:      }
  16:      public void Dispose()
  17:      {
  18:      }
  19:      object System.Collections.IEnumerator.Current
  20:      {
  21:          get { return this._current; }
  22:      }
  23:      public bool MoveNext()
  24:      {
  25:          this._current++;
  26:          return !(this._current > this._end);
  27:      }
  28:      public void Reset()
  29:      {
  30:          this._current = 0;
  31:      }
  32:  }

 

 

好不容易寫好 IEnumerator 之後,再來是拿來用,一筆一筆印出來:

取得 IEnumerator 物件後,依序取出裡面的數字[copy code]
   1:  EnumSample1 e = new EnumSample1(1, 100);
   2:  while (e.MoveNext())
   3:  {
   4:      Console.WriteLine("Current Number: {0}", e.Current);
   5:  }

 

不過如果只是要列出 1 ~ 100,大部份的人都不會想這樣寫吧? 直接用計概第一堂教你的 loop 不就好了? 程式碼如下:

送分題: 用 LOOP 印出 1 ~ 100 的數字[copy code]
   1:  for (int current = 1; current <= 100; current++)
   2:  {
   3:      Console.WriteLine("Current Number: {0}", current);
   4:  }

 

 

兩個範例都沒錯啊,那為什麼要用 IEnumerator ? 其實 IEnumerator 並不是 Microsoft 發明的,在四人幫寫的經典書籍 (Design Patterns) 裡就有這麼一個設計模式: Iterator,它的目的很明確:

"毋須知曉聚合物件的內部細節,即可依序存取內含的每一個元素。"

(摘自 物件導向設計模式 Design Patterns 中文版,葉秉哲 譯)

這裡指的 "聚合物件" 就是指 .NET 的 Collection, List, Array 等這類物件。意思是你不需要管 collection 裡每一個物件是怎麼擺的,用什麼結構處理的,用什麼邏輯或演算法處理的,我就只管照你安排好的順序一個一個拿出來就好。沒錯,這就是它主要的目的。換另一個說法,就是我們希望把物件巡訪的順序 (iteration) 跟依序拿到物件後要作什麼事 (process) 分開,那你就得參考 Iterator Pattern。不用? 那只好讓你的 iteration / process 混在一起吧。

差別在那? 我們再來看第二個例子。如果題目改一下,要列出 1 ~ 100 的數字,但如果不是 2 的倍數,也不是 3 的倍數,就跳過去。先來看看 Loop 的版本:

進階送分題,用LOOP印出 1~100 之中,2 或 3 的倍數[copy code]
   1:  for (int current = 1; current <= 100; current++)
   2:  {
   3:      bool match = false;
   4:      if (current % 2 == 0) match = true;
   5:      if (current % 3 == 0) match = true;
   6:      if (match == true)
   7:      {
   8:          Console.WriteLine("Current Number: {0}", current);
   9:      }
  10:  }

 

 

再來看看 IEnumerator 的版本:

用 IEnumerator 列出 1 ~ 100 中 2 或 3 的倍數[copy code]
   1:  public class EnumSample2 : IEnumerator<int>
   2:   {
   3:       private int _start = 1;
   4:       private int _end = 100;
   5:       private int _current = 0;
   6:       public EnumSample2(int start, int end)
   7:       {
   8:           this._start = start;
   9:           this._end = end;
  10:           this.Reset();
  11:       }
  12:       public int Current
  13:       {
  14:           get { return this._current; }
  15:       }
  16:       public void Dispose()
  17:       {
  18:       }
  19:       object System.Collections.IEnumerator.Current
  20:       {
  21:           get { return this._current; }
  22:       }
  23:       public bool MoveNext()
  24:       {
  25:           do {
  26:               this._current++;
  27:           } while(this._current %2 > 0 && this._current %3 > 0);
  28:           return !(this._current > this._end);
  29:       }
  30:       public void Reset()
  31:       {
  32:           this._current = 0;
  33:       }
  34:   }

 

 

而扣掉 IEnumerator 的部份,要把數字印出來的程式碼則完全沒有改變:

取出 IEnumerator 的每個數字,印到畫面上[copy code]
   1:  EnumSample2 e = new EnumSample2(1, 100);
   2:  while (e.MoveNext())
   3:  {
   4:      Console.WriteLine("Current Number: {0}", e.Current);
   5:  }

 

可以看的到,Loop 版本的確是把 iteration 跟 process 的 code 完全混在一起了,未來任何一方的邏輯要抽換都很麻煩,而 IEnumerator 則不會,分的很清楚,不過... 這 Code 會不會太 "髒" 了一點啊...? 試問一下,有誰會這麼勤勞,都用 IEnumerator 來寫 Code? 有的話請留個言,讓我崇拜一下...。

 

屁話講了一堆,最後就是要帶出來 "的確有魚與熊掌得兼的方法",怎麼作? 來看看用 C# 的 yield return 版本的程式碼:

傳回 IEnumerable 的 METHOD (不用再寫 CLASS,實作 IEnumerator 了)[copy code]
   1:  public static IEnumerable<int> YieldReturnSample3(int start, int end)
   2:  {
   3:      for (int current = 1; current <= 100; current++)
   4:      {
   5:          bool match = false;
   6:          if (current % 2 == 0) match = true;
   7:          if (current % 3 == 0) match = true;
   8:          if (match == true)
   9:          {
  10:              yield return current;
  11:          }
  12:      }
  13:  }

 

 

用 foreach 搭配 IEnumerable 印出每一筆數字[copy code]
   1:  foreach (int current in YieldReturnSample3(1, 100))
   2:  {
   3:      Console.WriteLine("Current Number: {0}", current);
   4:  }

 

 

 

真是太神奇了,安德魯。如何? 完美的結合兩者的優點,這種 code 實在是令人挑不出什麼缺點... 真是優雅... 不過念過系統程式的人一定都會吶悶... 這樣的程式執行方式,不就完全的違背了一般結構化程式典型的 function call / return 的鐵律了? 程式呼叫某個 function 就應該完全執行完才能 return 啊,怎麼能 "yield" return 後,跑完一圈又回到剛才執行到一半的 function 繼續跑,然後再 "yield" return ? 好像同實有兩段獨立的邏輯在運作... 還可以在兩者之間跳來跳去?

這就是 C# compiler 猛的地方了。搬出 reflector 來看看編譯出來的 code, 再被反組譯回來變成什麼樣子:

 

反組譯 YieldReturnSample3[copy code]
   1:  public static IEnumerable<int> YieldReturnSample3(int start, int end)
   2:  {
   3:      <YieldReturnSample3>d__0 d__ = new <YieldReturnSample3>d__0(-2);
   4:      d__.<>3__start = start;
   5:      d__.<>3__end = end;
   6:      return d__;
   7:  }
   8:   
   9:   

 

耶? 看到一個多出來的 class: <YieldReturnSample3>d__0 ... 再看看它的 class 長啥樣:

編譯器自動產生的 IEnumerator 衍生類別[copy code]
   1:  [CompilerGenerated]
   2:  private sealed class <YieldReturnSample3>d__0 : IEnumerable<int>, IEnumerable, IEnumerator<int>, IEnumerator, IDisposable
   3:  {
   4:      // Fields
   5:      private int <>1__state;
   6:      private int <>2__current;
   7:      public int <>3__end;
   8:      public int <>3__start;
   9:      private int <>l__initialThreadId;
  10:      public int <current>5__1;
  11:      public bool <match>5__2;
  12:      public int end;
  13:      public int start;
  14:   
  15:      // Methods
  16:      [DebuggerHidden]
  17:      public <YieldReturnSample3>d__0(int <>1__state)
  18:      {
  19:          this.<>1__state = <>1__state;
  20:          this.<>l__initialThreadId = Thread.CurrentThread.ManagedThreadId;
  21:      }
  22:   
  23:      private bool MoveNext()
  24:      {
  25:          switch (this.<>1__state)
  26:          {
  27:              case 0:
  28:                  this.<>1__state = -1;
  29:                  this.<current>5__1 = 1;
  30:                  while (this.<current>5__1 <= 100)
  31:                  {
  32:                      this.<match>5__2 = false;
  33:                      if ((this.<current>5__1 % 2) == 0)
  34:                      {
  35:                          this.<match>5__2 = true;
  36:                      }
  37:                      if ((this.<current>5__1 % 3) == 0)
  38:                      {
  39:                          this.<match>5__2 = true;
  40:                      }
  41:                      if (!this.<match>5__2)
  42:                      {
  43:                          goto Label_0098;
  44:                      }
  45:                      this.<>2__current = this.<current>5__1;
  46:                      this.<>1__state = 1;
  47:                      return true;
  48:                  Label_0090:
  49:                      this.<>1__state = -1;
  50:                  Label_0098:
  51:                      this.<current>5__1++;
  52:                  }
  53:                  break;
  54:   
  55:              case 1:
  56:                  goto Label_0090;
  57:          }
  58:          return false;
  59:      }
  60:   
  61:      [DebuggerHidden]
  62:      IEnumerator<int> IEnumerable<int>.GetEnumerator()
  63:      {
  64:          Program.<YieldReturnSample3>d__0 d__;
  65:          if ((Thread.CurrentThread.ManagedThreadId == this.<>l__initialThreadId) && (this.<>1__state == -2))
  66:          {
  67:              this.<>1__state = 0;
  68:              d__ = this;
  69:          }
  70:          else
  71:          {
  72:              d__ = new Program.<YieldReturnSample3>d__0(0);
  73:          }
  74:          d__.start = this.<>3__start;
  75:          d__.end = this.<>3__end;
  76:          return d__;
  77:      }
  78:   
  79:      [DebuggerHidden]
  80:      IEnumerator IEnumerable.GetEnumerator()
  81:      {
  82:          return this.System.Collections.Generic.IEnumerable<System.Int32>.GetEnumerator();
  83:      }
  84:   
  85:      [DebuggerHidden]
  86:      void IEnumerator.Reset()
  87:      {
  88:          throw new NotSupportedException();
  89:      }
  90:   
  91:      void IDisposable.Dispose()
  92:      {
  93:      }
  94:   
  95:      // Properties
  96:      int IEnumerator<int>.Current
  97:      {
  98:          [DebuggerHidden]
  99:          get
 100:          {
 101:              return this.<>2__current;
 102:          }
 103:      }
 104:   
 105:      object IEnumerator.Current
 106:      {
 107:          [DebuggerHidden]
 108:          get
 109:          {
 110:              return this.<>2__current;
 111:          }
 112:      }
 113:  }

 

耶? 不就完全跟之前手工寫的 IEnumerator 一樣嘛? 只不過這個 IEnumerator 是自動產生出來的,不是手寫的...。 畢竟是機器產生的 CODE,總是沒那麼精簡。想到了嗎? 沒錯,這就是 C# compiler 送給你的 syntax sugar ...,你可以腦袋裡想像著計概課入門時教你的 LOOP 那樣簡單的想法,compiler 就幫你換成 IEnumerator 的實作方式,讓你隨隨便便就可以跟別人宣稱:

 

"看! 我的程式有用到 Iterator 這個設計模式喔..."

 

聽起來好像很臭屁的樣子... 哈哈! 如果是在真的用的到 Iterator Patterns 的情況下,真的是可以很臭屁的拿出來炫耀一下。不過,我幹嘛突然講起 yield return ? 各位看的過程中有沒有聯想到前幾篇 POST 講的 Thread Sync 那兩篇文章 ( #1, #2 ) ? IEnumerator 跟 Thread Sync 又有什麼關係? 賣個關子,下篇繼續!



9/2/2008 9:58:00 PM

[RUN! PC] 2008 九月號

Microsoft.NET | RUNPC | Threading | 我的作品 | 技術隨筆

image

感謝編輯賞光,第三篇順利刊出 :D

 

執行緒這種東西,實在不是什麼主流的文章,不過雜誌社願意刊到第三篇,真是感謝... 前兩篇分別介紹了同步機制跟旗標,這次用執行緒集區作總結,提供了綜合的應用,也對效能的影響作整理,讓讀者具體的瞭解使用前後的效能差異。

 

這次文章內提到了 ThreadPool 的應用,不過因為內容及篇幅的關係,沒有挖到 ThreadPool 本身怎麼設計。對這部份有興趣的讀者可以參考我寫的這三篇:

ThreadPool 實作 #1. 基本概念

ThreadPool 實作 #2. 程式碼 (C#)

ThreadPool 實作 #3. AutoResetEvent / ManualResetEvent

 

雖然好像沒有人因為看到雜誌才連到這裡來,不過還是要囉唆一下,看到文章有任何意見都可以在這裡留言給我。文章內提到的 SAMPLE CODE 可以在這裡下載! 這次的範例程式是 Console application,不提供直接在網頁上執行,下載回去試試吧!



8/15/2008 2:51:00 AM

Thread Sync #2. 實作篇 - 互相等待的兩個執行緒

Microsoft.NET | Threading | 小技巧 | 作業系統 | 技術隨筆 | 物件導向

上篇有人跟我講太深奧了... Orz, 其實不會,只是還沒看到 Code 而以...。就先來幫黑暗魔人賽說明一下程式碼...。首先來看的是黑暗大魔王: GameHost..

 

GameHost 呼叫 Player 的片段[copy code]
    public void Start(Player p)    {        // 略...        int[] guess = p.StartGuess(_maxNum, _digits);        // 略...        Hint hint = compare(guess);        // 略...        while (hint.A != _digits)        {            // 略...            guess = p.GuessNext(hint);            // 略...            hint = compare(guess);        }        p.Stop();        // 略...    }
   1:  public void Start(Player p)
   2:  {
   3:      // 略...
   4:      int[] guess = p.StartGuess(_maxNum, _digits);
   5:      // 略...
   6:      Hint hint = compare(guess);
   7:      // 略...
   8:      while (hint.A != _digits)
   9:      {
  10:          // 略...
  11:          guess = p.GuessNext(hint);
  12:          // 略...
  13:          hint = compare(guess);
  14:      }
  15:      p.Stop();
  16:      // 略...
  17:  }

 

這段程式完全是老闆的角度在思考。抓到 PLAYER 後就叫它開始猜 StartGuess(),然後拼命的叫 PLAYER 再猜 GuessNext(), 直到猜中才可以休息 Stop()

很典型的多型 ( Polymorphism ) 應用,實際上會 RUN 什麼 CODE,就看繼承 PLAYER 的人是怎麼寫的...。這次我們再從弱勢勞工的角度來看看 PLAYER 該怎麼實作 (以 darkthread 附的 DummyPlayer 為例):

 

Player 實作的範例 ( DummyPlayer )[copy code]
public class DummyPlayer : Player{    private int[] _currAnswer = null;    private Random _rnd = new Random();    private void randomGuess()    {        List<int> lst = new List<int>();        for (int i = 0; i < _digits; i++)        {            int r = _rnd.Next(_maxNum);            while (lst.Contains(r))                r = _rnd.Next(_maxNum);            lst.Add(r);            _currAnswer[i] = r;        }    }    public override int[] StartGuess(int maxNum, int digits)    {        base.StartGuess(maxNum, digits);        _currAnswer = new int[digits];        randomGuess();        return _currAnswer;    }    public override int[] GuessNext(Hint lastHint)    {        randomGuess();        return _currAnswer;    }}
   1:  public class DummyPlayer : Player
   2:  {
   3:      private int[] _currAnswer = null;
   4:      private Random _rnd = new Random();
   5:   
   6:      private void randomGuess()
   7:      {
   8:          List<int> lst = new List<int>();
   9:          for (int i = 0; i < _digits; i++)
  10:          {
  11:              int r = _rnd.Next(_maxNum);
  12:              while (lst.Contains(r))
  13:                  r = _rnd.Next(_maxNum);
  14:              lst.Add(r);
  15:              _currAnswer[i] = r;
  16:          }
  17:      }
  18:   
  19:      public override int[] StartGuess(int maxNum, int digits)
  20:      {
  21:          base.StartGuess(maxNum, digits);
  22:          _currAnswer = new int[digits];
  23:          randomGuess();
  24:          return _currAnswer;
  25:      }
  26:      public override int[] GuessNext(Hint lastHint)
  27:      {
  28:          randomGuess();
  29:          return _currAnswer;
  30:      }
  31:  }

 

因為 CODE 不多,我就不刪了,全文照貼。另一個原因是我想讓各位看看拆成好幾段的 CODE 是不是還能夠一眼就還原成原來的邏輯?  如果只看這段 CODE 十秒鐘,沒有看註解或說明,誰能馬上回答這段 CODE 解題的邏輯是什麼?

 

別誤會,不是指這 CODE 不易讀,而是因為呼叫的方式邏輯被迫配合 GameHost 而被切散了,你得再重新把它拼湊起來。它的邏輯很簡單,甚至簡單到連問題的答案都被忽略掉了,不過就每次都隨機丟個數字回去,在 StartGuess( ) 及 GuessNext( ) 都是。

 

 

可憐的勞動階級要站起來啊~ 先幻想一下,如果勞工 (PLAYER) 才是老闆,那麼程式可以改成怎麼樣? 這也才是我們本篇的主角。先來看看成果再回頭來看怎麼實作。這次看的是修改後的版本: AsyncDummyPlayer.

 

換 PLAYER 的角度思考的邏輯: AsyncDummyPlayer[copy code]
    public class AsyncDummyPlayer : AsyncPlayer    {        private int[] _currAnswer = null;        private Random _rnd = new Random();        private void randomGuess()        {            List<int> lst = new List<int>();            for (int i = 0; i < _digits; i++)            {                int r = _rnd.Next(_maxNum);                while (lst.Contains(r))                    r = _rnd.Next(_maxNum);                lst.Add(r);                _currAnswer[i] = r;            }        }        protected override void Init(int maxNum, int digits)        {            _currAnswer = new int[digits];        }        protected override void Think()        {            while (true)            {                this.randomGuess();                Hint h = this.GameHost_AskQuestion(this._currAnswer);                if (h.A == this._digits) break;            }        }    }
   1:  public class AsyncDummyPlayer : AsyncPlayer
   2:  {
   3:      private int[] _currAnswer = null;
   4:      private Random _rnd = new Random();
   5:      private void randomGuess()
   6:      {
   7:          List<int> lst = new List<int>();
   8:          for (int i = 0; i < _digits; i++)
   9:          {
  10:              int r = _rnd.Next(_maxNum);
  11:              while (lst.Contains(r))
  12:                  r = _rnd.Next(_maxNum);
  13:              lst.Add(r);
  14:              _currAnswer[i] = r;
  15:          }
  16:      }
  17:      protected override void Init(int maxNum, int digits)
  18:      {
  19:          _currAnswer = new int[digits];
  20:      }
  21:      protected override void Think()
  22:      {
  23:          while (true)
  24:          {
  25:              this.randomGuess();
  26:              Hint h = this.GameHost_AskQuestion(this._currAnswer);
  27:              if (h.A == this._digits) break;
  28:          }
  29:      }
  30:  }

 

程式碼也沒比較少,都差不多。不過是那堆 CODE 換個地方擺而以。但是仔細看看,這個版本的邏輯清楚多了,PLAYER 一開始就是執行 Init( ) 的部份,而 GameHost 叫 Player 開始解題時, Player 就開始思考 (Think),而這個無腦的 Player 也很直接,就一直執行 while (true) { .... } 這個無窮迴圈,直到亂猜猜中為止。

如果 Player 在思考的時,不管在那裡它都可以適時的呼叫 GameHost_AskQuestion( .... ) 來跟 GameHost 問答案。什麼時後該猜數字? 該猜什麼數字? 這正是整個 Player 的核心,也就是 "怎麼猜" 這件事。以人的思考方式一定會分階段,比如一開始先把所有數字猜一輪,有個概念後再想想怎麼猜能更逼近答案,最後才是致命的一擊,找出正確答案送出去,贏得比賽。

這樣的作法,如果套在 DummyPlayer (原版本),每個階段都要塞在一個大的 switch case, 放在 GuessNext( ) 裡。而現在是那個階段? 只能靠 instance variable 了,先存著等到下次又被呼叫時再拿出來回想一下,上回作到那...。

而第二個版本,則完全沒這個問題,就把它當作一般程式思考就夠了,第一階段就是一個 LOOP,有它自己用的一些變數。第一階段處理完畢就離開 LOOP 繼續執行後面的 CODE... 直到最後離開 Think( ) 這個 method (認輸) 或是猜中答案光榮返鄉...。

 

 

兩者的差別看出來了嗎? DummyPlayer 像是被動的勞工,老闆說一動他就作一動。第一動作完就拿個筆計記下來,等著下次老闆再叫他,他就翻翻筆記看看之前做到那,這次繼續...。

而 AsyncDummyPlayer 這個主動的勞工呢? 老闆交待給他一件任務後,他就自己思考起該怎麼做了。中間都不需要老闆下令。反而是過程中勞工需要老闆的協助時,老闆再適時伸出援手就可以了,一切雜務都由這位主動優秀的勞工自己處理掉。

 

 

 

有沒有差這麼多? 這麼神奇? 是怎麼辦到的? 先來看看類別關系圖:

ThreadSync

 

上圖中,AsyncPlayer 就是改變這種型態的關鍵類別。AsyncPlayer 會用我們在上一篇講到的關念,化被動為主動,轉換這兩種呼叫模式。先來看看這個類別的程式碼到底變了什麼把戲,可以讓弱勢的勞工也有自主的權力?

 

 

 

 

AsyncPlayer 實作: 化被動為主動[copy code]
    public abstract class AsyncPlayer : Player    {        public override int[] StartGuess(int maxNum, int digits)        {            base.StartGuess(maxNum, digits);            Thread thinkThread = new Thread(this.ThinkCaller);            thinkThread.Start();                        this._host_return.WaitOne();            return this._temp_number;        }        public override int[] GuessNext(Hint lastHint)        {            this._temp_hint = lastHint;            this._host_call.Set();            this._host_return.WaitOne();            return this._temp_number;        }        public override void Stop()        {            base.Stop();            this._temp_hint = new Hint(this._digits, 0);            this._host_call.Set();            this._host_end.WaitOne();            this._host_complete = true;        }        private void ThinkCaller()        {            try            {                this.Init(this._maxNum, this._digits);                this.Think();            }            catch (Exception ex)            {                Console.WriteLine("Player Exception: {0}", ex);            }            finally            {                this._host_end.Set();            }        }        protected abstract void Init(int maxNum, int digits);        protected abstract void Think();        private AutoResetEvent _host_call = new AutoResetEvent(false);        private AutoResetEvent _host_return = new AutoResetEvent(false);        private AutoResetEvent _host_end = new AutoResetEvent(false);        private bool _host_complete = false;        private int[] _temp_number;        private Hint _temp_hint;        protected Hint GameHost_AskQuestion(int[] number)        {            if (this._host_complete == true) throw new InvalidOperationException("GameHost stopped!");            lock (this)            {                try                {                    this._temp_number = number;                    this._host_return.Set();                    this._host_call.WaitOne();                    return this._temp_hint;                }                finally {                    this._temp_number = null;                    this._temp_hint = new Hint(-1, -1);                }            }        }    }
   1:  public abstract class AsyncPlayer : Player
   2:  {
   3:      public override int[] StartGuess(int maxNum, int digits)
   4:      {
   5:          base.StartGuess(maxNum, digits);
   6:          Thread thinkThread = new Thread(this.ThinkCaller);
   7:          thinkThread.Start();
   8:          this._host_return.WaitOne();
   9:          return this._temp_number;
  10:      }
  11:      public override int[] GuessNext(Hint lastHint)
  12:      {
  13:          this._temp_hint = lastHint;
  14:          this._host_call.Set();
  15:          this._host_return.WaitOne();
  16:          return this._temp_number;
  17:      }
  18:      public override void Stop()
  19:      {
  20:          base.Stop();
  21:          this._temp_hint = new Hint(this._digits, 0);
  22:          this._host_call.Set();
  23:          this._host_end.WaitOne();
  24:          this._host_complete = true;
  25:      }
  26:      private void ThinkCaller()
  27:      {
  28:          try
  29:          {
  30:              this.Init(this._maxNum, this._digits);
  31:              this.Think();
  32:          }
  33:          catch (Exception ex)
  34:          {
  35:              Console.WriteLine("Player Exception: {0}", ex);
  36:          }
  37:          finally
  38:          {
  39:              this._host_end.Set();
  40:          }
  41:      }
  42:      protected abstract void Init(int maxNum, int digits);
  43:      protected abstract void Think();
  44:      private AutoResetEvent _host_call = new AutoResetEvent(false);
  45:      private AutoResetEvent _host_return = new AutoResetEvent(false);
  46:      private AutoResetEvent _host_end = new AutoResetEvent(false);
  47:      private bool _host_complete = false;
  48:      private int[] _temp_number;
  49:      private Hint _temp_hint;
  50:      protected Hint GameHost_AskQuestion(int[] number)
  51:      {
  52:          if (this._host_complete == true) throw new InvalidOperationException("GameHost stopped!");
  53:          lock (this)
  54:          {
  55:              try
  56:              {
  57:                  this._temp_number = number;
  58:                  this._host_return.Set();
  59:                  this._host_call.WaitOne();
  60:                  return this._temp_hint;
  61:              }
  62:              finally {
  63:                  this._temp_number = null;
  64:                  this._temp_hint = new Hint(-1, -1);
  65:              }
  66:          }
  67:      }
  68:  }

 

這段程式碼長了一點,內容也都刪不得,各位請耐心點看。上一篇我畫了張概念性的時序圖,這次我們再拿同一張圖,不過這次會標上程式碼:

 ThreadSync2

 

 

請注意一下各個箭頭的上下順序。由上往下代表時間的進行,如果應該在後面執行的 CODE 不巧先被呼叫了,則動作較快的那個 THREAD 會被迫暫停,等待另一邊的進度跟上。先來看看 StartGuess( ) 怎麼跟 Think( ) 互動:

StartGuess(...)[copy code]
        public override int[] StartGuess(int maxNum, int digits)        {            base.StartGuess(maxNum, digits);            Thread thinkThread = new Thread(this.ThinkCaller);            thinkThread.Start();                        this._host_return.WaitOne();            return this._temp_number;        }
   1:  public override int[] StartGuess(int maxNum, int digits)
   2:  {
   3:      base.StartGuess(maxNum, digits);
   4:      Thread thinkThread = new Thread(this.ThinkCaller);
   5:      thinkThread.Start();
   6:      this._host_return.WaitOne();
   7:      return this._temp_number;
   8:  }

 

GameHost 呼叫 Player.StartGuess( ) 有兩個目的,一個是給 Player 題目範圍,讓 Player 做好準備動作。另一個則是準備好之後 GameHost 要取得 Player 傳回的第一個問題。

程式碼很忠實的做了一樣的事,只不過 StartGuess( ) 建立了新的執行緒來負責。新的執行緒會執行 ThinkCaller( ),啟動之後 GameHost 這邊就什麼都不作,等待 _host_return 這個 WaitHandle 被叫醒,代表另一邊已經做好了,可以從共用變數 _temp_number 取得問題傳回去。

既然 GameHost 在等待某人通知它,我們就來看看是誰會通知他題目已經準備好了:

 

GameHost_AskQuestion(...)[copy code]
        protected Hint GameHost_AskQuestion(int[] number)        {            if (this._host_complete == true) throw new InvalidOperationException("GameHost stopped!");            lock (this)            {                try                {                    this._temp_number = number;                    this._host_return.Set();                    this._host_call.WaitOne();                    return this._temp_hint;                }                finally {                    this._temp_number = null;                    this._temp_hint = new Hint(-1, -1);                }            }        }
   1:  protected Hint GameHost_AskQuestion(int[] number)
   2:  {
   3:      if (this._host_complete == true) throw new InvalidOperationException("GameHost stopped!");
   4:      lock (this)
   5:      {
   6:          try
   7:          {
   8:              this._temp_number = number;
   9:              this._host_return.Set();
  10:              this._host_call.WaitOne();
  11:              return this._temp_hint;
  12:          }
  13:          finally {
  14:              this._temp_number = null;
  15:              this._temp_hint = new Hint(-1, -1);
  16:          }
  17:      }
  18:  }

 

就在 GameHost 正在等題目的時後,另一個執行緒正在進行 "思考" 的動作,直到有結論後會呼叫 GameHost_AskQuestion( ... ) 送出問題。這時這個問題會被放到 _temp_number, 而下一步就是 _host_return.Set( ), 通知另一個執行緒,正在等這個結果的人: "喂! 東西已經準備好了,可以來取貨了!!"

整個機制就這樣串起來了。GameHost 那邊怎麼把答案傳回來? 同樣的作法,反過來而以。GameHost 會藉著呼叫 Player.GuessNext(...) 把答案傳回來,而這時就觸動一樣的機制,讓另一邊 Player Thread 呼叫的 GameHost_AskQuestion( ... ) 醒過來,把答案拿走, RETURN 回去。

這樣一直重複下去,剩下最後一個同步的點,就是結束遊戲的地方。說穿了也是一樣的把戲,只是這次是藉著 GameHost 呼叫 Player.Stop( ),而另一邊 Player Thread 執行完 Think( ) 後,兩邊就一起結束遊戲了。

 

總算講完了。其實 thread 能解決的問題還真是五花八門。每次當我想出這些方法來簡化問題時,我就會覺的很有成就感。雖然寫出這個不會讓我贏得比賽,反而因為同步的關係,AsyncDummyPlayer 執行的速度還遠遠落後 DummyPlayer (我的機器跑起來,大概差了四~五倍 ...) 。不過我知道我簡單的頭腦,不先把問題簡化的話,我大概解決不了太複雜的問題...。也許是缺了這種能力,才讓我更有動力去想簡化問題的方式吧?

 

最後,為什麼每次講到 thread 的文章,都是 code 一點點,文章跟圖一大堆? 咳咳... 難道我也到了靠一張嘴混日子的地步了嘛? Orz... 本系列到此結束,以後還會有什麼主題? 想到再說啦~~ 今天各位記得去拜拜~~ 下回見!



8/14/2008 3:37:00 AM

Thread Sync #1. 概念篇 - 如何化被動為主動?

Microsoft.NET | Threading | 技術隨筆 | 物件導向

別以為我轉行了... 這篇不是勵志文章,教你用主動積極的態度面對人生.... 而是討論執行緒同步機制及如何用來解決惱人的流程問題。會寫這篇的念頭來自黑暗程式魔人辦的猜數字程式設計大賽,在處理的過程中想到的解法...,不過這篇要講的不是猜數字,而是不相干的東西: Thread Sync (執行緒的同步機制)。

 

一般程式寫久了,會很習慣一路到底的思考方式,程式也完全照這樣的思路被設計出來。不過寫 GAME 這類的程式就不是這麼一回事了。就先舉十五年前我用 C 寫的俄羅斯方塊的遊戲當例子 (大驚! 十... 十五年?),腦袋裡想的流程八九不離十,一定是像這樣:

"隨機從上面掉一個方塊下來,時間到了就往下掉,USER有按方向鍵就左右移動或是旋轉,直到卡到底下或是其它方塊為止..."

很正確的想法,很可惜你的程式完全不能這樣寫,為什麼? 當你沒有使用多執行緒或是其它的技巧時,你的主程式流程一定得被限定在固定時間 refresh 畫面的無窮迴圈... 上面的邏輯怎麼辦? 會被迫拆成好幾塊,然後被主程式定期呼叫... 程式寫起來大概會像這樣:

TETRIS
   1:  public void ProcessBrick()
   2:  {
   3:      switch (status)
   4:      {
   5:          case 1:
   6:              //
   7:              //  按右鍵, 往右移一格
   8:              //
   9:              break;
  10:          case 2:
  11:              //
  12:              //  按左鍵, 往左移一格
  13:              //
  14:              break;
  15:          case 3:
  16:              //
  17:              //  按上鍵, 順時針旋轉 90 度
  18:              //
  19:              break;
  20:          case 4:
  21:              //
  22:              //  按下鍵, 往下移一格
  23:              //
  24:              break;
  25:          case 5:
  26:              //
  27:              //  .......
  28:              //
  29:              break;
  30:      }
  31:  }

 

原本好好的邏輯被切成好幾塊,然後再藉著狀態等資訊,每次的 LOOP 各挑這次要執行的那一小段,然後拼湊出原本的邏輯...。別哀怨,誰叫你寫的 CODE 不是老大? 老大是控制畫面的主程式,你既然是當小的就乖乖躲在旁邊被呼叫... 委屈一點是應該的...。

 

真是黑心啊... 誰叫老闆永遠是對的。這次黑心... 不,黑暗魔人出的題目正好又讓我聯想到一樣的狀況。黑暗魔人出的題目,是先實作了 GameHost 類別 (就是老大啦),及 Player 抽象型別 (小角色就是他),再藉著多型 (Polymorphism) 的方式,由 GameHost 不斷的呼叫 Player 提供的 GuessNum( ),來讓 Player 問問題,同時把上一次問題的答案傳給 Player ...

 

程式也不難寫,大家都玩過 1A1B 的遊戲吧? 腦袋裡一定是這樣想的:

 

"一開始先隨便猜幾個,把結果記下來....。"

"再來刪去法,比較可能的幾個數字再猜一猜...。"

"快猜到了,幾種組合列出來想一想怎麼辦好? 好! 就猜這個...。"

...

...

...

"BINGO! 猜中了!!!"

 

很高興的想好流程後,真的要開始寫就傻住了... 這堆流程跟邏輯,要我拆成一直會被重複呼叫的 "單一個" method, 每一回合會被呼叫一次...。意思是一連串複雜的處理過程,要依序切成完全一樣的片段 (就是指重複呼叫同一段程式碼) ? 更慘的是這次問的問題,下一次呼叫才拿的到答案!? MY GOD... 要想出怎麼猜到對方的數字已經夠頭痛了,還得來處理這些 "行政" 問題?

 

My God, 頭越想越痛,這樣下去來我大概只能寫出比 DummyPlayer (註: 參賽程式附的 Player 範例,隨機產生問題,無腦的一直問,直到猜中為止...) 高明一點點的程式而已了。曾 MSN 跟黑暗魔人討論過,看看能不能讓 GameHost 化主動為被動,改由 GameHost 提供 callback 讓 Player 呼叫的可能性? 不過後來想想不對,那有主持人在旁邊看等著被來賓訪問的道理? 何況如果以後是兩個 Player 對戰怎麼辦? 誰要來當 "小的" ?? 問題還是一樣沒解決...

 

過去大家對於 THREAD 的印象都是 "多工",需要用 thread 解決的問題大多是在改善執行效能上面,因為同時用兩個 thread 可以做兩件事,效率會比較好。其實 thread 也很適合解決這類的問題,因為執行緒讓我們有機會,不需切斷 GameHost / Player 的 "思路",讓兩邊都能用很直覺的思考方式寫程式。

 

大概畫一下時序圖對照一下,先看看原本的作法:

投影片2

 

 

 

 

再看一下改用兩個執行緒的作法:

投影片3

 

 

不管之中的技術障礙怎麼克服,至少這樣改起來,兩邊都能各自用更合理簡單的方式思考自己的問題,也更接近實際的情況 (莊家跟玩家不會共用一個腦子吧...)。也唯有把問題簡化之後,我們才有辦法想出更複雜的方式來解決問題,科技不就是這樣進步的嘛?

 

這次的例子裡,執行緒是用來簡化問題的,而不是拿來增進效率的。兩個人腦袋各自想著問題,總要溝通吧? thread 之間溝通的機制就很單純了,共用變數加上同步機制,來確定對方是否準備好我要的東西,或是對方是否已經準備好要接招了?

 

這次搬出來的是過去說明 thread pool 提到的 AutoResetEvent,現在又重現江湖了。方法很簡單,要拿資料的那一方,就去 Wait( ) 等資料準備好,另一方把資料放在共用變數之後就呼叫 Set( ),叫醒另一個還在 Wait( ) 的執行緒,可以起床拿東西閃人了...。

 

接下來當然就各忙各的,直到雙方又有交換資料的需求,同樣的方式就再來一次。只是隨著資料交換的方向不同 (比如問問題是把題目由 player --> host,而取得答案則是 host --> player),上述的動作雙方角色要互換才能順利進行。

 

Orz,本來想一篇打完的,不過打到快睡著,加上內容還剩不少... 就分兩集吧! 程式碼及實作各位就耐心點,下一篇會端出實際操作執行緒的範例。



4/10/2008 2:03:00 AM

Canon CR2 --&gt; .JPEG 速度加倍.. 該換 Core2 Quad 了嘛?

Microsoft.NET | 543 | Threading | WPF | 我的作品 | 技術隨筆

看來是換四核心CPU的時機到了 [H]

之前弄了半天的歸檔程式,效能都卡在 .CR2 -> .JPG 這段。雖然祭出了 ThreadPool,也想盡辦法把獨立的工作湊在一起,盡量提升 CPU 的利用率,不過得到的效果有限,因為最後都是剩下 .CR2 的檔案轉不完啊,其它拿來填空檔的工作早就做完了,實在不成比例... 整體效能還是卡在最慢的 Canon Codec 身上...

這次無意間想到,單一 process 內,Canon Codec 有過多不能重複進入的問題 (不能同時利用到兩顆CPU),那麼拆成兩個獨立的 process 是否就解決了?

想一想還蠻可行的,通常為了安全,都只需要做到 process 內的 LOCK 就夠了,不需要做到全域的 LOCK,除非要 LOCK 的資源是跨 process 會用的到的才需要這樣做... 在真正改程式下去之前,當然要先驗證一下...

用之前的 LIB 簡單寫了個執行檔,就單純的把 .CR2 轉 .JPG 而以。寫好後同時 RUN 兩份,讚! CPU 利用率飆到 80% (雖然離理想的 100% 還有段距離) ... 不過在我的 E6300 CPU,同時跑兩份,執行速度倒沒有下降,差不多..

確定這方式有效之後,花了點時間改我的歸檔程式,把轉檔部份抽成 .exe 然後由歸檔程式來啟動,一樣維持同時有兩個 .CR2 轉檔程序進行。耶! 情況不錯,轉一個檔案一樣要 70 秒,但是 70 秒過後可以轉完兩個檔案... 平均起來等於速度加倍了...

之前一直不想用這個方法,因為一來 IPC (inter-process communication) 是件麻煩事,不想去碰,二來啟動另一個 process,參數的傳遞也是個麻煩事,大概就只能靠檔案或是 arguments ... 或多或少都要處理到一些 parsing 的問題... 三來執行的回傳值也是一樣,總之都碰到 IPC 的問題就不想去碰他了

不過這次評估了一下,這些動作麻煩歸麻煩,至少不是花運算資源的動作,跟一張照片轉 70 秒,一次動則上百張的量比起來,怎麼算都划算...

真糟糕,這樣下去是不是代表該換 Q9450 了? 咳咳...



1/19/2008 5:48:00 AM

MSDN Magazine 閱讀心得: Stream Pipeline

Microsoft.NET | Threading

前一陣子研究了幾種可以在 .NET 善用多核心的作法之後, 最近剛好又在在 MSDN Magazine 看到這篇不錯的文章: http://msdn.microsoft.com/msdnmag/issues/08/02/NETMatters/default.aspx 裡面點出了另一種作法: 串流管線 (Stream Pipeline)...

如果你是以多核心能有效利用為目標, 那這篇對你可能沒什麼用... 不過如果你要處理的資料必需經過幾個 Stream 處理的話, 這篇就派的上用場. 這篇是以一個常見的例子為起點: 資料壓縮 + 加密...

在 .NET 要壓縮資料很簡單, 只要把資料寫到 GzipStream 就好, 建立 GzipStream 時再指定它要串接的 FileStream 就可以寫到檔案了 (串 Network 就可以透過網路傳出去... etc), 而加密則是用 CryptoStream, 也是一樣的用法, 可以串接其它的 Stream ...

正好利用這個特性, 資料要壓縮又要加密, 可以這樣作:

(single thread) [INPUT] --> GzipStream --> CryptoStream --> [OUTPUT]

不過那裡跟多核 CPU 扯上關係? 因為壓縮跟加密都是需要大量 CPU 運算, 這樣的作法等於是只用單一 thread 來負責 Gzip 跟 Crypto 的工作. 即使透過串接 stream , 動作被切成很多小段, 壓縮一點就加密一點, 但是仍然只有一個 thread... 再忙也仍然只有用到單一核心, 除非等到未來改板, 用類似上一篇 [TPL] 的方法改寫的 library 才有機會改善...

作者提出另一個觀點, 這兩個 stream 能不能分在兩個 thread 各別執行? 弄的像生產線 (pipeline) 一樣, 第一個作業員負責壓縮, 第二個作業員負責加密, 同時進行, 就可以有兩倍的處理速度... 答案當然是 "可以" ! 我佩服 Stephen Toub 的地方在於他很漂亮的解決了這個問題, 就是它寫了 BlockingStream 就把問題解掉了, 乾淨又漂亮... 真是甘拜下風.. 這也是我為什麼想多寫這篇的原因.

之前在念作業系統, 講到多工的課題, 有提過 "生產者 / 消費者" 的設計, 就是一部份模組負責丟出一堆工作, 而另一個模組則負責把工作處理掉. 如何在中間協調管控就是個重要的課題. 生產的太快工作會太多, 因此多到某個程度就要降低生產者的速度, 通常就是暫停. 消費者消化的太快也會沒事作, 就需要暫停來休息一下等待新的工作. 而消費者這端如果是用 thread pool 來實作, 就剛好是 [上篇] 提到的動態調整 thread pool 裡的 worker thread 數量的機制. 工作太多會動態增加 worker thread 來處理, 工作太少就會讓多餘的 worker thread 睡覺, 睡太久就把它砍了...

而這次 Stephen Toub 解決的方式: BlockingStream 則是處理生產者跟消費者之間的橋樑. Stream 這種類型, 正好是身兼消費及生產者的角色. 他舉了郵件處理的例子. 如果你有一堆信要寄, 你也許會找一個人把卡片折好, 放進信封貼郵票. 這些動作由一個人 (one thread) 來作, 就是原本的作法. 生產者就是那堆要寄的信件, 這個可憐的工人就是要把這些信裝好信封貼好郵票的消費者.

如果有第二個人呢? 他們可以分工, 一個裝信封, 一個黏郵票. 第一個人 ( thread 1 ) 裝好信封交給第二個人 (thread 2), 第二個人再黏郵票就搞定了. 這就是典型的生產線模式. 這樣在第一個人裝信封的同時, 第二個人可以貼上一封信的郵票... 因此除了裝第一封信, 第二個人沒事作, 及貼最後一張郵票, 第一個人沒事作之外, 其它過程中兩個人都有工作, 對應到程式就是一個 thread 負責壓縮, 壓縮好一部份後就交給後面的 thread 負責加密, 同時前面的 thread 則繼續壓縮下一塊資料... 剛好搭配雙核 cpu, 效能就能提高. 兩個 stream 之間就靠 StreamPipeline 及 BlockingStream 隔開在兩個不同的 thread. 原本的流程改為:

[INPUT] --> GzipStream (thread 1) --> BlockingStream --> CryptoStream (thread 2) --> [OUTPUT]

雖然技術細節才是精華, 但是這部份就請各位看原文就好, 我寫再多也比不過原文講的清楚... 我想要比較的是這樣作跟之前碰到的幾種平行處理 or thread pool 差在那裡? 這種作法有什麼好處?

其實最大的差異在於, pipeline 以外的作法 (TPL, ThreadPool) 都是把大量的工作分散給多個工人來處理. 每個人處理的內容其實都是獨立的. 比如之前我處理的問題, 要把一堆照片轉成縮圖. 每個動作都互獨立, 可以很容易的分給多個人處理, 也能得到明顯的加速, 更大的好處是人手越多效能越好.

但是事情不是一直這麼理想. 某些情況是另一個極端, 沒有辦法靠人海戰術解決. 比如這個寄信的例子, 折信紙裝到信封不難, 但是要貼郵票的話, 工人可能就得放下信紙, 到旁邊拿起郵票膠水貼上去. 反覆下來他會浪費時間在交換作這兩件事情上. 這時較好的分工就不是找兩個人一人各處理一半的信, 而是一人負責折信裝信封, 一人負責貼郵票.

當然還有另外一種考量, 如果這些信 *必需* 依照順序處理, 你也無法用 thread pool 這種人海戰術來處理, 一定要把工作切成不同階段, 由專人負責才可以. 這就是 pipeline 能夠應用的時機. 其實這種作法在 CPU 很普遍, 多虧之前修過 micro-processor, 也多少學到皮毛. X86 CPU 從 80386 開始稱作 "超純量" (super scaler) CPU, 就是代表它在單一 clock cycle 能執行完超過一個指令, 靠的也是 pipeline.

再講下去就離題了, 簡單的說以往平行處理都是用 multi thread 來分配工作, 就像銀行有好幾個櫃台, 每個客戶在單一櫃台就辦完他的手續. 而 pipeline 就像拿選票一樣, 第一位查證件, 第二位蓋章, 第三位給你選票, 第四位問你要不要領公投票... 咳咳 (我實在不支持公投題目這麼無聊, 無關黨派)... 這是屬於兩種不同維度的工作切割方式. 使用 pipeline 的方式切割有幾個好處:

  1. 每個階段都負責單一工作, 動作簡單明確, 因此就可以更簡潔快速, 不會有額外的浪費
  2. 不會像 thread pool 那樣, 會有不固定個數的 worker thread 在服務, thread 數量一定是固定的, 減少了 thread create / destory 的成本

不過呢, 缺點也不少, 文章內也列了幾點:

  1. 效能不見得能照比例提升. 整個 pipeline 過程中只要有一階段較慢, 會卡住整條生產線. 以此例來說, 只提升了 20% 的效能, 因為管線的兩個階段花費的時間並不相等.
  2. 如果我有足夠的人手, 但是工作的特質不見得能切成這麼多階段, 因此擴充性有限, 以此例來說, 你用四核CPU就派不上用處了, 仍然只能分成兩階來處理. 
  3. pipeline 啟動及結束的成本較高. 一開始只有前面階段有工作做, 最後只有後面階段有工作做. pipeline 切越多 stage 問題就越嚴重. 這在 CPU 的架構設計上比較常在探討這問題, 就是管線清空後造成的效能折損. 這也是為什麼常聽到園區停電一分鐘, 台積電就會損失幾億的原因... 因為它的生產線很長, 停掉生產線跟啟動生產線的成本高的嚇人.

廢話講了很多, 主要是這篇文章提供了另一種平行處理的方式, 是較少看到的, 就順手把心得寫一寫了, 自己留個心得, 以後才不會忘掉 [:D]. 最後, 很豬頭的是, 整篇英文很辛苦的 K 完, 才發現有中文版的, 真是... 下次各位看文章前請先注意一下有沒有中文版 [H], 當然你英文很好的話就沒差... 哈哈..



12/17/2007 2:21:00 AM

ThreadPool 實作 #3. AutoResetEvent / ManualResetEvent

Microsoft.NET | Threading | 作業系統 | 技術隨筆

續上篇, 從眾多閒置的 worker thread 挑選一個起來接工作有兩種策略作法. 一種作法是 Thread Pool 自己決定, 最基本的就是誰等最久就叫誰起來, 或是 Thread Pool 有自己的演算法挑一個最菜的 worker thread 來做工都可以... 另一種作法就是不管它, 每個 worker thread 都靠運氣, 交給上天 (OS) 決定, 看誰搶到下一個 job. 看起來第一種好像比較好, 事實上不見得. 每個 thread 之間的排程是個學問, OS 多工的效率好不好就看這個. 舉例來說, 如果每個 worker thread 的優先順序不同, 或是某些 thread 正好碰到 GC, 或是正好被移到 virtaual memory 等等, 硬去叫它起來工作反而要花更多的時間. 而這些資訊都在 OS 的排程器裡才有足夠的資訊可以判斷, 以寫 AP 的角度很難顧級到這個層面. 這時最好的辦法就是不管它, 用齊頭式的平等, 把選擇權交給 OS 決定.

又是一個說起來比 code 多的例子. 這兩種不同的策略, 寫成 code 其實只差一行... 就是選用 AutoResetEvent 跟 ManualResetEvent 的差別而以. .NET SDK 的 Class Reference 上這樣寫著:

AutoResetEvent: Notifies a waiting thread that an event has occurred.

ManualResetEvent: Notifies one or more waiting threads that an event has occurred.

真正寫成 Code 來測試一下...

   1:  static void Main(string[] args)
   2:  {
   3:      for (int count = 0; count < 5; count++)
   4:      {
   5:          Thread t = new Thread(new ThreadStart(ThreadTest));
   6:          t.Start();
   7:      }
   8:      Thread.Sleep(1000);
   9:      wait.Set();
  10:      Thread.Sleep(1000);
  11:      wait.Set();
  12:      Thread.Sleep(1000);
  13:      wait.Set();
  14:      Thread.Sleep(1000);
  15:      wait.Set();
  16:      Thread.Sleep(1000);
  17:      wait.Set();
  18:  }
  19:   
  20:  private static AutoResetEvent wait = new AutoResetEvent(false);
  21:  private static void ThreadTest()
  22:  {
  23:      Console.WriteLine("Thread[{0}]: wait...", Thread.CurrentThread.ManagedThreadId);
  24:      wait.WaitOne();
  25:      Console.WriteLine("Thread[{0}]: wakeup...", Thread.CurrentThread.ManagedThreadId);
  26:  }

執行結果:

Thread[ 3 ]: wait...
Thread[ 5 ]: wait...
Thread[ 4 ]: wait...
Thread[ 6 ]: wait...
Thread[ 7 ]: wait...
Thread[ 3 ]: wakeup...
Thread[ 4 ]: wakeup...
Thread[ 6 ]: wakeup...
Thread[ 5 ]: wakeup...
Thread[ 7 ]: wakeup...

程式過程中我加了幾個 Sleep, 首先我用同一個 AutoResetEvent, 讓五個 thread 都去等待同一個 notify event 來叫醒它. 而 AutoResetEvent 一次只能叫醒一個被 WaitOne blocked 住的 thread. 就是第一種先到先贏的作法, 後面幾行 wakeup 的 message 每隔一秒會跳一行出來.

 

再來看一下 ManualResetEvent ...

   1:  static void Main(string[] args)
   2:  {
   3:      for (int count = 0; count < 5; count++)
   4:      {
   5:          Thread t = new Thread(new ThreadStart(ThreadTest));
   6:          t.Start();
   7:      }
   8:   
   9:      Thread.Sleep(1000);
  10:      wait.Set();
  11:  }
  12:   
  13:  private static ManualResetEvent wait = new ManualResetEvent(false);
  14:  private static void ThreadTest()
  15:  {
  16:      Console.WriteLine("Thread[{0}]: wait...", Thread.CurrentThread.ManagedThreadId);
  17:      wait.WaitOne();
  18:      Console.WriteLine("Thread[{0}]: wakeup...", Thread.CurrentThread.ManagedThreadId);
  19:  }

執行結果:

Thread[ 3 ]: wait...
Thread[ 4 ]: wait...
Thread[ 5 ]: wait...
Thread[ 6 ]: wait...
Thread[ 7 ]: wait...
Thread[ 5 ]: wakeup...
Thread[ 4 ]: wakeup...
Thread[ 6 ]: wakeup...
Thread[ 3 ]: wakeup...
Thread[ 7 ]: wakeup...

除了把型別宣告從 AutoResetEvent 換成 ManualResetEvent 之外, 其它都沒變. 當然 line 10 一次就能叫醒所有的 thread, 所以後面四次 Set( ) 我就直接刪掉了. 程式 run 到 line 10, 後面五行 wakeup 的訊息就會一次全出現, 而出現的順序是隨機的, 每次都不大一樣.

這種作法的解釋, 是一次 Set( ), 卡在 WaitOne( ) 的五個 thread 就全被叫醒了. 而這個現象如果套用在 SimpleThreadPool 的實作上, 它的作用相當於第二種作法. 一瞬間把所有的 worker thread 從 blocked 狀態移到 waiting 狀態. 而到底是那一個 thread 有幸第一個被 OS 移到 running 狀態? 就是根據 OS 自己的排程策略而定. 第一個移到 running 狀態的 thread 通常就能搶到 job queue 裡的工作, 剩下的沒搶到, 則又會因為沒有工作好做, 再度進入閒置狀態, 等待下一次機會再一起來碰一次運氣...

就這一行, 花了最多篇幅來說明, 因為它最抽象. 說明這段的目的, 如果你的 ThreadPool 要更進階一點, 如果你想要改用先排隊先贏的策略, 把 WaitHandle 的型別改成 AutoResetEvent 就好. 如果你希望根據工作的特性來微調每個 thread 的 priority, 你就必需用 ManualResetEvent.

好, 沒想到一百行左右的 SimpleThreadPool 有這麼多東西可以寫, 完整的 code 我直接貼在底下, 歡迎引用. 好用的話記得給個回應. 要用在你的 project 也歡迎, 只要禮貌性的支會我一聲. 讓我知道我寫的 code 被用在什麼地方就好. 寫到這裡總算告一段落. 謝謝收看 [:D]

 

--
完整的 SimpleThreadPool.cs 原始碼:

 

   1:  using System;
   2:  using System.Collections.Generic;
   3:  using System.Text;
   4:  using System.Threading;
   5:  using System.Diagnostics;
   6:   
   7:  namespace ChickenHouse.Core.Threading
   8:  {
   9:      public class SimpleThreadPool : IDisposable
  10:      {
  11:          private List<Thread> _workerThreads = new List<Thread>();
  12:   
  13:          private bool _stop_flag = false;
  14:          private bool _cancel_flag = false;
  15:   
  16:          private TimeSpan _maxWorkerThreadTimeout = TimeSpan.FromMilliseconds(3000);
  17:          private int _maxWorkerThreadCount = 0;
  18:          private ThreadPriority _workerThreadPriority = ThreadPriority.Normal;
  19:   
  20:          private Queue<WorkItem> _workitems = new Queue<WorkItem>();
  21:          private ManualResetEvent enqueueNotify = new ManualResetEvent(false);
  22:   
  23:          public SimpleThreadPool(int threads, ThreadPriority priority)
  24:          {
  25:              this._maxWorkerThreadCount = threads;
  26:              this._workerThreadPriority = priority;
  27:          }
  28:   
  29:          private void CreateWorkerThread()
  30:          {
  31:              Thread worker = new Thread(new ThreadStart(this.DoWorkerThread));
  32:              worker.Priority = this._workerThreadPriority;
  33:              this._workerThreads.Add(worker);
  34:              worker.Start();
  35:          }
  36:   
  37:          public bool QueueUserWorkItem(WaitCallback callback)
  38:          {
  39:              return this.QueueUserWorkItem(callback, null);
  40:          }
  41:   
  42:          public bool QueueUserWorkItem(WaitCallback callback, object state)
  43:          {
  44:              if (this._stop_flag == true) return false;
  45:   
  46:              WorkItem wi = new WorkItem();
  47:              wi.callback = callback;
  48:              wi.state = state;
  49:   
  50:              if (this._workitems.Count > 0 && this._workerThreads.Count < this._maxWorkerThreadCount) this.CreateWorkerThread();
  51:   
  52:              this._workitems.Enqueue(wi);
  53:              this.enqueueNotify.Set();
  54:   
  55:              return true;
  56:          }
  57:   
  58:          public void EndPool()
  59:          {
  60:              this.EndPool(false);
  61:          }
  62:   
  63:          public void CancelPool()
  64:          {
  65:              this.EndPool(true);
  66:          }
  67:   
  68:          public void EndPool(bool cancelQueueItem)
  69:          {
  70:              if (this._workerThreads.Count == 0) return;
  71:   
  72:              this._stop_flag = true;
  73:              this._cancel_flag = cancelQueueItem;
  74:              this.enqueueNotify.Set();
  75:   
  76:              do
  77:              {
  78:                  Thread worker = this._workerThreads[0];
  79:                  worker.Join();
  80:                  this._workerThreads.Remove(worker);
  81:              } while (this._workerThreads.Count > 0);
  82:          }
  83:   
  84:          private void DoWorkerThread()
  85:          {
  86:              while (true)
  87:              {
  88:                  while (this._workitems.Count > 0)
  89:                  {
  90:                      WorkItem item = null;
  91:                      lock (this._workitems)
  92:                      {
  93:                          if (this._workitems.Count > 0) item = this._workitems.Dequeue();
  94:                      }
  95:                      if (item == null) continue;
  96:   
  97:                      try
  98:                      {
  99:                          item.Execute();
 100:                      }
 101:                      catch (Exception)
 102:                      {
 103:                          //
 104:                          //  ToDo: exception handler
 105:                          //
 106:                      }
 107:   
 108:                      if (this._cancel_flag == true) break;
 109:                  }
 110:   
 111:                  if (this._stop_flag == true || this._cancel_flag == true) break;
 112:                  if (this.enqueueNotify.WaitOne(this._maxWorkerThreadTimeout, true) == true) continue;
 113:                  break;
 114:              }
 115:   
 116:              this._workerThreads.Remove(Thread.CurrentThread);
 117:          }
 118:   
 119:          private class WorkItem
 120:          {
 121:              public WaitCallback callback;
 122:              public object state;
 123:   
 124:              public void Execute()
 125:              {
 126:                  this.callback(this.state);
 127:              }
 128:          }
 129:   
 130:          public void Dispose()
 131:          {
 132:              this.EndPool(false);
 133:          }
 134:      }
 135:  }


12/17/2007 2:12:00 AM

ThreadPool 實作 #2. 程式碼 (C#)

Microsoft.NET | Threading | 作業系統 | 技術隨筆

既然上一篇都把 pseudo code 寫出來了, 現在就可以開始來寫真正的 Thread Pool 了. 開始之前, 我先把目標定一下. 這次寫的 Thread Pool 必需俱備這些能力:

  1. 要能由使用者控制 thread pool 的組態:
    - worker thread 數量上限
    - worker thread 優先權
    - thread idle timeout 時間 (超過 idle timeout, 代表 thread 是宂員, 可以下台了)
    - job queue 安全範圍 (超過代表需要找幫手 - 建立新的 worker thread)
  2. thread pool 在 job queue 超過安全範圍時, 要能動態建立新的 thread 來消化 queue 裡的工作
  3. worker thread 在 idle 時間超過 idle timeout 時, 則這個 worker thread 就要被回收
  4. 簡單的同步機制, 要能等待 thread pool 處理完所有的 job.
  5. 如果有多個 worker thread 要搶同一個 job 來執行, 要由 OS 決定, 不要由 thread pool 自己決定

每次在寫這些描述, 都會覺的怎麼寫起來比 code 還多... @_@, 沒錯, code 短到我可以直接貼上來, 不需要附檔案.. 我會把完整的 code 貼在最下方. 其它說明的部份只會貼片段.

首先, 先來決定 SimpleThreadPool 的 class define 為何. 依照需求及我希望它用起來的樣子, 為:

   1:      public class SimpleThreadPool : IDisposable
   2:      {
   3:          public SimpleThreadPool(int threads, ThreadPriority priority)
   4:          {
   5:          }
   6:   
   7:          public bool QueueUserWorkItem(WaitCallback callback)
   8:          {
   9:          }
  10:   
  11:          public bool QueueUserWorkItem(WaitCallback callback, object state)
  12:          {
  13:          }
  14:   
  15:          public void EndPool()
  16:          {
  17:          }
  18:   
  19:          public void CancelPool()
  20:          {
  21:          }
  22:   
  23:          public void EndPool(bool cancelQueueItem)
  24:          {
  25:          }
  26:   
  27:          private void DoWorkerThread()
  28:          {
  29:          }
  30:   
  31:          public void Dispose()
  32:          {
  33:              this.EndPool(false);
  34:          }

這個 ThreadPool 我希望它用起來像這樣, 貼一段理想中的用法 sample code:

   1:              SimpleThreadPool stp = new SimpleThreadPool(2, System.Threading.ThreadPriority.BelowNormal);
   2:   
   3:              for (int count = 0; count < 25; count++)
   4:              {
   5:                  stp.QueueUserWorkItem(
   6:                      new WaitCallback(ShowMessage),
   7:                      string.Format("STP1[{0}]", count));
   8:                  Thread.Sleep(new Random().Next(500));
   9:              }
  10:              Console.WriteLine("wait stop");
  11:              stp.EndPool();
把 ThreadPool 想像成一個服務櫃台, 很多人排隊等著處理. 因此整個實作會像是個工作的佇列 (job queue), 只要把你的工作放到 queue 裡 (排隊), 而服務人員 (worker thread) 就會一個一個的處理. 最後你可以決定要把所有工作做完才收攤 (呼叫 EndPool( ), 會 blocked 直到工作清光), 或是決定掛牌 "明日請早" (呼叫 CancelPool( )), 只把作到一半的工作處理掉, 剩下還在排隊的改天再來.

整個實作的關鍵部份是在 private void DoWorkerThread(), 裡面寫的 code 就是每一個 worker thread 要執行的所有內容. 補上實作的 code:

   1:   
   2:  private void DoWorkerThread()
   3:  {
   4:      while (true)
   5:      {
   6:          while (this._workitems.Count > 0)
   7:          {
   8:              WorkItem item = null;
   9:              lock (this._workitems)
  10:              {
  11:                  if (this._workitems.Count > 0) item = this._workitems.Dequeue();
  12:              }
  13:              if (item == null) continue;
  14:   
  15:              try
  16:              {
  17:                  item.Execute();
  18:              }
  19:              catch (Exception)
  20:              {
  21:                  //
  22:                  //  ToDo: exception handler
  23:                  //
  24:              }
  25:   
  26:              if (this._cancel_flag == true) break;
  27:          }
  28:   
  29:          if (this._stop_flag == true || this._cancel_flag == true) break;
  30:          if (this.enqueueNotify.WaitOne(this._maxWorkerThreadTimeout, true) == true) continue;
  31:          break;
  32:      }
  33:   
  34:      this._workerThreads.Remove(Thread.CurrentThread);
  35:  }

每個 worker thread 就只作很簡單的一件事, 就是進入無窮迴圈, 只要開始上班就不段的接工作來處理, 一直到下班為止. 整個最外層的 while loop 就是指這部份. 離開 loop 後就代表這個 worker thread 該下班了.

迴圈內也很簡單, 上工的第一件事就是看 job queue 裡有沒有工作要做? 有就 dequeue 一個來處理, 一直重複到 job queue 空了為止, 或是直到老闆下令關店 (_cancel_flag 為 true).

無論是要關店或是工作做完了, 流程會跳離 line 6 ~ 27 這個 while loop. 後序的關鍵在 line 30:

   1:  if (this.enqueueNotify.WaitOne(this._maxWorkerThreadTimeout, true) == true) continue;

呼叫 WaitHandle 的 WaitOne( ) method, 會讓 worker thread 進入 blocked 狀態. 直到被叫醒為止 (叫醒它的 code 寫在 add queue 裡), 或是 idle timeout 時間到了. .NET API WaitHandle.WaitOne( ) 提供 option 指定 timeout 時間, 至於是被叫醒的 or 時間到了自己醒來, 就靠 return value 來判定. 以這段 code 來看, 被叫醒 (return true) 代表有新工作進來, 就執行 continue 指令, 繼續到 job queue 拿新的工作繼續努力, 如果是睡太飽自己醒的, 就執行 break, 準被收拾東西下班去...

整個 worker thread 的生命周期就是靠這段 code 來運作. 接下來看一下如何把 job 加進來:

 
   1:  private List<Thread> _workerThreads = new List<Thread>();
   2:  private Queue<WorkItem> _workitems = new Queue<WorkItem>();
   3:  private ManualResetEvent enqueueNotify = new ManualResetEvent(false);
   4:   
   5:  public bool QueueUserWorkItem(WaitCallback callback, object state)
   6:  {
   7:      if (this._stop_flag == true) return false;
   8:   
   9:      WorkItem wi = new WorkItem();
  10:      wi.callback = callback;
  11:      wi.state = state;
  12:   
  13:      if (this._workitems.Count > 0 && this._workerThreads.Count < this._maxWorkerThreadCount) this.CreateWorkerThread();
  14:   
  15:      this._workitems.Enqueue(wi);
  16:      this.enqueueNotify.Set();
  17:   
  18:      return true;
  19:  }
 
扣掉一大半準備 WorkItem 的 code 之外, 剩下的就是把 workitem 加到 queue 裡了. 兩個關鍵的地方是:
   1:  if (this._workitems.Count > 0 && this._workerThreads.Count < this._maxWorkerThreadCount) this.CreateWorkerThread();

如果 job queue 堆的工作超過 0 個, 而總共的 worker thread 數量還沒超過上限, 就呼叫 CreateWorkerThread( ) 再叫一個 worker thread 來幫忙.

line 14 把 work item 加到 queue 之後, line 15 就緊接著呼叫 WaitHandle.Set( ), 通知所有正卡在 WaitOne( ) 睡覺中的 worker thread 該醒來工作了. 其實到這裡, thread pool 主要結構都說明完了, 剩下的都是細部實作, 比如如何封裝 job 的物件, 如何得知共有幾個 worker thread 等等, 這些直接看 code 比較快我就不多說明了. 搭配前一篇, 提到有各種 synchrinization 機制可以使用, 這裡我用的是 ManualResetEvent, 為什麼要挑這個? 先弄清楚觀念上的問題: 假設有五個 worker thread 都睡著等待新的工作進來, 這時只有一個新的工作進來, 到底是誰該醒來作事? 是由誰決定?

說明起來又是一大篇了... 改寫第三篇再繼續吧!



12/14/2007 2:35:00 AM

ThreadPool 實作 #1. 基本概念

Microsoft.NET | Threading | 技術隨筆

既然都花了力氣回憶起過去學的 ThreadPool Implementation, 而且都用 C# 寫好了, 不如就整理一下好了. 其實寫起來 code 真的都不難, 難的是人腦天生就不適合思考這種 multithreading 的東西, 想多了腦筋真的會打結. 另外一個障礙是有些東西要唸過 Operation System 才會懂, 沒這基礎的話, 光看 API 說明會一個頭兩個大...

這篇還不會貼完整的 code, 先把必要的基礎及認知說明一下. ThreadPool 的概念其實很簡單, 這 design pattern 目的是把過去的 multi-threading programming model 簡化, 把複雜的 threads control 拆到 thread pool implementation 裡封裝起來, 使用它的人只要把你的 job 封裝成一個 job object, 丟到 pool 裡面代為執行就可以了. 然後裡面就套用 "生產者 / 消費者" 的模式, User 不斷的生出 job 給 thread pool, 而 thread pool 不斷的消化掉 (執行 job) 它. 實作這些東西要面臨到的課題, 有這幾項:

  1. 基本的 thread sync 機制
  2. thread pool 內部的 thread 管理, thread 動態建立 / 回收機制
  3. 封裝 job, job queue

先從最抽像的 (1) 來說好了. 這是過去作業系統 (OS) 這門課, 特地花了一整章來說明的課題. 當年第一次碰到用 java 實作 thread pool 時, 我還特地把課本挖出來再看一次... @_@, 不過搬了兩次家, 課本也不曉得塞到那去了, 哈... 印像中記得裡面有 OS 管理下的 process 生命周期 state machine:

圖來自: http://en.wikipedia.org/wiki/Process_states

Image:Process states.svg

中間三個狀態是主要的部份, running 當然就是指執行中, 而要等待別人喚醒則是進入 blocked 狀態, block 狀態時是不會被 OS 分配到任何 cpu time slice 的. 等到被喚醒後, 並不是直接跳到 running, 而是跳至 waiting, 等待 OS 把下一段 cpu time slice 分給這個 thread 時, 它就會再度進到 running 狀態.

所謂的 synchronization, 就是指在 multi-threading 環境下, 每個 thread 各跑各的, 完全不管其它 thread 在作什麼. 但是在某些特殊情況下, 是要 threads 之間互相調整腳步的. 舉例來說, FlashGet 大家都用過吧? 它一次開好幾個 thread 下載同個檔案的不同部份. 那些情況需要 synchrinization ? 舉個 use case:

下載到一半想停掉, UI thread 接受了 user 的指令 (按按鈕), 每個 thread 必需適當的中止目前的動作及存檔. UI thread 必需 "等待" 每個 thread 完成後才能回報 user 停止的指令已經完成.

在 Multi-threading 的環境下, 千萬不要再去用老掉牙的同步方式 (啥? 就是用 global variable, 然後用 loop 一直去檢查), 正規的用法就是用 OS 提供的 synchronization 機制, 一邊 wait, 另一邊去叫醒它. 對照上面的圖來說, 也就是讓 thread 進到 blocked 狀態. 在 Java 裡就是 Thread 的 Notify 相關 method, 在 .NET 則是包裝成 WaitHandle 物件. 以這種最基本的 wait / notify, 在 .NET 可以用 ManualResetEvent 來達成. 簡單的寫兩段 code, 用起來像這樣:

 

   1:  // thread 1: 等著被喚醒
   2:  wait_handle.WaitOne( );
   3:   
   4:  // thread 2: 喚醒 thread 1
   5:  wait_handle.Set( );

 

更複雜的例子, 可以用其它不同型態的 WaitHandle 來達成. 在 .NET 是把所有這種用於同步機制的物件都抽像化成 System.Threading.WaitHandle, 它是 abstract class, 你要找一個合適的衍生類別來用. 這些機制原則上一定要靠 OS 的 API 才能提供, 請別異想天開自己搞一個...  列幾個常用的:

  • AutoResetEvent: 叫醒單一個 wait thread
  • ManualResetEvent: 叫醒多個 wait thread(s)
  • Semaphore (旗標): 只允許有限數量的 thread 執行某段程式.
    再舉 FlashGet 的例子, 如果某個網站只限最多 3 threads download, 就可以用 Sempahore.

其它還有一些, Mutex, Monitor, SpinLock... 就不一一說明了, 直接去翻 OS 課本.. [H]

為什麼花這麼多篇幅講這個? 因為這短短一兩行的 code, 可是控制 thread pool 運作的重要關鍵. Thread Pool 需要一個 Queue 來存放待處理的工作. ThreadPool 同時也要 "養" 數個 threads 來處理掉 Queue 裡面的工作. 正常情況下每個 thread 忙完後就到 Queue 再拿一個工作出來, 如果 Queue 空了, thread 就可以休息 (blocked). 如果 Queue 有新工作進來, 這些睡著 (blocked) 的 thread 就應該要醒來繼續處理堆在 queue 裡的工作.

這是 Thread Pool 的基本型, 通常會用 thread pool 有幾個理由:

  1. 要處理的工作數量很多. 不可能用最古董的作法: 每個工作建一個 thread, 做完就丟掉...  (因為 thread create / delete, OS 是要花成本的, 同時 thread 太多也會造成效能及回應時間下降)
  2. 工作是不斷的持續的產生, 需要有 thread 事先在那邊等著接工作來做, 降低回應時間.
  3. 工作的性質適合以有限的 thread 來處理時

最典型的例子就是 ASP.NET. ASP.NET Hosting 會養一堆 thread, 來服務前方 IIS 送來的一堆 request. 即使 CPU 只有一顆, 多個 thread 也可以有降低回應時間的好處. 記得照 MSDN 的建議, 一個 CPU 建議值是開 25 threads 的樣子... 因此會有一些變型, 以求效能更好一點. 通常 thread 的建立 / 回收, 很花時間. 養一堆 thread 也很花資源. 因此考量的重點都放在怎樣才不會重複建立/回收 thread, 怎樣才不會養太多不工作的 thread ... 歸納一下:

  1. 現有 threads 不夠 (或未達上限), 而還有工作還卡在 Queue 裡沒人處理, 就建立新的 thread 加進來幫忙.
  2. 如果工作都做完了, 多餘的 thread 就可以讓它發呆. 發呆太久的 "宂員" 就可以把它 fired 了... 判定的依據一般都用 idle timeout. 當然也有不同的策略, 那就不管了.

看起來很囉唆, 其實想通之後, 就像 recursive 一樣, 寫起來很簡潔, 多寫兩行都會覺的累贅... 我把流程用假的 code 整理一下:

每個 thread 運作的 body 就像這樣:

   1:  while (true)
   2:  {
   3:      //
   4:      //  從 queue 裡找 job 來做, 直到做完為止.
   5:      //
   6:   
   7:      //
   8:      //  idle.
   9:      //
  10:      if (超過IDLE時間 == true) break;
  11:  }

 

另外, 就是把 Job 加到 Queue 裡的動作要像這樣:

 

   1:  //
   2:  //  把 Job 加到 Queue
   3:  //
   4:   
   5:  if (Job太多)
   6:  {
   7:      //
   8:      //  多建立一些 thread 來幫忙
   9:      //
  10:  }
  11:   
  12:  if (有Idle的thread)
  13:  {
  14:      //
  15:      //  叫醒 thread 來工作
  16:      //
  17:  }

 

上面兩段 code 關鍵就在如何讓 thread idle ? 如何判定 idle 超過某段時間? 另外就是如何叫醒 idle 的 thread? 答案其實就是用上面講的 synchronization 的機制來做. 這些 code 搞定後, 包裝在一起, thread pool 其實就完成了. 很簡單吧? 哈哈... 實際的 code 等下篇再說... 正好寫第二篇的時間, 就讓大家想一想到底該怎麼寫... [H], 敬請期待下集!






精選文章

RUN! PC 文章及範例下載
2010/07. 結合檔案及資料庫的交易處理
2010/05. TxF讓檔案系統也能達到交易控制
2010/04. 生產者 vs 消費者 - 執行緒的供需問題
2008/11. 生產線模式的多執行緒應用
2008/09. 用ThreadPool發揮CPU運算能力
2008/06. SEMAPHORE在ASP.NET的應用
2008/04. 以ASP.NET開發同步WEB應用程式

如何學好 "寫程式" 系列
#1. 該如何學好 "寫程式" ??
#2. 為什麼 programmer 該學資料結構 ??
#3. 進階應用 - 資料結構 + 問題分析
#4. 你的程式夠 "可靠" 嗎?

#5. 善用 TRACE / ASSERT

安德魯是誰?

Andrew Wu | Create Your Badge

我喜歡鑽研物件導向、軟體工程及作業系統等相關技術。我會在這裡發表我的研究心得,也當作我自己的學習筆記。


Recent comments

Comment RSS