Flink重點難點:Flink任務綜合調優(yōu)(Checkpoint/反壓/內存)

CheckPoint調優(yōu)

Flink中基于異步輕量級的分布式快照技術提供了Checkpoints容錯機制,Checkpoints可以將同一時間點作業(yè)/算子的狀態(tài)數據全局統(tǒng)一快照處理,包括前面提到的算子狀態(tài)和鍵值分區(qū)狀態(tài)。當發(fā)生了故障后,F(xiàn)link會將所有任務的狀態(tài)恢復至最后一次Checkpoint中的狀態(tài),并從那里重新開始執(zhí)行。

對于Flink Checkpoint的優(yōu)化至關重要。我們常見的優(yōu)化 Checkpoint的手段如下:

一、設置最小時間間隔


當Flink應用開啟Checkpoint功能,并配置Checkpoint時間間隔,應用中就會根據指定的時間間隔周期性地對應用進行Checkpoint操作。默認情況下Checkpoint操作都是同步進行,也就是說,當前面觸發(fā)的Checkpoint動作沒有完全結束時,之后的Checkpoint操作將不會被觸發(fā)。在這種情況下,如果Checkpoint過程持續(xù)的時間超過了配置的時間間隔,就會出現(xiàn)排隊的情況。如果有非常多的Checkpoint操作在排隊,就會占用額外的系統(tǒng)資源用于Checkpoint,此時用于任務計算的資源將會減少,進而影響到整個應用的性能和正常執(zhí)行。

在這種情況下,如果大狀態(tài)數據確實需要很長的時間來進行Checkpoint,那么只能對Checkpoint的時間間隔進行優(yōu)化,可以通過Checkpoint之間的最小間隔參數進行配置,讓Checkpoint之間根據Checkpoint執(zhí)行速度進行調整,前面的Checkpoint沒有完全結束,后面的Checkpoint操作也不會觸發(fā)。

streamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)

通過最小時間間隔參數配置,可以降低Checkpoint對系統(tǒng)的性能影響,但需要注意的事,對于非常大的狀態(tài)數據,最小時間間隔只能減輕Checkpoint之間的堆積情況。如果不能有效快速地完成Checkpoint,將會導致系統(tǒng)Checkpoint頻次越來越低,當系統(tǒng)出現(xiàn)問題時,沒有及時對狀態(tài)數據有效地持久化,可能會導致系統(tǒng)丟失數據。因此,對于非常大的狀態(tài)數據而言,應該對Checkpoint過程進行優(yōu)化和調整,例如采用增量Checkpoint的方法等。

用戶也可以通過配置CheckpointConfig中setMaxConcurrentCheckpoints()方法設定并行執(zhí)行的checkpoint數量,這種方法也能有效降低checkpoint堆積的問題,但會提高資源占用。同時,如果開始了并行checkpoint操作,當用戶以手動方式觸發(fā)savepoint的時候,checkpoint操作也將繼續(xù)執(zhí)行,這將影響到savepoint過程中對狀態(tài)數據的持久化。

二、預估狀態(tài)容量


除了對已經運行的任務進行checkpoint優(yōu)化,對整個任務需要的狀態(tài)數據量進行預估也非常重要,這樣才能選擇合適的checkpoint策略。對任務狀態(tài)數據存儲的規(guī)劃依賴于如下基本規(guī)則:

正常情況下應該盡可能留有足夠的資源來應對頻繁的反壓。

需要盡可能提供給額外的資源,以便在任務出現(xiàn)異常中斷的情況下處理積壓的數據。這些資源的預估都取決于任務停止過程中數據的積壓量,以及對任務恢復時間的要求。

系統(tǒng)中出現(xiàn)臨時性的反壓沒有太大的問題,但是如果系統(tǒng)中頻繁出現(xiàn)臨時性的反壓,例如下游外部系統(tǒng)臨時性變慢導致數據輸出速率下降,這種情況就需要考慮給予算子一定的資源。

部分算子導致下游的算子的負載非常高,下游的算子完全是取決于上游算子的輸出,因此對類似于窗口算子的估計也將會影響到整個任務的執(zhí)行,應該盡可能給這些算子留有足夠的資源以應對上游算子產生的影響。

三、異步Snapshot


默認情況下,應用中的checkpoint操作都是同步執(zhí)行的,在條件允許的情況下應該盡可能地使用異步的snapshot,這樣講大幅度提升checkpoint的性能,尤其是在非常復雜的流式應用中,如多數據源關聯(lián)、co-functions操作或windows操作等,都會有較好的性能改善。

