Flink Table API & SQL編程指南(1)

Apache Flink提供了兩種頂層的關(guān)系型API,分別為Table API和SQL,F(xiàn)link通過(guò)Table API&SQL實(shí)現(xiàn)了批流統(tǒng)一。其中Table API是用于Scala和Java的語(yǔ)言集成查詢API,它允許以非常直觀的方式組合關(guān)系運(yùn)算符(例如select,where和join)的查詢。Flink SQL基于Apache Calcite 實(shí)現(xiàn)了標(biāo)準(zhǔn)的SQL,用戶可以使用標(biāo)準(zhǔn)的SQL處理數(shù)據(jù)集。Table API和SQL與Flink的DataStream和DataSet API緊密集成在一起,用戶可以實(shí)現(xiàn)相互轉(zhuǎn)化,比如可以將DataStream或者DataSet注冊(cè)為table進(jìn)行操作數(shù)據(jù)。值得注意的是,Table API and SQL目前尚未完全完善,還在積極的開發(fā)中,所以并不是所有的算子操作都可以通過(guò)其實(shí)現(xiàn)。

依賴

從Flink1.9開始,F(xiàn)link為Table & SQL API提供了兩種planner,分別為Blink planner和old planner,其中old planner是在Flink1.9之前的版本使用。主要區(qū)別如下:

尖叫提示:對(duì)于生產(chǎn)環(huán)境,目前推薦使用old planner.

  • flink-table-common: 通用模塊,包含 Flink Planner 和 Blink Planner 一些共用的代碼
  • flink-table-api-java: java語(yǔ)言的Table & SQL API,僅針對(duì)table(處于早期的開發(fā)階段,不推薦使用)
  • flink-table-api-scala: scala語(yǔ)言的Table & SQL API,僅針對(duì)table(處于早期的開發(fā)階段,不推薦使用)
  • flink-table-api-java-bridge: java語(yǔ)言的Table & SQL API,支持DataStream/DataSet API(推薦使用)
  • flink-table-api-scala-bridge: scala語(yǔ)言的Table & SQL API,支持DataStream/DataSet API(推薦使用)
  • flink-table-planner:planner 和runtime. planner為Flink1,9之前的old planner(推薦使用)
  • flink-table-planner-blink: 新的Blink planner.
  • flink-table-runtime-blink: 新的Blink runtime.
  • flink-table-uber: 將上述的API模塊及old planner打成一個(gè)jar包,形如flink-table-*.jar,位與/lib目錄下
  • flink-table-uber-blink:將上述的API模塊及Blink 模塊打成一個(gè)jar包,形如fflink-table-blink-*.jar,位與/lib目錄下

Blink planner & old planner

Blink planner和old planner有許多不同的特點(diǎn),具體列舉如下:

  • Blink planner將批處理作業(yè)看做是流處理作業(yè)的特例。所以,不支持Table 與DataSet之間的轉(zhuǎn)換,批處理的作業(yè)也不會(huì)被轉(zhuǎn)成DataSet程序,而是被轉(zhuǎn)為DataStream程序。
  • Blink planner不支持 BatchTableSource,使用的是有界的StreamTableSource。
  • Blink planner僅支持新的 Catalog,不支持ExternalCatalog (已過(guò)時(shí))。
  • 對(duì)于FilterableTableSource的實(shí)現(xiàn),兩種Planner是不同的。old planner會(huì)謂詞下推到PlannerExpression(未來(lái)會(huì)被移除),而Blink planner 會(huì)謂詞下推到 Expression(表示一個(gè)產(chǎn)生計(jì)算結(jié)果的邏輯樹)。
  • 僅僅Blink planner支持key-value形式的配置,即通過(guò)Configuration進(jìn)行參數(shù)設(shè)置。
  • 關(guān)于PlannerConfig的實(shí)現(xiàn),兩種planner有所不同。
  • Blink planner 會(huì)將多個(gè)sink優(yōu)化成一個(gè)DAG(僅支持TableEnvironment,StreamTableEnvironment不支持),old planner總是將每一個(gè)sink優(yōu)化成一個(gè)新的DAG,每一個(gè)DAG都是相互獨(dú)立的。
  • old planner不支持catalog統(tǒng)計(jì),Blink planner支持catalog統(tǒng)計(jì)。

