Flink-1.12(四)Flink API

Flink 開發(fā)一個簡單的應(yīng)用程序只需要構(gòu)建環(huán)境、構(gòu)建數(shù)據(jù)源、構(gòu)建數(shù)據(jù)處理方案、構(gòu)建數(shù)據(jù)輸出及執(zhí)行程序這五個步驟,但每個步驟都有對應(yīng)其他強(qiáng)大的API,所以本文一一舉例學(xué)習(xí)。

構(gòu)建環(huán)境

流處理

StreamExecutionEnvironment env = null;
// 構(gòu)建流環(huán)境,如果在本地則創(chuàng)建本地環(huán)境,如果是集群,則創(chuàng)建集群環(huán)境
env  = StreamExecutionEnvironment.getExecutionEnvironment();
// 創(chuàng)建本地執(zhí)行環(huán)境并設(shè)置并行數(shù)
env = StreamExecutionEnvironment.createLocalEnvironment(3);
// 創(chuàng)建遠(yuǎn)程執(zhí)行環(huán)境,jobmanager的IP,端口,并行度,運行程序的位置
env = StreamExecutionEnvironment.createRemoteEnvironment("10.xxx.xx.103",6123,5,"D:/test/abc.jar");

第三種方式可以直接從本地代碼中構(gòu)建與遠(yuǎn)程集群的Flink JobManager 的RPC連接,通過指定應(yīng)用程序所在的jar包,將運行程序遠(yuǎn)程拷貝到 JobManager 節(jié)點上,然后將Flink 應(yīng)用運行在遠(yuǎn)程的環(huán)境中,本地程序相當(dāng)于一個客戶端。

批處理

ExecutionEnvironment env = null;
// 構(gòu)建流環(huán)境,如果在本地則創(chuàng)建本地環(huán)境,如果是集群,則創(chuàng)建集群環(huán)境
env  = ExecutionEnvironment.getExecutionEnvironment();
// 創(chuàng)建本地執(zhí)行環(huán)境并設(shè)置并行數(shù)
env = ExecutionEnvironment.createLocalEnvironment(3);
// 創(chuàng)建遠(yuǎn)程執(zhí)行環(huán)境,jobmanager的IP,端口,并行度,運行程序的位置
env = ExecutionEnvironment.createRemoteEnvironment("10.xxx.xx.103",6123,5,"D:/test/abc.jar");

構(gòu)建數(shù)據(jù)源

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 從集合中讀取數(shù)據(jù)
        DataStream<Integer> ds = env.fromCollection(Arrays.asList(1,2,3,4,5,6));
        // 直接讀取數(shù)據(jù)
        DataStream<Integer> ds1 = env.fromElements(1,2,3,4,5);
        // 從文件讀取
        DataStream<String> ds2 = env.readTextFile("D:\\workspace\\spring\\middleware\\flink\\flink-test\\src\\main\\resources\\hello.txt");
        // 從 kafka 讀取
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "10.240.30.104:9092");
        properties.setProperty("group.id", "test");
        properties.setProperty("enable.auto.commit", "true");
        properties.setProperty("auto.commit.interval.ms", "0");
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>("topic1", new SimpleStringSchema(),properties);
        DataStream<String> ds3 = env.addSource(myConsumer);
        // 自定義數(shù)據(jù)源
        DataStream<String> ds4 = env.addSource(new CustomizeSource());

自定義數(shù)據(jù)源

package com.example.demo;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * @author big uncle
 * @date 2021/6/3 13:54
 * @module
 **/
public class CustomizeSource implements SourceFunction<String> {

    private boolean running = true;

    /**
     * 讀取數(shù)據(jù)
    **/
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        String str = "a-b-c:";
        int i =0;
        while (running){
            i++;
            ctx.collect(str+i);
            Thread.sleep(1000);
        }
    }

    /**
     * 關(guān)閉
    **/
    @Override
    public void cancel() {
        running = false;
    }
}

