2020-11.04-Flink-11(基于時(shí)間和窗口的算子)

1.配置時(shí)間特性

時(shí)間特性是StreamExecutionEnvironment的一個(gè)屬性,可以接受如下值
ProcessingTime
指定算子根據(jù)機(jī)器的系統(tǒng)時(shí)鐘決定數(shù)據(jù)流當(dāng)前的時(shí)間,處理時(shí)間窗口基于機(jī)器時(shí)間觸發(fā),可以涵蓋觸發(fā)時(shí)間點(diǎn)之前到達(dá)算子的任意元素,通常情況下,在窗口算子中使用處理時(shí)間會(huì)導(dǎo)致不確定的結(jié)果,這是因?yàn)榇翱趦?nèi)容取決于元素到達(dá)的速率
EventTime
指定算子根據(jù)自身包含的信息決定當(dāng)前時(shí)間,每個(gè)事件時(shí)間都帶有有個(gè)時(shí)間戳,而系統(tǒng)的邏輯時(shí)間是由水位線來(lái)定義的,時(shí)間戳或是在數(shù)據(jù)進(jìn)入處理管道之前就已經(jīng)存在其中,或是需要由應(yīng)用在數(shù)據(jù)源處分配,只有依靠水位線聲明某個(gè)時(shí)間間隔內(nèi)所有時(shí)間戳都已經(jīng)接受時(shí),事件時(shí)間窗口才會(huì)觸發(fā)
IngestionTime
指定每個(gè)接收的記錄都把在數(shù)據(jù)源算子的處理時(shí)間作為事件時(shí)間的時(shí)間戳,并自動(dòng)生成水位線,IngestionTime表示事件進(jìn)入流處理引擎的時(shí)間

分配時(shí)間戳和生成水位線

通常情況下,應(yīng)該在數(shù)據(jù)源函數(shù)后面立即調(diào)用時(shí)間戳分配器,因?yàn)榇蠖鄶?shù)分配器在生成水位線都會(huì)做出一些有關(guān)元素順序相對(duì)時(shí)間戳的假設(shè),由于元素的讀取過(guò)程通常都是并行的,所以一切引起flink跨并行數(shù)據(jù)流分區(qū)進(jìn)行重新分發(fā)的操作,都會(huì)導(dǎo)致元素的時(shí)間戳發(fā)生亂序,最佳做法就是在盡可能靠近數(shù)據(jù)源的地方,甚至是SourceFunction的內(nèi)部

周期性水位線分配器

public class TestPeriodWatermark implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> { 
    Long currentMaxTimestamp = 0L; 
    final Long maxOutOfOrderness = 1000L;// 延遲時(shí)長(zhǎng)是1s 

    @Nullable 
    @Override public Watermark getCurrentWatermark() { 
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness); 
    } 

    @Override 
    public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) { 
        long timestamp = element.f1;         
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp); 
        return timestamp; 
    } 
} 

定點(diǎn)水位線分配器
定點(diǎn)水位線不是太常用,主要為輸入流中包含一些用于指示系統(tǒng)進(jìn)度的特殊元組和標(biāo)記,方便根據(jù)輸入元素生成水位線的場(chǎng)景使用的。
由于數(shù)據(jù)流中每一個(gè)遞增的EventTime都會(huì)產(chǎn)生一個(gè)Watermark。在實(shí)際的生產(chǎn)中Punctuated方式在TPS很高的場(chǎng)景下會(huì)產(chǎn)生大量的Watermark在一定程度上對(duì)下游算子造成壓力,所以只有在實(shí)時(shí)性要求非常高的場(chǎng)景才會(huì)選擇Punctuated的方式進(jìn)行Watermark的生成。

public class TestPunctuateWatermark implements AssignerWithPunctuatedWatermarks<Tuple2<String, Long>> { 
    @Nullable 
    @Override 
    public Watermark checkAndGetNextWatermark(Tuple2<String, Long> lastElement, long extractedTimestamp) { 
        return new Watermark(extractedTimestamp); 
    } 
    
    @Override 
    public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) { 
        return element.f1; 
    } 
} 
水位線、延遲及完整性

