Kafka保證消息不丟失不重復(fù)

簡(jiǎn)單總結(jié):
消費(fèi)端重復(fù)消費(fèi):建立去重表
消費(fèi)端丟失數(shù)據(jù):關(guān)閉自動(dòng)提交offset,處理完之后受到移位,enable.auto.commit=false 關(guān)閉自動(dòng)提交位移
生產(chǎn)端重復(fù)發(fā)送:消費(fèi)端消費(fèi)之前從去重表中判重
生產(chǎn)端丟失數(shù)據(jù):這個(gè)是最麻煩的情況
解決策略:
1.異步方式緩沖區(qū)滿了,就阻塞在那,等著緩沖區(qū)可用,不能清空緩沖區(qū)
2.發(fā)送消息之后回調(diào)函數(shù),發(fā)送成功就發(fā)送下一條,發(fā)送失敗就記在日志中,等著定時(shí)腳本來掃描(發(fā)送失敗可能并不真的發(fā)送失敗,只是沒收到反饋,定時(shí)腳本可能會(huì)重發(fā))

數(shù)據(jù)丟失情況:
1)使用同步模式的時(shí)候,有3種狀態(tài)保證消息被安全生產(chǎn),在配置為1(只保證寫入leader成功)的話,如果剛好leader partition掛了,數(shù)據(jù)就會(huì)丟失。
2)還有一種情況可能會(huì)丟失消息,就是使用異步模式的時(shí)候,當(dāng)緩沖區(qū)滿了,如果配置為0(還沒有收到確認(rèn)的情況下,緩沖池一滿,就清空緩沖池里的消息),數(shù)據(jù)就會(huì)被立即丟棄掉。

只要能避免上述兩種情況,那么就可以保證消息不會(huì)被丟失。
1)就是說在同步模式的時(shí)候,確認(rèn)機(jī)制設(shè)置為-1,也就是讓消息寫入leader和所有的副本。
2)還有,在異步模式下,如果消息發(fā)出去了,但還沒有收到確認(rèn)的時(shí)候,緩沖池滿了,在配置文件中設(shè)置成不限制阻塞超時(shí)的時(shí)間,也就說讓生產(chǎn)端一直阻塞,這樣也能保證數(shù)據(jù)不會(huì)丟失。

ack:
ack確認(rèn)機(jī)制設(shè)置為0,表示不等待響應(yīng),不等待borker的確認(rèn)信息,最小延遲,producer無法知道消息是否發(fā)生成功,消息可能丟失,但具有最大吞吐量。
ack確認(rèn)機(jī)制設(shè)置為-1,也就是讓消息寫入leader和所有的副本,ISR列表中的所有replica都返回確認(rèn)消息。
ack確認(rèn)機(jī)制設(shè)置為1,leader已經(jīng)接收了數(shù)據(jù)的確認(rèn)信息,replica異步拉取消息,比較折中。
ack確認(rèn)機(jī)制設(shè)置為2,表示producer寫partition leader和其他一個(gè)follower成功的時(shí)候,broker就返回成功,無論其他的partition follower是否寫成功。
ack確認(rèn)機(jī)制設(shè)置為 "all" 即所有副本都同步到數(shù)據(jù)時(shí)send方法才返回, 以此來完全判斷數(shù)據(jù)是否發(fā)送成功, 理論上來講數(shù)據(jù)不會(huì)丟失。
min.insync.replicas=1 意思是至少有1個(gè)replica返回成功,否則product異常

