概述
2019 年是大數(shù)據(jù)實(shí)時(shí)計(jì)算領(lǐng)域最不平凡的一年,2019 年 1 月阿里巴巴 Blink (內(nèi)部的 Flink 分支版本)開源,大數(shù)據(jù)領(lǐng)域一夜間從 Spark 獨(dú)步天下走向了兩強(qiáng)爭霸的時(shí)代。Flink 因?yàn)槠涮烊坏牧魇接?jì)算特性以及強(qiáng)大的處理性能成為炙手可熱的大數(shù)據(jù)處理框架。
時(shí)至今日,F(xiàn)link 已經(jīng)發(fā)展到 1.9 版本,在大數(shù)據(jù)開發(fā)領(lǐng)域,面試中對于 Flink 的考察已經(jīng)是大數(shù)據(jù)開發(fā)求職者必須面對的,本文結(jié)合自己作為面試官過程中的經(jīng)驗(yàn)詳細(xì)總結(jié)了近 50 個(gè)關(guān)于 Flink 的面試考察點(diǎn)。
在本場 Chat 中,分為以下幾個(gè)部分:
第一部分:Flink 中的核心概念和基礎(chǔ)篇,包含了 Flink 的整體介紹、核心概念、算子等考察點(diǎn)。
第二部分:Flink 進(jìn)階篇,包含了 Flink 中的數(shù)據(jù)傳輸、容錯(cuò)機(jī)制、序列化、數(shù)據(jù)熱點(diǎn)、反壓等實(shí)際生產(chǎn)環(huán)境中遇到的問題等考察點(diǎn)。
第三部分:Flink 源碼篇,包含了 Flink 的核心代碼實(shí)現(xiàn)、Job 提交流程、數(shù)據(jù)交換、分布式快照機(jī)制、Flink SQL 的原理等考察點(diǎn)。
第一部分:Flink 中的核心概念和基礎(chǔ)考察
**一、 簡單介紹一下 Flink **
Flink 是一個(gè)框架和分布式處理引擎,用于對無界和有界數(shù)據(jù)流進(jìn)行有狀態(tài)計(jì)算。并且 Flink 提供了數(shù)據(jù)分布、容錯(cuò)機(jī)制以及資源管理等核心功能。
Flink提供了諸多高抽象層的API以便用戶編寫分布式任務(wù):
DataSet API, 對靜態(tài)數(shù)據(jù)進(jìn)行批處理操作,將靜態(tài)數(shù)據(jù)抽象成分布式的數(shù)據(jù)集,用戶可以方便地使用Flink提供的各種操作符對分布式數(shù)據(jù)集進(jìn)行處理,支持Java、Scala和Python。
DataStream API,對數(shù)據(jù)流進(jìn)行流處理操作,將流式的數(shù)據(jù)抽象成分布式的數(shù)據(jù)流,用戶可以方便地對分布式數(shù)據(jù)流進(jìn)行各種操作,支持Java和Scala。
Table API,對結(jié)構(gòu)化數(shù)據(jù)進(jìn)行查詢操作,將結(jié)構(gòu)化數(shù)據(jù)抽象成關(guān)系表,并通過類SQL的DSL對關(guān)系表進(jìn)行各種查詢操作,支持Java和Scala。
此外,F(xiàn)link 還針對特定的應(yīng)用領(lǐng)域提供了領(lǐng)域庫,例如:
Flink ML,F(xiàn)link 的機(jī)器學(xué)習(xí)庫,提供了機(jī)器學(xué)習(xí)Pipelines API并實(shí)現(xiàn)了多種機(jī)器學(xué)習(xí)算法。
Gelly,F(xiàn)link 的圖計(jì)算庫,提供了圖計(jì)算的相關(guān)API及多種圖計(jì)算算法實(shí)現(xiàn)。
根據(jù)官網(wǎng)的介紹,F(xiàn)link 的特性包含:
支持高吞吐、低延遲、高性能的流處理
支持帶有事件時(shí)間的窗口 (Window) 操作
支持有狀態(tài)計(jì)算的 Exactly-once 語義
支持高度靈活的窗口 (Window) 操作,支持基于 time、count、session 以及 data-driven 的窗口操作
支持具有 Backpressure 功能的持續(xù)流模型
支持基于輕量級分布式快照(Snapshot)實(shí)現(xiàn)的容錯(cuò)
一個(gè)運(yùn)行時(shí)同時(shí)支持 Batch on Streaming 處理和 Streaming 處理
Flink 在 JVM 內(nèi)部實(shí)現(xiàn)了自己的內(nèi)存管理
支持迭代計(jì)算
支持程序自動優(yōu)化:避免特定情況下 Shuffle、排序等昂貴操作,中間結(jié)果有必要進(jìn)行緩存
二、 Flink 相比傳統(tǒng)的 Spark Streaming 有什么區(qū)別?
這個(gè)問題是一個(gè)非常宏觀的問題,因?yàn)閮蓚€(gè)框架的不同點(diǎn)非常之多。但是在面試時(shí)有非常重要的一點(diǎn)一定要回答出來:Flink 是標(biāo)準(zhǔn)的實(shí)時(shí)處理引擎,基于事件驅(qū)動。而 Spark Streaming 是微批(Micro-Batch)的模型。
下面我們就分幾個(gè)方面介紹兩個(gè)框架的主要區(qū)別:
1. 架構(gòu)模型
Spark Streaming 在運(yùn)行時(shí)的主要角色包括:Master、Worker、Driver、Executor,F(xiàn)link 在運(yùn)行時(shí)主要包含:Jobmanager、Taskmanager和Slot。
2. 任務(wù)調(diào)度
Spark Streaming 連續(xù)不斷的生成微小的數(shù)據(jù)批次,構(gòu)建有向無環(huán)圖DAG,Spark Streaming 會依次創(chuàng)建 DStreamGraph、JobGenerator、JobScheduler。
Flink 根據(jù)用戶提交的代碼生成 StreamGraph,經(jīng)過優(yōu)化生成 JobGraph,然后提交給 JobManager進(jìn)行處理,JobManager 會根據(jù) JobGraph 生成 ExecutionGraph,ExecutionGraph 是 Flink 調(diào)度最核心的數(shù)據(jù)結(jié)構(gòu),JobManager 根據(jù) ExecutionGraph 對 Job 進(jìn)行調(diào)度。
3. 時(shí)間機(jī)制
Spark Streaming 支持的時(shí)間機(jī)制有限,只支持處理時(shí)間。
Flink 支持了流處理程序在時(shí)間上的三個(gè)定義:處理時(shí)間、事件時(shí)間、注入時(shí)間。同時(shí)也支持 watermark 機(jī)制來處理滯后數(shù)據(jù)。
4. 容錯(cuò)機(jī)制
對于 Spark Streaming 任務(wù),我們可以設(shè)置 checkpoint,然后假如發(fā)生故障并重啟,我們可以從上次 checkpoint 之處恢復(fù),但是這個(gè)行為只能使得數(shù)據(jù)不丟失,可能會重復(fù)處理,不能做到恰一次處理語義。
Flink 則使用兩階段提交協(xié)議來解決這個(gè)問題。
三、 Flink 的組件棧有哪些?
根據(jù) Flink 官網(wǎng)描述,F(xiàn)link 是一個(gè)分層架構(gòu)的系統(tǒng),每一層所包含的組件都提供了特定的抽象,用來服務(wù)于上層組件。