水位線可以用于平衡延遲和結(jié)果的完整性,它控制著執(zhí)行某些計(jì)算需要等待的時(shí)間。這個(gè)時(shí)間是預(yù)估的,現(xiàn)實(shí)中不存在完美的水位線,因?yàn)榭倳?huì)存在延遲的記錄?,F(xiàn)實(shí)處理中,需要我們足夠了解從數(shù)據(jù)生成到數(shù)據(jù)源的整個(gè)過(guò)程,來(lái)估算延遲的上線,才能更好的設(shè)置水位線。
如果水位線設(shè)置的過(guò)于寬松,好處是計(jì)算時(shí)能保證近可能多的數(shù)據(jù)被收集到,但由于此時(shí)的水位線遠(yuǎn)落后于處理記錄的時(shí)間戳,導(dǎo)致產(chǎn)生的數(shù)據(jù)結(jié)果延遲較大。
如果設(shè)置的水位線過(guò)于緊迫,數(shù)據(jù)結(jié)果的時(shí)效性當(dāng)然會(huì)更好,但由于水位線大于部分記錄的時(shí)間戳,數(shù)據(jù)的完整性就會(huì)打折扣。所以,水位線的設(shè)置需要更多的去了解數(shù)據(jù),并在數(shù)據(jù)時(shí)效性和完整性上有一個(gè)權(quán)衡。

(這是水位線數(shù)據(jù)的延遲到達(dá)時(shí)間,只要當(dāng)前水位線大于窗口的最大觸發(fā)時(shí)間maxEventTime,就會(huì)觸發(fā)一次窗口.所以水位線的設(shè)置和延遲時(shí)間的設(shè)置需要更多的去了解數(shù)據(jù),并在數(shù)據(jù)時(shí)效性和完整性上有一個(gè)權(quán)衡)

2.處理函數(shù)

雖然時(shí)間信息和水位線對(duì)于很多流式應(yīng)用都至關(guān)重要,但是你可能已經(jīng)注意到,我們無(wú)法通過(guò)前面介紹的DataStream轉(zhuǎn)換來(lái)訪問(wèn),DataStream提供了一組相對(duì)底層的轉(zhuǎn)換--處理函數(shù),處了基本功能,它們還可以訪問(wèn)記錄的時(shí)間戳和水位線,并且支持注冊(cè)在將來(lái)某個(gè)特定時(shí)間觸發(fā)的計(jì)時(shí)器.目前flink提供了8種不同的處理函數(shù)

  1. ProcessFunction:dataStream
  2. KeyedProcessFunction:用于KeyedStream,keyBy之后的流處理
  3. CoProcessFunction:用于connect連接的流
  4. ProcessJoinFunction:用于join流操作
  5. BroadcastProcessFunction:用于廣播
  6. KeyedBroadcastProcessFunction:keyBy之后的廣播
  7. ProcessWindowFunction:窗口增量聚合
  8. ProcessAllWindowFunction:全窗口聚合
時(shí)間服務(wù)和計(jì)時(shí)器

Context和OnTimerContext對(duì)象中TimerService

  1. currentProcessingTime():Long 返回當(dāng)前的處理時(shí)間。
  2. currentWatermark():Long 返回當(dāng)前水位線時(shí)間戳。
  3. registerProcessingTimeTimer(timestamp:Long):Unit
    針對(duì)當(dāng)前鍵值注冊(cè)一個(gè)處理時(shí)間計(jì)時(shí)器,當(dāng)執(zhí)行機(jī)器處理時(shí)間達(dá)到給定的時(shí)間戳,該計(jì)時(shí)器就會(huì)觸發(fā)。
  4. registerEventTimeTimer(timestamp:Long):Unit 針對(duì)當(dāng)前鍵值注冊(cè)一個(gè)事件時(shí)間計(jì)時(shí)器,當(dāng)更新后水位線時(shí)間戳大于或等于計(jì)時(shí)器時(shí)間戳?xí)r,它就會(huì)觸發(fā)。
  5. deleteProcessingTimeTimer(timestamp:Long):Unit 針對(duì)當(dāng)前鍵值刪除一個(gè)注冊(cè)過(guò)的處理時(shí)間計(jì)時(shí)器。如果該計(jì)時(shí)器不存在,則方法不會(huì)有任何作用。
  6. deleteEventTimeTimer(timestamp:Long):Unit 針對(duì)當(dāng)前鍵值刪除一個(gè)注冊(cè)過(guò)事件時(shí)間計(jì)時(shí)器,如果該計(jì)時(shí)器不存在,則方法不會(huì)有任何作用。