Flink提供了異步快照(Asynchronous Snapshot)的機制。當實際執(zhí)行快照時,F(xiàn)link可以立即向下廣播Checkpoint Barrier,表示自己已經執(zhí)行完自己部分的快照。同時,F(xiàn)link啟動一個后臺線程,它創(chuàng)建本地狀態(tài)的一份拷貝,這個線程用來將本地狀態(tài)的拷貝同步到State Backend上,一旦數據同步完成,再給Checkpoint Coordinator發(fā)送確認信息??截愐环輸祿隙ㄕ加酶鄡却妫@時可以利用寫入時復制(Copy-on-Write)的優(yōu)化策略。Copy-on-Write指:如果這份內存數據沒有任何修改,那沒必要生成一份拷貝,只需要有一個指向這份數據的指針,通過指針將本地數據同步到State Backend上;如果這份內存數據有一些更新,那再去申請額外的內存空間并維護兩份數據,一份是快照時的數據,一份是更新后的數據。

在使用異步快照需要確認應用遵循以下兩點要求:

首先必須是Flink托管狀態(tài),即使用Flink內部提供的托管狀態(tài)所對應的數據結構,例如常用的有ValueState、ListState、ReducingState等類型狀態(tài)。

StateBackend必須支持異步快照,在Flink1.2的版本之前,只有RocksDB完整地支持異步的Snapshot操作,從Flink1.3版本以后可以在heap-based StateBackend中支持異步快照功能。

四、壓縮狀態(tài)數據


Flink中提供了針對checkpoint和savepoint的數據進行壓縮的方法,目前Flink僅支持通過用snappy壓縮算法對狀態(tài)數據進行壓縮,在未來的版本中Flink將支持其他壓縮算法。在壓縮過程中,F(xiàn)link的壓縮算法支持key-group層面壓縮,也就是不同的key-group分別被壓縮成不同的部分,因此解壓縮過程可以并發(fā)執(zhí)行,這對大規(guī)模數據的壓縮和解壓縮帶來非常高的性能提升和較強的可擴展性。Flink中使用的壓縮算法在ExecutionConfig中進行指定,通過將setUseSnapshotCompression方法中的值設定為true即可。

五、觀察checkpoint延遲時間


checkpoint延遲啟動時間并不會直接暴露在客戶端中,而是需要通過以下公式計算得出。如果改時間過長,則表明算子在進行barrier對齊,等待上游的算子將數據寫入到當前算子中,說明系統(tǒng)正處于一個反壓狀態(tài)下。checkpoint延遲時間可以通過整個端到端的計算時間減去異步持續(xù)的時間和同步持續(xù)的時間得出。

六、Checkpoint相關配置


默認情況下,Checkpoint機制是關閉的,需要調用env.enableCheckpointing(n)來開啟,每隔n毫秒進行一次Checkpoint。Checkpoint是一種負載較重的任務,如果狀態(tài)比較大,同時n值又比較小,那可能一次Checkpoint還沒完成,下次Checkpoint已經被觸發(fā),占用太多本該用于正常數據處理的資源。增大n值意味著一個作業(yè)的Checkpoint次數更少,整個作業(yè)用于進行Checkpoint的資源更小,可以將更多的資源用于正常的流數據處理。同時,更大的n值意味著重啟后,整個作業(yè)需要從更長的Offset開始重新處理數據。

此外,還有一些其他參數需要配置,這些參數統(tǒng)一封裝在了CheckpointConfig里:

val?cpConfig:?CheckpointConfig?=?env.getCheckpointConfig

默認的Checkpoint配置是支持Exactly-Once投遞的,這樣能保證在重啟恢復時,所有算子的狀態(tài)對任一條數據只處理一次。用上文的Checkpoint原理來說,使用Exactly-Once就是進行了Checkpoint Barrier對齊,因此會有一定的延遲。如果作業(yè)延遲小,那么應該使用At-Least-Once投遞,不進行對齊,但某些數據會被處理多次。

//?使用At-Least-Once

env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)

如果一次Checkpoint超過一定時間仍未完成,直接將其終止,以免其占用太多資源:

//?超時時間1小時

env.getCheckpointConfig.setCheckpointTimeout(3600*1000)