圖片來源于:https://flink.apache.org
自下而上,每一層分別代表:
Deploy 層:該層主要涉及了Flink的部署模式,在上圖中我們可以看出,F(xiàn)link 支持包括local、Standalone、Cluster、Cloud等多種部署模式。
Runtime 層:Runtime層提供了支持 Flink 計(jì)算的核心實(shí)現(xiàn),比如:支持分布式 Stream 處理、JobGraph到ExecutionGraph的映射、調(diào)度等等,為上層API層提供基礎(chǔ)服務(wù)。
API層:API 層主要實(shí)現(xiàn)了面向流(Stream)處理和批(Batch)處理API,其中面向流處理對應(yīng)DataStream API,面向批處理對應(yīng)DataSet API,后續(xù)版本,F(xiàn)link有計(jì)劃將DataStream和DataSet API進(jìn)行統(tǒng)一。
Libraries層:該層稱為Flink應(yīng)用框架層,根據(jù)API層的劃分,在API層之上構(gòu)建的滿足特定應(yīng)用的實(shí)現(xiàn)計(jì)算框架,也分別對應(yīng)于面向流處理和面向批處理兩類。面向流處理支持:CEP(復(fù)雜事件處理)、基于SQL-like的操作(基于Table的關(guān)系操作);面向批處理支持:FlinkML(機(jī)器學(xué)習(xí)庫)、Gelly(圖處理)。
四、Flink 的運(yùn)行必須依賴 Hadoop組件嗎?
Flink可以完全獨(dú)立于Hadoop,在不依賴Hadoop組件下運(yùn)行。
但是做為大數(shù)據(jù)的基礎(chǔ)設(shè)施,Hadoop體系是任何大數(shù)據(jù)框架都繞不過去的。Flink可以集成眾多Hadooop 組件,例如Yarn、Hbase、HDFS等等。例如,F(xiàn)link可以和Yarn集成做資源調(diào)度,也可以讀寫HDFS,或者利用HDFS做檢查點(diǎn)。
五、你們的Flink集群規(guī)模多大?
大家注意,這個(gè)問題看起來是問你實(shí)際應(yīng)用中的Flink集群規(guī)模,其實(shí)還隱藏著另一個(gè)問題:Flink可以支持多少節(jié)點(diǎn)的集群規(guī)模?
在回答這個(gè)問題時(shí)候,可以將自己生產(chǎn)環(huán)節(jié)中的集群規(guī)模、節(jié)點(diǎn)、內(nèi)存情況說明,同時(shí)說明部署模式(一般是Flink on Yarn),除此之外,用戶也可以同時(shí)在小集群(少于5個(gè)節(jié)點(diǎn))和擁有 TB 級別狀態(tài)的上千個(gè)節(jié)點(diǎn)上運(yùn)行 Flink 任務(wù)。
** 六、Flink的基礎(chǔ)編程模型了解嗎? **

上圖是來自Flink官網(wǎng)的運(yùn)行流程圖。
通過上圖我們可以得知,F(xiàn)link 程序的基本構(gòu)建是數(shù)據(jù)輸入來自一個(gè) Source,Source 代表數(shù)據(jù)的輸入端,經(jīng)過 Transformation 進(jìn)行轉(zhuǎn)換,然后在一個(gè)或者多個(gè)Sink接收器中結(jié)束。數(shù)據(jù)流(stream)就是一組永遠(yuǎn)不會停止的數(shù)據(jù)記錄流,而轉(zhuǎn)換(transformation)是將一個(gè)或多個(gè)流作為輸入,并生成一個(gè)或多個(gè)輸出流的操作。執(zhí)行時(shí),F(xiàn)link程序映射到 streaming dataflows,由流(streams)和轉(zhuǎn)換操作(transformation operators)組成。
** 七、Flink集群有哪些角色?各自有什么作用? **

Flink 程序在運(yùn)行時(shí)主要有 TaskManager,JobManager,Client三種角色。
其中JobManager扮演著集群中的管理者M(jìn)aster的角色,它是整個(gè)集群的協(xié)調(diào)者,負(fù)責(zé)接收Flink Job,協(xié)調(diào)檢查點(diǎn),F(xiàn)ailover 故障恢復(fù)等,同時(shí)管理Flink集群中從節(jié)點(diǎn)TaskManager。
TaskManager是實(shí)際負(fù)責(zé)執(zhí)行計(jì)算的Worker,在其上執(zhí)行Flink Job的一組Task,每個(gè)TaskManager負(fù)責(zé)管理其所在節(jié)點(diǎn)上的資源信息,如內(nèi)存、磁盤、網(wǎng)絡(luò),在啟動的時(shí)候?qū)①Y源的狀態(tài)向JobManager匯報(bào)。
Client是Flink程序提交的客戶端,當(dāng)用戶提交一個(gè)Flink程序時(shí),會首先創(chuàng)建一個(gè)Client,該Client首先會對用戶提交的Flink程序進(jìn)行預(yù)處理,并提交到Flink集群中處理,所以Client需要從用戶提交的Flink程序配置中獲取JobManager的地址,并建立到JobManager的連接,將Flink Job提交給JobManager。
** 八、說說 Flink 資源管理中 Task Slot 的概念**