計(jì)時(shí)器觸發(fā)時(shí)會(huì)調(diào)用onTimer()回調(diào)函數(shù),系統(tǒng)對(duì)于processElement()和onTimer()兩個(gè)方法調(diào)用同步,防止并發(fā)。
每個(gè)鍵值和時(shí)間戳只能注冊(cè)一個(gè)計(jì)時(shí)器,每個(gè)鍵值可以有多個(gè)計(jì)時(shí)器,但具體到每個(gè)時(shí)間戳就只能有一個(gè)

//某個(gè)傳感器的溫度在1秒的處理時(shí)間內(nèi)持續(xù)上升警告
object KeyedProcessFunctionTemperatureTest {
  def main(args: Array[String]): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val stream = env.socketTextStream("192.168.200.116",9999)
    import org.apache.flink.api.scala._
    val dataDstream = stream.map(data=>{
      val arr = data.split(",")
      Record(arr(0),arr(1).trim.toLong,arr(2).trim.toDouble)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Record](Time.seconds(1)){
      override def extractTimestamp(element: Record): Long = {
        element.timestamp * 1000
      }
    })
    val resultDStrem = dataDstream.keyBy(_.id).process(new TempIncreaseAlertFunction())

    dataDstream.print("data")
    resultDStrem.print("result")
     env.execute("KeyedProcessFunctionTemperatureTest")
  }
  /**
   * 如果某傳感器的溫度在1秒(處理時(shí)間)持續(xù)增加
   * 則發(fā)出警告
   */
  class TempIncreaseAlertFunction extends  KeyedProcessFunction[String, Record, String] {
    import org.apache.flink.api.scala._
    //定義一個(gè)值狀態(tài),保存上一個(gè)設(shè)備溫度值
    lazy val lastTemp = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp",Types.of[Double]))
    //保存注冊(cè)的定時(shí)器的時(shí)間戳
    lazy val currentTimer = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer",Types.of[Long]))

    override def processElement(value: Record, ctx: KeyedProcessFunction[String, Record, String]#Context, out: Collector[String]): Unit = {
      //獲取前一個(gè)溫度
      val prevTemp = lastTemp.value()
      //更新最近一次溫度
      lastTemp.update(value.temperature)
      //獲取上一個(gè)定時(shí)器的時(shí)間戳
      val curTimerTimestamp =  currentTimer.value()
      if(prevTemp == 0.0 || value.temperature < prevTemp) {
          //溫度下降,刪除當(dāng)前計(jì)時(shí)器
        ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp)
        currentTimer.clear()
      }else if(value.temperature > prevTemp && curTimerTimestamp == 0){
        //溫度上升,沒(méi)有設(shè)置計(jì)時(shí)器
        //以當(dāng)前時(shí)間 +1秒設(shè)置處理時(shí)間計(jì)時(shí)器
        val timerTs = ctx.timerService().currentProcessingTime() + 1000
        ctx.timerService().registerProcessingTimeTimer(timerTs)
        //更新當(dāng)前計(jì)時(shí)器
        currentTimer.update(timerTs)
      }
    }
    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, Record, String]#OnTimerContext, out: Collector[String]): Unit = {
      out.collect("設(shè)備 id 為: " + ctx.getCurrentKey + "的設(shè)備溫度值已經(jīng)連續(xù) 1s 上升了。")
      currentTimer.clear()
    }
  }
}
向副輸出發(fā)送數(shù)據(jù)
CoProcessFunction

3.窗口算子

定義窗口算子

新建一個(gè)窗口算子需要指定兩個(gè)窗口組件

  1. 一個(gè)用于決定輸入流中的元素該如何劃分的窗口分配器(windowassigner).窗口分配器會(huì)產(chǎn)生一個(gè)windowedStream(如果用在非鍵值分區(qū)的DataStream上則是AllWindowedStream)
  2. 一個(gè)作用于windowedStream(AllWindowedStream),用于處理分配到窗口中元素的窗口函數(shù)

非鍵值分區(qū)窗口的行為與鍵值分區(qū)窗口的行為完全一致,但是只會(huì)收集全部數(shù)據(jù)且不支持并行計(jì)算

