Flink 的 Table API 和SQL支持是用于批處理和流處理的統(tǒng)一API。這意味著 Table API 和SQL查詢具有相同的語義,無論它們的輸入是有界批量輸入還是無界流輸入。因為關(guān)系代數(shù)(relational algebra)和SQL最初是為批處理而設(shè)計的,所以對于無界流輸入的關(guān)系查詢不像有界批輸入上的關(guān)系查詢那樣容易理解。下面將解釋 Flink 關(guān)于流數(shù)據(jù)的關(guān)系A(chǔ)PI的概念、實際限制和特定于流的配置參數(shù)。
流數(shù)據(jù)上的關(guān)系查詢
SQL和 Relational algebra 并沒有考慮到流數(shù)據(jù)。因此,在關(guān)系代數(shù)(和SQL)和流處理之間有一些概念上的差距。
| 關(guān)系代數(shù)/ SQL | 流處理 |
|---|---|
| 關(guān)系(或表)是有界的(多)元組的集合 | 流是無限的元組序列 |
| 對批處理數(shù)據(jù)執(zhí)行的查詢(例如,關(guān)系數(shù)據(jù)庫中的表)可以訪問完整的輸入數(shù)據(jù) | 流式查詢在啟動時無法訪問所有數(shù)據(jù),必須等待流式傳輸數(shù)據(jù) |
| 批處理查詢在生成固定大小的結(jié)果后終止 | 流式查詢會根據(jù)收到的記錄不斷更新其結(jié)果,并且永遠(yuǎn)不會完成 |
盡管存在這些差異,但使用關(guān)系查詢和SQL處理流數(shù)據(jù)并非不可能。高級關(guān)系數(shù)據(jù)庫系統(tǒng)提供稱為物化視圖(Materialized View)的特性。物化視圖定義為SQL查詢,就像常規(guī)虛擬視圖一樣。與虛擬視圖相比,物化視圖緩存查詢的結(jié)果,使得在訪問視圖時不需要評估查詢。緩存的一個常見挑戰(zhàn)是防止緩存提供過時的結(jié)果。當(dāng)修改其查詢的基表時,物化視圖將過時。Eager View Maintenance 是一種更新物化視圖和在基表更新后立即更新物化視圖的技術(shù)。
如果我們考慮以下因素,那么在流上的 Eager view maintenance 和SQL查詢之間的聯(lián)系就會變得明顯:
- 數(shù)據(jù)庫表是應(yīng)用的
INSERT,UPDATE和DELETE命令的流數(shù)據(jù)的結(jié)果流,通常被稱為 changelog stream。 - 物化視圖定義為SQL查詢。為了更新視圖,查詢不斷地處理視圖關(guān)聯(lián)的 changelog stream。
- 物化視圖是流式SQL查詢的結(jié)果。
動態(tài)表和連續(xù)查詢
動態(tài)表(Dynamic table)是 Flink Table API 和SQL支持流數(shù)據(jù)的核心概念。與表示批處理數(shù)據(jù)的靜態(tài)表(static table)相比,動態(tài)表會隨時間而變化,并且可以像靜態(tài)批處理表一樣查詢。查詢動態(tài)表會生成連續(xù)查詢(Continuous Query)。連續(xù)查詢永遠(yuǎn)不會終止并生成動態(tài)表作為結(jié)果。查詢不斷更新其結(jié)果表以反映其輸入表的更改。實質(zhì)上,對動態(tài)表的連續(xù)查詢與物化視圖的查詢非常相似。
連續(xù)查詢的結(jié)果在語義上總是等價于在輸入表的快照上以批處理模式執(zhí)行的相同查詢的結(jié)果。下圖顯示了流、動態(tài)表和連續(xù)查詢的關(guān)系:
- 流轉(zhuǎn)換為動態(tài)表
- 在動態(tài)表上連續(xù)查詢,生成新的動態(tài)表
- 生成的動態(tài)表將轉(zhuǎn)換回流

