RUN!PC 精選文章 - 生產線模式的多執行緒應用

http://www.runpc.com.tw/content/main_content.aspx?mgo=178&fid=E08

無意間 search 我自己的名字,才發現這篇文章除了投稿到 RUN! PC 之外,原來還有刊在網站上的精選文章啊… 哈哈,暗爽一下,順道貼一下 link, 讓沒看到雜誌的網友們也有機會看一看在下的作品…

2019/06/23 補充:
RUN!PC 官網已經沒有這篇文章了 T_T, 上面的連結已經無效了, 我把當時投稿的內容重新整理一下貼在下方,有需要的朋友可以直接參考~


Thread Pipeline, 生產線模式的多執行緒應用

要運用到多核心CPU的運算能力,最普遍的方法就是把工作切成多個獨立的小工作,指派給不同的執行緒 (或是交給執行緒集區處理) 執行,來提升整體的效能。但是複雜的工作並無法切割成獨立的小工作,一定要照順序執行。本文會介紹另一種 “管線” (Pipeline) 的設計方式,就像工廠的生產線一樣,能把一連串相依的工作交給多個執行緒個別執行,一樣能提升效能。

要開發多執行緒程式最大的障礙,一種是開發跟除錯的困難,另一種是很多程式本質上就很難切成多執行緒來增加效能。這時必需用另外的設計模式來解決這個問題。這次的範例程式延續上一篇 [執行緒集區] 的例子: 批次處理大量照片。只不過我們把需求調整一下,這次的要求是把每張照片(*.JPG)轉成縮圖(*.PNG)後,最後還要把所有的照片檔再壓縮成一個 ZIP 壓縮檔。

這個範例會碰到的障礙是: 處理縮圖及壓縮ZIP檔都是需要大量 CPU 運算能力的動作,不過 ZIP 的壓縮動作,一定得等前面的縮圖處理完成後才能開工,而且壓成 ZIP 的部份不能像縮圖一樣切成多個執行緒同時進行,這樣的程式要能在多核心 CPU 系統下發揮效能,是設計上的一大挑戰。

在大部份的情況下,工作都是有先後順序的。之前介紹的執行序集區,對這類的問題就起不了作用,因此本文提出另一種切割方式,能在這種情況下運用多執行緒: 生產線。生產線的運作模式,是源自當年福特汽車廠用來大量生產汽車的作法。在開始寫程式前,我們先來瞭解它的運作方式。生產汽車是件很複雜的工作,整個過程有很多步驟,在輸送帶上每個步驟都有負責的人員,依序完成他的步驟,那麼當零件走到輸送帶的末端,車子就完成了。我們先來看看這張說明生產線運作方式的概念圖 (如圖一):

圖一 生產線的運作概念圖

假設生產一量車子要 100 小時,整個生產線把它平分成 100 個步驟,則每個步驟只要花費 1 小時。每個步驟完成之後,輸送帶會把半成品帶到下一個步驟,直到完成為止。如果生產線只生產一台車子,是沒有任何好處的。如果生產線持續運作,結果就不一樣了。來看看每個步驟跟時間的關係圖 (如圖二)

圖二 生產線的步驟與時間的關係圖

扣掉一開始,後面階段還沒動起來的部份,每一個週期過了,都會有一輛車子完成。用上面的例子來看,每一個小時就可以完成一輛車。

這種模式的好處是簡單,而且容易實作。因為它是用生產線的模式,按照順序進行,因此一個大的工作很容易切割,因為不再有每個小工作必需能獨立運作的限制。生產線的步驟切的越細,則階段的週期就越短,相對的效率就越高。前面的例子每個小時就能完成一輛車,如果我們重新設計生產線,規劃為 500 個階段,則每個階段只需要 12 分鐘就能完成,換句話說最後每 12 分鐘就能生產一輛車,效率提升了 500 倍!