Transform

map fliter flatmap

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dataStream = env.fromElements("a,b,c,d,e,1,2,3,4");
        // 讀取值是 "a,b,c,d,e,1,2,3,4" 轉(zhuǎn)變?yōu)?一個個的元素
        dataStream.flatMap(new FlatMapFunction<String, Object>() {
            @Override
            public void flatMap(String s, Collector<Object> collector) throws Exception {
                String[] str = s.split(",");
                for(String ss : str) {
                    collector.collect(ss);
                }
            }
        })
        // 為 true 才會輸出,過濾掉不是 a 的值
        .filter(i -> i.equals("a"))
        // 轉(zhuǎn)黃給 a 添加 字符串 0
        .map(i -> i+"0")
        // 輸出結(jié)果為 a0
        .print();
        env.execute("a");
    }

key

所有的聚合操作只能在 key 分組(分區(qū))之后。sum()、min()、max()、minBy()、maxBy()、reduce() 比較器

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
        
        dataStream.keyBy(new KeySelector<Integer, Object>() {
            @Override
            // 把所有數(shù)據(jù)放到一個分區(qū) 0
            public Object getKey(Integer integer) throws Exception {
                return 0;
            }
        })
        // 對 分區(qū)進(jìn)行聚合,聚合下標(biāo)為 0,注意這里是下標(biāo),最后輸出數(shù)據(jù)139,會顯示累加值
        .sum(0).print();
        env.execute("test");
    }
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);

        // keyBy 可以根據(jù)位置,也可以根據(jù)字段
        dataStream.keyBy(new KeySelector<Integer, Object>() {
            @Override
            // 把所有數(shù)據(jù)放到一個分區(qū) 0
            public Object getKey(Integer integer) throws Exception {
                return 0;
            }
        })
        // 注意這里是下標(biāo),也可以是字段名
        .min(0).print();
        env.execute("test");
    }

min()、minBy() ,針對元祖和對象而言,min 會對比某個字段的大小,但不會更新其他字段值,minBy() 會更新其他字段值。

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);

        // keyBy 可以根據(jù)位置,也可以根據(jù)字段
        dataStream.keyBy(new KeySelector<Integer, Object>() {
            @Override
            // 把所有數(shù)據(jù)放到一個分區(qū) 0
            public Object getKey(Integer integer) throws Exception {
                return 0;
            }
        })
        .reduce(new ReduceFunction<Integer>() {
            @Override
            public Integer reduce(Integer t1, Integer t2) throws Exception {
                return t2 - t1 > 0 ? t2 : t1;
            }
        }).print();
        env.execute("test");
    }

reduce 可以比較兩個值,更自由一些

split 和 Select (OutputTag)Connect和CoMap以及Union

從代碼中可以看出幫我們收集了所有數(shù)據(jù),我們可以在做其他操作,多用于做數(shù)據(jù)標(biāo)簽。

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);

        final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};

        SingleOutputStreamOperator<Object> stream = dataStream.process(new ProcessFunction<Integer, Object>() {

            @Override
            public void processElement(Integer value, Context ctx, Collector<Object> out) throws Exception {
                if(value > 10) {
                    out.collect(value);
                }
                ctx.output(outputTag,"其他類型的流"+value.toString());
            }
        });
        stream.print("原始數(shù)據(jù)進(jìn)行過濾后:");
        DataStream<String> sideOutputStream = stream.getSideOutput(outputTag);
        sideOutputStream.print("標(biāo)簽處理:");

        // 合流
        sideOutputStream.connect(stream).flatMap(new CoFlatMapFunction<String,Object,Integer>() {

            // 第一個流如何處理
            @Override
            public void flatMap1(String value, Collector out) throws Exception {
                String str = value.toString();
                str = str.replaceAll("其他類型的流","");
                out.collect(Integer.valueOf(str));
            }
            // 第二流如何處理
            @Override
            public void flatMap2(Object value, Collector out) throws Exception {
                out.collect(Integer.valueOf(value.toString()));
            }

        }).keyBy(new KeySelector<Integer, Object>() {
            @Override
            public Object getKey(Integer value) throws Exception {
                return 0;
            }
        }).sum(0).print("合并后輸出:");

        env.execute("test");
    }
