本文轉(zhuǎn)載Flink官方社區(qū)文章:一張圖輕松掌握 Flink on YARN 基礎(chǔ)架構(gòu)與啟動(dòng)流程
Flink on YARN 模式啟動(dòng)流程圖
Flink on YARN 集群部署模式涉及 YARN 和 Flink 兩大開(kāi)源框架,應(yīng)用啟動(dòng)流程的很多環(huán)節(jié)交織在一起,下圖展示了 Flink on YARN 基礎(chǔ)架構(gòu)和應(yīng)用啟動(dòng)全流程,并對(duì)關(guān)鍵角色和流程進(jìn)行了介紹說(shuō)明。整個(gè)啟動(dòng)流程被劃分成客戶端提交(流程標(biāo)注為紫色)、Flink Cluster 啟動(dòng)和 Job 提交運(yùn)行。

客戶端提交流程
- 執(zhí)行命令:
bin/flink run -d -m yarn-cluster ...
##或
bin/yarn-session.sh ...
來(lái)提交 per-job 運(yùn)行模式或 session 運(yùn)行模式的應(yīng)用;
- 解析命令參數(shù)項(xiàng)并初始化,啟動(dòng)指定運(yùn)行模式,如果是 per-job 運(yùn)行模式將根據(jù)命令行參數(shù)指定的 Job 主類(lèi)創(chuàng)建 job graph;
- 如果可以從命令行參數(shù)(-yid <APPLICATION_ID>)或 YARN properties 臨時(shí)文件(
${java.io.tmpdir}/.yarn-properties-${user.name})中獲取應(yīng)用 ID,向指定的應(yīng)用中提交 Job;否則當(dāng)命令行參數(shù)中包含 -d(表示detached模式)和 -m yarn-cluster(表示指定 YARN 集群模式),啟動(dòng) per-job 運(yùn)行模式;否則當(dāng)命令行參數(shù)項(xiàng)不包含 -yq(表示查詢YARN集群可用資源)時(shí),啟動(dòng) session 運(yùn)行模式;
- 獲取 YARN 集群信息、新應(yīng)用 ID 并啟動(dòng)運(yùn)行前檢查;
- 通過(guò) YarnClient 向 YARN ResourceManager (下文縮寫(xiě)為:YARN RM,YARN Master 節(jié)點(diǎn),負(fù)責(zé)整個(gè)集群資源的管理和調(diào)度)請(qǐng)求創(chuàng)建一個(gè)新應(yīng)用(YARN RM 收到創(chuàng)建應(yīng)用請(qǐng)求后生成新應(yīng)用 ID 和 container 申請(qǐng)的資源上限后返回),并且獲取 YARN Slave 節(jié)點(diǎn)報(bào)告(YARN RM 返回全部 slave 節(jié)點(diǎn)的 ID、狀態(tài)、rack、http 地址、總資源、已使用資源等信息);
- 運(yùn)行前檢查:
(1) 簡(jiǎn)單驗(yàn)證YARN集群能否訪問(wèn);
(2) 最大 node 資源能否滿足 flink JobManager/TaskManager vcores 資源申請(qǐng)需求;
(3) 指定 queue 是否存在(不存在也只是打印WARN信息,后續(xù)向YARN提交時(shí)排除異常并退出);
(4)當(dāng)預(yù)期應(yīng)用申請(qǐng)的Container資源會(huì)超出YARN資源限制時(shí)拋出異常并退出;
(5) 當(dāng)預(yù)期應(yīng)用申請(qǐng)不能被滿足時(shí)(例如總資源超出YARN集群可用資源總量、Container申請(qǐng)資源超出NM可用資源最大值等)提供一些參考信息。
將應(yīng)用配置(flink-conf.yaml、logback.xml、log4j.properties)和相關(guān)文件(flink jars、ship files、user jars、job graph等)上傳至分布式存儲(chǔ)(例如 HDFS)的應(yīng)用暫存目錄(
/user/${user.name}/.flink/);準(zhǔn)備應(yīng)用提交上下文(ApplicationSubmissionContext,包括應(yīng)用的名稱(chēng)、類(lèi)型、隊(duì)列、標(biāo)簽等信息和應(yīng)用 Master 的 container 的環(huán)境變量、classpath、資源大小等),注冊(cè)處理部署失敗的 shutdown hook(清理應(yīng)用對(duì)應(yīng)的 HDFS 目錄),然后通過(guò) YarnClient 向 YARN RM 提交應(yīng)用;
循環(huán)等待直到應(yīng)用狀態(tài)為 RUNNING,包含兩個(gè)階段:
- 循環(huán)等待應(yīng)用提交成功(SUBMITTED):默認(rèn)每隔 200ms 通過(guò) YarnClient 獲取應(yīng)用報(bào)告,如果應(yīng)用狀態(tài)不是 NEW 和 NEW_SAVING 則認(rèn)為提交成功并退出循環(huán),每循環(huán) 10 次會(huì)將當(dāng)前的應(yīng)用狀態(tài)輸出至日志:"Application submission is not finished, submitted application <APPLICATION_ID> is still in <APP_STATE>",提交成功后輸出日志:"Submitted application <APPLICATION_ID>"
- 循環(huán)等待應(yīng)用正常運(yùn)行(RUNNING):每隔 250 ms 通過(guò) YarnClient 獲取應(yīng)用報(bào)告,每輪循環(huán)也會(huì)將當(dāng)前的應(yīng)用狀態(tài)輸出至日志:"Deploying cluster, current state <APP_STATE>"。應(yīng)用狀態(tài)成功變?yōu)?RUNNING 后將輸出日志"YARN application has been deployed successfully."并退出循環(huán),如果等到的是非預(yù)期狀態(tài)如 FAILED/FINISHED/KILLED,就會(huì)在輸出 YARN 返回的診斷信息("The YARN application unexpectedly switched to state <APP_STATE> during deployment. Diagnostics from YARN: ...")之后拋出異常并退出。
Flink Cluster 啟動(dòng)流程
YARN RM 中的 ClientRMService(為普通用戶提供的 RPC 服務(wù)組件,處理來(lái)自客戶端的各種 RPC 請(qǐng)求,比如查詢 YARN 集群信息,提交、終止應(yīng)用等)接收到應(yīng)用提交請(qǐng)求,簡(jiǎn)單校驗(yàn)后將請(qǐng)求轉(zhuǎn)交給 RMAppManager(YARN RM 內(nèi)部管理應(yīng)用生命周期的組件);
RMAppManager 根據(jù)應(yīng)用提交上下文內(nèi)容創(chuàng)建初始狀態(tài)為 NEW 的應(yīng)用,將應(yīng)用狀態(tài)持久化到 RM 狀態(tài)存儲(chǔ)服務(wù)(例如 ZooKeeper 集群,RM 狀態(tài)存儲(chǔ)服務(wù)用來(lái)保證 RM 重啟、HA 切換或發(fā)生故障后集群應(yīng)用能夠正?;謴?fù),后續(xù)流程中的涉及狀態(tài)存儲(chǔ)時(shí)不再贅述),應(yīng)用狀態(tài)變?yōu)?NEW_SAVING;
應(yīng)用狀態(tài)存儲(chǔ)完成后,應(yīng)用狀態(tài)變?yōu)?SUBMITTED;RMAppManager 開(kāi)始向 ResourceScheduler(YARN RM 可拔插資源調(diào)度器,YARN 自帶三種調(diào)度器 FifoScheduler/FairScheduler/CapacityScheduler,其中 CapacityScheduler 支持功能最多使用最廣泛,F(xiàn)ifoScheduler 功能最簡(jiǎn)單基本不可用,今年社區(qū)已明確不再繼續(xù)支持 FairScheduler,建議已有用戶遷至 CapacityScheduler)提交應(yīng)用,如果無(wú)法正常提交(例如隊(duì)列不存在、不是葉子隊(duì)列、隊(duì)列已停用、超出隊(duì)列最大應(yīng)用數(shù)限制等)則拋出拒絕該應(yīng)用,應(yīng)用狀態(tài)先變?yōu)?FINAL_SAVING 觸發(fā)應(yīng)用狀態(tài)存儲(chǔ)流程并在完成后變?yōu)?FAILED;如果提交成功,應(yīng)用狀態(tài)變?yōu)?ACCEPTED;
開(kāi)始創(chuàng)建應(yīng)用運(yùn)行實(shí)例(ApplicationAttempt,由于一次運(yùn)行實(shí)例中最重要的組件是 ApplicationMaster,下文簡(jiǎn)稱(chēng) AM,它的狀態(tài)代表了 ApplicationAttempt 的當(dāng)前狀態(tài),所以 ApplicationAttempt 實(shí)際也代表了AM),初始狀態(tài)為 NEW;
初始化應(yīng)用運(yùn)行實(shí)例信息,并向 ApplicationMasterService(AM&RM 協(xié)議接口服務(wù),處理來(lái)自 AM 的請(qǐng)求,主要包括注冊(cè)和心跳)注冊(cè),應(yīng)用實(shí)例狀態(tài)變?yōu)?SUBMITTED;
RMAppManager 維護(hù)的應(yīng)用實(shí)例開(kāi)始初始化 AM 資源申請(qǐng)信息并重新校驗(yàn)隊(duì)列,然后向 ResourceScheduler 申請(qǐng) AM Container(Container 是 YARN 中資源的抽象,包含了內(nèi)存、CPU 等多維度資源),應(yīng)用實(shí)例狀態(tài)變?yōu)?ACCEPTED;
ResourceScheduler 會(huì)根據(jù)優(yōu)先級(jí)(隊(duì)列/應(yīng)用/請(qǐng)求每個(gè)維度都有優(yōu)先級(jí)配置)從根隊(duì)列開(kāi)始層層遞進(jìn),先后選擇當(dāng)前優(yōu)先級(jí)最高的子隊(duì)列、應(yīng)用直至具體某個(gè)請(qǐng)求,然后結(jié)合集群資源分布等情況作出分配決策,AM Container 分配成功后,應(yīng)用實(shí)例狀態(tài)變?yōu)?ALLOCATED_SAVING,并觸發(fā)應(yīng)用實(shí)例狀態(tài)存儲(chǔ)流程,存儲(chǔ)成功后應(yīng)用實(shí)例狀態(tài)變?yōu)?ALLOCATED;
RMAppManager 維護(hù)的應(yīng)用實(shí)例開(kāi)始通知 ApplicationMasterLauncher(AM 生命周期管理服務(wù),負(fù)責(zé)啟動(dòng)或清理 AM container)啟動(dòng) AM container,ApplicationMasterLauncher 與 YARN NodeManager(下文簡(jiǎn)稱(chēng) YARN NM,與 YARN RM 保持通信,負(fù)責(zé)管理單個(gè)節(jié)點(diǎn)上的全部資源、Container 生命周期、附屬服務(wù)等,監(jiān)控節(jié)點(diǎn)健康狀況和 Container 資源使用)建立通信并請(qǐng)求啟動(dòng) AM container;
ContainerManager(YARN NM 核心組件,管理所有 Container 的生命周期)接收到 AM container 啟動(dòng)請(qǐng)求,YARN NM 開(kāi)始校驗(yàn) Container Token 及資源文件,創(chuàng)建應(yīng)用實(shí)例和 Container 實(shí)例并存儲(chǔ)至本地,結(jié)果返回后應(yīng)用實(shí)例狀態(tài)變?yōu)?LAUNCHED;
ResourceLocalizationService(資源本地化服務(wù),負(fù)責(zé) Container 所需資源的本地化。它能夠按照描述從 HDFS 上下載 Container 所需的文件資源,并盡量將它們分?jǐn)偟礁鱾€(gè)磁盤(pán)上以防止出現(xiàn)訪問(wèn)熱點(diǎn))初始化各種服務(wù)組件、創(chuàng)建工作目錄、從 HDFS 下載運(yùn)行所需的各種資源至 Container 工作目錄(路徑為:
${yarn.nodemanager.localdirs}/usercache/${user}/appcache/<APPLICATION_ID>/<CONTAINER_ID>);ContainersLauncher(負(fù)責(zé)container的具體操作,包括啟動(dòng)、重啟、恢復(fù)和清理等)將待運(yùn)行 Container 所需的環(huán)境變量和運(yùn)行命令寫(xiě)到 Container 工作目錄下的 launch_container.sh 腳本中,然后運(yùn)行該腳本啟動(dòng) Container;
Container 進(jìn)程加載并運(yùn)行 ClusterEntrypoint(Flink JobManager 入口類(lèi),每種集群部署模式和應(yīng)用運(yùn)行模式都有相應(yīng)的實(shí)現(xiàn),例如在 YARN 集群部署模式下, per-job 應(yīng)用運(yùn)行模式實(shí)現(xiàn)類(lèi)是 YarnJobClusterEntrypoint,session 應(yīng)用運(yùn)行模式實(shí)現(xiàn)類(lèi)是 YarnSessionClusterEntrypoint),首先初始化相關(guān)運(yùn)行環(huán)境:
- 輸出各軟件版本及運(yùn)行環(huán)境信息、命令行參數(shù)項(xiàng)、classpath 等信息;
- 注冊(cè)處理各種 SIGNAL 的 handler :記錄到日志
- 注冊(cè) JVM 關(guān)閉保障的 shutdown hook:避免 JVM 退出時(shí)被其他 shutdown - hook 阻塞打印 YARN 運(yùn)行環(huán)境信息:用戶名
- 從運(yùn)行目錄中加載 flink conf
- 初始化文件系統(tǒng)
- 創(chuàng)建并啟動(dòng)各類(lèi)內(nèi)部服務(wù)(包括 RpcService、HAService、BlobServer、HeartbeatServices、MetricRegistry、ExecutionGraphStore 等)
- 將 RPC address 和 port 更新到 flink conf 配置
啟動(dòng) ResourceManager(Flink 資源管理核心組件,包含 YarnResourceManager 和 SlotManager 兩個(gè)子組件,YarnResourceManager 負(fù)責(zé)外部資源管理,與 YARN RM 建立通信并保持心跳,申請(qǐng)或釋放 TaskManager 資源,注銷(xiāo)應(yīng)用等;SlotManager 則負(fù)責(zé)內(nèi)部資源管理,維護(hù)全部 Slot 信息和狀態(tài))及相關(guān)服務(wù),創(chuàng)建異步 AMRMClient,開(kāi)始注冊(cè) AM,注冊(cè)成功后每隔一段時(shí)間(心跳間隔配置項(xiàng):
${yarn.heartbeat.interval},默認(rèn) 5s)向 YARN RM 發(fā)送心跳來(lái)發(fā)送資源更新請(qǐng)求和接受資源變更結(jié)果。YARN RM 內(nèi)部該應(yīng)用和應(yīng)用運(yùn)行實(shí)例的狀態(tài)都變?yōu)?RUNNING,并通知 AMLivelinessMonitor 服務(wù)監(jiān)控 AM 是否存活狀態(tài),當(dāng)心跳超過(guò)一定時(shí)間(默認(rèn) 10 分鐘)觸發(fā) AM failover 流程;啟動(dòng) Dispatcher(負(fù)責(zé)接收用戶提供的作業(yè),并且負(fù)責(zé)為這個(gè)新提交的作業(yè)拉起一個(gè)新的 JobManager)及相關(guān)服務(wù)(包括 REST endpoint 等),在 per-job 運(yùn)行模式下,Dispatcher 將直接從 Container 工作目錄加載 JobGraph 文件;在 session 運(yùn)行模式下,Dispatcher 將在接收客戶端提交的 Job(_通過(guò) BlockServer 接收 job graph 文件)后再進(jìn)行后續(xù)流程;
根據(jù) JobGraph 啟動(dòng) JobManager(負(fù)責(zé)作業(yè)調(diào)度、管理 Job 和 Task 的生命周期),構(gòu)建 ExecutionGraph(JobGraph 的并行化版本,調(diào)度層最核心的數(shù)據(jù)結(jié)構(gòu));
JobManager 開(kāi)始執(zhí)行 ExecutionGraph,向 ResourceManager 申請(qǐng)資源;
ResourceManager 將資源請(qǐng)求加入等待請(qǐng)求隊(duì)列,并通過(guò)心跳向 YARN RM 申請(qǐng)新的 Container 資源來(lái)啟動(dòng) TaskManager 進(jìn)程;后續(xù)流程如果有空閑 Slot 資源,SlotManager 將其分配給等待請(qǐng)求隊(duì)列中匹配的請(qǐng)求,不用再通過(guò) 18. YarnResourceManager 申請(qǐng)新的 TaskManager;
YARN ApplicationMasterService 接收到資源請(qǐng)求后,解析出新的資源請(qǐng)求并更新應(yīng)用請(qǐng)求信息;
YARN ResourceScheduler 成功為該應(yīng)用分配資源后更新應(yīng)用信息,ApplicationMasterService 接收到 Flink JobManager 的下一次心跳時(shí)返回新分配資源信息;
Flink ResourceManager 接收到新分配的 Container 資源后,準(zhǔn)備好 TaskManager 啟動(dòng)上下文(ContainerLauncherContext,生成 TaskManager 配置并上傳至分布式存儲(chǔ),配置其他依賴(lài)和環(huán)境變量等),然后向 YARN NM 申請(qǐng)啟動(dòng) TaskManager 進(jìn)程,YARN NM 啟動(dòng) Container 的流程與 AM Container 啟動(dòng)流程基本類(lèi)似,區(qū)別在于應(yīng)用實(shí)例在 NM 上已存在并未 RUNNING 狀態(tài)時(shí)則跳過(guò)應(yīng)用實(shí)例初始化流程,這里不再贅述;
TaskManager 進(jìn)程加載并運(yùn)行 YarnTaskExecutorRunner(Flink TaskManager入口類(lèi)),初始化流程完成后啟動(dòng) TaskExecutor(負(fù)責(zé)執(zhí)行Task相關(guān)操作);
TaskExecutor 啟動(dòng)后先向 ResourceManager 注冊(cè),成功后再向 SlotManager 匯報(bào)自己的 Slot 資源與狀態(tài);
SlotManager 接收到 Slot 空閑資源后主動(dòng)觸發(fā) Slot 分配,從等待請(qǐng)求隊(duì)列中選出合適的資源請(qǐng)求后,向 TaskManager 請(qǐng)求該 Slot 資源TaskManager 收到請(qǐng)求后檢查該 Slot 是否可分配(不存在則返回異常信息)、 Job 是否已注冊(cè)(沒(méi)有則先注冊(cè)再分配 Slot),檢查通過(guò)后將 Slot 分配給 JobManager;
JobManager 檢查 Slot 分配是否重復(fù),通過(guò)后通知 Execution 執(zhí)行部署 task 流程,向 TaskExecutor 提交 task;TaskExecutor 啟動(dòng)新的線程運(yùn)行 Task。