在Flink架構(gòu)角色中我們提到,TaskManager是實(shí)際負(fù)責(zé)執(zhí)行計(jì)算的Worker,TaskManager 是一個(gè) JVM 進(jìn)程,并會以獨(dú)立的線程來執(zhí)行一個(gè)task或多個(gè)subtask。為了控制一個(gè) TaskManager 能接受多少個(gè) task,F(xiàn)link 提出了 Task Slot 的概念。
簡單的說,TaskManager會將自己節(jié)點(diǎn)上管理的資源分為不同的Slot:固定大小的資源子集。
這樣就避免了不同Job的Task互相競爭內(nèi)存資源,但是需要主要的是,Slot只會做內(nèi)存的隔離。沒有做CPU的隔離。
** 九、說說 Flink 的常用算子? **
Flink 最常用的常用算子包括:
Map:DataStream → DataStream,輸入一個(gè)參數(shù)產(chǎn)生一個(gè)參數(shù),map的功能是對輸入的參數(shù)進(jìn)行轉(zhuǎn)換操作。
Filter:過濾掉指定條件的數(shù)據(jù)。
KeyBy:按照指定的key進(jìn)行分組。
Reduce:用來進(jìn)行結(jié)果匯總合并。
Window:窗口函數(shù),根據(jù)某些特性將每個(gè)key的數(shù)據(jù)進(jìn)行分組(例如:在5s內(nèi)到達(dá)的數(shù)據(jù))
** 十、說說你知道的Flink分區(qū)策略? **
什么要搞懂什么是分區(qū)策略。
分區(qū)策略是用來決定數(shù)據(jù)如何發(fā)送至下游。目前 Flink 支持了8中分區(qū)策略的實(shí)現(xiàn)。

上圖是整個(gè)Flink實(shí)現(xiàn)的分區(qū)策略繼承圖:
GlobalPartitioner
數(shù)據(jù)會被分發(fā)到下游算子的第一個(gè)實(shí)例中進(jìn)行處理。
ShufflePartitioner
數(shù)據(jù)會被隨機(jī)分發(fā)到下游算子的每一個(gè)實(shí)例中進(jìn)行處理。
RebalancePartitioner
數(shù)據(jù)會被循環(huán)發(fā)送到下游的每一個(gè)實(shí)例中進(jìn)行處理。
RescalePartitioner
這種分區(qū)器會根據(jù)上下游算子的并行度,循環(huán)的方式輸出到下游算子的每個(gè)實(shí)例。
這里有點(diǎn)難以理解,假設(shè)上游并行度為2,編號為A和B。下游并行度為4,編號為1,2,3,4。
那么A則把數(shù)據(jù)循環(huán)發(fā)送給1和2,B則把數(shù)據(jù)循環(huán)發(fā)送給3和4。
假設(shè)上游并行度為4,編號為A,B,C,D。下游并行度為2,編號為1,2。那么A和B則把數(shù)據(jù)發(fā)送給1,C和D則把數(shù)據(jù)發(fā)送給2。
BroadcastPartitioner
廣播分區(qū)會將上游數(shù)據(jù)輸出到下游算子的每個(gè)實(shí)例中。適合于大數(shù)據(jù)集和小數(shù)據(jù)集做Jion的場景。
ForwardPartitioner
ForwardPartitioner 用于將記錄輸出到下游本地的算子實(shí)例。它要求上下游算子并行度一樣。
簡單的說,F(xiàn)orwardPartitioner用來做數(shù)據(jù)的控制臺打印。
KeyGroupStreamPartitioner
Hash分區(qū)器。會將數(shù)據(jù)按 Key 的 Hash 值輸出到下游算子實(shí)例中。
CustomPartitionerWrapper
用戶自定義分區(qū)器。需要用戶自己實(shí)現(xiàn)Partitioner接口,來定義自己的分區(qū)邏輯。
例如:
static class CustomPartitioner implements Partitioner<String> {
@Override
public int partition(String key, int numPartitions) {
switch (key){
case "1":
return 1;
case "2":
return 2;
case "3":
return 3;
default:
return 4;
}
}
}
** 十一、Flink的并行度了解嗎?Flink的并行度設(shè)置是怎樣的? **
Flink中的任務(wù)被分為多個(gè)并行任務(wù)來執(zhí)行,其中每個(gè)并行的實(shí)例處理一部分?jǐn)?shù)據(jù)。這些并行實(shí)例的數(shù)量被稱為并行度。
我們在實(shí)際生產(chǎn)環(huán)境中可以從四個(gè)不同層面設(shè)置并行度:
- 操作算子層面(Operator Level)
- 執(zhí)行環(huán)境層面(Execution Environment Level)
- 客戶端層面(Client Level)
- 系統(tǒng)層面(System Level)
需要注意的優(yōu)先級:算子層面>環(huán)境層面>客戶端層面>系統(tǒng)層面。
** 十二、Flink的Slot和parallelism有什么區(qū)別?**
官網(wǎng)上十分經(jīng)典的圖:

slot是指taskmanager的并發(fā)執(zhí)行能力,假設(shè)我們將 taskmanager.numberOfTaskSlots 配置為3
那么每一個(gè) taskmanager 中分配3個(gè) TaskSlot, 3個(gè) taskmanager 一共有9個(gè)TaskSlot。