標(biāo)簽處理::1> 其他類型的流3
標(biāo)簽處理::2> 其他類型的流7
標(biāo)簽處理::3> 其他類型的流1
標(biāo)簽處理::7> 其他類型的流10
合并后輸出::6> 1
合并后輸出::6> 8
合并后輸出::6> 11
合并后輸出::6> 21
原始數(shù)據(jù)進(jìn)行過濾后::8> 27
原始數(shù)據(jù)進(jìn)行過濾后::6> 19
標(biāo)簽處理::8> 其他類型的流27
原始數(shù)據(jù)進(jìn)行過濾后::5> 22
原始數(shù)據(jù)進(jìn)行過濾后::4> 50
標(biāo)簽處理::5> 其他類型的流22
標(biāo)簽處理::6> 其他類型的流19
標(biāo)簽處理::4> 其他類型的流50
合并后輸出::6> 48
合并后輸出::6> 75
合并后輸出::6> 97
合并后輸出::6> 119
合并后輸出::6> 138
合并后輸出::6> 157
合并后輸出::6> 207
合并后輸出::6> 257

該方法只能在以下處理時使用。

flink 1.12 沒有 Split 和 Select,換成了 OutputTag

Union 要求多條流合并,必須是相同的數(shù)據(jù)類型,比較簡單,這里就不說了。

分區(qū)

keyBy 默認(rèn)是按照 hash 分區(qū),除了KeyBy 還有其他的分區(qū)方式。

  • broadcast() 廣播,給下游分區(qū)全部廣播同一份數(shù)據(jù)
  • shuffle() 隨機(jī)把當(dāng)前數(shù)據(jù)發(fā)配個下游分區(qū)
  • foeward() 只放在當(dāng)前分區(qū)做計算
  • rebalance() 輪詢給下游分區(qū)發(fā)數(shù)據(jù)
  • rescale() 給相同分組的下游分區(qū)輪詢發(fā)數(shù)據(jù)
  • global() 丟給下游分區(qū)的第一個實例
  • partitionCustom() 用戶自定義重分區(qū)方式

Sink 輸出

kafka

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
        dataStream.addSink(new FlinkKafkaProducer("topic-test",new SimpleStringSchema(),new Properties()));
        env.execute("test");
    }

自定義

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
        dataStream.addSink(new MySink()).setParallelism(1);
        env.execute("test");
    }

    public static class MySink extends RichSinkFunction<Integer> {
        // 不建議 在open 和 clos 寫類似數(shù)據(jù)庫連接 把握不好..死翹翹,但可以在 open 和 clous 檢查連接狀態(tài),或有鏈接池 獲取也行
        @Override
        public void open(Configuration parameters) throws Exception {
            System.out.println("open 有多少數(shù)據(jù)唄調(diào)用多少次");
        }

        @Override
        public void close() throws Exception {
            System.out.println("close 有多少數(shù)據(jù)唄調(diào)用多少次");
        }

        @Override
        public void invoke(Integer value, Context context) throws Exception {
            // 輸出數(shù)據(jù) 可以用來寫 到 redis kafka  hdfs 等
            System.out.println(value);
        }
    }

想要直到 flink 支持更多的 source 或 sink 可以去 maven庫里 搜索 flink connector

window

window 就是將無限流切割為有限流的一種方式,它會將流數(shù)據(jù)分發(fā)到有限大小的桶(bucket)中進(jìn)行分析。window 不是等有數(shù)據(jù)才去做窗口,而是有一個桶,等數(shù)據(jù),桶可以存在多個。只要桶不關(guān)閉,哪怕晚遲到的數(shù)據(jù),依然可以被分配到對應(yīng)的桶里。

