Apache Flink——處理函數(shù)

前言

在更底層,可以不定義任何具體的算子(比如 map,filter,或者 window),而只是提煉出一個(gè)統(tǒng)一的“處理”(process)操作——它是所有轉(zhuǎn)換算子的一個(gè)概括性的表達(dá),可以自定義處理邏輯,所以這一層接口就被叫作“處理函數(shù)”(process function)。

在處理函數(shù)中,我們直面的就是數(shù)據(jù)流中最基本的元素:數(shù)據(jù)事件(event)、狀態(tài)(state)以及時(shí)間(time)。這就相當(dāng)于對(duì)流有了完全的控制權(quán)。處理函數(shù)比較抽象,沒有具體的操作,所以對(duì)于一些常見的簡(jiǎn)單應(yīng)用(比如求和、開窗口)會(huì)顯得有些麻煩;不過正是因?yàn)樗幌薅ň唧w做什么,所以理論上我們可以做任何事情,實(shí)現(xiàn)所有需求。所以可以說,處理函數(shù)是我們進(jìn)行 Flink 編程的“大招”,輕易不用,一旦放出來必然會(huì)掃平一切。

一、基本處理函數(shù)(ProcessFunction)

處理函數(shù)主要是定義數(shù)據(jù)流的轉(zhuǎn)換操作,所以也可以把它歸到轉(zhuǎn)換算子中。我們知道在
Flink 中幾乎所有轉(zhuǎn)換算子都提供了對(duì)應(yīng)的函數(shù)類接口,處理函數(shù)也不例外;它所對(duì)應(yīng)的函數(shù)類,就叫作 ProcessFunction。

1.1 處理函數(shù)的功能和使用

轉(zhuǎn)換算子一般只是針對(duì)某種具體操作來定義的,能夠拿到信息比較有限。比如map算子,獲取轉(zhuǎn)換之后形式;而像窗口聚合這樣的復(fù)雜操作,AggregateFunction中除數(shù)據(jù)外,還可以獲取到當(dāng)前的狀態(tài)(以累加器Accumulator形式出現(xiàn))。

另外富函數(shù)類,可以獲取運(yùn)行時(shí)上下文的方法getRuntimeContext(),還可以拿到狀態(tài)、并行度、任務(wù)名稱之類的運(yùn)行時(shí)信息。但這些算子無法獲得訪問事件的時(shí)間戳、當(dāng)前的水位線等信息。

處理函數(shù)提供了一個(gè)定時(shí)服務(wù)(TimeService),我們可以通過它訪問流中的事件(event)、時(shí)間戳(timestamp)、水位線(watermark),甚至可以注冊(cè)“定時(shí)事件”。而且處理函數(shù)繼承了AbstractRichFunction抽象類,故擁有富函數(shù)類的所有特性,同樣可以訪問狀態(tài)(state)和其他運(yùn)行時(shí)信息。此外,處理函數(shù)還可以直接將數(shù)據(jù)輸出到側(cè)輸出流(side output)中。故處理函數(shù)是最為靈活的處理方法,還可以實(shí)現(xiàn)各種自定義的業(yè)務(wù)邏輯,同時(shí)也是整個(gè)DataStream API的底層基礎(chǔ)。

處理函數(shù)的使用與基本的轉(zhuǎn)換操作類似,只需要直接基于 DataStream 調(diào)用.process()方法就可以了。方法需要傳入一個(gè) ProcessFunction 作為參數(shù),用來定義處理邏輯:

stream.process(new MyProcessFunction)

這里 ProcessFunction 不是接口,而是一個(gè)抽象類,繼承了 AbstractRichFunction;
MyProcessFunction 是它的一個(gè)具體實(shí)現(xiàn)。所以所有的處理函數(shù),都是富函數(shù)(RichFunction),富函數(shù)可以調(diào)用的東西這里同樣都可以調(diào)用。

下面是一個(gè)具體的應(yīng)用示例:

public class ProcessFunctionTest {

    public static void main(String[] args) throws Exception {
        //創(chuàng)建執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //設(shè)置生成水位線的時(shí)間間隔
        env.getConfig().setAutoWatermarkInterval(100);

        //亂序流的Watermark生成
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                // 插入水位線的邏輯 設(shè)置 watermark 延遲時(shí)間,2 秒
                .assignTimestampsAndWatermarks(
                        // 針對(duì)亂序流插入水位線,延遲時(shí)間設(shè)置為 2s
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                // 抽取時(shí)間戳的邏輯
                                .withTimestampAssigner((SerializableTimestampAssigner<Event>) (element, recordTimestamp) -> element.getTimestamp())
                );

        stream.process(new ProcessFunction<Event, String>() {
            @Override
            public void processElement(Event event, Context context, Collector<String> out) throws Exception {
                if("Mary".equals(event.getUser())){
                    out.collect(event.getUser() + "clicks: " + event.getUrl());
                }else if("Bobo".equals(event.getUser())){
                    out.collect(event.getUser());
                    out.collect(event.getUser());
                }
                out.collect(event.toString());
                Long timestamp = context.timestamp();
                TimerService timerService = context.timerService();
                System.out.println("timestamp: " + timestamp);
                System.out.println("Watermark: " + timerService.currentWatermark());
            }
        }).print();

        env.execute();
    }
}

這里在 ProcessFunction 中重寫了.processElement()方法,自定義了一種處理邏輯:當(dāng)數(shù)據(jù)的 user 為“Mary”時(shí),將其輸出一次;而如果為“Bob”時(shí),將 user 輸出兩次。這里的
輸 出 , 是 通 過 調(diào) 用 out.collect() 來實(shí)現(xiàn)的。另外我們還可以調(diào)用
ctx.timerService().currentWatermark() 來 獲 取 當(dāng) 前 的 水 位 線 打 印 輸 出 。 所 以 可 以 看 到 ,ProcessFunction 函數(shù)有點(diǎn)像 FlatMapFunction 的升級(jí)版。可以實(shí)現(xiàn) Map、Filter、FlatMap 的所有功能。很明顯,處理函數(shù)非常強(qiáng)大,能夠做很多之前做不到的事情。

1.1.1 ProcessFunction 解析

在源碼中可以看到,抽象類 ProcessFunction 繼承了 AbstractRichFunction,有兩個(gè)泛型類型參數(shù):I 表示 Input ,也就是輸入的數(shù)據(jù)類型;O 表示 Output ,也就是處理完成之后輸出的數(shù)據(jù)類型。內(nèi)部單獨(dú)定義了兩個(gè)方法:一個(gè)是必須要實(shí)現(xiàn)的抽象方法 .processElement();另一個(gè)是非抽象方法 .onTimer()。

@PublicEvolving
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {

    private static final long serialVersionUID = 1L;
    
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
    
    ......
    
}
抽象方法.processElement()

用于“處理元素”,定義了處理的核心邏輯。這個(gè)方法對(duì)于流中的每個(gè)元素都會(huì)調(diào)用一次,
參數(shù)包括三個(gè):輸入數(shù)據(jù)值 value,上下文 ctx,以及“收集器”(Collector)out。方法沒有返回值,處理之后的輸出數(shù)據(jù)是通過收集器 out 來定義的。

  • value:當(dāng)前流中的輸入元素,也就是正在處理的數(shù)據(jù),類型與流中數(shù)據(jù)類
    型一致。
  • ctx:類型是 ProcessFunction 中定義的內(nèi)部抽象類 Context,表示當(dāng)前運(yùn)行的
    上下文,可以獲取到當(dāng)前的時(shí)間戳,并提供了用于查詢時(shí)間和注冊(cè)定時(shí)器的“定時(shí)服
    務(wù)”(TimerService),以及可以將數(shù)據(jù)發(fā)送到“側(cè)輸出流”(side output)的方法.output()。
    Context 抽象類定義如下:
