由于要給團隊做一下關于 Flomesh 的分享,準備下材料。
“分享是最好的學習方法。”
上一回初探可編程網關 Pipy,領略了 Pipy 的“風騷”。從 Pipy 的 GUI 交互深入了解了 Pipy 的配置加載流程。
今天看一下 Pipy 如何實現 Metrics 的功能,順便看下數據如何在多個 Pipeline 中進行流轉。
前置
首先,需要對 Pipy 有一定的了解,如果不了解看一下上一篇文章。
其次構建好 Pipy 環(huán)境,關于構建還是去看上一篇文章。
Metrics 功能實現
至于 Pipy 實現 Metrics 的方式,源碼中就有,位于 test/006-metrics/pipy.js。

- 代理監(jiān)聽
6080端口,后端服務在8080端口,Metrics 在9090端口 - 共有 5 個 Pipeline:3 個 listen 類型,2 個 Pipeline 類型
- 7 種過濾器:
fork、connect、decodeHttpRequest、onMessageStart、decodeHttpResponse、encodeHttpRespnse、replaceMessage
貼一下源碼:
pipy({
_metrics: {
count: 0,
},
_statuses: {},
_latencies: [
1,2,5,7,10,15,20,25,30,40,50,60,70,80,90,100,
200,300,400,500,1000,2000,5000,10000,30000,60000,
Number.POSITIVE_INFINITY
],
_buckets: [],
_timestamp: 0,
})
.listen(6080)
.fork('in')
.connect('127.0.0.1:8080')
.fork('out')
// Extract request info
.pipeline('in')
.decodeHttpRequest()
.onMessageStart(
() => (
_timestamp = Date.now(),
_metrics.count++
)
)
// Extract response info
.pipeline('out')
.decodeHttpResponse()
.onMessageStart(
e => (
((status, latency, i) => (
status = e.head.status,
latency = Date.now() - _timestamp,
i = _latencies.findIndex(t => latency <= t),
_buckets[i]++,
_statuses[status] = (_statuses[status]|0) + 1
))()
)
)
// Expose as Prometheus metrics
.listen(9090)
.decodeHttpRequest()
.replaceMessage(
() => (
(sum => new Message(
[
`count ${_metrics.count}`,
...Object.entries(_statuses).map(
([k, v]) => `status{code="${k}"} ${v}`
),
..._buckets.map((n, i) => `bucket{le="${_latencies[i]}"} ${sum += n}`)
]
.join('\n')
))(0)
)
)
.encodeHttpResponse()
// Mock service on port 8080
.listen(8080)
.decodeHttpRequest()
.replaceMessage(
new Message('Hello!\n')
)
.encodeHttpResponse()
測試
使用 ab 做請求模擬 ab -n 2000 -c 10 http://localhost:6080/,然后檢查下記錄的指標。
$ http :9090 --body
count 2000
status{code="200"} 2000
bucket{le="1"} 1762
bucket{le="2"} 1989
bucket{le="5"} 1994
bucket{le="7"} 1999
bucket{le="10"} 2000
分析
TL;DR:本次示例的核心是 fork,從字面意思就很容易理解:新開一個處理分支(Pipeline),與主線并行執(zhí)行。
在 src/inbound.cpp:104 109 處,Pipy 接收一個新的連接。

創(chuàng)建 Context 和 Session,并在 L178 處注冊事件的處理器,然后在 L187 處開始接收數據。

在 #receive 方法中,定義了數據接收處理器:將讀到的數據寫入 buffer 中。這個 buffer 存儲的是 Event類型數據。(所以說 Pipy 是基于數據流事件,將一些封裝成了事件)
接著調用 Session#input。

實際上調用的是 ReusableSession#input,調用 m_filters 的 #process 方法。m_filters 實際上是 Filter 類型。

為什么只有一個 Filter?重點來了,看下 ReusableSession 的構造過程就能明白了(這里用了個反向迭代器)。output 是當前 Filter 處理完要執(zhí)行的,實現類似鏈式的執(zhí)行。

再回頭看上面的示例,可以想象 fork 就是 Session 的 m_filters。
src/filters/fork.cpp:85,在 fork 過濾器中,在 1 處從 module 中獲取到目標 Pipeline,并在 3 和 4 處 創(chuàng)建了新的 Session 并保存原 Session 的數據。
然后在 5 處將原 Event 輸入到新的 Session 中,觸發(fā)目標 Pipeline 的 Filter 鏈。值得注意的是,這里是基于事件的處理,并不是阻塞的。這就意味著,fork 的目標 pipline,與 fork 所在的 pipeline 是并行執(zhí)行的。 在示例中,就是 Pipeline ‘in’ 與 主 Pipeline 的 connect 是并行執(zhí)行的。
最終在 6 處,繼續(xù)使用原 Session 的 Filter 鏈。