Flink Table & SQL程序的pom依賴

根據(jù)使用的語(yǔ)言不同,可以選擇下面的依賴,包括scala版和java版,如下:

<!-- java版 -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>
<!-- scala版 -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>

除此之外,如果需要在本地的IDE中運(yùn)行Table API & SQL的程序,則需要添加下面的pom依賴:

<!-- Flink 1.9之前的old planner -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>
<!-- 新的Blink planner -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>

另外,如果需要實(shí)現(xiàn)自定義的格式(比如和kafka交互)或者用戶自定義函數(shù),需要添加如下依賴:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-common</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>

Table API & SQL的編程模板

所有的Table API&SQL的程序(無(wú)論是批處理還是流處理)都有著相同的形式,下面將給出通用的編程結(jié)構(gòu)形式:

// 創(chuàng)建一個(gè)TableEnvironment對(duì)象,指定planner、處理模式(batch、streaming)
TableEnvironment tableEnv = ...; 
// 創(chuàng)建一個(gè)表
tableEnv.connect(...).createTemporaryTable("table1");
// 注冊(cè)一個(gè)外部的表
tableEnv.connect(...).createTemporaryTable("outputTable");
// 通過(guò)Table API的查詢創(chuàng)建一個(gè)Table 對(duì)象
Table tapiResult = tableEnv.from("table1").select(...);
// 通過(guò)SQL查詢的查詢創(chuàng)建一個(gè)Table 對(duì)象
Table sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");
// 將結(jié)果寫入TableSink
tapiResult.insertInto("outputTable");
// 執(zhí)行
tableEnv.execute("java_job");

注意:Table API & SQL的查詢可以相互集成,另外還可以在DataStream或者DataSet中使用Table API & SQL的API,實(shí)現(xiàn)DataStreams、 DataSet與Table之間的相互轉(zhuǎn)換。

創(chuàng)建TableEnvironment

TableEnvironment是Table API & SQL程序的一個(gè)入口,主要包括如下的功能:

  • 在內(nèi)部的catalog中注冊(cè)Table
  • 注冊(cè)catalog
  • 加載可插拔模塊
  • 執(zhí)行SQL查詢
  • 注冊(cè)用戶定義函數(shù)
  • DataStream 、DataSet與Table之間的相互轉(zhuǎn)換
  • 持有對(duì)ExecutionEnvironment 、StreamExecutionEnvironment的引用

一個(gè)Table必定屬于一個(gè)具體的TableEnvironment,不可以將不同TableEnvironment的表放在一起使用(比如join,union等操作)。

TableEnvironment是通過(guò)調(diào)用 BatchTableEnvironment.create() 或者StreamTableEnvironment.create()的靜態(tài)方法進(jìn)行創(chuàng)建的。另外,默認(rèn)兩個(gè)planner的jar包都存在與classpath下,所有需要明確指定使用的planner。

// **********************
// FLINK 流處理查詢
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
//或者TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);

// ******************
// FLINK 批處理查詢
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

// **********************
// BLINK 流處理查詢
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// 或者 TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);

// ******************
// BLINK 批處理查詢
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

在catalog中創(chuàng)建表

臨時(shí)表與永久表

表可以分為臨時(shí)表和永久表兩種,其中永久表需要一個(gè)catalog(比如Hive的Metastore)倆維護(hù)表的元數(shù)據(jù)信息,一旦永久表被創(chuàng)建,只要連接到該catalog就可以訪問(wèn)該表,只有顯示刪除永久表,該表才可以被刪除。臨時(shí)表的生命周期是Flink Session,這些表不能夠被其他的Flink Session訪問(wèn),這些表不屬于任何的catalog或者數(shù)據(jù)庫(kù),如果與臨時(shí)表相對(duì)應(yīng)的數(shù)據(jù)庫(kù)被刪除了,該臨時(shí)表也不會(huì)被刪除。

創(chuàng)建表

虛表(Virtual Tables)

一個(gè)Table對(duì)象相當(dāng)于SQL中的視圖(虛表),它封裝了一個(gè)邏輯執(zhí)行計(jì)劃,可以通過(guò)一個(gè)catalog創(chuàng)建,具體如下:

// 獲取一個(gè)TableEnvironment
TableEnvironment tableEnv = ...; 
// table對(duì)象,查詢的結(jié)果集
Table projTable = tableEnv.from("X").select(...);
// 注冊(cè)一個(gè)表,名稱為 "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);

外部數(shù)據(jù)源表(Connector Tables)

可以把外部的數(shù)據(jù)源注冊(cè)成表,比如可以讀取MySQL數(shù)據(jù)庫(kù)數(shù)據(jù)、Kafka數(shù)據(jù)等

tableEnvironment
  .connect(...)
  .withFormat(...)
  .withSchema(...)
  .inAppendMode()
  .createTemporaryTable("MyTable")

擴(kuò)展創(chuàng)建表的標(biāo)識(shí)屬性

表的注冊(cè)總是包含三部分標(biāo)識(shí)屬性:catalog、數(shù)據(jù)庫(kù)、表名。用戶可以在內(nèi)部設(shè)置一個(gè)catalog和一個(gè)數(shù)據(jù)庫(kù)作為當(dāng)前的catalog和數(shù)據(jù)庫(kù),所以對(duì)于catalog和數(shù)據(jù)庫(kù)這兩個(gè)標(biāo)識(shí)屬性是可選的,即如果不指定,默認(rèn)使用的是“current catalog”和 “current database”。

TableEnvironment tEnv = ...;
tEnv.useCatalog("custom_catalog");//設(shè)置catalog
tEnv.useDatabase("custom_database");//設(shè)置數(shù)據(jù)庫(kù)
Table table = ...;
// 注冊(cè)一個(gè)名為exampleView的視圖,catalog名為custom_catalog
// 數(shù)據(jù)庫(kù)的名為custom_database
tableEnv.createTemporaryView("exampleView", table);

// 注冊(cè)一個(gè)名為exampleView的視圖,catalog的名為custom_catalog
// 數(shù)據(jù)庫(kù)的名為other_database
tableEnv.createTemporaryView("other_database.exampleView", table);
 
// 注冊(cè)一個(gè)名為'View'的視圖,catalog的名稱為custom_catalog
// 數(shù)據(jù)庫(kù)的名為custom_database,'View'是保留關(guān)鍵字,需要使用``(反引號(hào))
tableEnv.createTemporaryView("`View`", table);

// 注冊(cè)一個(gè)名為example.View的視圖,catalog的名為custom_catalog,
// 數(shù)據(jù)庫(kù)名為custom_database
tableEnv.createTemporaryView("`example.View`", table);

// 注冊(cè)一個(gè)名為'exampleView'的視圖, catalog的名為'other_catalog'
// 數(shù)據(jù)庫(kù)名為other_database' 
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table);

查詢表

Table API

Table API是一個(gè)集成Scala與Java語(yǔ)言的查詢API,與SQL相比,它的查詢不是一個(gè)標(biāo)準(zhǔn)的SQL語(yǔ)句,而是由一步一步的操作組成的。如下展示了一個(gè)使用Table API實(shí)現(xiàn)一個(gè)簡(jiǎn)單的聚合查詢。

// 獲取TableEnvironment
TableEnvironment tableEnv = ...;
//注冊(cè)O(shè)rders表

// 查詢注冊(cè)的表
Table orders = tableEnv.from("Orders");
// 計(jì)算操作
Table revenue = orders
  .filter("cCountry === 'FRANCE'")
  .groupBy("cID, cName")
  .select("cID, cName, revenue.sum AS revSum");

SQL

Flink SQL依賴于Apache Calcite,其實(shí)現(xiàn)了標(biāo)準(zhǔn)的SQL語(yǔ)法,如下案例:

// 獲取TableEnvironment
TableEnvironment tableEnv = ...;

//注冊(cè)O(shè)rders表