public abstract class Context {

    public abstract Long timestamp();

    public abstract TimerService timerService();

    public abstract <X> void output(OutputTag<X> outputTag, X value);
}
  • out:“收集器”(類型為 Collector),用于返回輸出數(shù)據(jù)。使用方式與 flatMap
    算子中的收集器完全一樣,直接調(diào)用 out.collect()方法就可以向下游發(fā)出一個(gè)數(shù)據(jù)。
    這個(gè)方法可以多次調(diào)用,也可以不調(diào)用。

通過幾個(gè)參數(shù)的分析不難發(fā)現(xiàn),ProcessFunction 可以輕松實(shí)現(xiàn) flatMap 這樣的基本轉(zhuǎn)換功能(當(dāng)然 map、filter 更不在話下);而通過富函數(shù)提供的獲取上下文方法.getRuntimeContext(),也可以自定義狀態(tài)(state)進(jìn)行處理,這也就能實(shí)現(xiàn)聚合操作的功能了。

非抽象方法.onTimer()

用于定義定時(shí)觸發(fā)的操作,這是一個(gè)非常強(qiáng)大、也非常有趣的功能。

這個(gè)方法只有在注冊(cè)好的定時(shí)器觸發(fā)的時(shí)候才會(huì)調(diào)用,而定時(shí)器是通過“定時(shí)服務(wù)”TimerService 來注冊(cè)的。

打個(gè)比方,注冊(cè)定時(shí)器(timer)就是設(shè)了一個(gè)鬧鐘,到了設(shè)定時(shí)間就會(huì)響;而.onTimer()中定義的,就是鬧鐘響的時(shí)候要做的事。所以它本質(zhì)上是一個(gè)基于時(shí)間的“回調(diào)”(callback)方法,通過時(shí)間的進(jìn)展來觸發(fā);在事件時(shí)間語義下就是由水位線(watermark)來觸發(fā)了。

與.processElement()類似,定時(shí)方法.onTimer()也有三個(gè)參數(shù):時(shí)間戳(timestamp),上下文(ctx),以及收集器(out)。這里的 timestamp 是指設(shè)定好的觸發(fā)時(shí)間,事件時(shí)間語義下當(dāng)然就是水位線了。另外這里同樣有上下文和收集器,所以也可以調(diào)用定時(shí)服務(wù)(TimerService),以及任意輸出處理之后的數(shù)據(jù)。

既然有.onTimer()方法做定時(shí)觸發(fā),我們用 ProcessFunction 也可以自定義數(shù)據(jù)按照時(shí)間分組、定時(shí)觸發(fā)計(jì)算輸出結(jié)果;這其實(shí)就實(shí)現(xiàn)了窗口(window)的功能。所以說 ProcessFunction是真正意義上的終極奧義,用它可以實(shí)現(xiàn)一切功能。

我們也可以看到,處理函數(shù)都是基于事件觸發(fā)的。水位線就如同插入流中的一條數(shù)據(jù)一樣;只不過處理真正的數(shù)據(jù)事件調(diào)用的是.processElement()方法,而處理水位線事件調(diào)用的是.onTimer()。

這里需要注意的是,上面的.onTimer()方法只是定時(shí)器觸發(fā)時(shí)的操作,而定時(shí)器(timer)
真正的設(shè)置需要用到上下文 ctx 中的定時(shí)服務(wù)。在 Flink 中,只有“按鍵分區(qū)流”KeyedStream才支持設(shè)置定時(shí)器的操作,所以之前的代碼中我們并沒有使用定時(shí)器。所以基于不同類型的流,可以使用不同的處理函數(shù),它們之間還是有一些微小的區(qū)別的。

1.1.2 處理函數(shù)的分類

Flink 中的處理函數(shù)其實(shí)是一個(gè)大家族,ProcessFunction 只是其中一員。

我們知道,DataStream 在調(diào)用一些轉(zhuǎn)換方法之后,有可能生成新的流類型;例如調(diào)
用.keyBy()之后得到 KeyedStream,進(jìn)而再調(diào)用.window()之后得到 WindowedStream。對(duì)于不同類型的流,其實(shí)都可以直接調(diào)用.process()方法進(jìn)行自定義處理,這時(shí)傳入的參數(shù)就都叫作處理函數(shù)。當(dāng)然,它們盡管本質(zhì)相同,都是可以訪問狀態(tài)和時(shí)間信息的底層 API,可彼此之間也會(huì)有所差異。

Flink 提供了 8 個(gè)不同的處理函數(shù):

  • 1 ProcessFunction
    最基本的處理函數(shù),基于 DataStream 直接調(diào)用.process()時(shí)作為參數(shù)傳入。

  • 2 KeyedProcessFunction
    對(duì)流按鍵分區(qū)后的處理函數(shù),基于 KeyedStream 調(diào)用.process()時(shí)作為參數(shù)傳入。要想使用定時(shí)器,比如基于 KeyedStream。

  • 3 ProcessWindowFunction
    開窗之后的處理函數(shù),也是全窗口函數(shù)的代表。基于 WindowedStream 調(diào)用.process()時(shí)作為參數(shù)傳入。

  • 4 ProcessAllWindowFunction
    同樣是開窗之后的處理函數(shù),基于 AllWindowedStream 調(diào)用.process()時(shí)作為參數(shù)傳入。

  • 5 CoProcessFunction
    合并(connect)兩條流之后的處理函數(shù),基于 ConnectedStreams 調(diào)用.process()時(shí)作為參數(shù)傳入。

  • 6 ProcessJoinFunction
    間隔連接(interval join)兩條流之后的處理函數(shù),基于 IntervalJoined 調(diào)用.process()時(shí)作為參數(shù)傳入。

  • 7 BroadcastProcessFunction
    廣播連接流處理函數(shù),基于 BroadcastConnectedStream 調(diào)用.process()時(shí)作為參數(shù)傳入。這里的“廣播連接流”BroadcastConnectedStream,是一個(gè)未 keyBy 的普通 DataStream 與一個(gè)廣播流(BroadcastStream)做連接(conncet)之后的產(chǎn)物。

  • 8 KeyedBroadcastProcessFunction
    按鍵分區(qū)的廣播連接流處理函數(shù),同樣是基于 BroadcastConnectedStream 調(diào)用.process()時(shí)作為參數(shù)傳入。與 BroadcastProcessFunction 不同的是,這時(shí)的廣播連接流,是一個(gè) KeyedStream與廣播流(BroadcastStream)做連接之后的產(chǎn)物。

接下來,我們就對(duì) KeyedProcessFunction 和 ProcessWindowFunction 的具體用法展開詳細(xì)說明。

1.2 按鍵分區(qū)處理函數(shù)(KeyedProcessFunction)

在 Flink 程序中,為了實(shí)現(xiàn)數(shù)據(jù)的聚合統(tǒng)計(jì),或者開窗計(jì)算之類的功能,我們一般都要先
用 keyBy 算子對(duì)數(shù)據(jù)流進(jìn)行“按鍵分區(qū)”,得到一個(gè) KeyedStream。也就是指定一個(gè)鍵(key),按照它的哈希值(hash code)將數(shù)據(jù)分成不同的“組”,然后分配到不同的并行子任務(wù)上執(zhí)行計(jì)算;這相當(dāng)于做了一個(gè)邏輯分流的操作,從而可以充分利用并行計(jì)算的優(yōu)勢(shì)實(shí)時(shí)處理海量數(shù)據(jù)。