基本類型

時間窗口(Time Window)

按時間段做為窗口,如8:01~8:02 為一個窗口
時間窗口分為:滾動時間窗口(TumBlingWindows)、滑動時間窗口(SlidingWindows)、會話窗口(Session Windows)

  • 滾動時間窗口:時間是對齊的,如8:00~9:00 下一個窗口就是 9:00~10:00,數(shù)據(jù)不會重疊(不重復(fù)數(shù)據(jù)),如果數(shù)據(jù)剛好在整點,按照 [ ) 左開右閉,會屬于新時間。


  • 滑動時間窗口:窗口長度固定,可以有重疊(重復(fù)數(shù)據(jù))


  • 會話窗口:一定時間沒有收到數(shù)據(jù)會話結(jié)束,有新的數(shù)據(jù)來,開啟新的會話。session gap 為最小的間隔。


計數(shù)窗口(Count Window)

按多少各數(shù)做為一個窗口,如10個數(shù) 為一個窗口。
計數(shù)窗口分為:滾動計數(shù)窗口、滑動計數(shù)窗口

代碼演示

如果想要調(diào)用時間窗口,必須在 keyBy 之后,但也可以直接使用 windowALL() 方法,該方法把所有數(shù)據(jù)放到一個分區(qū),并行度為一。

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
        // 滾動窗口,可以看這個方法的實現(xiàn)類
        dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(15)));
        // session 窗口
        dataStream.keyBy(0).window(EventTimeSessionWindows.withGap(Time.minutes(1)));
        // 滾動窗口
        dataStream.keyBy(0).timeWindow(Time.seconds(15));
        // 滑動窗口
        dataStream.keyBy(0).timeWindow(Time.seconds(20),Time.seconds(5));
        // 滑動窗口
        dataStream.keyBy(0).window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(5)));
        // 滾動計數(shù)窗口
        dataStream.keyBy(0).countWindow(10);
        // 滑動計數(shù)窗口
        dataStream.keyBy(0).countWindow(10,3);
    }

窗口函數(shù)

增量聚合函數(shù)

每條數(shù)據(jù)到來進(jìn)行計算,保持一個簡單的狀態(tài)。一定會等到時間點(窗口結(jié)束點)到才會輸出結(jié)果。這種窗口可以做一些max、min等比較,累計的函數(shù)

  • ResuceFunction
  • AggregateFunction
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dataStream = env.socketTextStream("192.168.200.58",7777);
        // 滑動窗口
        dataStream.map(i -> Integer.valueOf(i)).keyBy(new KeySelector<Integer, Object>() {
            @Override
            public Object getKey(Integer value) throws Exception {
                return 0;
            }
        }).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
//               .reduce(new ReduceFunction<Integer>() {
//                   // 比較兩個數(shù)大小等 可以實現(xiàn) max min sum 等操作
//                   @Override
//                   public Integer reduce(Integer value1, Integer value2) throws Exception {
//                       return null;
//                   }
//               });
                // 實現(xiàn)一個計數(shù)器,統(tǒng)計有多少個數(shù)據(jù)
                .aggregate(new AggregateFunction<Integer, Integer, Integer>() {
                    // 創(chuàng)建一個累加器,初始值
                    @Override
                    public Integer createAccumulator() {
                        return 0;
                    }

                    // 累加規(guī)則
                    @Override
                    public Integer add(Integer value, Integer accumulator) {
                        return accumulator + 1;
                    }

                    // 輸出結(jié)果
                    @Override
                    public Integer getResult(Integer accumulator) {
                        return accumulator;
                    }

                    // merge一般使用的是 session window 做合并操作
                    @Override
                    public Integer merge(Integer a, Integer b) {
                        return null;
                    }
                }).print();
        env.execute("test");
    }
全窗口函數(shù)

