flink架構(gòu)師5-CEP、一致性、YARN

一 、 CEP 0:18~1:10

二、一致性保證 1:10 ~2:25

  1. 我們使用FlinkKafkaConumser,并且啟用Checkpoint,偏移量會(huì)通過(guò)checkpoint保存到state里
    面,并且默認(rèn)會(huì)寫入到kafka的特殊主體中,也就是__consumer_offset

  2. setCommitOffsetsOnCheckpoints 默認(rèn)會(huì)true,就是把偏移量寫入特殊主題中

  3. Flink自動(dòng)重啟的過(guò)程中,讀取的偏移量是state中的偏移量,如果state里面沒(méi)有那么從
    __consumer_offset里讀取偏移量,如果__consumer_offset里面沒(méi)有那么就會(huì)從earliest或者lastest讀取數(shù)據(jù)

redis通過(guò)冪等性實(shí)現(xiàn)僅一次語(yǔ)義

4.1.5 寫kafka保證Exactly once

兩階段提交
Flink的兩階段提交
核心源碼
通過(guò)冪等性實(shí)現(xiàn)僅一次語(yǔ)義
在分布式系統(tǒng)中,可以使用兩階段提交來(lái)實(shí)現(xiàn)事務(wù)性從而保證數(shù)據(jù)的一致性,兩階段提交分為:預(yù)提交階段與
提交階段,通常包含兩個(gè)角色:協(xié)調(diào)者與執(zhí)行者,協(xié)調(diào)者用于用于管理所有執(zhí)行者的操作,執(zhí)行者用于執(zhí)行具
體的提交操作,具體的操作流程:

  1. 首先協(xié)調(diào)者會(huì)送預(yù)提交(pre-commit)命令有的執(zhí)行者
  2. 執(zhí)行者執(zhí)行預(yù)提交操作然后發(fā)送一條反饋(ack)消息給協(xié)調(diào)者
  3. 待協(xié)調(diào)者收到所有執(zhí)行者的成功反饋,則發(fā)送一條提交信息(commit)給執(zhí)行者
  4. 執(zhí)行者執(zhí)行提交操作
    如果在流程2中部分預(yù)提交失敗,那么協(xié)調(diào)者就會(huì)收到一條失敗的反饋,則會(huì)發(fā)送一條rollback消息給所有執(zhí)
    行者,執(zhí)行回滾操作,保證數(shù)據(jù)一致性;但是如果在流程4中,出現(xiàn)部分提交成功部分提交失敗,那么就會(huì)造
    成數(shù)據(jù)的不一致,因此后面也提出了3PC或者通過(guò)其他補(bǔ)償機(jī)制來(lái)保證數(shù)據(jù)最終一致性
    flink中兩階段提交是為了保證端到端的Exactly Once,主要依托checkpoint機(jī)制來(lái)實(shí)現(xiàn),先看一下
    checkpoint的整體流程,
    1.JobManager會(huì)周期性的發(fā)送執(zhí)行checkpoint命令(start checkpoint);
    2.當(dāng)source端收到執(zhí)行指令后會(huì)產(chǎn)生一條barrier消息插入到input消息隊(duì)列中,當(dāng)處理到barrier時(shí)
    會(huì)執(zhí)行本地checkpoint, 并且會(huì)將barrier發(fā)送到下一個(gè)節(jié)點(diǎn),當(dāng)checkpoint完成之后會(huì)發(fā)送一條ack信
    息給JobManager;
  5. 當(dāng)所有節(jié)點(diǎn)都完成checkpoint之后,JobManager會(huì)收到來(lái)自所有節(jié)點(diǎn)的ack信息,那么就表示一次
    完整的checkpoint的完成;
  6. JobManager會(huì)給所有節(jié)點(diǎn)發(fā)送一條callback信息,表示通知checkpoint完成消息。接下來(lái)就可以
    提交事務(wù)了
    對(duì)比f(wàn)link整個(gè)checkpoint機(jī)制調(diào)用流程可以發(fā)現(xiàn)與2PC非常相似,JobManager相當(dāng)于協(xié)調(diào)者,flink提
    供了CheckpointedFunction與CheckpointListener這樣兩個(gè)接口,CheckpointedFunction中有
    snapshotState方法,每次checkpoint觸發(fā)執(zhí)行方法,通常會(huì)將緩存數(shù)據(jù)放入狀態(tài)中,可以理解為是一個(gè)
    hook,這個(gè)方法里面可以實(shí)現(xiàn)預(yù)提交,CheckpointListener中有notifyCheckpointComplete方法,
    checkpoint完成之后的通知方法,這里可以做一些額外的操作,比如真正提交kafka的事務(wù);在2PC中提到
    如果對(duì)應(yīng)流程2預(yù)提交失敗,那么本次checkpoint就被取消不會(huì)執(zhí)行,不會(huì)影響數(shù)據(jù)一致性.如果流程4失
    敗,那么重啟從上一次的checkpoints重新計(jì)算。

三、Flink On Yarn 2:25 ~ 2:35

第一種【yarn-session.sh(開(kāi)辟資源)+flink run(提交任務(wù))】
啟動(dòng)一個(gè)一直運(yùn)行的flink集群
/bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 [-d]
?執(zhí)行任務(wù)
?./bin/flink run WordCount.jar --hostname xxx --port 8888
停止任務(wù) 【web界面或者命令行執(zhí)行cancel命令】

第二種【flink run -m yarn-cluster(開(kāi)辟資源+提交任務(wù))】
? 啟動(dòng)集群,執(zhí)行任務(wù)
?./bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024
./examples/batch/WordCount.jar
注意:client端必須要設(shè)置YARN_CONF_DIR或者HADOOP_CONF_DIR或者HADOOP_HOME環(huán)境變量,通過(guò)這
個(gè)環(huán)境變量來(lái)讀取YARN和HDFS的配置信息,否則啟動(dòng)會(huì)失敗

image.png

四、SQL 流平臺(tái) 2:35 ~2:53

基于袋鼠云

五、作業(yè) 2:53 ~3:13

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容