另外只有在 KeyedStream 中才支持使用 TimerService 設(shè)置定時(shí)器的操作。所以一般情況下,都是先做了 keyBy 分區(qū)之后,再去定義處理操作;代碼中更加常見的處理函數(shù)是 KeyedProcessFunction,最基本的 ProcessFunction 反而出鏡率沒那么高。

1.2.1 定時(shí)器(Timer)和定時(shí)服務(wù)(TimerService)

KeyedProcessFunction 的一個(gè)特色,就是可以靈活地使用定時(shí)器。

定時(shí)器(timers)是處理函數(shù)中進(jìn)行時(shí)間相關(guān)操作的主要機(jī)制。在.onTimer()方法中可以實(shí)現(xiàn)定時(shí)處理的邏輯,而它能觸發(fā)的前提,就是之前曾經(jīng)注冊(cè)過定時(shí)器、并且現(xiàn)在已經(jīng)到了觸發(fā)時(shí)間。注冊(cè)定時(shí)器的功能,是通過上下文中提供的“定時(shí)服務(wù)”(TimerService)來實(shí)現(xiàn)的。

定時(shí)服務(wù)與當(dāng)前運(yùn)行的環(huán)境有關(guān)。前面已經(jīng)介紹過,ProcessFunction 的上下文(Context)中提供了.timerService()方法,可以直接返回一個(gè) TimerService 對(duì)象:

public abstract TimerService timerService();

TimerService 是 Flink 關(guān)于時(shí)間和定時(shí)器的基礎(chǔ)服務(wù)接口,包含以下六個(gè)方法:

// 獲取當(dāng)前的處理時(shí)間
long currentProcessingTime();

// 獲取當(dāng)前的水位線(事件時(shí)間)
long currentWatermark();

// 注冊(cè)處理時(shí)間定時(shí)器,當(dāng)處理時(shí)間超過 time 時(shí)觸發(fā)
void registerProcessingTimeTimer(long time);

// 注冊(cè)事件時(shí)間定時(shí)器,當(dāng)水位線超過 time 時(shí)觸發(fā)
void registerEventTimeTimer(long time);

// 刪除觸發(fā)時(shí)間為 time 的處理時(shí)間定時(shí)器
void deleteProcessingTimeTimer(long time);

// 刪除觸發(fā)時(shí)間為 time 的處理時(shí)間定時(shí)器
void deleteEventTimeTimer(long time);

六個(gè)方法可以分成兩大類:基于處理時(shí)間和基于事件時(shí)間。
而對(duì)應(yīng)的操作主要有三個(gè):獲取當(dāng)前時(shí)間,注冊(cè)定時(shí)器,以及刪除定時(shí)器。

需要注意,盡管處理函數(shù)中都可以直接訪問TimerService,不過只有基于 KeyedStream 的處理函數(shù),才能去調(diào)用注冊(cè)和刪除定時(shí)器的方法;未作按鍵分區(qū)的 DataStream 不支持定時(shí)器操作,只能獲取當(dāng)前時(shí)間。

對(duì)于處理時(shí)間和事件時(shí)間這兩種類型的定時(shí)器,TimerService 內(nèi)部會(huì)用一個(gè)優(yōu)先隊(duì)列將它們的時(shí)間戳(timestamp)保存起來,排隊(duì)等待執(zhí)行??梢哉J(rèn)為,定時(shí)器其實(shí)是 KeyedStream上處理算子的一個(gè)狀態(tài),它以時(shí)間戳作為區(qū)分。所以 TimerService 會(huì)以鍵(key)和時(shí)間戳為標(biāo)準(zhǔn),對(duì)定時(shí)器進(jìn)行去重;也就是說對(duì)于每個(gè) key 和時(shí)間戳,最多只有一個(gè)定時(shí)器,如果注冊(cè)了多次,onTimer()方法也將只被調(diào)用一次。這樣一來,我們?cè)诖a中就方便了很多,可以肆無忌憚地對(duì)一個(gè) key 注冊(cè)定時(shí)器,而不用擔(dān)心重復(fù)定義——因?yàn)橐粋€(gè)時(shí)間戳上的定時(shí)器只會(huì)觸發(fā)一次。

基于 KeyedStream 注冊(cè)定時(shí)器時(shí),會(huì)傳入一個(gè)定時(shí)器觸發(fā)的時(shí)間戳,這個(gè)時(shí)間戳的定時(shí)器對(duì)于每個(gè) key 都是有效的。這樣,我們的代碼并不需要做額外的處理,底層就可以直接對(duì)不同key 進(jìn)行獨(dú)立的處理操作了。

利用這個(gè)特性,有時(shí)我們可以故意降低時(shí)間戳的精度,來減少定時(shí)器的數(shù)量,從而提高處理性能。比如我們可以在設(shè)置定時(shí)器時(shí)只保留整秒數(shù),那么定時(shí)器的觸發(fā)頻率就是最多 1 秒一次。

long coalescedTime = time / 1000 * 1000;
ctx.timerService().registerProcessingTimeTimer(coalescedTime);

這里注意定時(shí)器的時(shí)間戳必須是毫秒數(shù),所以我們得到整秒之后還要乘以 1000。定時(shí)器
默認(rèn)的區(qū)分精度是毫秒。

另外 Flink 對(duì).onTimer()和.processElement()方法是同步調(diào)用的(synchronous),所以也不會(huì)出現(xiàn)狀態(tài)的并發(fā)修改。

Flink 的定時(shí)器同樣具有容錯(cuò)性,它和狀態(tài)一起都會(huì)被保存到一致性檢查點(diǎn)(checkpoint)中。當(dāng)發(fā)生故障時(shí),F(xiàn)link 會(huì)重啟并讀取檢查點(diǎn)中的狀態(tài),恢復(fù)定時(shí)器。如果是處理時(shí)間的定時(shí)器,有可能會(huì)出現(xiàn)已經(jīng)“過期”的情況,這時(shí)它們會(huì)在重啟時(shí)被立刻觸發(fā)。

1.2.2 KeyedProcessFunction 的使用

KeyedProcessFunction 是 ProcessFunction 的一個(gè)擴(kuò)展。我們只要基于 keyBy 之后的 KeyedStream,直接調(diào)用.process()方法,這時(shí)需要傳入的參數(shù)就是 KeyedProcessFunction 的實(shí)現(xiàn)類。

stream.keyBy( t -> t.f0 )
    .process(new MyKeyedProcessFunction())

類似地,KeyedProcessFunction 也是繼承自 AbstractRichFunction 的一個(gè)抽象類,源碼中定義如下:

public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {

    ...
    
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
    public abstract class Context {...}
    
    ...

}

可以看到與 ProcessFunction 的定義幾乎完全一樣,區(qū)別只是在于類型參數(shù)多了一個(gè) K,這是當(dāng)前按鍵分區(qū)的 key 的類型。同樣地,我們必須實(shí)現(xiàn)一個(gè).processElement()抽象方法,用來處理流中的每一個(gè)數(shù)據(jù);另外還有一個(gè)非抽象方法.onTimer(),用來定義定時(shí)器觸發(fā)時(shí)的回調(diào)操作。由于定時(shí)器只能在 KeyedStream 上使用,所以到了 KeyedProcessFunction 這里,我們才真正對(duì)時(shí)間有了精細(xì)的控制,定時(shí)方法.onTimer()才真正派上了用場(chǎng)。