動態(tài)表首先是一個邏輯概念。在查詢執(zhí)行期間,不必(完全)實現(xiàn)動態(tài)表。
在下文中,我們將用具有以下模式的點擊事件(click events)的流解釋動態(tài)表和連續(xù)查詢的概念:
[
user: VARCHAR, // the name of the user
cTime: TIMESTAMP, // the time when the URL was accessed
url: VARCHAR // the URL that was accessed by the user
]
在流上定義表
為了使用關(guān)系查詢處理流,必須將其轉(zhuǎn)換為表。從概念上講,流的每個記錄都被解釋為對結(jié)果表的 INSERT 修改。下圖顯示了點擊事件流(左側(cè))如何轉(zhuǎn)換為表(右側(cè))。隨著更多的點擊事件的插入,結(jié)果表不斷增長。

連續(xù)查詢
在動態(tài)表上進(jìn)行連續(xù)查詢,并生成新的動態(tài)表。與批查詢相反,連續(xù)查詢不會停止更新其結(jié)果表。在任何時間點,連續(xù)查詢的結(jié)果在語義上等同于在輸入表的快照上以批處理模式執(zhí)行的相同查詢的結(jié)果。
在下面展示了在點擊事件流中定義的 clicks 表上的兩個查詢的例子。
第一個查詢是一個簡單的 GROUP-BY COUNT 聚合查詢。在 clicks 表上按 user 字段進(jìn)行分組,并計算訪問的URL數(shù)量。下圖顯示了隨著 clicks 表的行數(shù)增加更新查詢:
- 當(dāng)查詢啟動時,clicks 表(左側(cè))為空。第一行記錄 insert clicks 表時,開始計算結(jié)果表。
- clicks 插入第一行 [Mary, ./home],結(jié)果表(右側(cè),頂部)為 [Mary, 1]。
- clicks 插入第二行 [Bob, ./cart],結(jié)果表插入新行 [Bob, 1]。
- clicks 插入第三行 [Mary, ./prod?id=1],更新結(jié)果表,[Mary, 1] 更新為 [Mary, 2]。
- clicks 插入第四行 [Liz, 1],結(jié)果表插入新行 [Liz, 1]。。

第二個查詢類似于第一個查詢,但 clicks 表除了 user 字段之外新增了 cTime 字段,并且按小時生成滾動窗口,然后計算URL數(shù)量。同樣,下圖顯示了不同時間點的輸入和輸出,以顯示動態(tài)表的變化:
- 查詢每小時連續(xù)計算結(jié)果并更新結(jié)果表。
- 第一個窗口(12:00:00 ~ 12:59:59),clicks 表包含四行記錄輸入,查詢計算得到兩個結(jié)果行并加入到結(jié)果表。
- 對于下一個窗口(13:00:00 ~ 13:59:59),clicks 表包含三行記錄輸入,這導(dǎo)致新增兩行結(jié)果被追加到結(jié)果表中。
- 隨著時間的推移,更多的行被追加到 clicks 表中,結(jié)果表將被更新。