如果兩次Checkpoint之間的間歇時間太短,那么正常的作業(yè)可能獲取的資源較少,更多的資源被用在了Checkpoint上。對這個參數進行合理配置能保證數據流的正常處理。比如,設置這個參數為60秒,那么前一次Checkpoint結束后60秒內不會啟動新的Checkpoint。這種模式只在整個作業(yè)最多允許1個Checkpoint時適用。

//?兩次Checkpoint的間隔為60秒

env.getCheckpointConfig.setMinPauseBetweenCheckpoints(60*1000)

默認情況下一個作業(yè)只允許1個Checkpoint執(zhí)行,如果某個Checkpoint正在進行,另外一個Checkpoint被啟動,新的Checkpoint需要掛起等待。

//?最多同時進行3個Checkpoint

env.getCheckpointConfig.setMaxConcurrentCheckpoints(3)

如果這個參數大于1,將與前面提到的最短間隔相沖突。

Checkpoint的初衷是用來進行故障恢復,如果作業(yè)是因為異常而失敗,F(xiàn)link會保存遠程存儲上的數據;如果開發(fā)者自己取消了作業(yè),遠程存儲上的數據都會被刪除。如果開發(fā)者希望通過Checkpoint數據進行調試,自己取消了作業(yè),同時希望將遠程數據保存下來,需要設置為:

//?作業(yè)取消后仍然保存Checkpoint

env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

RETAIN_ON_CANCELLATION模式下,用戶需要自己手動刪除遠程存儲上的Checkpoint數據。

默認情況下,如果Checkpoint過程失敗,會導致整個應用重啟,我們可以關閉這個功能,這樣Checkpoint失敗不影響作業(yè)的運行。

env.getCheckpointConfig.setFailOnCheckpointingErrors(false)


反壓調優(yōu)


我們在Flink重點原理與機制 | 網絡流控及反壓機制一文中介紹過Flink中的反壓機制和現(xiàn)象。

Flink1.5之前是基于TCP流控+bounded buffer實現(xiàn)反壓。在Flink 1.5之后實現(xiàn)了自己托管的credit-based流控機制,在應用層模擬TCP的流控機制。

(1)反壓的定位

當你的任務出現(xiàn)反壓時,如果你的上游是類似 Kafka 的消息系統(tǒng),很明顯的表現(xiàn)就是消費速度變慢,Kafka 消息出現(xiàn)堆積。

如果你的業(yè)務對數據延遲要求并不高,那么反壓其實并沒有很大的影響。但是對于規(guī)模很大的集群中的大作業(yè),反壓會造成嚴重的“并發(fā)癥”。首先任務狀態(tài)會變得很大,因為數據大規(guī)模堆積在系統(tǒng)中,這些暫時不被處理的數據同樣會被放到“狀態(tài)”中。另外,F(xiàn)link 會因為數據堆積和處理速度變慢導致 checkpoint 超時,而 checkpoint 是 Flink 保證數據一致性的關鍵所在,最終會導致數據的不一致發(fā)生。

那么我們應該如何發(fā)現(xiàn)任務是否出現(xiàn)反壓了呢?

(2)Flink Web UI

Flink 的后臺頁面是我們發(fā)現(xiàn)反壓問題的第一選擇。Flink 的后臺頁面可以直觀、清晰地看到當前作業(yè)的運行狀態(tài)。

如上圖所示,是 Flink 官網給出的計算反壓狀態(tài)的案例。需要注意的是,只有用戶在訪問點擊某一個作業(yè)時,才會觸發(fā)反壓狀態(tài)的計算。在默認的設置下,F(xiàn)link 的 TaskManager 會每隔 50 ms 觸發(fā)一次反壓狀態(tài)監(jiān)測,共監(jiān)測 100 次,并將計算結果反饋給 JobManager,最后由 JobManager 進行計算反壓的比例,然后進行展示。

這個比例展示邏輯如下:

OK: 0 <= Ratio <= 0.10,正常;

LOW: 0.10 < Ratio <= 0.5,一般;

HIGH: 0.5 < Ratio <= 1,嚴重。

官網同樣給出了不同反壓狀態(tài)下,F(xiàn)link Web UI 中任務運行的狀態(tài),如下圖所示:

(3)Flink Metrics

如果你想對 Flink 做更為詳細的監(jiān)控的話,F(xiàn)link 本身提供了大量的 REST API 來獲取任務的各種狀態(tài)。

Flink 提供的所有系統(tǒng)監(jiān)控指標你都點擊這里找到:?https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/metrics/