生產線的方法,特點就是能夠視情況需要,拉長生產線的階段,就能提高生產速度。概念暫時先介紹到這裡。我們回過頭來看看,這樣的工作安排方式能夠怎麼替我們解決問題? 這次的範例,我們該怎樣把它套用在生產線的模式? 如圖三:

圖三 處理圖檔及壓縮作業的工廠意示圖

圖四 沒有使用生產線的步骤及時間關係圖

圖五 使用生產線的步骤及時間關係圖

接下來看看程式的兩個階段與時間的關係圖。圖四是未採用生產線模式,而圖五則是採用生產線的方式,每個階段都由一個專用的執行緒來負責。可以看到因為採用生產線,所以兩個步驟在時間上就有部份重疊了,整體工作完成的時間少了一半。來看看程式怎麼寫。先看看 MakeThumbPipeWorkItem 這類別怎麼把工作切成兩個階段 (程式1):

程式1 處理縮圖及壓縮的程式碼:


public class MakeThumbPipeWorkItem : PipeWorkItem
{
    public ZipOutputStream zipos = null;
    public string SourceImageFile = @"D:\TempDisk\IMAGES\IMG_6421 (Canon IXY DIGITAL 55).JPG";


    private string temp = null;

    public override void Stage1()
    {
        //第一階段: 縮圖
        this.temp = Path.ChangeExtension(this.SourceImageFile, ".temp");
        if (File.Exists(this.temp) == true) File.Delete(this.temp);
        MakeThumb(this.SourceImageFile, this.temp, 1600, 1600);
    }

    public override void Stage2()
    {
        //第二階段: 壓縮
        ZipEntry ze = new ZipEntry(Path.GetFileName(Path.ChangeExtension(this.SourceImageFile, ".PNG")));
        zipos.PutNextEntry(ze);

        byte[] buffer = File.ReadAllBytes(this.temp);
        zipos.Write(
            buffer,
            0, 
            buffer.Length);

        File.Delete(this.temp);
        this.temp = null;
        Console.Write('Z');
    }

    private static void MakeThumb(string srcfile, string trgfile, int maxWidth, int maxHeight)
    {
        //略,把 srcfile 圖檔縮成指定大小,存在trgfile路逕
    }
}

接下來來看看生產線如何依序執行這一連串的工作? 如程式2:

程式 2: 生產線的程式碼


public class PipeWorkRunner
{
    public void Start()
    {
        this._is_start = true;
        this._stage1_thread = new Thread(this.Stage1Runner);
        this._stage2_thread = new Thread(this.Stage2Runner);
        this._stage1_thread.Start();
        this._stage2_thread.Start();
        this._stage1_thread.Join();
        this._stage2_thread.Join();
    }

    private Thread _stage1_thread = null;
    private Thread _stage2_thread = null;

    private Queue<PipeWorkItem> _stage1_queue = new Queue<PipeWorkItem>();
    private Queue<PipeWorkItem> _stage2_queue = new Queue<PipeWorkItem>();

    private ManualResetEvent _notify_stage2 = new ManualResetEvent(false);

    private void Stage1Runner()
    {
        {
            while (this._stage1_queue.Count > 0)
            {
                PipeWorkItem pwi = this._stage1_queue.Dequeue();
                pwi.Stage1();
                this._stage2_queue.Enqueue(pwi);
                this._notify_stage2.Set();
            }
        }
    }

    private void Stage2Runner()
    {
        while (true)
        {
            while (this._stage2_queue.Count > 0)
            {
                PipeWorkItem pwi = this._stage2_queue.Dequeue();
                pwi.Stage2();
            }

            if (this._stage1_thread.IsAlive == false) break;

            this._notify_stage2.WaitOne();
        }
    }
}

程式的邏輯並不難,而且出乎意料的簡單。圖三再重畫一次,這次直接把程式用到的資料結構跟類別直接標上去 (如圖六):

圖六: 生產線的程式流程圖

