翻譯自 https://www.confluent.io/wp-content/uploads/Optimizing-Your-Apache-Kafka-Deployment-1.pdf
前言
Apache kafka是一套可以拿過(guò)來(lái)直接運(yùn)行起來(lái)的很好的企業(yè)級(jí)流處理平臺(tái)。只需要將你的客戶(hù)端應(yīng)用放到Kafka集群中,剩下的事件就都可以交給Kafka來(lái)處理,比如:負(fù)載在brokers之間的自動(dòng)分布,brokers自動(dòng)借助零拷貝傳輸技術(shù)發(fā)送數(shù)據(jù)到消費(fèi)者,當(dāng)有消費(fèi)者加入或離開(kāi)時(shí)consumer groups自動(dòng)均衡,應(yīng)用程序使用Kafka Streams APIs將狀態(tài)存儲(chǔ)自動(dòng)備份到集群中,當(dāng)broker故障時(shí)partition主自動(dòng)重新選舉。這樣看起來(lái),運(yùn)維人員的夢(mèng)想成真啦!
在不需要對(duì)Kafka配置參數(shù)作任何改動(dòng)的情況下,你就可以部署起來(lái)一套Kafka的開(kāi)發(fā)環(huán)境并且測(cè)試基本功能。但事實(shí)上Kafka可以直接運(yùn)行起來(lái)并不意味著在上到生產(chǎn)環(huán)境前你不需要作一些調(diào)整。需要作調(diào)整的原因是,不同的用戶(hù)場(chǎng)景有不同的需求集群,這將最終驅(qū)動(dòng)不同的服務(wù)目標(biāo)。為了針對(duì)這些服務(wù)目標(biāo)來(lái)作優(yōu)化,你將需要改變Kafka的某些配置參數(shù)。實(shí)際上,Kafka自動(dòng)的設(shè)計(jì)就給用戶(hù)提供了靈活的配置。為了確保你的Kafka環(huán)境是針對(duì)你的服務(wù)目標(biāo)作了優(yōu)化的,你必須要調(diào)整一些配置參數(shù)的設(shè)定并且在你的環(huán)境中作基準(zhǔn)測(cè)試。理想情況下,你將在上到生產(chǎn)環(huán)境前完成這些,或者至少在將集群規(guī)模擴(kuò)充到比較大之前完成。
這份白皮書(shū)涉及到如果確定你的服務(wù)目標(biāo),配置你的Kafka部署來(lái)優(yōu)化它們,通過(guò)監(jiān)控來(lái)確保達(dá)到了你的目標(biāo)。

確定針對(duì)哪些服務(wù)目標(biāo)作優(yōu)化
第一步是先確定你希望針對(duì)哪些服務(wù)目標(biāo)作優(yōu)化。我們認(rèn)為經(jīng)常需要在四個(gè)目標(biāo)間作相互權(quán)衡:吞吐量,延遲,持久化和可用性。為了發(fā)現(xiàn)你需要優(yōu)化的目標(biāo),回想你的集群想要給哪些用戶(hù)場(chǎng)景打供服務(wù)。思考一下應(yīng)用和業(yè)務(wù)需求--針對(duì)這些用戶(hù)場(chǎng)景作到絕對(duì)不能失敗將是最滿(mǎn)意的結(jié)果。思考一下Kafka作了一個(gè)流處理平臺(tái)是如何填充你的業(yè)務(wù)管道的。

有時(shí)確定需要優(yōu)化的服務(wù)目標(biāo)是一個(gè)很難回答的問(wèn)題,但是你必須強(qiáng)制你的團(tuán)隊(duì)討論并確定你們最基本的業(yè)務(wù)使用場(chǎng)景和主要目標(biāo)是什么。對(duì)這個(gè)討論而言有兩個(gè)原因是很重要的。
首先一個(gè)原因是你不可能在同一時(shí)間將所有的目標(biāo)都最大化。它需要在吞吐量,延遲,持久化和可用性間作權(quán)衡,我們將在這份白皮書(shū)中詳細(xì)闡述這些服務(wù)目標(biāo)。你可能熟悉常見(jiàn)的在吞吐量和延遲間的性能權(quán)衡,也可能熟悉在持久性和可用性之間的權(quán)衡。從整體上來(lái)考慮,你會(huì)發(fā)現(xiàn)你不能孤立地來(lái)考慮它們,這份白皮書(shū)就是將他們放在一起來(lái)考慮。這不是說(shuō)我們對(duì)目標(biāo)中的一個(gè)作優(yōu)化而完全丟掉其他的。它僅僅意味著這些服務(wù)目標(biāo)都是有內(nèi)在聯(lián)系的,但你不可能在同一時(shí)間內(nèi)對(duì)所有的都作出優(yōu)化。
確定對(duì)哪些服務(wù)目標(biāo)作優(yōu)化的第二個(gè)重要原因是你能夠并且也可以通過(guò)調(diào)整Kafka配置參數(shù)到達(dá)成它。你需要明白你的用戶(hù)期望從系統(tǒng)中得到什么來(lái)確保你優(yōu)化Kafka來(lái)完成他們需要的。
你希望針對(duì)高吞吐量,即數(shù)據(jù)生產(chǎn)或消費(fèi)的速度,來(lái)作出優(yōu)化嗎?有些使用場(chǎng)景每秒鐘可以寫(xiě)入上百萬(wàn)條消息?;贙afka本身的設(shè)計(jì),寫(xiě)入大量的數(shù)據(jù)對(duì)它來(lái)說(shuō)不是難事。它比寫(xiě)入大量數(shù)據(jù)到傳統(tǒng)數(shù)據(jù)庫(kù)或key-value存儲(chǔ)要愉,并且它可以使用先進(jìn)的硬件來(lái)完成這些操作。
你希望針對(duì)低延遲,即消息在端到端到達(dá)上的時(shí)間間隔,來(lái)作出優(yōu)化嗎? 低延遲的一個(gè)使用場(chǎng)景是聊天應(yīng)用,它總是希望消息接收者越快收到消息越好。另外一些例子包括交互式web應(yīng)有,比如用戶(hù)看到他朋友們的更新,或者是物聯(lián)網(wǎng)中的實(shí)時(shí)流處理。
你希望對(duì)可靠的持久性,即保證消息被提交后將不會(huì)丟失,來(lái)作出優(yōu)化嗎? 可靠持久性的一個(gè)使用場(chǎng)景是使用kafka作為事件存儲(chǔ)的事件驅(qū)動(dòng)的微服務(wù)管道。另一個(gè)例子是為了交付關(guān)鍵業(yè)務(wù)內(nèi)容而整合數(shù)據(jù)流來(lái)源和一些永久存儲(chǔ)(比如 AWS的S3)。
如果你希望優(yōu)化的服務(wù)目標(biāo)需要覆蓋Kafka集群中的所有topic,那么你可以在所有brokers上設(shè)置相應(yīng)的broker級(jí)別的配置參數(shù)來(lái)將期應(yīng)用到全部的topic上。換句話(huà)說(shuō),如果你想針對(duì)也不同的topic作不同的優(yōu)化,你同樣可以重寫(xiě)對(duì)應(yīng)的topic的配置參數(shù)。Topic沒(méi)有明確地被重寫(xiě)了配置的,將應(yīng)用這個(gè)broker的配置。
Kafka有上百個(gè)不同的配置參數(shù),這份白皮書(shū)只會(huì)針對(duì)我們的討論中用到的一部分配置。這些參數(shù)的名字,描述和默認(rèn)值在Confluent Platform version 3.2中已經(jīng)更新到最新。關(guān)于這些配置參數(shù),topic復(fù)寫(xiě)和其他一些參數(shù)的更多信息可以在http://docs.confluent.io找到。
在我們針對(duì)不同的服務(wù)目標(biāo)作優(yōu)化前有一點(diǎn)需要注意:我們?cè)谶@份白皮書(shū)中討論的一些配置參數(shù)的值取決于另一些因素,比如消息的平均大小,partition個(gè)數(shù)等。它們可能因環(huán)境不同而有很大不同。對(duì)于一個(gè)配置參數(shù),我們提供了配置值的一個(gè)合理的范圍,回想一下,基準(zhǔn)測(cè)試總是能夠很多地驗(yàn)證我們針對(duì)特定部署而作的設(shè)置。
優(yōu)化吞吐量