parallelism是指taskmanager實(shí)際使用的并發(fā)能力。假設(shè)我們把 parallelism.default 設(shè)置為1,那么9個(gè) TaskSlot 只能用1個(gè),有8個(gè)空閑。
** 十三、Flink有沒有重啟策略?說說有哪幾種?**
Flink 實(shí)現(xiàn)了多種重啟策略。
- 固定延遲重啟策略(Fixed Delay Restart Strategy)
- 故障率重啟策略(Failure Rate Restart Strategy)
- 沒有重啟策略(No Restart Strategy)
- Fallback重啟策略(Fallback Restart Strategy)
** 十四、用過Flink中的分布式緩存嗎?如何使用? **
Flink實(shí)現(xiàn)的分布式緩存和Hadoop有異曲同工之妙。目的是在本地讀取文件,并把他放在 taskmanager 節(jié)點(diǎn)中,防止task重復(fù)拉取。
val env = ExecutionEnvironment.getExecutionEnvironment
// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)
// define your program and execute
...
val input: DataSet[String] = ...
val result: DataSet[Integer] = input.map(new MyMapper())
...
env.execute()
** 十五、說說Flink中的廣播變量,使用時(shí)需要注意什么? **
我們知道Flink是并行的,計(jì)算過程可能不在一個(gè) Slot 中進(jìn)行,那么有一種情況即:當(dāng)我們需要訪問同一份數(shù)據(jù)。那么Flink中的廣播變量就是為了解決這種情況。
我們可以把廣播變量理解為是一個(gè)公共的共享變量,我們可以把一個(gè)dataset 數(shù)據(jù)集廣播出去,然后不同的task在節(jié)點(diǎn)上都能夠獲取到,這個(gè)數(shù)據(jù)在每個(gè)節(jié)點(diǎn)上只會存在一份。
** 十六、說說Flink中的窗口? **
來一張官網(wǎng)經(jīng)典的圖:

Flink 支持兩種劃分窗口的方式,按照time和count。如果根據(jù)時(shí)間劃分窗口,那么它就是一個(gè)time-window 如果根據(jù)數(shù)據(jù)劃分窗口,那么它就是一個(gè)count-window。
flink支持窗口的兩個(gè)重要屬性(size和interval)
如果size=interval,那么就會形成tumbling-window(無重疊數(shù)據(jù))
如果size>interval,那么就會形成sliding-window(有重疊數(shù)據(jù))
如果size< interval, 那么這種窗口將會丟失數(shù)據(jù)。比如每5秒鐘,統(tǒng)計(jì)過去3秒的通過路口汽車的數(shù)據(jù),將會漏掉2秒鐘的數(shù)據(jù)。
通過組合可以得出四種基本窗口:
- time-tumbling-window 無重疊數(shù)據(jù)的時(shí)間窗口,設(shè)置方式舉例:timeWindow(Time.seconds(5))
- time-sliding-window 有重疊數(shù)據(jù)的時(shí)間窗口,設(shè)置方式舉例:timeWindow(Time.seconds(5), Time.seconds(3))
- count-tumbling-window無重疊數(shù)據(jù)的數(shù)量窗口,設(shè)置方式舉例:countWindow(5)
- count-sliding-window 有重疊數(shù)據(jù)的數(shù)量窗口,設(shè)置方式舉例:countWindow(5,3)
** 十七、說說Flink中的狀態(tài)存儲? **
Flink在做計(jì)算的過程中經(jīng)常需要存儲中間狀態(tài),來避免數(shù)據(jù)丟失和狀態(tài)恢復(fù)。選擇的狀態(tài)存儲策略不同,會影響狀態(tài)持久化如何和 checkpoint 交互。
Flink提供了三種狀態(tài)存儲方式:MemoryStateBackend、FsStateBackend、RocksDBStateBackend。
** 十八、Flink 中的時(shí)間有哪幾類 **
Flink 中的時(shí)間和其他流式計(jì)算系統(tǒng)的時(shí)間一樣分為三類:事件時(shí)間,攝入時(shí)間,處理時(shí)間三種。
如果以 EventTime 為基準(zhǔn)來定義時(shí)間窗口將形成EventTimeWindow,要求消息本身就應(yīng)該攜帶EventTime。
如果以 IngesingtTime 為基準(zhǔn)來定義時(shí)間窗口將形成 IngestingTimeWindow,以 source 的systemTime為準(zhǔn)。
如果以 ProcessingTime 基準(zhǔn)來定義時(shí)間窗口將形成 ProcessingTimeWindow,以 operator 的systemTime 為準(zhǔn)。
** 十九、Flink 中水印是什么概念,起到什么作用? **
Watermark 是 Apache Flink 為了處理 EventTime 窗口計(jì)算提出的一種機(jī)制, 本質(zhì)上是一種時(shí)間戳。
一般來講Watermark經(jīng)常和Window一起被用來處理亂序事件。
** 二十、Flink Table & SQL 熟悉嗎?TableEnvironment這個(gè)類有什么作用 **
TableEnvironment是Table API和SQL集成的核心概念。
這個(gè)類主要用來:
- 在內(nèi)部catalog中注冊表
- 注冊外部catalog
- 執(zhí)行SQL查詢
- 注冊用戶定義(標(biāo)量,表或聚合)函數(shù)
- 將DataStream或DataSet轉(zhuǎn)換為表
- 持有對ExecutionEnvironment或StreamExecutionEnvironment的引用
** 二十、Flink SQL的實(shí)現(xiàn)原理是什么? 是如何實(shí)現(xiàn) SQL 解析的呢? **
首先大家要知道 Flink 的SQL解析是基于Apache Calcite這個(gè)開源框架。

基于此,一次完整的SQL解析過程如下:
- 用戶使用對外提供Stream SQL的語法開發(fā)業(yè)務(wù)應(yīng)用
- 用calcite對StreamSQL進(jìn)行語法檢驗(yàn),語法檢驗(yàn)通過后,轉(zhuǎn)換成calcite的邏輯樹節(jié)點(diǎn);最終形成calcite的邏輯計(jì)劃
- 采用Flink自定義的優(yōu)化規(guī)則和calcite火山模型、啟發(fā)式模型共同對邏輯樹進(jìn)行優(yōu)化,生成最優(yōu)的Flink物理計(jì)劃
- 對物理計(jì)劃采用janino codegen生成代碼,生成用低階API DataStream 描述的流應(yīng)用,提交到Flink平臺執(zhí)行
第二部分:Flink 面試進(jìn)階篇
** 一、Flink是如何支持批流一體的? **