// 計(jì)算邏輯同上面的Table API
Table revenue = tableEnv.sqlQuery(
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

// 注冊(cè)"RevenueFrance"外部輸出表
// 計(jì)算結(jié)果插入"RevenueFrance"表
tableEnv.sqlUpdate(
    "INSERT INTO RevenueFrance " +
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

輸出表

一個(gè)表通過(guò)將其寫入到TableSink,然后進(jìn)行輸出。TableSink是一個(gè)通用的支持多種文件格式(CSV、Parquet, Avro)和多種外部存儲(chǔ)系統(tǒng)(JDBC, Apache HBase, Apache Cassandra, Elasticsearch)以及多種消息對(duì)列(Apache Kafka, RabbitMQ)的接口。

批處理的表只能被寫入到 BatchTableSink,流處理的表需要指明AppendStreamTableSink、RetractStreamTableSink或者 UpsertStreamTableSink

// 獲取TableEnvironment
TableEnvironment tableEnv = ...;

// 創(chuàng)建輸出表
final Schema schema = new Schema()
    .field("a", DataTypes.INT())
    .field("b", DataTypes.STRING())
    .field("c", DataTypes.LONG());

tableEnv.connect(new FileSystem("/path/to/file"))
    .withFormat(new Csv().fieldDelimiter('|').deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("CsvSinkTable");

// 計(jì)算結(jié)果表
Table result = ...
// 輸出結(jié)果表到注冊(cè)的TableSink
result.insertInto("CsvSinkTable");

Table API & SQL底層的轉(zhuǎn)換與執(zhí)行

上文提到了Flink提供了兩種planner,分別為old planner和Blink planner,對(duì)于不同的planner而言,Table API & SQL底層的執(zhí)行與轉(zhuǎn)換是有所不同的。

Old planner

根據(jù)是流處理作業(yè)還是批處理作業(yè),Table API &SQL會(huì)被轉(zhuǎn)換成DataStream或者DataSet程序。一個(gè)查詢?cè)趦?nèi)部表示為一個(gè)邏輯查詢計(jì)劃,會(huì)被轉(zhuǎn)換為兩個(gè)階段:

  • 1.邏輯查詢計(jì)劃優(yōu)化
  • 2.轉(zhuǎn)換成DataStream或者DataSet程序

上面的兩個(gè)階段只有下面的操作被執(zhí)行時(shí)才會(huì)被執(zhí)行:

  • 當(dāng)一個(gè)表被輸出到TableSink時(shí),比如調(diào)用了Table.insertInto()方法
  • 當(dāng)執(zhí)行更新查詢時(shí),比如調(diào)用TableEnvironment.sqlUpdate()方法
  • 當(dāng)一個(gè)表被轉(zhuǎn)換為DataStream或者DataSet時(shí)

一旦執(zhí)行上述兩個(gè)階段,Table API & SQL的操作會(huì)被看做是普通的DataStream或者DataSet程序,所以當(dāng)StreamExecutionEnvironment.execute()或者ExecutionEnvironment.execute() 被調(diào)用時(shí),會(huì)執(zhí)行轉(zhuǎn)換后的程序。

Blink planner

無(wú)論是批處理作業(yè)還是流處理作業(yè),如果使用的是Blink planner,底層都會(huì)被轉(zhuǎn)換為DataStream程序。在一個(gè)查詢?cè)趦?nèi)部表示為一個(gè)邏輯查詢計(jì)劃,會(huì)被轉(zhuǎn)換成兩個(gè)階段:

  • 1.邏輯查詢計(jì)劃優(yōu)化
  • 2.轉(zhuǎn)換成DataStream程序

對(duì)于TableEnvironment and StreamTableEnvironment而言,一個(gè)查詢的轉(zhuǎn)換是不同的

首先對(duì)于TableEnvironment,當(dāng)TableEnvironment.execute()方法執(zhí)行時(shí),Table API & SQL的查詢才會(huì)被轉(zhuǎn)換,因?yàn)門ableEnvironment會(huì)將多個(gè)sink優(yōu)化為一個(gè)DAG。

對(duì)于StreamTableEnvironment,轉(zhuǎn)換發(fā)生的時(shí)間與old planner相同。

與DataStream & DataSet API集成

對(duì)于Old planner與Blink planner而言,只要是流處理的操作,都可以與DataStream API集成,僅僅只有Old planner才可以與DataSet API集成,由于Blink planner的批處理作業(yè)會(huì)被轉(zhuǎn)換成DataStream程序,所以不能夠與DataSet API集成。值得注意的是,下面提到的table與DataSet之間的轉(zhuǎn)換僅適用于Old planner。