先把窗口所有數(shù)據(jù)收集起來,等到計算的時候會便利所有數(shù)據(jù)。這種窗口可以做一些中位數(shù)或平均數(shù)等操作。

  • ProcessWindowFunction
  • WindowFunction
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dataStream = env.socketTextStream("192.168.200.58",7777);
        // 滑動窗口
        dataStream.map(i -> Integer.valueOf(i)).keyBy(new KeySelector<Integer, Object>() {
            @Override
            public Object getKey(Integer value) throws Exception {
                return 0;
            }
        }).window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 攢齊5s 的數(shù)據(jù),求平均值
                // 參數(shù)1 輸入類型,2 輸出類型,3 key的類型,4 就是個TimeWindow
                .apply(new WindowFunction<Integer, Integer, Object, TimeWindow>() {
                    // 參數(shù)1 key的類型,2 就是window,3 所有輸入的數(shù)據(jù),4 輸出收集器
                    @Override
                    public void apply(Object o, TimeWindow window, Iterable<Integer> input, Collector<Integer> out) throws Exception {
                        List<Integer> list = IteratorUtils.toList(input.iterator());
                        int count = list.size();
                        int sum = list.stream().reduce((i,y) -> i+y).get();
                        out.collect(sum/count);
                    }
                }).print();

        env.execute("test");
    }

把 apply 換成 process 是一樣的。

                .process(new ProcessWindowFunction<Integer, Integer, Object, TimeWindow>() {
                    // context 會拿到的信息比 window 多,包含了 window
                    @Override
                    public void process(Object o, Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
                        List<Integer> list = IteratorUtils.toList(elements.iterator());
                        int count = list.size();
                        int sum = list.stream().reduce((i,y) -> i+y).get();
                        out.collect(sum/count);
                    }
                }).print();

在滑動時間窗口,或滑動計數(shù)中,如下:

我們會想滑動計數(shù),應(yīng)該是10個數(shù),來2個滑動一次,但實際流操作中,當(dāng)你輸入2個數(shù),就會輸出一次,如下:

假設(shè)我們的程序是算平均值,第一次滑動是 (27+10)/2 第二次滑動則是 (27+10+3+7)/4 依此類推。這種方式是等補(bǔ)齊滑動窗口的長度,才會全額計算,就是10個一算。

其他可選API
  • trigger() 觸發(fā)器:定義windows什么時候關(guān)閉,觸發(fā)計算并輸出結(jié)果,分兩步的原因是,考慮數(shù)據(jù)會遲到,遲到的數(shù)據(jù)到windows關(guān)閉的時間,可以先輸出結(jié)果,在等等認(rèn)為會遲到的數(shù)據(jù),然后關(guān)閉。
  • evictor 移除器:定義某些數(shù)據(jù)的邏輯。
  • allowedLateness() 允許處理遲到的數(shù)據(jù),5秒后輸出結(jié)果,等1分鐘才關(guān)閉窗口,但1分鐘內(nèi)該窗口會實時更新數(shù)據(jù),不斷輸出新邏輯數(shù)據(jù)。
window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.minutes(1));
  • sideOutputLateData() 將遲到的數(shù)據(jù)放入側(cè)輸出流
  • getSideOutput() 獲取側(cè)輸出流
        // 標(biāo)記輸出流(側(cè)輸出流)
        OutputTag<Integer> outputTag = new OutputTag<Integer>("late"){};

        SingleOutputStreamOperator<Integer> ds = dataStream.map(i -> Integer.valueOf(i)).keyBy(new KeySelector<Integer, Object>() {
            @Override
            public Object getKey(Integer value) throws Exception {
                return 0;
            }
        }).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.minutes(1))
                // 如果1分鐘后還有遲到數(shù)據(jù),就放到一個側(cè)輸出流里,給一個標(biāo)記,也可以不搭配 allowedLateness() 使用
                .sideOutputLateData(outputTag)
                .sum(0);

        ds.getSideOutput(outputTag).print();

