雜談
有一段時(shí)間沒(méi)有寫(xiě)技術(shù)博客了,正好之前花了一周的時(shí)間解決了一個(gè)Flink在Yarn上部署的問(wèn)題,也將Flink在Yarn上運(yùn)行的機(jī)制與源碼了解了個(gè)大概。Flink在Yarn上部署涉及到的東西比較多,肯定不會(huì)一次寫(xiě)完,我應(yīng)該會(huì)分幾篇文章按幾個(gè)模塊來(lái)詳細(xì)闡述,這樣看起來(lái)也會(huì)比較輕松一些,而版本是基于Flink剛發(fā)布的release-1.5.0。
概述
Flink在Yarn上有兩種模式,一種是cluster模式,即像Yarn申請(qǐng)一定量的資源,有點(diǎn)類似于Standalone模式,當(dāng)然我覺(jué)得缺點(diǎn)應(yīng)該很明顯(這里我也不是很肯定,因?yàn)槲沂褂玫牟皇沁@種模式),就是資源的浪費(fèi),以及擴(kuò)容的時(shí)候需要重啟,影響業(yè)務(wù)。另一種是Single Job模式,即將一個(gè)單獨(dú)的Job提交到Y(jié)arn集群,由Yarn來(lái)根據(jù)配置分配Container。在使用這個(gè)模式的時(shí)候踩了一些坑,以后有機(jī)會(huì)細(xì)講。
在這篇文章中我假定大家對(duì)Yarn已經(jīng)有了一定的認(rèn)識(shí),不會(huì)在Yarn的概念上進(jìn)行展開(kāi)描述。
ApplicationMaster
-
首先是JobManager和ApplicationMaster。在Flink on Yarn中,JobManager和ApplicationMaster是在同一個(gè)jvm進(jìn)程中的,這個(gè)進(jìn)程的入口就是YarnSessionClusterEntrypoint類。首先來(lái)看一下這個(gè)類,它繼承了SessionClusterEntrypoint類。
YarnSessionClusterEntrypointUML
SessionClusterEntrypoint是一個(gè)抽象類,它又繼承了ClusterEntrypoint這個(gè)抽象類。
那么這三個(gè)類到底是干什么的呢?首先是ClusterEntrypoint,它封裝了Cluster啟停的邏輯,還有根據(jù)配置文件來(lái)創(chuàng)建RpcService,HaService, HeartbeatService, MetricRegistry等等服務(wù)的邏輯,同時(shí)它也提供了幾個(gè)抽象方法給不同的模式下的特定的ClusterEntrypoint來(lái)實(shí)現(xiàn)。這四個(gè)方法分別是createDispatcher,createResourceManager, createRestEndpoint,createSerializableExecutionGraphStore。關(guān)于Dispatcher,ResourceManager和SerializableExecutionGraphStore,后面會(huì)有單獨(dú)的文章來(lái)詳細(xì)講述,現(xiàn)在可以顧名思義,根據(jù)它的名字大致就能知道它們提供了怎樣的功能。
而SessionClusterEntrypoint繼承了這個(gè)類,并且實(shí)現(xiàn)了部分方法createSerializableExecutionGraphStore,createRestEndpoint,createDispatcher。為什么沒(méi)有實(shí)現(xiàn)createResourceManager方法呢?從SessionClusterEntrypoint的子類就可以看出來(lái),分別是YarnSessionClusterEntrypoint,MesosSessionClusterEntrypoint,StandaloneSessionClusterEntrypoint,這三者都是跟如何去調(diào)度資源相關(guān)的,所以createResourceManager由這些子類來(lái)實(shí)現(xiàn)。
-
這里我只會(huì)分析YarnSessionClusterEntrypoint。在這個(gè)類中,除了提供啟動(dòng)的main函數(shù)以外,最重要的是實(shí)現(xiàn)createResourceManager方法,在這個(gè)方法中,直接實(shí)例化了一個(gè)YarnResourceManager的對(duì)象并返回。
YarnSessionClusterEntrypoint啟動(dòng)流程
-
下面來(lái)看一下YarnSessionClusterEntrypoint的啟動(dòng)流程。首先入口是main函數(shù),在main函數(shù)中新建了YarnSessionClusterEntrypoint的一個(gè)對(duì)象。
main method of YarnSessionClusterEntrypoint -
構(gòu)造這個(gè)對(duì)象的構(gòu)造參數(shù)是配置信息和目錄。緊接著就調(diào)用了startCluster方法。之前也提到,Cluster的啟停邏輯是在父類ClusterEntrypoint中。至此,YarnSessionClusterEntrypoint的最重要的任務(wù)已經(jīng)完成。
startCluster method of ClusterEntrypoint
在startCluster方法中,異步的調(diào)用了runCluster方法。
-

- 在runCluster函數(shù)中,主要做了三件事:
第一件事是根據(jù)配置文件初始化RpcService,HaService, HeartbeatService, MetricRegistry,BlobServer,ResourceManager,SerializableExecutionGraphStore等模塊,并將它們賦值給ClusterEntrypoint中相應(yīng)的instance variables。其中createSerializableExecutionGraphStore,createRestEndpoint,createDispatcher的邏輯由SessionClusterEntrypoint實(shí)現(xiàn),createResourceManager的邏輯由YarnSessionClusterEntrypoint實(shí)現(xiàn)。
第二件事就是在startClusterComponents函數(shù)中啟動(dòng)這些服務(wù).其中RpcService負(fù)責(zé)各個(gè)模塊之間的rpc調(diào)用,本質(zhì)上是基于Akka的(關(guān)于Akka我在之前的文章中概述過(guò))。ResourceManager主要是負(fù)責(zé)與Yarn的ResourceManager進(jìn)行交互,通過(guò)Yarn提供的AMRMAsyncClient,進(jìn)行一些資源分配與釋放的操作。HaService的任務(wù)主要有ResourceManager,JobManager,Dispatcher,WebMonitor的Leader選舉,checkpoint的元數(shù)據(jù)的持久化以及checkpoint的注冊(cè)與跟蹤,Blob數(shù)據(jù)的持久化,任務(wù)的狀態(tài)監(jiān)控,總而言之就是服務(wù)高可用相關(guān)的功能。HeartbeatService是負(fù)責(zé)心跳的發(fā)送與接收,被多個(gè)模塊用來(lái)監(jiān)控其他節(jié)點(diǎn)是否丟失。MetricRegistry負(fù)責(zé)指標(biāo)的監(jiān)控。BlobServer負(fù)責(zé)處理對(duì)Blob數(shù)據(jù)的請(qǐng)求與返回。而SerializableExecutionGraphStore則負(fù)責(zé)儲(chǔ)存序列化之后的執(zhí)行計(jì)劃。
最后一件事就是就是啟動(dòng)Dispatcher, Dispatcher主要負(fù)責(zé)就收Client提交的任務(wù),啟動(dòng)并將任務(wù)傳遞給JobManager,以及Master節(jié)點(diǎn)掛掉的失敗恢復(fù)。
*對(duì)于上述提到的各個(gè)模塊,以后會(huì)有文章單獨(dú)的去剖析。
總結(jié)
綜上,YarnSessionClusterEntrypoint可以理解為Flink在Yarn上的ApplicationMaster,同時(shí)也是JobManager。它們之間分屬兩個(gè)線程,之間的交互通過(guò)Akka的消息驅(qū)動(dòng)的模式來(lái)實(shí)現(xiàn)任務(wù)調(diào)度與資源分配的分離,而對(duì)應(yīng)的JobManager與ResourceManager也有相應(yīng)的子模塊組成。


