2020-11.05-Flink-12(有狀態(tài)算子和應(yīng)用)

有狀態(tài)算子及用戶函數(shù)都是流應(yīng)用中常見(jiàn)組成部分,事實(shí)上,由于數(shù)據(jù)會(huì)隨著時(shí)間以流式到來(lái),大多數(shù)復(fù)雜一些的操作都需要存儲(chǔ)部分?jǐn)?shù)據(jù)或者中間結(jié)果,很多Flink內(nèi)置的DataStream算子、數(shù)據(jù)源以及數(shù)據(jù)匯都是有狀態(tài)的,它們需要對(duì)數(shù)據(jù)記錄進(jìn)行緩沖或者對(duì)中間結(jié)果以及元數(shù)據(jù)加以維護(hù)

1.實(shí)現(xiàn)有狀態(tài)函數(shù)

鍵值分區(qū)狀態(tài)只能由作用在KeyedStream上面的函數(shù)使用,這個(gè)可以通過(guò)DataStream.keyBy()方法來(lái)得到一個(gè)KeyedStream。KeyedStream會(huì)根據(jù)指定鍵值進(jìn)行分區(qū)并記住鍵值的定義,作用在KeyedStream上的算子可以訪問(wèn)它的鍵值定義上下文信息。Flink為鍵值分區(qū)狀態(tài)提供了多種數(shù)據(jù)類型,每個(gè)類型對(duì)應(yīng)了一種狀態(tài)結(jié)構(gòu)()數(shù)據(jù)結(jié)構(gòu),用戶可以根據(jù)自定義函數(shù)與狀態(tài)的交互方式或性能選擇不同的狀態(tài)類型:

  1. ValueState[T]:用于保存類型為T的單個(gè)值??捎梅椒ㄓ衯alue()、update();
  2. ListState[T]:用list結(jié)構(gòu)保存多個(gè)類型為T的元素,常用方法get()、add()、addAll()、update()等,但是它不支持刪除單個(gè)元素,我們可以使用update()方法更新整個(gè)列表,使用給定的列表值替換已有值;
  3. MapState[K,V]:用于保存一組鍵到值的映射;
  4. ReducingState[T]:提供了和ListState[T]相同的方法(除了addAll、update),但是你需要傳遞一個(gè)聚合函數(shù)ReduceFunction用來(lái)對(duì)存入的數(shù)據(jù)進(jìn)行聚合;
  5. AggregatingState[I,O]:和ReducingState[T]行為類似,但它使用了更加通用的AggregatingFunction來(lái)聚合狀態(tài)內(nèi)部的值。

總結(jié)

  1. 當(dāng)我們創(chuàng)建一個(gè)狀態(tài)對(duì)象時(shí),我們需要利用RichFunction中的RuntimeContext在Flink運(yùn)行時(shí)中注冊(cè)一個(gè)StateDescriptor;
  2. 每個(gè)狀態(tài)類型都有自己特定的StateDescriptor,入?yún)⒅幸獙懭霠顟B(tài)名稱與類型,ReducingState和AggregatingState的描述符還需要接收一個(gè)ReducingFunction或AggregatingFunction對(duì)象,以此來(lái)對(duì)加入的值進(jìn)行聚合;
  3. 狀態(tài)名稱的作用域是整個(gè)算子,我們可以通過(guò)在函數(shù)中注冊(cè)多個(gè)狀態(tài)描述符來(lái)創(chuàng)建多個(gè)狀態(tài)對(duì)象;
  4. 狀態(tài)類型可以通過(guò)Class或TypeInformation對(duì)象指定,因?yàn)镕link要為狀態(tài)創(chuàng)建合適的序列化器,所有類型指定是強(qiáng)制的。
  5. 通常情況下,狀態(tài)引用對(duì)象要在RichFunction的open()方法中初始化;
  6. 我們一般會(huì)將狀態(tài)引用對(duì)象聲明為函數(shù)類的普通成員變量;
  7. 對(duì)于函數(shù)類得外部傳入的參數(shù),我們也是以普通成員變量的方式通過(guò)構(gòu)造函數(shù)傳入;
  8. 狀態(tài)引用對(duì)象只提供用于訪問(wèn)狀態(tài)的接口而不會(huì)存儲(chǔ)狀態(tài)本身,具體保存工作交由狀態(tài)后端完成。
