Flink?批作業(yè)如何在?Master?節(jié)點(diǎn)出錯(cuò)重啟后恢復(fù)執(zhí)行進(jìn)度?

摘要:本文撰寫自阿里云研發(fā)工程師李俊睿(昕程),主要介紹 Flink 1.20 版本中引入了批作業(yè)在 JM failover 后的進(jìn)度恢復(fù)功能。主要分為以下四個(gè)內(nèi)容:

  1. 背景
  2. 解決思路
  3. 使用效果
  4. 如何啟用

一、背景

在 Flink 1.20 版本之前,如果 Flink 的 JobMaster(JM)發(fā)生故障導(dǎo)致被終止,將會(huì)發(fā)生如下兩種情況:

  • 如果作業(yè)未啟用高可用性(HA),作業(yè)將失敗。

  • 如果作業(yè)啟用了 HA,JM 會(huì)被自動(dòng)重新拉起 (JM failover)。在這種情況下,流作業(yè)將從最后一個(gè)成功的檢查點(diǎn)恢復(fù)。然而,批作業(yè)由于缺乏檢查點(diǎn)機(jī)制,將不得不從頭開始運(yùn)行,導(dǎo) 致之前的所有進(jìn)度丟失。這對(duì)于需要長(zhǎng)時(shí)間運(yùn)行的批作業(yè)來(lái)說(shuō),意味著巨大的回退。

為了解決這一問題,我們?cè)?Flink 1.20 版本中引入了批作業(yè)在 JM failover 后的進(jìn)度恢復(fù)功能。這一功能的目的是使批作業(yè)在 JM failover 后能夠盡可能地恢復(fù)到出錯(cuò)前的進(jìn)度,避免重新運(yùn)行已完成

的任務(wù)。

二、解決思路

為了實(shí)現(xiàn)這一目標(biāo),我們需要能夠?qū)?JM 的狀態(tài)持久化到外部存儲(chǔ),從而在 JM 發(fā)生 failover 后,F(xiàn)link 能夠利用這些狀態(tài)信息恢復(fù)作業(yè)到之前的運(yùn)行進(jìn)度。

我們?cè)O(shè)計(jì)了一種基于事件的 JM 狀態(tài)恢復(fù)機(jī)制,在作業(yè)正常運(yùn)行時(shí),JM 會(huì)將狀態(tài)變更事件寫入外部持久化存儲(chǔ),以確保在 JM failover 后仍能獲得作業(yè)的執(zhí)行進(jìn)度。此外,我們還需要解決 JM failover 后實(shí)際作業(yè)狀態(tài)與狀態(tài)變更事件可能不一致的問題。例如,某些 TaskManager (TM)在運(yùn)行過(guò)程中意外丟失,可能導(dǎo)致中間數(shù)據(jù)結(jié)果無(wú)法訪問。因此,F(xiàn)link 必須從 TM 和 Remote Shuffle Service (RSS)獲取中間結(jié)果數(shù)據(jù)的信息,來(lái)對(duì)作業(yè)運(yùn)行進(jìn)度的恢復(fù)結(jié)果進(jìn)行校準(zhǔn)。

該功能的整體流程分為如下幾個(gè)階段:

  1. 作業(yè)執(zhí)行時(shí) 我們引入了 JobEventStore 組件,該組件負(fù)責(zé)在作業(yè)正常運(yùn)行期間將 JM 的狀態(tài)變更事件寫入到外部文件系統(tǒng)中。其中需要被寫入的狀態(tài)變更事件分為如下以下幾類:

    (1)自適應(yīng)執(zhí)行計(jì)劃優(yōu)化:Flink 會(huì)自適應(yīng)地優(yōu)化批作業(yè)的執(zhí)行計(jì)劃,這些優(yōu)化結(jié)果是基于上游的執(zhí)行結(jié)果來(lái)確定的。如果每次都依賴上游的執(zhí)行結(jié)果進(jìn)行重建,將會(huì)產(chǎn)生較大的開銷。因此,記錄這些優(yōu)化結(jié)果對(duì)于任務(wù)調(diào)度和容錯(cuò)非常重要。

    (2)已經(jīng)結(jié)束的 Task 信息:保存已完成任務(wù)的執(zhí)行進(jìn)度,以便在恢復(fù)作業(yè)時(shí)能夠準(zhǔn)確地繼續(xù)從上次執(zhí)行的位置開始。

    (3)OperatorCoordinator 狀態(tài):OperatorCoordinator 負(fù)責(zé)協(xié)調(diào)算子,實(shí)行算子之間的通信,其狀態(tài)與數(shù)據(jù)一致性密切相關(guān)。例如,SourceCoordinator 中包含記錄哪些數(shù)據(jù)分片已經(jīng)分發(fā)的狀態(tài)信息。重建該組件的狀態(tài)有助于保證數(shù)據(jù)的一致性。

    (4)ShuffleMaster 狀態(tài):Flink 目前支持 RSS,而 RSS 的 Shuffle Master 可能會(huì)保存一些狀態(tài)信息,如 Shuffle 數(shù)據(jù)的元數(shù)據(jù)。為了使新的 JobManager 能夠復(fù)用這些中間結(jié)果,恢復(fù) Shuffle Master 的狀態(tài)是必不可少的。

  1. JM failover 期間 Flink 批作業(yè)在運(yùn)行過(guò)程中,其中間結(jié)果數(shù)據(jù)會(huì)保存在 TM 上和 RSS 上。當(dāng) JM 發(fā)生故障時(shí),TM 和 RSS 將保留與作業(yè)相關(guān)的中間結(jié)果數(shù)據(jù),并不斷地嘗試重新連接到 JM。一旦新的 JM 重新被拉起來(lái)后,TM 和 RSS 將重新與 JM 建立連接,然后 TM 和 RSS 會(huì)主動(dòng)上報(bào)它們持有的中間結(jié)果數(shù)據(jù)。
  1. JM failover 后的作業(yè)進(jìn)度恢復(fù)

