Storm-windowing 的一些嘗試
Storm 在 1.x.x 版本后引入了 windowing 機(jī)制,使得開發(fā)者可以很方便的做一些統(tǒng)計計算。
最近由于工作內(nèi)容變更,著手整合、開發(fā)公司的安全風(fēng)控平臺,又重拾 storm,使用storm清洗分發(fā)業(yè)務(wù)數(shù)據(jù),并做相關(guān)計算。在接入 AntiCrawler(反爬蟲)的業(yè)務(wù)需求時調(diào)研并使用了 storm 的 windowing 特性。
Windowing介紹
Sliding & Tumbling
Storm官方文檔抽象出兩種類型的window:
(1)Sliding Window——一個tuple可以屬于多個window,如下:

而定義一個 storm-window 的主要根據(jù)以下兩點(diǎn):window-length
和 slide-interval。其中,window-length 是指這個窗口的長度,slide-interval 是指這個窗口每次滑動的距離他們可以通過兩種維度計算:
(1)Count-即固定數(shù)量的tuple組成一個window
(2)Duration-即固定時間內(nèi)所有的tuple組成一個window。
他們可以靈活的組合,以滿足不同的需求,其具體接口可以參考storm-api(java-BaseWindowedBolt)。
Timestamp
當(dāng)使用 Duration 作為 window 的計算指標(biāo)(length or interval)時,需要注意這樣一個問題:每個 tuple 的 timestamp。Storm 根據(jù) tuple 的 timestamp 來計算這個 tuple 是否屬于這個 window。
默認(rèn)的 storm 把 window-bolt 處理這個 tuple 的當(dāng)前時間作為這個tuple 的時間戳。另外可以通過代碼指定tuple的某個字段作為這個tuple的timestamp(java的api是withTimestampField(String fieldName))。
個人不推薦使用默認(rèn)值,最好使用 數(shù)據(jù)中自帶的時間戳。因?yàn)樵跀?shù)據(jù)堆積的情況下,如果使用默認(rèn)值,大量的歷史堆積數(shù)據(jù)(對于實(shí)時計算來說在某種意義上已經(jīng)是臟數(shù)據(jù))會被當(dāng)成實(shí)時值用以計算,導(dǎo)致數(shù)據(jù)不準(zhǔn)確。
Out of order
如果使用tuple自帶的字段作為 timestamp,在分布式場景中,由于各種因素,輸出的tuples?的timestamp是亂序的,參考如下場景:
假設(shè)一個 Sliding window,其 window-length 是 10s,slide-interval 是5s。?依次收到t1(10:00:10),t2(10:00:14),t3(10:00:12),t4(10:00:16) 4個 tuple。
這種情況下storm會怎么做呢?默認(rèn)的,storm在收到t3時發(fā)現(xiàn)其timestamp小于t2,則將其拋棄。并輸出一條INFO級別的日志:
INFO : Received a late tuple {time=1488299337876} with ts 1488299337876. This will not processed.
這種情況顯然不是我們希望的,所以 storm 提供了一個接口withLag (Duration duration),通過這個接口,開發(fā)者可以通過這接口設(shè)置 window 可以接受的最大延時。此時,如果設(shè)置最大延時5s,則在上述情況下,t3則不會被拋棄。
所以,根據(jù)業(yè)務(wù)場景合理的設(shè)置withLag是有必要的。
Watermarks
Watermark 是 storm 內(nèi)部跟蹤處理 window 的一個特性,其類似Flink、MillWheel。在處理帶有timestamp的tuple時,storm內(nèi)部包含一個由tuple的timestamp計算而來的watermarks。
它的計算方法是:storm 接受到得最新的 tuple 的 timestamp——Tmax 減去通過 withLat 設(shè)置的最大延時 L,Max(T1…Tn)- L。
Watermark 是用來評估是否結(jié)算窗口(window calculation),每當(dāng) window bolt 收到一個 Watermark,都會評估當(dāng)前的 tuple 是否有需要結(jié)算的窗口,可以通withWatermarkInterval(Duration interval) 接口設(shè)置 watermark 的發(fā)送周期,其默認(rèn)值是1s。以下官方給出的watermark機(jī)制的demo:
假設(shè)一個Slide window,其Window length = 20s, sliding interval = 10s, watermark interval = 1s, lag = 5s。
當(dāng)前時間9:00:00,e1(6:00:03), e2(6:00:05), e3(6:00:07), e4(6:00:18), e5(6:00:26), e6(6:00:36) 于 9:00:00 – 9:00:01到達(dá)。
那么 9:00:01 收到的 watermark 則為 6:00:36-lag(5) = 6:00:31,6:00:31 向下取整 6:00:30 以前的所有未結(jié)算windows都會結(jié)算,所以此時有三個window將會計算:
5:59:50 – 06:00:10 with tuples e1, e2, e3
6:00:00 – 06:00:20 with tuples e1, e2, e3, e4
6:00:10 – 06:00:30 with tuples e4, e5
在 9:00:01 – 9:00:02,又有4個tuple,e7(8:00:25), e8(8:00:26), e9(8:00:27), e10(8:00:39)到達(dá),則在 9:00:02(watermark interval 是1s)收到的 watermark 是 8:00:39-lag(5) = 8:00:34,向下取整 8:00:30以前的所有未結(jié)算window將會計算:
6:00:20 – 06:00:40 with tuples e5, e6 (from earlier batch)
6:00:30 – 06:00:50 with tuple e6 (from earlier batch)
8:00:10 – 08:00:30 with tuples e7, e8, e9
Trident Windowing
上文介紹的 windowing 主要是以 storm-core 為基礎(chǔ)的,同樣的,trident 也提供了類似的機(jī)制,同樣包含 Sliding 和 Tumbling 兩種類型,其使用方法和 storm-core 類似,具體 demo 可以參考官方提供的 examples (參見文末鏈接)。
Ps: 關(guān)于withTimestamp,withLag 和 watermark的測試驗(yàn)證測試代碼可以參考:storm-window-test 測試代碼
相關(guān)資料:storm-windowing 官方文檔