隨著版本的持續(xù)變更,截止 1.14.0 版本,F(xiàn)link 提供的監(jiān)控指標中與反壓最為密切的如下表所示:

我們逐個介紹一下這四個指標。

outPoolUsage

這個指標代表的是當前 Task 的數據發(fā)送速率,當一個 Task 的 outPoolUsage 很高,則代表著數據發(fā)送速度很快。但是當一個 Task 的 outPoolUsage 很低,那么就需要特別注意,有可能是下游的處理速度很低導致的,也有可能當前節(jié)點就是反壓節(jié)點,導致數據處理速度很慢。

inPoolUsage

inPoolUsage 表示當前 Task 的數據接收速率,通常會和 outPoolUsage 配合使用;如果一個節(jié)點的 inPoolUsage 很高而 outPoolUsage 很低,則這個節(jié)點很有可能就是反壓節(jié)點。

floatingBuffersUsage 和 exclusiveBuffersUsage

floatingBuffersUsage 表示處理節(jié)點緩沖池的使用率;exclusiveBuffersUsage 表示數據輸入通道緩沖池的使用率。


(4)反壓問題處理


我們已經知道反壓產生的原因和監(jiān)控的方法,當線上任務出現(xiàn)反壓時,需要如何處理呢?

主要通過以下幾個方面進行定位和處理:

數據傾斜

GC

代碼本身

數據傾斜

數據傾斜問題是我們生產環(huán)境中出現(xiàn)頻率最多的影響任務運行的因素,可以在 Flink 的后臺管理頁面看到每個 Task 處理數據的大小。當數據傾斜出現(xiàn)時,通常是簡單地使用類似 KeyBy 等分組聚合函數導致的,需要用戶將熱點 Key 進行預處理,降低或者消除熱點 Key 的影響。

GC

垃圾回收問題也是造成反壓的因素之一。不合理的設置 TaskManager 的垃圾回收參數會導致嚴重的 GC 問題,我們可以通過 -XX:+PrintGCDetails 參數查看 GC 的日志。

代碼本身

開發(fā)者錯誤地使用 Flink 算子,沒有深入了解算子的實現(xiàn)機制導致性能問題。我們可以通過查看運行機器節(jié)點的 CPU 和內存情況定位問題。

內存調優(yōu)


我們在《Flink重點難點:內存模型與內存結構》這篇文章中詳細講解了Flink的內存模型。

Flink JVM 進程的進程總內存(Total Process Memory)包含了由 Flink 應用使用的內存(Flink 總內存)以及由運行 Flink 的 JVM 使用的內存。Flink 總內存(Total Flink Memory)包括 JVM 堆內存(Heap Memory)和堆外內存(Off-Heap Memory)。其中堆外內存包括直接內存(Direct Memory)和本地內存(Native Memory)。

配置 Flink 進程內存最簡單的方法是指定以下兩個配置項中的任意一個:

Flink有三種部署方式(這里不談Flink on k8s),一種為本地模式,一種為standalone模式,還有一種為yarn或者mesos模式,這三種模式中,用戶必須要選擇一種進行配置(本地模式除外),否則flink將無法啟動,這意味著,用戶需要從以下的無默認值的配置參數中選擇一個給出明確的配置。

不建議同時設置進程總內存和 Flink 總內存。這可能會造成內存配置沖突,從而導致部署失敗。額外配置其他內存部分時,同樣需要注意可能產生的配置沖突

配置TaskManager內存

Flink 的 TaskManager 負責執(zhí)行用戶代碼。根據實際需求為 TaskManager 配置內存將有助于減少 Flink 的資源占用,增強作業(yè)運行的穩(wěn)定性。

本篇內存配置文檔僅針對 TaskManager與JobManager相比,TaskManager 具有相似但更加復雜的內存模型。

配置總內存

Flink JVM 進程的進程總內存(Total Process Memory)包含了由 Flink 應用使用的內存(Flink 總內存)以及由運行 Flink 的 JVM 使用的內存。其中,F(xiàn)link 總內存(Total Flink Memory)包括 JVM 堆內存(Heap Memory)、托管內存(Managed Memory)以及其他直接內存(Direct Memory)或本地內存(Native Memory)。

如果你是在本地運行 Flink(例如在 IDE 中)而非創(chuàng)建一個集群,那么本文介紹的配置并非所有都是適用的,詳情請參考本地執(zhí)行