一旦 JM 重啟,它會(huì)與 TM 和 RSS 重新建立連接,利用 JobEventStore 中記錄的事件以及 TM 和 RSS 保留的中間結(jié)果數(shù)據(jù),來(lái)重建作業(yè)的執(zhí)行進(jìn)度。

JM 首先會(huì)利用 JobEventStore 中記錄的事件,恢復(fù)作業(yè)各個(gè)節(jié)點(diǎn)的執(zhí)行狀態(tài)。

然后根據(jù) OperatorCoordinator 的狀態(tài),JM 會(huì)恢復(fù)尚未處理的 Source 數(shù)據(jù)分片,以避免數(shù)據(jù)丟失或重復(fù)。

隨后,JM 將根據(jù)匯報(bào)上來(lái)的可用中間數(shù)據(jù)進(jìn)一步校正執(zhí)行進(jìn)度。如果某個(gè) task 產(chǎn)生的中間數(shù)據(jù)丟失,但這些數(shù)據(jù)仍被下游 task 所需要,那么該 task 將被重置并重新執(zhí)行。

最后作業(yè)將從恢復(fù)出來(lái)的進(jìn)度繼續(xù)執(zhí)行。

三、使用效果

以下是一個(gè) JM 出錯(cuò)重啟后進(jìn)度恢復(fù)的效果示例。

該批作業(yè)的拓?fù)浣Y(jié)構(gòu)為 Source -> Map -> Sink ,當(dāng)作業(yè)運(yùn)行到 Map 節(jié)點(diǎn)時(shí),因?yàn)橥獠糠?wù)的原因?qū)е?JM 所在機(jī)器下線,從而造成了 JM failover。

隨后,高可用服務(wù)將會(huì)自動(dòng)拉起新的 JM 進(jìn)程,作業(yè)將進(jìn)入 RECONCILING 狀態(tài),表示作業(yè)進(jìn)入了恢復(fù)運(yùn)行進(jìn)展的階段。

當(dāng)作業(yè)恢復(fù)完成后,將進(jìn)入 RUNNING 狀態(tài)。

點(diǎn)進(jìn)作業(yè)詳情頁(yè)后,可以觀察到作業(yè)已經(jīng)恢復(fù)到 JM failover 前到進(jìn)展了。

四、如何啟用

要使用 Flink 批作業(yè)的狀態(tài)恢復(fù)功能,用戶需要:

  1. 確保已啟用集群高可用:目前 Flink 提供了基于 Zookeeper 和 Kubernetes 的兩種高可用服務(wù),更多細(xì)節(jié)詳見官方文檔
  2. 配置 execution.batch.job-recovery.enabled: true

所有 new source 都支持批處理作業(yè)在 JM 出錯(cuò)后進(jìn)行進(jìn)度恢復(fù)。然而,為了實(shí)現(xiàn)細(xì)粒度的進(jìn)度恢復(fù),new sourceSplitEnumerator 需要實(shí)現(xiàn) SupportsBatchSnapshot 接口,否則只有在該 source 的所有并發(fā)任務(wù)完成后,才能在 JM 出錯(cuò)恢復(fù)后避免重新執(zhí)行這個(gè) source 的 task。當(dāng)前,F(xiàn)ileSource 和 HiveSource 已經(jīng)實(shí)現(xiàn)了該接口。詳細(xì)信息請(qǐng)參見官方文檔。

考慮到不同集群和作業(yè)的差異,為了讓批作業(yè)在 job master failover 后能夠盡可能的恢復(fù)出錯(cuò)前的進(jìn)度,避免重新運(yùn)行已完成的任務(wù),用戶可以參考此文檔進(jìn)行配置項(xiàng)調(diào)優(yōu)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容