為了優(yōu)化吞吐量,生產(chǎn)者,消費(fèi)者和brokers都需要在給定的時(shí)間內(nèi)移動(dòng)盡可能多的數(shù)據(jù)。對(duì)于高吞量,你需要嘗試將數(shù)據(jù)移動(dòng)的速度最大化。這個(gè)數(shù)據(jù)移動(dòng)的速度越快越來(lái)。
Topic的partition是Kafka系統(tǒng)中最小的并發(fā)單元。生產(chǎn)者可以并行地將消息發(fā)送到不同的partition,并行地寫(xiě)入到不同的brokers上,消費(fèi)者也可以并行地從不同的partition上消費(fèi)數(shù)據(jù)。通常來(lái)講,數(shù)量多的topic paritions會(huì)帶來(lái)高的吞吐量。然后創(chuàng)建據(jù)有大量partition的topic看起來(lái)很有誘惑力,但依然需要權(quán)衡這種作法。我們需要基于生產(chǎn)者和消費(fèi)者的吞吐量來(lái)小心地確定partition的數(shù)量。這里有篇文章How to choose the number of topics/partitions in a Kafka cluster? 專(zhuān)門(mén)來(lái)講解了為何選擇partition數(shù)量并需要在你的環(huán)境中確認(rèn)基準(zhǔn)性能測(cè)試。
接下來(lái)我們討論一下kafka生產(chǎn)者的批量發(fā)送策略。生產(chǎn)者能夠?qū)⑾⑴堪l(fā)送到同一個(gè)partition, 也就是說(shuō)將多個(gè)消息收集到一個(gè)發(fā)送請(qǐng)求中然后一起發(fā)送出去。我們優(yōu)化吞吐量很重要的一步就是調(diào)整這個(gè)生產(chǎn)者批量發(fā)送的參數(shù),包括增加批量發(fā)送的大小和等待添充滿(mǎn)批量發(fā)送隊(duì)列所耗費(fèi)的時(shí)間。大的批量發(fā)送大小使得只有很少的請(qǐng)求發(fā)送到brokers,這降低了在生產(chǎn)者和brokers上處理每條請(qǐng)求的的CPU負(fù)載。在Java客戶(hù)端中,可以配置batch.size參數(shù)來(lái)每次批量發(fā)送的最大字節(jié)數(shù)。為了能有更多的時(shí)候來(lái)添充批量發(fā)送的隊(duì)列,你可以配置參數(shù)linger.ms來(lái)讓生產(chǎn)者在發(fā)送前等待更長(zhǎng)的時(shí)間。這需要權(quán)衡一下是否能容忍高的延遲,因?yàn)樵谶@種情況下消息不是在準(zhǔn)備好之后就立即發(fā)送。
你同樣可以通過(guò)配置compression.type參數(shù)來(lái)開(kāi)啟壓縮功能。壓縮意味著按照壓縮算法的使用,大數(shù)據(jù)量可以變成小數(shù)據(jù)量被發(fā)送。Kafka支持lz4, snappy和gzip壓縮算法。壓縮算法可以應(yīng)用到每個(gè)完整的數(shù)據(jù)batche上,這樣可以更好地提高壓縮比。
當(dāng)生產(chǎn)者發(fā)送消息到Kafka集群集地,這條消息是被發(fā)送到目標(biāo)partition的主所在的broker上。在發(fā)送下一條消息前,生產(chǎn)者總是要一直等待leader broker的響應(yīng)來(lái)知曉這條消息是否已經(jīng)提交。服務(wù)端是自動(dòng)檢測(cè)以確保消費(fèi)者不能讀取未提交的消息。leader brokers何時(shí)發(fā)送響應(yīng)可難會(huì)影響到生產(chǎn)者的吞吐量:生產(chǎn)者越早地收到響應(yīng),就能越早地發(fā)送下一條消息,這通常也會(huì)產(chǎn)生高的吞吐量。生產(chǎn)者可以設(shè)置配置參數(shù)acks來(lái)指定leader broker在發(fā)送給生產(chǎn)者ack響應(yīng)前需要收到多少個(gè)followers的ack。如果設(shè)置acls=1,則leader broker在將消息寫(xiě)到本地的log中后,不用接收到任何一個(gè)followers的ack就能夠發(fā)送響應(yīng)到生產(chǎn)者。這需要權(quán)衡你是否能容忍低的持久性,因?yàn)檫@時(shí)生產(chǎn)者不能夠等待到消息被復(fù)制到其他的brokers。
如果發(fā)送失敗,生產(chǎn)者可以重試,直到重試次數(shù)達(dá)到配置參數(shù)retries指定的上限。如果應(yīng)用程序能夠處理一些數(shù)據(jù)的丟失,你可以設(shè)置retries=0。在這種情況下,如果消息發(fā)送失敗,生產(chǎn)者將不會(huì)嘗試重新發(fā)送這個(gè)相同的消息并且消耗的帶寬將分配給其他的消息。
對(duì)于Java客戶(hù)端,Kafka生產(chǎn)者可能自動(dòng)分配內(nèi)存來(lái)存儲(chǔ)未發(fā)送的消息。如果內(nèi)存的使用達(dá)到上限,生產(chǎn)者會(huì)阻塞額外的消息發(fā)送直到內(nèi)存釋放或者直到max.block.ms時(shí)間過(guò)去。你可以通過(guò)配置參數(shù)buffer.memory來(lái)調(diào)整分配置內(nèi)存的大小。如果你沒(méi)有大量的partitions,你可能不需要調(diào)整這個(gè)大小。然而,如果你有大量的partition,你需要綜合buffer size, linger time和partition數(shù)量來(lái)調(diào)整這個(gè)參數(shù)。通過(guò)調(diào)整這個(gè)參數(shù),使得生產(chǎn)者在阻塞額外的消息發(fā)送前將經(jīng)歷很長(zhǎng)的時(shí)間,這樣也就提高了吞吐量。
同理,你也能夠通過(guò)調(diào)整消費(fèi)者每次從leader broker拉取的數(shù)據(jù)量的大小來(lái)提高吞吐量,它可以通過(guò)調(diào)整fetch.min.bytes這個(gè)參數(shù)來(lái)完成。增加這個(gè)參數(shù)會(huì)減少發(fā)送到broker的fetch請(qǐng)求的數(shù)據(jù),降低broker處理每條fetch請(qǐng)求的CPU負(fù)載,這將改善吞吐量。與在生產(chǎn)者上增加batch大小相似,當(dāng)增加這個(gè)參數(shù)時(shí),需要考慮高延遲的權(quán)衡。這是因?yàn)閎roker不會(huì)立即發(fā)送新的消息到消費(fèi)者,直到有足夠的消息來(lái)填滿(mǎn)fetch.min.bytes,或者直到等待時(shí)間過(guò)期。
假設(shè)應(yīng)用程序允許,可以使用由多個(gè)消費(fèi)者組成的消費(fèi)組來(lái)并行消費(fèi)。并行消費(fèi)可以提高吞吐量,因?yàn)槎鄠€(gè)消費(fèi)者可以自動(dòng)作負(fù)載均衡,同時(shí)消費(fèi)不同的partition。
最后,你可以調(diào)整JVM參數(shù)來(lái)最小化GC的停止時(shí)間。GC對(duì)于刪除不再使用的對(duì)象和回收內(nèi)存是很有必要的。但是,長(zhǎng)的GC停止間對(duì)提高吞吐量有負(fù)面影響,在最壞情況下還會(huì)導(dǎo)致broker的軟故障,比如zookeeper會(huì)話(huà)超時(shí)。
優(yōu)化吞吐量的一些配置
生產(chǎn)者:
- batch.size: increase to 100000 - 200000 (default 16384)
- linger.ms: increase to 10 - 100 (default 0)
- compression.type=lz4 (default none)
- acks=1 (default 1)
- retries=0 (default 0)
- buffer.memory: increase if there are a lot of partitions (default 33554432)
消費(fèi)者:
- fetch.min.bytes: increase to ~100000 (default 1)
優(yōu)化延遲
在上面吞吐量一節(jié)中討論的有些Kafka配置參數(shù)有些默認(rèn)值也可以?xún)?yōu)化延遲。雖然這些參數(shù)通常不需要調(diào)整,但我們還是需要弄清楚它們是如何工作的。
Partition是Kafka的最小并行彈單元,增加Partition數(shù)量可以增加吞吐量。但是,需要權(quán)衡在增加Partition數(shù)量的同時(shí)會(huì)增大延遲。默認(rèn)情況下一個(gè)Broker使用單一的一個(gè)線(xiàn)程從其他broker來(lái)復(fù)制數(shù)據(jù),因此在兩兩broker之間復(fù)制大量Partition將花費(fèi)更長(zhǎng)的時(shí)間并且這樣的結(jié)果是消息被最終認(rèn)定為提交狀態(tài)也需要花費(fèi)更長(zhǎng)的時(shí)間。沒(méi)有提交的消息不能被消費(fèi),因此這又增加了端到端的延遲。
為了解決這個(gè)特定的拉取消息的延遲,一個(gè)選擇是嘗試限制任意一臺(tái)broker上的partition數(shù)量。你可以通過(guò)限制整個(gè)集群的partition數(shù)量或者增加集數(shù)的broker數(shù)量來(lái)達(dá)到這個(gè)目的。對(duì)于需要低延遲但又需要大量partiton的實(shí)時(shí)處理類(lèi)應(yīng)用程序,你可以調(diào)整同步線(xiàn)程個(gè)數(shù)。這可以通過(guò)調(diào)整num.replicat.fetchers參數(shù)來(lái)完成,它會(huì)增加在follower broker上的并行IO資源。你可以在它是默認(rèn)值1的時(shí)候作基準(zhǔn)測(cè)試,如果follwers不能跟上leader的數(shù)據(jù)寫(xiě)主,你可以增加大這個(gè)配置。
你可以考慮是否需要開(kāi)啟壓縮功能。開(kāi)啟壓縮通常需要更多的CPU周期,但可以減少網(wǎng)絡(luò)帶寬的占用。反之,會(huì)增加網(wǎng)絡(luò)帶寬占用。好的壓縮編碼方式也可能潛在地降低延遲。
你可以調(diào)整在認(rèn)為發(fā)送請(qǐng)求完成前生產(chǎn)者需要leader broker收到的ACK數(shù)量。leader broker越快的響應(yīng),生產(chǎn)者也就能更快地繼續(xù)發(fā)送下一條消息,這通??梢越档脱舆t。可以通過(guò)生產(chǎn)者的ack配置參數(shù)來(lái)設(shè)置所需要的ACK的數(shù)量。默認(rèn)acks=1, 意味著只要leader broker在消息寫(xiě)入了本地存儲(chǔ),在所有復(fù)本收到這個(gè)消息之前,就可以響應(yīng)生產(chǎn)者了。依賴(lài)于你的應(yīng)用的需求,你可以設(shè)置acks=0, 這樣會(huì)讓生產(chǎn)者不等待broker的任何響應(yīng)就可以發(fā)送下一條消息,但這樣潛在地可能會(huì)丟失數(shù)據(jù)。
與生產(chǎn)者批量發(fā)送的目的類(lèi)似,你也可以調(diào)整消費(fèi)者每次從leader broker拉取的數(shù)據(jù)量的大小來(lái)優(yōu)化這個(gè)延遲。默認(rèn)情況下,消息者配置參數(shù)fetch.min.bytes=1,它意味著只要一字節(jié)的數(shù)據(jù)是有效的,fetch 請(qǐng)求就會(huì)返回,或者是在有效數(shù)據(jù)到達(dá)前fetch reqeust超時(shí)了。
有些場(chǎng)景,你需要執(zhí)行大規(guī)模,低延遲地table 查詢(xún)操作,你可以使用Kafka Stream API來(lái)作本地的流式處理。一個(gè)流行的方案是使用Kafka Connect將遠(yuǎn)程數(shù)據(jù)庫(kù)存的數(shù)據(jù)拉取到本地的kafka系統(tǒng)中,然后你就可以利用Streams API來(lái)執(zhí)行特別快速和有效地一些tables的本地的join操作和流處理,而不再需要應(yīng)用程序針對(duì)每條記錄都參通過(guò)網(wǎng)絡(luò)發(fā)起一次針對(duì)遠(yuǎn)程數(shù)據(jù)庫(kù)的查詢(xún)操作。你可以跟蹤在本地狀態(tài)存儲(chǔ)中的每個(gè)table的最新?tīng)顟B(tài),當(dāng)你作類(lèi)似于streaming joins這些的操作時(shí),將極大地降低處理的延遲。
優(yōu)化延遲的一些配置
生產(chǎn)者:
- linger.ms=0 (default 0)
- compression.type=none (default none)
- acks=1 (default 1)
Broker:
- num.replica.fetchers: 如果follwers不能跟上leader的數(shù)據(jù)寫(xiě)主,你可以增加大這個(gè)配置(default 1)
消息者:
- fetch.min.bytes=1 (default 1)
優(yōu)化持久化存儲(chǔ)
持久化是降低消息丟失的全部機(jī)會(huì)之所在。開(kāi)啟持久化的最重要的特性是復(fù)制,它意味著消息將被拷貝到多個(gè)broker上。如果一個(gè)broker故障,數(shù)據(jù)還可以在至少一個(gè)broker上是有效的。對(duì)有高持久化需求的topic來(lái)說(shuō),需要將配置參數(shù)replication.factor設(shè)置為3,這將確保集群在壞掉兩臺(tái)broker時(shí)也不丟失數(shù)據(jù)。如果Kafka集群開(kāi)啟了topic自動(dòng)創(chuàng)建功能,那么你需要考慮改變配置參數(shù)default.replication.factor到3,使自動(dòng)創(chuàng)建的topic也有復(fù)本,或者禁止topic自動(dòng)創(chuàng)建,由你自己來(lái)控制每個(gè)topic的復(fù)本數(shù)和partition設(shè)置。
復(fù)本對(duì)于被客戶(hù)端使用的所有topic的持久化來(lái)說(shuō)是很重要的,對(duì)于像__consumer_offsets這種Kafka內(nèi)部topic來(lái)說(shuō)也是很重要的。這個(gè)topic跟蹤已經(jīng)被消費(fèi)的消息的offsets。除非你運(yùn)行的kafka版強(qiáng)制為每個(gè)topic設(shè)置復(fù)本,那你應(yīng)該小心處理topic的自動(dòng)創(chuàng)建。當(dāng)啟動(dòng)一個(gè)新集群時(shí),在開(kāi)始從topics消費(fèi)數(shù)據(jù)前,應(yīng)當(dāng)至少等待三個(gè)brokers在線(xiàn)。這可以避免自動(dòng)創(chuàng)建的topic__consumer_offsets的復(fù)本數(shù)比配置參數(shù)offsets.topic.replication.factor定義的復(fù)本數(shù)還少。
生產(chǎn)者可以通過(guò)acks配置參數(shù)來(lái)控制寫(xiě)到Kafka的消息的持久性。這個(gè)參數(shù)在吞吐量和延遲優(yōu)化中討論過(guò),但是它主要是用在持久化方面。為了優(yōu)化高的持久性,我們建議設(shè)置它為acks=all,這意味著leader將等待收到in-sync列表中所有復(fù)本的ack回應(yīng)后才認(rèn)為這個(gè)消息被提交了。它強(qiáng)有力地保證了只要in-sync列表中的復(fù)本只要有一個(gè)還活著,數(shù)據(jù)就不會(huì)丟失。這需要權(quán)衡是否能容忍高的延遲,因?yàn)樵陧憫?yīng)生產(chǎn)者之前,leader需要等待所有in-sync表表中復(fù)本的回應(yīng)。
生產(chǎn)者同樣也可以通過(guò)在發(fā)送失敗時(shí)嘗試重新發(fā)送的方式來(lái)增強(qiáng)持久性。這可以自動(dòng)完成也可以手動(dòng)完成。生產(chǎn)者自動(dòng)重試的次數(shù)上限是通過(guò)retries參數(shù)指定的。生產(chǎn)者手動(dòng)重試是依賴(lài)于返回給客戶(hù)端的異常來(lái)完成的。如果你希望生產(chǎn)者自動(dòng)處理重試操作,你可以設(shè)置retries為大于0的數(shù)。如果你配置了retries,有兩件事情你需要考慮:
- 如果集群有一個(gè)瞬時(shí)的抖動(dòng),可以造成消息重復(fù)。為了處理這種情況,你需要確保你的消費(fèi)者能夠處理重復(fù)消息。當(dāng)前,我們正致力于開(kāi)發(fā)消費(fèi)且僅消費(fèi)一次語(yǔ)義的支持,它將幫助解決這個(gè)問(wèn)題。
- 多次發(fā)送嘗試可能是在相同時(shí)間內(nèi)并且可能發(fā)生在另一個(gè)成功發(fā)送之后,這可能導(dǎo)致發(fā)送亂序。為了在允許重發(fā)失敗的消息的前提下也保持消息順序,你需要設(shè)置配置參數(shù)
max.in.flight.requests.pre.connection為1來(lái)確保同一時(shí)間僅有一個(gè)請(qǐng)求發(fā)送到broker。
Kafka集群通過(guò)在多個(gè)brokers間復(fù)制數(shù)據(jù)來(lái)提供持久性。每個(gè)Paritition都有一個(gè)需要復(fù)制其數(shù)據(jù)的復(fù)本的列表,能夠跟上leader數(shù)據(jù)寫(xiě)入進(jìn)程的復(fù)本列表被叫作in sync replicas(ISR)。對(duì)于每一個(gè)partition, leader broker將自動(dòng)復(fù)制消息到ISR列表中的其他broker,實(shí)際上是其他復(fù)本主動(dòng)去leader broker上拉取。當(dāng)生產(chǎn)者設(shè)置了acks=all時(shí),然后這個(gè)配置參數(shù)min.insync.replicas可以針對(duì)ISR列表里復(fù)本個(gè)數(shù)指定一個(gè)最小的閾值。如果這個(gè)最小的復(fù)制數(shù)沒(méi)有達(dá)到,生產(chǎn)者將產(chǎn)生一個(gè)異常。min.insyc.replicas和acks一起使用能使持久化得到強(qiáng)制保證。一個(gè)典型的場(chǎng)景是創(chuàng)建一個(gè)topic,它的replication.factor=3,broker的min.insync.replicas=2并且acks=all,這可以確保在大多數(shù)復(fù)本都沒(méi)有接收到數(shù)據(jù)時(shí),生產(chǎn)者將產(chǎn)生一個(gè)異常。
Kafka將針對(duì)broker故障提供的保障擴(kuò)展到能覆蓋機(jī)架故障。這個(gè)機(jī)架感知特性能夠?qū)⑾嗤琾artition的復(fù)本分散到不同的機(jī)架。這樣就限制了全部broker都在一個(gè)機(jī)架上時(shí)機(jī)架發(fā)生故障導(dǎo)致的數(shù)據(jù)丟失。這個(gè)特性同樣可以通過(guò)將broker分散到不同的有效區(qū)域的方式應(yīng)用到像度亞馬遜EC2這樣的云解決方案上。你可以通過(guò)設(shè)置配置參數(shù)broker.rack來(lái)指定broker屬于哪個(gè)機(jī)架,這樣Kafka將自動(dòng)確保復(fù)本盡可能多地分散到現(xiàn)不的機(jī)架。
如果有broker故障,Kafka集群能夠自動(dòng)偵測(cè)這個(gè)故障并且選舉出新的partition主。新的partition主是從正在運(yùn)行中的復(fù)本中選出。在ISR列表中的brokers都有最新的消息,并且它們中的一個(gè)將變?yōu)樾碌闹?,它能夠從之前的主broker中斷的位置繼續(xù)拷貝消息到其他仍需要追趕的復(fù)本上。配置參數(shù)unclean.leader.election.enable標(biāo)識(shí)在ISR列表中之外的沒(méi)有追趕上leader的brokers是否能變?yōu)樾碌闹鳌?duì)于高持久性而言,需要通過(guò)設(shè)置unclean.leader.election.enable=false來(lái)確何新的leader僅僅從ISR列表中選舉出來(lái)。這可以避免消息因已提交但沒(méi)復(fù)制而丟失的風(fēng)險(xiǎn)。這需要權(quán)衡是否能容忍更多的不工作時(shí)間直到足夠多的復(fù)本重新回到同步狀態(tài),即回到ISR列表中。
因此我們強(qiáng)烈建議你為了持久性而使用Kafka復(fù)本并且允許OS控制數(shù)據(jù)從page cache同步到磁盤(pán),你通常不需要更改flush的設(shè)置。但是,對(duì)于對(duì)吞吐率要求極低的關(guān)鍵topics來(lái)說(shuō),在OS將數(shù)據(jù)flush到磁盤(pán)前可能有比較久的時(shí)間周期。對(duì)于這樣的topics,你可以考慮調(diào)整log.flush.interval.ms或log.flush.interval.messages到比較小。例如,如果你需要將每條消息都實(shí)時(shí)持久化到磁盤(pán),你可以設(shè)置log.flush.interval.messages=1。
你同樣需要考慮如果消費(fèi)者遇到不可預(yù)知的故障時(shí)如何確保再次處理消息時(shí),消息不丟失。Consumer offset用來(lái)跟蹤已經(jīng)消費(fèi)了的消息,因此消費(fèi)者何時(shí)提交,如何提交message offset對(duì)于持久性來(lái)說(shuō)就很關(guān)鍵。你肯定想避免這樣的情況發(fā)生:消費(fèi)者提交了消息的offset,然后開(kāi)始處理這個(gè)消息,并且發(fā)生了不可預(yù)支的故障。這將導(dǎo)致后續(xù)從這個(gè)partition讀取消息的消費(fèi)都不能再重新處理這個(gè)消息,因?yàn)樗膐ffset已經(jīng)被提交過(guò)。你可以通過(guò)配置參數(shù)auto.commit.enable來(lái)配置offset的提交方式。默認(rèn)情況下,offset被配置成在消費(fèi)者周期性地調(diào)用poll()期間自動(dòng)提交。但是如何消息者是事務(wù)鏈中的一部分,那么你需要消息到達(dá)的強(qiáng)保證。對(duì)于持久性來(lái)說(shuō),通過(guò)設(shè)置auto.commit.enable=false來(lái)禁用掉自動(dòng)提交并且顯式地調(diào)用commitSync或者commitAsync等提交方法。
優(yōu)化持久化的一些配置
生產(chǎn)者:
- replication.factor: 3, configure per topic
- acks=all (default 1)
- retries: 1 or more (default 0)
- max.in.flight.requests.per.connection=1 (default 5),避免消息亂序
Broker:
- default.replication.factor=3 (default 1)
- auto.create.topics.enable=false (default true)
- min.insync.replicas=2 (default 1); topic override available
- unclean.leader.election.enable=false (default true); topic override available
- broker.rack: rack of the broker (default null)
- log.flush.interval.messages, log.flush.interval.ms: 對(duì)于吞吐量很低的topics,將這兩個(gè)參數(shù)設(shè)置低一些是需要的。
消費(fèi)者:
- auto.commit.enable=false (default true)
優(yōu)化可用性
為了優(yōu)化高可用性,你需要調(diào)整Kafka,以便其能夠快速?gòu)墓收现谢謴?fù)。
很高的partiton數(shù)量可以增加并發(fā),增加吞吐量,但它也可能會(huì)增加從broker故障事件中恢復(fù)所需的時(shí)間。所有的生產(chǎn)者和消費(fèi)者都將暫停,直到主選舉完成,并且每一個(gè)partition都需要進(jìn)行l(wèi)eader選舉。因此在選擇partition數(shù)量時(shí)需要考慮故障恢復(fù)時(shí)間。
當(dāng)一個(gè)生產(chǎn)者設(shè)置了acks=all,配置參數(shù)min.insync.replicas指定為認(rèn)為寫(xiě)消息成功時(shí)需要回應(yīng)ack的最小復(fù)本數(shù)。如果這個(gè)最小復(fù)本數(shù)都不能達(dá)成,生產(chǎn)者將產(chǎn)生一個(gè)異常。在ISR列表收縮的情況,生產(chǎn)者發(fā)送消息更可能失敗,這將降低partition的高用性。換言之,設(shè)置它為一個(gè)比較低的值,比如min.insync.replicas=1,則系統(tǒng)將能容忍更多的復(fù)本故障。只要滿(mǎn)足最小的復(fù)本個(gè)數(shù),生產(chǎn)者發(fā)送消息將繼續(xù)成功,這增加了partition的可用性。
Broker故障將導(dǎo)致partition選舉,它會(huì)自動(dòng)進(jìn)行。你可以控制哪些broker有能力選舉成為主。為了優(yōu)化持久性,新的主僅從ISR列表中選舉出來(lái),這樣作可以避免因消息提交了但沒(méi)有復(fù)制而導(dǎo)致丟消息的風(fēng)險(xiǎn)。相比之比,為了優(yōu)化多可用性,新主可以允許從ISR列表中移除的brokers中選舉出來(lái),這可以通過(guò)設(shè)置unclean.leader.election.enable=true來(lái)實(shí)現(xiàn)。它可以讓leader的選舉很快的發(fā)生,增強(qiáng)了整體的可用性。
當(dāng)Broker啟動(dòng)時(shí),為了和其他broker同步數(shù)據(jù),它會(huì)掃描日志數(shù)據(jù)文件。這個(gè)過(guò)程被稱(chēng)作日志恢復(fù)。針對(duì)每個(gè)數(shù)據(jù)目錄,在啟動(dòng)中用作恢復(fù)和在關(guān)閉時(shí)用作flush的線(xiàn)程數(shù)由配置參數(shù)mun.recovery.threads.per.data.dir來(lái)控制。有數(shù)千個(gè)log segments的brokers也會(huì)有大量的索引文件,它們會(huì)導(dǎo)致在broker啟動(dòng)時(shí)log的加載很慢。如果你使用RAID,那么你可以將num.recovery.threads.per.data.dir增加到磁盤(pán)個(gè)數(shù)大小,可能會(huì)減少日志加載時(shí)間。
最后,在消費(fèi)者一側(cè),消費(fèi)者作為消息組的一部分來(lái)共享處理所有的消費(fèi)負(fù)載。如果一個(gè)消費(fèi)者發(fā)生故障,Kafka能夠偵測(cè)到錯(cuò)誤并且對(duì)這個(gè)消費(fèi)組中余下的消費(fèi)者作負(fù)載均衡。通過(guò)配置參數(shù)session.timeout.ms來(lái)設(shè)置用來(lái)確定消費(fèi)者故障所需的超時(shí)時(shí)間。消費(fèi)者故障分為硬故障(比如 SIGKILL)和軟故障(比如 session超時(shí))。軟故障的發(fā)生通常有兩種情況:通過(guò)poll()返回的批量消息處理的時(shí)間過(guò)長(zhǎng),或者JVM GC停頓時(shí)間過(guò)長(zhǎng)。如果session超時(shí)時(shí)間設(shè)置得較短,可以很快地偵測(cè)到消費(fèi)者故障,這就減小從故障中恢復(fù)的時(shí)間。
優(yōu)化可用性的一些配置
Broker:
- unclean.leader.election.enable=true (default true); topic override available
- min.insync.replicas=1 (default 1); topic override available
- num.recovery.threads.per.data.dir: number of directories in log.dirs (default 1)
Consumer:
- session.timeout.ms: 越低越快 (default 10000)
基準(zhǔn)測(cè)試,監(jiān)控和調(diào)優(yōu)
基準(zhǔn)測(cè)試很重要,因?yàn)閷?duì)于上面我們討論的配置參數(shù)沒(méi)有一種配置可以適用于所有的場(chǎng)景。合適的配置總是取決于使用場(chǎng)景,每臺(tái)broker的硬件配置,你開(kāi)啟的一些特性和數(shù)據(jù)的配置等。如果你想要調(diào)整Kafka的默認(rèn)配置,我們通常建議你作基準(zhǔn)測(cè)試。不管你的服務(wù)目標(biāo)是什么 ,你都需要明白這個(gè)集群的性能配置是什么--當(dāng)你想優(yōu)化吞吐量或延遲時(shí),它特別重要。你的基準(zhǔn)測(cè)試同時(shí)也可以使用計(jì)算并確定合適的partition數(shù)量,集群規(guī)模和生產(chǎn)者,消費(fèi)者進(jìn)程數(shù)量。
針對(duì)一個(gè)使用默認(rèn)配置的測(cè)試環(huán)境來(lái)開(kāi)始基準(zhǔn)測(cè)試,并且熟悉這些默認(rèn)值是什么。確定針對(duì)生產(chǎn)者的輸入性能基線(xiàn)。首先需要從生產(chǎn)者上移除任何的上游數(shù)據(jù)依賴(lài)。不要從上游數(shù)據(jù)源接收數(shù)據(jù),更改你的生產(chǎn)者以很高的輸出頻率來(lái)產(chǎn)生模擬數(shù)據(jù),并且這些模擬數(shù)據(jù)的產(chǎn)生不要成為瓶頸。如果你測(cè)試時(shí)使用了壓縮,你需要留意這些模擬數(shù)據(jù)的產(chǎn)生方式。有時(shí)產(chǎn)生的模擬數(shù)據(jù)都是被無(wú)意義的0所填充的。這可能會(huì)導(dǎo)致壓縮性能要好于生產(chǎn)環(huán)境的性能。確保填充的數(shù)據(jù)能反映真實(shí)的生產(chǎn)環(huán)境的數(shù)據(jù)。
在單個(gè)server上運(yùn)行單個(gè)生產(chǎn)者。Kafka集群有足夠大的容量,因此它沒(méi)有瓶頸。可以使用有效的JMX metrics來(lái)統(tǒng)計(jì)Kafka生產(chǎn)者的最終吞吐量。重復(fù)這個(gè)生產(chǎn)者的基準(zhǔn)測(cè)試,在每次迭代中增加生產(chǎn)者進(jìn)程數(shù)量,來(lái)確定一臺(tái)server上達(dá)到最大吞吐量時(shí)的生產(chǎn)者進(jìn)程數(shù)量。你可以用類(lèi)似的方法來(lái)確定針對(duì)消費(fèi)者的輸出流量的性能。
接下來(lái)我們可以用能反應(yīng)我們服務(wù)目標(biāo)的配置參數(shù)的不同組合來(lái)運(yùn)行基準(zhǔn)測(cè)試。將配置參數(shù)集中在這個(gè)白皮書(shū)中討論的,并且抵擋住發(fā)掘和更改那些你沒(méi)有完全明白對(duì)整個(gè)系統(tǒng)有何影響的參數(shù)的默認(rèn)值。集中在我們已經(jīng)討論過(guò)的參數(shù),迭代地調(diào)整他們,運(yùn)行測(cè)試,觀察結(jié)果,再調(diào)整,直到這設(shè)置很好滿(mǎn)足了你的吞吐量和延遲。
在你遷移到生產(chǎn)環(huán)境前,確保針對(duì)brokers, 生產(chǎn)者,消費(fèi)者,topics和其他你使用的kafka組件都有一個(gè)強(qiáng)有力的監(jiān)控。為了將內(nèi)部的JMX metrics暴露給JMX工具,你在開(kāi)始運(yùn)行broker進(jìn)程時(shí),加上JMX_PORT這個(gè)參數(shù)。
在運(yùn)維一個(gè)Kafka集群的監(jiān)控系統(tǒng)時(shí),你需要基于你的服務(wù)目標(biāo)來(lái)考慮你的報(bào)警設(shè)置。這些報(bào)警可能是因topic而異的。有的topic可能有低延遲的需求,有的topic可能有高吞吐量的需求。