本道面試題考察的其實(shí)就是一句話:Flink的開發(fā)者認(rèn)為批處理是流處理的一種特殊情況。批處理是有限的流處理。Flink 使用一個(gè)引擎支持了DataSet API 和 DataStream API。
** 二、Flink是如何做到高效的數(shù)據(jù)交換的? **
在一個(gè)Flink Job中,數(shù)據(jù)需要在不同的task中進(jìn)行交換,整個(gè)數(shù)據(jù)交換是有 TaskManager 負(fù)責(zé)的,TaskManager 的網(wǎng)絡(luò)組件首先從緩沖buffer中收集records,然后再發(fā)送。Records 并不是一個(gè)一個(gè)被發(fā)送的,二是積累一個(gè)批次再發(fā)送,batch 技術(shù)可以更加高效的利用網(wǎng)絡(luò)資源。
** 三、Flink是如何做容錯(cuò)的? **
Flink 實(shí)現(xiàn)容錯(cuò)主要靠強(qiáng)大的CheckPoint機(jī)制和State機(jī)制。Checkpoint 負(fù)責(zé)定時(shí)制作分布式快照、對程序中的狀態(tài)進(jìn)行備份;State 用來存儲計(jì)算過程中的中間狀態(tài)。
** 四、Flink 分布式快照的原理是什么? **
Flink的分布式快照是根據(jù)Chandy-Lamport算法量身定做的。簡單來說就是持續(xù)創(chuàng)建分布式數(shù)據(jù)流及其狀態(tài)的一致快照。