盡管兩個查詢例子看起來非常相似(都計算了分組計數(shù)聚合),但在一個重要方面有所不同:
- 第一個查詢更新之前發(fā)出的結(jié)果,即結(jié)果表的 changelog 包含
INSERT和UPDATE。 - 第二個查詢僅附加到結(jié)果表,即結(jié)果表的 changelog 僅包含
INSERT。
查詢限制
許多(但不是全部)語義有效的查詢可以作為流上的連續(xù)查詢進(jìn)行計算。有些查詢的計算成本太高,要么是因為它們需要維護(hù)的狀態(tài)太大,要么是因為計算更新太昂貴:
- 狀態(tài)大?。⊿tate Size):連續(xù)查詢在無界流上進(jìn)行計算,連續(xù)查詢處理的數(shù)據(jù)總量可能非常大。必須更新先前發(fā)出的結(jié)果,因此需要維護(hù)所有已經(jīng)發(fā)出的行。例如,第一個示例需要存儲每個 user 的URL計數(shù),以便在輸入表收到新行時能夠更新計數(shù)。
SELECT user, COUNT(url)
FROM clicks
GROUP BY user;
- 計算更新(Computing Updates):某些查詢,即使只添加或更新了單個輸入記錄,也需要重新計算和更新大部分發(fā)出的結(jié)果行。顯然,這樣的查詢不適合作為連續(xù)查詢執(zhí)行。例如以下查詢,該查詢基于最后一次點擊的時間排序每個用戶。一旦 clicks 表收到新行或某一行 lastLogin 更新,并且必須計算新的排名。
SELECT user, RANK() OVER (ORDER BY lastLogin)
FROM (
SELECT user, MAX(cTime) AS lastAction FROM clicks GROUP BY user
);
表轉(zhuǎn)換到流
動態(tài)表可以像常規(guī)數(shù)據(jù)庫表一樣,通過 INSERT、UPDATE 和 DELETE 更改來不斷修改。
將動態(tài)表轉(zhuǎn)換為流或?qū)⑵鋵懭胪獠肯到y(tǒng)時,需要對這些更改進(jìn)行編碼。Flink Table API 和SQL支持三種動態(tài)表更改的方法:
Append-only stream:只能由插入修改的動態(tài)表,可以通過發(fā)出插入的行轉(zhuǎn)換為流。
Retract stream:Retract stream 是具有兩種類型的消息的流,添加消息(add)和撤消消息(retract)。通過將插入編碼為添加消息(add message),將刪除編碼為撤銷消息(retract message),將更新編碼為撤銷消息(更新先前行)和添加消息(更新新行),將動態(tài)表轉(zhuǎn)換為 Retract stream。

- Upsert stream: Upsert stream 是具有兩種類型的消息的流,更新插入消息(upsert)和刪除消息(delete)。轉(zhuǎn)換為 Upsert stream 的動態(tài)表需要有唯一鍵(可能是復(fù)合的),插入和更新編碼為 upsert 消息,刪除編碼為 delete 消息。算子需要知道唯一鍵屬性才能正確消費消息,與 Retract stream 的區(qū)別在于:更新修改使用單個消息進(jìn)行編碼,更加有效。下圖顯示了動態(tài)表到 Upsert stream 的轉(zhuǎn)換。