Table API & SQL的查詢很容易與DataStream或者DataSet程序集成,并可以將Table API & SQL的查詢嵌入DataStream或者DataSet程序中。DataStream或者DataSet可以轉(zhuǎn)換成表,反之,表也可以被轉(zhuǎn)換成DataStream或者DataSet。

從DataStream或者DataSet中注冊(cè)臨時(shí)表(視圖)

尖叫提示:只能將DataStream或者DataSet轉(zhuǎn)換為臨時(shí)表(視圖)

下面演示DataStream的轉(zhuǎn)換,對(duì)于DataSet的轉(zhuǎn)換類似。

// 獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
DataStream<Tuple2<Long, String>> stream = ...
// 將DataStream注冊(cè)為一個(gè)名為myTable的視圖,其中字段分別為"f0", "f1"
tableEnv.createTemporaryView("myTable", stream);
// 將DataStream注冊(cè)為一個(gè)名為myTable2的視圖,其中字段分別為"myLong", "myString"
tableEnv.createTemporaryView("myTable2", stream, "myLong, myString");

將DataStream或者DataSet轉(zhuǎn)化為Table對(duì)象

可以直接將DataStream或者DataSet轉(zhuǎn)換為Table對(duì)象,之后可以使用Table API進(jìn)行查詢操作。下面演示DataStream的轉(zhuǎn)換,對(duì)于DataSet的轉(zhuǎn)換類似。

// 獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
DataStream<Tuple2<Long, String>> stream = ...
// 將DataStream轉(zhuǎn)換為Table對(duì)象,默認(rèn)的字段為"f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);
// 將DataStream轉(zhuǎn)換為Table對(duì)象,默認(rèn)的字段為"myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");

將表轉(zhuǎn)換為DataStream或者DataSet

當(dāng)將Table轉(zhuǎn)為DataStream或者DataSet時(shí),需要指定DataStream或者DataSet的數(shù)據(jù)類型。通常最方便的數(shù)據(jù)類型是row類型,F(xiàn)link提供了很多的數(shù)據(jù)類型供用戶選擇,具體包括Row、POJO、樣例類、Tuple和原子類型。

將表轉(zhuǎn)換為DataStream

一個(gè)流處理查詢的結(jié)果是動(dòng)態(tài)變化的,所以將表轉(zhuǎn)為DataStream時(shí)需要指定一個(gè)更新模式,共有兩種模式:Append ModeRetract Mode。

  • Append Mode

如果動(dòng)態(tài)表僅只有Insert操作,即之前輸出的結(jié)果不會(huì)被更新,則使用該模式。如果更新或刪除操作使用追加模式會(huì)失敗報(bào)錯(cuò)

  • Retract Mode

始終可以使用此模式。返回值是boolean類型。它用true或false來(lái)標(biāo)記數(shù)據(jù)的插入和撤回,返回true代表數(shù)據(jù)插入,false代表數(shù)據(jù)的撤回。

// 獲取StreamTableEnvironment. 
StreamTableEnvironment tableEnv = ...; 
// 包含兩個(gè)字段的表(String name, Integer age)
Table table = ...
// 將表轉(zhuǎn)為DataStream,使用Append Mode追加模式,數(shù)據(jù)類型為Row
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
// 將表轉(zhuǎn)為DataStream,使用Append Mode追加模式,數(shù)據(jù)類型為定義好的TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple = 
  tableEnv.toAppendStream(table, tupleType);
// 將表轉(zhuǎn)為DataStream,使用的模式為Retract Mode撤回模式,類型為Row
// 對(duì)于轉(zhuǎn)換后的DataStream<Tuple2<Boolean, X>>,X表示流的數(shù)據(jù)類型,
// boolean值表示數(shù)據(jù)改變的類型,其中INSERT返回true,DELETE返回的是false
DataStream<Tuple2<Boolean, Row>> retractStream = 
  tableEnv.toRetractStream(table, Row.class);

將表轉(zhuǎn)換為DataSet

// 獲取BatchTableEnvironment
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
// 包含兩個(gè)字段的表(String name, Integer age)
Table table = ...
// 將表轉(zhuǎn)為DataSet數(shù)據(jù)類型為Row
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
// 將表轉(zhuǎn)為DataSet,通過(guò)TypeInformation定義Tuple2<String, Integer>數(shù)據(jù)類型
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataSet<Tuple2<String, Integer>> dsTuple = 
  tableEnv.toDataSet(table, tupleType);