// Keyed Window
stream
       .keyBy(...)               <-  按照一個(gè)Key進(jìn)行分組
       .window(...)              <-  將數(shù)據(jù)流中的元素分配到相應(yīng)的窗口中
      [.trigger(...)]            <-  指定觸發(fā)器Trigger(可選)
      [.evictor(...)]            <-  指定清除器Evictor(可選)
       .reduce/aggregate/process()      <-  窗口處理函數(shù)Window Function

// Non-Keyed Window
stream
       .windowAll(...)           <-  不分組,將數(shù)據(jù)流中的所有元素分配到相應(yīng)的窗口中
      [.trigger(...)]            <-  指定觸發(fā)器Trigger(可選)
      [.evictor(...)]            <-  指定清除器Evictor(可選)
       .reduce/aggregate/process()      <-  窗口處理函數(shù)Window Function
內(nèi)置窗口分配器
  1. 滾動(dòng)窗口
val input: DataStream[T] = ...

// tumbling event-time windows
input
    .keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<window function>(...)

// tumbling processing-time windows
input
    .keyBy(...)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<window function>(...)

// 1 hour tumbling event-time windows offset by 15 minutes.
input
    .keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(15)))
    .<window function>(...)
  1. 滑動(dòng)窗口
val input: DataStream[T] = ...

// sliding event-time windows
input
    .keyBy(...)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<window function>(...)

// sliding processing-time windows
input
    .keyBy(<...>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<window function>(...)

// sliding processing-time windows offset by -8 hours
input
    .keyBy(<...>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<window function>(...)
  1. 會(huì)話窗口
val input: DataStream[T] = ...

// event-time session windows with static gap
input
    .keyBy(...)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<window function>(...)

// event-time session windows with dynamic gap
input
    .keyBy(...)
    .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[T] {
      override def extract(element: T): Long = {
        // determine and return session gap
      }
    }))
    .<window function>(...)

// processing-time session windows with static gap
input
    .keyBy(...)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<window function>(...)


// processing-time session windows with dynamic gap
input
    .keyBy(...)
    .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[T] {
      override def extract(element: T): Long = {
        // determine and return session gap
      }
    }))
    .<window function>(...)
在窗口上應(yīng)用函數(shù)
  1. 增量聚合函數(shù),它的應(yīng)用場(chǎng)景是窗口內(nèi)以狀態(tài)形式存儲(chǔ)某個(gè)值且需要根據(jù)每個(gè)加入窗口的元素對(duì)該值進(jìn)行更新
  2. 全量窗口函數(shù),它會(huì)收集窗口內(nèi)的所有元素,并在執(zhí)行計(jì)算時(shí)對(duì)它們進(jìn)行遍歷,雖然全量窗口函數(shù)通常需要占用更多空間,但是它和增量聚合函數(shù)相比,支持更復(fù)雜的邏輯
ReduceFunction
AggregateFunction
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {

   // 在一次新的aggregate發(fā)起時(shí),創(chuàng)建一個(gè)新的Accumulator,Accumulator是我們所說(shuō)的中間狀態(tài)數(shù)據(jù),簡(jiǎn)稱ACC
   // 這個(gè)函數(shù)一般在初始化時(shí)調(diào)用
   ACC createAccumulator();

   // 當(dāng)一個(gè)新元素流入時(shí),將新元素與狀態(tài)數(shù)據(jù)ACC合并,返回狀態(tài)數(shù)據(jù)ACC
   ACC add(IN value, ACC accumulator);

   // 將兩個(gè)ACC合并
   ACC merge(ACC a, ACC b);

   // 將中間數(shù)據(jù)轉(zhuǎn)成結(jié)果數(shù)據(jù)
   OUT getResult(ACC accumulator);

}
ProcessWindowFunction

與前兩種方法不同,ProcessWindowFunction要對(duì)窗口內(nèi)的全量數(shù)據(jù)都緩存。在Flink所有API中,process算子以及其對(duì)應(yīng)的函數(shù)是最底層的實(shí)現(xiàn),使用這些函數(shù)能夠訪問(wèn)一些更加底層的數(shù)據(jù),比如,直接操作狀態(tài)等。它在源碼中的定義如下:

