怎么確保produce(生產(chǎn)者不丟數(shù)據(jù))
答:將ack機制設(shè)置為all,但是效率會降低。ack機制流程圖如下:

講一講kafka的ack三種機制
request.required.acks有三個值:0,1,-1
0表示生產(chǎn)者不會等待broker的ack,這個延遲最低但是存儲的保證最弱,當(dāng)server掛掉的時候數(shù)據(jù)就會丟失
1表示生產(chǎn)者會等待ack值leader副本確認(rèn)接收此消息后發(fā)送ack,但是如果leader掛掉后它不確保是否復(fù)制完成新leader也會丟失數(shù)據(jù)。
-1表示all,服務(wù)端會等待所有follower的副本收到數(shù)據(jù)后,才會收到leader發(fā)送出的ack,保證數(shù)據(jù)不丟失。
kafka與傳統(tǒng)MQ消息系統(tǒng)之間有三個關(guān)鍵區(qū)別:
1.kafka持久化日志,這些日志可以重復(fù)讀取和無限期保留
2.kafka是一個分布式系統(tǒng),以集群的方式運行,可以靈活伸縮,內(nèi)部采用復(fù)制數(shù)據(jù)提升了容錯性和高可用性
3.kafka支持實時流式處理
kafka可以脫離zookper單獨使用嗎,為什么
不能,因為kafka使用zookper管理和協(xié)調(diào)kafka的節(jié)點服務(wù)器。
kafka有幾種數(shù)據(jù)保留策略
兩種保留策略:按照過期時間保留 。按照存儲消息大小保留
kafka同時設(shè)置了7天和10G清除數(shù)據(jù),到第五天消息達(dá)到10G,這個時候kafka將如何處理:
kafka會將數(shù)據(jù)清理,時間個大小不論哪個滿足條件,都會清空數(shù)據(jù)。
什么情況會導(dǎo)致kafka變慢:
cpu瓶頸,磁盤讀寫瓶頸,網(wǎng)絡(luò)瓶頸
使用kafka集群時需要注意什么:
集群的數(shù)量不是越多越好,最好不要超過7個。
因為節(jié)點越多,消息復(fù)制時間越長,整個群組吞吐量就降低。
集群數(shù)量最好是單數(shù),因為超過一半故障的集群就不能用了,設(shè)置為單數(shù)容錯性更高。
關(guān)于零拷貝:

1、應(yīng)用程序中調(diào)用read() 方法,這里會涉及到一次上下文切換(用戶態(tài)->內(nèi)核態(tài)),底層采用DMA(direct memory access)讀取磁盤的文件,并把內(nèi)容存儲到內(nèi)核地址空間的讀取緩存區(qū)。
2、由于應(yīng)用程序無法讀取內(nèi)核地址空間的數(shù)據(jù),如果應(yīng)用程序要操作這些數(shù)據(jù),必須把這些內(nèi)容從讀取緩沖區(qū)拷貝到用戶緩沖區(qū)。這個時候,read() 調(diào)用返回,且引發(fā)一次上下文切換(內(nèi)核態(tài)->用戶態(tài)),現(xiàn)在數(shù)據(jù)已經(jīng)被拷貝到了用戶地址空間緩沖區(qū),這時,如果有需要,應(yīng)用程序可以操作修改這些內(nèi)容。
3、我們最終目的是把這個文件內(nèi)容通過Socket傳到另一個服務(wù)中,調(diào)用Socket的send()方法,這里又涉及到一次上下文切換(用戶態(tài)->內(nèi)核態(tài)),同時,文件內(nèi)容被進(jìn)行第三次拷貝,被再次拷貝到內(nèi)核地址空間緩沖區(qū),但是這次的緩沖區(qū)與目標(biāo)套接字相關(guān)聯(lián),與讀取緩沖區(qū)沒有半點關(guān)系。
4、send()調(diào)用返回,引發(fā)第四次的上下文切換,同時進(jìn)行第四次的數(shù)據(jù)拷貝,通過DMA把數(shù)據(jù)從目標(biāo)套接字相關(guān)的緩存區(qū)傳到協(xié)議引擎進(jìn)行發(fā)送。
"在整個過程中,過程1和4是由DMA負(fù)責(zé),并不會消耗CPU,只有過程2和3的拷貝需要CPU參與,整明白了?"
"我消化一下..."
半小時后...
"狼哥,這個過程,感覺好幾次的數(shù)據(jù)拷貝都是多余的,很影響性能啊"
"對,所以才有了零拷貝技術(shù)"
"具體咋實現(xiàn)?"
"慢慢來,如果在應(yīng)用程序中,不需要操作內(nèi)容,過程2和3就是多余的,如果可以直接把內(nèi)核態(tài)讀取緩存沖區(qū)數(shù)據(jù)直接拷貝到套接字相關(guān)的緩存區(qū),是不是可以達(dá)到優(yōu)化的目的?"

