一.整體概述
1.1 什么是 Table API 和 Flink SQL
??Flink本身是批流統(tǒng)一的處理框架,所以Table API和 SQL,就是批流統(tǒng)一的上層處理 API。目前 功能尚未完善 ,處于活躍的開發(fā)階段。
??Table API是一套內(nèi)嵌在 一套內(nèi)嵌在 一套內(nèi)嵌在 Java和 Scala語言中的 語言中的 查詢 API,它允許我們 以非常直觀的方式 組合來自一些關系運算符的查詢 (比如 (比如 select、filter和 join)。 而對于 Flink SQL,就是直接可以在代碼中寫中寫 SQL,來實現(xiàn)一些查詢( Query)操作。 Flink的 SQL支持 ,基于實現(xiàn) 基于實現(xiàn) 了 SQL標 準的 Apache Calcite(Apache開源 SQL解析工具) 。
??無論輸入是批 輸入還是流 式輸入,在 這兩套 這兩套 API中,指定的查詢都具有相同語義,得 到相同的結果。
1.2 需要引入的pom依賴
Table API和 SQL需要引入的依賴有兩個:planner和 bridge。
<!-- Table API 和 Flink SQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.9.0</version>
</dependency>
<!-- Table API 和 Flink SQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.9.0</version>
</dependency>
flink-table-planner:planner計劃器, 是 table API最主要的部分,提供了運行時環(huán)境和生成程序執(zhí)行計劃的 planner;
flink-table-api-java-bridge:bridge橋接器,主要負責 table API和 DataStream/DataSet API的連接支持,按照語言分 java和 scala。
這里的兩個依賴,是IDE環(huán)境下運行需要 環(huán)境下需要 添加的;如果是生產(chǎn)環(huán)境, lib目錄下默認已經(jīng)有了 planner,就只需要有 , bridge就可以了。
當然,如果想使用戶自定義函數(shù),或是跟 想使用戶自定義函數(shù),或是跟kafka做連接,需需要有一個 SQL client,這個 包含在 包含在 flink-table-common里。
1.3 兩種 planner(old & blink)的區(qū)別
批流統(tǒng)一: Blink將批處理作業(yè) ,視為流式處理的特殊情況。所以 ,blink不支持表和 DataSet之間的轉(zhuǎn)換,批處理作業(yè)將不轉(zhuǎn)換為DataSet應用程序,而是跟流處理一樣,轉(zhuǎn)換為DataStream程序來處理。
因為批流統(tǒng)一, Blink planner也不支持 BatchTableSource,而使用有界的StreamTableSource代替。
Blink planner只支持全新的目錄,不支持已棄用的ExternalCatalog。
舊planner和Blink planner的FilterableTableSource實現(xiàn)不兼容。舊的planner會把PlannerExpressions下推到filterableTableSource中,而blink
planner則會把Expressions下推.基于字符串的鍵值配置選項僅適用于Blink planner.
6.PlannerConfig在兩個planner中的實現(xiàn)不同。
Blink planner會將多個sink優(yōu)化在一個DAG中(僅在TableEnvironment上受支持,而在StreamTableEnvironment上不受支持)。而舊planner的優(yōu)化總是將每一個sink放在一個新的DAG中,其中所有的DAG彼此獨立。
舊的planner不支持目錄統(tǒng)計,而Blink planner支持。
二.Table API和Flink SQL 測試樣例
代碼:
package org.flink.tableapi;
import org.flink.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
* @author 只是甲
* @date 2021-09-26
*/
public class TableTest1_Example {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 1. 讀取數(shù)據(jù)
DataStreamSource<String> inputStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkStudy\\src\\main\\resources\\sensor.txt");
// 2. 轉(zhuǎn)換成POJO
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// 3. 創(chuàng)建表環(huán)境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 4. 基于流創(chuàng)建一張表
Table dataTable = tableEnv.fromDataStream(dataStream);
// 5. 調(diào)用table API進行轉(zhuǎn)換操作
Table resultTable = dataTable.select("id, temperature")
.where("id = 'sensor_1'");
// 6. 執(zhí)行SQL
Table sqlTable = dataTable.select("id, temperature");
tableEnv.registerTable("sensor", sqlTable);
// Flink 1.10之后可以生產(chǎn)一個臨時視圖,無需上面那么麻煩
// tableEnv.createTemporaryView("sensor", dataTable);
String sql = "select id, temperature from sensor where id = 'sensor_1'";
Table resultSqlTable = tableEnv.sqlQuery(sql);
tableEnv.toAppendStream(resultTable, Row.class).print("result");
tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");
env.execute();
}
}
測試記錄:
