Table API 和 Flink SQL 是flink 對批處理和流處理,提供了統(tǒng)一的上層API。Table API 是一套內嵌在Java 和Scala 語言中的查詢API,它允許以非常直觀的方式組合來自一些關系運算符的查詢; Flink SQL 支持基于實現(xiàn) SQL 標準的 Apache Calcite。
簡單的用例
把一個流,轉成table api來操作數(shù)據(jù)
依賴
<!-- table API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>1.12.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.12.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.12.2</version>
</dependency>
測試數(shù)據(jù)
1,1623051400,test data,1
2,1623051401,test data,1
3,1623051402,test data,1
1,1623051405,test data,3
2,1623051406,test data,3
3,1623051409,test data,3
1,1623051410,test data,5
用例代碼
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 1 讀取數(shù)據(jù)
DataStream<String> dataStream = env.readTextFile("D:\\workspace\\spring\\middleware\\flink\\flink-test\\src\\main\\resources\\hello.txt");
// 2 轉換數(shù)據(jù)
DataStream<EventData> map = dataStream.map(i -> {
String[] strs = i.split(",");
return new EventData(
strs[0],
Long.valueOf(strs[1]),
String.valueOf((Long.valueOf(strs[1]) - 5)),
Integer.valueOf(strs[3])
);
});
// 3 創(chuàng)建表得執(zhí)行環(huán)境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 4 基于流創(chuàng)建一個table
Table table = tableEnv.fromDataStream(map);
// 5 調用table api進行轉換操作
Table where = table
.filter($("id").isEqual("1"))
// 顯示哪些列
.select($("id"),$("eventTime"));
// 6 執(zhí)行sql
tableEnv.createTemporaryView("event_data",table); //基于table 創(chuàng)建一個匿名視圖的表名 eventData
// 沒有創(chuàng)建視圖不能使用 表名稱 eventData
String sql = "select id,eventTime from event_data where id = '2'";
Table sqlTable = tableEnv.sqlQuery(sql);
// 7 結果轉換成行
tableEnv.toAppendStream(where, Row.class).print("where");
tableEnv.toAppendStream(sqlTable, Row.class).print("sql");
env.execute("test");
}
結果
where> 1,1623051400
sql> 2,1623051401
where> 1,1623051405
sql> 2,1623051406
where> 1,1623051410
基于批處理或流處理的環(huán)境配置
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 基于老版本流處理
EnvironmentSettings oldSettings = EnvironmentSettings.newInstance()
.inStreamingMode()// 流處理
//.inBatchMode()// 批處理
.useOldPlanner() // 老版本
//.useBlinkPlanner() // 新版本
.build();
StreamTableEnvironment oldStringTableEnv = StreamTableEnvironment.create(env, oldSettings);
// 基于老版本批處理
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment oldBatchEnv = BatchTableEnvironment.create(batchEnv);
// 基于blink,blink 多了些功能以及架構上真正的批流統(tǒng)一,都是轉換成了 DataStream,不像老版本還有 DateSet
EnvironmentSettings blinkSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment blinkStringTableEnv = StreamTableEnvironment.create(env, blinkSettings);
// 基于blink 批處理
EnvironmentSettings blinkBatchSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();
TableEnvironment blinkBatchTableEnv = TableEnvironment.create(blinkSettings);
}
表(table)
TableEnvironment 可以注冊目錄 Catalog,并可以基于 Catalog 注冊表,表是由一個“標識符”(identifier)來指定的,由3部分組成: Catalog名、數(shù)據(jù)庫名、對象名。
表可以是常規(guī)的,也可以是虛擬的(視圖,View),常規(guī)表一般可以用來描述外部數(shù)據(jù),比如文件、數(shù)據(jù)庫表或消息隊列的數(shù)據(jù),也可以直接從 DataStream 轉換而來;視圖(View)可以從現(xiàn)有的表中創(chuàng)建,通常是 table api 或者 sql 查詢的一個結果集。
創(chuàng)建表的執(zhí)行環(huán)境,需要將 flink 流處理的執(zhí)行環(huán)境傳入
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
TableEnvironment 是 flink 中集成 table api 和 sql 的核心概念,所有對表得操作都基于 TableEnvironment,包括注冊Catalog、在Catalog中注冊表、執(zhí)行sql查詢、注冊用戶自定義函數(shù)(UDF)
優(yōu)化用例
直接創(chuàng)建 TableEnvironment 來讀取文件,并用Table API做查詢。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 連接外部系統(tǒng)讀取數(shù)據(jù)
String path = "D:\\workspace\\spring\\middleware\\flink\\flink-test\\src\\main\\resources\\hello.txt";
// 類似于 source
tableEnv.connect(
new FileSystem().path(path)
// 類似于 split
).withFormat(
// 引入 flink csv 依賴,默認 , 分割
new Csv()
// 類似于 map 轉換
).withSchema(
new Schema().field("id", DataTypes.STRING())
.field("eventTime", DataTypes.BIGINT())
.field("data", DataTypes.STRING())
.field("num", DataTypes.INT())
).createTemporaryTable("inputTable");
Table inputTable = tableEnv.from("inputTable");
inputTable.printSchema();
// 簡單查詢轉換
Table resultTable = inputTable.select($("id"), $("eventTime"))
// 大于 1623051405
.filter($("eventTime").isGreater(1623051405))
// id = 1
.where($("id").isEqual("1"));
// 聚合統(tǒng)計
Table aggTable = inputTable.groupBy($("id"))
.select($("id").count().as("ct"),$("eventTime").avg().as("et"));
// sql寫法
Table rt = tableEnv.sqlQuery("select id,eventTime from inputTable where eventTime > 1623051405 and id = '1'");
Table at = tableEnv.sqlQuery("select id,count(id) as ct,eventTime,avg(eventTime) as ev from inputTable group by id,eventTime");
//打印輸出
tableEnv.toAppendStream(inputTable, Row.class).print("inputTable");
tableEnv.toAppendStream(resultTable, Row.class).print("resultTable");
// group by 操作會讓數(shù)據(jù)發(fā)生改變,所以不是普通的 append 追加操作
tableEnv.toRetractStream(aggTable, Row.class).print("aggTable");
tableEnv.toAppendStream(rt, Row.class).print("rt");
tableEnv.toRetractStream(at, Row.class).print("at");
env.execute("test");
}
toAppendStream doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[id], select=[id, COUNT(id) AS EXPR
1]) 這句話的意思是 使用到 groupy by 有count 操作等都是值更新操作,不是 append 追加操作結果,所以輸出 toRetractStream ,toRetractStream 是撤回流,會把上一次結果撤回改成 false,輸出新的結果 true。
用例數(shù)據(jù)輸出到另一個文件
對于數(shù)據(jù)的寫入在某些是有要求的,聚合操作的結果集就沒辦法寫到文件,聚合操作會輸出兩條結果,一個上一次結果的撤回 false,一次新結果的輸出 true,是沒辦法追加到文件系統(tǒng)的。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 連接外部系統(tǒng)讀取數(shù)據(jù)
String path = "D:\\workspace\\spring\\middleware\\flink\\flink-test\\src\\main\\resources\\hello.txt";
// 類似于 source
tableEnv.connect(
new FileSystem().path(path)
// 類似于 split
).withFormat(
// 引入 flink csv 依賴,默認 , 分割
new Csv()
// 類似于 map 轉換
).withSchema(
new Schema().field("id", DataTypes.STRING())
.field("eventTime", DataTypes.BIGINT())
.field("data", DataTypes.STRING())
.field("num", DataTypes.INT())
).createTemporaryTable("inputTable");
Table inputTable = tableEnv.from("inputTable");
inputTable.printSchema();
// 簡單查詢轉換
Table resultTable = inputTable.select($("id"), $("eventTime"))
// 大于 1623051405
.filter($("eventTime").isGreater(1623051405))
// id = 1
.where($("id").isEqual("1"));
// 聚合統(tǒng)計
Table aggTable = inputTable.groupBy($("id"))
.select($("id").count().as("ct"),$("eventTime").avg().as("et"));
// 輸出到另一個文件
String outPutPath = "D:\\workspace\\spring\\middleware\\flink\\flink-test\\src\\main\\resources\\out.txt";
// 類似于 source
tableEnv.connect(
new FileSystem().path(outPutPath)
// 類似于 split
).withFormat(
// 引入 flink csv 依賴,默認 , 分割
new Csv()
// 類似于 map 轉換
).withSchema(
new Schema().field("id", DataTypes.STRING())
.field("eventTime", DataTypes.BIGINT())
).createTemporaryTable("outPutTable");
// 寫道另一個文件,如果是 aggTable 是不能寫入到文件的
resultTable.executeInsert("outPutTable",false);
env.execute("test");
}
flink table api 之 kafka
KafkaTableSinkBase 實現(xiàn)的是 AppendStreamTableSink 所以也沒辦法把聚合數(shù)據(jù)寫進去,就像以上 CSVTableSInkBase 也是實現(xiàn)了 AppendStreamTableSink ,都無法去寫入聚合的數(shù)據(jù)。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 建立源
tableEnv.connect(
// 需要引入flink-connector-kafka_2.12<
new Kafka()
// kafka 版本
.version("0.12")
.topic("topic_consumer")
.property("zookeeper.connect","localhost:2181")
.property("bootstrap.servers","localhost:9092")
).withFormat(
new Csv()
).withSchema(
new Schema().field("id", DataTypes.STRING())
.field("eventTime", DataTypes.BIGINT())
.field("data", DataTypes.STRING())
.field("num", DataTypes.INT())
).createTemporaryTable("inputTable");
// 簡單轉換
Table inputTable = tableEnv.from("inputTable");
// 輸出kafka
tableEnv.connect(
// 需要引入flink-connector-kafka_2.12<
new Kafka()
// kafka 版本
.version("0.12")
.topic("topic_producer")
.property("zookeeper.connect","localhost:2181")
.property("bootstrap.servers","localhost:9092")
).withFormat(
new Csv()
).withSchema(
new Schema().field("id", DataTypes.STRING())
.field("eventTime", DataTypes.BIGINT())
.field("data", DataTypes.STRING())
.field("num", DataTypes.INT())
).createTemporaryTable("outPutTable");
// KafkaTableSinkBase 實現(xiàn)的是 AppendStreamTableSink 所以也沒辦法把聚合數(shù)據(jù)寫進去
inputTable.executeInsert("outPutTable");
env.execute("test");
}
對于流式查詢,需要聲明如何在表和外部連接器之間執(zhí)行轉換,與外部系統(tǒng)交換的消息類型,由更新模式(update mode)指定。
- 追加(Append)
表制作插入操作,和外部連接器只交換插入(insert)消息 - 撤回(Retract)
表和外部連接器交換添加(Add)和撤回(Retract)消息。插入操作(insert)編碼為Add消息;刪除(Delete)編碼為 Retract消息;更新(Upsert)編碼為上一條的Retract和嚇一跳的Add消息。 - 更新插入(Upsert)
更新和插入都被編碼為Upsert消息;刪除編碼為Delet消息
kafka 和 文本不支持,但 支持 ES(ElasticSearchUpsertTableSinkBase 類)、Mysql(需要引入 flink-jdbc_2.12)
將 Table 轉換成 DataStream
表可以轉換為 DataStream 或 DataSet,這樣自定義流處理或批處理程序就可以繼續(xù)在 Table API 或 SQL 查詢的結果上運行了。將表轉換為 DataStream 或 DataSet 時,需要指定生成的數(shù)據(jù)類型,即要將表的每一行轉換成指定數(shù)據(jù)類型。表作為流式查詢的結果,是動態(tài)更新的,轉換有兩種轉換模式:追加(Append)模式和撤回(Retract)模式
追加模式,用于表只會被插入(insert)操作場景
DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable,Row.class)
撤回模式,用于任何場景。有些類似于更新模式中 Retract模式,他只有 insert 和 Delete 兩類操作。得到的數(shù)據(jù)會增加一個 Boolean 類型的標識位(返回的第一個字段),用它來表示到底是新增的數(shù)據(jù)(insert),還是被刪除的數(shù)據(jù)(Delete)
DataStream<Tuple2<Boolean,Row>> aggResultStream = tableEnv.toRetractStream(aggResultTable,Row.class)
流轉換成表
// 3 創(chuàng)建表得執(zhí)行環(huán)境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 4 基于流創(chuàng)建一個table
Table table = tableEnv.fromDataStream(map);
默認轉換后的 Table schema 和 DataStream 中的字段定義一一對應,也可以單獨指定出來
Table table = tableEnv.fromDataStream(map,"id,eventTime as et,data,num");
基于DataStream 創(chuàng)建臨時視圖
tableEnv.createTemporaryView("view_name",dataStream);
tableEnv.createTemporaryView("view_name",dataStream,"id,eventTime as et,data,num");
基于Table創(chuàng)建臨時視圖
tableEnv.createTemporaryView("view_name",table);
查看執(zhí)行計劃
tableEnv.explain(resultTable);
動態(tài)表(Dynamic Tables)
動態(tài)表是Flink對流數(shù)據(jù)的 Table API 和 SQL 支持的核心概念,它表示批處理數(shù)據(jù)的靜態(tài)表不同,動態(tài)表是隨時間變化的。
動態(tài)表可以像靜態(tài)的批處理一樣進行查詢,查詢一個動態(tài)表會產生持續(xù)查詢(Continuous Query),連續(xù)查詢永遠不會終止,并會生成另一個動態(tài)表,查詢會不斷更新其動態(tài)結果表,以反映其動態(tài)輸入表上的更改。
流式表查詢的處理過程:1. 流被轉換為動態(tài)表;2. 對動態(tài)表計算連續(xù)查詢,生成新的動態(tài)表;3. 生成的動態(tài)表被轉換回流

