
摘要:本文撰寫自阿里云研發(fā)工程師李俊睿(昕程),主要介紹 Flink 1.20 版本中引入了批作業(yè)在 JM failover 后的進(jìn)度恢復(fù)功能。主要分為以下四個(gè)內(nèi)容:
- 背景
- 解決思路
- 使用效果
- 如何啟用
一、背景
在 Flink 1.20 版本之前,如果 Flink 的 JobMaster(JM)發(fā)生故障導(dǎo)致被終止,將會(huì)發(fā)生如下兩種情況:
如果作業(yè)未啟用高可用性(HA),作業(yè)將失敗。
如果作業(yè)啟用了 HA,JM 會(huì)被自動(dòng)重新拉起 (JM failover)。在這種情況下,流作業(yè)將從最后一個(gè)成功的檢查點(diǎn)恢復(fù)。然而,批作業(yè)由于缺乏檢查點(diǎn)機(jī)制,將不得不從頭開始運(yùn)行,導(dǎo) 致之前的所有進(jìn)度丟失。這對(duì)于需要長(zhǎng)時(shí)間運(yùn)行的批作業(yè)來(lái)說(shuō),意味著巨大的回退。
為了解決這一問題,我們?cè)?Flink 1.20 版本中引入了批作業(yè)在 JM failover 后的進(jìn)度恢復(fù)功能。這一功能的目的是使批作業(yè)在 JM failover 后能夠盡可能地恢復(fù)到出錯(cuò)前的進(jìn)度,避免重新運(yùn)行已完成
的任務(wù)。
二、解決思路
為了實(shí)現(xiàn)這一目標(biāo),我們需要能夠?qū)?JM 的狀態(tài)持久化到外部存儲(chǔ),從而在 JM 發(fā)生 failover 后,F(xiàn)link 能夠利用這些狀態(tài)信息恢復(fù)作業(yè)到之前的運(yùn)行進(jìn)度。
我們?cè)O(shè)計(jì)了一種基于事件的 JM 狀態(tài)恢復(fù)機(jī)制,在作業(yè)正常運(yùn)行時(shí),JM 會(huì)將狀態(tài)變更事件寫入外部持久化存儲(chǔ),以確保在 JM failover 后仍能獲得作業(yè)的執(zhí)行進(jìn)度。此外,我們還需要解決 JM failover 后實(shí)際作業(yè)狀態(tài)與狀態(tài)變更事件可能不一致的問題。例如,某些 TaskManager (TM)在運(yùn)行過(guò)程中意外丟失,可能導(dǎo)致中間數(shù)據(jù)結(jié)果無(wú)法訪問。因此,F(xiàn)link 必須從 TM 和 Remote Shuffle Service (RSS)獲取中間結(jié)果數(shù)據(jù)的信息,來(lái)對(duì)作業(yè)運(yùn)行進(jìn)度的恢復(fù)結(jié)果進(jìn)行校準(zhǔn)。
該功能的整體流程分為如下幾個(gè)階段:
-
作業(yè)執(zhí)行時(shí) 我們引入了 JobEventStore 組件,該組件負(fù)責(zé)在作業(yè)正常運(yùn)行期間將 JM 的狀態(tài)變更事件寫入到外部文件系統(tǒng)中。其中需要被寫入的狀態(tài)變更事件分為如下以下幾類:
(1)自適應(yīng)執(zhí)行計(jì)劃優(yōu)化:Flink 會(huì)自適應(yīng)地優(yōu)化批作業(yè)的執(zhí)行計(jì)劃,這些優(yōu)化結(jié)果是基于上游的執(zhí)行結(jié)果來(lái)確定的。如果每次都依賴上游的執(zhí)行結(jié)果進(jìn)行重建,將會(huì)產(chǎn)生較大的開銷。因此,記錄這些優(yōu)化結(jié)果對(duì)于任務(wù)調(diào)度和容錯(cuò)非常重要。
(2)已經(jīng)結(jié)束的 Task 信息:保存已完成任務(wù)的執(zhí)行進(jìn)度,以便在恢復(fù)作業(yè)時(shí)能夠準(zhǔn)確地繼續(xù)從上次執(zhí)行的位置開始。
(3)OperatorCoordinator 狀態(tài):OperatorCoordinator 負(fù)責(zé)協(xié)調(diào)算子,實(shí)行算子之間的通信,其狀態(tài)與數(shù)據(jù)一致性密切相關(guān)。例如,SourceCoordinator 中包含記錄哪些數(shù)據(jù)分片已經(jīng)分發(fā)的狀態(tài)信息。重建該組件的狀態(tài)有助于保證數(shù)據(jù)的一致性。
(4)ShuffleMaster 狀態(tài):Flink 目前支持 RSS,而 RSS 的 Shuffle Master 可能會(huì)保存一些狀態(tài)信息,如 Shuffle 數(shù)據(jù)的元數(shù)據(jù)。為了使新的 JobManager 能夠復(fù)用這些中間結(jié)果,恢復(fù) Shuffle Master 的狀態(tài)是必不可少的。