其他情況下,配置 Flink 內存最簡單的方法就是配置總內存。此外,F(xiàn)link 也支持更細粒度的內存配置,比如說配置堆內存和托管內存

Flink 會根據默認值或其他配置參數自動調整剩余內存部分的大小。

配置堆內存和托管內存

如配置總內存中所述,另一種配置 Flink 內存的方式是同時設置任務堆內存和托管內存, 通過這種方式,用戶可以更好地掌控用于 Flink 任務的 JVM 堆內存及 Flink 的托管內存的大小。

Flink 會根據默認值或其他配置參數自動調整剩余內存部分的大小。關于各內存部分的更多細節(jié),請參考后續(xù)的內存詳解。

注意:如果已經明確設置了任務堆內存和托管內存,建議不要再設置進程總內存或 Flink 總內存,否則可能會造成內存配置沖突。

任務(算子)堆內存

如果希望確保指定大小的 JVM 堆內存給用戶代碼使用,可以明確指定任務堆內存(taskmanager.memory.task.heap.size )指定的內存將被包含在總的 JVM 堆空間中,專門用于 Flink 算子及用戶代碼的執(zhí)行。

托管內存

托管內存是由 Flink 負責分配和管理的本地(堆外)內存。以下場景需要使用托管內存:

流處理作業(yè)中用于 RocksDB State Backend。

批處理作業(yè)中用于排序、哈希表及緩存中間結果。

流處理和批處理作業(yè)中用于「在Python進程中執(zhí)行用戶自定義函數」。

可以通過以下兩種范式指定托管內存的大?。?/p>

通過 taskmanager.memory.managed.size 明確指定其大小。

通過 taskmanager.memory.managed.fraction 指定在Flink 總內存中的占比。

當同時指定二者時,會優(yōu)先采用指定的大?。⊿ize)。若二者均未指定,會根據默認占比進行計算。

消費者權重

對于包含不同種類的托管內存消費者的作業(yè),可以進一步控制托管內存如何在消費者之間分配。通過taskmanager.memory.managed.consumer-weights可以為每一種類型的消費者指定一個權重,F(xiàn)link 會按照權重的比例進行內存分配。目前支持的消費者類型包括:

DATAPROC:用于流處理中的 RocksDB State Backend 和批處理中的內置算法。

PYTHON:用戶 Python 進程。

例如,一個流處理作業(yè)同時使用到了 RocksDB State Backend 和 Python UDF,消費者權重設置為 DATAPROC:70,PYTHON:30,那么 Flink 會將 70% 的托管內存用于 RocksDB State Backend,30% 留給 Python 進程。

只有作業(yè)中包含某種類型的消費者時,F(xiàn)link 才會為該類型分配托管內存。例如,一個流處理作業(yè)使用 Heap State Backend 和 Python UDF,消費者權重設置為 DATAPROC:70,PYTHON:30,那么 Flink 會將全部托管內存用于 Python 進程,因為 Heap State Backend 不使用托管內存。

提示對于未出現(xiàn)在消費者權重中的類型,F(xiàn)link將不會為其分配托管內存。如果缺失的類型是作業(yè)運行所必須的,則會引發(fā)內存分配失敗。默認情況下,消費者權重中包含了所有可能的消費者類型。上述問題僅可能出現(xiàn)在用戶顯式地配置了消費者權重的情況下。

配置堆外內存(直接內存或本地內存)

你也可以調整框架堆外內存(Framework Off-heap Memory)。這是一個進階配置,建議僅在確定 Flink 框架需要更多的內存時調整該配置。

Flink 將框架堆外內存和任務堆外內存都計算在 JVM 的直接內存限制中。

內存詳解

TaskManager內存也包括堆內存和堆外內存。下表中列出了 Flink TaskManager內存模型的所有組成部分,以及影響其大小的相關配置參數。

我們可以看到,有些內存部分的大小可以直接通過一個配置參數進行設置,有些則需要根據多個參數進行調整。

框架內存

通常情況下,不建議對框架堆內存和框架堆外內存進行調整。除非你非??隙?Flink 的內部數據結構及操作需要更多的內存。這可能與具體的部署環(huán)境及作業(yè)結構有關,例如非常高的并發(fā)度。此外,F(xiàn)link 的部分依賴(例如 Hadoop)在某些特定的情況下也可能會需要更多的直接內存或本地內存。

