Flink運行架構(gòu).
任務(wù)提交流程.

Flink任務(wù)提交后,Client向HDFS上傳Flink的Jar包和配置,之后向Yarn ResourceManager提交任務(wù),ResourceManager分配Container資源并通知對應(yīng)的NodeManager啟動ApplicationMaster.
ApplicationMaster啟動后加載Flink的Jar包和配置構(gòu)建環(huán)境,然后啟動JobManager,之后ApplicationMaster向ResourceManager申請資源啟動TaskManager,ResourceManager分配Container資源后,由ApplicationMaster通知資源所在節(jié)點的NodeManager啟動TaskManager,NodeManager加載Flink的Jar包和配置構(gòu)建環(huán)境并啟動TaskManager,TaskManager啟動后向JobManager發(fā)送心跳包,并等待JobManager向其分配任務(wù)。
TaskManager與Slots
每一個TaskManager是一個JVM進程,它可能會在獨立的線程上執(zhí)行一個或多個subtask。為了控制一個worker能接收多少個task,worker通過task slot來進行控制(一個worker至少有一個task slot)。
每個task slot表示TaskManager擁有資源的一個固定大小的子集。假如一個TaskManager有三個slot,那么它會將其管理的內(nèi)存分成三份給各個slot。資源slot化意味著一個subtask將不需要跟來自其他job的subtask競爭被管理的內(nèi)存,取而代之的是它將擁有一定數(shù)量的內(nèi)存儲備。需要注意的是,這里不會涉及到CPU的隔離,slot目前僅僅用來隔離task的受管理的內(nèi)存。
通過調(diào)整task slot的數(shù)量,允許用戶定義subtask之間如何互相隔離。如果一個TaskManager一個slot,那將意味著每個task group運行在獨立的JVM中(該JVM可能是通過一個特定的容器啟動的),而一個TaskManager多個slot意味著更多的subtask可以共享同一個JVM。
而在同一個JVM進程中的task將共享TCP連接(基于多路復(fù)用)和心跳消息。它們也可能共享數(shù)據(jù)集和數(shù)據(jù)結(jié)構(gòu),因此這減少了每個task的負載。

Task Slot是靜態(tài)的概念,是指TaskManager具有的并發(fā)執(zhí)行能力,可以通過參數(shù)taskmanager.numberOfTaskSlots進行配置,而并行度parallelism是動態(tài)概念,即TaskManager運行程序時實際使用的并發(fā)能力,可以通過參數(shù)parallelism.default進行配置。
也就是說,假設(shè)一共有3個TaskManager,每一個TaskManager中的分配3個TaskSlot,也就是每個TaskManager可以接收3個task,一共9個TaskSlot,如果我們設(shè)置parallelism.default=1,即運行程序默認的并行度為1,9個TaskSlot只用了1個,有8個空閑,因此,設(shè)置合適的并行度才能提高效率。
Dataflow
Flink程序由Source、Transformation、Sink這三個核心組件組成,Source主要負責(zé)數(shù)據(jù)的讀取,Transformation主要負責(zé)對屬于的轉(zhuǎn)換操作,Sink負責(zé)最終數(shù)據(jù)的輸出,在各個組件之間流轉(zhuǎn)的數(shù)據(jù)稱為流(streams)。

Flink程序的基礎(chǔ)構(gòu)建模塊是 流(streams) 與 轉(zhuǎn)換(transformations)(需要注意的是,F(xiàn)link的DataSet API所使用的DataSets其內(nèi)部也是stream)。
一個stream可以看成一個中間結(jié)果,而一個transformations是以一個或多個stream作為輸入的某種operation,該operation利用這些stream進行計算從而產(chǎn)生一個或多個result stream。
在運行時,F(xiàn)link上運行的程序會被映射成streaming dataflows,它包含了streams和transformations operators。每一個dataflow以一個或多個sources開始以一個或多個sinks結(jié)束,dataflow類似于任意的有向無環(huán)圖(DAG)。

并行數(shù)據(jù)流
Flink程序的執(zhí)行具有并行、分布式的特性。在執(zhí)行過程中,一個 stream 包含一個或多個 stream partition ,而每一個 operator 包含一個或多個 operator subtask,這些operator subtasks在不同的線程、不同的物理機或不同的容器中彼此互不依賴得執(zhí)行。
一個特定operator的subtask的個數(shù)被稱之為其parallelism(并行度)。一個stream的并行度總是等同于其producing operator的并行度。一個程序中,不同的operator可能具有不同的并行度。

Stream在operator之間傳輸數(shù)據(jù)的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式,具體是哪一種形式,取決于operator的種類。
One-to-one:stream(比如在source和map operator之間)維護著分區(qū)以及元素的順序。那意味著map operator的subtask看到的元素的個數(shù)以及順序跟source operator的subtask生產(chǎn)的元素的個數(shù)、順序相同,map、fliter、flatMap等算子都是one-to-one的對應(yīng)關(guān)系。
Redistributing:這種操作會改變數(shù)據(jù)的分區(qū)個數(shù)。每一個operator subtask依據(jù)所選擇的transformation發(fā)送數(shù)據(jù)到不同的目標subtask。例如,keyBy() 基于hashCode重分區(qū)、broadcast和rebalance會隨機重新分區(qū),這些算子都會引起redistribute過程,而redistribute過程就類似于Spark中的shuffle過程。
task與operator chains
出于分布式執(zhí)行的目的,F(xiàn)link將operator的subtask鏈接在一起形成task,每個task在一個線程中執(zhí)行。將operators鏈接成task是非常有效的優(yōu)化:它能減少線程之間的切換和基于緩存區(qū)的數(shù)據(jù)交換,在減少時延的同時提升吞吐量。鏈接的行為可以在編程API中進行指定。
下面這幅圖,展示了5個subtask以5個并行的線程來執(zhí)行:

任務(wù)調(diào)度流程
{% asset_img 任務(wù)調(diào)度原理.png task與任務(wù)調(diào)度原理 %}
客戶端不是運行時和程序執(zhí)行的一部分,但它用于準備并發(fā)送dataflow給Master,然后,客戶端斷開連接或者維持連接以等待接收計算結(jié)果,客戶端可以以兩種方式運行:要么作為Java/Scala程序的一部分被程序觸發(fā)執(zhí)行,要么以命令行./bin/flink run的方式執(zhí)行。