Apache Flink提供了兩種頂層的關(guān)系型API,分別為Table API和SQL,F(xiàn)link通過(guò)Table API&SQL實(shí)現(xiàn)了批流統(tǒng)一。其中Table API是用于Scala和Java的語(yǔ)言集成查詢API,它允許以非常直觀的方式組合關(guān)系運(yùn)算符(例如select,where和join)的查詢。Flink SQL基于Apache Calcite 實(shí)現(xiàn)了標(biāo)準(zhǔn)的SQL,用戶可以使用標(biāo)準(zhǔn)的SQL處理數(shù)據(jù)集。Table API和SQL與Flink的DataStream和DataSet API緊密集成在一起,用戶可以實(shí)現(xiàn)相互轉(zhuǎn)化,比如可以將DataStream或者DataSet注冊(cè)為table進(jìn)行操作數(shù)據(jù)。值得注意的是,Table API and SQL目前尚未完全完善,還在積極的開發(fā)中,所以并不是所有的算子操作都可以通過(guò)其實(shí)現(xiàn)。
依賴
從Flink1.9開始,F(xiàn)link為Table & SQL API提供了兩種planner,分別為Blink planner和old planner,其中old planner是在Flink1.9之前的版本使用。主要區(qū)別如下:
尖叫提示:對(duì)于生產(chǎn)環(huán)境,目前推薦使用old planner.
-
flink-table-common: 通用模塊,包含 Flink Planner 和 Blink Planner 一些共用的代碼 -
flink-table-api-java: java語(yǔ)言的Table & SQL API,僅針對(duì)table(處于早期的開發(fā)階段,不推薦使用) -
flink-table-api-scala: scala語(yǔ)言的Table & SQL API,僅針對(duì)table(處于早期的開發(fā)階段,不推薦使用) -
flink-table-api-java-bridge: java語(yǔ)言的Table & SQL API,支持DataStream/DataSet API(推薦使用) -
flink-table-api-scala-bridge: scala語(yǔ)言的Table & SQL API,支持DataStream/DataSet API(推薦使用) -
flink-table-planner:planner 和runtime. planner為Flink1,9之前的old planner(推薦使用) -
flink-table-planner-blink: 新的Blink planner. -
flink-table-runtime-blink: 新的Blink runtime. -
flink-table-uber: 將上述的API模塊及old planner打成一個(gè)jar包,形如flink-table-*.jar,位與/lib目錄下 -
flink-table-uber-blink:將上述的API模塊及Blink 模塊打成一個(gè)jar包,形如fflink-table-blink-*.jar,位與/lib目錄下
Blink planner & old planner
Blink planner和old planner有許多不同的特點(diǎn),具體列舉如下:
- Blink planner將批處理作業(yè)看做是流處理作業(yè)的特例。所以,不支持Table 與DataSet之間的轉(zhuǎn)換,批處理的作業(yè)也不會(huì)被轉(zhuǎn)成DataSet程序,而是被轉(zhuǎn)為DataStream程序。
- Blink planner不支持
BatchTableSource,使用的是有界的StreamTableSource。 - Blink planner僅支持新的
Catalog,不支持ExternalCatalog(已過(guò)時(shí))。 - 對(duì)于FilterableTableSource的實(shí)現(xiàn),兩種Planner是不同的。old planner會(huì)謂詞下推到
PlannerExpression(未來(lái)會(huì)被移除),而Blink planner 會(huì)謂詞下推到Expression(表示一個(gè)產(chǎn)生計(jì)算結(jié)果的邏輯樹)。 - 僅僅Blink planner支持key-value形式的配置,即通過(guò)Configuration進(jìn)行參數(shù)設(shè)置。
- 關(guān)于PlannerConfig的實(shí)現(xiàn),兩種planner有所不同。
- Blink planner 會(huì)將多個(gè)sink優(yōu)化成一個(gè)DAG(僅支持TableEnvironment,StreamTableEnvironment不支持),old planner總是將每一個(gè)sink優(yōu)化成一個(gè)新的DAG,每一個(gè)DAG都是相互獨(dú)立的。
- old planner不支持catalog統(tǒng)計(jì),Blink planner支持catalog統(tǒng)計(jì)。
Flink Table & SQL程序的pom依賴
根據(jù)使用的語(yǔ)言不同,可以選擇下面的依賴,包括scala版和java版,如下:
<!-- java版 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
<!-- scala版 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
除此之外,如果需要在本地的IDE中運(yùn)行Table API & SQL的程序,則需要添加下面的pom依賴:
<!-- Flink 1.9之前的old planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
<!-- 新的Blink planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
另外,如果需要實(shí)現(xiàn)自定義的格式(比如和kafka交互)或者用戶自定義函數(shù),需要添加如下依賴:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
Table API & SQL的編程模板
所有的Table API&SQL的程序(無(wú)論是批處理還是流處理)都有著相同的形式,下面將給出通用的編程結(jié)構(gòu)形式:
// 創(chuàng)建一個(gè)TableEnvironment對(duì)象,指定planner、處理模式(batch、streaming)
TableEnvironment tableEnv = ...;
// 創(chuàng)建一個(gè)表
tableEnv.connect(...).createTemporaryTable("table1");
// 注冊(cè)一個(gè)外部的表
tableEnv.connect(...).createTemporaryTable("outputTable");
// 通過(guò)Table API的查詢創(chuàng)建一個(gè)Table 對(duì)象
Table tapiResult = tableEnv.from("table1").select(...);
// 通過(guò)SQL查詢的查詢創(chuàng)建一個(gè)Table 對(duì)象
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");
// 將結(jié)果寫入TableSink
tapiResult.insertInto("outputTable");
// 執(zhí)行
tableEnv.execute("java_job");
注意:Table API & SQL的查詢可以相互集成,另外還可以在DataStream或者DataSet中使用Table API & SQL的API,實(shí)現(xiàn)DataStreams、 DataSet與Table之間的相互轉(zhuǎn)換。
創(chuàng)建TableEnvironment
TableEnvironment是Table API & SQL程序的一個(gè)入口,主要包括如下的功能:
- 在內(nèi)部的catalog中注冊(cè)Table
- 注冊(cè)catalog
- 加載可插拔模塊
- 執(zhí)行SQL查詢
- 注冊(cè)用戶定義函數(shù)
-
DataStream、DataSet與Table之間的相互轉(zhuǎn)換 - 持有對(duì)
ExecutionEnvironment、StreamExecutionEnvironment的引用
一個(gè)Table必定屬于一個(gè)具體的TableEnvironment,不可以將不同TableEnvironment的表放在一起使用(比如join,union等操作)。
TableEnvironment是通過(guò)調(diào)用 BatchTableEnvironment.create() 或者StreamTableEnvironment.create()的靜態(tài)方法進(jìn)行創(chuàng)建的。另外,默認(rèn)兩個(gè)planner的jar包都存在與classpath下,所有需要明確指定使用的planner。
// **********************
// FLINK 流處理查詢
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
//或者TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);
// ******************
// FLINK 批處理查詢
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
// **********************
// BLINK 流處理查詢
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// 或者 TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
// ******************
// BLINK 批處理查詢
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
在catalog中創(chuàng)建表
臨時(shí)表與永久表
表可以分為臨時(shí)表和永久表兩種,其中永久表需要一個(gè)catalog(比如Hive的Metastore)倆維護(hù)表的元數(shù)據(jù)信息,一旦永久表被創(chuàng)建,只要連接到該catalog就可以訪問(wèn)該表,只有顯示刪除永久表,該表才可以被刪除。臨時(shí)表的生命周期是Flink Session,這些表不能夠被其他的Flink Session訪問(wèn),這些表不屬于任何的catalog或者數(shù)據(jù)庫(kù),如果與臨時(shí)表相對(duì)應(yīng)的數(shù)據(jù)庫(kù)被刪除了,該臨時(shí)表也不會(huì)被刪除。
創(chuàng)建表
虛表(Virtual Tables)
一個(gè)Table對(duì)象相當(dāng)于SQL中的視圖(虛表),它封裝了一個(gè)邏輯執(zhí)行計(jì)劃,可以通過(guò)一個(gè)catalog創(chuàng)建,具體如下:
// 獲取一個(gè)TableEnvironment
TableEnvironment tableEnv = ...;
// table對(duì)象,查詢的結(jié)果集
Table projTable = tableEnv.from("X").select(...);
// 注冊(cè)一個(gè)表,名稱為 "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);
外部數(shù)據(jù)源表(Connector Tables)
可以把外部的數(shù)據(jù)源注冊(cè)成表,比如可以讀取MySQL數(shù)據(jù)庫(kù)數(shù)據(jù)、Kafka數(shù)據(jù)等
tableEnvironment
.connect(...)
.withFormat(...)
.withSchema(...)
.inAppendMode()
.createTemporaryTable("MyTable")
擴(kuò)展創(chuàng)建表的標(biāo)識(shí)屬性
表的注冊(cè)總是包含三部分標(biāo)識(shí)屬性:catalog、數(shù)據(jù)庫(kù)、表名。用戶可以在內(nèi)部設(shè)置一個(gè)catalog和一個(gè)數(shù)據(jù)庫(kù)作為當(dāng)前的catalog和數(shù)據(jù)庫(kù),所以對(duì)于catalog和數(shù)據(jù)庫(kù)這兩個(gè)標(biāo)識(shí)屬性是可選的,即如果不指定,默認(rèn)使用的是“current catalog”和 “current database”。
TableEnvironment tEnv = ...;
tEnv.useCatalog("custom_catalog");//設(shè)置catalog
tEnv.useDatabase("custom_database");//設(shè)置數(shù)據(jù)庫(kù)
Table table = ...;
// 注冊(cè)一個(gè)名為exampleView的視圖,catalog名為custom_catalog
// 數(shù)據(jù)庫(kù)的名為custom_database
tableEnv.createTemporaryView("exampleView", table);
// 注冊(cè)一個(gè)名為exampleView的視圖,catalog的名為custom_catalog
// 數(shù)據(jù)庫(kù)的名為other_database
tableEnv.createTemporaryView("other_database.exampleView", table);
// 注冊(cè)一個(gè)名為'View'的視圖,catalog的名稱為custom_catalog
// 數(shù)據(jù)庫(kù)的名為custom_database,'View'是保留關(guān)鍵字,需要使用``(反引號(hào))
tableEnv.createTemporaryView("`View`", table);
// 注冊(cè)一個(gè)名為example.View的視圖,catalog的名為custom_catalog,
// 數(shù)據(jù)庫(kù)名為custom_database
tableEnv.createTemporaryView("`example.View`", table);
// 注冊(cè)一個(gè)名為'exampleView'的視圖, catalog的名為'other_catalog'
// 數(shù)據(jù)庫(kù)名為other_database'
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table);
查詢表
Table API
Table API是一個(gè)集成Scala與Java語(yǔ)言的查詢API,與SQL相比,它的查詢不是一個(gè)標(biāo)準(zhǔn)的SQL語(yǔ)句,而是由一步一步的操作組成的。如下展示了一個(gè)使用Table API實(shí)現(xiàn)一個(gè)簡(jiǎn)單的聚合查詢。
// 獲取TableEnvironment
TableEnvironment tableEnv = ...;
//注冊(cè)O(shè)rders表
// 查詢注冊(cè)的表
Table orders = tableEnv.from("Orders");
// 計(jì)算操作
Table revenue = orders
.filter("cCountry === 'FRANCE'")
.groupBy("cID, cName")
.select("cID, cName, revenue.sum AS revSum");
SQL
Flink SQL依賴于Apache Calcite,其實(shí)現(xiàn)了標(biāo)準(zhǔn)的SQL語(yǔ)法,如下案例:
// 獲取TableEnvironment
TableEnvironment tableEnv = ...;
//注冊(cè)O(shè)rders表
// 計(jì)算邏輯同上面的Table API
Table revenue = tableEnv.sqlQuery(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
// 注冊(cè)"RevenueFrance"外部輸出表
// 計(jì)算結(jié)果插入"RevenueFrance"表
tableEnv.sqlUpdate(
"INSERT INTO RevenueFrance " +
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
輸出表
一個(gè)表通過(guò)將其寫入到TableSink,然后進(jìn)行輸出。TableSink是一個(gè)通用的支持多種文件格式(CSV、Parquet, Avro)和多種外部存儲(chǔ)系統(tǒng)(JDBC, Apache HBase, Apache Cassandra, Elasticsearch)以及多種消息對(duì)列(Apache Kafka, RabbitMQ)的接口。
批處理的表只能被寫入到 BatchTableSink,流處理的表需要指明AppendStreamTableSink、RetractStreamTableSink或者 UpsertStreamTableSink
// 獲取TableEnvironment
TableEnvironment tableEnv = ...;
// 創(chuàng)建輸出表
final Schema schema = new Schema()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.LONG());
tableEnv.connect(new FileSystem("/path/to/file"))
.withFormat(new Csv().fieldDelimiter('|').deriveSchema())
.withSchema(schema)
.createTemporaryTable("CsvSinkTable");
// 計(jì)算結(jié)果表
Table result = ...
// 輸出結(jié)果表到注冊(cè)的TableSink
result.insertInto("CsvSinkTable");
Table API & SQL底層的轉(zhuǎn)換與執(zhí)行
上文提到了Flink提供了兩種planner,分別為old planner和Blink planner,對(duì)于不同的planner而言,Table API & SQL底層的執(zhí)行與轉(zhuǎn)換是有所不同的。
Old planner
根據(jù)是流處理作業(yè)還是批處理作業(yè),Table API &SQL會(huì)被轉(zhuǎn)換成DataStream或者DataSet程序。一個(gè)查詢?cè)趦?nèi)部表示為一個(gè)邏輯查詢計(jì)劃,會(huì)被轉(zhuǎn)換為兩個(gè)階段:
- 1.邏輯查詢計(jì)劃優(yōu)化
- 2.轉(zhuǎn)換成DataStream或者DataSet程序
上面的兩個(gè)階段只有下面的操作被執(zhí)行時(shí)才會(huì)被執(zhí)行:
- 當(dāng)一個(gè)表被輸出到TableSink時(shí),比如調(diào)用了Table.insertInto()方法
- 當(dāng)執(zhí)行更新查詢時(shí),比如調(diào)用TableEnvironment.sqlUpdate()方法
- 當(dāng)一個(gè)表被轉(zhuǎn)換為DataStream或者DataSet時(shí)
一旦執(zhí)行上述兩個(gè)階段,Table API & SQL的操作會(huì)被看做是普通的DataStream或者DataSet程序,所以當(dāng)StreamExecutionEnvironment.execute()或者ExecutionEnvironment.execute() 被調(diào)用時(shí),會(huì)執(zhí)行轉(zhuǎn)換后的程序。
Blink planner
無(wú)論是批處理作業(yè)還是流處理作業(yè),如果使用的是Blink planner,底層都會(huì)被轉(zhuǎn)換為DataStream程序。在一個(gè)查詢?cè)趦?nèi)部表示為一個(gè)邏輯查詢計(jì)劃,會(huì)被轉(zhuǎn)換成兩個(gè)階段:
- 1.邏輯查詢計(jì)劃優(yōu)化
- 2.轉(zhuǎn)換成DataStream程序
對(duì)于TableEnvironment and StreamTableEnvironment而言,一個(gè)查詢的轉(zhuǎn)換是不同的
首先對(duì)于TableEnvironment,當(dāng)TableEnvironment.execute()方法執(zhí)行時(shí),Table API & SQL的查詢才會(huì)被轉(zhuǎn)換,因?yàn)門ableEnvironment會(huì)將多個(gè)sink優(yōu)化為一個(gè)DAG。
對(duì)于StreamTableEnvironment,轉(zhuǎn)換發(fā)生的時(shí)間與old planner相同。
與DataStream & DataSet API集成
對(duì)于Old planner與Blink planner而言,只要是流處理的操作,都可以與DataStream API集成,僅僅只有Old planner才可以與DataSet API集成,由于Blink planner的批處理作業(yè)會(huì)被轉(zhuǎn)換成DataStream程序,所以不能夠與DataSet API集成。值得注意的是,下面提到的table與DataSet之間的轉(zhuǎn)換僅適用于Old planner。
Table API & SQL的查詢很容易與DataStream或者DataSet程序集成,并可以將Table API & SQL的查詢嵌入DataStream或者DataSet程序中。DataStream或者DataSet可以轉(zhuǎn)換成表,反之,表也可以被轉(zhuǎn)換成DataStream或者DataSet。
從DataStream或者DataSet中注冊(cè)臨時(shí)表(視圖)
尖叫提示:只能將DataStream或者DataSet轉(zhuǎn)換為臨時(shí)表(視圖)
下面演示DataStream的轉(zhuǎn)換,對(duì)于DataSet的轉(zhuǎn)換類似。
// 獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...;
DataStream<Tuple2<Long, String>> stream = ...
// 將DataStream注冊(cè)為一個(gè)名為myTable的視圖,其中字段分別為"f0", "f1"
tableEnv.createTemporaryView("myTable", stream);
// 將DataStream注冊(cè)為一個(gè)名為myTable2的視圖,其中字段分別為"myLong", "myString"
tableEnv.createTemporaryView("myTable2", stream, "myLong, myString");
將DataStream或者DataSet轉(zhuǎn)化為Table對(duì)象
可以直接將DataStream或者DataSet轉(zhuǎn)換為Table對(duì)象,之后可以使用Table API進(jìn)行查詢操作。下面演示DataStream的轉(zhuǎn)換,對(duì)于DataSet的轉(zhuǎn)換類似。
// 獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...;
DataStream<Tuple2<Long, String>> stream = ...
// 將DataStream轉(zhuǎn)換為Table對(duì)象,默認(rèn)的字段為"f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);
// 將DataStream轉(zhuǎn)換為Table對(duì)象,默認(rèn)的字段為"myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
將表轉(zhuǎn)換為DataStream或者DataSet
當(dāng)將Table轉(zhuǎn)為DataStream或者DataSet時(shí),需要指定DataStream或者DataSet的數(shù)據(jù)類型。通常最方便的數(shù)據(jù)類型是row類型,F(xiàn)link提供了很多的數(shù)據(jù)類型供用戶選擇,具體包括Row、POJO、樣例類、Tuple和原子類型。
將表轉(zhuǎn)換為DataStream
一個(gè)流處理查詢的結(jié)果是動(dòng)態(tài)變化的,所以將表轉(zhuǎn)為DataStream時(shí)需要指定一個(gè)更新模式,共有兩種模式:Append Mode和Retract Mode。
- Append Mode
如果動(dòng)態(tài)表僅只有Insert操作,即之前輸出的結(jié)果不會(huì)被更新,則使用該模式。如果更新或刪除操作使用追加模式會(huì)失敗報(bào)錯(cuò)
- Retract Mode
始終可以使用此模式。返回值是boolean類型。它用true或false來(lái)標(biāo)記數(shù)據(jù)的插入和撤回,返回true代表數(shù)據(jù)插入,false代表數(shù)據(jù)的撤回。
// 獲取StreamTableEnvironment.
StreamTableEnvironment tableEnv = ...;
// 包含兩個(gè)字段的表(String name, Integer age)
Table table = ...
// 將表轉(zhuǎn)為DataStream,使用Append Mode追加模式,數(shù)據(jù)類型為Row
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
// 將表轉(zhuǎn)為DataStream,使用Append Mode追加模式,數(shù)據(jù)類型為定義好的TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
Types.STRING(),
Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple =
tableEnv.toAppendStream(table, tupleType);
// 將表轉(zhuǎn)為DataStream,使用的模式為Retract Mode撤回模式,類型為Row
// 對(duì)于轉(zhuǎn)換后的DataStream<Tuple2<Boolean, X>>,X表示流的數(shù)據(jù)類型,
// boolean值表示數(shù)據(jù)改變的類型,其中INSERT返回true,DELETE返回的是false
DataStream<Tuple2<Boolean, Row>> retractStream =
tableEnv.toRetractStream(table, Row.class);
將表轉(zhuǎn)換為DataSet
// 獲取BatchTableEnvironment
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
// 包含兩個(gè)字段的表(String name, Integer age)
Table table = ...
// 將表轉(zhuǎn)為DataSet數(shù)據(jù)類型為Row
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
// 將表轉(zhuǎn)為DataSet,通過(guò)TypeInformation定義Tuple2<String, Integer>數(shù)據(jù)類型
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
Types.STRING(),
Types.INT());
DataSet<Tuple2<String, Integer>> dsTuple =
tableEnv.toDataSet(table, tupleType);
表的Schema與數(shù)據(jù)類型之間的映射
表的Schema與數(shù)據(jù)類型之間的映射有兩種方式:分別是基于字段下標(biāo)位置的映射和基于字段名稱的映射。
基于字段下標(biāo)位置的映射
該方式是按照字段的順序進(jìn)行一一映射,使用方式如下:
// 獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...;
DataStream<Tuple2<Long, Integer>> stream = ...
// 將DataStream轉(zhuǎn)為表,默認(rèn)的字段名為"f0"和"f1"
Table table = tableEnv.fromDataStream(stream);
// 將DataStream轉(zhuǎn)為表,選取tuple的第一個(gè)元素,指定一個(gè)名為"myLong"的字段名
Table table = tableEnv.fromDataStream(stream, "myLong");
// 將DataStream轉(zhuǎn)為表,為tuple的第一個(gè)元素指定名為"myLong",為第二個(gè)元素指定myInt的字段名
Table table = tableEnv.fromDataStream(stream, "myLong, myInt");
基于字段名稱的映射
基于字段名稱的映射方式支持任意的數(shù)據(jù)類型包括POJO類型,可以很靈活地定義表Schema映射,所有的字段被映射成一個(gè)具體的字段名稱,同時(shí)也可以使用"as"為字段起一個(gè)別名。其中Tuple元素的第一個(gè)元素為f0,第二個(gè)元素為f1,以此類推。
// 獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...;
DataStream<Tuple2<Long, Integer>> stream = ...
// 將DataStream轉(zhuǎn)為表,默認(rèn)的字段名為"f0"和"f1"
Table table = tableEnv.fromDataStream(stream);
// 將DataStream轉(zhuǎn)為表,選擇tuple的第二個(gè)元素,指定一個(gè)名為"f1"的字段名
Table table = tableEnv.fromDataStream(stream, "f1");
// 將DataStream轉(zhuǎn)為表,交換字段的順序
Table table = tableEnv.fromDataStream(stream, "f1, f0");
// 將DataStream轉(zhuǎn)為表,交換字段的順序,并為f1起別名為"myInt",為f0起別名為"myLong
Table table = tableEnv.fromDataStream(stream, "f1 as myInt, f0 as myLong");
原子類型
Flink將Integer, Double, String或者普通的類型稱之為原子類型,一個(gè)數(shù)據(jù)類型為原子類型的DataStream或者DataSet可以被轉(zhuǎn)成單個(gè)字段屬性的表,這個(gè)字段的類型與DataStream或者DataSet的數(shù)據(jù)類型一致,這個(gè)字段的名稱可以進(jìn)行指定。
//獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...;
// 數(shù)據(jù)類型為原子類型Long
DataStream<Long> stream = ...
// 將DataStream轉(zhuǎn)為表,默認(rèn)的字段名為"f0"
Table table = tableEnv.fromDataStream(stream);
// 將DataStream轉(zhuǎn)為表,指定字段名為myLong"
Table table = tableEnv.fromDataStream(stream, "myLong");
Tuple類型
Tuple類型的DataStream或者DataSet都可以轉(zhuǎn)為表,可以重新設(shè)定表的字段名(即根據(jù)tuple元素的位置進(jìn)行一一映射,轉(zhuǎn)為表之后,每個(gè)元素都有一個(gè)別名),如果不為字段指定名稱,則使用默認(rèn)的名稱(java語(yǔ)言默認(rèn)的是f0,f1,scala默認(rèn)的是_1),用戶也可以重新排列字段的順序,并為每個(gè)字段起一個(gè)別名。
// 獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...;
//Tuple2<Long, String>類型的DataStream
DataStream<Tuple2<Long, String>> stream = ...
// 將DataStream轉(zhuǎn)為表,默認(rèn)的字段名為 "f0", "f1"
Table table = tableEnv.fromDataStream(stream);
// 將DataStream轉(zhuǎn)為表,指定字段名為 "myLong", "myString"(按照Tuple元素的順序位置)
Table table = tableEnv.fromDataStream(stream, "myLong, myString");
// 將DataStream轉(zhuǎn)為表,指定字段名為 "f0", "f1",并且交換順序
Table table = tableEnv.fromDataStream(stream, "f1, f0");
// 將DataStream轉(zhuǎn)為表,只選擇Tuple的第二個(gè)元素,指定字段名為"f1"
Table table = tableEnv.fromDataStream(stream, "f1");
// 將DataStream轉(zhuǎn)為表,為Tuple的第二個(gè)元素指定別名為myString,為第一個(gè)元素指定字段名為myLong
Table table = tableEnv.fromDataStream(stream, "f1 as 'myString', f0 as 'myLong'");
POJO類型
當(dāng)將POJO類型的DataStream或者DataSet轉(zhuǎn)為表時(shí),如果不指定表名,則默認(rèn)使用的是POJO字段本身的名稱,原始字段名稱的映射需要指定原始字段的名稱,可以為其起一個(gè)別名,也可以調(diào)換字段的順序,也可以只選擇部分的字段。
// 獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...;
//數(shù)據(jù)類型為Person的POJO類型,字段包括"name"和"age"
DataStream<Person> stream = ...
// 將DataStream轉(zhuǎn)為表,默認(rèn)的字段名稱為"age", "name"
Table table = tableEnv.fromDataStream(stream);
// 將DataStream轉(zhuǎn)為表,為"age"字段指定別名myAge, 為"name"字段指定別名myName
Table table = tableEnv.fromDataStream(stream, "age as myAge, name as myName");
// 將DataStream轉(zhuǎn)為表,只選擇一個(gè)name字段
Table table = tableEnv.fromDataStream(stream, "name");
// 將DataStream轉(zhuǎn)為表,只選擇一個(gè)name字段,并起一個(gè)別名myName
Table table = tableEnv.fromDataStream(stream, "name as myName");
Row類型
Row類型的DataStream或者DataSet轉(zhuǎn)為表的過(guò)程中,可以根據(jù)字段的位置或者字段名稱進(jìn)行映射,同時(shí)也可以為字段起一個(gè)別名,或者只選擇部分字段。
// 獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...;
// Row類型的DataStream,通過(guò)RowTypeInfo指定兩個(gè)字段"name"和"age"
DataStream<Row> stream = ...
// 將DataStream轉(zhuǎn)為表,默認(rèn)的字段名為原始字段名"name"和"age"
Table table = tableEnv.fromDataStream(stream);
// 將DataStream轉(zhuǎn)為表,根據(jù)位置映射,為第一個(gè)字段指定myName別名,為第二個(gè)字段指定myAge別名
Table table = tableEnv.fromDataStream(stream, "myName, myAge");
// 將DataStream轉(zhuǎn)為表,根據(jù)字段名映射,為name字段起別名myName,為age字段起別名myAge
Table table = tableEnv.fromDataStream(stream, "name as myName, age as myAge");
// 將DataStream轉(zhuǎn)為表,根據(jù)字段名映射,只選擇name字段
Table table = tableEnv.fromDataStream(stream, "name");
// 將DataStream轉(zhuǎn)為表,根據(jù)字段名映射,只選擇name字段,并起一個(gè)別名"myName"
Table table = tableEnv.fromDataStream(stream, "name as myName");
查詢優(yōu)化
Old planner
Apache Flink利用Apache Calcite來(lái)優(yōu)化和轉(zhuǎn)換查詢。當(dāng)前執(zhí)行的優(yōu)化包括投影和過(guò)濾器下推,去相關(guān)子查詢以及其他類型的查詢重寫。Old Planner目前不支持優(yōu)化JOIN的順序,而是按照查詢中定義的順序執(zhí)行它們。
通過(guò)提供一個(gè)CalciteConfig對(duì)象,可以調(diào)整在不同階段應(yīng)用的優(yōu)化規(guī)則集。這可通過(guò)調(diào)用CalciteConfig.createBuilder()方法來(lái)進(jìn)行創(chuàng)建,并通過(guò)調(diào)用tableEnv.getConfig.setPlannerConfig(calciteConfig)方法將該對(duì)象傳遞給TableEnvironment。
Blink planner
Apache Flink利用并擴(kuò)展了Apache Calcite來(lái)執(zhí)行復(fù)雜的查詢優(yōu)化。這包括一系列基于規(guī)則和基于成本的優(yōu)化(cost_based),例如:
- 基于Apache Calcite的去相關(guān)子查詢
- 投影裁剪
- 分區(qū)裁剪
- 過(guò)濾器謂詞下推
- 過(guò)濾器下推
- 子計(jì)劃重復(fù)數(shù)據(jù)刪除以避免重復(fù)計(jì)算
- 特殊的子查詢重寫,包括兩個(gè)部分:
- 將IN和EXISTS轉(zhuǎn)換為左半聯(lián)接( left semi-join)
- 將NOT IN和NOT EXISTS轉(zhuǎn)換為left anti-join
- 調(diào)整join的順序,需要啟用
table.optimizer.join-reorder-enabled
注意: IN / EXISTS / NOT IN / NOT EXISTS當(dāng)前僅在子查詢重寫的結(jié)合條件下受支持。
查詢優(yōu)化器不僅基于計(jì)劃,而且還可以基于數(shù)據(jù)源的統(tǒng)計(jì)信息以及每個(gè)操作的細(xì)粒度開銷(例如io,cpu,網(wǎng)絡(luò)和內(nèi)存),從而做出更加明智且合理的優(yōu)化決策。
高級(jí)用戶可以通過(guò)CalciteConfig對(duì)象提供自定義優(yōu)化規(guī)則,通過(guò)調(diào)用tableEnv.getConfig.setPlannerConfig(calciteConfig),將參數(shù)傳遞給TableEnvironment。
查看執(zhí)行計(jì)劃
SQL語(yǔ)言支持通過(guò)explain來(lái)查看某條SQL的執(zhí)行計(jì)劃,F(xiàn)link Table API也可以通過(guò)調(diào)用explain()方法來(lái)查看具體的執(zhí)行計(jì)劃。該方法返回一個(gè)字符串用來(lái)描述三個(gè)部分計(jì)劃,分別為:
- 關(guān)系查詢的抽象語(yǔ)法樹,即未優(yōu)化的邏輯查詢計(jì)劃,
- 優(yōu)化的邏輯查詢計(jì)劃
- 實(shí)際執(zhí)行計(jì)劃
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));
Table table1 = tEnv.fromDataStream(stream1, "count, word");
Table table2 = tEnv.fromDataStream(stream2, "count, word");
Table table = table1
.where("LIKE(word, 'F%')")
.unionAll(table2);
// 查看執(zhí)行計(jì)劃
String explanation = tEnv.explain(table);
System.out.println(explanation);
執(zhí)行計(jì)劃的結(jié)果為:
== 抽象語(yǔ)法樹 ==
LogicalUnion(all=[true])
LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
FlinkLogicalDataStreamScan(id=[2], fields=[count, word])
== 優(yōu)化的邏輯執(zhí)行計(jì)劃 ==
DataStreamUnion(all=[true], union all=[count, word])
DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
DataStreamScan(id=[1], fields=[count, word])
DataStreamScan(id=[2], fields=[count, word])
== 物理執(zhí)行計(jì)劃 ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Stage 2 : Data Source
content : collect elements with CollectionInputFormat
Stage 3 : Operator
content : from: (count, word)
ship_strategy : REBALANCE
Stage 4 : Operator
content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
ship_strategy : FORWARD
Stage 5 : Operator
content : from: (count, word)
ship_strategy : REBALANCE
小結(jié)
本文主要介紹了Flink TableAPI &SQL,首先介紹了Flink Table API &SQL的基本概念 ,然后介紹了構(gòu)建Flink Table API & SQL程序所需要的依賴,接著介紹了Flink的兩種planner,還介紹了如何注冊(cè)表以及DataStream、DataSet與表的相互轉(zhuǎn)換,最后介紹了Flink的兩種planner對(duì)應(yīng)的查詢優(yōu)化并給出了一個(gè)查看執(zhí)行計(jì)劃的案例。
公眾號(hào)『大數(shù)據(jù)技術(shù)與數(shù)倉(cāng)』,回復(fù)『資料』領(lǐng)取大數(shù)據(jù)資料包