下面是一個(gè)使用處理時(shí)間定時(shí)器的具體示例:

public class ProcessingTimeTimerTest {

    public static void main(String[] args) throws Exception {
        //創(chuàng)建執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //設(shè)置生成水位線的時(shí)間間隔
        env.getConfig().setAutoWatermarkInterval(100);

        //亂序流的Watermark生成
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                // 插入水位線的邏輯 設(shè)置 watermark 延遲時(shí)間,2 秒
                .assignTimestampsAndWatermarks(
                        // 針對(duì)亂序流插入水位線,延遲時(shí)間設(shè)置為 2s
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                // 抽取時(shí)間戳的邏輯
                                .withTimestampAssigner((SerializableTimestampAssigner<Event>) (element, recordTimestamp) -> element.getTimestamp())
                );

        //處理時(shí)間定時(shí)器
        stream.keyBy(Event::getUser)
            .process(new KeyedProcessFunction<String, Event, String>() {
                @Override
                public void processElement(Event event, Context context, Collector<String> out) throws Exception {
                    long processingTime = context.timerService().currentProcessingTime();
                    out.collect(context.getCurrentKey() + "數(shù)據(jù)到達(dá)時(shí)間: " + new Timestamp(processingTime));

                    //注冊(cè)一個(gè)10秒后的定時(shí)器
                    context.timerService().registerProcessingTimeTimer(processingTime + 10000);

                }

                @Override
                public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                    out.collect(ctx.getCurrentKey() + "定時(shí)器觸發(fā)時(shí)間: " + new Timestamp(timestamp));
                }
            }).print();

        env.execute();
    }
}

在上面的代碼中,由于定時(shí)器只能在 KeyedStream 上使用,所以先要進(jìn)行 keyBy;這里
的.keyBy(data -> true)是將所有數(shù)據(jù)的 key 都指定為了 true,其實(shí)就是所有數(shù)據(jù)擁有相同的 key,會(huì)分配到同一個(gè)分區(qū)。

之后我們自定義了一個(gè) KeyedProcessFunction,其中.processElement()方法是每來一個(gè)數(shù)據(jù)都會(huì)調(diào)用一次,主要是定義了一個(gè) 10 秒之后的定時(shí)器;而.onTimer()方法則會(huì)在定時(shí)器觸發(fā)時(shí)調(diào)用。所以我們會(huì)看到,程序運(yùn)行后先在控制臺(tái)輸出“數(shù)據(jù)到達(dá)”的信息,等待 10 秒之后,又會(huì)輸出“定時(shí)器觸發(fā)”的信息,打印出的時(shí)間間隔正是 10 秒。

當(dāng)然,上面的例子是處理時(shí)間的定時(shí)器,所以我們是真的需要等待 10 秒才會(huì)看到結(jié)果。
事件時(shí)間語義下,又會(huì)有什么不同呢?我們可以對(duì)上面的代碼略作修改,做一個(gè)測(cè)試:

import com.atguigu.chapter05.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

public class EventTimeTimerTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new CustomSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));

        // 基于KeyedStream定義事件時(shí)間定時(shí)器
        stream.keyBy(data -> true)
                .process(new KeyedProcessFunction<Boolean, Event, String>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                        out.collect("數(shù)據(jù)到達(dá),時(shí)間戳為:" + ctx.timestamp());
                        out.collect("數(shù)據(jù)到達(dá),水位線為:" + ctx.timerService().currentWatermark() + "\n -------分割線-------");
                        // 注冊(cè)一個(gè)10秒后的定時(shí)器
                        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10 * 1000L);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        out.collect("定時(shí)器觸發(fā),觸發(fā)時(shí)間:" + timestamp);
                    }
                })
                .print();

        env.execute();
    }

    // 自定義測(cè)試數(shù)據(jù)源
    public static class CustomSource implements SourceFunction<Event> {
        @Override
        public void run(SourceContext<Event> ctx) throws Exception {
            // 直接發(fā)出測(cè)試數(shù)據(jù)
            ctx.collect(new Event("Mary", "./home", 1000L));
            // 為了更加明顯,中間停頓5秒鐘
            Thread.sleep(5000L);

            // 發(fā)出10秒后的數(shù)據(jù)
            ctx.collect(new Event("Mary", "./home", 11000L));
            Thread.sleep(5000L);

            // 發(fā)出10秒+1ms后的數(shù)據(jù)
            ctx.collect(new Event("Alice", "./cart", 11001L));
            Thread.sleep(5000L);
        }

        @Override
        public void cancel() { }
    }
}

由于是事件時(shí)間語義,所以我們必須從數(shù)據(jù)中提取出數(shù)據(jù)產(chǎn)生的時(shí)間戳。這里為了更清楚地看到程序行為,我們自定義了一個(gè)數(shù)據(jù)源,發(fā)出三條測(cè)試數(shù)據(jù),時(shí)間戳分別為 1000、11000和 11001,并且發(fā)出數(shù)據(jù)后都會(huì)停頓 5 秒。
在代碼中,我們依然將所有數(shù)據(jù)分到同一分區(qū),然后在自定義的 KeyedProcessFunction 中使用定時(shí)器。同樣地,每來一條數(shù)據(jù),我們就將當(dāng)前的數(shù)據(jù)時(shí)間戳和水位線信息輸出,并注冊(cè)一個(gè) 10 秒后(以當(dāng)前數(shù)據(jù)時(shí)間戳為基準(zhǔn))的事件時(shí)間定時(shí)器。執(zhí)行程序結(jié)果如下:

數(shù)據(jù)到達(dá),時(shí)間戳為:1000
數(shù)據(jù)到達(dá),水位線為:-9223372036854775808
-------分割線-------
數(shù)據(jù)到達(dá),時(shí)間戳為:11000
數(shù)據(jù)到達(dá),水位線為:999
-------分割線-------
數(shù)據(jù)到達(dá),時(shí)間戳為:11001
數(shù)據(jù)到達(dá),水位線為:10999
-------分割線-------
定時(shí)器觸發(fā),觸發(fā)時(shí)間:11000
定時(shí)器觸發(fā),觸發(fā)時(shí)間:21000
定時(shí)器觸發(fā),觸發(fā)時(shí)間:21001

每來一條數(shù)據(jù),都會(huì)輸出兩行“數(shù)據(jù)到達(dá)”的信息,并以分割線隔開;兩條數(shù)據(jù)到達(dá)的時(shí)
間間隔為 5 秒。當(dāng)?shù)谌龡l數(shù)據(jù)到達(dá)后,隨后立即輸出一條定時(shí)器觸發(fā)的信息;再過 5 秒之后,剩余兩條定時(shí)器信息輸出,程序運(yùn)行結(jié)束。

我們可以發(fā)現(xiàn),數(shù)據(jù)到來之后,當(dāng)前的水位線與時(shí)間戳并不是一致的。當(dāng)?shù)谝粭l數(shù)據(jù)到來,時(shí)間戳為 1000,可水位線的生成是周期性的(默認(rèn) 200ms 一次),不會(huì)立即發(fā)生改變,所以依然是最小值 Long.MIN_VALUE;隨后只要到了水位線生成的時(shí)間點(diǎn)(200ms 到了),就會(huì)依據(jù)當(dāng)前的最大時(shí)間戳 1000 來生成水位線了。這里我們沒有設(shè)置水位線延遲,默認(rèn)需要減去 1 毫秒,所以水位線推進(jìn)到了 999。而當(dāng)時(shí)間戳為 11000 的第二條數(shù)據(jù)到來之后,水位線同樣沒有立即改變,仍然是 999,就好像總是“滯后”數(shù)據(jù)一樣。