/**
 * IN   輸入類型
 * OUT  輸出類型
 * KEY  keyBy中按照Key分組,Key的類型
 * W    窗口的類型
 */
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {

  /**
   * 對(duì)一個(gè)窗口內(nèi)的元素進(jìn)行處理,窗口內(nèi)的元素緩存在Iterable<IN>,進(jìn)行處理后輸出到Collector<OUT>中
   * 我們可以輸出一到多個(gè)結(jié)果
   */
    public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;

  /** 
    * 當(dāng)窗口執(zhí)行完畢被清理時(shí),刪除各類狀態(tài)數(shù)據(jù)。
    */
    public void clear(Context context) throws Exception {}

  /**
   * 一個(gè)窗口的上下文,包含窗口的一些元數(shù)據(jù)、狀態(tài)數(shù)據(jù)等。
   */
    public abstract class Context implements java.io.Serializable {

    // 返回當(dāng)前正在處理的Window
        public abstract W window();

    // 返回當(dāng)前Process Time
        public abstract long currentProcessingTime();

    // 返回當(dāng)前Event Time對(duì)應(yīng)的Watermark
        public abstract long currentWatermark();

    // 返回某個(gè)Key下的某個(gè)Window的狀態(tài)
        public abstract KeyedStateStore windowState();

    // 返回某個(gè)Key下的全局狀態(tài)
        public abstract KeyedStateStore globalState();

    // 遲到數(shù)據(jù)發(fā)送到其他位置
        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }
}
ProcessWindowFunction與增量計(jì)算相結(jié)合

當(dāng)我們既想訪問(wèn)窗口里的元數(shù)據(jù),又不想緩存窗口里的所有數(shù)據(jù)時(shí),可以將ProcessWindowFunction與增量計(jì)算函數(shù)相reduce和aggregate結(jié)合。對(duì)于一個(gè)窗口來(lái)說(shuō),F(xiàn)link先增量計(jì)算,窗口關(guān)閉前,將增量計(jì)算結(jié)果發(fā)送給ProcessWindowFunction作為輸入再進(jìn)行處理。

自定義窗口算子(☆)
  1. 分配器
  2. 觸發(fā)器
  3. 移除器

4.基于時(shí)間的雙流Join

基于間隔的join

基于間隔的 Join 會(huì)對(duì)兩條流中擁有相同鍵值以及彼此之間時(shí)間戳不超過(guò)某一指定間隔的事件進(jìn)行 Join

input1
.keyBy(...)
.between(<lower-bound>, <upper-bound>) // 相對(duì)于 input1 的上下界
.process(ProcessJoinFunction) // 處理匹配的事件對(duì)
基于窗口的join

顧名思義,基于窗口的 Join 需要用到 Flink 中的窗口機(jī)制。其原理是將兩條輸入流中的元素分配到公共窗口中并在窗口完成時(shí)進(jìn)行 Join(或 Cogroup)。

input1.join(input2)
.where(...) // 為 input1 指定鍵值屬性
.equalTo(...) // 為 input2 指定鍵值屬性
.window(...) // 指定 WindowAssigner
[.trigger(...)] // 選擇性的指定 Trigger
[.evictor(...)] // 選擇性的指定 Evictor
.apply(...) // 指定 JoinFunction

5.處理遲到數(shù)據(jù)

丟棄遲到事件
重定向遲到事件

側(cè)流輸出(outputtag)

基于遲到事件更新結(jié)果

窗口算子API提供了一個(gè)方法,可以用來(lái)顯式支持遲到的元素,在使用事件時(shí)間窗口,可以指定一個(gè)名為延遲容忍度(allowed lateness)的額外時(shí)間段,配置了該屬性的窗口算子在水位線超過(guò)窗口的結(jié)束時(shí)間戳之后不會(huì)立即刪除窗口,而是會(huì)將窗口繼續(xù)保留該延遲容忍度時(shí)間

水位線與延遲應(yīng)用挺難的,過(guò)段時(shí)間再去看一遍
自定義窗口算子,也挺難的
基于時(shí)間的雙流感覺(jué)挺重要的,但是一直沒(méi)怎么寫(xiě)過(guò)
還有對(duì)延遲數(shù)據(jù)的評(píng)估

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容