可編程網關 Pipy 第二彈:編程實現 Metrics 及源碼解讀

由于要給團隊做一下關于 Flomesh 的分享,準備下材料。

“分享是最好的學習方法。”

上一回初探可編程網關 Pipy,領略了 Pipy 的“風騷”。從 Pipy 的 GUI 交互深入了解了 Pipy 的配置加載流程。

今天看一下 Pipy 如何實現 Metrics 的功能,順便看下數據如何在多個 Pipeline 中進行流轉。

前置

首先,需要對 Pipy 有一定的了解,如果不了解看一下上一篇文章。

其次構建好 Pipy 環(huán)境,關于構建還是去看上一篇文章。

Metrics 功能實現

至于 Pipy 實現 Metrics 的方式,源碼中就有,位于 test/006-metrics/pipy.js。

  • 代理監(jiān)聽 6080 端口,后端服務在 8080 端口,Metrics 在 9090 端口
  • 共有 5 個 Pipeline:3 個 listen 類型,2 個 Pipeline 類型
  • 7 種過濾器:fork、connectdecodeHttpRequest、onMessageStartdecodeHttpResponse、encodeHttpRespnsereplaceMessage

貼一下源碼:

pipy({
  _metrics: {
    count: 0,
  },
  _statuses: {},
  _latencies: [
    1,2,5,7,10,15,20,25,30,40,50,60,70,80,90,100,
    200,300,400,500,1000,2000,5000,10000,30000,60000,
    Number.POSITIVE_INFINITY
  ],
  _buckets: [],
  _timestamp: 0,
})

.listen(6080)
  .fork('in')
  .connect('127.0.0.1:8080')
  .fork('out')

// Extract request info
.pipeline('in')
  .decodeHttpRequest()
  .onMessageStart(
    () => (
      _timestamp = Date.now(),
      _metrics.count++
    )
  )

// Extract response info
.pipeline('out')
  .decodeHttpResponse()
  .onMessageStart(
    e => (
      ((status, latency, i) => (
        status = e.head.status,
        latency = Date.now() - _timestamp,
        i = _latencies.findIndex(t => latency <= t),
        _buckets[i]++,
        _statuses[status] = (_statuses[status]|0) + 1
      ))()
    )
  )

// Expose as Prometheus metrics
.listen(9090)
  .decodeHttpRequest()
  .replaceMessage(
    () => (
      (sum => new Message(
        [
          `count ${_metrics.count}`,
          ...Object.entries(_statuses).map(
            ([k, v]) => `status{code="${k}"} ${v}`
          ),
          ..._buckets.map((n, i) => `bucket{le="${_latencies[i]}"} ${sum += n}`)
        ]
        .join('\n')
      ))(0)
    )
  )
  .encodeHttpResponse()

// Mock service on port 8080
.listen(8080)
  .decodeHttpRequest()
  .replaceMessage(
    new Message('Hello!\n')
  )
  .encodeHttpResponse()

測試

使用 ab 做請求模擬 ab -n 2000 -c 10 http://localhost:6080/,然后檢查下記錄的指標。

$ http :9090 --body
count 2000
status{code="200"} 2000
bucket{le="1"} 1762
bucket{le="2"} 1989
bucket{le="5"} 1994
bucket{le="7"} 1999
bucket{le="10"} 2000

分析

TL;DR:本次示例的核心是 fork,從字面意思就很容易理解:新開一個處理分支(Pipeline),與主線并行執(zhí)行。

src/inbound.cpp:104 109 處,Pipy 接收一個新的連接。

創(chuàng)建 ContextSession,并在 L178 處注冊事件的處理器,然后在 L187 處開始接收數據。

#receive 方法中,定義了數據接收處理器:將讀到的數據寫入 buffer 中。這個 buffer 存儲的是 Event類型數據。(所以說 Pipy 是基于數據流事件,將一些封裝成了事件)

接著調用 Session#input。

實際上調用的是 ReusableSession#input,調用 m_filters#process 方法。m_filters 實際上是 Filter 類型。

為什么只有一個 Filter?重點來了,看下 ReusableSession 的構造過程就能明白了(這里用了個反向迭代器)。output 是當前 Filter 處理完要執(zhí)行的,實現類似鏈式的執(zhí)行。

再回頭看上面的示例,可以想象 fork 就是 Sessionm_filters。

src/filters/fork.cpp:85,在 fork 過濾器中,在 1 處從 module 中獲取到目標 Pipeline,并在 34 處 創(chuàng)建了新的 Session 并保存原 Session 的數據。

然后在 5 處將原 Event 輸入到新的 Session 中,觸發(fā)目標 PipelineFilter 鏈。值得注意的是,這里是基于事件的處理,并不是阻塞的。這就意味著,fork 的目標 pipline,與 fork 所在的 pipeline 是并行執(zhí)行的。 在示例中,就是 Pipeline ‘in’ 與 主 Pipelineconnect 是并行執(zhí)行的。

最終在 6 處,繼續(xù)使用原 SessionFilter 鏈。

?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • 有幸參加了 Flomesh[https://flomesh.cn/] 組織的workshop,了解了他們的 Pip...
    云原生指北閱讀 464評論 0 0
  • 官網地址:https://www.elastic.co/cn/ 官網權威指南:https://www.elasti...
    Anwar_ec28閱讀 6,911評論 0 11
  • 表情是什么,我認為表情就是表現出來的情緒。表情可以傳達很多信息。高興了當然就笑了,難過就哭了。兩者是相互影響密不可...
    Persistenc_6aea閱讀 129,938評論 2 7
  • 16宿命:用概率思維提高你的勝算 以前的我是風險厭惡者,不喜歡去冒險,但是人生放棄了冒險,也就放棄了無數的可能。 ...
    yichen大刀閱讀 8,212評論 0 4

友情鏈接更多精彩內容