提示:不管是堆內存還是堆外內存,F(xiàn)link 中的框架內存和任務內存之間目前是沒有隔離的。對框架和任務內存的區(qū)分,主要是為了在后續(xù)版本中做進一步優(yōu)化。

配置JobManager內存

配置 JobManager 內存最簡單的方法就是進程的配置總內存,本地模式下不需要為 JobManager 進行內存配置,配置參數將不會生效。

如上圖所示,下表中列出了 Flink JobManager 內存模型的所有組成部分,以及影響其大小的相關配置參數。

配置JVM堆內存

如配置總內存中所述,配置 JobManager 內存的方式是明確指定 JVM 堆內存的大?。╦obmanager.memory.heap.size)。通過這種方式,用戶可以更好地掌控用于以下用途的 JVM 堆內存大小。

Flink 框架

在作業(yè)提交時(例如一些特殊的批處理 Source)及 Checkpoint 完成的回調函數中執(zhí)行的用戶代碼

Flink 需要多少 JVM 堆內存,很大程度上取決于運行的作業(yè)數量、作業(yè)的結構及上述用戶代碼的需求。

提示:如果已經明確設置了 JVM 堆內存,建議不要再設置進程總內存或 Flink 總內存,否則可能會造成內存配置沖突。

在啟動 JobManager 進程時,F(xiàn)link 啟動腳本及客戶端通過設置 JVM 參數 -Xms 和 -Xmx 來管理 JVM 堆空間的大小。

配置堆外內存

堆外內存包括 JVM 直接內存 和 本地內存??梢酝ㄟ^配置參數 jobmanager.memory.enable-jvm-direct-memory-limit 設置是否啟用 JVM 直接內存限制。如果該配置項設置為 true,F(xiàn)link 會根據配置的堆外內存大小設置 JVM 參數 -XX:MaxDirectMemorySize。

可以通過配置參數jobmanager.memory.off-heap.size設置堆外內存的大小。如果遇到 JobManager 進程拋出 "OutOfMemoryError: Direct buffer memory"的異常,可以嘗試調大這項配置。

以下情況可能用到堆外內存:

Flink 框架依賴(例如 Akka 的網絡通信)

在作業(yè)提交時(例如一些特殊的批處理 Source)及 Checkpoint 完成的回調函數中執(zhí)行的用戶代碼

提示:如果同時配置了 Flink 總內存和 JVM 堆內存,且沒有配置堆外內存,那么堆外內存的大小將會是 Flink 總內存減去JVM 堆內存。這種情況下,堆外內存的默認大小將不會生效。

如果你是在本地運行 Flink(例如在 IDE 中)而非創(chuàng)建一個集群,那么 JobManager 的內存配置將不會生效。

內存調優(yōu)


獨立部署模式(Standalone Deployment)下的內存配置

獨立部署模式下,我們通常更關注 Flink 應用本身使用的內存大小。建議配置 Flink 總內存(taskmanager.memory.flink.size?或者?jobmanager.memory.flink.size)或其組成部分。此外,如果出現(xiàn) Metaspace 不足的問題,可以調整 JVM Metaspace 的大小。

這種情況下通常無需配置進程總內存,因為不管是 Flink 還是部署環(huán)境都不會對 JVM 開銷 進行限制,它只與機器的物理資源相關。

容器(Container)的內存配置

在容器化部署模式(Containerized Deployment)下(Kubernetes、Yarn 或 Mesos),建議配置進程總內存(taskmanager.memory.process.size或者jobmanager.memory.process.size)。該配置參數用于指定分配給 Flink JVM 進程的總內存,也就是需要申請的容器大小。

提示:如果配置了 Flink 總內存,F(xiàn)link 會自動加上 JVM 相關的內存部分,根據推算出的進程總內存大小申請容器。

注意:如果 Flink 或者用戶代碼分配超過容器大小的非托管的堆外(本地)內存,部署環(huán)境可能會殺掉超用內存的容器,造成作業(yè)執(zhí)行失敗。

State Backend 的內存配置

執(zhí)行無狀態(tài)作業(yè)或者使用 Heap State Backend(MemoryStateBackend 或 FsStateBackend)時,建議將托管內存設置為 0。這樣能夠最大化分配給 JVM 上用戶代碼的內存。

RocksDB State Backend