總結(jié):
消息的完整性和系統(tǒng)的吞吐量是互斥的,為了確保消息不丟失就必然會(huì)損失系統(tǒng)的吞吐量
producer:
1、ack設(shè)置-1
2、設(shè)置副本同步成功的最小同步個(gè)數(shù)為副本數(shù)-1
3、加大重試次數(shù)
4、同步發(fā)送
5、對(duì)于單條數(shù)據(jù)過大,要設(shè)置可接收的單條數(shù)據(jù)的大小
6、對(duì)于異步發(fā)送,通過回調(diào)函數(shù)來感知丟消息,使用KafkaProducer.send(record, callback)方法而不是send(record)方法
7、配置不允許非ISR(In-Sync Replicas,副本同步隊(duì)列)集合中的副本當(dāng)leader。所有的副本(replicas)統(tǒng)稱為 Assigned Replicas,即 AR
8、客戶端緩沖區(qū)滿了也可能會(huì)丟消息;或者異步情況下消息在客戶端緩沖區(qū)還未發(fā)送,客戶端就宕機(jī)
9、block.on.buffer.full = true
consumer:
1、enable.auto.commit=false 關(guān)閉自動(dòng)提交位移

unclean.leader.election.enable 設(shè)置為 false(默認(rèn)參數(shù)為 true),意思是,當(dāng)存有你最新一條記錄的 replication 宕機(jī)的時(shí)候,Kafka 自己會(huì)選舉出一個(gè)主節(jié)點(diǎn),如果默認(rèn)允許還未同步你最新數(shù)據(jù)的 replication 所在的節(jié)點(diǎn)被選舉為主節(jié)點(diǎn)的話,你的數(shù)據(jù)將會(huì)丟失,因此這里應(yīng)該按需將參數(shù)調(diào)控為 false;

retries設(shè)置大一些。設(shè)置大于0的值將使客戶端重新發(fā)送任何數(shù)據(jù),一旦這些數(shù)據(jù)發(fā)送失敗。注意,這些重試與客戶端接收到發(fā)送錯(cuò)誤時(shí)的重試沒有什么不同。允許重試將潛在的改變數(shù)據(jù)的順序,如果這兩個(gè)消息記錄都是發(fā)送到同一個(gè)partition,則第一個(gè)消息失敗第二個(gè)發(fā)送成功,則第二條消息會(huì)比第一條消息出現(xiàn)要早。

replication.factor > min.insync.replicas。如果兩者相等,當(dāng)一個(gè)副本掛掉了分區(qū)也就沒法正常工作了。通常設(shè)置replication.factor = min.insync.replicas + 1即可。

同一分區(qū)消息亂序:
假設(shè)a,b兩條消息,a先發(fā)送后由于發(fā)送失敗重試,這時(shí)順序就會(huì)在b的消息后面,可以設(shè)置max.in.flight.requests.per.connection=1來避免
max.in.flight.requests.per.connection:限制客戶端在單個(gè)連接上能夠發(fā)送的未響應(yīng)請(qǐng)求的個(gè)數(shù)。設(shè)置此值是1表示kafka broker在響應(yīng)請(qǐng)求之前client不能再向同一個(gè)broker發(fā)送請(qǐng)求,但吞吐量會(huì)下降

0.11.0之后的版本:
冪等性發(fā)送:
引入了Producer ID(PID)和Sequence Number實(shí)現(xiàn)Producer的冪等語(yǔ)義。

  • Producer ID:每個(gè)新的Producer在初始化的時(shí)候會(huì)被分配一個(gè)唯一的PID
  • Sequence Number:對(duì)于每個(gè)PID,該P(yáng)roducer發(fā)送數(shù)據(jù)的每個(gè)<Topic, Partition>都對(duì)應(yīng)一個(gè)從0開始單調(diào)遞增的Sequence Number。

Broker端也會(huì)為每個(gè)<PID, Topic, Partition>維護(hù)一個(gè)序號(hào),并且每次Commit一條消息時(shí)將其對(duì)應(yīng)序號(hào)遞增。對(duì)于接收的每條消息,如果其序號(hào)比Broker維護(hù)的序號(hào)(即最后一次Commit的消息的序號(hào))大一,則Broker會(huì)接受它,否則將其丟棄:

  • 如果消息序號(hào)比Broker維護(hù)的序號(hào)大一以上,說明中間有數(shù)據(jù)尚未寫入,也即亂序,此時(shí)Broker拒絕該消息,Producer拋出InvalidSequenceNumber
  • 如果消息序號(hào)小于等于Broker維護(hù)的序號(hào),說明該消息已被保存,即為重復(fù)消息,Broker直接丟棄該消息,Producer拋出DuplicateSequenceNumber