概念上每個階段是靠輸送帶,把半成品從上一階段送到下一階段。程式實作則是用佇列(QUEUE)來扮演兩個階段之間的輸送帶。原料(原圖圖檔)會包裝成 PipeWorkItem 物件,送到 _stage1_queue ,而 _stage1_thread 執行緒會不斷的把 _stage1_queue 裡的 PipeWorkItem 取出,執行完它的階段作業 Stage1() 之後,把半成品放到送到 Stage2 的佇列: _stage2_queue。同樣的 _stage2_thread 執行緒也會不斷的把 _stage2_queue 內的 PipeWorkItem 取出執行 Stage2( ) 。所有的 PipeWorkItem 都執行完畢之後,程式就完成它的任務了。而階段之間就靠 ManualResetEvent: _notify_stage2 來同步。必要時階段 2 會閒置,直到階段 1 通知它有新的工作才會醒過來。

先來看看執行的效率。為了做對照組,我特別準備了沒有採用執行緒的版本,要來看看兩者之間效能的差異 (如表一):

在 Vista x64 環境下 (CPU: Q9300 四核心)

  • 沒有採用執行緒的版本: 執行共花掉了 251.4 秒,CPU使用率約為 27%
  • 採用生產線模式的版本: 執行共花掉了 163.7 秒,CPU使用率約為 43%

表一 執行的時間統計表

效率有明顯的改善,約提升為 1.5 倍。為什麼執行的效率沒有如預期的變成一倍? 這顯示了這種模式的第一個缺點: 因為整個生產的過程是連續的,任何一個步驟耽誤了時間,則後面的步驟就會全部往後延。舉例來說,第一階段生產過慢,則第二階段沒事做就會閒置,反過來第一階段生產過快,半成品都會卡在 _stage2_queue 裡面,第二階段會來不及消化這些工作量,最後的影響就是效能的降低。

要最佳化整體效能的話,就要想辦法讓整個生產線流暢一點,不要發生效能的瓶頸,否則整個生產線會卡在最慢的那個階段。我們在執行 Stage2 的部份加上記時的物件 (Stopwatch),看看第二階段到底閒置多久才會被喚醒 (程式3、表二)?

程式3 偵測 Stage2 閒置時間的程式碼:


idle_timer.Reset();
idle_timer.Start();
this._notify_stage2.WaitOne();
this._notify_stage2.Reset();
Console.WriteLine("Stage 2: Idle {0} msec", idle_timer.ElapsedMilliseconds);

執行結果:


Stage 2: Idle 389 msec
Stage 2: Idle 422 msec
Stage 2: Idle 389 msec
Stage 2: Idle 425 msec
Stage 2: Idle 429 msec
Stage 2: Idle 425 msec
Stage 2: Idle 441 msec
Stage 2: Idle 418 msec

表二 階段2 IDLE 時間

很明顯的,看來壓縮 ZIP 的速度比處理縮圖還快,所以 Stage2 每處理完一個縮圖,都得閒置約 400 msec 後才等的到下一個工作,因此要調整的是在第一階段。改善的方法,不外乎是把第一階段再切成更小的步驟,不過從程式碼看來,已經沒有什麼地方可以切了。另一種方法,則是加派人手,多一組人馬來加快第一階段的做業,用兩倍的人力來處理第一階段,而第二階段效能比較好,繼續維持一組人馬負責就好。

再來看看調整過的程式碼 (程式4),我們在第一階段加派人馬,多了一條專用的執行緒,因此原圖要調整成這樣 (如圖七):

程式4 每個階段的執行緒數量調整:


public void Start()
{
    this._is_start = true;

    this._stage1_thread1 = new Thread(this.Stage1Runner);
    this._stage1_thread2 = new Thread(this.Stage1Runner);
    this._stage2_thread = new Thread(this.Stage2Runner);

    //this._stage1_thread1.Start();
    this._stage1_thread2.Start();
    this._stage2_thread.Start();

    //this._stage1_thread1.Join();
    this._stage1_thread2.Join();
    this._stage2_thread.Join();
}
private Thread _stage1_thread1 = null;
private Thread _stage1_thread2 = null;
private Thread _stage2_thread = null;


