Storm-windowing 的一些嘗試

Storm-windowing 的一些嘗試

Storm 在 1.x.x 版本后引入了 windowing 機(jī)制,使得開發(fā)者可以很方便的做一些統(tǒng)計計算。

最近由于工作內(nèi)容變更,著手整合、開發(fā)公司的安全風(fēng)控平臺,又重拾 storm,使用storm清洗分發(fā)業(yè)務(wù)數(shù)據(jù),并做相關(guān)計算。在接入 AntiCrawler(反爬蟲)的業(yè)務(wù)需求時調(diào)研并使用了 storm 的 windowing 特性。

Windowing介紹

Sliding & Tumbling

Storm官方文檔抽象出兩種類型的window:

(1)Sliding Window——一個tuple可以屬于多個window,如下:
tumbling-window

(2)Tumbling Window——一個tuple只屬于一個window,如下:
sliding-window

而定義一個 storm-window 的主要根據(jù)以下兩點(diǎn):window-length
和 slide-interval。其中,window-length 是指這個窗口的長度,slide-interval 是指這個窗口每次滑動的距離他們可以通過兩種維度計算:

(1)Count-即固定數(shù)量的tuple組成一個window
(2)Duration-即固定時間內(nèi)所有的tuple組成一個window。

他們可以靈活的組合,以滿足不同的需求,其具體接口可以參考storm-api(java-BaseWindowedBolt)。

Timestamp

當(dāng)使用 Duration 作為 window 的計算指標(biāo)(length or interval)時,需要注意這樣一個問題:每個 tuple 的 timestamp。Storm 根據(jù) tuple 的 timestamp 來計算這個 tuple 是否屬于這個 window。

默認(rèn)的 storm 把 window-bolt 處理這個 tuple 的當(dāng)前時間作為這個tuple 的時間戳。另外可以通過代碼指定tuple的某個字段作為這個tuple的timestamp(java的api是withTimestampField(String fieldName)。

個人不推薦使用默認(rèn)值,最好使用 數(shù)據(jù)中自帶的時間戳。因?yàn)樵跀?shù)據(jù)堆積的情況下,如果使用默認(rèn)值,大量的歷史堆積數(shù)據(jù)(對于實(shí)時計算來說在某種意義上已經(jīng)是臟數(shù)據(jù))會被當(dāng)成實(shí)時值用以計算,導(dǎo)致數(shù)據(jù)不準(zhǔn)確。

Out of order

如果使用tuple自帶的字段作為 timestamp,在分布式場景中,由于各種因素,輸出的tuples?的timestamp是亂序的,參考如下場景:

假設(shè)一個 Sliding window,其 window-length 是 10s,slide-interval 是5s。?依次收到t1(10:00:10),t2(10:00:14),t3(10:00:12),t4(10:00:16) 4個 tuple。

這種情況下storm會怎么做呢?默認(rèn)的,storm在收到t3時發(fā)現(xiàn)其timestamp小于t2,則將其拋棄。并輸出一條INFO級別的日志:

INFO : Received a late tuple {time=1488299337876} with ts 1488299337876. This will not processed.

這種情況顯然不是我們希望的,所以 storm 提供了一個接口withLag (Duration duration),通過這個接口,開發(fā)者可以通過這接口設(shè)置 window 可以接受的最大延時。此時,如果設(shè)置最大延時5s,則在上述情況下,t3則不會被拋棄。

所以,根據(jù)業(yè)務(wù)場景合理的設(shè)置withLag是有必要的。

Watermarks

Watermark 是 storm 內(nèi)部跟蹤處理 window 的一個特性,其類似Flink、MillWheel。在處理帶有timestamp的tuple時,storm內(nèi)部包含一個由tuple的timestamp計算而來的watermarks。

它的計算方法是:storm 接受到得最新的 tuple 的 timestamp——Tmax 減去通過 withLat 設(shè)置的最大延時 L,Max(T1…Tn)- L。

Watermark 是用來評估是否結(jié)算窗口(window calculation),每當(dāng) window bolt 收到一個 Watermark,都會評估當(dāng)前的 tuple 是否有需要結(jié)算的窗口,可以通withWatermarkInterval(Duration interval) 接口設(shè)置 watermark 的發(fā)送周期,其默認(rèn)值是1s。以下官方給出的watermark機(jī)制的demo:

假設(shè)一個Slide window,其Window length = 20s, sliding interval = 10s, watermark interval = 1s, lag = 5s。

當(dāng)前時間9:00:00,e1(6:00:03), e2(6:00:05), e3(6:00:07), e4(6:00:18), e5(6:00:26), e6(6:00:36) 于 9:00:00 – 9:00:01到達(dá)。

那么 9:00:01 收到的 watermark 則為 6:00:36-lag(5) = 6:00:31,6:00:31 向下取整 6:00:30 以前的所有未結(jié)算windows都會結(jié)算,所以此時有三個window將會計算:

5:59:50 – 06:00:10 with tuples e1, e2, e3
6:00:00 – 06:00:20 with tuples e1, e2, e3, e4
6:00:10 – 06:00:30 with tuples e4, e5

在 9:00:01 – 9:00:02,又有4個tuple,e7(8:00:25), e8(8:00:26), e9(8:00:27), e10(8:00:39)到達(dá),則在 9:00:02(watermark interval 是1s)收到的 watermark 是 8:00:39-lag(5) = 8:00:34,向下取整 8:00:30以前的所有未結(jié)算window將會計算:

6:00:20 – 06:00:40 with tuples e5, e6 (from earlier batch)
6:00:30 – 06:00:50 with tuple e6 (from earlier batch)
8:00:10 – 08:00:30 with tuples e7, e8, e9

Trident Windowing

上文介紹的 windowing 主要是以 storm-core 為基礎(chǔ)的,同樣的,trident 也提供了類似的機(jī)制,同樣包含 Sliding 和 Tumbling 兩種類型,其使用方法和 storm-core 類似,具體 demo 可以參考官方提供的 examples (參見文末鏈接)。

Ps: 關(guān)于withTimestamp,withLag 和 watermark的測試驗(yàn)證測試代碼可以參考:storm-window-test 測試代碼
相關(guān)資料:storm-windowing 官方文檔

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容