1.配置時(shí)間特性
時(shí)間特性是StreamExecutionEnvironment的一個(gè)屬性,可以接受如下值
ProcessingTime
指定算子根據(jù)機(jī)器的系統(tǒng)時(shí)鐘決定數(shù)據(jù)流當(dāng)前的時(shí)間,處理時(shí)間窗口基于機(jī)器時(shí)間觸發(fā),可以涵蓋觸發(fā)時(shí)間點(diǎn)之前到達(dá)算子的任意元素,通常情況下,在窗口算子中使用處理時(shí)間會(huì)導(dǎo)致不確定的結(jié)果,這是因?yàn)榇翱趦?nèi)容取決于元素到達(dá)的速率
EventTime
指定算子根據(jù)自身包含的信息決定當(dāng)前時(shí)間,每個(gè)事件時(shí)間都帶有有個(gè)時(shí)間戳,而系統(tǒng)的邏輯時(shí)間是由水位線來(lái)定義的,時(shí)間戳或是在數(shù)據(jù)進(jìn)入處理管道之前就已經(jīng)存在其中,或是需要由應(yīng)用在數(shù)據(jù)源處分配,只有依靠水位線聲明某個(gè)時(shí)間間隔內(nèi)所有時(shí)間戳都已經(jīng)接受時(shí),事件時(shí)間窗口才會(huì)觸發(fā)
IngestionTime
指定每個(gè)接收的記錄都把在數(shù)據(jù)源算子的處理時(shí)間作為事件時(shí)間的時(shí)間戳,并自動(dòng)生成水位線,IngestionTime表示事件進(jìn)入流處理引擎的時(shí)間
分配時(shí)間戳和生成水位線
通常情況下,應(yīng)該在數(shù)據(jù)源函數(shù)后面立即調(diào)用時(shí)間戳分配器,因?yàn)榇蠖鄶?shù)分配器在生成水位線都會(huì)做出一些有關(guān)元素順序相對(duì)時(shí)間戳的假設(shè),由于元素的讀取過(guò)程通常都是并行的,所以一切引起flink跨并行數(shù)據(jù)流分區(qū)進(jìn)行重新分發(fā)的操作,都會(huì)導(dǎo)致元素的時(shí)間戳發(fā)生亂序,最佳做法就是在盡可能靠近數(shù)據(jù)源的地方,甚至是SourceFunction的內(nèi)部
周期性水位線分配器
public class TestPeriodWatermark implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {
Long currentMaxTimestamp = 0L;
final Long maxOutOfOrderness = 1000L;// 延遲時(shí)長(zhǎng)是1s
@Nullable
@Override public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
@Override
public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
long timestamp = element.f1;
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
}
定點(diǎn)水位線分配器
定點(diǎn)水位線不是太常用,主要為輸入流中包含一些用于指示系統(tǒng)進(jìn)度的特殊元組和標(biāo)記,方便根據(jù)輸入元素生成水位線的場(chǎng)景使用的。
由于數(shù)據(jù)流中每一個(gè)遞增的EventTime都會(huì)產(chǎn)生一個(gè)Watermark。在實(shí)際的生產(chǎn)中Punctuated方式在TPS很高的場(chǎng)景下會(huì)產(chǎn)生大量的Watermark在一定程度上對(duì)下游算子造成壓力,所以只有在實(shí)時(shí)性要求非常高的場(chǎng)景才會(huì)選擇Punctuated的方式進(jìn)行Watermark的生成。
public class TestPunctuateWatermark implements AssignerWithPunctuatedWatermarks<Tuple2<String, Long>> {
@Nullable
@Override
public Watermark checkAndGetNextWatermark(Tuple2<String, Long> lastElement, long extractedTimestamp) {
return new Watermark(extractedTimestamp);
}
@Override
public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
return element.f1;
}
}
水位線、延遲及完整性
水位線可以用于平衡延遲和結(jié)果的完整性,它控制著執(zhí)行某些計(jì)算需要等待的時(shí)間。這個(gè)時(shí)間是預(yù)估的,現(xiàn)實(shí)中不存在完美的水位線,因?yàn)榭倳?huì)存在延遲的記錄?,F(xiàn)實(shí)處理中,需要我們足夠了解從數(shù)據(jù)生成到數(shù)據(jù)源的整個(gè)過(guò)程,來(lái)估算延遲的上線,才能更好的設(shè)置水位線。
如果水位線設(shè)置的過(guò)于寬松,好處是計(jì)算時(shí)能保證近可能多的數(shù)據(jù)被收集到,但由于此時(shí)的水位線遠(yuǎn)落后于處理記錄的時(shí)間戳,導(dǎo)致產(chǎn)生的數(shù)據(jù)結(jié)果延遲較大。
如果設(shè)置的水位線過(guò)于緊迫,數(shù)據(jù)結(jié)果的時(shí)效性當(dāng)然會(huì)更好,但由于水位線大于部分記錄的時(shí)間戳,數(shù)據(jù)的完整性就會(huì)打折扣。所以,水位線的設(shè)置需要更多的去了解數(shù)據(jù),并在數(shù)據(jù)時(shí)效性和完整性上有一個(gè)權(quán)衡。
(這是水位線數(shù)據(jù)的延遲到達(dá)時(shí)間,只要當(dāng)前水位線大于窗口的最大觸發(fā)時(shí)間maxEventTime,就會(huì)觸發(fā)一次窗口.所以水位線的設(shè)置和延遲時(shí)間的設(shè)置需要更多的去了解數(shù)據(jù),并在數(shù)據(jù)時(shí)效性和完整性上有一個(gè)權(quán)衡)
2.處理函數(shù)
雖然時(shí)間信息和水位線對(duì)于很多流式應(yīng)用都至關(guān)重要,但是你可能已經(jīng)注意到,我們無(wú)法通過(guò)前面介紹的DataStream轉(zhuǎn)換來(lái)訪問(wèn),DataStream提供了一組相對(duì)底層的轉(zhuǎn)換--處理函數(shù),處了基本功能,它們還可以訪問(wèn)記錄的時(shí)間戳和水位線,并且支持注冊(cè)在將來(lái)某個(gè)特定時(shí)間觸發(fā)的計(jì)時(shí)器.目前flink提供了8種不同的處理函數(shù)
- ProcessFunction:dataStream
- KeyedProcessFunction:用于KeyedStream,keyBy之后的流處理
- CoProcessFunction:用于connect連接的流
- ProcessJoinFunction:用于join流操作
- BroadcastProcessFunction:用于廣播
- KeyedBroadcastProcessFunction:keyBy之后的廣播
- ProcessWindowFunction:窗口增量聚合
- ProcessAllWindowFunction:全窗口聚合
時(shí)間服務(wù)和計(jì)時(shí)器
Context和OnTimerContext對(duì)象中TimerService
- currentProcessingTime():Long 返回當(dāng)前的處理時(shí)間。
- currentWatermark():Long 返回當(dāng)前水位線時(shí)間戳。
- registerProcessingTimeTimer(timestamp:Long):Unit
針對(duì)當(dāng)前鍵值注冊(cè)一個(gè)處理時(shí)間計(jì)時(shí)器,當(dāng)執(zhí)行機(jī)器處理時(shí)間達(dá)到給定的時(shí)間戳,該計(jì)時(shí)器就會(huì)觸發(fā)。 - registerEventTimeTimer(timestamp:Long):Unit 針對(duì)當(dāng)前鍵值注冊(cè)一個(gè)事件時(shí)間計(jì)時(shí)器,當(dāng)更新后水位線時(shí)間戳大于或等于計(jì)時(shí)器時(shí)間戳?xí)r,它就會(huì)觸發(fā)。
- deleteProcessingTimeTimer(timestamp:Long):Unit 針對(duì)當(dāng)前鍵值刪除一個(gè)注冊(cè)過(guò)的處理時(shí)間計(jì)時(shí)器。如果該計(jì)時(shí)器不存在,則方法不會(huì)有任何作用。
- deleteEventTimeTimer(timestamp:Long):Unit 針對(duì)當(dāng)前鍵值刪除一個(gè)注冊(cè)過(guò)事件時(shí)間計(jì)時(shí)器,如果該計(jì)時(shí)器不存在,則方法不會(huì)有任何作用。
計(jì)時(shí)器觸發(fā)時(shí)會(huì)調(diào)用onTimer()回調(diào)函數(shù),系統(tǒng)對(duì)于processElement()和onTimer()兩個(gè)方法調(diào)用同步,防止并發(fā)。
每個(gè)鍵值和時(shí)間戳只能注冊(cè)一個(gè)計(jì)時(shí)器,每個(gè)鍵值可以有多個(gè)計(jì)時(shí)器,但具體到每個(gè)時(shí)間戳就只能有一個(gè)
//某個(gè)傳感器的溫度在1秒的處理時(shí)間內(nèi)持續(xù)上升警告
object KeyedProcessFunctionTemperatureTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream = env.socketTextStream("192.168.200.116",9999)
import org.apache.flink.api.scala._
val dataDstream = stream.map(data=>{
val arr = data.split(",")
Record(arr(0),arr(1).trim.toLong,arr(2).trim.toDouble)
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Record](Time.seconds(1)){
override def extractTimestamp(element: Record): Long = {
element.timestamp * 1000
}
})
val resultDStrem = dataDstream.keyBy(_.id).process(new TempIncreaseAlertFunction())
dataDstream.print("data")
resultDStrem.print("result")
env.execute("KeyedProcessFunctionTemperatureTest")
}
/**
* 如果某傳感器的溫度在1秒(處理時(shí)間)持續(xù)增加
* 則發(fā)出警告
*/
class TempIncreaseAlertFunction extends KeyedProcessFunction[String, Record, String] {
import org.apache.flink.api.scala._
//定義一個(gè)值狀態(tài),保存上一個(gè)設(shè)備溫度值
lazy val lastTemp = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp",Types.of[Double]))
//保存注冊(cè)的定時(shí)器的時(shí)間戳
lazy val currentTimer = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer",Types.of[Long]))
override def processElement(value: Record, ctx: KeyedProcessFunction[String, Record, String]#Context, out: Collector[String]): Unit = {
//獲取前一個(gè)溫度
val prevTemp = lastTemp.value()
//更新最近一次溫度
lastTemp.update(value.temperature)
//獲取上一個(gè)定時(shí)器的時(shí)間戳
val curTimerTimestamp = currentTimer.value()
if(prevTemp == 0.0 || value.temperature < prevTemp) {
//溫度下降,刪除當(dāng)前計(jì)時(shí)器
ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp)
currentTimer.clear()
}else if(value.temperature > prevTemp && curTimerTimestamp == 0){
//溫度上升,沒(méi)有設(shè)置計(jì)時(shí)器
//以當(dāng)前時(shí)間 +1秒設(shè)置處理時(shí)間計(jì)時(shí)器
val timerTs = ctx.timerService().currentProcessingTime() + 1000
ctx.timerService().registerProcessingTimeTimer(timerTs)
//更新當(dāng)前計(jì)時(shí)器
currentTimer.update(timerTs)
}
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, Record, String]#OnTimerContext, out: Collector[String]): Unit = {
out.collect("設(shè)備 id 為: " + ctx.getCurrentKey + "的設(shè)備溫度值已經(jīng)連續(xù) 1s 上升了。")
currentTimer.clear()
}
}
}
向副輸出發(fā)送數(shù)據(jù)
CoProcessFunction
3.窗口算子
定義窗口算子
新建一個(gè)窗口算子需要指定兩個(gè)窗口組件
- 一個(gè)用于決定輸入流中的元素該如何劃分的窗口分配器(
windowassigner).窗口分配器會(huì)產(chǎn)生一個(gè)windowedStream(如果用在非鍵值分區(qū)的DataStream上則是AllWindowedStream) - 一個(gè)作用于
windowedStream(AllWindowedStream),用于處理分配到窗口中元素的窗口函數(shù)
非鍵值分區(qū)窗口的行為與鍵值分區(qū)窗口的行為完全一致,但是只會(huì)收集全部數(shù)據(jù)且不支持并行計(jì)算
// Keyed Window
stream
.keyBy(...) <- 按照一個(gè)Key進(jìn)行分組
.window(...) <- 將數(shù)據(jù)流中的元素分配到相應(yīng)的窗口中
[.trigger(...)] <- 指定觸發(fā)器Trigger(可選)
[.evictor(...)] <- 指定清除器Evictor(可選)
.reduce/aggregate/process() <- 窗口處理函數(shù)Window Function
// Non-Keyed Window
stream
.windowAll(...) <- 不分組,將數(shù)據(jù)流中的所有元素分配到相應(yīng)的窗口中
[.trigger(...)] <- 指定觸發(fā)器Trigger(可選)
[.evictor(...)] <- 指定清除器Evictor(可選)
.reduce/aggregate/process() <- 窗口處理函數(shù)Window Function
內(nèi)置窗口分配器
- 滾動(dòng)窗口
val input: DataStream[T] = ...
// tumbling event-time windows
input
.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<window function>(...)
// tumbling processing-time windows
input
.keyBy(...)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<window function>(...)
// 1 hour tumbling event-time windows offset by 15 minutes.
input
.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(15)))
.<window function>(...)
- 滑動(dòng)窗口
val input: DataStream[T] = ...
// sliding event-time windows
input
.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<window function>(...)
// sliding processing-time windows
input
.keyBy(<...>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<window function>(...)
// sliding processing-time windows offset by -8 hours
input
.keyBy(<...>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<window function>(...)
- 會(huì)話窗口
val input: DataStream[T] = ...
// event-time session windows with static gap
input
.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<window function>(...)
// event-time session windows with dynamic gap
input
.keyBy(...)
.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[T] {
override def extract(element: T): Long = {
// determine and return session gap
}
}))
.<window function>(...)
// processing-time session windows with static gap
input
.keyBy(...)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<window function>(...)
// processing-time session windows with dynamic gap
input
.keyBy(...)
.window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[T] {
override def extract(element: T): Long = {
// determine and return session gap
}
}))
.<window function>(...)
在窗口上應(yīng)用函數(shù)
- 增量聚合函數(shù),它的應(yīng)用場(chǎng)景是窗口內(nèi)以狀態(tài)形式存儲(chǔ)某個(gè)值且需要根據(jù)每個(gè)加入窗口的元素對(duì)該值進(jìn)行更新
- 全量窗口函數(shù),它會(huì)收集窗口內(nèi)的所有元素,并在執(zhí)行計(jì)算時(shí)對(duì)它們進(jìn)行遍歷,雖然全量窗口函數(shù)通常需要占用更多空間,但是它和增量聚合函數(shù)相比,支持更復(fù)雜的邏輯
ReduceFunction
AggregateFunction
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
// 在一次新的aggregate發(fā)起時(shí),創(chuàng)建一個(gè)新的Accumulator,Accumulator是我們所說(shuō)的中間狀態(tài)數(shù)據(jù),簡(jiǎn)稱ACC
// 這個(gè)函數(shù)一般在初始化時(shí)調(diào)用
ACC createAccumulator();
// 當(dāng)一個(gè)新元素流入時(shí),將新元素與狀態(tài)數(shù)據(jù)ACC合并,返回狀態(tài)數(shù)據(jù)ACC
ACC add(IN value, ACC accumulator);
// 將兩個(gè)ACC合并
ACC merge(ACC a, ACC b);
// 將中間數(shù)據(jù)轉(zhuǎn)成結(jié)果數(shù)據(jù)
OUT getResult(ACC accumulator);
}
ProcessWindowFunction
與前兩種方法不同,ProcessWindowFunction要對(duì)窗口內(nèi)的全量數(shù)據(jù)都緩存。在Flink所有API中,process算子以及其對(duì)應(yīng)的函數(shù)是最底層的實(shí)現(xiàn),使用這些函數(shù)能夠訪問(wèn)一些更加底層的數(shù)據(jù),比如,直接操作狀態(tài)等。它在源碼中的定義如下:
/**
* IN 輸入類型
* OUT 輸出類型
* KEY keyBy中按照Key分組,Key的類型
* W 窗口的類型
*/
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
/**
* 對(duì)一個(gè)窗口內(nèi)的元素進(jìn)行處理,窗口內(nèi)的元素緩存在Iterable<IN>,進(jìn)行處理后輸出到Collector<OUT>中
* 我們可以輸出一到多個(gè)結(jié)果
*/
public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
/**
* 當(dāng)窗口執(zhí)行完畢被清理時(shí),刪除各類狀態(tài)數(shù)據(jù)。
*/
public void clear(Context context) throws Exception {}
/**
* 一個(gè)窗口的上下文,包含窗口的一些元數(shù)據(jù)、狀態(tài)數(shù)據(jù)等。
*/
public abstract class Context implements java.io.Serializable {
// 返回當(dāng)前正在處理的Window
public abstract W window();
// 返回當(dāng)前Process Time
public abstract long currentProcessingTime();
// 返回當(dāng)前Event Time對(duì)應(yīng)的Watermark
public abstract long currentWatermark();
// 返回某個(gè)Key下的某個(gè)Window的狀態(tài)
public abstract KeyedStateStore windowState();
// 返回某個(gè)Key下的全局狀態(tài)
public abstract KeyedStateStore globalState();
// 遲到數(shù)據(jù)發(fā)送到其他位置
public abstract <X> void output(OutputTag<X> outputTag, X value);
}
}
ProcessWindowFunction與增量計(jì)算相結(jié)合
當(dāng)我們既想訪問(wèn)窗口里的元數(shù)據(jù),又不想緩存窗口里的所有數(shù)據(jù)時(shí),可以將ProcessWindowFunction與增量計(jì)算函數(shù)相reduce和aggregate結(jié)合。對(duì)于一個(gè)窗口來(lái)說(shuō),F(xiàn)link先增量計(jì)算,窗口關(guān)閉前,將增量計(jì)算結(jié)果發(fā)送給ProcessWindowFunction作為輸入再進(jìn)行處理。
自定義窗口算子(☆)
- 分配器
- 觸發(fā)器
- 移除器
4.基于時(shí)間的雙流Join
基于間隔的join
基于間隔的 Join 會(huì)對(duì)兩條流中擁有相同鍵值以及彼此之間時(shí)間戳不超過(guò)某一指定間隔的事件進(jìn)行 Join
input1
.keyBy(...)
.between(<lower-bound>, <upper-bound>) // 相對(duì)于 input1 的上下界
.process(ProcessJoinFunction) // 處理匹配的事件對(duì)
基于窗口的join
顧名思義,基于窗口的 Join 需要用到 Flink 中的窗口機(jī)制。其原理是將兩條輸入流中的元素分配到公共窗口中并在窗口完成時(shí)進(jìn)行 Join(或 Cogroup)。
input1.join(input2)
.where(...) // 為 input1 指定鍵值屬性
.equalTo(...) // 為 input2 指定鍵值屬性
.window(...) // 指定 WindowAssigner
[.trigger(...)] // 選擇性的指定 Trigger
[.evictor(...)] // 選擇性的指定 Evictor
.apply(...) // 指定 JoinFunction
5.處理遲到數(shù)據(jù)
丟棄遲到事件
重定向遲到事件
側(cè)流輸出(outputtag)
基于遲到事件更新結(jié)果
窗口算子API提供了一個(gè)方法,可以用來(lái)顯式支持遲到的元素,在使用事件時(shí)間窗口,可以指定一個(gè)名為延遲容忍度(allowed lateness)的額外時(shí)間段,配置了該屬性的窗口算子在水位線超過(guò)窗口的結(jié)束時(shí)間戳之后不會(huì)立即刪除窗口,而是會(huì)將窗口繼續(xù)保留該延遲容忍度時(shí)間
水位線與延遲應(yīng)用挺難的,過(guò)段時(shí)間再去看一遍
自定義窗口算子,也挺難的
基于時(shí)間的雙流感覺(jué)挺重要的,但是一直沒(méi)怎么寫(xiě)過(guò)
還有對(duì)延遲數(shù)據(jù)的評(píng)估