這種實現(xiàn),可以有以下幾點改進(jìn):
- 上下文切換的次數(shù)從四次減少到了兩次
- 數(shù)據(jù)拷貝次數(shù)從四次減少到了三次(其中DMA copy 2次,CPU copy 1次)
"怎么實現(xiàn)?"
"在Java中,正好FileChannel的transferTo() 方法可以實現(xiàn)這個過程,該方法將數(shù)據(jù)從文件通道傳輸?shù)浇o定的可寫字節(jié)通道, 上面的file.read()和 socket.send() 調(diào)用動作可以替換為 transferTo() 調(diào)用"
public void transferTo(long position, long count, WritableByteChannel target);
在 UNIX 和各種 Linux 系統(tǒng)中,此調(diào)用被傳遞到 sendfile() 系統(tǒng)調(diào)用中,最終實現(xiàn)將數(shù)據(jù)從一個文件描述符傳輸?shù)搅肆硪粋€文件描述符。
"確實改善了很多,但還沒達(dá)到零拷貝的要求,還有其它黑技術(shù)嗎?"
"對的,如果底層網(wǎng)絡(luò)接口卡支持收集操作的話,就可以進(jìn)一步的優(yōu)化。"
"怎么優(yōu)化?"
在 Linux 內(nèi)核 2.4 及后期版本中,針對套接字緩沖區(qū)描述符做了相應(yīng)調(diào)整,DMA自帶了收集功能,對于用戶方面,用法還是一樣的,但是內(nèi)部操作已經(jīng)發(fā)生了改變:

- 第一步,transferTo() 方法引發(fā) DMA 將文件內(nèi)容拷貝到內(nèi)核讀取緩沖區(qū)。
- 第二步,把包含數(shù)據(jù)位置和長度信息的描述符追加到套接字緩沖區(qū),避免了內(nèi)容整體的拷貝,DMA 引擎直接把數(shù)據(jù)從內(nèi)核緩沖區(qū)傳到協(xié)議引擎,從而消除了最后一次 CPU參與的拷貝動作。
Kafka中的HW、LEO、LSO等分別代表什么?
1.這些其實跟ISR有著緊密的關(guān)系,ISR(In-Sync Replicas )簡單說,就是為了 保證數(shù)據(jù)的一致性使用了ISR機制。是一個副本的列表,所有與leader副本保持一定程度同步的副本(包括leader)組成ISR.
2.HW (High Watermark)俗稱高水位,它標(biāo)識了一個特定的消息偏移量(offset),消費者只能拉取到這個offset之前的消息。
下圖表示一個日志文件,這個日志文件中只有9條消息,第一條消息的offset(LogStartOffset)為0,最有一條消息的offset為8,offset為9的消息使用虛線表示的,代表下一條待寫入的消息。日志文件的 HW 為6,表示消費者只能拉取offset在 0 到 5 之間的消息,offset為6的消息對消費者而言是不可見的。
image.png
LEO (Log End Offset),標(biāo)識當(dāng)前日志文件中下一條待寫入的消息的offset。上圖中offset為9的位置即為當(dāng)前日志文件的 LEO,LEO 的大小相當(dāng)于當(dāng)前日志分區(qū)中最后一條消息的offset值加1.分區(qū) ISR 集合中的每個副本都會維護(hù)自身的 LEO ,而 ISR 集合中最小的 LEO 即為分區(qū)的 HW,對消費者而言只能消費 HW 之前的消息。
LSO 特指LastStableOffset。它具體與kafka的事物有關(guān)。
kafka有哪些情形會造成重復(fù)消費?或丟失信息?
先處理后提交offset,會造成重讀消費
先提交offset后處理,會造成數(shù)據(jù)丟失
kafka避免重復(fù)消費,或丟失信息
- 丟包問題
1.1 問題描述
所謂丟包一般是指發(fā)送方發(fā)送的數(shù)據(jù)未到達(dá)接收方. 常見的丟包可能發(fā)生在發(fā)送端, 網(wǎng)絡(luò),接收端.
例如,消息推送服務(wù),每天早上,手機上各終端都會給用戶推送消息,這時候流量劇增,可能會出現(xiàn)kafka發(fā)送數(shù)據(jù)過快,導(dǎo)致服務(wù)器網(wǎng)卡爆滿,或者磁盤處于繁忙狀態(tài),可能會出現(xiàn)丟包現(xiàn)象。
1.2 問題解決
解決方案:
對kafka進(jìn)行限速,平滑流量
啟用重試機制,重試間隔時間設(shè)置長一些
Kafka設(shè)置acks=all,即需要相應(yīng)的所有處于ISR的分區(qū)都確認(rèn)收到該消息后,才算發(fā)送成功。
檢測方法:使用重放機制,查看問題所在。
2.重發(fā)問題
2.1 問題描述
重發(fā)問題:當(dāng)消費者重新分配partition的時候,可能出現(xiàn)從頭開始消費的情況,導(dǎo)致重發(fā)問題。當(dāng)消費者消費的速度很慢的時候,可能在一個session周期內(nèi)還未完成,導(dǎo)致心跳機制檢測報告出問題。
問題場景:
1.設(shè)置offset為自動提交,正在消費數(shù)據(jù),kill消費者線程;
2.設(shè)置offset為自動提交,關(guān)閉kafka時,如果在close之前,調(diào)用 consumer.unsubscribe() 則有可能部分offset沒提交,下次重啟會重復(fù)消費;
3.消費kafka與業(yè)務(wù)邏輯在一個線程中處理,可能出現(xiàn)消費程序業(yè)務(wù)處理邏輯阻塞超時,導(dǎo)致一個周期內(nèi),offset還未提交;繼而重復(fù)消費,但是業(yè)務(wù)邏輯可能采用發(fā)送kafka或者其他無法回滾的方式;
2.2 問題分析
底層根本原因:已經(jīng)消費了數(shù)據(jù),但是offset沒提交。
配置問題:設(shè)置了offset自動提交
重復(fù)消費最常見的情況:re-balance問題,通常會遇到消費的數(shù)據(jù),處理很耗時,導(dǎo)致超過了Kafka的session timeout時間(0.10.x版本默認(rèn)是30秒),那么就會re-balance重平衡,此時有一定幾率offset沒提交,會導(dǎo)致重平衡后重復(fù)消費。
2.3 問題解決
解決辦法:至少成功發(fā)送一次+去重操作(冪等性)
2.3.1 如何保證至少成功發(fā)送一次
保證不丟失消息:
生產(chǎn)者(ack=all 代表至少成功發(fā)送一次)
消費者 (offset手動提交,業(yè)務(wù)邏輯成功處理后,提交offset)
2.3.2 去重操作(冪等性)
去重問題:消息可以使用唯一id標(biāo)識
保證不重復(fù)消費:落表(主鍵或者唯一索引的方式,避免重復(fù)數(shù)據(jù))
業(yè)務(wù)邏輯處理(選擇唯一主鍵存儲到Redis或者mongdb中,先查詢是否存在,若存在則不處理;若不存在,先插入Redis或Mongdb,再進(jìn)行業(yè)務(wù)邏輯處理)