這種等的方式,遲到數(shù)據(jù)底屬于 1窗口 還是屬于 2窗口呢?那就需要和數(shù)據(jù)本身的時間作比較了,所以以上只是API的調(diào)用,但不建議這樣使用,針對遲到數(shù)據(jù)我們一定會用遲到數(shù)據(jù)的本身時間,來決定他在哪個窗口。

ProcessFunction API

ProcessFunction 可以訪問時間戳、watermark以及注冊定時事件。還可以輸出特定的一些事件,例如超時事件等。ProcessFunction用來構(gòu)建事件驅(qū)動的應(yīng)用以及實現(xiàn)自定義的業(yè)務(wù)邏輯(使用之前的window函數(shù)和轉(zhuǎn)換算子、Richfunction都無法實現(xiàn))。例如,flink sql 就是使用ProcessFunction 實現(xiàn)的。

flink 提供了8個 ProcessFunction

  • ProcessFunction
  • KeyedProcessFunction 分組之后得到KeyedStream,調(diào)用該 ProcessFunction
  • CoProcessFunction
  • ProcessJooinFunction
  • BroadcastProcessFunction
  • KeyedBroadcastProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction

KeyedProcessFunction

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> dataStream = env.socketTextStream("192.168.200.58", 7777);
        // 數(shù)據(jù)轉(zhuǎn)換
        DataStream<EventData> stream = dataStream.map(new MapFunction<String, EventData>() {
            @Override
            public EventData map(String value) throws Exception {
                String[] strs = value.split(",");
                return new EventData(
                        strs[0],
                        Long.valueOf(strs[1]),
                        String.valueOf((Long.valueOf(strs[1])-5)),
                        Integer.valueOf(strs[3])
                );
            }
        });
        stream.keyBy(new KeySelector<EventData, String>() {
            @Override
            public String getKey(EventData value) throws Exception {
                return value.getId();
            }
        }).process(new MyProcess()).print();
        env.execute("test");
    }

    public static class MyProcess extends KeyedProcessFunction<String,EventData,Integer> {
        @Override
        public void processElement(EventData value, Context ctx, Collector<Integer> out) throws Exception {
            out.collect(value.hashCode());
            // context
            ctx.timestamp(); //獲取數(shù)據(jù)時間戳
            ctx.getCurrentKey();// 獲取key
            //ctx.output();//測輸出流,可以通過構(gòu)造函數(shù)吧 測輸出流傳入
            ctx.timerService().currentProcessingTime();// 獲取處理時間(屬于時間語義的一種)
            ctx.timerService().currentWatermark();// 獲取水位(事件時間)(屬于時間語義的一種)
            ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 1000);//處理時間
            // 事件時間,讓定時器 延時10s 處理
            ctx.timerService().registerEventTimeTimer((value.getEventTime() + 10) * 1000L);
            // 刪除處理定時器,根據(jù)相同的時間為標(biāo)準(zhǔn),有多個定時器,每個定時器的時間肯定不同,根據(jù)時間進(jìn)行刪除
            ctx.timerService().deleteProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 1000);
            ctx.timerService().deleteEventTimeTimer((value.getEventTime() + 10) * 1000L);
        }
        // 觸發(fā)定時器 timestamp,就是觸發(fā)時間
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
            super.onTimer(timestamp, ctx, out);
            ctx.getCurrentKey();
//            ctx.output();
            // 查看基于什么類型的時間
            ctx.timeDomain();
        }
    }

(1)處理時間——調(diào)? ctx.timerService().registerProcessingTimeTimer()注冊;onTimer()在系統(tǒng)時間戳達(dá)到Timer設(shè)定的時間戳?xí)r觸發(fā)。
(2)事件時間——調(diào)? ctx.timerService().registerEventTimeTimer()注冊;onTimer()在Flink內(nèi)部?印達(dá)到或超過Timer設(shè)定的時間戳?xí)r觸發(fā)。

ProcessFunction 相關(guān)視頻

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容