private Queue<PipeWorkItem> _stage1_queue = new Queue<PipeWorkItem>();
private Queue<PipeWorkItem> _stage2_queue = new Queue<PipeWorkItem>();

//private ManualResetEvent _notify_stage1 = new ManualResetEvent(false);
private ManualResetEvent _notify_stage2 = new ManualResetEvent(false);

private void Stage1Runner()
{
    PipeWorkItem pwi = null;
    while (true)
    {
        pwi = null;

        lock (this._stage1_queue)
        {
            if (this._stage1_queue.Count > 0) pwi = this._stage1_queue.Dequeue();
        }

        if (pwi == null) break;

        pwi.Stage1();

        //lock (this._stage2_queue)
        {
            this._stage2_queue.Enqueue(pwi);
        }

        this._notify_stage2.Set();
    }
}

圖七 第一階段加派人力

修改過後,Stage2 果然忙碌許多,除了偶爾的幾次閒置之外 (閒置時間也不長,都在 100 msec 之內),其它都是忙碌的不停工作。當然修正的過程中,一樣要注意是否有 ThreadSafe 的問題,除了修改為一步驟多執行緒之外,也加上了 lock 鎖定,以確保程式碼是 ThreadSafe 的。修改過後的執行效能結果 (如表3):

執行共花掉了 98.8 秒,CPU使 用率約為 75%~78%

表3 STAGE2改為兩個執行緒的執行結果

眼尖的讀者也許會發現,是不是能用上一篇文章提到的執行緒集區(ThreadPool)來加速第一階段的效能? 當然可以。只不過千萬別加速過頭了。一方面用更多執行緒是能加速第一階段的產量沒錯,但是一來加速過頭讓第二階段跟不上,就沒用了。更糟的情況是第一階段佔用太多 CPU 運算資源,如果 CPU 核心數量不夠,也有可能讓第二階段的處理速度分掉,讓生產線各階段產能不平衡的問題更嚴重。

前面以汽車工廠生產線為例在討論效能問題時,曾提到步驟切的越多越有效,這個規則套用到我們的例子不一定適用。實際上要考慮的點有兩個地方。

  1. 步驟越多,則啟動及結束生產線的成本越高。 還記得前面的圖X? 你是否注意到,當生產線剛啟動時,後面的階段其實都是閒置的 (圖二右下方的部份都是空白的),因為跟本都還沒有半成品可以處理。同樣的道理,生產線要停工時,前面的階段已經沒有原料可以處理了,因此前面的階段也會處於閒置狀態 (圖二左上方的部份都是空白的)。因此設計時你必需仔細評估步驟該切成幾個? 越多的階段,你需要生產越多的產品,才能攤平更高的生產線啟動成本。其實半導體廠也是典型的例子,整個從晶原到晶片的製作過程也很長。因此你會常聽到新聞報導地震或是停電等因素,如果造成生產線的停頓,那些半成品就都報廢了,重新啟動可能會讓損失金額高達數千萬元,就是這個道理。
  2. 切割的步驟數量過多,則效能提升有限。 我們的實作方式,是以特定的執行序來執行特定的步驟。同時間有過多的執行緒在執行,而CPU的核心速度或運算資源不足時,會讓每個階段的效能往下掉。CPU早就滿載了,當整體效能不能再往上提升時,啟動的成本又增加,對效能也有負面的影響,不可不慎!

這篇文章主要的目的,是以另一種角度來利用多核心 CPU 的運算資源。常見的作法大都以水平的切割為主,把大量且獨立的運算丟給執行緒來處理。而本文以另一種角度,把工作做垂直的切割,分配給數個執行序來處理。希望這篇文章能帶給你另一種觀點,來解決你程式在多核 CPU 執行的效能問題。






安德魯部落格 GPTs

試試用 GPTs 幫你讀文章!
直接用白話文詢問,"安德魯的部落格 GPTs" 會幫你找到相關文章,也會用我文章的知識來回答你的問題。

Facebook Pages

Edit Post (Pull Request)

Post Directory