Table API和SQL通過(guò)join API集成在一起,這個(gè)join API的核心概念是Table,Table可以作為查詢(xún)的輸入和輸出。這篇文檔展示了使用Table API和SQL查詢(xún)的程序的通用結(jié)構(gòu),如何注冊(cè)一個(gè)Table,如何查詢(xún)一個(gè)Table以及如何將數(shù)據(jù)發(fā)給Table。
Table API和SQL查詢(xún)程序的結(jié)構(gòu)
所有批處理和流處理的Table API、SQL程序都有如下相同的模式,下面例子的代碼展示了Table API和SQL程序的通用結(jié)構(gòu):
// 對(duì)于批處理程序來(lái)說(shuō)使用 ExecutionEnvironment 來(lái)替換 StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 創(chuàng)建一個(gè)TableEnvironment
// 對(duì)于批處理程序來(lái)說(shuō)使用 BatchTableEnvironment 替換 StreamTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// 注冊(cè)一個(gè) Table
tableEnv.registerTable("table1", ...) // 或者
tableEnv.registerTableSource("table2", ...); // 或者
tableEnv.registerExternalCatalog("extCat", ...);
// 從Table API的查詢(xún)中創(chuàng)建一個(gè)Table
Table tapiResult = tableEnv.scan("table1").select(...);
// 從SQL查詢(xún)中創(chuàng)建一個(gè)Table
Table sqlResult = tableEnv.sql("SELECT ... FROM table2 ... ");
// 將Table API 種的結(jié)果 Table 發(fā)射到TableSink中 , SQL查詢(xún)也是一樣的
tapiResult.writeToSink(...);
// 執(zhí)行
env.execute();
// 對(duì)于批處理程序來(lái)說(shuō)使用 ExecutionEnvironment 來(lái)替換 StreamExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 創(chuàng)建一個(gè)TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 注冊(cè)一個(gè) Table
tableEnv.registerTable("table1", ...) // 或者
tableEnv.registerTableSource("table2", ...) // 或者
tableEnv.registerExternalCatalog("extCat", ...)
// 從Table API的查詢(xún)中創(chuàng)建一個(gè)Table
val tapiResult = tableEnv.scan("table1").select(...)
// 從SQL查詢(xún)中創(chuàng)建一個(gè)Table
val sqlResult = tableEnv.sql("SELECT ... FROM table2 ...")
// 將Table API 種的結(jié)果 Table 發(fā)射到TableSink中 , SQL查詢(xún)也是一樣的
tapiResult.writeToSink(...)
// 執(zhí)行
env.execute()
注意:Table API 和 SQL查詢(xún)可以輕易地進(jìn)行集成并嵌入到DataStream或者DataSet程序中,請(qǐng)參考Integration With DataStream and DataSet API部分來(lái)了解DataStream和DataSet如何轉(zhuǎn)換成Table及Table如何轉(zhuǎn)換成DataStream和DataSet。
創(chuàng)建一個(gè)TableEnvironment
TableEnvironment是Table API和SQL集成的核心概念,它主要負(fù)責(zé):
1、在內(nèi)部目錄中注冊(cè)一個(gè)Table
2、注冊(cè)一個(gè)外部目錄
3、執(zhí)行SQL查詢(xún)
4、注冊(cè)一個(gè)用戶(hù)自定義函數(shù)(標(biāo)量、表及聚合)
5、將DataStream或者DataSet轉(zhuǎn)換成Table
6、持有ExecutionEnvironment或者StreamExecutionEnvironment的引用
一個(gè)Table總是會(huì)綁定到一個(gè)指定的TableEnvironment中,相同的查詢(xún)不同的TableEnvironment是無(wú)法通過(guò)join、union合并在一起。
TableEnvironment可以通過(guò)調(diào)用帶有參數(shù)StreamExecutionEnvironment或者ExecutionEnvironment和一個(gè)可選參數(shù)TableConfig的靜態(tài)方法TableEnvironment.getTableEnvironment()來(lái)創(chuàng)建。TableConf可以用來(lái)配置TableEnvironment或者自定義查詢(xún)優(yōu)化器和翻譯過(guò)程(參考查詢(xún)優(yōu)化器)
// ***************
// STREAMING QUERY
// ***************
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// 為streaming查詢(xún)創(chuàng)建一個(gè) TableEnvironment
StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(sEnv);
// ***********
// BATCH QUERY
// ***********
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
// 為批查詢(xún)創(chuàng)建一個(gè) TableEnvironment
BatchTableEnvironment bTableEnv = TableEnvironment.getTableEnvironment(bEnv);
// ***************
// STREAMING QUERY
// ***************
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// 為流查詢(xún)創(chuàng)建一個(gè) TableEnvironment
val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)
// ***********
// BATCH QUERY
// ***********
val bEnv = ExecutionEnvironment.getExecutionEnvironment
// 為批查詢(xún)創(chuàng)建一個(gè) TableEnvironment
val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)
在Catalog(目錄)中注冊(cè)一個(gè)Table
TableEnvironment有一個(gè)在內(nèi)部通過(guò)表名組織起來(lái)的表目錄,Table API或者SQL查詢(xún)可以訪(fǎng)問(wèn)注冊(cè)在目錄中的表,并通過(guò)名稱(chēng)來(lái)引用它們。
TableEnvironment允許通過(guò)各種源來(lái)注冊(cè)一個(gè)表:
1、一個(gè)已存在的Table對(duì)象,通常是Table API或者SQL查詢(xún)的結(jié)果
2、TableSource,可以訪(fǎng)問(wèn)外部數(shù)據(jù)如文件、數(shù)據(jù)庫(kù)或者消息系統(tǒng)
3、DataStream或者DataSet程序中的DataStream或者DataSet
將DataStream或者DataSet注冊(cè)為一個(gè)表將在Integration With DataStream and DataSet API中討論。
注冊(cè)一個(gè)Table
一個(gè)Table可以在TableEnvironment中按照下面程序注冊(cè):
// 獲取一個(gè) StreamTableEnvironment, BatchTableEnvironment也是同樣的方法
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// Table 是簡(jiǎn)單的投影查詢(xún)的結(jié)果
Table projTable = tableEnv.scan("X").project(...);
// 將 Table projTable 注冊(cè)為表 "projectedX"
tableEnv.registerTable("projectedTable", projTable);
// 獲取一個(gè)TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table 是簡(jiǎn)單的投影查詢(xún)的結(jié)果
val projTable: Table = tableEnv.scan("X").project(...)
// 將 Table projTable 注冊(cè)為表 "projectedX"
tableEnv.registerTable("projectedTable", projTable)
注意:一個(gè)注冊(cè)的Table被當(dāng)做是與關(guān)系型數(shù)據(jù)庫(kù)中的視圖類(lèi)似,即定義Table的查詢(xún)不會(huì)被優(yōu)化,但是當(dāng)其他查詢(xún)引用到已注冊(cè)的Table時(shí)會(huì)被內(nèi)聯(lián)。如果多個(gè)查詢(xún)引用同一個(gè)已注冊(cè)的Table,這個(gè)Table會(huì)跟每個(gè)查詢(xún)內(nèi)聯(lián)并進(jìn)行多次執(zhí)行,即:已注冊(cè)的Table的結(jié)果不會(huì)共享。
注冊(cè)一個(gè)TableSource
TableSource可以訪(fǎng)問(wèn)保存在外部存儲(chǔ)系統(tǒng)如數(shù)據(jù)庫(kù)系統(tǒng)(MySQL、HBase...),指定編碼格式的文件(CSV, Apache [Parquet, Avro, ORC],...)或者消息系統(tǒng)(Apache Kafka,RabbitMQ,...)中的數(shù)據(jù)。
Flink的目標(biāo)是為通用的數(shù)據(jù)格式和存儲(chǔ)系統(tǒng)提供TableSource,請(qǐng)參考Table Sources和Sinks頁(yè)來(lái)了解Flink所支持的TableSource列表及如何自定義一個(gè)TableSource。
一個(gè)TableSource可以在TableEnvironment中按如下方式來(lái)定義:
// 獲取一個(gè)StreamTableEnvironment, 同樣適用于BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// 創(chuàng)建一個(gè) TableSource
TableSource csvSource = new CsvTableSource("/path/to/file", ...);
// 將TableSource注冊(cè)為表 "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource);
// 獲取一個(gè) TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 創(chuàng)建一個(gè)TableSource
val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)
// 將 TableSource 注冊(cè)為表 "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource)
注冊(cè)一個(gè)外部Catalog(目錄)
一個(gè)外部目錄提供了關(guān)于外部數(shù)據(jù)庫(kù)和表的信息如:它們的名稱(chēng)、模式、統(tǒng)計(jì)及如何訪(fǎng)問(wèn)保存在外部數(shù)據(jù)庫(kù)、表和文件中的數(shù)據(jù)。
一個(gè)外部目錄可以通過(guò)實(shí)現(xiàn)ExternalCatalog接口來(lái)創(chuàng)建并在TableEnvironment中注冊(cè),如下:
// 獲取一個(gè) StreamTableEnvironment, 同樣適用于 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// 創(chuàng)建一個(gè)外部catalog
ExternalCatalog catalog = new InMemoryExternalCatalog();
// 注冊(cè) ExternalCatalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog);
// 獲取一個(gè) TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 創(chuàng)建一個(gè) catalog
val catalog: ExternalCatalog = new InMemoryExternalCatalog
// 注冊(cè) ExternalCatalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog)
一旦在TableEnvironment中注冊(cè)之后,所有定義在ExternalCatalog中的表都可以通過(guò)指定全路徑如:catalog.database.table 在Table API或者SQL查詢(xún)來(lái)訪(fǎng)問(wèn)。
目前,F(xiàn)link提供InMemoryExternalCatalog來(lái)做demo或者測(cè)試。然而,ExternalCatalog接口還可以被用來(lái)連接HCatalog或者M(jìn)etastore到Table API。
查詢(xún)一個(gè)Table
Table API
Table API是一個(gè)Scala和Java的語(yǔ)言集成查詢(xún)API,與SQL相反,查詢(xún)并不指定為字符串而是根據(jù)主機(jī)語(yǔ)言一步一步的構(gòu)建。
Table API是基于Table類(lèi)來(lái)的,Table類(lèi)代表了一個(gè)流或者批表,并且提供方法來(lái)使用關(guān)系型操作。這些方法返回一個(gè)新的Table對(duì)象,這個(gè)Table對(duì)象代表著輸入的Table應(yīng)用關(guān)系型操作后的結(jié)果。一些關(guān)系型操作是由多個(gè)方法調(diào)用組成的如:table.groupBy(...).select(), 其中groupBy(...)指定了table的分組,而select(...)則是table分組的映射。
Table API文檔描述了streaming和batch表所支持的所有Table API操作。
下面的例子展示了一個(gè)簡(jiǎn)單的Table API聚合查詢(xún):
// 獲取一個(gè) StreamTableEnvironment, 同樣適用于 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// 注冊(cè)一個(gè)名叫 Orders 的表
// 掃描注冊(cè)的 Orders 表
Table orders = tableEnv.scan("Orders");
// 計(jì)算所有來(lái)自法國(guó)的客戶(hù)的收入
Table revenue = orders
.filter("cCountry === 'FRANCE'")
.groupBy("cID, cName")
.select("cID, cName, revenue.sum AS revSum");
// 發(fā)射或者轉(zhuǎn)換一個(gè) Table
// 執(zhí)行查詢(xún)
// 獲取一個(gè) TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 注冊(cè)一個(gè)名叫 Orders 的表
// 掃描已注冊(cè)的 Orders 表
Table orders = tableEnv.scan("Orders")
// 計(jì)算所有來(lái)自法國(guó)偶的客戶(hù)的收入
Table revenue = orders
.filter('cCountry === "FRANCE")
.groupBy('cID, 'cName)
.select('cID, 'cName, 'revenue.sum AS 'revSum)
// 發(fā)射或者轉(zhuǎn)換一個(gè)Table
// 執(zhí)行查詢(xún)
注意:Scala Table API使用Scala的符號(hào)在引用表屬性時(shí),以'`'開(kāi)始,Table API使用Scala的隱式轉(zhuǎn)換,為了使用Scala的隱式轉(zhuǎn)換,請(qǐng)確保導(dǎo)入org.apache.flink.api.scala._和org.apache.flink.table.api.scala._。
SQL
Flink的SQL集成是基于Apache Calcite的,Apache Calcite實(shí)現(xiàn)了標(biāo)準(zhǔn)的SQL,SQL查詢(xún)被指定為常規(guī)字符串。
SQL文檔描述了Flink對(duì)流和批表的SQL支持。
下面的例子展示了如何指定一個(gè)查詢(xún)并返回一個(gè)Table結(jié)果;
// 獲取一個(gè) StreamTableEnvironment, 同樣適用于BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// 注冊(cè)一個(gè)名叫Orders 的表
// 計(jì)算所有來(lái)自法國(guó)的客戶(hù)的收入
Table revenue = tableEnv.sql(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
// 發(fā)射或者轉(zhuǎn)換一個(gè)Table
// 執(zhí)行查詢(xún)
// 獲取一個(gè) TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
//注冊(cè)一個(gè)名叫 Orders的表
// 計(jì)算所有來(lái)自法國(guó)的客戶(hù)的收入
Table revenue = tableEnv.sql("""
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
// 發(fā)射或者轉(zhuǎn)換 Table
// 執(zhí)行查詢(xún)
混合使用Table API和SQL
Table API和SQL查詢(xún)可以很容易地合并因?yàn)樗鼈兌挤祷豑able對(duì)象:
1、Table API查詢(xún)可以基于SQL查詢(xún)結(jié)果的Table來(lái)進(jìn)行
2、SQL查詢(xún)可以基于Table API查詢(xún)的結(jié)果來(lái)定義
發(fā)射一個(gè)Table
為了發(fā)射一個(gè)Table,可以將其寫(xiě)入一個(gè)TableSink中,TableSink 是支持各種文件格式(如:CSV, Apache Parquet, Apache Avro)、存儲(chǔ)系統(tǒng)(如:JDBC, Apache HBase, Apache Cassandra, Elasticsearch)或者消息系統(tǒng)(如:Apache Kafka,RabbitMQ)的通用接口。
一個(gè)批Table只能寫(xiě)入BatchTableSink中,而流Table需要一個(gè)AppendStreamTableSink、RetractStreamTableSink或者UpsertStreamTableSink
請(qǐng)參考Table Sources & Sinks文檔來(lái)了解更多可用sink的信息和如何實(shí)現(xiàn)一個(gè)自定義的TableSink。
// 獲取一個(gè)StreamTableEnvironment, 同樣適用于 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// 使用Table API和/或SQL查詢(xún)獲取一個(gè) Table
Table result = ...
// 創(chuàng)建一個(gè)TableSink
TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");
// 將結(jié)果Table寫(xiě)入TableSink中
result.writeToSink(sink);
// 執(zhí)行程序
// 獲取一個(gè)TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 使用Table API和/或SQL查詢(xún)獲取一個(gè) Table
val result: Table = ...
//創(chuàng)建一個(gè) TableSink
val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")
// 將結(jié)果 Table寫(xiě)入TableSink中
result.writeToSink(sink)
// 執(zhí)行程序
翻譯和執(zhí)行一個(gè)查詢(xún)
Table API和SQL查詢(xún)根據(jù)輸入是流還是批翻譯成DataStream或者DataSet,查詢(xún)內(nèi)部表示為一個(gè)邏輯查詢(xún)計(jì)劃,并分兩個(gè)階段進(jìn)行翻譯:
1、優(yōu)化邏輯計(jì)劃
2、翻譯成一個(gè)DataStream或者DataSet程序
Table API或者SQL查詢(xún)會(huì)在下面情況下觸發(fā):
當(dāng)調(diào)用Table.writeToSink()時(shí),Table會(huì)發(fā)射到TableSink中
Table轉(zhuǎn)換DataStream或者DataSet時(shí)(參考與DataStream和DataSet API集成)
一旦翻譯,Table API或者SQL查詢(xún)就會(huì)像常規(guī)DataStream或DataSet處理一樣,并且當(dāng)StreamExecutionEnvironment.execute()或者ExecutionEnvironment.execute()調(diào)用時(shí)執(zhí)行。
與DataStream和DataSet API集成
Table API和SQL查詢(xún)可以很容易地進(jìn)行集成并嵌入到DataStream和DataSet程序中。例如:我們可以查詢(xún)一個(gè)外部表(如:來(lái)自關(guān)系型數(shù)據(jù)庫(kù)的表)、做一些預(yù)處理,如過(guò)濾、映射、聚合或者與元數(shù)據(jù)關(guān)聯(lián),然后使用DataStream或者DataSet API(及其他基于這些API的庫(kù),如CEP或Gelly)進(jìn)行進(jìn)一步處理。同樣,Table API或者SQL查詢(xún)也可以應(yīng)用于DataStream或者DataSet程序的結(jié)果中。
這種交互可以通過(guò)將DataStream或者DataSet轉(zhuǎn)換成一個(gè)Table及將Table轉(zhuǎn)換成DataStream或者DataSet來(lái)實(shí)現(xiàn)。在本節(jié),我們將描述這些轉(zhuǎn)換是如何完成的。
Scala 隱式轉(zhuǎn)換
Scala Table API為DataSet、DataStream和Table類(lèi)提供了隱式轉(zhuǎn)換功能。這些轉(zhuǎn)換可以通過(guò)導(dǎo)入Scala DataStream API中的org.apache.flink.table.api.scala._和org.apache.flink.api.scala._包來(lái)啟用。
注冊(cè)一個(gè)DataStream或者DataSet為T(mén)able
一個(gè)DataStream或者DataSet可以在TableEnvironment中注冊(cè)為T(mén)able,表的結(jié)果模式根據(jù)注冊(cè)的DataStream或者DataSet的數(shù)據(jù)類(lèi)型來(lái)定。請(qǐng)參考數(shù)據(jù)類(lèi)型映射到表模式來(lái)了解更詳細(xì)的信息。
// 獲取 StreamTableEnvironment
// 注冊(cè)一個(gè)DataSet 到BatchTableEnvironment也是等效的
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Tuple2<Long, String>> stream = ...
// 注冊(cè)DataStream 為表 "myTable" ,并有兩個(gè)字段 "f0", "f1"
tableEnv.registerDataStream("myTable", stream);
// 注冊(cè) DataStream 為表 "myTable2" 并有兩個(gè)字段 "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
// 獲取 TableEnvironment
// 注冊(cè)一個(gè) DataSet 是等價(jià)的
val tableEnv = TableEnvironment.getTableEnvironment(env)
val stream: DataStream[(Long, String)] = ...
// 注冊(cè) DataStream 為表 "myTable" 并有兩個(gè)字段 "f0", "f1"
tableEnv.registerDataStream("myTable", stream)
// 注冊(cè) DataStream 為 "myTable2" 并有兩個(gè)字段 "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)
將Table轉(zhuǎn)換為DataStream或者DataSet
Table可以轉(zhuǎn)換為DataStream或者DataSet,這樣的話(huà),自定義的DataStream或者DataSet程序就可以基于Table API或者SQL查詢(xún)的結(jié)果來(lái)執(zhí)行了。
當(dāng)將一個(gè)Table轉(zhuǎn)換為DataStream或者DataSet時(shí),你需要指定生成的DataStream或者DataSet的數(shù)據(jù)類(lèi)型,即需要轉(zhuǎn)換表的行的數(shù)據(jù)類(lèi)型,通常最方便的轉(zhuǎn)換類(lèi)型是Row,下面列表概述了不同選項(xiàng)的功能:
1、Row:字段通過(guò)位置映射、可以是任意數(shù)量字段,支持空值,非類(lèi)型安全訪(fǎng)問(wèn)
2、POJO:字段通過(guò)名稱(chēng)(POJO字段作為T(mén)able字段時(shí),必須命名)映射,可以是任意數(shù)量字段,支持空值,類(lèi)型安全訪(fǎng)問(wèn)
3、Case Class:字段通過(guò)位置映射,不支持空值,類(lèi)型安全訪(fǎng)問(wèn)
4、Tuple:字段通過(guò)位置映射,不得多于22(Scala)或者25(Java)個(gè)字段,不支持空值,類(lèi)型安全訪(fǎng)問(wèn)
5、Atomic Type:Table必須有一個(gè)字段,不支持空值,類(lèi)型安全訪(fǎng)問(wèn)。
將Table轉(zhuǎn)換為DataStream
流式查詢(xún)的結(jié)果Table會(huì)被動(dòng)態(tài)地更新,即每個(gè)新的記錄到達(dá)輸入流時(shí)結(jié)果就會(huì)發(fā)生變化。因此,轉(zhuǎn)換此動(dòng)態(tài)查詢(xún)的DataStream需要對(duì)表的更新進(jìn)行編碼。
有兩種模式來(lái)將Table轉(zhuǎn)換為DataStream:
1、Append Mode:這種模式只適用于當(dāng)動(dòng)態(tài)表僅由INSERT更改修改時(shí),即僅附加,之前發(fā)射的結(jié)果不會(huì)被更新。
2、Retract Mode:始終都可以使用此模式,它使用一個(gè)boolean標(biāo)識(shí)來(lái)編碼INSERT和DELETE更改。
// 獲取一個(gè) StreamTableEnvironment.
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// 有兩個(gè)字段(String name, Integer age)的Table
Table table = ...
// 通過(guò)指定類(lèi)將Table轉(zhuǎn)換為Row的Append DataStream
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
// 通過(guò)一個(gè)TypeInformation將Table轉(zhuǎn)換為T(mén)uple2<String, Integer> 類(lèi)型的Append DataStream
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
Types.STRING(),
Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple =
tableEnv.toAppendStream(table, tupleType);
// 將Table轉(zhuǎn)換為Row的react形式的DataStream
// 一個(gè)reactDataStream的類(lèi)型X為 DataStream<Tuple2<Boolean, X>>.
// boolean字段指定了更改的類(lèi)型.
// True 是 INSERT, false 是 DELETE.
DataStream<Tuple2<Boolean, Row>> retractStream =
tableEnv.toRetractStream(table, Row.class);
// get TableEnvironment.
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)
// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] dsTuple =
tableEnv.toAppendStream[(String, Int)](table)
// convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream[(Boolean, X)].
// The boolean field indicates the type of the change.
// True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)
注意:有關(guān)動(dòng)態(tài)表及其屬性的詳細(xì)討論在Streaming Queries文檔中給出。
將Table轉(zhuǎn)換為DataSet
Table可以按照如下方式轉(zhuǎn)換為DataSet:
// 獲取 BatchTableEnvironment
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// 有兩個(gè)字段(String name, Integer age)的Table
Table table = ...
// 通過(guò)指定類(lèi)將Table轉(zhuǎn)換為Row類(lèi)型的DataSet
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
// 通過(guò)TypeInformation 將Table轉(zhuǎn)換為T(mén)uple2<String, Integer>類(lèi)型的DataSet
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
Types.STRING(),
Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple =
tableEnv.toAppendStream(table, tupleType);
// 獲取 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 有兩個(gè)字段(String name, Integer age)的Table
val table: Table = ...
// 將Table轉(zhuǎn)換為Row類(lèi)型的DataSet
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)
// 將Table轉(zhuǎn)換為T(mén)uple2[String, Int]類(lèi)型的DataSet
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)
數(shù)據(jù)類(lèi)型映射到表模式
Flink的DataStream和DataSet API支持多種數(shù)據(jù)類(lèi)型,如Tuple,POJO, case class及原始數(shù)據(jù)類(lèi)型。接下來(lái)我們描述Table API如何將這些類(lèi)型轉(zhuǎn)換為內(nèi)部行表示及展示將DataStream轉(zhuǎn)換為T(mén)able的例子。
原子類(lèi)型
Flink將原生類(lèi)型(如:Integer, Double, String)或者通用類(lèi)型(不能再被分析或者分解的類(lèi)型)視為原子類(lèi)型,一個(gè)原子類(lèi)型的DataStream或者DataSet可以轉(zhuǎn)換為只有一個(gè)屬性的Table,屬性的類(lèi)型根據(jù)原子類(lèi)型推算,并且必須得指定屬性的名稱(chēng)。
// 獲取一個(gè) StreamTableEnvironment, 同樣原理適用于 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Long> stream = ...
// 將 DataStream轉(zhuǎn)換為具有屬性"myLong"的Table
Table table = tableEnv.fromDataStream(stream, "myLong");
// 獲取一個(gè) TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val stream: DataStream[Long] = ...
// 將 DataStream 轉(zhuǎn)換為具有屬性'myLong的Table
val table: Table = tableEnv.fromDataStream(stream, 'myLong)
Tuple(Java和Scala都支持)和Case Class(僅Scala支持)
Flink支持Scala內(nèi)置的Tuple和Flink為Java提供的Tuple,DataStream和DataSet類(lèi)型的Tuple都可以被轉(zhuǎn)換為表。字段可以通過(guò)為所有字段(通過(guò)位置來(lái)映射)提供的名稱(chēng)來(lái)重命名,如果沒(méi)有為字段指定名稱(chēng)的話(huà),就會(huì)采用默認(rèn)的字段名。
// 獲取一個(gè) StreamTableEnvironment, 同樣適用于BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Tuple2<Long, String>> stream = ...
// 將 DataStream為具有字段名為"myLong", "myString"的Table
Table table1 = tableEnv.fromDataStream(stream, "myLong, myString");
// 將 DataStream 轉(zhuǎn)換為具有默認(rèn)字段名 "f0", "f1"的 Table
Table table2 = tableEnv.fromDataStream(stream);
//獲取一個(gè) TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val stream: DataStream[(Long, String)] = ...
// 將 DataStream 轉(zhuǎn)換為具有字段名 'myLong, 'myString' 的Table
val table1: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)
// 將 DataStream 轉(zhuǎn)換為具有默認(rèn)字段名 '_1, '_2的Table
val table2: Table = tableEnv.fromDataStream(stream)
// 定義一個(gè) case class
case class Person(name: String, age: Int)
val streamCC: DataStream[Person] = ...
// 將 DataStream 轉(zhuǎn)換為具有默認(rèn)字段名 'name, 'age'的Table
val tableCC1 = tableEnv.fromDataStream(streamCC)
// 將 DataStream 轉(zhuǎn)換為具有字段名 'myName, 'myAge'的Table
val tableCC1 = tableEnv.fromDataStream(streamCC, 'myName, 'myAge)
POJO(Java 和 Scala)
Flink支持使用POJO作為復(fù)合類(lèi)型,決定POJO規(guī)則的文檔請(qǐng)參考這里。
當(dāng)將一個(gè)POJO類(lèi)型的DataStream或者DataSet轉(zhuǎn)換為T(mén)able而不指定字段名稱(chēng)時(shí),Table的字段名稱(chēng)將采用JOPO原生的字段名稱(chēng)作為字段名稱(chēng)。重命名原始的POJO字段需要關(guān)鍵字AS,因?yàn)镻OJO沒(méi)有固定的順序,名稱(chēng)映射需要原始名稱(chēng)并且不能通過(guò)位置來(lái)完成。
//獲取一個(gè) StreamTableEnvironment, 同樣原理適用于 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// Person 是一個(gè)有兩個(gè)字段"name" and "age" 的POJO
DataStream<Person> stream = ...
// 將 DataStream 轉(zhuǎn)換為有字段 "name", "age" 的Table
Table table1 = tableEnv.fromDataStream(stream);
// 將 DataStream 轉(zhuǎn)換為有字段 "myName", "myAge" 的Table
Table table2 = tableEnv.fromDataStream(stream, "name as myName, age as myAge");
// 獲取一個(gè) TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Person 是一個(gè)有字段 "name" and "age" 的POJO
val stream: DataStream[Person] = ...
// 將 DataStream 轉(zhuǎn)換為具有字段 'name, 'age' 的Table
val table1: Table = tableEnv.fromDataStream(stream)
// 將 DataStream 轉(zhuǎn)換為具有字段 'myName, 'myAge' 的Table
val table2: Table = tableEnv.fromDataStream(stream, 'name as 'myName, 'age as 'myAge)
Row
Row數(shù)據(jù)類(lèi)型支持任意數(shù)量的字段,并且字段可以是null值,字段名稱(chēng)可以通過(guò)RowTypeInformation來(lái)指定或者將一個(gè)Row DataStream或者DataSet轉(zhuǎn)換為T(mén)able時(shí)(根據(jù)位置)指定。
// 獲取一個(gè) StreamTableEnvironment, 同樣原理適用于 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// 在`RowTypeInfo`中指定字段"name" and "age"的Row類(lèi)型DataStream
DataStream<Row> stream = ...
// 將 DataStream 轉(zhuǎn)換為具有字段 "name", "age" 的Table
Table table1 = tableEnv.fromDataStream(stream);
// 將 DataStream 轉(zhuǎn)換為具有字段 "myName", "myAge" 的Table
Table table2 = tableEnv.fromDataStream(stream, "myName, myAge");
// 獲取一個(gè) TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 在`RowTypeInfo`中指定字段"name" and "age"的Row類(lèi)型DataStream
val stream: DataStream[Row] = ...
// 將 DataStream 轉(zhuǎn)換為具有字段 'name, 'age' 的Table
val table1: Table = tableEnv.fromDataStream(stream)
// 將 DataStream 轉(zhuǎn)換為具有字段 'myName, 'myAge' 的Table
val table2: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge)
查詢(xún)優(yōu)化
Apache Flink使用Apache Calcite來(lái)優(yōu)化和翻譯查詢(xún),當(dāng)前的查詢(xún)優(yōu)化包括投影、過(guò)濾下推、子查詢(xún)?nèi)ハ嚓P(guān)及各種形式的查詢(xún)重寫(xiě)。Flink不去優(yōu)化join的順序,但是會(huì)根據(jù)它們的順序去執(zhí)行(FROM子句中表的順序或者WHERE子句中連接謂詞的順序)。
可以通過(guò)提供一個(gè)CalciteConfig對(duì)象來(lái)調(diào)整在不同階段應(yīng)用的優(yōu)化規(guī)則集,這個(gè)可以通過(guò)調(diào)用CalciteConfig.createBuilder())獲得的builder來(lái)創(chuàng)建,并且可以通過(guò)調(diào)用tableEnv.getConfig.setCalciteConfig(calciteConfig)來(lái)提供給TableEnvironment。
解析一個(gè)Table
Table API為計(jì)算一個(gè)Table提供了一個(gè)機(jī)制來(lái)解析邏輯和優(yōu)化查詢(xún)計(jì)劃,這個(gè)可以通過(guò)調(diào)用TableEnvironment.explain(table)方法來(lái)完成,它會(huì)返回描述三個(gè)計(jì)劃的字符串:
1、關(guān)系查詢(xún)語(yǔ)法樹(shù),即未優(yōu)化的查詢(xún)計(jì)劃
2、優(yōu)化后的邏輯查詢(xún)計(jì)劃
3、物理執(zhí)行計(jì)劃
以下代碼顯示了一個(gè)示例和相應(yīng)的輸出:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));
Table table1 = tEnv.fromDataStream(stream1, "count, word");
Table table2 = tEnv.fromDataStream(stream2, "count, word");
Table table = table1
.where("LIKE(word, 'F%')")
.unionAll(table2);
String explanation = tEnv.explain(table);
System.out.println(explanation);
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table = table1
.where('word.like("F%"))
.unionAll(table2)
val explanation: String = tEnv.explain(table)
println(explanation)
輸出如下:
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
LogicalFilter(condition=[LIKE($1, 'F%')])
LogicalTableScan(table=[[_DataStreamTable_0]])
LogicalTableScan(table=[[_DataStreamTable_1]])
== Optimized Logical Plan ==
DataStreamUnion(union=[count, word])
DataStreamCalc(select=[count, word], where=[LIKE(word, 'F%')])
DataStreamScan(table=[[_DataStreamTable_0]])
DataStreamScan(table=[[_DataStreamTable_1]])
== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Stage 2 : Data Source
content : collect elements with CollectionInputFormat
Stage 3 : Operator
content : from: (count, word)
ship_strategy : REBALANCE
Stage 4 : Operator
content : where: (LIKE(word, 'F%')), select: (count, word)
ship_strategy : FORWARD
Stage 5 : Operator
content : from: (count, word)
ship_strategy : REBALANCE