一常規(guī)數(shù)據(jù)庫表一樣,動態(tài)表可以通過插入(Insert)、更新(Update)和刪除(Delete)更改,進行持續(xù)的修改,將動態(tài)表轉換為流或將其寫入外部系統(tǒng)時,需要對這些更改進行編碼:追加流(Append-only)、撤回流(Retract)、更新插入流(Upsert)
時間特性(Time Attributes)
基于時間的操作(比如 Table API 和 SQL 中窗口操作),需要定義相關的時間語義和時間數(shù)據(jù)來源的信息。
Table 可以提供一個邏輯上的時間字段,用于在表處理程序中,指示時間和訪問相應的時間戳。
時間屬性,可以是每個表schema的一部分。一旦定義了時間屬性,他就可以作為一個字段引用,并且可以在基于時間的操作中使用。時間屬性的行為類似于常規(guī)的時間戳,可以訪問,并且進行計算。
定義處理時間(Processing Time)
處理時間語義下,允許表處理程序根據(jù)機器的本地時間生成成果。它是時間的最簡單概念。它既不需要提取時間戳,也不需要生成 watermark。
由 DataStream 轉換成表時指定。在定義 Schema 期間,可以使用 .proctime,指定字段名定義處理時間字段。這個 proctime 屬性只能通過附加邏輯字段,來擴展物理 schema。因此,只能 schema 定義的末尾定義它。
Table table = tavleEnv.fromDataStream(dataStream,"id,eventTime,data,num.pt.proctime");
connect 中使用
.withSchema(
new Schema().field("id", DataTypes.STRING())
.field("eventTime", DataTypes.BIGINT())
.field("data", DataTypes.STRING())
.field("num", DataTypes.INT())
.field("pt", DataTypes.TIMESTAMP(3))
.proctime()
mysql 中定義,PROCTIME 只有blink 中支持,要引入 blink api
String sinkDDL = "create table dataTable( "+
"id varchar(20) not null, "+
"eventTime bigint, "+
"data varchar(30), "+
"num int, "+
"pt as PROCTIME(), "+
") with ( "+
" 'connector.type' = 'filesystem' "+
" 'connector.path' = '/test.tct' "+
" 'format.type' = 'csv' ";
tableEnv.sqlUpdate(sinkDDL)
定義事件事件(Event Time)
事件時間語義,允許表處理程序根據(jù)每個記錄中包含的時間生成結果。這樣即使在有序亂序事件或延遲事件時,也可以獲得正確的結果。
為了處理無須事件,并區(qū)分流中的準時和遲到事件;Flink 需要從事件數(shù)據(jù)中,提取時間戳,并用來推進事件時間的進展。
定義事件事件,同樣有三種方法:由 DataStream 轉換成表時指定;定義 Table Schema 時指定;在創(chuàng)建表的 DDL 中定義。
// 將 DataStream 轉換位Table,并指定時間字段
Table table = tableEnv.fromDataStream(dataStream,"id,eventTime.rowtime,data,num");
// 或者,直接追加時間字段
Table table = tableEnv.fromDataStream(dataStream,"id,eventTime,data,num,rt.rowtime");
connect 中定義
.withSchema(
new Schema().field("id", DataTypes.STRING())
.field("eventTime", DataTypes.BIGINT())
.field("data", DataTypes.STRING())
.field("num", DataTypes.INT())
.rowtime(
new Rowtime()
// 從字段中提取時間戳
.timestampsFromField("eventTime")
// waternark 延遲1s
.watermarksPeriodicBounded(1000)
)
)
sql ddl 定義,需要 blink api
String sinkDDL = "create table dataTable( "+
"id varchar(20) not null, "+
"eventTime bigint, "+
"data varchar(30), "+
"num int, "+
"rt as TO_TIMESTAMP( FROM_UNIXTIME(eventTime) ), "+
"watermark for rt as rt - interval '1' second "+
") with ( "+
" 'connector.type' = 'filesystem' "+
" 'connector.path' = '/test.tct' "+
" 'format.type' = 'csv' ";
tableEnv.sqlUpdate(sinkDDL)
窗口
時間語義,要配合敞口操作才能發(fā)揮作用,在 Table API 和 SQL 中,主要有兩種裝口:group Windows(分組窗口),根據(jù)時間或行計數(shù)間隔,將行聚合到有限的組(Group)中,并對每個組的數(shù)據(jù)執(zhí)行一次聚合函數(shù);Over Windows,針對每個輸入行,計算相鄰行范圍內的聚合。
Group Windows
Group Windows 時使用 window 定義的,并且必須又 as 子句指定一個別名。為了按窗口進行分組,窗口的別名必須在 greop by 子句中,像常規(guī)的分組字段一樣引用。
Table aggTable = inputTable
.window([w:GroupWindow] as "w") // 定義窗口,別名為 w
.groupBy("w,a") // 按照字段a和窗口 w分組
.select("a,b.sum"); // 聚合
Table API 提供了一組具有特定語義的預定義Window類,這些類會被轉換為底層DataStream 或 DataSet 的窗口操作。
滾動窗口
// 開了一個10分鐘的滾動窗口,指定一個字段 rowtime,as 別名
.window(Tumble.over($("10.minutes")).on($("rowtime")).as("w"))
// 處理時間
.window(Tumble.over($("10.minutes")).on($("proctime")).as("w"))
// 計數(shù)窗口 10.rows 10行
.window(Tumble.over($("10.rows")).on($("proctime")).as("w"))
滑動窗口
// 事件事件滑動窗口,長度是10分鐘,5分鐘滑動一次
.window(Slide.over($("10.minutes")).every($("5.minutes")).on("rowtime").as("w"))
// 處理時間滑動窗口
.window(Slide.over($("10.minutes")).every($("5.minutes")).on("proctime").as("w"))
// 計數(shù)滑動窗口
.window(Slide.over($("10.rows")).every($("5.rows")).on("proctime").as("w"))
會話窗口
// 最小間隔時間
.window(Session.withGap($("10.minutes")).on("rowtime").as("w"))
.window(Session.withGap($("10.minutes")).on("proctime").as("w"))
sql 中的定義,在查詢的 group by 子句中使用
// 定義一個滾動窗口,第一個參數(shù)是時間字段,第二個參數(shù)是長度
TUMBLE(time_attr,interval)
// 定義一個滑動窗口,第一個參數(shù)是時間字段,第二個參數(shù)是滑動長度,第三個是窗口長度
HOP(time_attr,interval,interval)
// 定義一個會話窗口,第一個參數(shù)是時間字段,第二個參數(shù)窗口間隔
SESSION(time_attr,interval)
基于ddl 開窗代碼演示
ddl 更多操作,1.12 中不推薦 connect,推薦使用 ddl 來操作。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 連接外部系統(tǒng)讀取數(shù)據(jù)
String path = "D:\\workspace\\spring\\middleware\\flink\\flink-test\\src\\main\\resources\\hello.txt";
// 類似于 source
tableEnv.connect(
new FileSystem().path(path)
// 類似于 split
).withFormat(
// 引入 flink csv 依賴,默認 , 分割
new Csv()
// 類似于 map 轉換
).withSchema(
new Schema().field("id", DataTypes.STRING())
.field("data", DataTypes.STRING())
.field("num", DataTypes.INT())
.field("eventTime", DataTypes.TIMESTAMP(3))
).createTemporaryTable("inputTable");
String ddl = "CREATE TABLE MyUserTable ( " +
" id STRING, " +
" eventTime BIGINT, " +
" data STRING, " +
" num INTEGER, " +
" rt as TO_TIMESTAMP( FROM_UNIXTIME(eventTime) ), " +
" watermark for rt as rt - interval '5' second " +
") WITH ( " +
" 'connector.type' = 'filesystem'," +
" 'connector.path' = 'D:\\workspace\\spring\\middleware\\flink\\flink-test\\src\\main\\resources\\hello.txt', " +
" 'format.type' = 'csv' " +
")";
tableEnv.executeSql(ddl);
String sql = "select " +
"count(id) as ct,id,tumble_start(rt,interval '15' second) as st,tumble_end(rt,interval '15' second) as ed " +
"from MyUserTable " +
"group by id,tumble(rt,interval '15' second) ";
// 第一種方式
Table table = tableEnv.sqlQuery(sql);
tableEnv.toRetractStream(table, Row.class).print("sql1");
// 第二種方式 打印sqk結構
tableEnv.executeSql(sql).print();
// 第三種方式 使用api
Table myUserTable = tableEnv.from("MyUserTable");
Table select = myUserTable.window(Tumble.over(lit(15).seconds()).on($("rt")).as("w"))
.groupBy($("id"), $("w"))
.select($("id").count().as("ct"),$("w").start().as("st"),$("w").end().as("ed"));
tableEnv.toAppendStream(select, Row.class).print("api");
env.execute("test");
}
Over Windows
Over Window 聚合是標準 SQL 中已有得(over 子句),可以在查詢的 SELECT 子句中定義;Over Window 聚合,會針對每個輸入行,計算相鄰行范圍內的聚合;Over Window 使用 windows(w:overwindows*)子句定義,并在select() 方法中通過別名來引用。
table table = input.window([e:OverWindow] as "w").select("a,b.sum over w,c.min over w");
Table API 提供了 Over類,來配置 Over窗口的屬性。
無界 Over Windows
可以在事件事件或處理事件,以及指定為時間間隔、或行計數(shù)的范圍內,定義 Over Window,無界的over window是使用常量指定的。
// 無界的事件時間
.window(Over.partitionBy("a").orderBy("rowtime").preceding(UNBOUNDED_RABGE).as("w"))
// 無界的處理時間
.window(Over.partitionBy("a").orderBy("proctime").preceding(UNBOUNDED_RABGE).as("w"))
// 無界的事件時間
.window(Over.partitionBy("a").orderBy("rowtime").preceding(UNBOUNDED_RABGE).as("w"))
// 無界的處理時間
.window(Over.partitionBy("a").orderBy("proctime").preceding(UNBOUNDED_ROW).as("w"))
.window(Over.partitionBy("a").orderBy("rowtime").preceding(UNBOUNDED_ROW).as("w"))
有界 Over Windows
// 有界的事件時間
.window(Over.partitionBy("a").orderBy("rowtime").preceding("1.minutes").as("w"))
// 有界的處理時間
.window(Over.partitionBy("a").orderBy("proctime").preceding("1.minutes").as("w"))
.window(Over.partitionBy("a").orderBy("rowtime").preceding("10.rows").as("w"))
.window(Over.partitionBy("a").orderBy("proctime").preceding("10.rows").as("w"))
sql 中的 Over Window
用 Over 做窗口聚合時,所有聚合必須在同一窗口上定義,也就是說必須是相同的分區(qū)、排序和范圍;目前僅支持在當前行范圍之前的窗口;Order By 必須在單一的時間屬性上指定。
select count(id) over(
partition by id
order by eventTime
// 當前行以及前兩行
row between 2 preceding and current row
) from table
代碼演示
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 連接外部系統(tǒng)讀取數(shù)據(jù)
String path = "D:\\workspace\\spring\\middleware\\flink\\flink-test\\src\\main\\resources\\hello.txt";
// 類似于 source
tableEnv.connect(
new FileSystem().path(path)
// 類似于 split
).withFormat(
// 引入 flink csv 依賴,默認 , 分割
new Csv()
// 類似于 map 轉換
).withSchema(
new Schema().field("id", DataTypes.STRING())
.field("data", DataTypes.STRING())
.field("num", DataTypes.INT())
.field("eventTime", DataTypes.TIMESTAMP(3))
).createTemporaryTable("inputTable");
String ddl = "CREATE TABLE MyUserTable ( " +
" id STRING, " +
" eventTime BIGINT, " +
" data STRING, " +
" num INTEGER, " +
" rt as TO_TIMESTAMP( FROM_UNIXTIME(eventTime) ), " +
" watermark for rt as rt - interval '5' second " +
") WITH ( " +
" 'connector.type' = 'filesystem'," +
" 'connector.path' = 'D:\\workspace\\spring\\middleware\\flink\\flink-test\\src\\main\\resources\\hello.txt', " +
" 'format.type' = 'csv' " +
")";
tableEnv.executeSql(ddl);
Table myUserTable = tableEnv.from("MyUserTable");
Table select = myUserTable.window(Over.partitionBy($("id")).orderBy($("rt")).preceding(rowInterval(2L)).as("w"))
.select("id,rt,id.count over w,num.sum over w");
String sql = "select id,rt,count(id) over w,sum(num) over w "+
" from MyUserTable "+
" window w as ( partition by id order by rt rows between 2 preceding and current row ) ";
Table Table = tableEnv.sqlQuery(sql);
tableEnv.executeSql(sql).print();
tableEnv.toAppendStream(Table, Row.class).print("sql");
tableEnv.toRetractStream(select, Row.class).print("api");
env.execute("test");
}
數(shù)據(jù)
1,1623051400,test data,1
2,1623051401,test data,3
3,1623051402,test data,5
1,1623051405,test data,7
2,1623051406,test data,9
3,1623051409,test data,2
1,1623051415,test data,4
2,1623051416,test data,6
3,1623051417,test data,7
1,1623051418,test data,9
2,1623051419,test data,3
3,1623051420,test data,2
1,1623051421,test data,2
1,1623051422,test data,3
1,1623051423,test data,1
結果
+----+--------------------------------+-------------------------+----------------------+-------------+
| op | id | rt | EXPR$2 | EXPR$3 |
+----+--------------------------------+-------------------------+----------------------+-------------+
| +I | 1 | 2021-06-07T15:36:40 | 1 | 1 |
| +I | 2 | 2021-06-07T15:36:41 | 1 | 3 |
| +I | 3 | 2021-06-07T15:36:42 | 1 | 5 |
| +I | 1 | 2021-06-07T15:36:45 | 2 | 8 |
| +I | 2 | 2021-06-07T15:36:46 | 2 | 12 |
| +I | 3 | 2021-06-07T15:36:49 | 2 | 7 |
| +I | 1 | 2021-06-07T15:36:55 | 3 | 12 |
| +I | 2 | 2021-06-07T15:36:56 | 3 | 18 |
| +I | 3 | 2021-06-07T15:36:57 | 3 | 14 |
| +I | 1 | 2021-06-07T15:36:58 | 3 | 20 |
| +I | 2 | 2021-06-07T15:36:59 | 3 | 18 |
| +I | 3 | 2021-06-07T15:37 | 3 | 11 |
| +I | 1 | 2021-06-07T15:37:01 | 3 | 15 |
| +I | 1 | 2021-06-07T15:37:02 | 3 | 14 |
| +I | 1 | 2021-06-07T15:37:03 | 3 | 6 |
+----+--------------------------------+-------------------------+----------------------+-------------+
partition by id order by rt rows between 2 preceding and current row, 是以 id 分組,計算當前行的前2行,也就是一共3行,的值的計算。第一條數(shù)據(jù)進來,對于他來說就是當前行,前兩行沒有。
更詳細的 over 概念可以看這篇文章
函數(shù)
Flink Table API 和 SQL 為用戶提供了一組用于數(shù)據(jù)轉換的內置函數(shù),sql中支持的很多函數(shù),Table API 和 SQL 都已經做了實現(xiàn)。
比較函數(shù) 邏輯函數(shù) 算數(shù)函數(shù)
SQL: SQL: SQL:
value1 = value2 boolean1 or boolean2 numeric1+numeric2
value1 > value2 boolean1 is false power(numeric1,numeric2)
not boolean
Table API: Table API:
ANY1 === ANY2 boolean1 || boolean2 numeric1+numeric2
ANY1 > ANY2 boolean.isFalse numeric1.power(numeric2)
!boolean
字符串函數(shù) 時間函數(shù) 聚合函數(shù)
SQL: SQL: SQL:
string1 || string2 date string count(*)
upper(string) timestamp string sum(expression)
char_length(string) current_time rank()
interval string range row_number()
Table API: Table API: Table API:
string1 + string2 string.toDate field.count
string.upperCase() string.toTimestamp field.sum
string.charLength() currentTime()
numeric.days
numeric.minutes
用戶自定義函數(shù)(UDF)
用戶自定義函數(shù)(User defined Functions,UDF)是一個重要得特性,他們顯著地擴展了查詢的表達能力。
在大多數(shù)情況下,用戶定義的函數(shù)必須先注冊,然后才能在查詢中使用;函數(shù)通過調用 registerFunctionn() 方法在 TableEnvironment 的函數(shù)目錄中,這樣Table API或 SQL解析器就可以識別并正確地解釋它。
標量函數(shù)(Scalar Functions)
用戶自定義的標量函數(shù),可以將0、1或多個標量值,映射到新的標量值;為了定義標量函數(shù),必須在 org.apache.flink.table.functions 中擴展基類 Scalar Function,并實現(xiàn)(一個或多個)求值(eval)方法。簡單來說,就是把一個表的字段傳入解析,列中可以顯示這個被解析的字段的結果。
標量函數(shù)的行為由求值方法決定,求值方法必須公開聲明并名命為 eval。
public static void main(String[] args) {
// 執(zhí)行環(huán)境創(chuàng)建
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 讀取數(shù)據(jù)
DataStream<String> dataStream = env.readTextFile("D:\\workspace\\middleware\\flink\\flink-test\\src\\main\\resources\\hello.txt");
// 轉換數(shù)據(jù)
DataStream<EventData> map = dataStream.map(i -> {
String[] strs = i.split(",");
return new EventData(
strs[0],
Long.valueOf(strs[1]),
String.valueOf((Long.valueOf(strs[1]) - 5)),
Integer.valueOf(strs[3])
);
});
// 將流轉換成表
Table dataTable = tableEnv.fromDataStream(map, $("id"), $("eventTime"), $("data"), $("num"));
HashCode hashCode = new HashCode(20);
// 創(chuàng)建一個臨時UDF 注冊到環(huán)境中
tableEnv.createTemporarySystemFunction("hashCode",hashCode);
// table api 使用自定義函數(shù)
dataTable.select($("id"),call("hashCode",$("id"))).execute().print();
tableEnv.createTemporaryView("event_data",map);
tableEnv.sqlQuery("select id,hashCode(id) from event_data").execute().print();
}
public static class HashCode extends ScalarFunction{
private int factor = 10;
// 可以定義構造函數(shù)來傳標準配置
public HashCode(int factor){
this.factor = factor;
}
// 必須是 public 返回類型和參數(shù)類型隨便定,但方法名必須交 eval
public int eval(String id){
return id.hashCode() % factor;
}
}
表函數(shù)(Table Functions)
用戶定義的表函數(shù),也可以將0、1或多個標量值作為輸入參數(shù);與標量函數(shù)不同的是,它可以返回任意數(shù)量的行為作為輸出,而不是單個值。簡單來說就是把表的字段傳入解析,一行變多行。
為了定義一個表函數(shù),必須擴展 org.apache.flink.table.functions 中的基類 TableFuntion 并實現(xiàn)(一個或多個)求值方法;表函數(shù)的行為由其求值方法決定,求值方法必須是 public的,并命名為 eval。
public static void main(String[] args) {
// 執(zhí)行環(huán)境創(chuàng)建
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 讀取數(shù)據(jù)
DataStream<String> dataStream = env.readTextFile("D:\\workspace\\middleware\\flink\\flink-test\\src\\main\\resources\\hello.txt");
// 轉換數(shù)據(jù)
DataStream<EventData> map = dataStream.map(i -> {
String[] strs = i.split(",");
return new EventData(
strs[0],
Long.valueOf(strs[1]),
strs[2],
Integer.valueOf(strs[3])
);
});
// 將流轉換成表
Table dataTable = tableEnv.fromDataStream(map, $("id"), $("eventTime"), $("data"), $("num"));
Split split = new Split(" ");
// 創(chuàng)建一個臨時UDF 注冊到環(huán)境中
tableEnv.createTemporarySystemFunction("split",split);
// table api 使用自定義函數(shù)
dataTable.joinLateral(call("split",$("data")).as("word","length"))
.select($("id"),$("data"),$("word"),$("length"))
.execute()
.print();
tableEnv.createTemporaryView("event_data",map);
tableEnv.sqlQuery("select id,data,word,length from event_data,lateral table(split(data)) as aplitid(word,length)")
.execute()
.print();
}
public static class Split extends TableFunction<Tuple2<String,Integer>> {
private String mark = ",";
// 可以定義構造函數(shù)來傳標準配置
public Split(String mark){
this.mark = mark;
}
// 必須是 public 返回類型和參數(shù)類型隨便定,但方法名必須交 eval
public void eval(String data){
for(String str : data.split(mark)) {
collect(new Tuple2<String, Integer>(str,str.length()));
}
}
}
聚合函數(shù)(Aggregate Functions)
用戶定義聚合函數(shù)(User Defined Aggregate Functions,UDAGGs)可以把一個表中的數(shù)據(jù),聚合成一個標量值;用戶定義的聚合函數(shù),是通過繼承 AggregateFunction 抽象類實現(xiàn)的。對表分組聚合,相同組的會聚合成一條數(shù)據(jù)。