這樣程序的行為就可以得到合理解釋了。事件時(shí)間語義下,定時(shí)器觸發(fā)的條件就是水位線推進(jìn)到設(shè)定的時(shí)間。第一條數(shù)據(jù)到來后,設(shè)定的定時(shí)器時(shí)間為 1000 + 10 * 1000 = 11000;而當(dāng)時(shí)間戳為 11000 的第二條數(shù)據(jù)到來,水位線還處在 999 的位置,當(dāng)然不會(huì)立即觸發(fā)定時(shí)器;而之后水位線會(huì)推進(jìn)到 10999,同樣是無法觸發(fā)定時(shí)器的。必須等到第三條數(shù)據(jù)到來,將水位線真正推進(jìn)到 11000,就可以觸發(fā)第一個(gè)定時(shí)器了。第三條數(shù)據(jù)發(fā)出后再過 5 秒,沒有更多的數(shù)據(jù)生成了,整個(gè)程序運(yùn)行結(jié)束將要退出,此時(shí) Flink 會(huì)自動(dòng)將水位線推進(jìn)到長(zhǎng)整型的最大值(Long.MAX_VALUE)。于是所有尚未觸發(fā)的定時(shí)器這時(shí)就統(tǒng)一觸發(fā)了,我們就在控制臺(tái)看到了后兩個(gè)定時(shí)器的觸發(fā)信息。

二、 窗口處理函數(shù)

除 了 KeyedProcessFunction , 另 外 一 大 類 常 用 的 處 理 函 數(shù) , 就 是 基 于 窗 口 的ProcessWindowFunction 和 ProcessAllWindowFunction 了。

2.1 窗口處理函數(shù)的使用

進(jìn)行窗口計(jì)算,我們可以直接調(diào)用現(xiàn)成的簡(jiǎn)單聚合方法(sum/max/min),也可以通過調(diào)
用.reduce()或.aggregate()來自定義一般的增量聚合函數(shù)(ReduceFunction/AggregateFucntion);而對(duì)于更加復(fù)雜、需要窗口信息和額外狀態(tài)的一些場(chǎng)景,我們還可以直接使用全窗口函數(shù)、把數(shù)據(jù)全部收集保存在窗口內(nèi),等到觸發(fā)窗口計(jì)算時(shí)再統(tǒng)一處理。窗口處理函數(shù)就是一種典型的全窗口函數(shù)。

窗 口 處 理 函 數(shù) ProcessWindowFunction 的 使 用 與 其 他 窗 口 函 數(shù) 類 似 , 也 是 基 于WindowedStream 直接調(diào)用方法就可以,只不過這時(shí)調(diào)用的是.process()。

stream.keyBy( t -> t.f0 )
    .window( TumblingEventTimeWindows.of(Time.seconds(10)) )
    .process(new MyProcessWindowFunction())

2.2 ProcessWindowFunction 解析

ProcessWindowFunction 既是處理函數(shù)又是全窗口函數(shù)。從名字上也可以推測(cè)出,它的本質(zhì)似乎更傾向于“窗口函數(shù)”一些。事實(shí)上它的用法也確實(shí)跟其他處理函數(shù)有很大不同。我們可以從源碼中的定義看到這一點(diǎn):

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
    ...

    public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
    public void clear(Context context) throws Exception {}

    public abstract class Context implements java.io.Serializable {...}

}

ProcessWindowFunction 依然是一個(gè)繼承了 AbstractRichFunction 的抽象類,它有四個(gè)類型參數(shù):

  • IN:input,數(shù)據(jù)流中窗口任務(wù)的輸入數(shù)據(jù)類型。
  • OUT:output,窗口任務(wù)進(jìn)行計(jì)算之后的輸出數(shù)據(jù)類型。
  • KEY:數(shù)據(jù)中鍵 key 的類型。
  • W:窗口的類型,是Window 的子類型。一般情況下我們定義時(shí)間窗口,W 就是TimeWindow。

而內(nèi)部定義的方法,跟我們之前熟悉的處理函數(shù)就有所區(qū)別了。因?yàn)槿翱诤瘮?shù)不是逐個(gè)處理元素的,所以處理數(shù)據(jù)的方法在這里并不是.processElement(),而是改成了.process()。方法包含四個(gè)參數(shù)。

  • key:窗口做統(tǒng)計(jì)計(jì)算基于的鍵,也就是之前 keyBy 用來分區(qū)的字段。
  • context:當(dāng)前窗口進(jìn)行計(jì)算的上下文,它的類型就是 ProcessWindowFunction
    內(nèi)部定義的抽象類Context。
  • elements:窗口收集到用來計(jì)算的所有數(shù)據(jù),這是一個(gè)可迭代的集合類型。
  • out:用來發(fā)送數(shù)據(jù)輸出計(jì)算結(jié)果的收集器,類型為Collector。

可以明顯看出,這里的參數(shù)不再是一個(gè)輸入數(shù)據(jù),而是窗口中所有數(shù)據(jù)的集合。而上下文context 所包含的內(nèi)容也跟其他處理函數(shù)有所差別:

public abstract class Context implements java.io.Serializable {

    public abstract W window();
    
    public abstract long currentProcessingTime(); 
    
    public abstract long currentWatermark();
    
    public abstract KeyedStateStore windowState();
    
    public abstract KeyedStateStore globalState();
    
    public abstract <X> void output(OutputTag<X> outputTag, X value);
}

除了可以通過.output()方法定義側(cè)輸出流不變外,其他部分都有所變化。這里不再持有TimerService 對(duì)象,只能通過 currentProcessingTime()和 currentWatermark()來獲取當(dāng)前時(shí)間,所以失去了設(shè)置定時(shí)器的功能;另外由于當(dāng)前不是只處理一個(gè)數(shù)據(jù),所以也不再提供.timestamp() 方法。與此同時(shí),也增加了一些獲取其他信息的方法:比如可以通過.window()直接獲取到當(dāng)前的窗口對(duì)象,也可以通過.windowState()和.globalState()獲取到當(dāng)前自定義的窗口狀態(tài)和全局狀態(tài)。注意這里的“窗口狀態(tài)”是自定義的,不包括窗口本身已經(jīng)有的狀態(tài),針對(duì)當(dāng)前 key、當(dāng)前窗口有效;而“全局狀態(tài)”同樣是自定義的狀態(tài),針對(duì)當(dāng)前 key 的所有窗口有效。

所以我們會(huì)發(fā)現(xiàn),ProcessWindowFunction 中除了.process()方法外,并沒有.onTimer()方法, 而是多出了一個(gè).clear()方法。從名字就可以看出,這主要是方便我們進(jìn)行窗口的清理工作。如果我們自定義了窗口狀態(tài),那么必須在.clear()方法中進(jìn)行顯式地清除,避免內(nèi)存溢出。

