好久沒有來練習解題的技巧了,這次來聊個有趣的題目:

如果我透過 API 短時間收到大量的 Request, 我如何保證訊息必須按照順序處理?

順序問題,我常碰到的是都不分青紅皂白的就丟到 Message Queue, 結果塞進去的順序就已經不對了,才來問我順序該怎麼辦, 我只能雙手一攤說我也沒轍啊 XDD! 如果你沒辦法一開始就用對的順序放進 Message Queue,你就必須自己面對這問題了。這篇我就是想來聊聊,當 “必要” 時,你會如何處理這種問題?

當年在學網路通訊時,TCP 跟 UPD 的差異就是會處理封包順序 & 重送問題,也剛好讀過背後的作法,你懂原理後就會有機會自己解決。類似例子其實很多,QoS 的應用也是,先前研究過的 微服務基礎建設: 斷路器 #1, 服務負載的控制 那篇,你懂 QoS 怎麼做 Rate Limit,必要時你就能自己解決商業需求。

練習前的思考: 我需要了解這些機制嗎?

所以,別再糾結 “是否有必要重新發明輪子” 的問題了,你不需要重新發明每個輪子,但是要不要讓自己有能力重新建立 “必要” 的輪子,就是你的選擇了。我認為最佳的平衡點,就是做好必要的練習就夠了,一方面投入不算多,另一方面可以得到保障,確保你有能力建構複雜且需要高度整合的系統。我自己擔任架構師的角色,這投資是很值得的,因為我有不得不面對這些難題的理由。對這問題我下個結論:

你需要有能力了解原理 (但不用真的去開發),然後才能判斷要不要自己開發 (重新發明輪子)。如果必要,你才有能力執行。

因此,有沒有必要? 你需要自行判斷。一般小規模的案子應該都不需要用上這樣的技巧,但是當你負責特定系統的關鍵設計時,你可能就需要具備這樣的能力,確保系統的開發不會被技術限制所影響。你需要做的,至少先了解背後的原理或是演算法,並且照著這篇的練習,練過你就知道真正要自己做的難度,在心裡記得這經驗即可。這些練習不大花時間,但是將來你在判斷技術選擇時,就能幫助你立刻做出技術決策。

1. 訊息排序的基本觀念

回到主題: re-order 重新排列。由於你沒辦法像 “排序” (sort) 一般,等到資料收齊了再一次性排序;這邊面對的狀況是你是 “一個一個” 收到不一定正確的順序排列的訊息,而你又必須盡快地照順序處理掉,因此你需要的流程大致上是這樣:

如果收到的訊息是對的,就馬上處理;如果不對,那就放在手上 (時間越短越好),看是要等缺的訊息補上,或是放棄某個訊息繼續往下。其中你要抉擇的是,到底是要等待的時間越短越好? 還是放棄的訊息數量越少越好? 兩者通常必須取捨..

其實除了 TCP 通訊機制之外,也有不少類似問題的討論。我貼幾篇我看過的參考:

Event Ordering in Distributed System - GeeksforGeeks

Causal Ordering of Messages in Distributed System - GeeksforGeeks

另外,我也貼一下 Wiki 上面提到 TCP 處理封包沒有依序傳輸的處理方式:

傳輸控制協定 (TCP, Transmission Control Protocol)

其中有一張圖,說明了傳輸過程中接收端怎麼靠 buffer + ack 達成這件事:

平常我都會看這類介紹看一下人家怎麼解題的,但是大部分都看完有個印象就好了。這年代你不需要用腦袋 “背” 下所有的解決方式,腦袋很貴的,你只要當作索引,需要時有正確關鍵字 (或是現在要改成: 有正確的 Prompt) 等到真的需要實做時,你能馬上查出正確解法就夠了。

我自己消化過這些問題,結論有幾個:

  1. 你的訊息必須要能標示順序
    當你拿到任意兩個 message, 你能否從訊息本身知道誰先誰後? 能否知道中間有無漏掉第三個 (或更多) 訊息? 舉例來說,有來源標記的 timestamp, 就能判斷先後;有來源標記的 sequence number, 你就能判斷中間有無掉號碼..

  2. 你必須界定處理範圍 (緩衝時間)
    如果你先收到後面的訊息,你有多少時間 (緩衝) 能等待前面的訊息送過來? 例如 3 sec 內前面的訊息沒收到你就要放棄了 (有些訊息有時效性,你無法抓著他等太久)

  3. 你必須界定處理範圍 (緩衝空間)
    類似 (2),如果你先收到後面的訊息,你有多少空間 (緩衝) 能等待前面的訊息送過來? 例如中間掉的訊息超過 10 個你就要放棄了 (有些場合你的暫存空間有限,你無法保留過多訊息)

其中,(1) 談的其實是訊息結構的設計與定義,(2) (3) 則是處理能力與容錯能力 (我把他歸類在 SLO) 的要求。基本概念就是這樣,從來源不斷收到的訊息,會擺在 buffer 內,如果你判定順序都對,而且是連續的不需要處理,就直接處理掉了。如果不連續,你就必須先收著 (留在 buffer 內),並且等待下一個應該收到的訊息到來,直到 buffer 內暫時留著的訊息也能拿出來為止。

而例外處理則是 buffer 空間不夠,或是保留的時間太久,你不能再等下去了,你就要明確地告訴後面一關,有多少訊息被放棄了。這大概就是約略的處理方式。

想清楚作法,我想建立一個模型,來表達上面的結構。同時這個模型,我會把關鍵的部分 (溝通介面 & 資料定義) 直接用 code 來描述規格。我想像的處理模型:

對應到程式碼,主要就這三部分,以及我自己追加的監控體系:

  1. command source:
    訊息來源, 會按照流水號標號送出, 並且模擬網路傳輸會有不固定的延遲, 導致接收時可能會有順序的變化 ( ‘GetCommands` )
  2. command buffer:
    需要緩衝 (buffer) 來存放未按順序收到的 command, 重新排列後往後送的機制 (IReOrderBuffer)
  3. command handler:
    執行接收到的 command, 這邊預期一定要是正確的順序 (ExecuteCommand)
  4. monitor:
    Metrics, (後面說明)

接下來,我就分別用程式碼來描述上面這四項應該長成什麼樣子,先把整個解題的框架建立起來,而你要練習的,就是把 (2) 用你的想法做出來,讓整個體系能夠運作。最後 (4) 則是我要模擬觀察特定的指標 (Metrics) 來了解整個系統的運作狀況。

相關程式碼,如果你忍不住的話可以先看我的 GitRepo: Andrew.ReOrderDemo, 或是看完我的說明再去挖 code 出來看也可以。

上面列的四點,都分別在 Program.cs 內有應用的方式,我按照這邏輯個別介紹:

1-1, 訊息來源 ( GetCommands )

直接來看 code, 我也不多寫什麼框架或是類別了,我直接在 Program.cs 就放上我的主程式架構:


// 模擬實際接收到的 Command 順序 (會按照亂數,隨機的局部前後位移)
static IEnumerable<OrderedCommand> GetCommands(int period = 100, int noise = 500) { ... }
        

// 接收到的 Command 結構
public class OrderedCommand
{
    // 流水號,從 0 開始,按照約定必須是連續編號
    public int Position = 0;

    // command 建立的時間
    public DateTime Origin = DateTime.MinValue;

    // command 收到時間
    public DateTime OccurAt = DateTime.MinValue;

    // command 內容說明
    public string Message;
}

我先定義每個 Command 該長什麼樣子的模型,我用了 OrderedCommand 這類別來代表。前面提到一定要能分辨明確的順序,並且判定中間是否有漏接 Command, 我用的是 Position 流水號欄位來標示。而 IEnumerable<OrderedCommand> GetCommands(...) 則是用 yield return 連續產生一連串會隨機局部錯開順序的 Command 產生器。

每個 Command 要標示順序 (Position), 預設從 0 開始編號, 一定會是連續流水號。除了標號之外也要標示來源端發出時間 (Origin), 以及模擬網路傳輸,可能造成的隨機延遲 ( 0 ~ {command_noise} msec ) 標示實際收到的時間 (OccurAt)。最終 GetCommands() 傳回的順序,會按照 OccurAt 來排列的,而不是 Origin (意思是會考慮傳輸的隨機延遲後真正收到的順序), 因此你要在後面的階段 (Command Buffer) 想辦法把錯亂的順序修正回來。修正過程中可能會有些狀況,因此你必須盡快的辨識出哪些 Command 能正常送出 (SEND),那些已收到的 Command 必須被丟棄 (DROP),那些應該收到但是遲遲未收到的應該被略過 (SKIP)。

1-2, 重整訊息順序 ( IReOrderBuffer )


public interface IReOrderBuffer
{
    public bool Push(OrderedCommand data);
    public bool Flush();

    public event CommandProcessEventHandler CommandIsReadyToSend;
    public event CommandProcessEventHandler CommandWasDroped;
    public event CommandSkipEventHandler CommandWasSkipped;
}

我的構想是: 從 1-1 收到的訊息,都立刻放進 (Push) 能串流重新排序的 IReOrderBuffer,後續的動作都由 IReOrderBuffer 的實做 (就是要練習的部分) 來決定。你要決定哪些訊息可以被處理,決定好了就透過事件來通知 Command Handler 該做什麼事。這邊定義了三個事件:

  1. CommandIsReadyToSend:
    這事件就可以通知 Command Handler, 某個 Command
  2. CommandWasDroped:
    Command 已經收到, 但是因為各種原因 (例如收到過去流水號的指令,或是 Buffer 滿了被迫丟棄) 必須放棄處理的通知
  3. CommandWasSkipped:
    跟 CommandWasDropped 類似,但是差別在於 Skip 的指令還沒收到 (因此我也不知道內容是啥),只是從流水號判定中間應該還有存在某些 Command 掉在半路上。同樣,因為這種原因,告知必須略過指定的 Command 事件通知

1-3, 處理訊息 ( ExecuteCommand )

這部分就沒什麼了,單純的把 IReOrderBuffer 判定結果後通知的 Command 接過來處理而已,處理前做一點基本的檢查 (順序):


static object _sync_command = new object();
static int _last_command_position = 0;
static bool ExecuteCommand(OrderedCommand cmd)
{
    if (cmd == null) return false;
    if (cmd.Position <= _last_command_position)
    {
        Console.WriteLine("Execute Command Fail: Wrong Orders...");
        return false;
    }

    lock(_sync_command)
    {
        if (cmd.Position <= _last_command_position)
        {
            Console.WriteLine("Execute Command Fail: Wrong Orders...");
            return false;
        }
        _last_command_position = cmd.Position;
    }

    Console.WriteLine($"Execute Command: {cmd}");
    return true;
}

既然有 GetCommands(…), 當然也有 ExecuteCommand(…). 經由 GetCommands(…) 收到, 包含隨機的延遲送來的 commands, 經過 Buffer 的處理重新排列後, 依序交給 ExecuteCommand(…) 處理。

這邊收到 Command 該做什麼事,並不是我這篇的重點,所以我只簡單地寫了一段 code, 確保交給 ExecuteCommand(…) 的 command 絕對是按照順序的 (我只 check 先後順序,沒有 check 跳號)。跳號沒有檢查,是因為傳輸的過程中可能真的有掉封包,你連收都收不到。

1-4, 監控運作狀態 ( Metrics )

這段我後面實做再補充細節,我只先交代目的。我希望能有類似監控介面來了解這個 Buffer 的運作狀況,於是我先抓了幾個指標 (metrics), 我想要每秒知道這些數據:

  • Push: 每秒有多少 command 被 Push 到 Buffer 內
  • Pop: 每秒有多少 command 能順利被執行 ( CommandIsReadyToSend 事件發出 )
  • Drop: 每秒有多少 command 判定被丟棄 ( CommandWasDropped 事件發出 )
  • Drop Rate (%): 丟失率, drop / push x 100%
  • Buffer Delay: 因為 Buffer 處理的延遲時間
  • Buffer Usage: 因為 Buffer 處理所需的保留 command 空間 (區間最大值)

後面我會模擬幾種狀況,實際跑看看這些數據。不同的實做,我相信這些指標的表現都會有差異,你可以先思考的是你最看重 / 最想優化的指標是哪一個。先能量測才能改善,量測的基礎就是定義指標並且要有能力觀察他。

1-5, 牛刀小試

介面都先設計好了,理論上整個系統結構該怎麼交互運作,應該都確定了吧! 先來 TDD 一下,用 Unit Test 來模擬一下我想像的幾個情境:


[TestMethod]
public void BasicScenario2_OutOfOrderCommand()
{
    this.SequenceTest(
        100,
        new int[] { 0, 1, 2, 3, 5, 4, 6, 7, 8, 9, 10 },
        new int[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 });
}
        

這段測試,我人工指定了 command 的接收順序, 留意一下第一個參數是 source_sequence, 別眼花了, 不是照順序安排的, #0 ~ #10 之間, #4 跟 #5 的順序是對調的。

我收到這樣的 input, 我期待的結果是整套機制能幫我重新排好, 因此我期待的結果 expected_sequence 是 #0 ~ #10 連續排列。

不過別誤會了,這可不是一般排序的作業練習。看不到的時間序背後,你可能拿不到完整資料前,你就必須決定部分結果了。例如我希望收到 #0 時,你已經確定 #0 是第一個了,你根本不用等待後面的指令,你就可以把 #0 送出了。而收到 #3 之後接著收到 #5, 你就必須先抓著 #5, 等等看待會會不會收到 #4, 在允許時間內收到的話再一次把 #4 & #5 送出去。

因為有串流 & 時間延遲等問題,因此才會有 Metrics 這回事。在結果正確的前提下,你還要想辦法讓這些指標越漂亮越好。因此,來看看 SequenceTest(...) 背後做了什麼:


private void SequenceTest(int buffer_size, int[] source_sequence, int[] expect_sequence)
{
    IReOrderBuffer buffer = new ReOrderBuffer(buffer_size);

    int count = 0;
    buffer.CommandIsReadyToSend += (sender, args) =>
    {
        Console.WriteLine($"SEND: {sender.Position} - {args.Reason}");
        Assert.AreEqual(expect_sequence[count], sender.Position);
        count++;
    };
    buffer.CommandWasDroped += (sender, args) =>
    {
        Console.WriteLine($"DROP: {sender.Position} - {args.Reason}");
    };
    buffer.CommandWasSkipped += (sender, args) =>
    {
        Console.WriteLine($"SKIP: {sender} - {args.Reason}");
    };

    foreach (var cmd in this.GetBasicCommands(source_sequence))
    {
        bool result = buffer.Push(cmd);
    }

    buffer.Flush();
    Assert.AreEqual(expect_sequence.Length, count);


    var metrics = (buffer as ReOrderBuffer).ResetMetrics();
    Console.WriteLine($"-----------------------------------");
    Console.WriteLine($"Metrics:");
    Console.WriteLine($"- PUSH:          {metrics.push}");
    Console.WriteLine($"- SEND:          {metrics.send}");
    Console.WriteLine($"- DROP:          {metrics.drop}");
    Console.WriteLine($"- SKIP:          {metrics.skip}");

    Console.WriteLine($"- Command Delay: {metrics.delay / metrics.push:0.000} msec");
    Console.WriteLine($"- Buffer Usage:  {metrics.buffer_max}");
}


準備好 IReOrderBuffer 的實做之後,把物件產生出來,設定好各種事件 (CommandIsReadyToSend, CommandWasDropped, CommandWasSkipped) 該怎麼處理, 之後就是從 GetBasicCommands() 逐一把 Command 放 (Push) 進 IReOrderBuffer 就結束了。

我在 CommandIsReadyToSend 的 event hander 內埋了一點 code, 用來比對跟 expected_sequence 是否一致:


int count = 0;
buffer.CommandIsReadyToSend += (sender, args) =>
{
    Console.WriteLine($"SEND: {sender.Position} - {args.Reason}");
    Assert.AreEqual(expect_sequence[count], sender.Position);
    count++;
};

不一樣的話,Assert 就會讓整個 Unit Test 的框架回報給我測試失敗了。這大家應該都熟了,我就不多說。後面我多印了一點輔助資訊,不影響測試是否通過,不過有助於了解執行狀況,我先貼一下執行這個單元測試的 test output:


BasicScenario2_OutOfOrderCommand
   Source: BasicOrderedTests.cs line 21
   Duration: 10 ms


Standard Output: 
SEND: 0 - SEND_PASSTHRU
SEND: 1 - SEND_PASSTHRU
SEND: 2 - SEND_PASSTHRU
SEND: 3 - SEND_PASSTHRU
SEND: 4 - SEND_PASSTHRU
SEND: 5 - SEND_BUFFERED
SEND: 6 - SEND_PASSTHRU
SEND: 7 - SEND_PASSTHRU
SEND: 8 - SEND_PASSTHRU
SEND: 9 - SEND_PASSTHRU
SEND: 10 - SEND_PASSTHRU
-----------------------------------
Metrics:
- PUSH:          11
- SEND:          11
- DROP:          0
- SKIP:          0
- Command Delay: 0.132 msec
- Buffer Usage:  1

細節後面都會再交代,這 test output 看幾個重點就好:

  1. 看時間序
    總共印出了 11 行 ( #0 ~ #10 ) SEND, 背後有標記 SEND 的 “原因”。除了 #5 是 “SEND_BUFFERED” 之外,其他的原因都是 “SEND_PASSTHRU“。這代表 #5 是預先被收到並暫存在 Buffer 內,延遲一段時間才被送出執行的,其他都是接收到直接轉送 (穿透: PASSTHRU) 立刻執行。
  2. 看統計
    #5 被暫存, 因此他的延遲可能特別高一點,最後 Command Delay 顯示的是所有 Command 的平均延遲, 演算法沒調好的人 (假設你用很蠢的方式全部收完再排序一次送出,延遲就會很難看)。另外其他的統計,Buffer Usage 代表過程中你 Buffer 最大放了多少站存 Command 在裡面,以及 PUSH / SEND / DROP / SKIP 的次數統計..

其他還好幾個測試 (到目前為止我寫了 13 個),我就不一個一個貼了,後面有需要再調出來看

2. 環境模擬

基礎結構定義好,基本運作的單元測試也都通過了之後,我開始來設計環境模擬的部分了,同時會更進一步模擬真實的環境。

2-1. 模擬網路傳輸延遲 (隨機)

我在裡面設置了一些機關,讓每個 Command 都有可能隨機產生局部的 “飄移”,先看看 code:


static IEnumerable<OrderedCommand> GetCommands(int period = 100, int noise = 500)
{
    int total_count = 1000;
    TimeSpan cmd_period = TimeSpan.FromMilliseconds(period);
    int cmd_noise = noise;

    List<OrderedCommand> orders = new List<OrderedCommand>();
    DateTime start = DateTimeUtil.Instance.Now.AddSeconds(1.0); // warn up time

    Random rnd = new Random(867);

    for (int i = 0; i < total_count; i++)
    {
        //if (rnd.Next(100) == 0)
        //{
        //    Console.WriteLine($"RANDOM-LOST: {i}");
        //    continue;   // 1% lost rate
        //}

        //
        // todo: 隨機可以改成高斯分布
        //
        var order = new OrderedCommand()
        {
            Position = i,
            Origin = start + cmd_period * i,
            OccurAt = start + cmd_period * i + TimeSpan.FromMilliseconds(rnd.Next(cmd_noise)), 
            Message = $"CMD-{i:#00000}"
        };
        orders.Add(order);
    }

    int check_count = 0;
    foreach (var c in (from x in orders orderby x.OccurAt ascending select x))
    {
        DateTimeUtil.Instance.TimeSeek(c.OccurAt);
        check_count++;
        yield return c;
    }

    Console.WriteLine($"CHECK-COUNT: {check_count}, {orders.Count}");
}

這邊為了重複測試,所以隨機 Random 我特地給固定的種子,如果你調整參數重複跑案例,則每次跑的亂數表都是一樣的,這樣調整才看得出效果。這種 POC 驗證用途的 code, 我選擇不按照一般 production code 的規範來要求他,因為目的完全不同。我為了 GetCommands(...) 產生模擬用的 OrderedCommand 序列, 定了兩個參數,分別是產生週期 (int period = 100), 跟傳輸的干擾雜訊強度 (int noise = 500)。我用一張圖來說明:

在上圖的例子, #1 在 Command Source 時間線產生的時間點就是 {#1}.Origin, 而傳送到 Command Buffer 端的時間點就是 {#1}.OccurAt, 中間的時間差就是 Delay (msec). 而 GetCommand 的第一個參數 period (單位 msec),是說每個 Command 在 Command Source 端間隔多久會被發出。

第二個參數 noise 則代表每個 Command 的延遲 (delay) 都會落於 0 msec ~ {noise} msec 的範圍內, 隨機分布 (其實我有寫註解,這邊應該用高斯分布會更好一點,不過我一時偷懶就沒補上這段 code)。給定這兩個參數後,GetCommands(...) 就會用這樣的條件,幫你產生 1000 筆連續的 Command 序列。

試想一下,你有可能碰到圖上 #2 跟 #3 的例子: 當 #2 延遲了很久才送到,#3 很快就送到了。只要 period 夠小, noise 夠大, 你碰到的機會就越大。極端一點你會碰上 #5 都收到了,#2 還沒收到的例子。這時 Buffer 的大小就很重要,我保留這些參數就是後面打算觀察這些行為。

2-2. 模擬網路傳輸丟失請求

另外我中間也註解掉一段,後面會再打開。我也模擬了一定機率的 Command 會憑空消失,你等多久都收不到的情況。如果你有仔細看我前面的 Unit Test 那 13 個測試案例,其中有兩個就是模擬這狀況,我先貼測試案例的 code:


[TestMethod]
public void BasicScenario5_LostCommand()
{
    this.SequenceTest(
        100,
        new int[] { 0, 1, 2, 3, 4,    6, 7, 8, 9, 10 },
        new int[] { 0, 1, 2, 3, 4,    6, 7, 8, 9, 10 });
}

這是 #0 ~ #10, 中間缺了 #5 的測試案例,而這案例中我給了 Buffer Size 是 100 (Buffer 最多允許暫存 100 commands), 跑出來的輸出跟統計:


PUSH: 0
SEND: 0 - SEND_PASSTHRU
      (Buffer: --)
PUSH: 1
SEND: 1 - SEND_PASSTHRU
      (Buffer: --)
PUSH: 2
SEND: 2 - SEND_PASSTHRU
      (Buffer: --)
PUSH: 3
SEND: 3 - SEND_PASSTHRU
      (Buffer: --)
PUSH: 4
SEND: 4 - SEND_PASSTHRU
      (Buffer: --)
PUSH: 6
      (Buffer: 6)
PUSH: 7
      (Buffer: 6, 7)
PUSH: 8
      (Buffer: 6, 7, 8)
PUSH: 9
      (Buffer: 6, 7, 8, 9)
PUSH: 10
      (Buffer: 6, 7, 8, 9, 10)
SKIP: 5 - SKIP_BUFFERFULL
SEND: 6 - SEND_BUFFERED
SEND: 7 - SEND_BUFFERED
SEND: 8 - SEND_BUFFERED
SEND: 9 - SEND_BUFFERED
SEND: 10 - SEND_BUFFERED
-----------------------------------
Metrics:
- PUSH:          10
- SEND:          10
- DROP:          0
- SKIP:          1
- Max Delay:     400.029 msec
- Average Delay: 100.021 msec
- Buffer Usage:  5


[TestMethod]
public void BasicScenario13_BufferLimitAndLostCommand()
{
    this.SequenceTest(
        3,
        new int[] { 0, 1, 2, 3, 4,    6, 7, 8, 9, 10 },
        new int[] { 0, 1, 2, 3, 4,    6, 7, 8, 9, 10 });
}

同 case 5, case 13 的 Buffer size 只有 3 的情況下輸出的結果:


PUSH: 0
SEND: 0 - SEND_PASSTHRU
      (Buffer: --)
PUSH: 1
SEND: 1 - SEND_PASSTHRU
      (Buffer: --)
PUSH: 2
SEND: 2 - SEND_PASSTHRU
      (Buffer: --)
PUSH: 3
SEND: 3 - SEND_PASSTHRU
      (Buffer: --)
PUSH: 4
SEND: 4 - SEND_PASSTHRU
      (Buffer: --)
PUSH: 6
      (Buffer: 6)
PUSH: 7
      (Buffer: 6, 7)
PUSH: 8
      (Buffer: 6, 7, 8)
PUSH: 9
SKIP: 5 - SKIP_BUFFERFULL
SEND: 6 - SEND_BUFFERED
SEND: 7 - SEND_BUFFERED
SEND: 8 - SEND_BUFFERED
SEND: 9 - SEND_BUFFERED
      (Buffer: --)
PUSH: 10
SEND: 10 - SEND_PASSTHRU
      (Buffer: --)
-----------------------------------
Metrics:
- PUSH:          10
- SEND:          10
- DROP:          0
- SKIP:          1
- Max Delay:     300.300 msec
- Average Delay: 60.423 msec
- Buffer Usage:  3


一模一樣的案例,都是測試 #0 ~ #10, 中間掉了 #5 的情況,差別只是 Buffer 不一樣大 (從這案例可以知道, Buffer 並不是越大越好)。最後得出來的結果,順序完全依樣 (#5 都被迫放棄了),但是 case 5 因為 buffer 夠大,所以反而被延遲了更久的時間。你可以想像這行為也是隨機的,就是我拿掉的那段 code 想表達的意義。

我印一小段產生出來的 GetCommands(...) 序列,驗證一下我上面的描述 (參數是 period = 30, noise = 500),產生出來的序列,就少了 #9 (畫面上其實還少了 #14, #15, #17, 不過那些是我沒截到圖而已):


PUSH: #1,  Origin: 00:00:01.039, OccurAt: 00:00:01.144, Delay: 105.000 msec
PUSH: #5,  Origin: 00:00:01.159, OccurAt: 00:00:01.273, Delay: 114.000 msec
PUSH: #10, Origin: 00:00:01.309, OccurAt: 00:00:01.327, Delay:  18.000 msec
PUSH: #0,  Origin: 00:00:01.009, OccurAt: 00:00:01.343, Delay: 334.000 msec
PUSH: #8,  Origin: 00:00:01.249, OccurAt: 00:00:01.368, Delay: 119.000 msec
PUSH: #6,  Origin: 00:00:01.189, OccurAt: 00:00:01.374, Delay: 185.000 msec
PUSH: #3,  Origin: 00:00:01.099, OccurAt: 00:00:01.405, Delay: 306.000 msec
PUSH: #7,  Origin: 00:00:01.219, OccurAt: 00:00:01.420, Delay: 201.000 msec
PUSH: #4,  Origin: 00:00:01.129, OccurAt: 00:00:01.451, Delay: 322.000 msec
PUSH: #2,  Origin: 00:00:01.069, OccurAt: 00:00:01.464, Delay: 395.000 msec
PUSH: #11, Origin: 00:00:01.339, OccurAt: 00:00:01.512, Delay: 173.000 msec
PUSH: #19, Origin: 00:00:01.579, OccurAt: 00:00:01.589, Delay:  10.000 msec
PUSH: #20, Origin: 00:00:01.609, OccurAt: 00:00:01.616, Delay:   7.000 msec
PUSH: #18, Origin: 00:00:01.549, OccurAt: 00:00:01.622, Delay:  73.000 msec
PUSH: #16, Origin: 00:00:01.489, OccurAt: 00:00:01.645, Delay: 156.000 msec
PUSH: #13, Origin: 00:00:01.399, OccurAt: 00:00:01.664, Delay: 265.000 msec
PUSH: #12, Origin: 00:00:01.369, OccurAt: 00:00:01.695, Delay: 326.000 msec

2-3. 用 Buffer 串接來源與目的地

若把這些 code 都組在一起 ( Command Source + Buffer + Handler ),整合起來的用法大概像這樣:



static void Demo2_ExecuteCommandWithReorderBuffer()
{
    int command_period = 100;
    int command_noise = 500;
    int buffer_size = 10;

    DateTimeUtil.Init(new DateTime(2023, 09, 16));
    IReOrderBuffer ro = new ReOrderBuffer(buffer_size);

    // 略, 每秒鐘會讀取一次 ro 的 metrics 寫成 csv
    DateTimeUtil.Instance.RaiseSecondPassEvent += (sender, args) =>
    {
        // write metrics
    };

    ro.CommandIsReadyToSend += (sender, args) =>
    {
        ExecuteCommand(sender);
    };

    foreach (var item in GetCommands(command_period, command_noise))
    {
        ro.Push(item);
    }
    ro.Flush();

    // 略, 印出執行過程中的 metrics 統計
}

準備好 ReOrderBuffer(...) 物件 ro 之後,先設置好各種 event 對應的程序後,之後就用 foreach(...) 驅動 GetCommands(...) , 依序把每個模擬出來的 OrderedCommand 依序 PUSH 到 ReOrderBuffer 內,隨著時間與序列的推進,對應的 Event 就會被觸發處理。整個程序就會因為這個 foreach(...) 而驅動起來,你可以從 Console Output 看到整個程序的進行。

2-4. 模擬監控機制

回頭來看看,2-3 我沒有說明的 Metrics, 我在 Timer 設置了每秒觸發一次的 Event (RaiseSecondPassEvent),每秒鐘讀取一次當下的 Metrics 統計,並且 Reset 歸零,等著下一秒再統計一次:


int _log_sequence = 0;
Console.Error.WriteLine($"TimeInSec,Push,Send,Drop,Skip,BufferMax,Delay");

var overall_metrics = (ro as ReOrderBuffer).ResetMetrics();
DateTimeUtil.Instance.RaiseSecondPassEvent += (sender, args) =>
{
    // write metrics
    Interlocked.Increment(ref _log_sequence);
    var metrics = (ro as ReOrderBuffer).ResetMetrics();
    double avg_latency = 0;
    if (metrics.send > 0) avg_latency = metrics.delay / metrics.send;
    Console.Error.WriteLine($"{_log_sequence},{metrics.push},{metrics.send},{metrics.drop},{metrics.skip},{metrics.buffer_max},{avg_latency}");
};

這邊我用懶人的做法,什麼套件都不用,用我 20 年前就在用的伎倆… 我把 CSV 的內容寫到 Standard Error, 你看到的第二行就是 CSV 的擋頭欄位名稱,後面每秒就把 metrics 照欄位,用逗號分隔印出來。

雖然這些都會被混在 console 上面,不過實際上 Standard Output 跟 Standard Error, 是兩個獨立的串流, 你用 pipeline 可以分別導向到檔案。

你想把 Stdout 轉到檔案,可以用 > 來做:


dotnet run > myfile.txt

而如果你想把 Stderr 轉到檔案,則可以用 2> 來做 (就是第二個串流的意思):


dotnet run 2> myerror.txt

而我準備了這個 windows command prompt 用的批次檔 (不是 powershell, 我真的是老人 XDD):


cls
rd /s /q output
mkdir output

:: Usage: dotnet run {command period in msec} {command noise} {buffer size}

dotnet run 100 100  100 2> output\metrics-100-100-100.csv
dotnet run 100 200  100 2> output\metrics-100-200-100.csv
dotnet run 100 300  100 2> output\metrics-100-300-100.csv
dotnet run 100 400  100 2> output\metrics-100-400-100.csv
dotnet run 100 500  100 2> output\metrics-100-500-100.csv
					 						  
dotnet run  70 100  100 2> output\metrics-070-100-100.csv
dotnet run  70 200  100 2> output\metrics-070-200-100.csv
dotnet run  70 300  100 2> output\metrics-070-300-100.csv
dotnet run  70 400  100 2> output\metrics-070-400-100.csv
dotnet run  70 500  100 2> output\metrics-070-500-100.csv
					 						  
dotnet run  30 100  100 2> output\metrics-030-100-100.csv
dotnet run  30 200  100 2> output\metrics-030-200-100.csv
dotnet run  30 300  100 2> output\metrics-030-300-100.csv
dotnet run  30 400  100 2> output\metrics-030-400-100.csv
dotnet run  30 500  100 2> output\metrics-030-500-100.csv

跑完後我就可以去收結果的 .csv 了。這技巧我過去用過好幾次了,常看我文章的朋友們應該不陌生,用 EXCEL 打開,加上簡單的圖表,你就可以看到這些每秒印出一筆的 metric 展現的樣子:

我刻意切一些 EXCEL 的畫面,背後的數字就是 CSV 來的。這樣做的目的很單純,將來如果這是個重要的系統,我一定得監控他才行。甚至我在 POC 的時候我就想要監控了。但是,如果你現在就要把 metric 寫到監控系統,這對架構師來說太折磨了 XDD,我重點不是要整合到監控系統,而是我想在 POC 階段就思考我該看那些 metrics, 我想知道我的設計在模擬的情況下,這些 metrics 會有甚麼表現。

因此這做法,就完美的模擬了以後系統真的上線後,我想要看的 dashboard. 我現在在意的是:

  1. 我是否抓對指標?
  2. 實際模擬跑起來的狀況?

透過這幾行 code 的技巧,我完全可以把整套系統的設計思路,控制在 500 行 code 內,單機就能重複模擬執行的複雜度內。這些都滿意了,再交給執行團隊,幫我把它變成真正能上線的系統。

環境模擬到這邊大概都告一段落,萬事俱備,剩下的就等你怎麼實做 IReOrderBuffer 介面了!

如果你想自己挑戰看看,先看到這邊就別往下看了,可以抓我的 code 回去自己挑戰看看。當然我的實做也跳過很多進階的功能,你也可以看完我後面的基本實做,然後再改進補上我沒做完的部分。

準備好的話就往下看,這次看看 IReOrderBuffer 我的設計思考方式

3. Reordering Buffer 的設計

回到主軸, IReOrderBuffer 的設計。前面提到這個介面的用法,也先用測試案例示範怎麼運用這個 interface 了 (還真的是測試驅動開發 + Contract First 啊),我這段打算用這幾個主軸來談:

  1. interface + event definition
  2. data structure & settings
  3. push - core logic, 驅動整個 buffer 運作的主要引擎
  4. (不談) 另一個驅動的引擎 timer
  5. metrics, 檢視引擎運作狀況的儀表板
  6. adapters, 整合引擎 (push),資料結構 與事件

然而,從需求面來看,我這次不會處理所有問題。我先挑最關鍵的主結構,其他能延伸出去處理的議題,我就暫時略過,留給各位練習的機會 (其實是我懶而已 XDD,有機會再補下一篇的話我再追加)。

我接下來的示範,我會處理這些範圍:

  1. 重新排列序列
  2. 控制 Buffer Size
  3. 處理掉訊息的機制

我不會處理這些需求:

  1. 訊息重送的處理機制 (同一個訊息送兩次)
  2. 精準的時間控制 (例如: 都沒有新訊息進來,但是有訊息超時了也能主動 DROP / SKIP)
  3. 訊息過期的處理

既然介面是重點,那再貼一次吧,沒幾行多貼一次省的還要捲回去看:


public interface IReOrderBuffer
{
    public bool Push(OrderedCommand data);
    public bool Flush();

    public event CommandProcessEventHandler CommandIsReadyToSend;
    public event CommandProcessEventHandler CommandWasDroped;
    public event CommandSkipEventHandler CommandWasSkipped;
}

為了讓這些介面能夠順利動起來,是需要上面那些東西一起串起來的,我拿掉實做,只留下定義跟類別簽章:


public class ReOrderBuffer : IReOrderBuffer
{
    // data structure & settings
    private int _current_next_index = 0;
    private SortedSet<OrderedCommand> _buffer = new SortedSet<OrderedCommand>(new OrderedCommandComparer());
    protected readonly int _buffer_size = 0;

    public ReOrderBuffer(int buffer_size_limit) { ... }

    // IReOrderBuffer interface
    bool IReOrderBuffer.Push(OrderedCommand data) { ... }
    bool IReOrderBuffer.Flush() { ... }
    event CommandProcessEventHandler IReOrderBuffer.CommandIsReadyToSend { ... }
    event CommandProcessEventHandler IReOrderBuffer.CommandWasDroped { ... }
    event CommandSkipEventHandler IReOrderBuffer.CommandWasSkipped { ... }

    // metrics
    private int _metrics_total_push = 0;
    private int _metrics_total_send = 0;
    private int _metrics_total_drop = 0;
    private int _metrics_total_skip = 0;
    private int _metrics_buffer_max = 0;
    private double _metrics_buffer_delay = 0.0;

    public (int push, int send, int drop, int skip, int buffer_max, double delay) ResetMetrics() { ... }

    // adapters
    protected bool Send(OrderedCommand data, CommandProcessReasonEnum reason) { ... }
    protected bool Drop(OrderedCommand data, CommandProcessReasonEnum reason) { ... }
    protected bool Skip(int position, CommandProcessReasonEnum reason) { ... }
}


3-1, 用 Buffer 來 Re-Order Command 的規則

整個 Buffer 的核心邏輯,還是在演算法身上。我就先從怎麼用 Buffer 來處理 command 的規則說起好了,我看了一些相關文章介紹,盡可能剝掉不必要的細節 (那些可以以後再加回去) 留下關鍵的規則。這個題目從一開始,都是希望有個 ReOrderBuffer 來處理重新排序的問題,給他限制資源 (這就是資料結構問題),只能暫存一定時間內的 command (這就是核心邏輯問題), 弄清楚資源必須準備多少 (這就是時間跟空間複雜度問題),超過這範圍的 command 就得丟棄。

因此一切設計都從這裡開始,我多描述一點串流排序的做法。來看這個案例,如果我收到的 command 順序是這樣:

#1, #3, #4, #2

則:

  • 收到 #1 會立即執行 #1
  • 收到 #3 偵測到缺 #2, 所以 #3 會先被保留起來 (這時 Buffer 只有 #3)
  • 同上,收到 #4 也會被保留起來 (這時 Buffer 有 #3, #4 兩筆)
  • 收到 #2 會立即執行 #2, 執行後檢查 buffer, 發現連續的 #3, #4 也都在, 就會緊接著執行 (這時 Buffer 會被清空)

用時間序 (從上到下) 來對比收到的跟執行的順序:

time sequence push execute buffered
1 #1 #1 (空)
2 #3   #3
3 #4   #3, #4
4 #2 #2, #3, #4 (空)

還記得前面的 Unit Test 嗎? 我事後也用我自己寫的 Unit Test 跑了這個 case 14, 輸出就當作對照組: (每次 PUSH 可能都會引發接續的 SEND / DROP / SKIP 動作, 這些動作結束後會印出 Buffer 內的清單)

測試案例 #14: BasicScenario14_ArticleDemo1


[TestMethod]
public void BasicScenario14_ArticleDemo1()
{
    this.SequenceTest(
        100,
        new int[] { 0, 1, 3, 4, 2 },
        new int[] { 0, 1, 2, 3, 4 });
}

測試輸出訊息:


PUSH: 0
SEND: 0 - SEND_PASSTHRU
      (Buffer: --)
PUSH: 1
SEND: 1 - SEND_PASSTHRU
      (Buffer: --)
PUSH: 3
      (Buffer: 3)
PUSH: 4
      (Buffer: 3, 4)
PUSH: 2
SEND: 2 - SEND_PASSTHRU
SEND: 3 - SEND_BUFFERED
SEND: 4 - SEND_BUFFERED
      (Buffer: --)

果然,寫 code 沒那麼複雜的話,寫 code 比寫文章輕鬆啊 XDD, 再來看複雜一點的例子,如果我收到的順序是: #1, #3, #5, #2, #6, #4, #7, #8 的話

time sequence push execute buffered
1 #1 #1 (空)
2 #3   #3
3 #5   #3, #5
4 #2 #2, #3 #5
5 #6   #5, #6
6 #4 #4, #5, #6 (空)
7 #7 #7 (空)
8 #8 #8 (空)

對照組 測試案例 #15, BasicScenario15_ArticleDemo2


[TestMethod]
public void BasicScenario15_ArticleDemo2()
{
    this.SequenceTest(
        100,
        new int[] { 0, 1, 3, 5, 2, 6, 4, 7, 8 },
        new int[] { 0, 1, 2, 3, 4, 5, 6, 7, 8 });
}


PUSH: 0
SEND: 0 - SEND_PASSTHRU
      (Buffer: --)
PUSH: 1
SEND: 1 - SEND_PASSTHRU
      (Buffer: --)
PUSH: 3
      (Buffer: 3)
PUSH: 5
      (Buffer: 3, 5)
PUSH: 2
SEND: 2 - SEND_PASSTHRU
SEND: 3 - SEND_BUFFERED
      (Buffer: 5)
PUSH: 6
      (Buffer: 5, 6)
PUSH: 4
SEND: 4 - SEND_PASSTHRU
SEND: 5 - SEND_BUFFERED
SEND: 6 - SEND_BUFFERED
      (Buffer: --)
PUSH: 7
SEND: 7 - SEND_PASSTHRU
      (Buffer: --)
PUSH: 8
SEND: 8 - SEND_PASSTHRU
      (Buffer: --)

以上,是靠 Buffer, 將順序不對但是先接收到的 Command 暫存起來,然後適當時機再送出去的邏輯,你就想像你在玩牌,等著抽到缺的那張關鍵牌,抽到後就能整個排組打出去的感覺,你手上的手牌就是 Buffer, 你的出牌技巧就是演算法。你得判斷哪些牌要留久一點 (要賭),那些要盡快出出去。

3-2, Buffer Size 越大越好嗎?

接下來,考慮一個複雜一點的狀況: 掉訊息

掉訊息困難的地方在於: 在當下你不會知道這訊息是已經掉了,還是只是晚一點才會拿到。已經掉了的話你越早放棄他 (SKIP) 越好,反正你等再久也等不到,早點放棄可以用更低的 Buffer,也能讓後面的訊息更早發出去;

晚一點到的話你等他越久越好,你的訊息完整性會更好 (都不用放棄任何一個訊息),但是你需要更大的 Buffer, 暫存更多 Command 在 Buffer 內,也會讓更多 Command 的延遲提高,有可能違背你對整個系統的 SLO 期待。

決策的部分下一章在聊,這段先聊做法。我用這兩個測試案例來說明。直接看測試案例 #5 跟 #13,兩者安排的 source_sequence 完全一樣,都是 #0 ~ #10, 但是中間少掉 #5,而兩個案例只差在 Case #5 的 Buffer Size 給很夠 (100), 另一個 Case #13 則給剛剛好不夠的大小 (3),來看看預期的反應:

測試案例 #5: BasicScenario5_LostCommand


[TestMethod]
public void BasicScenario5_LostCommand()
{
    this.SequenceTest(
        100,
        new int[] { 0, 1, 2, 3, 4,    6, 7, 8, 9, 10 },
        new int[] { 0, 1, 2, 3, 4,    6, 7, 8, 9, 10 });
}

跑出來的結果:

time sequence push execute buffered
1 #0 #0  
2 #1 #1  
3 #2 #2  
4 #3 #3  
5 #4 #4  
6 #6   #6
7 #7   #6, #7
8 #8   #6, #7, #8
9 #9   #6, #7, #8, #9
10 #10   #6, #7, #8, #9, #10
11 (flush) #5(skip), #6, #7, #8, #9, #10  

PUSH: 0
SEND: 0 - SEND_PASSTHRU
      (Buffer: --)
PUSH: 1
SEND: 1 - SEND_PASSTHRU
      (Buffer: --)
PUSH: 2
SEND: 2 - SEND_PASSTHRU
      (Buffer: --)
PUSH: 3
SEND: 3 - SEND_PASSTHRU
      (Buffer: --)
PUSH: 4
SEND: 4 - SEND_PASSTHRU
      (Buffer: --)
PUSH: 6
      (Buffer: 6)
PUSH: 7
      (Buffer: 6, 7)
PUSH: 8
      (Buffer: 6, 7, 8)
PUSH: 9
      (Buffer: 6, 7, 8, 9)
PUSH: 10
      (Buffer: 6, 7, 8, 9, 10)
SKIP: 5 - SKIP_BUFFERFULL
SEND: 6 - SEND_BUFFERED
SEND: 7 - SEND_BUFFERED
SEND: 8 - SEND_BUFFERED
SEND: 9 - SEND_BUFFERED
SEND: 10 - SEND_BUFFERED
-----------------------------------
Metrics:
- PUSH:          10
- SEND:          10
- DROP:          0
- SKIP:          1
- Max Delay:     400.029 msec
- Average Delay: 100.021 msec
- Buffer Usage:  5

因為 Buffer 夠大 (處理過程需要 5, 這案例給 100), 所以看起來為了等 #5 進來等了很久,連 #10 都收到了還在等,要不是測試已經跑到最後一步強迫呼叫 Flush() 來清光 Buffer, 這 case 可能還會繼續等下去。試想一個極端的 case, 如果你有 1000 或更大的 Buffer, 而你的 Command 從 #0 ~ #1000, 你只掉了 #5, 這麼大的 Buffer 會導致你把整個序列都放進 Buffer, 而 #5 以後的 Command 你一個也送不出去,直到 Flush() 為止.. (這不見得是你期待的結果)。

留意一下 Metrics, 我統計了三個有趣的數字:

  1. Max Delay,
    我統計了每個 Command, 從 Origin (被產生),至送到 Buffer 手上 OccurAt (被 PUSH),再到被執行 (被 SEND) 的延遲。在這個 case, 最大延遲是 400msec, 只的就是在 Flush(...) 那瞬間, #5 決定被放棄後,當時在 Buffer 被保留最久的 #6 足足延遲了 400ms 才被送出..
  2. Avg Delay,
    其實就是每個 Command 的 Delay 平均, 參考用
  3. Buffer Usage,
    雖然這 case 我開了 100 的 Buffer Size, 但是我還是在追蹤實際使用量,整個過程中最大只用到 5 而已。實際情況下,你可以透過監控這數字來了解你的 Buffer 是不是開太大了

再來看測試案例 #13, BasicScenario13_BufferLimitAndLostCommand, 一樣的輸入, 只是我故意把 Buffer 限縮到只有 3, 要呈現 Buffer 不夠處理強迫丟棄的情境:



[TestMethod]
public void BasicScenario13_BufferLimitAndLostCommand()
{
    this.SequenceTest(
        3,
        new int[] { 0, 1, 2, 3, 4,    6, 7, 8, 9, 10 },
        new int[] { 0, 1, 2, 3, 4,    6, 7, 8, 9, 10 });
}

跑出來的結果:

time sequence push execute buffered
1 #0 #0  
2 #1 #1  
3 #2 #2  
4 #3 #3  
5 #4 #4  
6 #6   #6
7 #7   #6, #7
8 #8   #6, #7, #8
9 #9 #5(skip), #6, #7, #8, #9  
10 #10 #10  
11 (flush)    
PUSH: 0
SEND: 0 - SEND_PASSTHRU
      (Buffer: --)
PUSH: 1
SEND: 1 - SEND_PASSTHRU
      (Buffer: --)
PUSH: 2
SEND: 2 - SEND_PASSTHRU
      (Buffer: --)
PUSH: 3
SEND: 3 - SEND_PASSTHRU
      (Buffer: --)
PUSH: 4
SEND: 4 - SEND_PASSTHRU
      (Buffer: --)
PUSH: 6
      (Buffer: 6)
PUSH: 7
      (Buffer: 6, 7)
PUSH: 8
      (Buffer: 6, 7, 8)
PUSH: 9
SKIP: 5 - SKIP_BUFFERFULL
SEND: 6 - SEND_BUFFERED
SEND: 7 - SEND_BUFFERED
SEND: 8 - SEND_BUFFERED
SEND: 9 - SEND_BUFFERED
      (Buffer: --)
PUSH: 10
SEND: 10 - SEND_PASSTHRU
      (Buffer: --)
-----------------------------------
Metrics:
- PUSH:          10
- SEND:          10
- DROP:          0
- SKIP:          1
- Max Delay:     300.300 msec
- Average Delay: 60.423 msec
- Buffer Usage:  3



這是另一個極端的例子,Buffer 給的很小 (Size 只有 3),因此等不了多久,才收到 #8 就決定要放棄等待 #5, 直接 SKIP(#5), 後面就一路順暢了。整體 Buffer 佔用空間很少,延遲也很低。但是如果 #5 在 #9 後面才出現,你收到也沒用,被迫得 DROP(#5) 了,相較 Buffer 夠大的情況下,能夠完全沒有 DROP command 的情況下完成任務。

來看看同樣的 Metrics, 這次我們只看 Max Delay … 上一個 case 5, 這數字是 400 msec, 這次則只有 300 msec, 主要是因為 Buffer Size 的因素,#5 提前在 #9 PUSH 後就被迫放棄 (原本是 #10 PUSH 後 Flush 才放棄),提早放棄 #5, 就提早讓躺在 Buffer 裡待最久的 #6 被放出來。因此這 case #6 只需要被 delay 300 msec 就能送出,延遲時間從 400 msec 改善到 300 msec

請牢記這 Buffer 的作法,與這幾個 Test Case, 用這些案例套用待會的 code 你才能理解為何 code 需要這樣安排。演算法就先聊到這邊,接下來看 code…

3-3. 資料結構

為了實做上面那些演算法,我在 ReOrderBuffer.cs 這類別實做了 IReOrderBuffer 這介面,並且宣告了這些 private variable, 主要就是要建構能處理上述那些案例的 data structure (包含 settings), 我貼這部分的 code:


public class ReOrderBuffer : IReOrderBuffer
{
    // data structure & settings
    private int _current_next_index = 0;
    private SortedSet<OrderedCommand> _buffer = new SortedSet<OrderedCommand>(new OrderedCommandComparer());
    protected readonly int _buffer_size = 0;


    // metrics
    private int _metrics_total_push = 0;
    private int _metrics_total_send = 0;
    private int _metrics_total_drop = 0;
    private int _metrics_total_skip = 0;
    private int _metrics_buffer_max = 0;
    private double _metrics_buffer_delay = 0.0;

    public (int push, int send, int drop, int skip, int buffer_max, double delay) ResetMetrics() { ... }
}

資料結構這部分很單純,我只宣告了一個變數 int _current_next_index, 用來標記照順序的話下一個 Command Position 應該是哪個;另外我宣告了一個 SortedSet<OrderedCommand> 來代表佔存那些已經收到,但是還無法處理的 Command 用的 Buffer 而已。我用了 SortedSet<T>, 它可以幫我排序 (為了讓他自動排序,所以我替 OrderedCommand 實做了 IComparer<T> 介面),同時 Set 會幫我排除相同的 Command. 如果將來我還需要實做排除已經略過的 Command, .NET 的 SortedSet<T> 還能支援我做各種交集 (Interset)、聯集 (Union) 等等集合的運算。不過這次還用不到,點到為止。

附上為了讓 SortedSet 能正常運作,補上的 IComparer 實做:


public class OrderedCommandComparer : IComparer<OrderedCommand>
{
    public int Compare([AllowNull] OrderedCommand x, [AllowNull] OrderedCommand y)
    {
        return x.Position.CompareTo(y.Position);
    }
}
    

而 Buffer 的運作,需要一些來自外部的設定。在 3-2 聊了一些 Buffer 大小的選擇,就是其中之一 (目前也只有這個),就是 int _buffer_size, 這變數決定了 SortedSet 的容量上限,超過了你還 PUSH 的話,Buffer 就應該觸發 SKIP 的機制,預先略過部分可能等不到它的 command.

理論上這些範圍限制也需要包含時間,不過這次略過時間,有機會再回過頭聊這個主題時再補上這部分。

3-4, 監控指標 Metrics

為了監控這整個過程的運作,我設置了六個監控指標:


// metrics
private int _metrics_total_push = 0;
private int _metrics_total_send = 0;
private int _metrics_total_drop = 0;
private int _metrics_total_skip = 0;
private int _metrics_buffer_max = 0;
private double _metrics_buffer_delay = 0.0;


public (int push, int send, int drop, int skip, int buffer_max, double delay) ResetMetrics()
{
    return (
        Interlocked.Exchange(ref this._metrics_total_push, 0),
        Interlocked.Exchange(ref this._metrics_total_send, 0),
        Interlocked.Exchange(ref this._metrics_total_drop, 0),
        Interlocked.Exchange(ref this._metrics_total_skip, 0),
        Interlocked.Exchange(ref this._metrics_buffer_max, 0),
        Interlocked.Exchange(ref this._metrics_buffer_delay, 0));
}

在 Buffer 運作的過程中,隨時都會更新這六個數值,其中四個 ( push, send, drop, skip ) 是單純的 counter, 而 buffer_max 則是只保留最大值的指標, delay 則是保留所有 send 的 command 的 delay 時間總和。

為了能方便取用,我設計了一個 method: ResetMetrics(), 它能同時歸零這六個指標,同時把歸零前那瞬間的數值傳回來。我這邊特地用了 Interlocked 的技巧來實做,不是說我的 POC 負載有多高,會碰到 racing condition 等狀況,而是這是我自己用來提醒以後所有看到這段 code 的人,POC 高度簡化的環境,很多問題都會被遮蔽,而我習慣用 “降級” 的技巧來簡化問題,這樣寫是提醒我,未來我真的把這些 metrics 搬出去 (例如放到 Redis), 這是提醒我我應該要用同等規格,用正確地 Redis 指令來操作它們 (Redis 也有 GETSET 這指令可以直接對應 Interlocked.Exchange, 這在計算機科學裡面, 聊到作業系統的多工處理, 都是很基本的課題)

題外話,如果你有參加我在 DevOpsDays Taipei 2023 的 Keynote Speech 的話,我提到的維運設計,就是指這樣的思路。

3-5. Push() , 驅動 Buffer 運作的引擎

直接來看最核心的部分,也是驅動整個 ReOrderBuffer 的主要 method: Push()

規格跟定義清楚後,code 其實不難寫,邏輯重點抓好就好了。我移除非關鍵的 code, 直接來看 ReOrderBuffer 的實作。我只貼Buffer 本身的資料結構,以及驅動整個 Buffer 運作的 Push():


public class ReOrderBuffer : IReOrderBuffer
{
    private int _current_next_index = 0;
    private SortedSet<OrderedCommand> _buffer = new SortedSet<OrderedCommand>(new OrderedCommandComparer());
    protected readonly int _buffer_size = 0;


    public ReOrderBuffer(int buffer_size_limit)
    {
        this._buffer_size = buffer_size_limit;
    }


    bool IReOrderBuffer.Push(OrderedCommand data)
    {
        this._metrics_total_push++;
        this._metrics_buffer_max = Math.Max(this._metrics_buffer_max, this._buffer.Count);

        if (data.Position < this._current_next_index)
        {
            // drop
            this.Drop(data, CommandProcessReasonEnum.DROP_OUTOFORDER);
            return false;
        }
        else 
        {
            if (data.Position == this._current_next_index)
            {
                this.Send(data, CommandProcessReasonEnum.SEND_PASSTHRU);
                this._current_next_index = data.Position + 1;
            }
            else
            {
                this._buffer.Add(data);
            }

            do
            {
                if (this._buffer.Count > this._buffer_size && this._current_next_index < this._buffer.Min.Position)
                {
                    // skip:
                    this.Skip(this._current_next_index, CommandProcessReasonEnum.SKIP_BUFFERFULL);
                    this._current_next_index++;
                }
                while (this._buffer.Count > 0 && this._current_next_index == this._buffer.Min.Position)
                {
                    var m = this._buffer.Min;
                    this._buffer.Remove(m);
                    this.Send(m, CommandProcessReasonEnum.SEND_BUFFERED);
                    this._current_next_index++;
                }
            } while (this._buffer.Count > this._buffer_size);

            this._metrics_buffer_max = Math.Max(this._metrics_buffer_max, this._buffer.Count);
            return true;
        }
    }

    
}

目前的版本,由於都是由 Push() 驅動,所以寫起來很單純。實際狀況則都是跟時間賽跑啊,我舉個情境,如果照 #1, #3, #2, #4 順序 PUSH, 而輸入 #1, #3 的時候就停了, 如果你想要保證 500ms 內回應, 所以需要在完全沒有任何 input (push) 的情況下, 主動 Skip #2, 才能讓早已收到的 #3 能快點被處理的話該怎麼辦?

這時,就不能只單靠 Push() 來驅動了,必須有另一條管道是由時間 (timer) 來驅動的途徑才辦的到。不過我前面就講了, 這暫時不再我這次要實做的範圍內,跟時間相關的機制,各位有興趣可以挑戰看看,也歡迎發 PR 給我 XDD, 不然就等我哪天想到了再來補後半段..

上面這段 code 也不長,我猜各為自己看 code 應該也能看得懂。前面說明過資料結構,這時 Push( ) 的內容就很明瞭了,收到 Command, 先排除完全不處理 (早就過號的) Command, 剩下的就分兩路,一個是 position 正好就是下一個, 馬上就 SEND_PASSTHRU 轉發出去了。

接下來,不管是不是直接轉發,或是被存起來,後面的動作就一致,看看補上了目前這個 Command 之後,Buffer 內是否有能夠一起跟著送出去的 Command? (就是 SEND_BUFFERED 的狀況)

這時,如果 Buffer 已經滿了,就要做額外的裁定了。如果 Buffer 還沒滿,就繼續暫存無所謂,如果已經滿了,而你卻又在等待某個號碼補進來,這時你就被迫得立刻做決定了。我的選擇是優先保留已經收到的 command (順位比較大的),而選擇放棄不再等待下一個就要執行,但是卻還沒進來的 command. 原因是網路不穩定的話,這個 command 可能永遠不會進來,我可能賠了夫人又折兵。另外,丟掉後面的 command, 也可能造成連鎖效應,等到我要處理到後面的時候,又會面臨那個 command 也不會再進來了,丟失 command 會變成連鎖反應越滾越大..

結果好壞後面再說,這裡是要讓各位了解判斷的邏輯。規則弄清楚了,對照著 code 大概就能理解背後的邏輯,我就不逐行解釋了。

會驅動整個 Buffer 運作的,除了 PUSH 之外,另一個就是 Flush, Flush 是在你確定指令通通都結束時,強迫 Buffer 做個清算,把還留在 Buffer 裡的 command, 該 Skip, 該 Send 的都一次清算完畢。規則類似,只是條件不同,我直接貼 code 就好:


bool IReOrderBuffer.Flush()
{
    while(this._buffer.Count > 0)
    {
        if (this._current_next_index == this._buffer.Min.Position)
        {
            // pop
            var m = this._buffer.Min;
            this._buffer.Remove(m);
            this.Send(m, CommandProcessReasonEnum.SEND_BUFFERED);
            this._current_next_index++;
        }
        else
        {
            // skip
            this.Drop(
                new OrderedCommand()
                {
                    Position = this._current_next_index,
                    Message = "Command not received, and skip waiting. Message body unknown."
                },
                CommandProcessReasonEnum.SKIP_BUFFERFULL);
            this._current_next_index++;
        }
    }            

    return true;
}


3-6, Adapters 串聯事件機制

最後,剩下轉發出來的 SEND / DROP / SKIP, 額只是更新 metrics, 轉發事件等搭配的作業,我直接貼程式碼。最終的事件都是在這邊被發出來的,發出事件後,外界就可以透過 CommandIsReadyToSend / CommandWasSkipped / CommandWasDropped 來接收通知,做對應的處理了 (你要把它轉發到支援 FIFO 的 Message Queue 也行)。

這部分的 code:


protected bool Send(OrderedCommand data, CommandProcessReasonEnum reason)
{
    this._metrics_buffer_delay += (DateTimeUtil.Instance.Now - data.Origin).TotalMilliseconds;// (this._metrics_average_latency * this._metrics_total_pop + (data.OccurAt - data.Origin).TotalMilliseconds) / (this._metrics_total_pop + 1);
    this._metrics_total_send++;
    this._send?.Invoke(data, new CommandProcessEventArgs()
    {
        Result = CommandProcessResultEnum.SEND,
        Reason = reason,
    });

    return true;
}


protected bool Drop(OrderedCommand data, CommandProcessReasonEnum reason)
{
    this._metrics_total_drop++;
    this._drop?.Invoke(data, new CommandProcessEventArgs()
    {
        Result = CommandProcessResultEnum.DROP,
        Reason = reason,
    });

    return true;
}

protected bool Skip(int position, CommandProcessReasonEnum reason)
{
    this._metrics_total_skip++;
    this._skip?.Invoke(position, new CommandProcessEventArgs()
    {
        Result = CommandProcessResultEnum.SKIP,
        Reason = reason,
    });

    return true;
}




3-7. DateTime Mock

另一個需要特別交代的,你可能會看到一個不認識的類別: DateTimeUtil, 因為這段模擬會牽涉到時間序,而我要是在 code 裡面直接用到 DateTime.Now 這類功能時,會很麻煩,一不小心我現在 run 的好好的 code, 明年就不能跑了。所以我自己寫了一個 DateTime 的 Mock, 可以控制我想要模擬的時間軸,除了讓我可以任意自訂 “現在” 的時間之外,我也可以壓縮時間線的進行,例如前面提到的 GetCommand(), 我不需要真的 Sleep 100ms, 我只要用 DateTimeUtil.TimePass() 就能做十間跳躍 (只能往後跳), 如果你真的用 Sleep, 時間也會往後跳。而我設置每秒一次的 timer, 他也會正確地幫我觸發。

先前我有一篇說明這個 DateTime Mock, 有興趣可以參考:

4. 模擬與監控

設計概念,單元測試,資料結構都聊完了,最後這段就來跑跑看實際的模擬測試吧。擺了 Metrics 的目的就是要在這階段看看實際的表現 & 調教。首先先來看第一組基本的模擬測試。

模擬測試,為了方便重複執行,我做了一點調整,基本上主程式跟前面做的都一樣,只是我把三個控制環境的條件抽出來當作命令列參數而已:


static void Demo2_ExecuteCommandWithReorderBuffer(string[] args)
{
    int command_period = 100;
    int command_noise = 500;
    
    int buffer_size = 10;

            if (args.Length == 0)
            {
                Console.WriteLine($"Usage: [execute]   ");
                return;
            }
            else
            {
                command_period = int.Parse(args[0]);
                command_noise = int.Parse(args[1]);
                buffer_size = int.Parse(args[2]);
            }

    DateTimeUtil.Init(new DateTime(2023, 09, 16));
    IReOrderBuffer ro = new ReOrderBuffer(buffer_size);


    int _log_sequence = 0;
    Console.Error.WriteLine($"TimeInSec,Push,Send,Drop,Skip,BufferMax,BufferDelay");

    var overall_metrics = (ro as ReOrderBuffer).ResetMetrics();
    DateTimeUtil.Instance.RaiseSecondPassEvent += (sender, args) =>
    {
        // write metrics
        Interlocked.Increment(ref _log_sequence);
        var metrics = (ro as ReOrderBuffer).ResetMetrics();
        double avg_buffer_delay = 0;
        if (metrics.send > 0)
        {
            avg_buffer_delay = metrics.total_delay / metrics.send;
        }
        Console.Error.WriteLine($"{_log_sequence},{metrics.push},{metrics.send},{metrics.drop},{metrics.skip},{metrics.buffer_max},{avg_buffer_delay}");

        // update overall statistics
        overall_metrics.push += metrics.push;
        overall_metrics.send += metrics.send;
        overall_metrics.drop += metrics.drop;
        overall_metrics.skip += metrics.skip;
        overall_metrics.buffer_max = Math.Max(metrics.buffer_max, overall_metrics.buffer_max);
        overall_metrics.max_delay = Math.Max(overall_metrics.max_delay, metrics.max_delay);
        overall_metrics.total_delay += metrics.total_delay;
    };


    ro.CommandIsReadyToSend += (sender, args) =>
    {
        ExecuteCommand(sender);
    };

    ro.CommandWasDroped += (sender, args) =>
    {
        Console.WriteLine($"- {args.Reason,-20},  #{sender.Position}, {(sender.OccurAt - sender.Origin).TotalMilliseconds,5} msec, {sender.Message}");
    };


    foreach (var item in GetCommands(command_period, command_noise))
    {
        ro.Push(item);
    }
    ro.Flush();


    DateTimeUtil.Instance.TimePass(TimeSpan.FromSeconds(10));

    Console.WriteLine($"ReOrderBuffer Overall Metrics:");
    Console.WriteLine($"- Push:          {overall_metrics.push}");
    Console.WriteLine($"- Send:          {overall_metrics.send}");
    Console.WriteLine($"- Drop:          {overall_metrics.drop}");
    Console.WriteLine($"- Drop Rate (%)  {overall_metrics.drop * 100 / overall_metrics.push} %");
    Console.WriteLine($"- Max Delay:     {overall_metrics.max_delay:0.000} msec");
    Console.WriteLine($"- Average Delay: {overall_metrics.total_delay / overall_metrics.send:0.000} msec");
    Console.WriteLine($"- Buffer Usage:  {overall_metrics.buffer_max}");
}


外在環境的變因:
這部分控制我如何模擬外在的傳輸環境,理論上這是 ReOrderBuffer 不可控的,只能掌握他,並且做出對策而已。這變因有三個,我只控制前兩項,第三項固定為 1% 不變:

  1. command_period: 產生 Command 的週期 (msec)
  2. command_noise: 產生 Command 的隨機延遲範圍 (0 ~ n msec)
  3. lost_rate: Command 會在網路傳輸過程中丟失的機率 (根本不會 PUSH 進 Buffer)

限制條件:
這是 ReOrderBuffer 本身能控制的參數,只有一個 buffer_size (我一直提到,理論上應該還要再控制另一個 max_timeout,不過這版本我先不處理):

  1. buffer_size: 設定 ReOrderBuffer 的緩衝區大小

而我在意的結果指標,我看這幾個:

  1. Drop Rate (%) 越低越好。這裡 DROP 指的是真的有收到,但是被迫放棄的比例。傳輸過程中本來就掉的是 SKIP,不算在內。
  2. Max Delay 越低越好 (這是極限值, 單一 Command 可能碰到的最大 Delay, 單位 msec)

藉這機會,用前面這張圖延伸說明一下 Delay. 以這案例, #1, #2 會直接送出 ( SEND_PASSTHRU ),不會再 Buffer 停留 (頂多就 0.x msec 處理時間而已)。但是因為 noise 的關係,#3 比 #2 還早送到 Buffer, 因此 #3 會被暫存放在 Buffer, 等待 #2 已經 SEND 出去後,才會一起把預先放在 Buffer 的 #3 一起 SEND_BUFFERED 出去。

#3 因為 Buffer 的處理,會多一段延遲,這段就是這指標 Max Delay 要追蹤的數字。

4-1, 模擬測試 (100, 500, 10)

先用預設值跑一次吧,預設的組態,跟跑完的 Metrics 統計如下:


Configuration:
- command_period:  100 msec
- command_noise:   500 msec
- buffer_size:     10

ReOrderBuffer Overall Metrics:
- Push:          990
- Send:          990
- Drop:          0
- Drop Rate (%)  0 %
- Max Delay:     1237.963 msec
- Average Delay: 104.525 msec
- Buffer Usage:  10

過程中每個指標我都有輸出 CSV, 我只拉出 Drop, Buffer Usage & MaxDelay 這幾個數值出來觀察:

綠線代表因 Buffer 而造成的 Delay, 事後我逐步追查 log, 解讀是這樣:

  1. 平均約在 100ms 以下的部分,單純就是順序不對,放在 Buffer 內等著下一個 command 收到後調整回來。這些大約都在 100ms 以內能夠修正,正常的發揮。
  2. 圖表上有 8 個大約 500ms 以上的 Peak, 這先瞬間變大的延遲, 其實是來自那 1% 送丟的 command, 因為 Buffer 夠大,這些送丟的 command 後面會牽連著幾個 command, 最多被延遲到 10 個週期後才被強迫送出。

了解了什麼情況會看到甚麼樣的數值之後,看後面的 dashboard 呈現,你應該就能更快的看到監控畫面掌握到實際的運作狀況了。這是很重要的一環,除了快速監控了解狀況之外,這也對於狀況的解讀,以及安排快速反應的 SOP 有很大的幫助。

4-2, 模擬測試 (100, 100, 10)

維持 4-1 的組態,我只把 command_noise 從 500 降到 100 (傳輸延遲從 0 ~ 500 msec 隨機飄移,縮減到 0 ~ 100 msec 隨機飄移,可以把它當作外在的網路品質變好了),跑完的結果如下:

Configuration:
- command_period:  100 msec
- command_noise:   100 msec
- buffer_size:     10

ReOrderBuffer Overall Metrics:
- Push:          990
- Send:          990
- Drop:          0
- Drop Rate (%)  0 %
- Max Delay:     1134.016 msec
- Average Delay: 52.932 msec
- Buffer Usage:  10


同樣抓三個指標的數值畫成圖表:

這邊花點篇幅聊一下 Delay, 這邊的 Delay 只計算 Buffer 收到 Command 後,因為暫存在 Buffer 內延遲了多少時間 (主要就是 SEND_BUFFERED 的狀況),傳輸延遲 ( 0 ~ 500 msec 那個 ) 是沒有計算在內的,因為我把它當作外在因素,無法改善的,所以在指標上我沒有特別統計他。

因此這個數字的好壞,完全取決於我收到的 Command 到底順序排得有多 “亂” ? 越亂的順序,越需要依賴站存在 Buffer 內重排,系統就會越察覺到 Command 經過 Buffer 後卡多久才會被送出? 這也是個適合的 SLO 目標。

在其他條件都不變的情況下,網路改善 ( command_noise 從 500 msec -> 100 msec ),就可以讓 Max Delay 從 1237.963 msec 進步到 1134.016 msec, 這就是能觀察到的改變

從圖表來看,調低了 noise ( 500 -> 100 ), 你會發現綠線 delay 的指標,跟 4-1 相比, 100ms 以下的單純錯位修正幾乎都不見了,只剩下傳輸丟失的 Peak 還在。

4-3, 模擬測試 (100, 500, 5 ~ 1)

如果網路品質都固定在 500 ( command_noise ), 我只調整 buffer_size, 從 10 降為 5, 3, 1 各模擬一次, 我直接貼三份模擬結果出來:

模擬測試 (100, 500, 5):

Configuration:
- command_period:  100 msec
- command_noise:   500 msec
- buffer_size:     5

ReOrderBuffer Overall Metrics:
- Push:          990
- Send:          990
- Drop:          0
- Drop Rate (%)  0 %
- Max Delay:     777.193 msec
- Average Delay: 68.508 msec
- Buffer Usage:  5

驗證了前面單元測試,當 Buffer 越大的時候,可能導致不必要的延遲 (因為有指令丟掉了,Buffer 越大只是卡越久而已),Buffer Size 降成 5 反而讓延遲從 1237.963 msec 降到 777.193 msec.

模擬測試 (100, 500, 3):

Configuration:
- command_period:  100 msec
- command_noise:   500 msec
- buffer_size:     3

ReOrderBuffer Overall Metrics:
- Push:          990
- Send:          988
- Drop:          2
- Drop Rate (%)  0 %
- Max Delay:     636.567 msec
- Average Delay: 59.871 msec
- Buffer Usage:  3

Buffere Size 真的越小越好嗎? 那我繼續降,這次我只開 Buffer Size = 3 再跑一次模擬。延遲繼續降了沒錯,從 1237.963 msec 持續降到 636.567 msec 沒錯,不過這樣的 Buffer Size 開始無法發揮修正順序的能力了。因為網路的品質關係,只有 3 的 Buffer Size 不足以提供足夠的緩衝,讓這些 Command 的順序有足夠的空間站存調整回正確的順序,因此這次模擬,已經觀察的到 2 個 Command 被收到後丟棄了 (DROP)。上面的統計被我四捨五入了,雖然顯示是 0%, 實際上是 0.2% …

模擬測試 (100, 500, 2):

Configuration:
- command_period:  100 msec
- command_noise:   500 msec
- buffer_size:     2


ReOrderBuffer Overall Metrics:
- Push:          990
- Send:          954
- Drop:          36
- Drop Rate (%)  3 %
- Max Delay:     425.002 msec
- Average Delay: 54.486 msec
- Buffer Usage:  2

我再繼續降 Buffer Size, 這次用 2 來跑一次模擬。 很明顯的,Drop Rate 持續飆升,這次跑到 3% 了 (實際上 1000 個 Command, 網路傳輸掉了 10 個,只收到 990 個,而經過 Buffer 處理又被丟掉 36 個,最後成功往後送的 Command 只有 954 個)

模擬測試 (100, 500, 1):

Configuration:
- command_period:  100 msec
- command_noise:   500 msec
- buffer_size:     1

ReOrderBuffer Overall Metrics:
- Push:          990
- Send:          844
- Drop:          146
- Drop Rate (%)  14 %
- Max Delay:     356.657 msec
- Average Delay: 39.691 msec
- Buffer Usage:  1

都做到這邊了,我就把 Buffer Size = 1 也跑完吧! Drop Rate 飆升到 14%

這類的機制,都是為了提高可靠度而設計的,應該把 Drop 數字控制在 0 的前提下,盡量降低延遲才是合理的作法。以這個組態,看起來最終決定的 Buffer Size 應該控制在 5 以上才合適。面對這樣的網路品質,給了合適的 Buffer Size, 就足以將錯誤的順序校正回來,並且維持 (最重要的是可以預測) 一定的延遲時間。

最後我把這組實驗的結果,放在同一張表格:

控制不變的參數:

  • command_period: 100
  • command_noise: 500
SN Buffer Size Drop Delay
1 10 0 1237.963
2 5 0 777.193
3 3 2 636.567
4 2 36 425.002
5 1 146 356.657

4-4, 改善

其實,這段要講的,就是我前面每一段都忍不住會提醒一下,我沒做的 “command timeout” 機制。

這類系統,其實最終都是看服務水準 (SLO, Service Level Objective) 目標怎麼定的。串流及時處理的系統,講求的都是延遲必須控制在保證範圍內。這邊我們能控制的,只有 Buffer Size, 如果 Command 送過來的速率是固定的話,其實 Buffer Size 就足夠了,反正速率 x 時間 就等於 容量了,基本上是能有對應關係的。

但是就如同上面的實驗,當 Buffer Size 越大,在極端情況下會讓 Command 非預期的留在 Buffer 過久,因此缺乏一個 “主動” 的機制,把停留太久的 Command 從 Buffer 強制拿出來 (不論是 SKIP 或是 DROP) 的機制,而也因為這次模擬我用的是 DateTime Mock, 篇幅有限我不大想在這基礎去擴充高精確度的 timer 來做這件事 (其實還是可以啦,有興趣的可以參考這篇: 後端工程師必備: 排程任務的處理機制練習)

結構上其實就跟 Flush(...) 類似,只是要有個高精確度的 timer, 時間到了就去觸發 Buffer, 定期檢查是否有該收到但是還沒收到的 Command, 要主動觸發 SKIP 事件? 或是定期檢查是否有 Command 已經被暫放在 Buffer 超出預期的時間,必須強制 DROP 該 Command?

如果有搭配這樣的機制,你就能放心的擴大 Buffer Size 了,這樣才是我認知中理想的服務設計。這類服務應該以 SLO 為主要目標,實做或是演算法都應該是支撐目標才對,而非目標來遷就設計 (抱歉,我只示範了一半 XDDD)。搭配 timer 做主動偵測,Buffer 的設計就會更有適應性,不但可以一次 allocate 足夠大的 Buffer Size, 確保能應付瞬間的變化,也能精準控制 delay 不會超過預期的範圍。良好的設計是能兼顧各種需求的,只是這次的範例我沒有實做到後半段。

這段,有興趣的朋友們可以親自動手測試看看,我把最困難的部分 (演算法,介面定義,實驗環境,測量機制) 都定義好了,有心想要成為架構師的,可以趁這機會練習體驗看看。如果你完成了,歡迎在底下留言跟我分享你的做法跟心得。

5, 總結

不知道看完這篇文章的朋友們,有多少人有參加 DevOpsDays Taipei 2023, 聽我講那場 “架構師也要 DevOps” 的演講?

其實我在裡面提到的用模型 + 模擬,來預先了解 & 解決維運問題,就是這篇我示範的整個過程。有很明確的規格或演算法,那通常是開發 (Dev) 領域擅長的,演算法設計,規格設計,甚至是單元測試,都能讓你做出如預期的成果。

但是這個例子,最終是要上戰場的服務啊,你無法控制網路品質,你也無法用 “計算” 的方式預估你要多大的 Buffer Size, 因此如果你沒有用模型來模擬,你也無從 “猜測” 你的做法是否能真的上線解決問題。

架構師難為的地方就在於,只有困難 (大家搞不定,也想不通怎麼解決) 的問題才會來找你。在這前提之下你從有了想法,到真的能上線驗證,通常需要花很長的時間才走得到這一步。因此,想進一切辦法,讓問題還在很早期的階段就能被驗證,對架構師這角色而言是非常重要的,如果你提出一個錯誤的方式,其他人要等半年後才能告訴你行不通,那架構師的存在意義就不見了不是嗎?

因此,長年面對這些難題,我才會發展出我自己驗證解決方案是否可行的流程,從概念就要把它建立成可被評估的模型,接著把它變成可被驗證可被執行的 MVP。這階段除了拿來驗證解題之外,也應該拿來驗證真實上線與監控等等行為 (現在越來越強調 DevOps, 開發維運一體化)。架構師的設計也不應該僅止於開發的規劃跟設計,維運的設計也應該包含在內才對。

所以,這篇文章的 (2), (3) 講的就是 Dev 的部分,而 (4) 則是講 Ops 的部分,綜合驗證設計後才真正發包給相關團隊實際開發出來,就是我整篇文章背後的思考脈絡。






安德魯部落格 GPTs

試試用 GPTs 幫你讀文章!
直接用白話文詢問,"安德魯的部落格 GPTs" 會幫你找到相關文章,也會用我文章的知識來回答你的問題。

Facebook Pages

Edit Post (Pull Request)

Post Directory