通過(guò)ListCheckpointed接口實(shí)現(xiàn)算子列表狀態(tài)(☆)

snapshotState方法會(huì)在Flink觸發(fā)為有狀態(tài)函數(shù)生成檢查點(diǎn)時(shí)調(diào)用
restoreState方法會(huì)在初始化函數(shù)狀態(tài)時(shí)調(diào)用
(沒(méi)用過(guò),學(xué)習(xí)一下)

使用聯(lián)結(jié)的廣播狀態(tài)

(沒(méi)用過(guò),學(xué)習(xí)一下)

2.為有狀態(tài)的應(yīng)用開(kāi)啟故障恢復(fù)

啟動(dòng)周期性檢查點(diǎn)功能

3.確保有狀態(tài)應(yīng)用的可維護(hù)性

flink利用保存點(diǎn)機(jī)制來(lái)對(duì)應(yīng)用及其狀態(tài)進(jìn)行維護(hù),但是需要初始版本應(yīng)用的全部有狀態(tài)算子都指定好兩個(gè)參數(shù),才可以在未來(lái)正常使用,這兩個(gè)參數(shù)是算子唯一標(biāo)識(shí)和最大并行度
算子的唯一標(biāo)識(shí)和最大并行度會(huì)被固定在保存點(diǎn)上,不可更改.一旦修改只能丟棄從頭開(kāi)始運(yùn)行

指定算子唯一標(biāo)識(shí)

uid方法

為使用鍵值分區(qū)狀態(tài)的算子定義最大并行度

4.有狀態(tài)應(yīng)用的性能及魯棒性

選擇狀態(tài)后端
  1. MemoryStateBackend

MemoryStateBackend 是將狀態(tài)維護(hù)在 Java 堆上的一個(gè)內(nèi)部狀態(tài)后端。鍵值狀態(tài)和窗口算子使用哈希表來(lái)存儲(chǔ)數(shù)據(jù)(values)和定時(shí)器(timers)。當(dāng)應(yīng)用程序 checkpoint 時(shí),此后端會(huì)在將狀態(tài)發(fā)給 JobManager 之前快照下?tīng)顟B(tài),JobManager 也將狀態(tài)存儲(chǔ)在 Java 堆上。默認(rèn)情況下,MemoryStateBackend 配置成支持異步快照。異步快照可以避免阻塞數(shù)據(jù)流的處理,從而避免反壓的發(fā)生。當(dāng)然,使用 new MemoryStateBackend(MAX_MEM_STATE_SIZE, false)也可以禁用該特點(diǎn)

默認(rèn)情況下,每一個(gè)狀態(tài)的大小限制為 5 MB。可以通過(guò) MemoryStateBackend 的構(gòu)造函數(shù)增加這個(gè)大小。狀態(tài)大小受到 akka 幀大小的限制(maxStateSize <= akka.framesize 默認(rèn) 10 M),所以無(wú)論怎么調(diào)整狀態(tài)大小配置,都不能大于 akka 的幀大小。也可以通過(guò) akka.framesize 調(diào)整 akka 幀大小。
狀態(tài)的總大小不能超過(guò) JobManager 的內(nèi)存。

  1. FsStateBackend

FsStateBackend需要配置的主要是文件系統(tǒng),如 URL(類型,地址,路徑)。舉個(gè)例子,比如可以是:“hdfs://namenode:40010/flink/checkpoints” 或“s3://flink/checkpoints

當(dāng)選擇使用 FsStateBackend時(shí),正在進(jìn)行的數(shù)據(jù)會(huì)被存在TaskManager的內(nèi)存中。在checkpoint時(shí),此后端會(huì)將狀態(tài)快照寫入配置的文件系統(tǒng)和目錄的文件中,同時(shí)會(huì)在JobManager的內(nèi)存中(在高可用場(chǎng)景下會(huì)存在 Zookeeper 中)存儲(chǔ)極少的元數(shù)據(jù)。容量限制上,單 TaskManager 上 State 總量不超過(guò)它的內(nèi)存,總大小不超過(guò)配置的文件系統(tǒng)容量。