表的Schema與數(shù)據(jù)類型之間的映射

表的Schema與數(shù)據(jù)類型之間的映射有兩種方式:分別是基于字段下標(biāo)位置的映射和基于字段名稱的映射。

基于字段下標(biāo)位置的映射

該方式是按照字段的順序進(jìn)行一一映射,使用方式如下:

// 獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
DataStream<Tuple2<Long, Integer>> stream = ...
// 將DataStream轉(zhuǎn)為表,默認(rèn)的字段名為"f0"和"f1"
Table table = tableEnv.fromDataStream(stream);
// 將DataStream轉(zhuǎn)為表,選取tuple的第一個(gè)元素,指定一個(gè)名為"myLong"的字段名
Table table = tableEnv.fromDataStream(stream, "myLong");
// 將DataStream轉(zhuǎn)為表,為tuple的第一個(gè)元素指定名為"myLong",為第二個(gè)元素指定myInt的字段名
Table table = tableEnv.fromDataStream(stream, "myLong, myInt");

基于字段名稱的映射

基于字段名稱的映射方式支持任意的數(shù)據(jù)類型包括POJO類型,可以很靈活地定義表Schema映射,所有的字段被映射成一個(gè)具體的字段名稱,同時(shí)也可以使用"as"為字段起一個(gè)別名。其中Tuple元素的第一個(gè)元素為f0,第二個(gè)元素為f1,以此類推。

// 獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
DataStream<Tuple2<Long, Integer>> stream = ...
// 將DataStream轉(zhuǎn)為表,默認(rèn)的字段名為"f0"和"f1"
Table table = tableEnv.fromDataStream(stream);
// 將DataStream轉(zhuǎn)為表,選擇tuple的第二個(gè)元素,指定一個(gè)名為"f1"的字段名
Table table = tableEnv.fromDataStream(stream, "f1");
// 將DataStream轉(zhuǎn)為表,交換字段的順序
Table table = tableEnv.fromDataStream(stream, "f1, f0");
// 將DataStream轉(zhuǎn)為表,交換字段的順序,并為f1起別名為"myInt",為f0起別名為"myLong
Table table = tableEnv.fromDataStream(stream, "f1 as myInt, f0 as myLong");

原子類型

Flink將Integer, Double, String或者普通的類型稱之為原子類型,一個(gè)數(shù)據(jù)類型為原子類型的DataStream或者DataSet可以被轉(zhuǎn)成單個(gè)字段屬性的表,這個(gè)字段的類型與DataStream或者DataSet的數(shù)據(jù)類型一致,這個(gè)字段的名稱可以進(jìn)行指定。

//獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
// 數(shù)據(jù)類型為原子類型Long
DataStream<Long> stream = ...
// 將DataStream轉(zhuǎn)為表,默認(rèn)的字段名為"f0"
Table table = tableEnv.fromDataStream(stream);
// 將DataStream轉(zhuǎn)為表,指定字段名為myLong"
Table table = tableEnv.fromDataStream(stream, "myLong");

Tuple類型

Tuple類型的DataStream或者DataSet都可以轉(zhuǎn)為表,可以重新設(shè)定表的字段名(即根據(jù)tuple元素的位置進(jìn)行一一映射,轉(zhuǎn)為表之后,每個(gè)元素都有一個(gè)別名),如果不為字段指定名稱,則使用默認(rèn)的名稱(java語(yǔ)言默認(rèn)的是f0,f1,scala默認(rèn)的是_1),用戶也可以重新排列字段的順序,并為每個(gè)字段起一個(gè)別名。