核心思想是在 input source 端插入 barrier,控制 barrier 的同步來實(shí)現(xiàn) snapshot 的備份和 exactly-once 語義。
** 五、Flink 是如何保證Exactly-once語義的? **
Flink通過實(shí)現(xiàn)兩階段提交和狀態(tài)保存來實(shí)現(xiàn)端到端的一致性語義。
分為以下幾個(gè)步驟:
- 開始事務(wù)(beginTransaction)創(chuàng)建一個(gè)臨時(shí)文件夾,來寫把數(shù)據(jù)寫入到這個(gè)文件夾里面
- 預(yù)提交(preCommit)將內(nèi)存中緩存的數(shù)據(jù)寫入文件并關(guān)閉
- 正式提交(commit)將之前寫完的臨時(shí)文件放入目標(biāo)目錄下。這代表著最終的數(shù)據(jù)會有一些延遲
- 丟棄(abort)丟棄臨時(shí)文件
若失敗發(fā)生在預(yù)提交成功后,正式提交前??梢愿鶕?jù)狀態(tài)來提交預(yù)提交的數(shù)據(jù),也可刪除預(yù)提交的數(shù)據(jù)。
** 六、Flink 的 kafka 連接器有什么特別的地方? **
Flink源碼中有一個(gè)獨(dú)立的connector模塊,所有的其他connector都依賴于此模塊,F(xiàn)link 在1.9版本發(fā)布的全新kafka連接器,摒棄了之前連接不同版本的kafka集群需要依賴不同版本的connector這種做法,只需要依賴一個(gè)connector即可。
** 七、說說 Flink的內(nèi)存管理是如何做的? **
Flink 并不是將大量對象存在堆上,而是將對象都序列化到一個(gè)預(yù)分配的內(nèi)存塊上。此外,F(xiàn)link大量的使用了堆外內(nèi)存。如果需要處理的數(shù)據(jù)超出了內(nèi)存限制,則會將部分?jǐn)?shù)據(jù)存儲到硬盤上。
Flink 為了直接操作二進(jìn)制數(shù)據(jù)實(shí)現(xiàn)了自己的序列化框架。
理論上Flink的內(nèi)存管理分為三部分:
- Network Buffers:這個(gè)是在TaskManager啟動的時(shí)候分配的,這是一組用于緩存網(wǎng)絡(luò)數(shù)據(jù)的內(nèi)存,每個(gè)塊是32K,默認(rèn)分配2048個(gè),可以通過“taskmanager.network.numberOfBuffers”修改
- Memory Manage pool:大量的Memory Segment塊,用于運(yùn)行時(shí)的算法(Sort/Join/Shuffle等),這部分啟動的時(shí)候就會分配。下面這段代碼,根據(jù)配置文件中的各種參數(shù)來計(jì)算內(nèi)存的分配方法。(heap or off-heap,這個(gè)放到下節(jié)談),內(nèi)存的分配支持預(yù)分配和lazy load,默認(rèn)懶加載的方式。
- User Code,這部分是除了Memory Manager之外的內(nèi)存用于User code和TaskManager本身的數(shù)據(jù)結(jié)構(gòu)。
** 八、說說 Flink的序列化如何做的? **
Java本身自帶的序列化和反序列化的功能,但是輔助信息占用空間比較大,在序列化對象時(shí)記錄了過多的類信息。
Apache Flink摒棄了Java原生的序列化方法,以獨(dú)特的方式處理數(shù)據(jù)類型和序列化,包含自己的類型描述符,泛型類型提取和類型序列化框架。
TypeInformation 是所有類型描述符的基類。它揭示了該類型的一些基本屬性,并且可以生成序列化器。TypeInformation 支持以下幾種類型:
- BasicTypeInfo: 任意Java 基本類型或 String 類型
- BasicArrayTypeInfo: 任意Java基本類型數(shù)組或 String 數(shù)組
- WritableTypeInfo: 任意 Hadoop Writable 接口的實(shí)現(xiàn)類
- TupleTypeInfo: 任意的 Flink Tuple 類型(支持Tuple1 to Tuple25)。Flink tuples 是固定長度固定類型的Java Tuple實(shí)現(xiàn)
- CaseClassTypeInfo: 任意的 Scala CaseClass(包括 Scala tuples)
- PojoTypeInfo: 任意的 POJO (Java or Scala),例如,Java對象的所有成員變量,要么是 public 修飾符定義,要么有 getter/setter 方法
- GenericTypeInfo: 任意無法匹配之前幾種類型的類
針對前六種類型數(shù)據(jù)集,F(xiàn)link皆可以自動生成對應(yīng)的TypeSerializer,能非常高效地對數(shù)據(jù)集進(jìn)行序列化和反序列化。
** 九、 Flink中的Window出現(xiàn)了數(shù)據(jù)傾斜,你有什么解決辦法? **
window產(chǎn)生數(shù)據(jù)傾斜指的是數(shù)據(jù)在不同的窗口內(nèi)堆積的數(shù)據(jù)量相差過多。本質(zhì)上產(chǎn)生這種情況的原因是數(shù)據(jù)源頭發(fā)送的數(shù)據(jù)量速度不同導(dǎo)致的。出現(xiàn)這種情況一般通過兩種方式來解決:
- 在數(shù)據(jù)進(jìn)入窗口前做預(yù)聚合
- 重新設(shè)計(jì)窗口聚合的key
** 十、 Flink中在使用聚合函數(shù) GroupBy、Distinct、KeyBy 等函數(shù)時(shí)出現(xiàn)數(shù)據(jù)熱點(diǎn)該如何解決? **
數(shù)據(jù)傾斜和數(shù)據(jù)熱點(diǎn)是所有大數(shù)據(jù)框架繞不過去的問題。處理這類問題主要從3個(gè)方面入手:
- 在業(yè)務(wù)上規(guī)避這類問題
例如一個(gè)假設(shè)訂單場景,北京和上海兩個(gè)城市訂單量增長幾十倍,其余城市的數(shù)據(jù)量不變。這時(shí)候我們在進(jìn)行聚合的時(shí)候,北京和上海就會出現(xiàn)數(shù)據(jù)堆積,我們可以單獨(dú)數(shù)據(jù)北京和上海的數(shù)據(jù)。
- Key的設(shè)計(jì)上
把熱key進(jìn)行拆分,比如上個(gè)例子中的北京和上海,可以把北京和上海按照地區(qū)進(jìn)行拆分聚合。
- 參數(shù)設(shè)置
Flink 1.9.0 SQL(Blink Planner) 性能優(yōu)化中一項(xiàng)重要的改進(jìn)就是升級了微批模型,即 MiniBatch。原理是緩存一定的數(shù)據(jù)后再觸發(fā)處理,以減少對State的訪問,從而提升吞吐和減少數(shù)據(jù)的輸出量。
** 十一、Flink任務(wù)延遲高,想解決這個(gè)問題,你會如何入手? **
在Flink的后臺任務(wù)管理中,我們可以看到Flink的哪個(gè)算子和task出現(xiàn)了反壓。最主要的手段是資源調(diào)優(yōu)和算子調(diào)優(yōu)。資源調(diào)優(yōu)即是對作業(yè)中的Operator的并發(fā)數(shù)(parallelism)、CPU(core)、堆內(nèi)存(heap_memory)等參數(shù)進(jìn)行調(diào)優(yōu)。作業(yè)參數(shù)調(diào)優(yōu)包括:并行度的設(shè)置,State的設(shè)置,checkpoint的設(shè)置。
十二、Flink是如何處理反壓的?
Flink 內(nèi)部是基于 producer-consumer 模型來進(jìn)行消息傳遞的,F(xiàn)link的反壓設(shè)計(jì)也是基于這個(gè)模型。Flink 使用了高效有界的分布式阻塞隊(duì)列,就像 Java 通用的阻塞隊(duì)列(BlockingQueue)一樣。下游消費(fèi)者消費(fèi)變慢,上游就會受到阻塞。
十三、Flink的反壓和Strom有哪些不同?
Storm 是通過監(jiān)控 Bolt 中的接收隊(duì)列負(fù)載情況,如果超過高水位值就會將反壓信息寫到 Zookeeper ,Zookeeper 上的 watch 會通知該拓?fù)涞乃?Worker 都進(jìn)入反壓狀態(tài),最后 Spout 停止發(fā)送 tuple。
Flink中的反壓使用了高效有界的分布式阻塞隊(duì)列,下游消費(fèi)變慢會導(dǎo)致發(fā)送端阻塞。
二者最大的區(qū)別是Flink是逐級反壓,而Storm是直接從源頭降速。
十四、 Operator Chains(算子鏈)這個(gè)概念你了解嗎?
為了更高效地分布式執(zhí)行,F(xiàn)link會盡可能地將operator的subtask鏈接(chain)在一起形成task。每個(gè)task在一個(gè)線程中執(zhí)行。將operators鏈接成task是非常有效的優(yōu)化:它能減少線程之間的切換,減少消息的序列化/反序列化,減少數(shù)據(jù)在緩沖區(qū)的交換,減少了延遲的同時(shí)提高整體的吞吐量。這就是我們所說的算子鏈。
十五、 Flink什么情況下才會把Operator chain在一起形成算子鏈?
兩個(gè)operator chain在一起的的條件:
- 上下游的并行度一致
- 下游節(jié)點(diǎn)的入度為1 (也就是說下游節(jié)點(diǎn)沒有來自其他節(jié)點(diǎn)的輸入)
- 上下游節(jié)點(diǎn)都在同一個(gè) slot group 中(下面會解釋 slot group)
- 下游節(jié)點(diǎn)的 chain 策略為 ALWAYS(可以與上下游鏈接,map、flatmap、filter等默認(rèn)是ALWAYS)
- 上游節(jié)點(diǎn)的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接,不能與上游鏈接,Source默認(rèn)是HEAD)
- 兩個(gè)節(jié)點(diǎn)間數(shù)據(jù)分區(qū)方式是 forward(參考理解數(shù)據(jù)流的分區(qū))
- 用戶沒有禁用 chain
十六、 說說Flink1.9的新特性?
- 支持hive讀寫,支持UDF
- Flink SQL TopN和GroupBy等優(yōu)化
- Checkpoint跟savepoint針對實(shí)際業(yè)務(wù)場景做了優(yōu)化
- Flink state查詢
十七、消費(fèi)kafka數(shù)據(jù)的時(shí)候,如何處理臟數(shù)據(jù)?
可以在處理前加一個(gè)fliter算子,將不符合規(guī)則的數(shù)據(jù)過濾出去。
第三部分:Flink 面試源碼篇
** 一、Flink Job的提交流程 **
用戶提交的Flink Job會被轉(zhuǎn)化成一個(gè)DAG任務(wù)運(yùn)行,分別是:StreamGraph、JobGraph、ExecutionGraph,F(xiàn)link中JobManager與TaskManager,JobManager與Client的交互是基于Akka工具包的,是通過消息驅(qū)動。整個(gè)Flink Job的提交還包含著ActorSystem的創(chuàng)建,JobManager的啟動,TaskManager的啟動和注冊。
二、Flink所謂"三層圖"結(jié)構(gòu)是哪幾個(gè)"圖"?
一個(gè)Flink任務(wù)的DAG生成計(jì)算圖大致經(jīng)歷以下三個(gè)過程:
- StreamGraph
最接近代碼所表達(dá)的邏輯層面的計(jì)算拓?fù)浣Y(jié)構(gòu),按照用戶代碼的執(zhí)行順序向StreamExecutionEnvironment添加StreamTransformation構(gòu)成流式圖。 - JobGraph
從StreamGraph生成,將可以串聯(lián)合并的節(jié)點(diǎn)進(jìn)行合并,設(shè)置節(jié)點(diǎn)之間的邊,安排資源共享slot槽位和放置相關(guān)聯(lián)的節(jié)點(diǎn),上傳任務(wù)所需的文件,設(shè)置檢查點(diǎn)配置等。相當(dāng)于經(jīng)過部分初始化和優(yōu)化處理的任務(wù)圖。 - ExecutionGraph
由JobGraph轉(zhuǎn)換而來,包含了任務(wù)具體執(zhí)行所需的內(nèi)容,是最貼近底層實(shí)現(xiàn)的執(zhí)行圖。
** 三、JobManger在集群中扮演了什么角色? **
JobManager 負(fù)責(zé)整個(gè) Flink 集群任務(wù)的調(diào)度以及資源的管理,從客戶端中獲取提交的應(yīng)用,然后根據(jù)集群中 TaskManager 上 TaskSlot 的使用情況,為提交的應(yīng)用分配相應(yīng)的 TaskSlot 資源并命令 TaskManager 啟動從客戶端中獲取的應(yīng)用。
JobManager 相當(dāng)于整個(gè)集群的 Master 節(jié)點(diǎn),且整個(gè)集群有且只有一個(gè)活躍的 JobManager ,負(fù)責(zé)整個(gè)集群的任務(wù)管理和資源管理。
JobManager 和 TaskManager 之間通過 Actor System 進(jìn)行通信,獲取任務(wù)執(zhí)行的情況并通過 Actor System 將應(yīng)用的任務(wù)執(zhí)行情況發(fā)送給客戶端。
同時(shí)在任務(wù)執(zhí)行的過程中,F(xiàn)link JobManager 會觸發(fā) Checkpoint 操作,每個(gè) TaskManager 節(jié)點(diǎn) 收到 Checkpoint 觸發(fā)指令后,完成 Checkpoint 操作,所有的 Checkpoint 協(xié)調(diào)過程都是在 Fink JobManager 中完成。
當(dāng)任務(wù)完成后,F(xiàn)link 會將任務(wù)執(zhí)行的信息反饋給客戶端,并且釋放掉 TaskManager 中的資源以供下一次提交任務(wù)使用。
** 四、JobManger在集群啟動過程中起到什么作用? **
JobManager的職責(zé)主要是接收Flink作業(yè),調(diào)度Task,收集作業(yè)狀態(tài)和管理TaskManager。它包含一個(gè)Actor,并且做如下操作:
- RegisterTaskManager: 它由想要注冊到JobManager的TaskManager發(fā)送。注冊成功會通過AcknowledgeRegistration消息進(jìn)行Ack。
- SubmitJob: 由提交作業(yè)到系統(tǒng)的Client發(fā)送。提交的信息是JobGraph形式的作業(yè)描述信息。
- CancelJob: 請求取消指定id的作業(yè)。成功會返回CancellationSuccess,否則返回CancellationFailure。
- UpdateTaskExecutionState: 由TaskManager發(fā)送,用來更新執(zhí)行節(jié)點(diǎn)(ExecutionVertex)的狀態(tài)。成功則返回true,否則返回false。
- RequestNextInputSplit: TaskManager上的Task請求下一個(gè)輸入split,成功則返回NextInputSplit,否則返回null。
- JobStatusChanged: 它意味著作業(yè)的狀態(tài)(RUNNING, CANCELING, FINISHED,等)發(fā)生變化。這個(gè)消息由ExecutionGraph發(fā)送。
** 五、TaskManager在集群中扮演了什么角色?**
TaskManager 相當(dāng)于整個(gè)集群的 Slave 節(jié)點(diǎn),負(fù)責(zé)具體的任務(wù)執(zhí)行和對應(yīng)任務(wù)在每個(gè)節(jié)點(diǎn)上的資源申請和管理。
客戶端通過將編寫好的 Flink 應(yīng)用編譯打包,提交到 JobManager,然后 JobManager 會根據(jù)已注冊在 JobManager 中 TaskManager 的資源情況,將任務(wù)分配給有資源的 TaskManager節(jié)點(diǎn),然后啟動并運(yùn)行任務(wù)。
TaskManager 從 JobManager 接收需要部署的任務(wù),然后使用 Slot 資源啟動 Task,建立數(shù)據(jù)接入的網(wǎng)絡(luò)連接,接收數(shù)據(jù)并開始數(shù)據(jù)處理。同時(shí) TaskManager 之間的數(shù)據(jù)交互都是通過數(shù)據(jù)流的方式進(jìn)行的。
可以看出,F(xiàn)link 的任務(wù)運(yùn)行其實(shí)是采用多線程的方式,這和 MapReduce 多 JVM 進(jìn)行的方式有很大的區(qū)別,F(xiàn)link 能夠極大提高 CPU 使用效率,在多個(gè)任務(wù)和 Task 之間通過 TaskSlot 方式共享系統(tǒng)資源,每個(gè) TaskManager 中通過管理多個(gè) TaskSlot 資源池進(jìn)行對資源進(jìn)行有效管理。
** 六、TaskManager在集群啟動過程中起到什么作用? **
TaskManager的啟動流程較為簡單:
啟動類:org.apache.flink.runtime.taskmanager.TaskManager
核心啟動方法 : selectNetworkInterfaceAndRunTaskManager
啟動后直接向JobManager注冊自己,注冊完成后,進(jìn)行部分模塊的初始化。
七、Flink 計(jì)算資源的調(diào)度是如何實(shí)現(xiàn)的?
TaskManager中最細(xì)粒度的資源是Task slot,代表了一個(gè)固定大小的資源子集,每個(gè)TaskManager會將其所占有的資源平分給它的slot。
通過調(diào)整 task slot 的數(shù)量,用戶可以定義task之間是如何相互隔離的。每個(gè) TaskManager 有一個(gè)slot,也就意味著每個(gè)task運(yùn)行在獨(dú)立的 JVM 中。每個(gè) TaskManager 有多個(gè)slot的話,也就是說多個(gè)task運(yùn)行在同一個(gè)JVM中。
而在同一個(gè)JVM進(jìn)程中的task,可以共享TCP連接(基于多路復(fù)用)和心跳消息,可以減少數(shù)據(jù)的網(wǎng)絡(luò)傳輸,也能共享一些數(shù)據(jù)結(jié)構(gòu),一定程度上減少了每個(gè)task的消耗。
每個(gè)slot可以接受單個(gè)task,也可以接受多個(gè)連續(xù)task組成的pipeline,如下圖所示,F(xiàn)latMap函數(shù)占用一個(gè)taskslot,而key Agg函數(shù)和sink函數(shù)共用一個(gè)taskslot:

八、簡述Flink的數(shù)據(jù)抽象及數(shù)據(jù)交換過程?
Flink 為了避免JVM的固有缺陷例如java對象存儲密度低,F(xiàn)GC影響吞吐和響應(yīng)等,實(shí)現(xiàn)了自主管理內(nèi)存。MemorySegment就是Flink的內(nèi)存抽象。默認(rèn)情況下,一個(gè)MemorySegment可以被看做是一個(gè)32kb大的內(nèi)存塊的抽象。這塊內(nèi)存既可以是JVM里的一個(gè)byte[],也可以是堆外內(nèi)存(DirectByteBuffer)。
在MemorySegment這個(gè)抽象之上,F(xiàn)link在數(shù)據(jù)從operator內(nèi)的數(shù)據(jù)對象在向TaskManager上轉(zhuǎn)移,預(yù)備被發(fā)給下個(gè)節(jié)點(diǎn)的過程中,使用的抽象或者說內(nèi)存對象是Buffer。
對接從Java對象轉(zhuǎn)為Buffer的中間對象是另一個(gè)抽象StreamRecord。
九、Flink 中的分布式快照機(jī)制是如何實(shí)現(xiàn)的?
Flink的容錯(cuò)機(jī)制的核心部分是制作分布式數(shù)據(jù)流和操作算子狀態(tài)的一致性快照。 這些快照充當(dāng)一致性checkpoint,系統(tǒng)可以在發(fā)生故障時(shí)回滾。 Flink用于制作這些快照的機(jī)制在“分布式數(shù)據(jù)流的輕量級異步快照”中進(jìn)行了描述。 它受到分布式快照的標(biāo)準(zhǔn)Chandy-Lamport算法的啟發(fā),專門針對Flink的執(zhí)行模型而定制。

