Flink 開發(fā)一個簡單的應(yīng)用程序只需要構(gòu)建環(huán)境、構(gòu)建數(shù)據(jù)源、構(gòu)建數(shù)據(jù)處理方案、構(gòu)建數(shù)據(jù)輸出及執(zhí)行程序這五個步驟,但每個步驟都有對應(yīng)其他強(qiáng)大的API,所以本文一一舉例學(xué)習(xí)。
構(gòu)建環(huán)境
流處理
StreamExecutionEnvironment env = null;
// 構(gòu)建流環(huán)境,如果在本地則創(chuàng)建本地環(huán)境,如果是集群,則創(chuàng)建集群環(huán)境
env = StreamExecutionEnvironment.getExecutionEnvironment();
// 創(chuàng)建本地執(zhí)行環(huán)境并設(shè)置并行數(shù)
env = StreamExecutionEnvironment.createLocalEnvironment(3);
// 創(chuàng)建遠(yuǎn)程執(zhí)行環(huán)境,jobmanager的IP,端口,并行度,運行程序的位置
env = StreamExecutionEnvironment.createRemoteEnvironment("10.xxx.xx.103",6123,5,"D:/test/abc.jar");
第三種方式可以直接從本地代碼中構(gòu)建與遠(yuǎn)程集群的Flink JobManager 的RPC連接,通過指定應(yīng)用程序所在的jar包,將運行程序遠(yuǎn)程拷貝到 JobManager 節(jié)點上,然后將Flink 應(yīng)用運行在遠(yuǎn)程的環(huán)境中,本地程序相當(dāng)于一個客戶端。
批處理
ExecutionEnvironment env = null;
// 構(gòu)建流環(huán)境,如果在本地則創(chuàng)建本地環(huán)境,如果是集群,則創(chuàng)建集群環(huán)境
env = ExecutionEnvironment.getExecutionEnvironment();
// 創(chuàng)建本地執(zhí)行環(huán)境并設(shè)置并行數(shù)
env = ExecutionEnvironment.createLocalEnvironment(3);
// 創(chuàng)建遠(yuǎn)程執(zhí)行環(huán)境,jobmanager的IP,端口,并行度,運行程序的位置
env = ExecutionEnvironment.createRemoteEnvironment("10.xxx.xx.103",6123,5,"D:/test/abc.jar");
構(gòu)建數(shù)據(jù)源
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 從集合中讀取數(shù)據(jù)
DataStream<Integer> ds = env.fromCollection(Arrays.asList(1,2,3,4,5,6));
// 直接讀取數(shù)據(jù)
DataStream<Integer> ds1 = env.fromElements(1,2,3,4,5);
// 從文件讀取
DataStream<String> ds2 = env.readTextFile("D:\\workspace\\spring\\middleware\\flink\\flink-test\\src\\main\\resources\\hello.txt");
// 從 kafka 讀取
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "10.240.30.104:9092");
properties.setProperty("group.id", "test");
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "0");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>("topic1", new SimpleStringSchema(),properties);
DataStream<String> ds3 = env.addSource(myConsumer);
// 自定義數(shù)據(jù)源
DataStream<String> ds4 = env.addSource(new CustomizeSource());
自定義數(shù)據(jù)源
package com.example.demo;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
/**
* @author big uncle
* @date 2021/6/3 13:54
* @module
**/
public class CustomizeSource implements SourceFunction<String> {
private boolean running = true;
/**
* 讀取數(shù)據(jù)
**/
@Override
public void run(SourceContext<String> ctx) throws Exception {
String str = "a-b-c:";
int i =0;
while (running){
i++;
ctx.collect(str+i);
Thread.sleep(1000);
}
}
/**
* 關(guān)閉
**/
@Override
public void cancel() {
running = false;
}
}
Transform
map fliter flatmap
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.fromElements("a,b,c,d,e,1,2,3,4");
// 讀取值是 "a,b,c,d,e,1,2,3,4" 轉(zhuǎn)變?yōu)?一個個的元素
dataStream.flatMap(new FlatMapFunction<String, Object>() {
@Override
public void flatMap(String s, Collector<Object> collector) throws Exception {
String[] str = s.split(",");
for(String ss : str) {
collector.collect(ss);
}
}
})
// 為 true 才會輸出,過濾掉不是 a 的值
.filter(i -> i.equals("a"))
// 轉(zhuǎn)黃給 a 添加 字符串 0
.map(i -> i+"0")
// 輸出結(jié)果為 a0
.print();
env.execute("a");
}
key
所有的聚合操作只能在 key 分組(分區(qū))之后。sum()、min()、max()、minBy()、maxBy()、reduce() 比較器
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
dataStream.keyBy(new KeySelector<Integer, Object>() {
@Override
// 把所有數(shù)據(jù)放到一個分區(qū) 0
public Object getKey(Integer integer) throws Exception {
return 0;
}
})
// 對 分區(qū)進(jìn)行聚合,聚合下標(biāo)為 0,注意這里是下標(biāo),最后輸出數(shù)據(jù)139,會顯示累加值
.sum(0).print();
env.execute("test");
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
// keyBy 可以根據(jù)位置,也可以根據(jù)字段
dataStream.keyBy(new KeySelector<Integer, Object>() {
@Override
// 把所有數(shù)據(jù)放到一個分區(qū) 0
public Object getKey(Integer integer) throws Exception {
return 0;
}
})
// 注意這里是下標(biāo),也可以是字段名
.min(0).print();
env.execute("test");
}
min()、minBy() ,針對元祖和對象而言,min 會對比某個字段的大小,但不會更新其他字段值,minBy() 會更新其他字段值。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
// keyBy 可以根據(jù)位置,也可以根據(jù)字段
dataStream.keyBy(new KeySelector<Integer, Object>() {
@Override
// 把所有數(shù)據(jù)放到一個分區(qū) 0
public Object getKey(Integer integer) throws Exception {
return 0;
}
})
.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer t1, Integer t2) throws Exception {
return t2 - t1 > 0 ? t2 : t1;
}
}).print();
env.execute("test");
}
reduce 可以比較兩個值,更自由一些
split 和 Select (OutputTag)Connect和CoMap以及Union
從代碼中可以看出幫我們收集了所有數(shù)據(jù),我們可以在做其他操作,多用于做數(shù)據(jù)標(biāo)簽。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Object> stream = dataStream.process(new ProcessFunction<Integer, Object>() {
@Override
public void processElement(Integer value, Context ctx, Collector<Object> out) throws Exception {
if(value > 10) {
out.collect(value);
}
ctx.output(outputTag,"其他類型的流"+value.toString());
}
});
stream.print("原始數(shù)據(jù)進(jìn)行過濾后:");
DataStream<String> sideOutputStream = stream.getSideOutput(outputTag);
sideOutputStream.print("標(biāo)簽處理:");
// 合流
sideOutputStream.connect(stream).flatMap(new CoFlatMapFunction<String,Object,Integer>() {
// 第一個流如何處理
@Override
public void flatMap1(String value, Collector out) throws Exception {
String str = value.toString();
str = str.replaceAll("其他類型的流","");
out.collect(Integer.valueOf(str));
}
// 第二流如何處理
@Override
public void flatMap2(Object value, Collector out) throws Exception {
out.collect(Integer.valueOf(value.toString()));
}
}).keyBy(new KeySelector<Integer, Object>() {
@Override
public Object getKey(Integer value) throws Exception {
return 0;
}
}).sum(0).print("合并后輸出:");
env.execute("test");
}
標(biāo)簽處理::1> 其他類型的流3
標(biāo)簽處理::2> 其他類型的流7
標(biāo)簽處理::3> 其他類型的流1
標(biāo)簽處理::7> 其他類型的流10
合并后輸出::6> 1
合并后輸出::6> 8
合并后輸出::6> 11
合并后輸出::6> 21
原始數(shù)據(jù)進(jìn)行過濾后::8> 27
原始數(shù)據(jù)進(jìn)行過濾后::6> 19
標(biāo)簽處理::8> 其他類型的流27
原始數(shù)據(jù)進(jìn)行過濾后::5> 22
原始數(shù)據(jù)進(jìn)行過濾后::4> 50
標(biāo)簽處理::5> 其他類型的流22
標(biāo)簽處理::6> 其他類型的流19
標(biāo)簽處理::4> 其他類型的流50
合并后輸出::6> 48
合并后輸出::6> 75
合并后輸出::6> 97
合并后輸出::6> 119
合并后輸出::6> 138
合并后輸出::6> 157
合并后輸出::6> 207
合并后輸出::6> 257
該方法只能在以下處理時使用。
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- KeyedCoProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
flink 1.12 沒有 Split 和 Select,換成了 OutputTag
Union 要求多條流合并,必須是相同的數(shù)據(jù)類型,比較簡單,這里就不說了。
分區(qū)
keyBy 默認(rèn)是按照 hash 分區(qū),除了KeyBy 還有其他的分區(qū)方式。
- broadcast() 廣播,給下游分區(qū)全部廣播同一份數(shù)據(jù)
- shuffle() 隨機(jī)把當(dāng)前數(shù)據(jù)發(fā)配個下游分區(qū)
- foeward() 只放在當(dāng)前分區(qū)做計算
- rebalance() 輪詢給下游分區(qū)發(fā)數(shù)據(jù)
- rescale() 給相同分組的下游分區(qū)輪詢發(fā)數(shù)據(jù)
- global() 丟給下游分區(qū)的第一個實例
- partitionCustom() 用戶自定義重分區(qū)方式
Sink 輸出
kafka
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
dataStream.addSink(new FlinkKafkaProducer("topic-test",new SimpleStringSchema(),new Properties()));
env.execute("test");
}
自定義
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
dataStream.addSink(new MySink()).setParallelism(1);
env.execute("test");
}
public static class MySink extends RichSinkFunction<Integer> {
// 不建議 在open 和 clos 寫類似數(shù)據(jù)庫連接 把握不好..死翹翹,但可以在 open 和 clous 檢查連接狀態(tài),或有鏈接池 獲取也行
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("open 有多少數(shù)據(jù)唄調(diào)用多少次");
}
@Override
public void close() throws Exception {
System.out.println("close 有多少數(shù)據(jù)唄調(diào)用多少次");
}
@Override
public void invoke(Integer value, Context context) throws Exception {
// 輸出數(shù)據(jù) 可以用來寫 到 redis kafka hdfs 等
System.out.println(value);
}
}
想要直到 flink 支持更多的 source 或 sink 可以去 maven庫里 搜索 flink connector
window
window 就是將無限流切割為有限流的一種方式,它會將流數(shù)據(jù)分發(fā)到有限大小的桶(bucket)中進(jìn)行分析。window 不是等有數(shù)據(jù)才去做窗口,而是有一個桶,等數(shù)據(jù),桶可以存在多個。只要桶不關(guān)閉,哪怕晚遲到的數(shù)據(jù),依然可以被分配到對應(yīng)的桶里。