// 獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
//Tuple2<Long, String>類型的DataStream
DataStream<Tuple2<Long, String>> stream = ...
// 將DataStream轉(zhuǎn)為表,默認(rèn)的字段名為 "f0", "f1"
Table table = tableEnv.fromDataStream(stream);
// 將DataStream轉(zhuǎn)為表,指定字段名為 "myLong", "myString"(按照Tuple元素的順序位置)
Table table = tableEnv.fromDataStream(stream, "myLong, myString");
// 將DataStream轉(zhuǎn)為表,指定字段名為 "f0", "f1",并且交換順序
Table table = tableEnv.fromDataStream(stream, "f1, f0");
// 將DataStream轉(zhuǎn)為表,只選擇Tuple的第二個(gè)元素,指定字段名為"f1"
Table table = tableEnv.fromDataStream(stream, "f1");
// 將DataStream轉(zhuǎn)為表,為Tuple的第二個(gè)元素指定別名為myString,為第一個(gè)元素指定字段名為myLong
Table table = tableEnv.fromDataStream(stream, "f1 as 'myString', f0 as 'myLong'");

POJO類型

當(dāng)將POJO類型的DataStream或者DataSet轉(zhuǎn)為表時(shí),如果不指定表名,則默認(rèn)使用的是POJO字段本身的名稱,原始字段名稱的映射需要指定原始字段的名稱,可以為其起一個(gè)別名,也可以調(diào)換字段的順序,也可以只選擇部分的字段。

// 獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
//數(shù)據(jù)類型為Person的POJO類型,字段包括"name"和"age"
DataStream<Person> stream = ...
// 將DataStream轉(zhuǎn)為表,默認(rèn)的字段名稱為"age", "name"
Table table = tableEnv.fromDataStream(stream);
//  將DataStream轉(zhuǎn)為表,為"age"字段指定別名myAge, 為"name"字段指定別名myName
Table table = tableEnv.fromDataStream(stream, "age as myAge, name as myName");
//  將DataStream轉(zhuǎn)為表,只選擇一個(gè)name字段
Table table = tableEnv.fromDataStream(stream, "name");
//  將DataStream轉(zhuǎn)為表,只選擇一個(gè)name字段,并起一個(gè)別名myName
Table table = tableEnv.fromDataStream(stream, "name as myName");

Row類型

Row類型的DataStream或者DataSet轉(zhuǎn)為表的過(guò)程中,可以根據(jù)字段的位置或者字段名稱進(jìn)行映射,同時(shí)也可以為字段起一個(gè)別名,或者只選擇部分字段。

// 獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
// Row類型的DataStream,通過(guò)RowTypeInfo指定兩個(gè)字段"name"和"age"
DataStream<Row> stream = ...
// 將DataStream轉(zhuǎn)為表,默認(rèn)的字段名為原始字段名"name"和"age"
Table table = tableEnv.fromDataStream(stream);
// 將DataStream轉(zhuǎn)為表,根據(jù)位置映射,為第一個(gè)字段指定myName別名,為第二個(gè)字段指定myAge別名
Table table = tableEnv.fromDataStream(stream, "myName, myAge");
// 將DataStream轉(zhuǎn)為表,根據(jù)字段名映射,為name字段起別名myName,為age字段起別名myAge
Table table = tableEnv.fromDataStream(stream, "name as myName, age as myAge");
// 將DataStream轉(zhuǎn)為表,根據(jù)字段名映射,只選擇name字段
Table table = tableEnv.fromDataStream(stream, "name");
// 將DataStream轉(zhuǎn)為表,根據(jù)字段名映射,只選擇name字段,并起一個(gè)別名"myName"
Table table = tableEnv.fromDataStream(stream, "name as myName");

查詢優(yōu)化

Old planner

Apache Flink利用Apache Calcite來(lái)優(yōu)化和轉(zhuǎn)換查詢。當(dāng)前執(zhí)行的優(yōu)化包括投影和過(guò)濾器下推,去相關(guān)子查詢以及其他類型的查詢重寫。Old Planner目前不支持優(yōu)化JOIN的順序,而是按照查詢中定義的順序執(zhí)行它們。

通過(guò)提供一個(gè)CalciteConfig對(duì)象,可以調(diào)整在不同階段應(yīng)用的優(yōu)化規(guī)則集。這可通過(guò)調(diào)用CalciteConfig.createBuilder()方法來(lái)進(jìn)行創(chuàng)建,并通過(guò)調(diào)用tableEnv.getConfig.setPlannerConfig(calciteConfig)方法將該對(duì)象傳遞給TableEnvironment。

Blink planner