這種機(jī)制很好的解決了數(shù)據(jù)重復(fù)和數(shù)據(jù)亂序的問題。
事務(wù)機(jī)制:
多個(gè)操作要么全部成功要么全部失敗。Kafka事務(wù)的本質(zhì)是,將一組寫操作(如果有)對(duì)應(yīng)的消息與一組讀操作(如果有)對(duì)應(yīng)的Offset的更新進(jìn)行同樣的標(biāo)記(即Transaction Marker)來實(shí)現(xiàn)事務(wù)中涉及的所有讀寫操作同時(shí)對(duì)外可見或同時(shí)對(duì)外不可見。

補(bǔ)充ISR:
HW 俗稱高水位,HighWatermark 的縮寫,取一個(gè) partition 對(duì)應(yīng)的 ISR 中最小的 LEO 作為 HW,consumer 最多只能消費(fèi)到 HW 所在的位置。另外每個(gè) replica 都有 HW,leader 和 follower 各自負(fù)責(zé)更新自己的 HW 的狀態(tài)。對(duì)于 leader 新寫入的消息,consumer 不能立刻消費(fèi),leader 會(huì)等待該消息被所有 ISR 中的 replicas 同步后更新 HW,此時(shí)消息才能被 consumer 消費(fèi)。這樣就保證了如果 leader 所在的 broker 失效,該消息仍然可以從新選舉的 leader 中獲取。對(duì)于來自內(nèi)部 broKer 的讀取請(qǐng)求,沒有 HW 的限制。
下圖詳細(xì)的說明了當(dāng) producer 生產(chǎn)消息至 broker 后,ISR 以及 HW 和 LEO 的流轉(zhuǎn)過程:


由此可見,Kafka 的復(fù)制機(jī)制既不是完全的同步復(fù)制,也不是單純的異步復(fù)制。事實(shí)上,同步復(fù)制要求所有能工作的 follower 都復(fù)制完,這條消息才會(huì)被 commit,這種復(fù)制方式極大的影響了吞吐率。而異步復(fù)制方式下,follower 異步的從 leader 復(fù)制數(shù)據(jù),數(shù)據(jù)只要被 leader 寫入 log 就被認(rèn)為已經(jīng) commit,這種情況下如果 follower 都還沒有復(fù)制完,落后于 leader 時(shí),突然 leader 宕機(jī),則會(huì)丟失數(shù)據(jù)。而 Kafka 的這種使用 ISR 的方式則很好的均衡了確保數(shù)據(jù)不丟失以及吞吐率。
Kafka 的 ISR 的管理最終都會(huì)反饋到 Zookeeper 節(jié)點(diǎn)上。具體位置為:/brokers/topics/[topic]/partitions/[partition]/state。目前有兩個(gè)地方會(huì)對(duì)這個(gè) Zookeeper 的節(jié)點(diǎn)進(jìn)行維護(hù):
1.Controller 來維護(hù):Kafka 集群中的其中一個(gè) Broker 會(huì)被選舉為 Controller,主要負(fù)責(zé) Partition 管理和副本狀態(tài)管理,也會(huì)執(zhí)行類似于重分配 partition 之類的管理任務(wù)。在符合某些特定條件下,Controller 下的 LeaderSelector 會(huì)選舉新的 leader,ISR 和新的 leader_epoch 及 controller_epoch 寫入 Zookeeper 的相關(guān)節(jié)點(diǎn)中。同時(shí)發(fā)起 LeaderAndIsrRequest 通知所有的 replicas。
2.leader 來維護(hù):leader 有單獨(dú)的線程定期檢測(cè) ISR 中 follower 是否脫離 ISR, 如果發(fā)現(xiàn) ISR 變化,則會(huì)將新的 ISR 的信息返回到 Zookeeper 的相關(guān)節(jié)點(diǎn)中。

參考文獻(xiàn):
https://www.infoq.cn/article/depth-interpretation-of-kafka-data-reliability

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容