Flink Time-windowed Joins過(guò)期數(shù)據(jù)清理機(jī)制分析

在flink雙流Time-windowed Joins的主要實(shí)現(xiàn)是在TimeBoundedStreamJoin中,這個(gè)類里面的變量非常的多,所以首先要清楚,這些重要變量或者概念的計(jì)算過(guò)程。簡(jiǎn)單的說(shuō)整個(gè)join過(guò)程就是把左流的數(shù)據(jù)和右流的數(shù)據(jù)都通過(guò)state保存起來(lái),左流有新的數(shù)據(jù)到,就會(huì)根據(jù)key去遍歷右流state中的數(shù)據(jù),符合關(guān)聯(lián)條件就輸出,關(guān)聯(lián)不上的就保存在左流的state中等待右流數(shù)據(jù)的遍歷,反之亦然。另外會(huì)對(duì)每個(gè)流計(jì)算過(guò)期時(shí)間,以及每個(gè)數(shù)據(jù)的清理時(shí)間。本文主要根據(jù)代碼的實(shí)現(xiàn)過(guò)程對(duì)清理機(jī)制做一個(gè)分步的演算。

基本公式

建表語(yǔ)句

CREATE TABLE LeftTable (
  l_id STRING, 
  l_imsi STRING, 
  l_time TIMESTAMP(3), 
  WATERMARK FOR l_time AS l_time - INTERVAL '5' SECOND 
) WITH ( 
  'connector.type' = 'kafka',   
  'connector.version' = 'universal',  
  'connector.topic' = 'foo',  
  'connector.properties.zookeeper.connect' = 'hostA:2181',
  'connector.properties.bootstrap.servers' = 'hostA:6667',  
  'connector.properties.group.id' = 'tg1',   
  'connector.startup-mode' = 'latest-offset', 
  'format.type' = 'csv',   
  'format.field-delimiter' = ','
  )
CREATE TABLE RightTable (
  r_id STRING, 
  r_location STRING, 
  r_time TIMESTAMP(3), 
  WATERMARK FOR r_time AS r_time - INTERVAL '2' SECOND 
) WITH (  
  'connector.type' = 'kafka',   
  'connector.version' = 'universal',  
  'connector.topic' = 'bar',  
  'connector.properties.zookeeper.connect' = 'hostA:2181',
  'connector.properties.bootstrap.servers' = 'hostA:6667',
  'connector.properties.group.id' = 'tg1',   
  'connector.startup-mode' = 'latest-offset', 
  'format.type' = 'csv',   
  'format.field-delimiter' = ',' 
)

執(zhí)行SQL

SELECT l_id, l_imsi, r_location 
FROM LeftTable 
LEFT JOIN RightTable 
on l_id = r_id 
and r_time >= l_time - INTERVAL '4' SECOND AND r_time <= l_time + INTERVAL '6' SECOND

upperBound = 6lowerNound = -4

那么

leftRelativeSize = -leftLowerBound = -(-upperBound) = upperBound = 6

rightRelativeSize = leftUpperBound = -lowerBound = 4

leftExpirationTime = wm - upperBound - 0.001 
                   = wm - 6 - 0.001 
                   = wm - 6.001

rightExpirationTime = wm + lowerBound - 0.001 
                    = wm - 4 - 0.001 
                    = wm - 4.001
  
leftRowCleanUpTime = rowTime + leftRelativeSize + minCleanUpInterval + allowedLateness + 0.001 
                   = rowTime + 6 + (6+4)/2 + 0.001 
                   = rowTime + 11.001
                   
rightRowCleanUpTime = rowTime + rightRelativeSize + minCleanUpInterval + allowedLateness + 0.001 
                    = rowTime + 4 + (6+4)/2 + 0.001 
                    = rowTime + 9.001

rightOperatorTime = leftOperatorTime = wm = min(leftWatermark, rightWatermark)
  
allowedLateness = 0 //忽略不計(jì)

根據(jù)上面的公式進(jìn)行模擬計(jì)算

順序 來(lái)源 數(shù)據(jù) wm ExpirationTime RowCleanUpTime 結(jié)果
1 Left 1,111,2020-01-01 10:10:16 0 left=0, right=-4001 10:10:27.001
2 Right 2,B,2020-01-01 10:10:20 10:10:11 left=-6001, right=-4001 10:10:29.001
3 Left 2,222,2020-01-01 10:10:22 10:10:17 left=-6001, right=10:10:06.999 10:10:33.001 join輸出2,222,B
4 Left 4,4444,2020-01-01 10:10:35 10:10:18 left=-6001, right=10:10:12.999 10:10:46.001
5 Right 4,D,2020-01-01 10:10:29 10:10:27 left=10:10:11.999, right=10:10:12.999 10:10:38.001
6-1 Right 5,E,2020-01-01 10:10:30 10:10:28 left=10:10:20.999, right=10:10:12.999 10:10:39.001
wm超過(guò)第一條數(shù)據(jù)的CleanUpTime,觸發(fā)定時(shí)器 10:10:28 left=10:10:21.999, right=10:10:12.999 刪除第一條數(shù)據(jù),因?yàn)槭莑eft join所以輸出1,111,
6-2 Right 1,A,2020-01-01 10:10:17 10:10:27 left=10:10:20.999, right=10:10:12.999 可以和第一條數(shù)據(jù)join上,所以輸出1,111,A但是第一條數(shù)據(jù)的時(shí)間戳已經(jīng)小于leftExpirationTime說(shuō)明已經(jīng)過(guò)期,同時(shí)在緩存中刪除第一條數(shù)據(jù),但是這時(shí)wm并沒(méi)有超過(guò)第一條數(shù)據(jù)的CleanUpTime,不會(huì)觸發(fā)清理的定時(shí)器
6-3 Right 1,A,2020-01-01 10:10:30 10:10:28 left=10:10:20.999, right=10:10:12.999 不能和第一條數(shù)據(jù)join上,所以輸出1,111,但是第一條數(shù)據(jù)的時(shí)間戳已經(jīng)小于leftExpirationTime說(shuō)明已經(jīng)過(guò)期,同時(shí)在緩存中刪除第一條數(shù)據(jù)
wm超過(guò)第一條數(shù)據(jù)的CleanUpTime,觸發(fā)定時(shí)器 10:10:28 left=10:10:21.999, right=10:10:12.999 第一條數(shù)據(jù)已經(jīng)刪除,定時(shí)器不需要做其他操作

總結(jié)

每次計(jì)算ExpirationTime的時(shí)候用的是上一次的wm;
6-1,6-2,6-3為三種獨(dú)立可能觸發(fā)刪除過(guò)期數(shù)據(jù)的場(chǎng)景;
通過(guò)上面的測(cè)試可以發(fā)現(xiàn),數(shù)據(jù)即便過(guò)期了,但是沒(méi)有到清理時(shí)間,如果這時(shí)候有符合關(guān)聯(lián)條件的數(shù)據(jù)還是可以關(guān)聯(lián)上的,例如6-2場(chǎng)景。

以上數(shù)據(jù)基于flink 1.10.0版本blink planner進(jìn)行測(cè)試。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容