AggregateFunction 要求必須實現(xiàn)的方法:createAccumulator()、Accumulate()、getValue();
AggregateFunction的工作原理:首先,它需要一個累加器(Accumulate),用來保存聚合中間結果的數(shù)據(jù)結構,可以通過調用 createAccumulator() 方法創(chuàng)建空累加器;隨后,對每個輸入行調用函數(shù)的 Accumulate() 方法來更新累加器;處理完所有行,將調用函數(shù)的 getValue() 方法來計算并返回最終結果。
public static void main(String[] args) {
// 執(zhí)行環(huán)境創(chuàng)建
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 讀取數(shù)據(jù)
DataStream<String> dataStream = env.readTextFile("D:\\workspace\\middleware\\flink\\flink-test\\src\\main\\resources\\hello.txt");
// 轉換數(shù)據(jù)
DataStream<EventData> map = dataStream.map(i -> {
String[] strs = i.split(",");
return new EventData(
strs[0],
Long.valueOf(strs[1]),
strs[2],
Integer.valueOf(strs[3])
);
});
// 將流轉換成表
Table dataTable = tableEnv.fromDataStream(map, $("id"), $("eventTime"), $("data"), $("num"));
Avg avg = new Avg();
// 創(chuàng)建一個臨時UDF 注冊到環(huán)境中
tableEnv.createTemporarySystemFunction("avg_0",avg);
// table api 使用自定義函數(shù)
dataTable.groupBy($("id"))
.select($("id"),call("avg_0",$("num").as("avg_num")))
.execute()
.print();
tableEnv.createTemporaryView("event_data",map);
tableEnv.sqlQuery("select id,avg_0(num) as avg_num from event_data group by id ")
.execute()
.print();
}
// 求 num 平均值
public static class Avg extends AggregateFunction<Double,Tuple2<Double,Integer>> {
// 求平均值
@Override
public Double getValue(Tuple2<Double, Integer> doubleIntegerTuple2) {
return doubleIntegerTuple2.f0 / doubleIntegerTuple2.f1;
}
// 初始化狀態(tài)
@Override
public Tuple2<Double, Integer> createAccumulator() {
return new Tuple2<>(0.0,0);
}
// 必須實現(xiàn) accumulate 方法,來數(shù)據(jù)之后更新狀態(tài),acc 為當前狀態(tài),tmp 為輸入數(shù)據(jù)
public void accumulate(Tuple2<Double, Integer> acc,Double tmp){
acc.f0 += tmp;
acc.f1 += 1;
}
}
表聚合函數(shù)
表聚合函數(shù) 和 聚合函數(shù)的區(qū)別是,對表分組聚合,相同組的會聚合成多行多列的結果表;表聚合函數(shù)通過繼承 TableAggregateFunction 抽象類來實現(xiàn)的。