Apache Flink利用并擴(kuò)展了Apache Calcite來(lái)執(zhí)行復(fù)雜的查詢優(yōu)化。這包括一系列基于規(guī)則和基于成本的優(yōu)化(cost_based),例如:

  • 基于Apache Calcite的去相關(guān)子查詢
  • 投影裁剪
  • 分區(qū)裁剪
  • 過(guò)濾器謂詞下推
  • 過(guò)濾器下推
  • 子計(jì)劃重復(fù)數(shù)據(jù)刪除以避免重復(fù)計(jì)算
  • 特殊的子查詢重寫,包括兩個(gè)部分:
    • 將IN和EXISTS轉(zhuǎn)換為左半聯(lián)接( left semi-join)
    • 將NOT IN和NOT EXISTS轉(zhuǎn)換為left anti-join
  • 調(diào)整join的順序,需要啟用 table.optimizer.join-reorder-enabled

注意: IN / EXISTS / NOT IN / NOT EXISTS當(dāng)前僅在子查詢重寫的結(jié)合條件下受支持。

查詢優(yōu)化器不僅基于計(jì)劃,而且還可以基于數(shù)據(jù)源的統(tǒng)計(jì)信息以及每個(gè)操作的細(xì)粒度開銷(例如io,cpu,網(wǎng)絡(luò)和內(nèi)存),從而做出更加明智且合理的優(yōu)化決策。

高級(jí)用戶可以通過(guò)CalciteConfig對(duì)象提供自定義優(yōu)化規(guī)則,通過(guò)調(diào)用tableEnv.getConfig.setPlannerConfig(calciteConfig),將參數(shù)傳遞給TableEnvironment。

查看執(zhí)行計(jì)劃

SQL語(yǔ)言支持通過(guò)explain來(lái)查看某條SQL的執(zhí)行計(jì)劃,F(xiàn)link Table API也可以通過(guò)調(diào)用explain()方法來(lái)查看具體的執(zhí)行計(jì)劃。該方法返回一個(gè)字符串用來(lái)描述三個(gè)部分計(jì)劃,分別為:

  1. 關(guān)系查詢的抽象語(yǔ)法樹,即未優(yōu)化的邏輯查詢計(jì)劃,
  2. 優(yōu)化的邏輯查詢計(jì)劃
  3. 實(shí)際執(zhí)行計(jì)劃
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));
Table table1 = tEnv.fromDataStream(stream1, "count, word");
Table table2 = tEnv.fromDataStream(stream2, "count, word");
Table table = table1
  .where("LIKE(word, 'F%')")
  .unionAll(table2);
// 查看執(zhí)行計(jì)劃
String explanation = tEnv.explain(table);
System.out.println(explanation);

執(zhí)行計(jì)劃的結(jié)果為:

== 抽象語(yǔ)法樹 ==
LogicalUnion(all=[true])
  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
    FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
  FlinkLogicalDataStreamScan(id=[2], fields=[count, word])

== 優(yōu)化的邏輯執(zhí)行計(jì)劃 ==
DataStreamUnion(all=[true], union all=[count, word])
  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
    DataStreamScan(id=[1], fields=[count, word])
  DataStreamScan(id=[2], fields=[count, word])

== 物理執(zhí)行計(jì)劃 ==
Stage 1 : Data Source
    content : collect elements with CollectionInputFormat

Stage 2 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 3 : Operator
        content : from: (count, word)
        ship_strategy : REBALANCE

        Stage 4 : Operator
            content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
            ship_strategy : FORWARD

            Stage 5 : Operator
                content : from: (count, word)
                ship_strategy : REBALANCE

小結(jié)

本文主要介紹了Flink TableAPI &SQL,首先介紹了Flink Table API &SQL的基本概念 ,然后介紹了構(gòu)建Flink Table API & SQL程序所需要的依賴,接著介紹了Flink的兩種planner,還介紹了如何注冊(cè)表以及DataStream、DataSet與表的相互轉(zhuǎn)換,最后介紹了Flink的兩種planner對(duì)應(yīng)的查詢優(yōu)化并給出了一個(gè)查看執(zhí)行計(jì)劃的案例。

公眾號(hào)『大數(shù)據(jù)技術(shù)與數(shù)倉(cāng)』,回復(fù)『資料』領(lǐng)取大數(shù)據(jù)資料包

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容