生產(chǎn)者:往消息隊(duì)列里推送消息的應(yīng)用
發(fā)送消息的過程
Kafka 生產(chǎn)者發(fā)送消息的過程:
- Kafka 會(huì)將發(fā)送消息包裝為 ProducerRecord 對(duì)象, ProducerRecord 對(duì)象包含了目標(biāo)主題和要發(fā)送的內(nèi)容,同時(shí)還可以指定鍵和分區(qū)。在發(fā)送 ProducerRecord 對(duì)象前,生產(chǎn)者會(huì)先把鍵和值對(duì)象序列化成字節(jié)數(shù)組,這樣它們才能夠在網(wǎng)絡(luò)上傳輸。
- 接下來,數(shù)據(jù)被傳給分區(qū)器。如果之前已經(jīng)在 ProducerRecord 對(duì)象里指定了分區(qū),那么分區(qū)器就不會(huì)再做任何事情。如果沒有指定分區(qū) ,那么分區(qū)器會(huì)根據(jù) ProducerRecord 對(duì)象的鍵來選擇一個(gè)分區(qū),緊接著,這條記錄被添加到一個(gè)記錄批次里,這個(gè)批次里的所有消息會(huì)被發(fā)送到相同的主題和分區(qū)上。有一個(gè)獨(dú)立的線程負(fù)責(zé)把這些記錄批次發(fā)送到相應(yīng)的 broker 上。
-
服務(wù)器在收到這些消息時(shí)會(huì)返回一個(gè)響應(yīng)。如果消息成功寫入 Kafka,就返回一個(gè) RecordMetaData 對(duì)象,它包含了主題和分區(qū)信息,以及記錄在分區(qū)里的偏移量。如果寫入失敗,則會(huì)返回一個(gè)錯(cuò)誤。生產(chǎn)者在收到錯(cuò)誤之后會(huì)嘗試重新發(fā)送消息,如果達(dá)到指定的重試次數(shù)后還沒有成功,則直接拋出異常,不再重試。
kafka生產(chǎn)者發(fā)送消息
發(fā)送方式
生產(chǎn)者有三種方式發(fā)送消息。實(shí)際上,發(fā)送的動(dòng)作都是一致的,不由使用者決定。這三種方式區(qū)別在于對(duì)于消息是否正常到達(dá)的處理。
- fire-and-forget:把消息發(fā)送給broker之后不關(guān)心其是否正常到達(dá)。大多數(shù)情況下,消息會(huì)正常到達(dá),即使出錯(cuò)了生產(chǎn)者也會(huì)自動(dòng)重試。但是如果出錯(cuò)了,對(duì)于我們的服務(wù)而言,是無感知的。這種適用于可丟失消息、對(duì)吞吐量要求大的場景,比如用戶點(diǎn)擊日志上報(bào)。
- 同步發(fā)送:我們使用send方法發(fā)送一條消息,它會(huì)返回一個(gè)Future,調(diào)用get方法可以阻塞住當(dāng)前線程,等待返回。這種適用對(duì)消息可靠性要求高的場景,比如支付,要求消息不可丟失,如果丟失了則阻斷業(yè)務(wù)(或回滾)
- 異步發(fā)送:使用send方法發(fā)送一條消息時(shí)指定回調(diào)函數(shù),在broker返回結(jié)果時(shí)調(diào)用。這個(gè)回調(diào)函數(shù)可以進(jìn)行錯(cuò)誤日志的記錄,或者重試。這種方式犧牲了一部分可靠性,但是吞吐量會(huì)比同步發(fā)送高很多。但是我們可以通過后續(xù)的補(bǔ)償操作彌補(bǔ)業(yè)務(wù)。
序列化器
一般而言,我們都會(huì)采用默認(rèn)的序列化器。但是我們也可以采用更加高效和通用的序列化框架,比如Protobuf、Avro。
如果有業(yè)務(wù)需要,可以自定義序列化器實(shí)現(xiàn)消息的加密
分區(qū)
在kafka中,一個(gè)topic可以設(shè)定多個(gè)分區(qū)。在ProducerRecord中,我們可以指定分區(qū),也可以通過指定key來決定消息被分到哪個(gè)分區(qū)。
如果key為nil,那么分區(qū)器會(huì)采用輪詢的方式將消息平均分配到各個(gè)分區(qū);
如果key不為nil,那么分區(qū)器會(huì)采用內(nèi)置的hash函數(shù)(不是java提供的,不會(huì)因?yàn)閖ava的升級(jí)而改變,可以保證新舊數(shù)據(jù)可以分到同一個(gè)分區(qū))進(jìn)行分區(qū)
指定key有兩個(gè)問題:
- 必須保證分區(qū)可用。如果分區(qū)不可用,那么消息將寫入失敗。這種情況很少發(fā)生,也可以通過復(fù)制和同步的功能來保證分區(qū)的高可用。
- 使用默認(rèn)的分區(qū)器,只有在不改變分區(qū)數(shù)量的情況下才能保證鍵與分區(qū)的映射。一般來說,我們最好是在創(chuàng)建分區(qū)時(shí)就把分區(qū)規(guī)劃好,避免在生產(chǎn)環(huán)境中修改分區(qū)數(shù)量。
自定義分區(qū)算法:如果需要保證key與分區(qū)的映射,同時(shí)也要兼顧后續(xù)業(yè)務(wù)增長增加分區(qū)數(shù)量,那么最好是自定義分區(qū)算法,采用一致性hash進(jìn)行分區(qū)。
順序保證
kafka可以保證同一個(gè)分區(qū)的消息時(shí)有序的,也就是說,broker會(huì)將同一個(gè)分區(qū)的消息按發(fā)送順序(準(zhǔn)備來說是接收順序)寫入分區(qū),消費(fèi)者也會(huì)按照同樣的順序讀取。
但是為了吞吐量,一般我們都會(huì)設(shè)置每次發(fā)送消息的數(shù)據(jù)大于1(max.in.flight.requests.per.connection)。如果AB兩條消息同一批次發(fā)送,正常來說應(yīng)該A在B前面。但是有可能因?yàn)锳寫入出錯(cuò)了,B成功了。之后A在下個(gè)批次重試,這時(shí)兩條消息在隊(duì)列里的順序就會(huì)變成BA,消費(fèi)也會(huì)按這個(gè)順序進(jìn)行消費(fèi)。
如果要確保消息順序,那么在生產(chǎn)者的配置上,就應(yīng)該將max.in.flight.requests.per.connection設(shè)置為1,每次只提交一條消息,成功了再提交第二條消息。當(dāng)然,這樣會(huì)嚴(yán)重影響吞吐量。
還有一種方法,那就是在發(fā)送AB的時(shí)候,先保證A發(fā)送成功,再發(fā)送B。B的發(fā)送可以同步發(fā)送A成功后再發(fā)送,或者在異步發(fā)送A消息的回調(diào)函數(shù)中進(jìn)行。這種方法會(huì)加長單個(gè)業(yè)務(wù)的處理時(shí)長,但是發(fā)送消息的吞吐量不會(huì)受到影響。就整體而言,如果是有異步發(fā)送的方式,其吞吐量并不會(huì)下降多少。
參考
《kafka權(quán)威指南》