這里有一個(gè)問題:沒有了定時(shí)器,那窗口處理函數(shù)就失去了一個(gè)最給力的武器,如果我們希望有一些定時(shí)操作又該怎么做呢?其實(shí)仔細(xì)思考會(huì)發(fā)現(xiàn),對(duì)于窗口而言,它本身的定義就包含了一個(gè)觸發(fā)計(jì)算的時(shí)間點(diǎn),其實(shí)一般情況下是沒有必要再去做定時(shí)操作的。如果非要這么干, Flink 也提供了另外的途徑——使用窗口觸發(fā)器(Trigger)。在觸發(fā)器中也有一個(gè)TriggerContext,它可以起到類似TimerService 的作用:獲取當(dāng)前時(shí)間、注冊(cè)和刪除定時(shí)器,另外還可以獲取當(dāng)前的狀態(tài)。這樣設(shè)計(jì)無疑會(huì)讓處理流程更加清晰——定時(shí)操作也是一種“觸發(fā)”,所以我們就讓所有的觸發(fā)操作歸觸發(fā)器管,而所有處理數(shù)據(jù)的操作則歸窗口函數(shù)管。

至于另一種窗口處理函數(shù) ProcessAllWindowFunction,它的用法非常類似。區(qū)別在于它基于的是 AllWindowedStream,相當(dāng)于對(duì)沒有 keyBy 的數(shù)據(jù)流直接開窗并調(diào)用.process()方法:

stream.windowAll( TumblingEventTimeWindows.of(Time.seconds(10)) )
    .process(new MyProcessAllWindowFunction())

三、應(yīng)用案例——Top N

窗口的計(jì)算處理,在實(shí)際應(yīng)用中非常常見。對(duì)于一些比較復(fù)雜的需求,如果增量聚合函數(shù)無法滿足,我們就需要考慮使用窗口處理函數(shù)這樣的“大招”了。

網(wǎng)站中一個(gè)非常經(jīng)典的例子,就是實(shí)時(shí)統(tǒng)計(jì)一段時(shí)間內(nèi)的熱門 url。例如,需要統(tǒng)計(jì)最近
10 秒鐘內(nèi)最熱門的兩個(gè) url 鏈接,并且每 5 秒鐘更新一次。我們知道,這可以用一個(gè)滑動(dòng)窗口來實(shí)現(xiàn),而“熱門度”一般可以直接用訪問量來表示。于是就需要開滑動(dòng)窗口收集 url 的訪問數(shù)據(jù),按照不同的 url 進(jìn)行統(tǒng)計(jì),而后匯總排序并最終輸出前兩名。這其實(shí)就是著名的“Top N”問題。

很顯然,簡(jiǎn)單的增量聚合可以得到 url 鏈接的訪問量,但是后續(xù)的排序輸出 Top N 就很難
實(shí)現(xiàn)了。所以接下來我們用窗口處理函數(shù)進(jìn)行實(shí)現(xiàn)。

3.1 使用 ProcessAllWindowFunction

一種最簡(jiǎn)單的想法是,我們干脆不區(qū)分 url 鏈接,而是將所有訪問數(shù)據(jù)都收集起來,統(tǒng)一
進(jìn)行統(tǒng)計(jì)計(jì)算。所以可以不做 keyBy,直接基于 DataStream 開窗,然后使用全窗口函數(shù)ProcessAllWindowFunction 來進(jìn)行處理。

在窗口中可以用一個(gè) HashMap 來保存每個(gè) url 的訪問次數(shù),只要遍歷窗口中的所有數(shù)據(jù),自然就能得到所有 url 的熱門度。最后把 HashMap 轉(zhuǎn)成一個(gè)列表 ArrayList,然后進(jìn)行排序、取出前兩名輸出就可以了。

代碼具體實(shí)現(xiàn)如下:

import com.yibo.flink.datastream.Event;
import com.yibo.flink.sourcecustom.ClickSource;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;


public class TopNExample_ProcessAiiWindowFunction {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //亂序流的Watermark生成
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                // 插入水位線的邏輯 設(shè)置 watermark 延遲時(shí)間,2 秒
                .assignTimestampsAndWatermarks(
                        // 針對(duì)亂序流插入水位線,延遲時(shí)間設(shè)置為 2s
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        // 抽取時(shí)間戳的邏輯
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.getTimestamp();
                            }
                        }));

        //直接開窗, 收集所有數(shù)據(jù)排序
        stream.map(Event::getUrl)
                .windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(new UrlHashMapCountAgg(), new UrlAllWindowResult())
                .print();
        
        env.execute();
    }

    /**
     * 實(shí)現(xiàn)自定義的增量聚合函數(shù)
     * 自定義實(shí)現(xiàn)AggregateFunction, 增量計(jì)算url頁面的訪問量,來一條數(shù)據(jù)就 +1
     */
    public static class UrlHashMapCountAgg implements AggregateFunction<String, HashMap<String, Long>, List<Tuple2<String, Long>>> {

        @Override
        public HashMap<String, Long> createAccumulator() {
            return new HashMap<>();
        }

        @Override
        public HashMap<String, Long> add(String value, HashMap<String, Long> accumulator) {
            if(accumulator.containsKey(value)){
                Long count = accumulator.get(value);
                accumulator.put(value,count + 1);
            }else {
                accumulator.put(value, 1L);
            }
            return accumulator;
        }

        @Override
        public List<Tuple2<String, Long>> getResult(HashMap<String, Long> accumulator) {
            List<Tuple2<String, Long>> result = new ArrayList<>();
            accumulator.forEach((url, count) -> {
                result.add(Tuple2.of(url, count));
            });
            result.sort((o1, o2) -> o2.f1.intValue() - o1.f1.intValue());
            return result;
        }

        @Override
        public HashMap<String, Long> merge(HashMap<String, Long> a, HashMap<String, Long> b) {
            return null;
        }
    }


    /**
     * 自定義全窗口函數(shù), 包裝信息輸出結(jié)果
     */
    public static class UrlAllWindowResult extends ProcessAllWindowFunction<List<Tuple2<String, Long>>, String, TimeWindow> {

        @Override
        public void process(Context context, Iterable<List<Tuple2<String, Long>>> iterable, Collector<String> out) throws Exception {
            List<Tuple2<String, Long>> list = iterable.iterator().next();
            StringBuilder result = new StringBuilder();
            result.append("-------------------------------------\n");
            result.append("窗口結(jié)束時(shí)間: " + new Timestamp(context.window().getEnd()) + "\n");

            //取list前兩個(gè)包裝信息輸出
            for (int i = 0; i < 2; i++) {
                Tuple2<String, Long> curTuple2 = list.get(i);
                String info = "NO. " + (i + 1) + ": " + curTuple2.f0 +"訪問量:" + curTuple2.f1 +"\n";
                result.append(info);
            }
            result.append("-------------------------------------\n");
            out.collect(result.toString());
        }
    }
}

運(yùn)行結(jié)果如下所示:

-------------------------------------
窗口結(jié)束時(shí)間: 2022-07-17 23:07:40.0
NO. 1: ./cart訪問量:6
NO. 2: ./home訪問量:2
-------------------------------------

3.2 使用 KeyedProcessFunction

在上面實(shí)現(xiàn)過程中,我們沒有進(jìn)行按鍵分區(qū),直接將所有數(shù)據(jù)放在一個(gè)分區(qū)上進(jìn)行
了開窗操作。這相當(dāng)于將并行度強(qiáng)行設(shè)置為 1,在實(shí)際應(yīng)用中是要盡量避免的,所以 Flink 官方也并不推薦使用 AllWindowedStream 進(jìn)行處理。另外,我們?cè)谌翱诤瘮?shù)中定義了 HashMap來統(tǒng)計(jì) url 鏈接的瀏覽量,計(jì)算過程是要先收集齊所有數(shù)據(jù)、然后再逐一遍歷更新 HashMap,這顯然不夠高效。如果我們可以利用增量聚合函數(shù)的特性,每來一條數(shù)據(jù)就更新一次對(duì)應(yīng) url的瀏覽量,那么到窗口觸發(fā)計(jì)算時(shí)只需要做排序輸出就可以了。