基本類型
時間窗口(Time Window)
按時間段做為窗口,如8:01~8:02 為一個窗口
時間窗口分為:滾動時間窗口(TumBlingWindows)、滑動時間窗口(SlidingWindows)、會話窗口(Session Windows)
-
滾動時間窗口:時間是對齊的,如8:00~9:00 下一個窗口就是 9:00~10:00,數(shù)據(jù)不會重疊(不重復(fù)數(shù)據(jù)),如果數(shù)據(jù)剛好在整點,按照 [ ) 左開右閉,會屬于新時間。
-
滑動時間窗口:窗口長度固定,可以有重疊(重復(fù)數(shù)據(jù))
-
會話窗口:一定時間沒有收到數(shù)據(jù)會話結(jié)束,有新的數(shù)據(jù)來,開啟新的會話。session gap 為最小的間隔。
計數(shù)窗口(Count Window)
按多少各數(shù)做為一個窗口,如10個數(shù) 為一個窗口。
計數(shù)窗口分為:滾動計數(shù)窗口、滑動計數(shù)窗口
代碼演示
如果想要調(diào)用時間窗口,必須在 keyBy 之后,但也可以直接使用 windowALL() 方法,該方法把所有數(shù)據(jù)放到一個分區(qū),并行度為一。
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
// 滾動窗口,可以看這個方法的實現(xiàn)類
dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(15)));
// session 窗口
dataStream.keyBy(0).window(EventTimeSessionWindows.withGap(Time.minutes(1)));
// 滾動窗口
dataStream.keyBy(0).timeWindow(Time.seconds(15));
// 滑動窗口
dataStream.keyBy(0).timeWindow(Time.seconds(20),Time.seconds(5));
// 滑動窗口
dataStream.keyBy(0).window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(5)));
// 滾動計數(shù)窗口
dataStream.keyBy(0).countWindow(10);
// 滑動計數(shù)窗口
dataStream.keyBy(0).countWindow(10,3);
}
窗口函數(shù)
增量聚合函數(shù)
每條數(shù)據(jù)到來進(jìn)行計算,保持一個簡單的狀態(tài)。一定會等到時間點(窗口結(jié)束點)到才會輸出結(jié)果。這種窗口可以做一些max、min等比較,累計的函數(shù)
- ResuceFunction
- AggregateFunction
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.socketTextStream("192.168.200.58",7777);
// 滑動窗口
dataStream.map(i -> Integer.valueOf(i)).keyBy(new KeySelector<Integer, Object>() {
@Override
public Object getKey(Integer value) throws Exception {
return 0;
}
}).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
// .reduce(new ReduceFunction<Integer>() {
// // 比較兩個數(shù)大小等 可以實現(xiàn) max min sum 等操作
// @Override
// public Integer reduce(Integer value1, Integer value2) throws Exception {
// return null;
// }
// });
// 實現(xiàn)一個計數(shù)器,統(tǒng)計有多少個數(shù)據(jù)
.aggregate(new AggregateFunction<Integer, Integer, Integer>() {
// 創(chuàng)建一個累加器,初始值
@Override
public Integer createAccumulator() {
return 0;
}
// 累加規(guī)則
@Override
public Integer add(Integer value, Integer accumulator) {
return accumulator + 1;
}
// 輸出結(jié)果
@Override
public Integer getResult(Integer accumulator) {
return accumulator;
}
// merge一般使用的是 session window 做合并操作
@Override
public Integer merge(Integer a, Integer b) {
return null;
}
}).print();
env.execute("test");
}
全窗口函數(shù)
先把窗口所有數(shù)據(jù)收集起來,等到計算的時候會便利所有數(shù)據(jù)。這種窗口可以做一些中位數(shù)或平均數(shù)等操作。
- ProcessWindowFunction
- WindowFunction
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.socketTextStream("192.168.200.58",7777);
// 滑動窗口
dataStream.map(i -> Integer.valueOf(i)).keyBy(new KeySelector<Integer, Object>() {
@Override
public Object getKey(Integer value) throws Exception {
return 0;
}
}).window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 攢齊5s 的數(shù)據(jù),求平均值
// 參數(shù)1 輸入類型,2 輸出類型,3 key的類型,4 就是個TimeWindow
.apply(new WindowFunction<Integer, Integer, Object, TimeWindow>() {
// 參數(shù)1 key的類型,2 就是window,3 所有輸入的數(shù)據(jù),4 輸出收集器
@Override
public void apply(Object o, TimeWindow window, Iterable<Integer> input, Collector<Integer> out) throws Exception {
List<Integer> list = IteratorUtils.toList(input.iterator());
int count = list.size();
int sum = list.stream().reduce((i,y) -> i+y).get();
out.collect(sum/count);
}
}).print();
env.execute("test");
}
把 apply 換成 process 是一樣的。
.process(new ProcessWindowFunction<Integer, Integer, Object, TimeWindow>() {
// context 會拿到的信息比 window 多,包含了 window
@Override
public void process(Object o, Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
List<Integer> list = IteratorUtils.toList(elements.iterator());
int count = list.size();
int sum = list.stream().reduce((i,y) -> i+y).get();
out.collect(sum/count);
}
}).print();
在滑動時間窗口,或滑動計數(shù)中,如下:

我們會想滑動計數(shù),應(yīng)該是10個數(shù),來2個滑動一次,但實際流操作中,當(dāng)你輸入2個數(shù),就會輸出一次,如下:

假設(shè)我們的程序是算平均值,第一次滑動是 (27+10)/2 第二次滑動則是 (27+10+3+7)/4 依此類推。這種方式是等補(bǔ)齊滑動窗口的長度,才會全額計算,就是10個一算。
其他可選API
- trigger() 觸發(fā)器:定義windows什么時候關(guān)閉,觸發(fā)計算并輸出結(jié)果,分兩步的原因是,考慮數(shù)據(jù)會遲到,遲到的數(shù)據(jù)到windows關(guān)閉的時間,可以先輸出結(jié)果,在等等認(rèn)為會遲到的數(shù)據(jù),然后關(guān)閉。
- evictor 移除器:定義某些數(shù)據(jù)的邏輯。
- allowedLateness() 允許處理遲到的數(shù)據(jù),5秒后輸出結(jié)果,等1分鐘才關(guān)閉窗口,但1分鐘內(nèi)該窗口會實時更新數(shù)據(jù),不斷輸出新邏輯數(shù)據(jù)。
window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.minutes(1));
- sideOutputLateData() 將遲到的數(shù)據(jù)放入側(cè)輸出流
- getSideOutput() 獲取側(cè)輸出流
// 標(biāo)記輸出流(側(cè)輸出流)
OutputTag<Integer> outputTag = new OutputTag<Integer>("late"){};
SingleOutputStreamOperator<Integer> ds = dataStream.map(i -> Integer.valueOf(i)).keyBy(new KeySelector<Integer, Object>() {
@Override
public Object getKey(Integer value) throws Exception {
return 0;
}
}).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.minutes(1))
// 如果1分鐘后還有遲到數(shù)據(jù),就放到一個側(cè)輸出流里,給一個標(biāo)記,也可以不搭配 allowedLateness() 使用
.sideOutputLateData(outputTag)
.sum(0);
ds.getSideOutput(outputTag).print();
這種等的方式,遲到數(shù)據(jù)底屬于 1窗口 還是屬于 2窗口呢?那就需要和數(shù)據(jù)本身的時間作比較了,所以以上只是API的調(diào)用,但不建議這樣使用,針對遲到數(shù)據(jù)我們一定會用遲到數(shù)據(jù)的本身時間,來決定他在哪個窗口。
ProcessFunction API
ProcessFunction 可以訪問時間戳、watermark以及注冊定時事件。還可以輸出特定的一些事件,例如超時事件等。ProcessFunction用來構(gòu)建事件驅(qū)動的應(yīng)用以及實現(xiàn)自定義的業(yè)務(wù)邏輯(使用之前的window函數(shù)和轉(zhuǎn)換算子、Richfunction都無法實現(xiàn))。例如,flink sql 就是使用ProcessFunction 實現(xiàn)的。
flink 提供了8個 ProcessFunction
- ProcessFunction
- KeyedProcessFunction 分組之后得到KeyedStream,調(diào)用該 ProcessFunction
- CoProcessFunction
- ProcessJooinFunction
- BroadcastProcessFunction
- KeyedBroadcastProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
KeyedProcessFunction
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> dataStream = env.socketTextStream("192.168.200.58", 7777);
// 數(shù)據(jù)轉(zhuǎn)換
DataStream<EventData> stream = dataStream.map(new MapFunction<String, EventData>() {
@Override
public EventData map(String value) throws Exception {
String[] strs = value.split(",");
return new EventData(
strs[0],
Long.valueOf(strs[1]),
String.valueOf((Long.valueOf(strs[1])-5)),
Integer.valueOf(strs[3])
);
}
});
stream.keyBy(new KeySelector<EventData, String>() {
@Override
public String getKey(EventData value) throws Exception {
return value.getId();
}
}).process(new MyProcess()).print();
env.execute("test");
}
public static class MyProcess extends KeyedProcessFunction<String,EventData,Integer> {
@Override
public void processElement(EventData value, Context ctx, Collector<Integer> out) throws Exception {
out.collect(value.hashCode());
// context
ctx.timestamp(); //獲取數(shù)據(jù)時間戳
ctx.getCurrentKey();// 獲取key
//ctx.output();//測輸出流,可以通過構(gòu)造函數(shù)吧 測輸出流傳入
ctx.timerService().currentProcessingTime();// 獲取處理時間(屬于時間語義的一種)
ctx.timerService().currentWatermark();// 獲取水位(事件時間)(屬于時間語義的一種)
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 1000);//處理時間
// 事件時間,讓定時器 延時10s 處理
ctx.timerService().registerEventTimeTimer((value.getEventTime() + 10) * 1000L);
// 刪除處理定時器,根據(jù)相同的時間為標(biāo)準(zhǔn),有多個定時器,每個定時器的時間肯定不同,根據(jù)時間進(jìn)行刪除
ctx.timerService().deleteProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 1000);
ctx.timerService().deleteEventTimeTimer((value.getEventTime() + 10) * 1000L);
}
// 觸發(fā)定時器 timestamp,就是觸發(fā)時間
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
super.onTimer(timestamp, ctx, out);
ctx.getCurrentKey();
// ctx.output();
// 查看基于什么類型的時間
ctx.timeDomain();
}
}
(1)處理時間——調(diào)? ctx.timerService().registerProcessingTimeTimer()注冊;onTimer()在系統(tǒng)時間戳達(dá)到Timer設(shè)定的時間戳?xí)r觸發(fā)。
(2)事件時間——調(diào)? ctx.timerService().registerEventTimeTimer()注冊;onTimer()在Flink內(nèi)部?印達(dá)到或超過Timer設(shè)定的時間戳?xí)r觸發(fā)。