public static void main(String[] args) {
// 執(zhí)行環(huán)境創(chuàng)建
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 讀取數(shù)據(jù)
DataStream<String> dataStream = env.readTextFile("D:\\workspace\\middleware\\flink\\flink-test\\src\\main\\resources\\hello.txt");
// 轉換數(shù)據(jù)
DataStream<EventData> map = dataStream.map(i -> {
String[] strs = i.split(",");
return new EventData(
strs[0],
Long.valueOf(strs[1]),
strs[2],
Integer.valueOf(strs[3])
);
});
// 將流轉換成表
Table dataTable = tableEnv.fromDataStream(map, $("id"), $("eventTime"), $("data"), $("num"));
// Top2 top2 = new Top2();
// 創(chuàng)建一個臨時UDF 注冊到環(huán)境中
// tableEnv.createTemporarySystemFunction("top2",top2);
tableEnv.registerFunction("top2", new Top2());
// table api 使用自定義函數(shù)
dataTable.groupBy($("id"))
.flatAggregate("top2(num) as (TOP1, TOP2)")
.select($("id"),$("TOP1"),$("TOP2"))
.execute()
.print();
}
// 求 num 平均值
// 第一個泛型是輸出,第二個泛型是狀態(tài)
public static class Top2 extends TableAggregateFunction<Tuple2<Integer,Integer>,Tuple2<Integer,Integer>> {
// 輸出
public void emitValue(Tuple2<Integer, Integer> acc, Collector<Tuple2<Integer, Integer>> out){
out.collect(acc);
}
// 初始化狀態(tài)
@Override
public Tuple2<Integer, Integer> createAccumulator() {
return new Tuple2<>(0,0);
}
// 必須實現(xiàn) accumulate 方法,來數(shù)據(jù)之后更新狀態(tài),acc 為當前狀態(tài),tmp 為輸入數(shù)據(jù)
public void accumulate(Tuple2<Integer, Integer> acc,Integer tmp){
// 如果 tem 大于 f0,且 f0 > f1 那么
// 三個數(shù)降序 取前兩個
List<Integer> doubles = Arrays
.asList(acc.f0, acc.f1,tmp)
.stream()
.sorted(new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
return Integer.compare(o1,o2);
}
}.reversed())
.collect(Collectors.toList());
acc.f0 = doubles.get(0);
acc.f1 = doubles.get(1);
}
}