Apache Flink 具有兩個(gè)關(guān)系型API:Table API 和SQL,用于統(tǒng)一流和批處理。
Table API 是用于 Scala 和 Java 語(yǔ)言的查詢API,允許以非常直觀的方式組合關(guān)系運(yùn)算符的查詢,例如 select,filter 和 join。Flink SQL 的支持是基于實(shí)現(xiàn)了SQL標(biāo)準(zhǔn)的 Apache Calcite。無(wú)論輸入是批輸入(DataSet)還是流輸入(DataStream),任一接口中指定的查詢都具有相同的語(yǔ)義并指定相同的結(jié)果。
Table API 和 SQL 還沒(méi)有完全支持并且正在積極開(kāi)發(fā)中。
要使用 Table API 和SQL,需要將以下依賴引入項(xiàng)目:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.6.1</version>
</dependency>
Table API 和SQL
批處理和流式傳輸?shù)?Table API 和SQL程序都遵循相同的模式。以下代碼示例顯示了常見(jiàn)的程序結(jié)構(gòu):
// 批處理使用 ExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 創(chuàng)建 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 注冊(cè) Table
tableEnv.registerTable("table1", ...)
// Table API query
val tapiResult = tableEnv.scan("table1").select(...)
// SQL query
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...")
// Sink query result
tapiResult.writeToSink(...)
// execute
env.execute()
TableEnvironment
TableEnvironment 是 Table API 和SQL集成的核心概念,它負(fù)責(zé):
- 在內(nèi)部目錄中注冊(cè)表
- 注冊(cè)外部目錄
- 執(zhí)行SQL查詢
- 注冊(cè)用戶定義的函數(shù)
- DataStream 或 DataSet 轉(zhuǎn)換為 Table
- 持有 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用
Table 總是與特定的 TableEnvironment 綁定。不能在同一查詢中組合不同 TableEnvironments 的表(例如,union 或 join)。創(chuàng)建 TableEnvironment:
// STREAMING QUERY
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for streaming queries
val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)
// BATCH QUERY
val bEnv = ExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for batch queries
val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)
注冊(cè) Table
TableEnvironment 維護(hù)一個(gè)按名稱注冊(cè)的表的目錄。有兩種類型的表,輸入表(input table)和輸出表(output table)。輸入表可以在 Table API 和SQL查詢中引用,并提供輸入數(shù)據(jù)。輸出表可用于將 Table API 或SQL查詢的結(jié)果發(fā)送到外部系統(tǒng)。
輸入表的注冊(cè)源:
- Table API 或SQL查詢的結(jié)果表
- 訪問(wèn)外部數(shù)據(jù)的 TableSource,例如文件,數(shù)據(jù)庫(kù)或消息系統(tǒng)
- DataStream 或 DataSet。
輸出表的注冊(cè)源:TableSink
代碼示例:
val tableEnv = TableEnvironment.getTableEnvironment(env)
// from Table API or SQL
val projTable: Table = tableEnv.scan("X").select(...)
tableEnv.registerTable("projectedTable", projTable)
// from TableSource
val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)
tableEnv.registerTableSource("CsvTable", csvSource)
// from TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
// define the field names and types
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
注冊(cè)外部目錄
外部目錄(external catalog)可以提供有關(guān)外部數(shù)據(jù)庫(kù)和表的信息(如名稱,schema,統(tǒng)計(jì)信息以及訪問(wèn)信息)??梢酝ㄟ^(guò)實(shí)現(xiàn) ExternalCatalog 接口創(chuàng)建外部目錄,并在 TableEnvironment 中注冊(cè):
// 獲取一個(gè) StreamTableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 創(chuàng)建一個(gè)外部目錄
val catalog: ExternalCatalog = new InMemoryExternalCatalog
// 注冊(cè)外部目錄
tableEnv.registerExternalCatalog("InMemCatalog", catalog)
查詢
Table API
Table API 是一個(gè) Scala 和 Java 的語(yǔ)言集成查詢API,是基于 Table類。Table類代表了一個(gè)流或者批表,并提供方法來(lái)使用關(guān)系型操作。這些方法返回一個(gè)新的 Table 對(duì)象,這個(gè)新的 Table 對(duì)象代表著輸入的 Table 應(yīng)用關(guān)系型操作后的結(jié)果。下面的例子展示了一個(gè)簡(jiǎn)單的 Table API 聚合查詢:
// 獲取一個(gè) StreamTableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 注冊(cè)一個(gè)名叫 Orders 的表 ...
// 掃描注冊(cè)的 Orders 表
val orders = tableEnv.scan("Orders")
// 計(jì)算所有來(lái)自法國(guó)的客戶的收入
val revenue = orders
.filter('cCountry === "FRANCE")
.groupBy('cID, 'cName')
.select('cID, 'cName, 'revenue.sum AS 'revSum)
// 執(zhí)行查詢
SQL
Flink SQL 集成是基于 Apache Calcite,Apache Calcite 實(shí)現(xiàn)了標(biāo)準(zhǔn)的SQL。下面的例子展示了如何指定一個(gè)查詢并返回結(jié)果:
// 獲取一個(gè) StreamTableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 注冊(cè)一個(gè)名叫 Orders 的表
// 計(jì)算所有來(lái)自法國(guó)的客戶的收入
val revenue = tableEnv.sqlQuery("""
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
// 執(zhí)行查詢
指定將其結(jié)果插入已注冊(cè)表的更新查詢:
// 注冊(cè)一個(gè)名叫 RevenueFrance 的輸出表
// 計(jì)算所有來(lái)自法國(guó)的客戶的收入,并更新到 RevenueFrance 表
tableEnv.sqlUpdate("""
|INSERT INTO RevenueFrance
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
// 執(zhí)行查詢
混合使用 Table API 和SQL,Table API 和SQL查詢可以很容易地合并因?yàn)樗鼈兌挤祷?Table 對(duì)象:
- Table API 查詢可以基于SQL查詢結(jié)果的 Table 來(lái)進(jìn)行
- SQL查詢可以基于 Table API 查詢的結(jié)果來(lái)定義
輸出表
要輸出 Table 可以寫(xiě)入 TableSink。TableSink 是通用接口,支持各種文件格式(如:CSV,Apache Parquet,Apache Avro)、存儲(chǔ)系統(tǒng)(如:JDBC,Apache HBase,Apache Cassandra,Elasticsearch)或消息系統(tǒng)(如:Apache Kafka,RabbitMQ)。
批處理 Table 只能寫(xiě)入 BatchTableSink,而流式處理 Table 需要 AppendStreamTableSink,RetractStreamTableSink 或 UpsertStreamTableSink。有關(guān)可用接收器的詳細(xì)信息,請(qǐng)參閱 Sources & Sinks。
有兩種方法可以發(fā)送表:
-
Table.writeToSink(TableSink sink)自動(dòng)匹配 schema -
Table.insertInto(String sinkTable)使用特定 schema
以下示例顯示如何發(fā)出Table:
// 獲取一個(gè) StreamTableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 使用Table API和/或SQL查詢獲取一個(gè) Table
val result: Table = ...
// 創(chuàng)建一個(gè) TableSink
val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")
// METHOD 1:
// 將結(jié)果表寫(xiě)入 TableSink
result.writeToSink(sink)
// METHOD 2:
// 注冊(cè)指定 schema 的 TableSink
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation] = Array(Types.INT, Types.STRING, Types.LONG)
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink)
// 將結(jié)果表寫(xiě)入 TableSink
result.insertInto("CsvSinkTable")
// 執(zhí)行程序
與 DataStream 和 DataSet API 集成
Table API 和SQL查詢可以很容易地進(jìn)行集成并嵌入到 DataStream 和 DataSet 程序中。例如,可以查詢一個(gè)外部表(來(lái)自關(guān)系型數(shù)據(jù)庫(kù)的表),做一些處理(如過(guò)濾、映射、聚合或者關(guān)聯(lián)元數(shù)據(jù)),然后使用 DataStream 或者 DataSet API(以及在這些API之上構(gòu)建的任何庫(kù),例如CEP或 Gelly) 進(jìn)行進(jìn)一步處理。
同樣,Table API 或者SQL查詢也可以應(yīng)用于 DataStream 或者 DataSet 程序的結(jié)果中。這種交互可以通過(guò)將 DataStream 或者 DataSet 轉(zhuǎn)換成一個(gè) Table 及將 Table 轉(zhuǎn)換成 DataStream 或者 DataSet 來(lái)實(shí)現(xiàn)。
Scala 隱式轉(zhuǎn)換
Scala Table API 支持 DataSet,DataStream 以及 Table 間的隱式轉(zhuǎn)換。需要引入 org.apache.flink.table.api.scala._ 和 org.apache.flink.api.scala._。
DataStream 或 DataSet 轉(zhuǎn)換為 Table
DataStream 或 DataSet 可以在 TableEnvironment 中注冊(cè)為表,表的 schema 根據(jù)注冊(cè)的 DataStream 或 DataSet 的數(shù)據(jù)類型來(lái)定:
val stream: DataStream[(Long, String)] = ...
// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream)
// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)
也可以直接轉(zhuǎn)換為表,而不需要注冊(cè):
val stream: DataStream[(Long, String)] = ...
// convert the DataStream into a Table with default fields '_1, '_2
val table1: Table = tableEnv.fromDataStream(stream)
// convert the DataStream into a Table with fields 'myLong, 'myString
val table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)
Table 轉(zhuǎn)換為 DataStream 或 DataSet
Table 可以轉(zhuǎn)換為 DataStream 或者 DataSet,通過(guò)這種方式,DataStream 或者 DataSet 程序就可以基于 Table API 或者SQL查詢的結(jié)果來(lái)執(zhí)行了。
當(dāng)將一個(gè) Table 轉(zhuǎn)換為 DataStream 或 DataSet 時(shí),需要指定生成的 DataStream 或 DataSet 的數(shù)據(jù)類型,即需要轉(zhuǎn)換表的行的數(shù)據(jù)類型,通常最方便的轉(zhuǎn)換類型是 Row,下面列表描述了不同選項(xiàng)的功能:
- Row:字段按位置、任意數(shù)量字段映射,支持 null 值,無(wú)類型安全訪問(wèn)
- POJO:字段按名稱(POJO 字段命名為 Table 字段)、任意數(shù)量字段映射,支持 null 值,類型安全訪問(wèn)
- Case Class:字段按位置映射,不支持 null 值,類型安全訪問(wèn)
- Tuple:字段按位置映射,不得多于22(Scala)或 25(Java)個(gè)字段,不支持 null 值,類型安全訪問(wèn)
- Atomic Type:Table 必須有一個(gè)字段,不支持 null 值,類型安全訪問(wèn)
Table 轉(zhuǎn)換 DataStream
流式查詢的結(jié)果表會(huì)動(dòng)態(tài)地更新,每個(gè)新的記錄到達(dá)輸入流時(shí)結(jié)果就會(huì)發(fā)生變化。有兩種模式將 Table 轉(zhuǎn)換為 DataStream:
- Append Mode:只適用于當(dāng)動(dòng)態(tài)表僅由
INSERT修改時(shí),之前的結(jié)果不會(huì)被更新。 - Retract Mode:始終都可以使用此模式,使用一個(gè) boolean 標(biāo)識(shí)來(lái)編碼
INSERT和DELETE更改。
// 有兩個(gè)字段的 Table(String name, Integer age)
val table: Table = ...
// 將 Table 轉(zhuǎn)換為 Row 類型的 Append DataStream
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)
// 將 Table 轉(zhuǎn)換為 Tuple2<String, Integer> 類型的 Append DataStream
val dsTuple: DataStream[(String, Int)] dsTuple =
tableEnv.toAppendStream[(String, Int)](table)
// 將 Table 轉(zhuǎn)換為 Row 類型的 Retact DataStream
// 一個(gè) ReactDataStream 的類型X為表示為 DataStream[(Boolean, X)]
// boolean 字段指定了更改的類型
// True 是 INSERT, false 是 DELETE
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)
Table 轉(zhuǎn)換 DataSet
// 有兩個(gè)字段的 Table(String name, Integer age)
val table: Table = ...
// 將 Table 轉(zhuǎn)換為 Row 類型的 DataSet
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)
// 將 Table 轉(zhuǎn)換為 Tuple2<String, Integer> 類型的 DataSet
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)
將數(shù)據(jù)類型映射到表模式(Table schema)
DataStream 和 DataSet API 支持多種數(shù)據(jù)類型,如:Tuple、POJO、case class 及 Row 類型。
原子類型
Flink 將原生類型(Integer、Double、String...)或泛型類型視為原子類型(Atomic type)。一個(gè)原子類型的 DataStream 或 DataSet 可以轉(zhuǎn)換為只有一個(gè)屬性的 Table,屬性的類型根據(jù)原子類型推算,并且必須指定屬性的名稱。
val stream: DataStream[Long] = ...
// convert DataStream into Table with default field name "f0"
val table: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with field name "myLong"
val table: Table = tableEnv.fromDataStream(stream, 'myLong)
Tuple 和 Case Class
Flink 支持 Scala 原生的 Tuple 類型,也為 Java 提供了 Tuple 類。兩種類型的 DataStream 和 DataSet 都可以被轉(zhuǎn)換為 Table。通過(guò)為所有字段提供名稱(基于位置的映射),可以重命名字段。如果未指定字段名,則使用默認(rèn)字段名?;诿Q的映射允許使用別名(as)重新排序字段。
val stream: DataStream[(Long, String)] = ...
// convert DataStream into Table with renamed default field names '_1, '_2
val table: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with field names "myLong", "myString" (position-based)
val table: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)
// convert DataStream into Table with reordered fields "_2", "_1" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2, '_1)
// convert DataStream into Table with projected field "_2" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2)
// convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2 as 'myString, '_1 as 'myLong)
// define case class
case class Person(name: String, age: Int)
val streamCC: DataStream[Person] = ...
// convert DataStream into Table with default field names 'name, 'age
val table = tableEnv.fromDataStream(streamCC)
// convert DataStream into Table with field names 'myName, 'myAge (position-based)
val table = tableEnv.fromDataStream(streamCC, 'myName, 'myAge)
// convert DataStream into Table with reordered and aliased fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'age as 'myAge, 'name as 'myName)
POJO
Flink 支持使用 POJO 作為復(fù)合類型。當(dāng)將一個(gè) POJO 類型的 DataStream 或 DataSet 轉(zhuǎn)換為 Table 而不指定字段名稱時(shí),Table 的字段名稱將采用 POJO 原生的字段名稱。重命名原始的 POJO 字段需要關(guān)鍵字AS,因?yàn)?POJO 沒(méi)有固定的順序,名稱映射需要原始名稱并且不能通過(guò)位置來(lái)完成。
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Person is a POJO with field names "name" and "age"
val stream: DataStream[Person] = ...
// convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
val table: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'age as 'myAge, 'name as 'myName)
// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name)
// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)
Row
Row 類型支持任意數(shù)量的字段,并且支持 null 值。字段名稱可以通過(guò) RowTypeInfo 來(lái)指定或者將一個(gè) Row 類型的 DataStream 或 DataSet 轉(zhuǎn)換為 Table 時(shí)指定。Row 類型支持按位置和名字映射??梢酝ㄟ^(guò)為所有字段提供名稱(基于位置)或?yàn)?映射/排序/重命名(基于名稱)單獨(dú)選擇字段來(lái)重命名字段。
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
val stream: DataStream[Row] = ...
// convert DataStream into Table with default field names "name", "age"
val table: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
val table: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge)
// convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName, 'age as 'myAge)
// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name)
// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)
Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/common.html