RocksDBStateBackend使用本地內存。默認情況下,RocksDB 會限制其內存用量不超過用戶配置的托管內存。因此,使用這種方式存儲狀態(tài)時,配置足夠多的托管內存是十分重要的。如果你關閉了 RocksDB 的內存控制,那么在容器化部署模式下如果 RocksDB 分配的內存超出了申請容器的大?。ㄟM程總內存),可能會造成 TaskExecutor 被部署環(huán)境殺掉。請同時參考如何調整 RocksDB 內存以及 state.backend.rocksdb.memory.managed。

SortMerge數據Shuffle內存配置

對于SortMerge數據Shuffle,每個ResultPartition需要的網絡緩沖區(qū)(Buffer)數目是由taskmanager.network.sort-shuffle.min-buffers這個配置決定的。它的 默認值是64,是比較小的。雖然64個網絡Buffer已經可以支持任意規(guī)模的并發(fā),但性能可能不是最好的。對于大并發(fā)的作業(yè),通 過增大這個配置值,可以提高落盤數據的壓縮率并且減少網絡小包的數量,從而有利于提高Shuffle性能。為了增大這個配置值, 你可能需要通過調整taskmanager.memory.network.fraction,taskmanager.memory.network.min和taskmanager.memory.network.max這三個參數來增大總的網絡內存大小從而避免出現(xiàn)insufficient number of network buffers錯誤。

除了網絡內存,SortMerge數據Shuffle還需要使用一些JVM Direct Memory來進行Shuffle數據的寫出與讀取。所以,為了使 用SortMerge數據Shuffle你可能還需要通過增大這個配置值taskmanager.memory.task.off-heap.size來為其來預留一些JVM Direct Memory。如果在你開啟 SortMerge數據Shuffle之后出現(xiàn)了Direct Memory OOM的錯誤,你只需要繼續(xù)加大上面的配置值來預留更多的Direct Memory 直到不再發(fā)生Direct Memory OOM的錯誤為止。


常見問題


IllegalConfigurationException

如果遇到從 TaskExecutorProcessUtils 或 JobManagerProcessUtils 拋出的 IllegalConfigurationException 異常,這通常說明您的配置參數中存在無效值(例如內存大小為負數、占比大于 1 等)或者配置沖突。請根據異常信息,確認出錯的內存部分的相關文檔及配置信息

OutOfMemoryError: Java heap space

該異常說明 JVM 的堆空間過小??梢酝ㄟ^增大總內存、TaskManager 的任務堆內存、JobManager的JVM堆內存等方法來增大JVM堆空間。

提示:也可以增大 TaskManager 的框架堆內存。這是一個進階配置,只有在確認是 Flink 框架自身需要更多內存時才應該去調整。

OutOfMemoryError: Direct buffer memory

該異常通常說明 JVM 的直接內存限制過小,或者存在直接內存泄漏(Direct Memory Leak)。請確認用戶代碼及外部依賴中是否使用了 JVM 直接內存,以及如果使用了直接內存,是否配置了足夠的內存空間??梢酝ㄟ^調整堆外內存來增大直接內存限制。

OutOfMemoryError: Metaspace

該異常說明 JVM Metaspace 限制過小??梢試L試調整 TaskManager、JobManager 的 JVM Metaspace。

IOException: Insufficient number of network buffers

該異常僅與 TaskManager 相關。

該異常通常說明網絡內存過小。可以通過調整以下配置參數增大網絡內存:

taskmanager.memory.network.min

taskmanager.memory.network.max

taskmanager.memory.network.fraction

容器(Container)內存超用

如果 Flink 容器嘗試分配超過其申請大小的內存(Yarn、Mesos 或 Kubernetes),這通常說明 Flink 沒有預留出足夠的本地內存。可以通過外部監(jiān)控系統(tǒng)或者容器被部署環(huán)境殺掉時的錯誤信息判斷是否存在容器內存超用。

對于 JobManager 進程,你還可以嘗試啟用 JVM 直接內存限制(jobmanager.memory.enable-jvm-direct-memory-limit),以排除 JVM 直接內存泄漏的可能性。

如果使用了 RocksDBStateBackend 且沒有開啟內存控制,也可以嘗試增大 TaskManager 的托管內存。

對于 JobManager 進程,你還可以嘗試啟用 JVM 直接內存限制(jobmanager.memory.enable-jvm-direct-memory-limit),以排除 JVM 直接內存泄漏的可能性。

如果使用了 RocksDBStateBackend 且沒有開啟內存控制,也可以嘗試增大 TaskManager 的托管內存。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容