? ? 本文主要介紹kafka producer的可靠性,包括ack、batch、重試機制等
消息發(fā)布
KafkaProducer的send是異步發(fā)送方法,一旦消息存儲到發(fā)送隊列緩沖區(qū),方法就會立即返回,實際處理消息發(fā)送的是一個后臺Sender線程。send方法返回一個Future對象(包含消息的元數(shù)據(jù)信息如分區(qū)、消息存儲的offset),并接收回調(diào)函數(shù)參數(shù),當消息ack后會調(diào)用回調(diào)函數(shù);調(diào)用Future的get方法,將阻塞直到相關聯(lián)的請求完成并返回該消息的元數(shù)據(jù)或在消息發(fā)送過程中拋出異常。
? ? 如果是簡單的阻塞調(diào)用,可以使用get方法:
????完全非阻塞的用法是使用回調(diào)參數(shù),在消息發(fā)送請求完成后調(diào)用該函數(shù):
忽略返回參數(shù),客戶端感知不到消息發(fā)送狀態(tài):
????如果應用和Kafka集群間的網(wǎng)絡質(zhì)量太差,那么阻塞的方式發(fā)送每條消息后需要等待較長時間才能收到應答。這對高并發(fā)、海量消息發(fā)送簡直就是災難,因為等待應答的時間遠超過消息發(fā)送時間。如果一些消息不太注重可靠性,發(fā)送失敗了只需要記錄下日志,可以用回調(diào)函數(shù)方式。
Acker
? ? Kafka使用push模式把消息發(fā)布到broker,消息后發(fā)布后,producer又是怎么確定消息已經(jīng)成功持久化?這是通過acker機制實現(xiàn)的,broker反饋給客戶端消息已經(jīng)收到并寫入到日志文件(基于性能考慮,broker并沒有把數(shù)據(jù)落盤而是放到內(nèi)存)。通過配置不同acks值,對應不同級別:
acks=0:消息會立即添加到socket緩沖區(qū),producer不會等待broker的任何確認消息,就認為消息已經(jīng)發(fā)送了。這種模式下數(shù)據(jù)傳輸效率是最高的,但不能保證broker已經(jīng)收到了消息,所以數(shù)據(jù)可靠性也是最低的。
? ? acks=1:意味著leader成功把消息寫到本地日志后就反饋給producer,而不關心同步節(jié)點上該消息是否已寫到本地。如果leader宕機但同步節(jié)點還沒有及時拉取到該消息,則數(shù)據(jù)就丟失了。
? ? acks=all/-1:leader會等待ISR中所有的同步節(jié)點都確認接收到了消息,才反饋給客戶端。這種可靠性是最高的,保證了只要至少有一個in-sync副本還活著,消息就不會丟失,acks=all需要leader等待所有同步節(jié)點ack,這種延遲取決于最慢節(jié)點。但是這樣也不能保證數(shù)據(jù)不丟失,比如當ISR中只有l(wèi)eader時,這樣就變成了acks=1的情況。
? ? 當acks=all/-1時,min.insync.replicas這個參數(shù)指定了ISR中的最少副本數(shù),默認值為1。如果ISR中的副本數(shù)小于min.insync.replicas時,客戶端會返回異常:org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。通過這兩個參數(shù)配合使用,能保證消息發(fā)送的可靠性。
重試機制
? ? Kafka Producer一般有兩類錯誤,可重試錯誤會通過重試發(fā)送消息解決,比如連接重連可解決連接錯誤、partition重新選舉leader可解決“NotLeaderForPartitionException”錯誤。Kafka Producer能配置重試次數(shù),超過重試次數(shù)還不能解決的會拋出錯誤。另外一類就是不能通過重試處理的錯誤,比如,消息大小太大,這種情況下Kafka Producer會立即報錯。
? ? 如果producer發(fā)送數(shù)據(jù)給broker后,遇到的網(wǎng)絡問題而造成通信中斷,那producer就無法判斷該條消息是否已經(jīng)提交(commit)。雖然Kafka無法確定網(wǎng)絡故障期間發(fā)生了什么,但是producer可以retry多次,確保消息已經(jīng)正確傳輸?shù)絙roker中
Batch
? ? 當多個消息被發(fā)送到同一分區(qū)時,producer會嘗試將多個消息合并到一個批次,就是把多個消息打包在一起發(fā)送到 Broker。在一次請求中發(fā)送大批量數(shù)據(jù),提高producer和broker性能,廣泛用于大數(shù)據(jù)場景。
batch.size:配置批量發(fā)送的最大字節(jié)數(shù),如果batch.size=0,會禁用batch。producer是在內(nèi)存中積累數(shù)據(jù),batch size越大占用內(nèi)存越多,因會始終分配指定大小的緩沖區(qū)。
linger.ms:通常是在消息的到達速度比發(fā)送速度快,producer才會把多個消息打包成一個批次,然而,在某些場景下,客戶端希望降低請求次數(shù),這可以通過增加延遲發(fā)送功能來實現(xiàn):producer不是立即發(fā)送消息,而是等待給定的延遲時間,以積累更多的消息批量發(fā)送,達到節(jié)省網(wǎng)絡資源的目的。linger.ms配置項就是讓Producer在發(fā)送消息前等待一定時間,以積累更多的消息打包發(fā)送,默認配置為0
消息重復性
? ? acks=-1的情況下,消息發(fā)送到leader后 ,當只有部分ISR副本完成了消息同步,leader此時掛掉,客戶端會認為消息發(fā)送失敗,就會重新發(fā)送數(shù)據(jù)(設置了retries),數(shù)據(jù)就可能會重復。比如follower1同步了leader的消息,follower2沒有同步到,leader掛掉后,producer會得到異常,認為消息發(fā)送失敗了,而follower1被選舉為leader,producer又重新發(fā)送消息,這樣消息就重復了。
本文首發(fā)于公眾號:data之道