基于這樣的想法,我們可以從兩個(gè)方面去做優(yōu)化:一是對(duì)數(shù)據(jù)進(jìn)行按鍵分區(qū),分別統(tǒng)計(jì)瀏覽量;二是進(jìn)行增量聚合,得到結(jié)果最后再做排序輸出。所以,我們可以使用增量聚合函數(shù)AggregateFunction 進(jìn)行瀏覽量的統(tǒng)計(jì),然后結(jié)合 ProcessWindowFunction 排序輸出來實(shí)現(xiàn) Top N的需求。

具體實(shí)現(xiàn)思路就是,先按照 url 對(duì)數(shù)據(jù)進(jìn)行 keyBy 分區(qū),然后開窗進(jìn)行增量聚合。這里就會(huì)發(fā)現(xiàn)一個(gè)問題:我們進(jìn)行按鍵分區(qū)之后,窗口的計(jì)算就會(huì)只針對(duì)當(dāng)前 key 有效了;也就是說,每個(gè)窗口的統(tǒng)計(jì)結(jié)果中,只會(huì)有一個(gè) url 的瀏覽量,這是無法直接用 ProcessWindowFunction進(jìn)行排序的。所以我們只能分成兩步:先對(duì)每個(gè) url 鏈接統(tǒng)計(jì)出瀏覽量,然后再將統(tǒng)計(jì)結(jié)果收集起來,排序輸出最終結(jié)果。因?yàn)樽詈蟮呐判蜻€是基于每個(gè)時(shí)間窗口的,所以為了讓輸出的統(tǒng)計(jì)結(jié)果中包含窗口信息,我們可以借用第六章中定義的 POJO 類 UrlViewCount 來表示,它包含了 url、瀏覽量(count)以及窗口的起始結(jié)束時(shí)間。之后對(duì) UrlViewCount 的處理,可以先按窗口分區(qū),然后用 KeyedProcessFunction 來實(shí)現(xiàn)。

總結(jié)處理流程如下:

  • (1)讀取數(shù)據(jù)源;
  • (2)篩選瀏覽行為(pv);
  • (3)提取時(shí)間戳并生成水位線;
  • (4)按照 url 進(jìn)行 keyBy 分區(qū)操作;
  • (5)開長(zhǎng)度為 1 小時(shí)、步長(zhǎng)為 5 分鐘的事件時(shí)間滑動(dòng)窗口;
  • (6)使用增量聚合函數(shù) AggregateFunction,并結(jié)合全窗口函數(shù) WindowFunction 進(jìn)行窗口聚合,得到每個(gè) url、在每個(gè)統(tǒng)計(jì)窗口內(nèi)的瀏覽量,包裝成 UrlViewCount;
  • (7)按照窗口進(jìn)行 keyBy 分區(qū)操作;
  • (8)對(duì)同一窗口的統(tǒng)計(jì)結(jié)果數(shù)據(jù),使用 KeyedProcessFunction 進(jìn)行收集并排序輸出。

糟糕的是,這里又會(huì)帶來另一個(gè)問題。最后我們用 KeyedProcessFunction 來收集數(shù)據(jù)做排序,這時(shí)面對(duì)的就是窗口聚合之后的數(shù)據(jù)流,而窗口已經(jīng)不存在了;那到底什么時(shí)候會(huì)收集齊所有數(shù)據(jù)呢?這問題聽起來似乎有些沒道理。我們統(tǒng)計(jì)瀏覽量的窗口已經(jīng)關(guān)閉,就說明了當(dāng)前已經(jīng)到了要輸出結(jié)果的時(shí)候,直接輸出不就行了嗎?

沒有這么簡(jiǎn)單。因?yàn)閿?shù)據(jù)流中的元素是逐個(gè)到來的,所以即使理論上我們應(yīng)該“同時(shí)”收
到很多 url 的瀏覽量統(tǒng)計(jì)結(jié)果,實(shí)際也是有先后的、只能一條一條處理。下游任務(wù)(就是我們定義的 KeyedProcessFunction)看到一個(gè) url 的統(tǒng)計(jì)結(jié)果,并不能保證這個(gè)時(shí)間段的統(tǒng)計(jì)數(shù)據(jù)不會(huì)再來了,所以也不能貿(mào)然進(jìn)行排序輸出。解決的辦法,自然就是要等所有數(shù)據(jù)到齊了——這很容易讓我們聯(lián)想起水位線設(shè)置延遲時(shí)間的方法。這里我們也可以“多等一會(huì)兒”,等到水位線真正超過了窗口結(jié)束時(shí)間,要統(tǒng)計(jì)的數(shù)據(jù)就肯定到齊了。
具體實(shí)現(xiàn)上,可以采用一個(gè)延遲觸發(fā)的事件時(shí)間定時(shí)器?;诖翱诘慕Y(jié)束時(shí)間來設(shè)定延遲,其實(shí)并不需要等太久——因?yàn)槲覀兪强克痪€的推進(jìn)來觸發(fā)定時(shí)器,而水位線的含義就是“之前的數(shù)據(jù)都到齊了”。所以我們只需要設(shè)置 1 毫秒的延遲,就一定可以保證這一點(diǎn)。

而在等待過程中,之前已經(jīng)到達(dá)的數(shù)據(jù)應(yīng)該緩存起來,我們這里用一個(gè)自定義的“列表狀
態(tài)”(ListState)來進(jìn)行存儲(chǔ)。這個(gè)狀態(tài)需要使用富函數(shù)類的 getRuntimeContext()方法獲取運(yùn)行時(shí)上下文來定義,我們一般把它放在 open()生命周期方法中。之后每來一個(gè)UrlViewCount,就把它添加到當(dāng)前的列表狀態(tài)中,并注冊(cè)一個(gè)觸發(fā)時(shí)間為窗口結(jié)束時(shí)間加 1毫秒(windowEnd + 1)的定時(shí)器。待到水位線到達(dá)這個(gè)時(shí)間,定時(shí)器觸發(fā),我們可以保證當(dāng)前窗口所有 url 的統(tǒng)計(jì)結(jié)果 UrlViewCount 都到齊了;于是從狀態(tài)中取出進(jìn)行排序輸出。

具體代碼實(shí)現(xiàn)如下:

import com.yibo.flink.datastream.Event;
import com.yibo.flink.sourcecustom.ClickSource;
import com.yibo.flink.window.UrlViewCount;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;