barriers在數(shù)據(jù)流源處被注入并行數(shù)據(jù)流中??煺課的barriers被插入的位置(我們稱之為Sn)是快照所包含的數(shù)據(jù)在數(shù)據(jù)源中最大位置。例如,在Apache Kafka中,此位置將是分區(qū)中最后一條記錄的偏移量。 將該位置Sn報(bào)告給checkpoint協(xié)調(diào)器(Flink的JobManager)。
然后barriers向下游流動。當(dāng)一個(gè)中間操作算子從其所有輸入流中收到快照n的barriers時(shí),它會為快照n發(fā)出barriers進(jìn)入其所有輸出流中。 一旦sink操作算子(流式DAG的末端)從其所有輸入流接收到barriers n,它就向checkpoint協(xié)調(diào)器確認(rèn)快照n完成。在所有sink確認(rèn)快照后,意味快照著已完成。
一旦完成快照n,job將永遠(yuǎn)不再向數(shù)據(jù)源請求Sn之前的記錄,因?yàn)榇藭r(shí)這些記錄(及其后續(xù)記錄)將已經(jīng)通過整個(gè)數(shù)據(jù)流拓?fù)?,也即是已?jīng)被處理結(jié)束。
十、簡單說說FlinkSQL的是如何實(shí)現(xiàn)的?
Flink 將 SQL 校驗(yàn)、SQL 解析以及 SQL 優(yōu)化交給了Apache Calcite。Calcite 在其他很多開源項(xiàng)目里也都應(yīng)用到了,譬如 Apache Hive, Apache Drill, Apache Kylin, Cascading。Calcite 在新的架構(gòu)中處于核心的地位,如下圖所示。

構(gòu)建抽象語法樹的事情交給了 Calcite 去做。SQL query 會經(jīng)過 Calcite 解析器轉(zhuǎn)變成 SQL 節(jié)點(diǎn)樹,通過驗(yàn)證后構(gòu)建成 Calcite 的抽象語法樹(也就是圖中的 Logical Plan)。另一邊,Table API 上的調(diào)用會構(gòu)建成 Table API 的抽象語法樹,并通過 Calcite 提供的 RelBuilder 轉(zhuǎn)變成 Calcite 的抽象語法樹。然后依次被轉(zhuǎn)換成邏輯執(zhí)行計(jì)劃和物理執(zhí)行計(jì)劃。
在提交任務(wù)后會分發(fā)到各個(gè) TaskManager 中運(yùn)行,在運(yùn)行時(shí)會使用 Janino 編譯器編譯代碼后運(yùn)行。
歡迎關(guān)注作者其他 Chat:
大數(shù)據(jù)開發(fā)面試指南 Flink 最鋒利的武器:Flink SQL 入門和實(shí)戰(zhàn)
實(shí)時(shí)數(shù)倉 | 你需要的是一款強(qiáng)大的 OLAP 引擎
關(guān)注我的公眾號,后臺回復(fù)【JAVAPDF】獲取200頁面試題!
5萬人關(guān)注的大數(shù)據(jù)成神之路,不來了解一下嗎?
5萬人關(guān)注的大數(shù)據(jù)成神之路,真的不來了解一下嗎?
5萬人關(guān)注的大數(shù)據(jù)成神之路,確定真的不來了解一下嗎?