時間屬性
Flink 可以根據(jù)不同的時間域處理流數(shù)據(jù):
- Processing Time:執(zhí)行各個算子操作的系統(tǒng)時間
- Event Time:事件發(fā)生時間,附加在每條記錄上的時間戳
- Ingestion Time:事件進(jìn)入 Flink 的時間,與事件時間類似
更多關(guān)于Flink時間處理的信息,請參考 Event Time 和 Watermarks
val env = StreamExecutionEnvironment.getExecutionEnvironment
// default
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
Table API 和SQL查詢中基于時間的算子操作(如 Window)需要設(shè)置時間域及時間提取信息。因此,表可以提供邏輯時間屬性,用于指示時間和訪問表程序中相應(yīng)的時間戳。
時間屬性可以是每個表模式中的一部分,可以從 DataStream 中創(chuàng)建表時創(chuàng)建,也可以在使用 TableSource 時預(yù)定義。一旦開始定義了時間屬性,它就可以作為字段引用,并可以用于基于時間的 算子操作。
只要時間屬性未被修改,并且只是從查詢的一部分轉(zhuǎn)發(fā)到另一部分,那么它仍是一個有效的時間屬性。時間屬性跟常規(guī)時間戳一樣,可以用于計算,一旦被用于計算,它會具體化并成為一個常規(guī)時間戳。常規(guī)時間戳跟 Flink 的時間和水印系統(tǒng)沒有關(guān)系,所以不會被用來做基于時間的算子操作。
Processing Time
處理時間(Processing Time)允許程序根據(jù)本地時間產(chǎn)生結(jié)果,這是最簡單的時間概念,既不要求時間戳提取也不會產(chǎn)生水印。有兩種方法可以定義處理時間屬性。
DataStream 到 Table 的轉(zhuǎn)換中定義
處理時間屬性是在 Schema 定義時使用 .proctime 屬性來定義的。時間屬性只能通過在 Schema 的基礎(chǔ)上新增一個邏輯字段來擴(kuò)展,因此,它只能在模式定義的末尾來定義。
val stream: DataStream[(String, String)] = ...
// 聲明一個邏輯字段,作為 processing time 屬性
val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.proctime)
val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
TableSource 定義
處理時間屬性可以通過一個實現(xiàn)了 DefinedProctimeAttribute 接口的 TableSource 定義,邏輯時間屬性會被添加到由 TableSource 的返回類型定義的 Schema 中。
// 定義 Table source,并指定 processing time 屬性
class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {
override def getReturnType = {
val names = Array[String]("Username" , "Data")
val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
Types.ROW(names, types)
}
override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
// create stream
val stream = ...
stream
}
override def getProctimeAttribute = {
// field with this name will be appended as a third field
"UserActionTime"
}
}
// register table source
tEnv.registerTableSource("UserActions", new UserActionSource)
val windowedTable = tEnv
.scan("UserActions")
.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
Event Time
事件時間允許程序根據(jù)包含在每條記錄中的時間生成結(jié)果。即使對于無序或者延遲事件也會產(chǎn)生一致性結(jié)果。當(dāng)從持久存儲中讀取記錄時,它還確保表程序的可重放結(jié)果。
此外,事件時間允許在批環(huán)境和流環(huán)境中使用統(tǒng)一的語法,流式環(huán)境中的時間屬性可以是批處理環(huán)境中的記錄的常規(guī)字段。
為了處理流事件無序,并區(qū)分準(zhǔn)時和延遲事件,F(xiàn)link 需要抽取事件中的時間戳,并且在生成水印來描述進(jìn)展。有兩種方法可以定義事件時間屬性。
DataStream 到 Table 的轉(zhuǎn)換中定義
事件時間可以在 shcema 定義中通過 .rowtime 屬性來定義。Timestamp 和 watermark 必須在 DataStream 的轉(zhuǎn)換中就已經(jīng)指定好。有兩種方式來定義時間屬性:
- 添加一個新的字段到 Schema
- 替換一個現(xiàn)有的字段
無論哪種方式,事件時間字段都將保存數(shù)據(jù)流事件時間戳的值。
// Option 1:
// 提取時間戳并指定水印
val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...)
// 定義一個額外的邏輯字段作為 event time
val table = tEnv.fromDataStream(stream, 'Username, 'Data, 'UserActionTime.rowtime)
// Option 2:
// 提取時間戳并指定水印
val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)
// 第一個字段被用作時間戳提取,不需要定義額外的字段
val table = tEnv.fromDataStream(stream, 'UserActionTime.rowtime, 'Username, 'Data)
// Usage:
val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
TableSource 定義
處理時間屬性可以通過一個實現(xiàn)了 DefinedRowtimeAttribute 接口的 TableSource 定義。getRowtimeAttribute() 方法返回一個現(xiàn)有字段的名稱,該字段包含表的事件時間屬性,并且是類型 LONG 或 TIMESTAMP。
此外,getDataStream() 方法返回的 DataStream 必須分配與定義的時間屬性對齊的水印。
// 定義 Table source,并指定 rowtime 屬性
class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttribute {
override def getReturnType = {
val names = Array[String]("Username" , "Data", "UserActionTime")
val types = Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.LONG)
Types.ROW(names, types)
}
override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
// create stream
// ...
// assign watermarks based on the "UserActionTime" attribute
val stream = inputStream.assignTimestampsAndWatermarks(...)
stream
}
override def getRowtimeAttribute = {
// Mark the "UserActionTime" attribute as event-time attribute.
"UserActionTime"
}
}
// register the table source
tEnv.registerTableSource("UserActions", new UserActionSource)
val windowedTable = tEnv
.scan("UserActions")
.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
查詢配置
Flink的Table API和SQL接口使用QueryConfig來控制計算、發(fā)射的結(jié)果以及更新發(fā)射結(jié)果。
Table API 和SQL查詢具有相同的語義,無論它們的輸入是有界批量輸入還是無界流輸入。在許多情況下,對流輸入的連續(xù)查詢能夠與離線計算結(jié)果有相同的準(zhǔn)確結(jié)果。然而,這在一般情況下是不可能的,因為連續(xù)查詢必須限制它們維護(hù)的狀態(tài)的大小,以避免耗盡存儲空間,并且能夠長時間處理無界流數(shù)據(jù)。所以,連續(xù)查詢可能只能提供近似結(jié)果,具體取決于輸入數(shù)據(jù)和查詢本身。
Flink Table API 和SQL接口提供參數(shù)來調(diào)整連續(xù)查詢的準(zhǔn)確性和資源消耗。參數(shù)通過 QueryConfig 對象指定。QueryConfig 可以從 TableEnvironment 獲得。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 獲取 query configuration
val qConfig: StreamQueryConfig = tableEnv.queryConfig
// 設(shè)置查詢參數(shù)
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))
// 定義查詢和 TableSink
val result: Table = ???
val sink: TableSink[Row] = ???
// TableSink 發(fā)送結(jié)果表時傳遞查詢參數(shù)
result.writeToSink(sink, qConfig)
// 轉(zhuǎn)換為 DataStream 時傳遞查詢參數(shù)
val stream: DataStream[Row] = result.toAppendStream[Row](qConfig)
空閑狀態(tài)保存時間
許多查詢在一個或多個屬性上進(jìn)行聚合或連接操作。當(dāng)在流上執(zhí)行此類查詢時,連續(xù)查詢需要收集記錄或維護(hù)每個鍵的結(jié)果值。如果輸入流的關(guān)鍵域正在演變,即活躍的鍵值隨時間變化,觀察到越來越多的不同鍵,連續(xù)查詢會累積越來越多的狀態(tài)。但是,通常在一段時間后一些鍵變?yōu)榉腔钴S,其對應(yīng)的狀態(tài)變得陳舊且無用。
例如,以下查詢計算每個會話的單擊次數(shù)。
SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
sessionId 屬性用于分組,連續(xù)查詢維護(hù)每個 sessionId 及其計數(shù)。sessionId 屬性隨著時間的推移而發(fā)展,并且 sessionId 值僅在會話結(jié)束之前有效(在有限的時間段內(nèi))。但是,連續(xù)查詢無法知道 sessionId 有此屬性,并期望每個 sessionId 值都可以在任何時間點發(fā)生。因此會維護(hù)每個觀察到的 sessionId 的計數(shù)。隨著 sessionId 觀察到越來越多,查詢的總狀態(tài)大小不斷增長。
空閑狀態(tài)保持時間(Idle State Retention Time)參數(shù)定義一個鍵的狀態(tài)在一次更新之后保存多久后刪除。對于前面的查詢示例,sessionId 只要在配置的時間段內(nèi)沒有更新,就會刪除對應(yīng)計數(shù)。
通過刪除鍵的狀態(tài),連續(xù)查詢會完全忘記它之前已經(jīng)看過這個鍵。如果刪除的鍵再次出現(xiàn),則被視為具有相應(yīng)鍵的第一個記錄。對于前面的查詢示例,這意味著 sessionId 的計數(shù)從0開始。
配置空閑狀態(tài)保存時間有兩個參數(shù):
- minimum idle state retention time,定義非活動鍵的狀態(tài)在刪除前至少保持多少時間。
- maximum idle state retention time,定義非活動鍵的狀態(tài)在刪除前最多保持多少時間。
對于前面的查詢示例:
val qConfig: StreamQueryConfig = ???
// 設(shè)置 idle state retention time: min = 12 hours, max = 24 hours
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))
清理狀態(tài)需要額外的記錄,對于 minTime 和 maxTime 較大差異的情況成本更低,因此 minTime 和 maxTime 直接必須至少相差5分鐘。
Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/streaming.html