public class TopNExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //亂序流的Watermark生成
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                // 插入水位線的邏輯 設(shè)置 watermark 延遲時(shí)間,2 秒
                .assignTimestampsAndWatermarks(
                        // 針對(duì)亂序流插入水位線,延遲時(shí)間設(shè)置為 2s
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        // 抽取時(shí)間戳的邏輯
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.getTimestamp();
                            }
                        }));
        //按url分組, 統(tǒng)計(jì)窗口內(nèi)每個(gè)url的訪問量
        SingleOutputStreamOperator<UrlViewCount> urlCountStream = stream.keyBy(Event::getUrl)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(new UrlViewCountAgg(), new UrlViewCountResult());

        urlCountStream.print("urlCount");

        //對(duì)同一窗口統(tǒng)計(jì)出的訪問量, 進(jìn)行收集和和排序
        urlCountStream.keyBy(UrlViewCount::getWindowEnd)
                .process(new TopNProcessResult(2))
                .print();
        
        env.execute();
    }

    //實(shí)現(xiàn)自定義的KeyedProcessFunction
    public static class TopNProcessResult extends KeyedProcessFunction<Long, UrlViewCount, String>{

        //定義一個(gè)屬性 n
        private Integer n;

        //定義列表狀態(tài)
        private ListState<UrlViewCount> urlViewCountListState;

        public TopNProcessResult(Integer n){
            this.n = n;
        }

        //在環(huán)境中獲取狀態(tài)
        @Override
        public void open(Configuration parameters) throws Exception {
            urlViewCountListState = getRuntimeContext().getListState(
                    new ListStateDescriptor<UrlViewCount>("url-count-list", Types.POJO(UrlViewCount.class))
            );
        }

        @Override
        public void processElement(UrlViewCount urlCountView, Context context, Collector<String> collector) throws Exception {
            //將數(shù)據(jù)保存到狀態(tài)中
            urlViewCountListState.add(urlCountView);
            //注冊(cè)windowEnd + 1ms定時(shí)器
            context.timerService().registerEventTimeTimer(context.getCurrentKey() + 1);

        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            List<UrlViewCount> list = new ArrayList<>();
            for (UrlViewCount urlViewCount : urlViewCountListState.get()) {
                list.add(urlViewCount);
            }
            List<UrlViewCount> urlViewCountList = list.stream().sorted(Comparator.comparing(UrlViewCount::getCount).reversed()).collect(Collectors.toList());

            StringBuilder result = new StringBuilder();
            result.append("-------------------------------------\n");
            result.append("窗口結(jié)束時(shí)間: " + new Timestamp(ctx.getCurrentKey()) + "\n");

            //取list前兩個(gè)包裝信息輸出
            for (int i = 0; i < 2; i++) {
                UrlViewCount urlViewCount = urlViewCountList.get(i);
                String info = "NO. " + (i + 1) + ": " + urlViewCount.getUrl() +"訪問量:" + urlViewCount.getCount() +"\n";
                result.append(info);
            }
            result.append("-------------------------------------\n");
            out.collect(result.toString());
        }
    }

    /**
     * 自定義實(shí)現(xiàn)AggregateFunction, 增量計(jì)算url頁面的訪問量,來一條數(shù)據(jù)就 +1
     */
    public static class UrlViewCountAgg implements AggregateFunction<Event, Long, Long> {

        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(Event value, Long accumulator) {
            return accumulator + 1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return null;
        }
    }


    /**
     * 自定義實(shí)現(xiàn)ProcessWindowFunction, 包裝窗口信息輸出
     */
    public static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow> {

        @Override
        public void process(String url, Context context, Iterable<Long> iterable, Collector<UrlViewCount> out) throws Exception {
            Long urlCount = iterable.iterator().next();
            //集合窗口信息輸出
            long start = context.window().getStart();
            long end = context.window().getEnd();
            UrlViewCount urlCountView = new UrlViewCount();
            urlCountView.setUrl(url);
            urlCountView.setCount(urlCount);
            urlCountView.setWindowStart(start);
            urlCountView.setWindowEnd(end);
            out.collect(urlCountView);
        }
    }
}

代碼中,我們還利用了定時(shí)器的特性:針對(duì)同一 key、同一時(shí)間戳?xí)M(jìn)行去重。所以對(duì)于
同一個(gè)窗口而言,我們接到統(tǒng)計(jì)結(jié)果數(shù)據(jù)后設(shè)定的 windowEnd + 1 的定時(shí)器都是一樣的,最終只會(huì)觸發(fā)一次計(jì)算。而對(duì)于不同的 key(這里 key 是 windowEnd),定時(shí)器和狀態(tài)都是獨(dú)立的,所以我們也不用擔(dān)心不同窗口間數(shù)據(jù)的干擾。

我們?cè)谏厦娴拇a中使用了后面要講解的 ListState。這里可以先簡(jiǎn)單說明一下。我們先聲明一個(gè)列表狀態(tài)變量:

private ListState<UrlViewCount> urlViewCountListState;

然后在 open 方法中初始化了列表狀態(tài)變量,我們初始化的時(shí)候使用了 ListStateDescriptor描述符,這個(gè)描述符用來告訴 Flink 列表狀態(tài)變量的名字和類型。列表狀態(tài)變量是單例,也就是說只會(huì)被實(shí)例化一次。這個(gè)列表狀態(tài)變量的作用域是當(dāng)前 key 所對(duì)應(yīng)的邏輯分區(qū)。我們使用add 方法向列表狀態(tài)變量中添加數(shù)據(jù),使用 get 方法讀取列表狀態(tài)變量中的所有元素。

另外,根據(jù)水位線的定義,我們這里的延遲時(shí)間設(shè)為 0 事實(shí)上也是可以保證數(shù)據(jù)都到齊的。

四、側(cè)輸出流(Side Output)

處理函數(shù)還有另外一個(gè)特有功能,就是將自定義的數(shù)據(jù)放入“側(cè)輸出流”(side output)輸
出。這個(gè)概念我們并不陌生,之前在窗口處理遲到數(shù)據(jù)時(shí),最后一招就是輸出到側(cè)輸出流。而這種處理方式的本質(zhì),其實(shí)就是處理函數(shù)的側(cè)輸出流功能。

絕大多數(shù)轉(zhuǎn)換算子,輸出的都是單一流,流里的數(shù)據(jù)類型只能有一種。而側(cè)輸出流可以認(rèn)為是“主流”上分叉出的“支流”,所以可以由一條流產(chǎn)生出多條流,而且這些流中的數(shù)據(jù)類型還可以不一樣。利用這個(gè)功能可以很容易地實(shí)現(xiàn)“分流”操作。

具體應(yīng)用時(shí),只要在處理函數(shù)的.processElement()或者.onTimer()方法中,調(diào)用上下文
的.output()方法就可以了。

DataStream<Integer> stream = env.addSource(...);

SingleOutputStreamOperator<Long> longStream stream.process(new ProcessFunction<Integer, Long>() {

    @Override
    public void processElement( Integer value, Context ctx, Collector<Integer> out) throws Exception {
        // 轉(zhuǎn)換成 Long,輸出到主流中
        out.collect(Long.valueOf(value));
        // 轉(zhuǎn)換成 String,輸出到側(cè)輸出流中
        ctx.output(outputTag, "side-output: " + String.valueOf(value));
    }

});

這里 output()方法需要傳入兩個(gè)參數(shù),第一個(gè)是一個(gè)“輸出標(biāo)簽”O(jiān)utputTag,用來標(biāo)識(shí)側(cè)輸出流,一般會(huì)在外部統(tǒng)一聲明;第二個(gè)就是要輸出的數(shù)據(jù)。
我們可以在外部先將OutputTag 聲明出來:

OutputTag<String> outputTag = new OutputTag<String>("side-output") {};

如果想要獲取這個(gè)側(cè)輸出流,可以基于處理之后的 DataStream 直接調(diào)用.getSideOutput() 方法,傳入對(duì)應(yīng)的OutputTag,這個(gè)方式與窗口API 中獲取側(cè)輸出流是完全一樣的。

DataStream<String> stringStream = longStream.getSideOutput(outputTag);

參考:
https://blog.csdn.net/mynameisgt/article/details/124313485

https://blog.csdn.net/mengxianglong123/article/details/123872220

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容