一 、 CEP 0:18~1:10
二、一致性保證 1:10 ~2:25
我們使用FlinkKafkaConumser,并且啟用Checkpoint,偏移量會(huì)通過(guò)checkpoint保存到state里
面,并且默認(rèn)會(huì)寫入到kafka的特殊主體中,也就是__consumer_offsetsetCommitOffsetsOnCheckpoints 默認(rèn)會(huì)true,就是把偏移量寫入特殊主題中
Flink自動(dòng)重啟的過(guò)程中,讀取的偏移量是state中的偏移量,如果state里面沒(méi)有那么從
__consumer_offset里讀取偏移量,如果__consumer_offset里面沒(méi)有那么就會(huì)從earliest或者lastest讀取數(shù)據(jù)
redis通過(guò)冪等性實(shí)現(xiàn)僅一次語(yǔ)義
4.1.5 寫kafka保證Exactly once
兩階段提交
Flink的兩階段提交
核心源碼
通過(guò)冪等性實(shí)現(xiàn)僅一次語(yǔ)義
在分布式系統(tǒng)中,可以使用兩階段提交來(lái)實(shí)現(xiàn)事務(wù)性從而保證數(shù)據(jù)的一致性,兩階段提交分為:預(yù)提交階段與
提交階段,通常包含兩個(gè)角色:協(xié)調(diào)者與執(zhí)行者,協(xié)調(diào)者用于用于管理所有執(zhí)行者的操作,執(zhí)行者用于執(zhí)行具
體的提交操作,具體的操作流程:
- 首先協(xié)調(diào)者會(huì)送預(yù)提交(pre-commit)命令有的執(zhí)行者
- 執(zhí)行者執(zhí)行預(yù)提交操作然后發(fā)送一條反饋(ack)消息給協(xié)調(diào)者
- 待協(xié)調(diào)者收到所有執(zhí)行者的成功反饋,則發(fā)送一條提交信息(commit)給執(zhí)行者
- 執(zhí)行者執(zhí)行提交操作
如果在流程2中部分預(yù)提交失敗,那么協(xié)調(diào)者就會(huì)收到一條失敗的反饋,則會(huì)發(fā)送一條rollback消息給所有執(zhí)
行者,執(zhí)行回滾操作,保證數(shù)據(jù)一致性;但是如果在流程4中,出現(xiàn)部分提交成功部分提交失敗,那么就會(huì)造
成數(shù)據(jù)的不一致,因此后面也提出了3PC或者通過(guò)其他補(bǔ)償機(jī)制來(lái)保證數(shù)據(jù)最終一致性
flink中兩階段提交是為了保證端到端的Exactly Once,主要依托checkpoint機(jī)制來(lái)實(shí)現(xiàn),先看一下
checkpoint的整體流程,
1.JobManager會(huì)周期性的發(fā)送執(zhí)行checkpoint命令(start checkpoint);
2.當(dāng)source端收到執(zhí)行指令后會(huì)產(chǎn)生一條barrier消息插入到input消息隊(duì)列中,當(dāng)處理到barrier時(shí)
會(huì)執(zhí)行本地checkpoint, 并且會(huì)將barrier發(fā)送到下一個(gè)節(jié)點(diǎn),當(dāng)checkpoint完成之后會(huì)發(fā)送一條ack信
息給JobManager; - 當(dāng)所有節(jié)點(diǎn)都完成checkpoint之后,JobManager會(huì)收到來(lái)自所有節(jié)點(diǎn)的ack信息,那么就表示一次
完整的checkpoint的完成; - JobManager會(huì)給所有節(jié)點(diǎn)發(fā)送一條callback信息,表示通知checkpoint完成消息。接下來(lái)就可以
提交事務(wù)了
對(duì)比f(wàn)link整個(gè)checkpoint機(jī)制調(diào)用流程可以發(fā)現(xiàn)與2PC非常相似,JobManager相當(dāng)于協(xié)調(diào)者,flink提
供了CheckpointedFunction與CheckpointListener這樣兩個(gè)接口,CheckpointedFunction中有
snapshotState方法,每次checkpoint觸發(fā)執(zhí)行方法,通常會(huì)將緩存數(shù)據(jù)放入狀態(tài)中,可以理解為是一個(gè)
hook,這個(gè)方法里面可以實(shí)現(xiàn)預(yù)提交,CheckpointListener中有notifyCheckpointComplete方法,
checkpoint完成之后的通知方法,這里可以做一些額外的操作,比如真正提交kafka的事務(wù);在2PC中提到
如果對(duì)應(yīng)流程2預(yù)提交失敗,那么本次checkpoint就被取消不會(huì)執(zhí)行,不會(huì)影響數(shù)據(jù)一致性.如果流程4失
敗,那么重啟從上一次的checkpoints重新計(jì)算。
三、Flink On Yarn 2:25 ~ 2:35
第一種【yarn-session.sh(開(kāi)辟資源)+flink run(提交任務(wù))】
啟動(dòng)一個(gè)一直運(yùn)行的flink集群
/bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 [-d]
?執(zhí)行任務(wù)
?./bin/flink run WordCount.jar --hostname xxx --port 8888
停止任務(wù) 【web界面或者命令行執(zhí)行cancel命令】
第二種【flink run -m yarn-cluster(開(kāi)辟資源+提交任務(wù))】
? 啟動(dòng)集群,執(zhí)行任務(wù)
?./bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024
./examples/batch/WordCount.jar
注意:client端必須要設(shè)置YARN_CONF_DIR或者HADOOP_CONF_DIR或者HADOOP_HOME環(huán)境變量,通過(guò)這
個(gè)環(huán)境變量來(lái)讀取YARN和HDFS的配置信息,否則啟動(dòng)會(huì)失敗

四、SQL 流平臺(tái) 2:35 ~2:53
基于袋鼠云