默認(rèn)情況下,F(xiàn)sStateBackend 配置成提供異步快照,以避免在狀態(tài) checkpoint 時(shí)阻塞數(shù)據(jù)流的處理。該特性可以實(shí)例化 FsStateBackend 時(shí)傳入false的布爾標(biāo)志來(lái)禁用掉,例如:new FsStateBackend(path, false)

  1. RocksDBStateBackend

RocksDBStateBackend 的配置也需要一個(gè)文件系統(tǒng)(類型,地址,路徑),如下所示:
hdfs://namenode:40010/flink/checkpoints” 或“s3://flink/checkpoints
RocksDB 是一種嵌入式的本地?cái)?shù)據(jù)庫(kù)。RocksDBStateBackend 將處理中的數(shù)據(jù)使用 RocksDB 存儲(chǔ)在本地磁盤上。在 checkpoint 時(shí),整個(gè) RocksDB 數(shù)據(jù)庫(kù)會(huì)被存儲(chǔ)到配置的文件系統(tǒng)中,或者在超大狀態(tài)作業(yè)時(shí)可以將增量的數(shù)據(jù)存儲(chǔ)到配置的文件系統(tǒng)中。同時(shí) Flink 會(huì)將極少的元數(shù)據(jù)存儲(chǔ)在 JobManager 的內(nèi)存中,或者在 Zookeeper 中(對(duì)于高可用的情況)。RocksDB 默認(rèn)也是配置成異步快照的模式。

RocksDB是一個(gè) key/value 的內(nèi)存存儲(chǔ)系統(tǒng),和其他的 key/value 一樣,先將狀態(tài)放到內(nèi)存中,如果內(nèi)存快滿時(shí),則寫入到磁盤中,但需要注意RocksDB不支持同步的 Checkpoint,構(gòu)造方法中沒(méi)有同步快照這個(gè)選項(xiàng)。不過(guò)RocksDB支持增量的 Checkpoint,也是目前唯一增量 Checkpoint 的 Backend,意味著并不需要把所有 sst 文件上傳到 Checkpoint 目錄,僅需要上傳新生成的 sst 文件即可。它的 Checkpoint 存儲(chǔ)在外部文件系統(tǒng)(本地或HDFS),其容量限制只要單個(gè) TaskManager 上 State 總量不超過(guò)它的內(nèi)存+磁盤,單Key最大2G,總大小不超過(guò)配置的文件系統(tǒng)容量即可。

(學(xué)習(xí)一下)???

選擇狀態(tài)原語(yǔ)

有狀態(tài)算子(無(wú)論是內(nèi)置的還是用戶自定義的)的性能取決于多個(gè)方面,包含狀態(tài)的數(shù)據(jù)類型,應(yīng)用的狀態(tài)后端以及所選的狀態(tài)原語(yǔ)
建議每次函數(shù)調(diào)用只更新一次狀態(tài)

防止?fàn)顟B(tài)泄露

5.更新有狀態(tài)應(yīng)用

保存現(xiàn)有狀態(tài)更新應(yīng)用
從應(yīng)用中刪除狀態(tài)
修改算子的狀態(tài)
可查詢狀態(tài)服務(wù)的架構(gòu)及啟用方式
  1. QueryableStateClient: 客戶端。運(yùn)行在外部系統(tǒng)。提交查詢請(qǐng)求并接收最終返回的結(jié)果
  2. QueryableStateClientProxy: 客戶端代理。運(yùn)行在每個(gè)TaskManager上。接收客戶端的請(qǐng)求,找到Key對(duì)應(yīng)的TaskManager,然后將請(qǐng)求轉(zhuǎn)發(fā)給具體的查詢服務(wù),并負(fù)責(zé)最終向客戶端返回結(jié)果
  3. QueryableStateServer: 查詢服務(wù)。運(yùn)行在每個(gè)TaskManager上。處理來(lái)自客戶端代理的請(qǐng)求并返回結(jié)果

可查詢狀態(tài)實(shí)現(xiàn)案例:https://blog.csdn.net/wangpei1949/article/details/100608828

(學(xué)習(xí)一下)

對(duì)外暴露可查詢式狀態(tài)
從外部系統(tǒng)查詢狀態(tài)

https://blog.csdn.net/weixin_45366499/article/details/115442928

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容