- JM failover 期間 Flink 批作業(yè)在運(yùn)行過(guò)程中,其中間結(jié)果數(shù)據(jù)會(huì)保存在 TM 上和 RSS 上。當(dāng) JM 發(fā)生故障時(shí),TM 和 RSS 將保留與作業(yè)相關(guān)的中間結(jié)果數(shù)據(jù),并不斷地嘗試重新連接到 JM。一旦新的 JM 重新被拉起來(lái)后,TM 和 RSS 將重新與 JM 建立連接,然后 TM 和 RSS 會(huì)主動(dòng)上報(bào)它們持有的中間結(jié)果數(shù)據(jù)。

- JM failover 后的作業(yè)進(jìn)度恢復(fù)
一旦 JM 重啟,它會(huì)與 TM 和 RSS 重新建立連接,利用 JobEventStore 中記錄的事件以及 TM 和 RSS 保留的中間結(jié)果數(shù)據(jù),來(lái)重建作業(yè)的執(zhí)行進(jìn)度。
JM 首先會(huì)利用 JobEventStore 中記錄的事件,恢復(fù)作業(yè)各個(gè)節(jié)點(diǎn)的執(zhí)行狀態(tài)。
然后根據(jù) OperatorCoordinator 的狀態(tài),JM 會(huì)恢復(fù)尚未處理的 Source 數(shù)據(jù)分片,以避免數(shù)據(jù)丟失或重復(fù)。
隨后,JM 將根據(jù)匯報(bào)上來(lái)的可用中間數(shù)據(jù)進(jìn)一步校正執(zhí)行進(jìn)度。如果某個(gè) task 產(chǎn)生的中間數(shù)據(jù)丟失,但這些數(shù)據(jù)仍被下游 task 所需要,那么該 task 將被重置并重新執(zhí)行。
最后作業(yè)將從恢復(fù)出來(lái)的進(jìn)度繼續(xù)執(zhí)行。

三、使用效果
以下是一個(gè) JM 出錯(cuò)重啟后進(jìn)度恢復(fù)的效果示例。
該批作業(yè)的拓?fù)浣Y(jié)構(gòu)為 Source -> Map -> Sink ,當(dāng)作業(yè)運(yùn)行到 Map 節(jié)點(diǎn)時(shí),因?yàn)橥獠糠?wù)的原因?qū)е?JM 所在機(jī)器下線,從而造成了 JM failover。

隨后,高可用服務(wù)將會(huì)自動(dòng)拉起新的 JM 進(jìn)程,作業(yè)將進(jìn)入 RECONCILING 狀態(tài),表示作業(yè)進(jìn)入了恢復(fù)運(yùn)行進(jìn)展的階段。

當(dāng)作業(yè)恢復(fù)完成后,將進(jìn)入 RUNNING 狀態(tài)。

點(diǎn)進(jìn)作業(yè)詳情頁(yè)后,可以觀察到作業(yè)已經(jīng)恢復(fù)到 JM failover 前到進(jìn)展了。

四、如何啟用
要使用 Flink 批作業(yè)的狀態(tài)恢復(fù)功能,用戶需要:
- 確保已啟用集群高可用:目前 Flink 提供了基于 Zookeeper 和 Kubernetes 的兩種高可用服務(wù),更多細(xì)節(jié)詳見官方文檔。
- 配置 execution.batch.job-recovery.enabled: true
所有 new source 都支持批處理作業(yè)在 JM 出錯(cuò)后進(jìn)行進(jìn)度恢復(fù)。然而,為了實(shí)現(xiàn)細(xì)粒度的進(jìn)度恢復(fù),new source的 SplitEnumerator 需要實(shí)現(xiàn) SupportsBatchSnapshot 接口,否則只有在該 source 的所有并發(fā)任務(wù)完成后,才能在 JM 出錯(cuò)恢復(fù)后避免重新執(zhí)行這個(gè) source 的 task。當(dāng)前,F(xiàn)ileSource 和 HiveSource 已經(jīng)實(shí)現(xiàn)了該接口。詳細(xì)信息請(qǐng)參見官方文檔。
考慮到不同集群和作業(yè)的差異,為了讓批作業(yè)在 job master failover 后能夠盡可能的恢復(fù)出錯(cuò)前的進(jìn)度,避免重新運(yùn)行已完成的任務(wù),用戶可以參考此文檔進(jìn)行配置項(xiàng)調(diào)優(yōu)。