1. 函數(shù)類型
函數(shù) | Apache Flink
Flink 中的函數(shù)有兩個(gè)劃分標(biāo)準(zhǔn)
- 一個(gè)劃分標(biāo)準(zhǔn)是:系統(tǒng)(內(nèi)置)函數(shù)和 Catalog 函數(shù)。系統(tǒng)函數(shù)沒(méi)有名稱空間,只能通過(guò)其名稱來(lái)進(jìn)行引用。 Catalog 函數(shù)屬于 Catalog 和數(shù)據(jù)庫(kù),因此它們擁有 Catalog 和數(shù)據(jù)庫(kù)命名空間。 用戶可以通過(guò)全/部分限定名(catalog.db.func 或 db.func)或者函數(shù)名 來(lái)對(duì) Catalog 函數(shù)進(jìn)行引用。
- 另一個(gè)劃分標(biāo)準(zhǔn)是:臨時(shí)函數(shù)和持久化函數(shù)。 臨時(shí)函數(shù)始終由用戶創(chuàng)建,它容易改變并且僅在會(huì)話的生命周期內(nèi)有效。 持久化函數(shù)不是由系統(tǒng)提供,就是存儲(chǔ)在 Catalog 中,它在會(huì)話的整個(gè)生命周期內(nèi)都有效
看一下函數(shù)如何引用和函數(shù)解析優(yōu)先級(jí)??
2. 系統(tǒng)內(nèi)置函數(shù)
系統(tǒng)(內(nèi)置)函數(shù) | Apache Flink
3. 自定義函數(shù)
當(dāng)前 Flink 有如下幾種函數(shù):
- 標(biāo)量函數(shù) 將標(biāo)量值轉(zhuǎn)換成一個(gè)新標(biāo)量值;
- 表值函數(shù) 將標(biāo)量值轉(zhuǎn)換成新的行數(shù)據(jù);
- 聚合函數(shù) 將多行數(shù)據(jù)里的標(biāo)量值轉(zhuǎn)換成一個(gè)新標(biāo)量值;
- 表值聚合函數(shù) 將多行數(shù)據(jù)里的標(biāo)量值轉(zhuǎn)換成新的行數(shù)據(jù);
- 異步表值函數(shù) 是異步查詢外部數(shù)據(jù)系統(tǒng)的特殊函數(shù)。
public class tableDemo5 {
public static void main(String[] args) throws Exception {
LocalStreamEnvironment env= StreamExecutionEnvironment.createLocalEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//輸入表
tableEnv.executeSql("CREATE TABLE input_table( tag STRING ,uid INT, money INT ) " +
" WITH( 'connector' = 'datagen', " +
" 'rows-per-second'='1', " +
" 'fields.uid.kind'='sequence', " +
" 'fields.uid.start'='1'," +
" 'fields.uid.end'='1000'," +
" 'fields.tag.length'='1'," +
" 'fields.money.min'='1'," +
" 'fields.money.max'='1000')");
//輸出表
tableEnv.executeSql("CREATE TABLE out_Table (tag STRING, money_2 BIGINT, money BIGINT ) WITH ( 'connector' = 'print' )");
//注冊(cè)自定義標(biāo)量函數(shù)
tableEnv.createTemporarySystemFunction("myDouble",DoubleFunction.class);
tableEnv.executeSql("INSERT INTO out_Table SELECT tag ,myDouble(money),money FROM input_table");
env.execute();
}
public static class DoubleFunction extends ScalarFunction {
public Integer eval(Integer money) {
return money * 2;
}
}
}
如果你的函數(shù)在初始化時(shí),是有入?yún)⒌模敲葱枰愕娜雲(yún)⑹?Serializable 的。即 Java 中需要繼承 Serializable 接口
public class tableDemo6 {
public static void main(String[] args) throws Exception {
EnvironmentSettings setting = EnvironmentSettings.newInstance()
.inStreamingMode()
.useBlinkPlanner()
.build();
TableEnvironment tableEnv = TableEnvironment.create(setting);
//輸入表
tableEnv.executeSql("CREATE TABLE input_table( tag STRING ,uid INT, money INT ) " +
" WITH( 'connector' = 'datagen', " +
" 'rows-per-second'='1', " +
" 'fields.uid.kind'='sequence', " +
" 'fields.uid.start'='1'," +
" 'fields.uid.end'='1000'," +
" 'fields.tag.length'='1'," +
" 'fields.money.min'='1'," +
" 'fields.money.max'='1000')");
//輸出表
tableEnv.executeSql("CREATE TABLE out_Table (tag STRING, money_2 BIGINT, money BIGINT ) WITH ( 'connector' = 'print' )");
//注冊(cè)自定義標(biāo)量函數(shù)
tableEnv.createTemporarySystemFunction("myDouble",new DoubleFunction(false));
tableEnv.executeSql("INSERT INTO out_Table SELECT tag ,myDouble(money),money FROM input_table");
}
public static class DoubleFunction extends ScalarFunction {
private boolean endInclusive;
public DoubleFunction(boolean endInclusive) {
this.endInclusive = endInclusive;
}
public Integer eval(Integer money) {
if (endInclusive) {
return money * 2;
}
return money;
}
}
}
4. 開(kāi)發(fā)UDF的需知事項(xiàng)
- 首先需要繼承 Flink SQL UDF 體系提供的基類,每種 UDF 實(shí)現(xiàn)都有不同的基類
- 實(shí)現(xiàn) UDF 執(zhí)行邏輯函數(shù),不同類型的 UDF 需要實(shí)現(xiàn)不同的執(zhí)行邏輯函數(shù)
- 注意 UDF 入?yún)?、出參類型推?dǎo),F(xiàn)link 在一些基礎(chǔ)類型上的是可以直接推導(dǎo)出類型信息的,但是一些復(fù)雜類型就無(wú)能為力了,這里需要用戶主動(dòng)介入
- 明確 UDF 輸出結(jié)果是否是定值,如果是定值則 Flink 會(huì)在生成計(jì)劃時(shí)就執(zhí)行一遍,得出結(jié)果,然后使用這個(gè)定值的結(jié)果作為后續(xù)的執(zhí)行邏輯的參數(shù),這樣可以做到不用在 Flink SQL 任務(wù)運(yùn)行時(shí)每次都執(zhí)行一次,會(huì)有性能優(yōu)化
- 巧妙運(yùn)用運(yùn)行時(shí)上下文,可以在任務(wù)運(yùn)行前加載到一些外部資源、上下文配置信息,擴(kuò)展 UDF 能力
注意 UDF 入?yún)ⅰ⒊鰠㈩愋屯茖?dǎo)
5. SQL 標(biāo)量函數(shù)(Scalar Function)
// 有多個(gè)重載求值方法的函數(shù)
public static class OverloadedFunction extends ScalarFunction {
// 不需要任何聲明,可以直接推導(dǎo)出類型信息,即入?yún)⒑统鰠?duì)應(yīng)到 SQL 中的 bigint 類型
public Long eval(long a, long b) {
return a + b;
}
// 使用 @DataTypeHint("DECIMAL(12, 3)") 定義 decimal 的精度和小數(shù)位
public @DataTypeHint("DECIMAL(12, 3)") BigDecimal eval(double a, double b) {
return BigDecimal.valueOf(a + b);
}
// 使用注解定義嵌套數(shù)據(jù)類型
@DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")
public Row eval(int i) {
return Row.of(String.valueOf(i), Instant.ofEpochSecond(i));
}
// 允許任意類型的輸入,并輸出序列化定制后的值
@DataTypeHint(value = "RAW", bridgedTo = ByteBuffer.class)
public ByteBuffer eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
return MyUtils.serializeToByteBuffer(o);
}
}
6. SQL 表值函數(shù)(Table Function)
表值函數(shù)即 UDTF,常用于進(jìn)一條數(shù)據(jù),出多條數(shù)據(jù)的場(chǎng)景
public class tableDemo6 {
public static void main(String[] args) throws Exception {
EnvironmentSettings setting = EnvironmentSettings.newInstance()
.inStreamingMode()
.useBlinkPlanner()
.build();
TableEnvironment tableEnv = TableEnvironment.create(setting);
tableEnv.executeSql("CREATE TABLE input_table (id INT ,tag STRING) WITH ( 'connector' = 'filesystem', 'path' = 'src/main/resources/e.txt', 'format' = 'csv')");
tableEnv.executeSql("CREATE TABLE out_Table (id INT, tag_1 STRING, tag_2 INT ) WITH ( 'connector' = 'print' )");
tableEnv.createTemporarySystemFunction("MySplit", MyFunction.class);
tableEnv.executeSql("INSERT INTO out_Table SELECT id,tag_1, tag_2 FROM input_table , LATERAL TABLE(MySplit(tag)) ");
}
//自定義實(shí)現(xiàn)ScalarFunction
@FunctionHint(output = @DataTypeHint("ROW<tag_1 STRING, tag_2 INT>"))
public static class MyFunction extends TableFunction<Row> {
public void eval(String str) {
String[] split = str.split("\\|");
for (String s : split) {
collect(Row.of(s, s.length()));
}
}
}
}
BUG:Recovery is suppressed by NoRestartBackoffTimeStrategy
Row導(dǎo)錯(cuò)包了...
如果你是使用 Scala 實(shí)現(xiàn)函數(shù),不要使用 Scala 中 object 實(shí)現(xiàn) UDF,Scala object 是單例的,有可能會(huì)導(dǎo)致并發(fā)問(wèn)題
7. SQL 聚合函數(shù)(Aggregate Function)
聚合函數(shù)即 UDAF,常用于進(jìn)多條數(shù)據(jù),出一條數(shù)據(jù)的場(chǎng)景
- 實(shí)現(xiàn) AggregateFunction 接口,其中所有的方法必須是 public 的、非 static 的,傳一個(gè)是最終的輸出類型和中間狀態(tài)類型
- Acc聚合中間結(jié)果 createAccumulator():為當(dāng)前 Key 初始化一個(gè)空的 accumulator
- accumulate(Acc accumulator, Input輸入?yún)?shù)):對(duì)于每一行數(shù)據(jù),都會(huì)調(diào)用 accumulate() 方法來(lái)更新 accumulator,這個(gè)方法就是用于處理每一條輸入數(shù)據(jù);
- Output輸出參數(shù) getValue(Acc accumulator):通過(guò)調(diào)用 getValue 方法來(lái)計(jì)算和返回最終的結(jié)果
- retract(Acc accumulator, Input輸入?yún)?shù)):在回撤流的場(chǎng)景下必須要實(shí)現(xiàn)
- merge(Acc accumulator, Iterable<Acc> it):在許多批式聚合以及流式聚合中的 Session、Hop 窗口聚合場(chǎng)景下都是必須要實(shí)現(xiàn)的。除此之外,這個(gè)方法對(duì)于優(yōu)化也很多幫助。例如,如果你打開(kāi)了兩階段聚合優(yōu)化,就需要 AggregateFunction 實(shí)現(xiàn) merge 方法,從而可以做到在數(shù)據(jù)進(jìn)行 shuffle 前先進(jìn)行一次聚合計(jì)算。
- resetAccumulator():在批式聚合中是必須實(shí)現(xiàn)的。
public class tableDemo7 {
public static void main(String[] args) throws Exception {
LocalStreamEnvironment env= StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("CREATE TABLE input_table (id INT ,money INT ,cnt INT) WITH ( 'connector' = 'filesystem', 'path' = 'src/main/resources/f.txt', 'format' = 'csv')");
//加權(quán)平均值
tableEnv.createTemporarySystemFunction("myAvg",myAvg.class);
Table table = tableEnv.sqlQuery("SELECT id,myAvg(money,cnt) FROM input_table GROUP BY id");
tableEnv.toChangelogStream(table).print();
env.execute();
}
//最終輸出類型和中間狀態(tài)類型
public static class myAvg extends AggregateFunction<Long, avgAccumulator> {
// 獲取返回結(jié)果
@Override
public Long getValue(avgAccumulator acc) {
if (acc.count == 0) {
return null;
} else {
return acc.sum / acc.count;
}
}
//初始化
@Override
public avgAccumulator createAccumulator() {
return new avgAccumulator();
}
//中間狀態(tài)的計(jì)算
public void accumulate(avgAccumulator acc, Long iMoney, Integer iCnt) {
acc.sum += iMoney * iCnt;
acc.count += iCnt;
}
// Session window 可以使用這個(gè)方法將幾個(gè)單獨(dú)窗口的結(jié)果合并
public void merge(avgAccumulator acc, Iterable<avgAccumulator> it) {
for (avgAccumulator a : it) {
acc.count += a.count;
acc.sum += a.sum;
}
}
public void resetAccumulator(avgAccumulator acc) {
acc.count = 0;
acc.sum = 0L;
}
}
public static class avgAccumulator {
public long sum = 0;
public int count = 0;
}
}
突然發(fā)現(xiàn)案例都是官網(wǎng)的,哈哈哈哈
User-defined Functions | Apache Flink
8. SQL 表值聚合函數(shù)(Table Aggregate Function)
- 實(shí)現(xiàn) TableAggregateFunction 接口,其中所有的方法必須是 public 的、非 static 的
- Acc聚合中間結(jié)果 createAccumulator():為當(dāng)前 Key 初始化一個(gè)空的 accumulator,其存儲(chǔ)了聚合的中間結(jié)果
- accumulate(Acc accumulator, Input輸入?yún)?shù)):對(duì)于每一行數(shù)據(jù),都會(huì)調(diào)用 accumulate() 方法來(lái)更新 accumulator
- emitValue(Acc accumulator, Collector<OutPut> collector) 或者 emitUpdateWithRetract(Acc accumulator, RetractableCollector<OutPut> collector):當(dāng)遍歷所有的數(shù)據(jù),當(dāng)所有的數(shù)據(jù)都處理完了之后,通過(guò)調(diào)用 emit 方法來(lái)計(jì)算和輸出最終的結(jié)果
- retract(Acc accumulator, Input輸入?yún)?shù)):在回撤流的場(chǎng)景下必須要實(shí)現(xiàn)
- merge(Acc accumulator, Iterable<Acc> it):在許多批式聚合以及流式聚合中的 Session、Hop 窗口聚合場(chǎng)景下都是必須要實(shí)現(xiàn)的。
public class tableDemo8 {
public static void main(String[] args) throws Exception {
LocalStreamEnvironment env= StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("CREATE TABLE input_table (id INT ,money INT ,cnt INT) WITH ( 'connector' = 'filesystem', 'path' = 'src/main/resources/f.txt', 'format' = 'csv')");
//加權(quán)平均值
tableEnv.createTemporarySystemFunction("top2", Top2.class);
Table table = tableEnv.from("input_table").groupBy($("id")).flatAggregate(Expressions.call("top2", $("money")).as("value", "row")).select($("id"), $("value"), $("row"));
tableEnv.toChangelogStream(table).print();
env.execute();
}
public static class Top2Accum {
public Integer first;
public Integer second;
public Integer oldFirst;
public Integer oldSecond;
}
public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {
//初始化
@Override
public Top2Accum createAccumulator() {
Top2Accum acc = new Top2Accum();
acc.first = Integer.MIN_VALUE;
acc.second = Integer.MIN_VALUE;
acc.oldFirst = Integer.MIN_VALUE;
acc.oldSecond = Integer.MIN_VALUE;
return acc;
}
public void accumulate(Top2Accum acc, Integer v) {
if (v > acc.first) {
acc.second = acc.first;
acc.first = v;
} else if (v > acc.second) {
acc.second = v;
}
}
public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {
// emit the value and rank
if (acc.first != Integer.MIN_VALUE) {
out.collect(Tuple2.of(acc.first, 1));
}
if (acc.second != Integer.MIN_VALUE) {
out.collect(Tuple2.of(acc.second, 2));
}
}
public void emitUpdateWithRetract(Top2Accum acc, TableAggregateFunction.RetractableCollector<Tuple2<Integer, Integer>> out) {
if (!acc.first.equals(acc.oldFirst)) {
// if there is an update, retract old value then emit new value.
if (acc.oldFirst != Integer.MIN_VALUE) {
out.retract(Tuple2.of(acc.oldFirst, 1));
}
out.collect(Tuple2.of(acc.first, 1));
acc.oldFirst = acc.first;
}
if (!acc.second.equals(acc.oldSecond)) {
// if there is an update, retract old value then emit new value.
if (acc.oldSecond != Integer.MIN_VALUE) {
out.retract(Tuple2.of(acc.oldSecond, 2));
}
out.collect(Tuple2.of(acc.second, 2));
acc.oldSecond = acc.second;
}
}
public void merge(Top2Accum acc, java.lang.Iterable<Top2Accum> iterable) {
for (Top2Accum otherAcc : iterable) {
accumulate(acc, otherAcc.first);
accumulate(acc, otherAcc.second);
}
}
}
}
???emitUpdateWithRetract怎么用不了
9. FlinkSQL-UDF Module
目前 Flink 包含了以下三種 Module:
- CoreModule:CoreModule 是 Flink 內(nèi)置的 Module,其包含了目前 Flink 內(nèi)置的所有 UDF,F(xiàn)link 默認(rèn)開(kāi)啟的 Module 就是 CoreModule,我們可以直接使用其中的 UDF
- HiveModule:HiveModule 可以將 Hive 內(nèi)置函數(shù)作為 Flink 的系統(tǒng)函數(shù)提供給 SQL\Table API 用戶進(jìn)行使用,比如 get_json_object 這類 Hive 內(nèi)置函數(shù)(Flink 默認(rèn)的 CoreModule 是沒(méi)有的)
- 用戶自定義 Module:用戶可以實(shí)現(xiàn) Module 接口實(shí)現(xiàn)自己的 UDF 擴(kuò)展 Module
Flink SQL 支持 Hive UDF
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
10. FlinkSQL Catalog
Flink SQL 中是由 Catalog 提供了元數(shù)據(jù)信息,例如數(shù)據(jù)庫(kù)、表、分區(qū)、視圖以及數(shù)據(jù)庫(kù)或其他外部系統(tǒng)中存儲(chǔ)的函數(shù)和信息。對(duì)標(biāo) Hive 去理解就是 Hive 的 MetaStore,都是用于存儲(chǔ)計(jì)算引擎涉及到的元數(shù)據(jù)信息。
目前 Flink 包含了以下四種 Catalog:
- GenericInMemoryCatalog:GenericInMemoryCatalog 是基于內(nèi)存實(shí)現(xiàn)的 Catalog,所有元數(shù)據(jù)只在 session 的生命周期(即一個(gè) Flink 任務(wù)一次運(yùn)行生命周期內(nèi))內(nèi)可用。
- JdbcCatalog:JdbcCatalog 使得用戶可以將 Flink 通過(guò) JDBC 協(xié)議連接到關(guān)系數(shù)據(jù)庫(kù)。PostgresCatalog 是當(dāng)前實(shí)現(xiàn)的唯一一種 JDBC Catalog,即可以將 Flink SQL 的預(yù)案數(shù)據(jù)存儲(chǔ)在 Postgres 中。
- HiveCatalog:HiveCatalog 有兩個(gè)用途,作為 Flink 元數(shù)據(jù)的持久化存儲(chǔ),以及作為讀寫(xiě)現(xiàn)有 Hive 元數(shù)據(jù)的接口。注意:Hive MetaStore 以小寫(xiě)形式存儲(chǔ)所有元數(shù)據(jù)對(duì)象名稱。而 GenericInMemoryCatalog 會(huì)區(qū)分大小寫(xiě)。
- 用戶自定義 Catalog:用戶可以實(shí)現(xiàn) Catalog 接口實(shí)現(xiàn)自定義 Catalog
11. FlinkSQL 任務(wù)參數(shù)配置
具體參數(shù)分為以下 3 類:
- 運(yùn)行時(shí)參數(shù):優(yōu)化 Flink SQL 任務(wù)在執(zhí)行時(shí)的任務(wù)性能
- 優(yōu)化器參數(shù):Flink SQL 任務(wù)在生成執(zhí)行計(jì)劃時(shí),經(jīng)過(guò)優(yōu)化器優(yōu)化生成更優(yōu)的執(zhí)行計(jì)劃
- 表參數(shù):用于調(diào)整 Flink SQL table 的執(zhí)行行為