一、Kafka簡(jiǎn)介
Kafka (科技術(shù)語(yǔ))。Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),它可以處理消費(fèi)者規(guī)模的網(wǎng)站中的所有動(dòng)作流數(shù)據(jù)。
1.1 背景歷史
當(dāng)今社會(huì)各種應(yīng)用系統(tǒng)諸如商業(yè)、社交、搜索、瀏覽等像信息工廠一樣不斷的生產(chǎn)出各種信息,在大數(shù)據(jù)時(shí)代,我們面臨如下幾個(gè)挑戰(zhàn):
- 如何收集這些巨大的信息
- 如何分析它
- 如何及時(shí)做到如上兩點(diǎn)
以上幾個(gè)挑戰(zhàn)形成了一個(gè)業(yè)務(wù)需求模型,即生產(chǎn)者生產(chǎn)(produce)各種信息,消費(fèi)者消費(fèi)(consume)(處理分析)這些信息,而在生產(chǎn)者與消費(fèi)者之間,需要一個(gè)溝通兩者的橋梁-消息系統(tǒng)。從一個(gè)微觀層面來(lái)說(shuō),這種需求也可理解為不同的系統(tǒng)之間如何傳遞消息。
1.2 Kafka誕生
- Kafka由 linked-in 開源
- kafka-即是解決上述這類問(wèn)題的一個(gè)框架,它實(shí)現(xiàn)了生產(chǎn)者和消費(fèi)者之間的無(wú)縫連接。
- kafka-高產(chǎn)出的分布式消息系統(tǒng)(A high-throughput distributed messaging system)
1.3 Kafka現(xiàn)在
Apache kafka 是一個(gè)分布式的基于push-subscribe的消息系統(tǒng),它具備快速、可擴(kuò)展、可持久化的特點(diǎn)。它現(xiàn)在是Apache旗下的一個(gè)開源系統(tǒng),作為hadoop生態(tài)系統(tǒng)的一部分,被各種商業(yè)公司廣泛應(yīng)用。它的最大的特性就是可以實(shí)時(shí)的處理大量數(shù)據(jù)以滿足各種需求場(chǎng)景:比如基于hadoop的批處理系統(tǒng)、低延遲的實(shí)時(shí)系統(tǒng)、storm/spark流式處理引擎
二、Kafka技術(shù)概覽
2.1 Kafka的特性
- 高吞吐量、低延遲:kafka每秒可以處理幾十萬(wàn)條消息,它的延遲最低只有幾毫秒
- 可擴(kuò)展性:kafka集群支持熱擴(kuò)展
- 持久性、可靠性:消息被持久化到本地磁盤,并且支持?jǐn)?shù)據(jù)備份防止數(shù)據(jù)丟失
- 容錯(cuò)性:允許集群中節(jié)點(diǎn)失敗(若副本數(shù)量為n,則允許n-1個(gè)節(jié)點(diǎn)失?。?/li>
- 高并發(fā):支持?jǐn)?shù)千個(gè)客戶端同時(shí)讀寫
2.2 Kafka一些重要設(shè)計(jì)思想
- Consumergroup:各個(gè)consumer可以組成一個(gè)組,每個(gè)消息只能被組中的一個(gè)consumer消費(fèi),如果一個(gè)消息可以被多個(gè)consumer消費(fèi)的話,那么這些consumer必須在不同的組。
- 消息狀態(tài):在Kafka中,消息的狀態(tài)被保存在consumer中,broker不會(huì)關(guān)心哪個(gè)消息被消費(fèi)了被誰(shuí)消費(fèi)了,只記錄一個(gè)offset值(指向partition中下一個(gè)要被消費(fèi)的消息位置),這就意味著如果consumer處理不好的話,broker上的一個(gè)消息可能會(huì)被消費(fèi)多次。
- 消息持久化:Kafka中會(huì)把消息持久化到本地文件系統(tǒng)中,并且保持極高的效率。
- 消息有效期:Kafka會(huì)長(zhǎng)久保留其中的消息,以便consumer可以多次消費(fèi),當(dāng)然其中很多細(xì)節(jié)是可配置的。
- 批量發(fā)送:Kafka支持以消息集合為單位進(jìn)行批量發(fā)送,以提高push效率。
- push-and-pull : Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管從broker pull消息,兩者對(duì)消息的生產(chǎn)和消費(fèi)是異步的。
- Kafka集群中broker之間的關(guān)系:不是主從關(guān)系,各個(gè)broker在集群中地位一樣,我們可以隨意的增加或刪除任何一個(gè)broker節(jié)點(diǎn)。
- 負(fù)載均衡方面: Kafka提供了一個(gè) metadata API來(lái)管理broker之間的負(fù)載(對(duì)Kafka0.8.x而言,對(duì)于0.7.x主要靠zookeeper來(lái)實(shí)現(xiàn)負(fù)載均衡)。
- 同步異步:Producer采用異步push方式,極大提高Kafka系統(tǒng)的吞吐率(可以通過(guò)參數(shù)控制是采用同步還是異步方式)。
- 分區(qū)機(jī)制partition:Kafka的broker端支持消息分區(qū),Producer可以決定把消息發(fā)到哪個(gè)分區(qū),在一個(gè)分區(qū)中消息的順序就是Producer發(fā)送消息的順序,一個(gè)主題中可以有多個(gè)分區(qū),具體分區(qū)的數(shù)量是可配置的。分區(qū)的意義很重大,后面的內(nèi)容會(huì)逐漸體現(xiàn)。
- 離線數(shù)據(jù)裝載:Kafka由于對(duì)可拓展的數(shù)據(jù)持久化的支持,它也非常適合向Hadoop或者數(shù)據(jù)倉(cāng)庫(kù)中進(jìn)行數(shù)據(jù)裝載。
- 插件支持:現(xiàn)在不少活躍的社區(qū)已經(jīng)開發(fā)出不少插件來(lái)拓展Kafka的功能,如用來(lái)配合Storm、Hadoop、flume相關(guān)的插件
2.3 kafka 應(yīng)用場(chǎng)景
- 日志收集:一個(gè)公司可以用Kafka可以收集各種服務(wù)的log,通過(guò)kafka以統(tǒng)一接口服務(wù)的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
- 消息系統(tǒng):解耦和生產(chǎn)者和消費(fèi)者、緩存消息等。
- 用戶活動(dòng)跟蹤:Kafka經(jīng)常被用來(lái)記錄web用戶或者app用戶的各種活動(dòng),如瀏覽網(wǎng)頁(yè)、搜索、點(diǎn)擊等活動(dòng),這些活動(dòng)信息被各個(gè)服務(wù)器發(fā)布到kafka的topic中,然后訂閱者通過(guò)訂閱這些topic來(lái)做實(shí)時(shí)的監(jiān)控分析,或者裝載到hadoop、數(shù)據(jù)倉(cāng)庫(kù)中做離線分析和挖掘。
- 運(yùn)營(yíng)指標(biāo):Kafka也經(jīng)常用來(lái)記錄運(yùn)營(yíng)監(jiān)控?cái)?shù)據(jù)。包括收集各種分布式應(yīng)用的數(shù)據(jù),生產(chǎn)各種操作的集中反饋,比如報(bào)警和報(bào)告。
- 流式處理:比如spark streaming和storm
- 事件源
在我們?nèi)粘5捻?xiàng)目中可能運(yùn)用到的一些場(chǎng)景:日志記錄,分布式架構(gòu)中多服務(wù)之間數(shù)據(jù)通訊,跨平臺(tái)數(shù)據(jù)通知等。我經(jīng)歷過(guò)的一個(gè)傳統(tǒng)項(xiàng)目使用kafka改造后的一個(gè)場(chǎng)景:---
2.4 Kafka架構(gòu)組件
Kafka中發(fā)布訂閱的對(duì)象是topic。我們可以為每類數(shù)據(jù)創(chuàng)建一個(gè)topic,把向topic發(fā)布消息的客戶端稱作producer,從topic訂閱消息的客戶端稱作consumer。Producers和consumers可以同時(shí)從多個(gè)topic讀寫數(shù)據(jù)。一個(gè)kafka集群由一個(gè)或多個(gè)broker服務(wù)器組成,它負(fù)責(zé)持久化和備份具體的kafka消息。
- topic:消息存放的目錄即主題
- Producer:生產(chǎn)消息到topic的一方
- Consumer:訂閱topic消費(fèi)消息的一方
- Broker:Kafka的服務(wù)實(shí)例就是一個(gè)broker

2.5 Kafka Topic&Partition
消息發(fā)送時(shí)都被發(fā)送到一個(gè)topic,其本質(zhì)就是一個(gè)目錄,而topic由是由一些Partition Logs(分區(qū)日志)組成,其組織結(jié)構(gòu)如下圖所示:

我們可以看到,每個(gè)Partition中的消息都是有序的,生產(chǎn)的消息被不斷追加到Partition log上,其中的每一個(gè)消息都被賦予了一個(gè)唯一的offset值。
Kafka集群會(huì)保存所有的消息,不管消息有沒(méi)有被消費(fèi);我們可以設(shè)定消息的過(guò)期時(shí)間,只有過(guò)期的數(shù)據(jù)才會(huì)被自動(dòng)清除以釋放磁盤空間。比如我們?cè)O(shè)置消息過(guò)期時(shí)間為2天,那么這2天內(nèi)的所有消息都會(huì)被保存到集群中,數(shù)據(jù)只有超過(guò)了兩天才會(huì)被清除。
Kafka需要維持的元數(shù)據(jù)只有一個(gè)–消費(fèi)消息在Partition中的offset值,Consumer每消費(fèi)一個(gè)消息,offset就會(huì)加1。其實(shí)消息的狀態(tài)完全是由Consumer控制的,Consumer可以跟蹤和重設(shè)這個(gè)offset值,這樣的話Consumer就可以讀取任意位置的消息。
把消息日志以Partition的形式存放有多重考慮,第一,方便在集群中擴(kuò)展,每個(gè)Partition可以通過(guò)調(diào)整以適應(yīng)它所在的機(jī)器,而一個(gè)topic又可以有多個(gè)Partition組成,因此整個(gè)集群就可以適應(yīng)任意大小的數(shù)據(jù)了;第二就是可以提高并發(fā),因?yàn)榭梢砸訮artition為單位讀寫了
三、Kafka 核心組件
3.1 Replications、Partitions 和Leaders
通過(guò)上面介紹的我們可以知道,kafka中的數(shù)據(jù)是持久化的并且能夠容錯(cuò)的。Kafka允許用戶為每個(gè)topic設(shè)置副本數(shù)量,副本數(shù)量決定了有幾個(gè)broker來(lái)存放寫入的數(shù)據(jù)。如果你的副本數(shù)量設(shè)置為3,那么一份數(shù)據(jù)就會(huì)被存放在3臺(tái)不同的機(jī)器上,那么就允許有2個(gè)機(jī)器失敗。一般推薦副本數(shù)量至少為2,這樣就可以保證增減、重啟機(jī)器時(shí)不會(huì)影響到數(shù)據(jù)消費(fèi)。如果對(duì)數(shù)據(jù)持久化有更高的要求,可以把副本數(shù)量設(shè)置為3或者更多。
Kafka中的topic是以partition的形式存放的,每一個(gè)topic都可以設(shè)置它的partition數(shù)量,Partition的數(shù)量決定了組成topic的log的數(shù)量。Producer在生產(chǎn)數(shù)據(jù)時(shí),會(huì)按照一定規(guī)則(這個(gè)規(guī)則是可以自定義的)把消息發(fā)布到topic的各個(gè)partition中。上面將的副本都是以partition為單位的,不過(guò)只有一個(gè)partition的副本會(huì)被選舉成leader作為讀寫用。
關(guān)于如何設(shè)置partition值需要考慮的因素。一個(gè)partition只能被一個(gè)消費(fèi)者消費(fèi)(一個(gè)消費(fèi)者可以同時(shí)消費(fèi)多個(gè)partition),因此,如果設(shè)置的partition的數(shù)量小于consumer的數(shù)量,就會(huì)有消費(fèi)者消費(fèi)不到數(shù)據(jù)。所以,推薦partition的數(shù)量一定要大于同時(shí)運(yùn)行的consumer的數(shù)量。另外一方面,建議partition的數(shù)量大于集群broker的數(shù)量,這樣leader partition就可以均勻的分布在各個(gè)broker中,最終使得集群負(fù)載均衡。在Cloudera,每個(gè)topic都有上百個(gè)partition。需要注意的是,kafka需要為每個(gè)partition分配一些內(nèi)存來(lái)緩存消息數(shù)據(jù),如果partition數(shù)量越大,就要為kafka分配更大的heap space。
(1)下面以一個(gè)Kafka集群中4個(gè)Broker舉例,創(chuàng)建1個(gè)topic包含4個(gè)Partition,2 Replication;數(shù)據(jù)Producer流動(dòng)如圖所示:
(2)當(dāng)集群中新增2節(jié)點(diǎn),Partition增加到6個(gè)時(shí)分布情況如下:
3.2 Producers
Producers直接發(fā)送消息到broker上的leader partition,不需要經(jīng)過(guò)任何中介一系列的路由轉(zhuǎn)發(fā)。為了實(shí)現(xiàn)這個(gè)特性,kafka集群中的每個(gè)broker都可以響應(yīng)producer的請(qǐng)求,并返回topic的一些元信息,這些元信息包括哪些機(jī)器是存活的,topic的leader partition都在哪,現(xiàn)階段哪些leader partition是可以直接被訪問(wèn)的。
Producer客戶端自己控制著消息被推送到哪些partition。實(shí)現(xiàn)的方式可以是隨機(jī)分配、實(shí)現(xiàn)一類隨機(jī)負(fù)載均衡算法,或者指定一些分區(qū)算法。Kafka提供了接口供用戶實(shí)現(xiàn)自定義的分區(qū),用戶可以為每個(gè)消息指定一個(gè)partitionKey,通過(guò)這個(gè)key來(lái)實(shí)現(xiàn)一些hash分區(qū)算法。比如,把userid作為partitionkey的話,相同userid的消息將會(huì)被推送到同一個(gè)分區(qū)。
以Batch的方式推送數(shù)據(jù)可以極大的提高處理效率,kafkaProducer可以將消息在內(nèi)存中累計(jì)到一定數(shù)量后作為一個(gè)batch發(fā)送請(qǐng)求。Batch的數(shù)量大小可以通過(guò)Producer的參數(shù)控制,參數(shù)值可以設(shè)置為累計(jì)的消息的數(shù)量(如500條)、累計(jì)的時(shí)間間隔(如100ms)或者累計(jì)的數(shù)據(jù)大小(64KB)。通過(guò)增加batch的大小,可以減少網(wǎng)絡(luò)請(qǐng)求和磁盤IO的次數(shù),當(dāng)然具體參數(shù)設(shè)置需要在效率和時(shí)效性方面做一個(gè)權(quán)衡。
Producers可以異步的并行的向kafka發(fā)送消息,但是通常producer在發(fā)送完消息之后會(huì)得到一個(gè)future響應(yīng),返回的是offset值或者發(fā)送過(guò)程中遇到的錯(cuò)誤。這其中有個(gè)非常重要的參數(shù)“acks”,這個(gè)參數(shù)決定了producer要求leader partition 收到確認(rèn)的副本個(gè)數(shù),如果acks設(shè)置數(shù)量為0,表示producer不會(huì)等待broker的響應(yīng),所以,producer無(wú)法知道消息是否發(fā)送成功,這樣有可能會(huì)導(dǎo)致數(shù)據(jù)丟失,但同時(shí),acks值為0會(huì)得到最大的系統(tǒng)吞吐量。若acks設(shè)置為1,表示producer會(huì)在leader partition收到消息時(shí)得到broker的一個(gè)確認(rèn),這樣會(huì)有更好的可靠性,因?yàn)榭蛻舳藭?huì)等待直到broker確認(rèn)收到消息。若設(shè)置為-1,producer會(huì)在所有備份的partition收到消息時(shí)得到broker的確認(rèn),這個(gè)設(shè)置可以得到最高的可靠性保證。
3.3 Consumers
Kafka提供了兩套consumer api,分為high-levelapi和sample-api。
Sample-api是一個(gè)底層的API,它維持了一個(gè)和單一
broker的連接,并且這個(gè)API是完全無(wú)狀態(tài)的,每次請(qǐng)求都需要指定offset值,因此,這套API也是最靈活的。
在kafka中,當(dāng)前讀到消息的offset值是由consumer來(lái)維護(hù)的,因此,consumer可以自己決定如何讀取kafka中的數(shù)據(jù)。比如,consumer可以通過(guò)重設(shè)offset值來(lái)重新消費(fèi)已消費(fèi)過(guò)的數(shù)據(jù)。不管有沒(méi)有被消費(fèi),kafka會(huì)保存數(shù)據(jù)一段時(shí)間,這個(gè)時(shí)間周期是可配置的,只有到了過(guò)期時(shí)間,kafka才會(huì)刪除這些數(shù)據(jù)。
High-level API封裝了對(duì)集群中一系列broker的訪問(wèn),可以透明的消費(fèi)一個(gè)topic。它自己維持了已消費(fèi)消息的狀態(tài),即每次消費(fèi)的都是下一個(gè)消息。
High-level API還支持以組的形式消費(fèi)topic,如果consumers有同一個(gè)組名,那么kafka就相當(dāng)于一個(gè)隊(duì)列消息服務(wù),而各個(gè)consumer均衡的消費(fèi)相應(yīng)partition中的數(shù)據(jù)。若consumers有不同的組名,那么此時(shí)kafka就相當(dāng)與一個(gè)廣播服務(wù),會(huì)把topic中的所有消息廣播到每個(gè)consumer。
消費(fèi)結(jié)構(gòu)

消費(fèi)原理

3.4 kafka中partition和消費(fèi)者對(duì)應(yīng)關(guān)系
1個(gè)partition只能被同組的一個(gè)consumer消費(fèi),同組的consumer則起到均衡效果
- 消費(fèi)者數(shù)量為2 ,partition數(shù)量1,此時(shí)partition和消費(fèi)者進(jìn)程對(duì)應(yīng)關(guān)系如下:

- 消費(fèi)者數(shù)量為2 ,partition數(shù)量3,此時(shí)partition和消費(fèi)者進(jìn)程對(duì)應(yīng)關(guān)系如下:

- 消費(fèi)者數(shù)量為4 ,partition數(shù)量3,此時(shí)partition和消費(fèi)者進(jìn)程對(duì)應(yīng)關(guān)系如下:

四、Kafka核心特性
4.1 壓縮
我們上面已經(jīng)知道了Kafka支持以集合(batch)為單位發(fā)送消息,在此基礎(chǔ)上,Kafka還支持對(duì)消息集合進(jìn)行壓縮,Producer端可以通過(guò)GZIP或Snappy格式對(duì)消息集合進(jìn)行壓縮。Producer端進(jìn)行壓縮之后,在Consumer端需進(jìn)行解壓。壓縮的好處就是減少傳輸?shù)臄?shù)據(jù)量,減輕對(duì)網(wǎng)絡(luò)傳輸?shù)膲毫?,在?duì)大數(shù)據(jù)處理上,瓶頸往往體現(xiàn)在網(wǎng)絡(luò)上而不是CPU(壓縮和解壓會(huì)耗掉部分CPU資源)。
那么如何區(qū)分消息是壓縮的還是未壓縮的呢,Kafka在消息頭部添加了一個(gè)描述壓縮屬性字節(jié),這個(gè)字節(jié)的后兩位表示消息的壓縮采用的編碼,如果后兩位為0,則表示消息未被壓縮。
4.2消息可靠性
在消息系統(tǒng)中,保證消息在生產(chǎn)和消費(fèi)過(guò)程中的可靠性是十分重要的,在實(shí)際消息傳遞過(guò)程中,可能會(huì)出現(xiàn)如下三中情況:
- 一個(gè)消息發(fā)送失敗
- 一個(gè)消息被發(fā)送多次
- 最理想的情況:exactly-once ,一個(gè)消息發(fā)送成功且僅發(fā)送了一次
有許多系統(tǒng)聲稱它們實(shí)現(xiàn)了exactly-once,但是它們其實(shí)忽略了生產(chǎn)者或消費(fèi)者在生產(chǎn)和消費(fèi)過(guò)程中有可能失敗的情況。比如雖然一個(gè)Producer成功發(fā)送一個(gè)消息,但是消息在發(fā)送途中丟失,或者成功發(fā)送到broker,也被consumer成功取走,但是這個(gè)consumer在處理取過(guò)來(lái)的消息時(shí)失敗了。
從Producer端看:Kafka是這么處理的,當(dāng)一個(gè)消息被發(fā)送后,Producer會(huì)等待broker成功接收到消息的反饋(可通過(guò)參數(shù)控制等待時(shí)間),如果消息在途中丟失或是其中一個(gè)broker掛掉,Producer會(huì)重新發(fā)送(我們知道Kafka有備份機(jī)制,可以通過(guò)參數(shù)控制是否等待所有備份節(jié)點(diǎn)都收到消息)。
從Consumer端看:前面講到過(guò)partition,broker端記錄了partition中的一個(gè)offset值,這個(gè)值指向Consumer下一個(gè)即將消費(fèi)message。當(dāng)Consumer收到了消息,但卻在處理過(guò)程中掛掉,此時(shí)Consumer可以通過(guò)這個(gè)offset值重新找到上一個(gè)消息再進(jìn)行處理。Consumer還有權(quán)限控制這個(gè)offset值,對(duì)持久化到broker端的消息做任意處理。
4.3 備份機(jī)制
備份機(jī)制是Kafka0.8版本的新特性,備份機(jī)制的出現(xiàn)大大提高了Kafka集群的可靠性、穩(wěn)定性。有了備份機(jī)制后,Kafka允許集群中的節(jié)點(diǎn)掛掉后而不影響整個(gè)集群工作。一個(gè)備份數(shù)量為n的集群允許n-1個(gè)節(jié)點(diǎn)失敗。在所有備份節(jié)點(diǎn)中,有一個(gè)節(jié)點(diǎn)作為lead節(jié)點(diǎn),這個(gè)節(jié)點(diǎn)保存了其它備份節(jié)點(diǎn)列表,并維持各個(gè)備份間的狀體同步。下面這幅圖解釋了Kafka的備份機(jī)制:
4.4 Kafka高效性相關(guān)設(shè)計(jì)
- 4.4.1 消息的持久化
- 4.4.2 常數(shù)時(shí)間性能保證
- 4.4.3 進(jìn)一步提高效率
五、Kafka部署
5.1 單機(jī)部署
zookeeper zoo.cfg 配置
# The number of milliseconds of each tick
#是Zookeeper獨(dú)立的工作時(shí)間單元
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
#
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/tmp/zookeeper/zookeeper0
# the port at which the clients will connect
clientPort=2180
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
kafka server.properties 配置
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = security_protocol://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://10.60.96.143:9091
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
# The number of threads handling network requests
num.network.threads=3
# The number of threads doing disk I/O
#server用來(lái)處理請(qǐng)求的I/O線程的數(shù)目;這個(gè)線程數(shù)目至少要等于硬盤的個(gè)數(shù)。
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
#如果創(chuàng)建topic時(shí)沒(méi)有給出劃分partitions個(gè)數(shù),這個(gè)數(shù)字將是topic下partitions數(shù)目的默認(rèn)數(shù)值。
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
#每個(gè)數(shù)據(jù)目錄用來(lái)日志恢復(fù)的線程數(shù)目
num.recovery.threads.per.data.dir=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
#日志有效時(shí)間
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log文件尺寸
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
#檢查日志分段文件的間隔時(shí)間,以確定是否文件屬性是否到達(dá)刪除要求。
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=10.60.96.143:2180
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
#enable delete topic
delete.topic.enable=true
常用的命令工具
topi命令
- bin/kafka-topics.sh --list --zookeeper 10.60.96.142:2182
- bin/kafka-topics.sh --create --zookeeper 10.60.96.142:2182--replication-factor 1 --partitions 1 --topic test
- bin/kafka-topics.sh --zookeeper 10.60.96.142:2182 --delete --topic "test"
- bin/kafka-topics.sh --zookeeper 10.60.96.142:2182 --topic "test" --describe
啟動(dòng)kafka 服務(wù)
- bin/kafka-server-start.sh -daemon config/server3.properties
查看log信息
- bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/topic1-0/00000000000000000000.log --print-data-log
測(cè)試
#創(chuàng)建Topic
./kafka-topics.sh --create --zookeeper 10.60.96.143:2181 --replication-factor 2 --partitions 1 --topic shuaige
#解釋
--replication-factor 2 #復(fù)制兩份
--partitions 1 #創(chuàng)建1個(gè)分區(qū)
--topic #主題為shuaige
'''在一臺(tái)服務(wù)器上創(chuàng)建一個(gè)發(fā)布者'''
#創(chuàng)建一個(gè)broker,發(fā)布者
./kafka-console-producer.sh --broker-list 10.60.96.143:9092 --topic test
'''在一臺(tái)服務(wù)器上創(chuàng)建一個(gè)訂閱者'''
./kafka-console-consumer.sh --zookeeper 10.60.96.143:2181 --topic test --from-beginning
5.2 集群部署
5.2 配置詳解
zookeeper配置 zoo.ccfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/tmp/zookeeper/zookeeper1
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.0=127.0.0.1:8880:7770
server.1=127.0.0.1:8881:7771
server.2=127.0.0.1:8882:7772
kafka server.properties 配置
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = security_protocol://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://10.60.96.143:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
# The number of threads handling network requests
num.network.threads=3
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=10.60.96.143:2181,10.60.96.143:2182,10.60.96.143:2183
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
#enable delete topic
delete.topic.enable=true
******producer配置******
#參與消息確認(rèn)的broker數(shù)量控制,0代表不需要任何確認(rèn) 1代表需要leader replica確認(rèn) -1代表需要ISR中所有進(jìn)行確認(rèn)
request.required.acks 0
#從發(fā)送請(qǐng)求到收到ACK確認(rèn)等待的最長(zhǎng)時(shí)間(超時(shí)時(shí)間)
request.timeout.ms 10000
#設(shè)置消息發(fā)送模式,默認(rèn)是同步方式, async異步模式下允許消息累計(jì)到一定量或一段時(shí)間又另外線程批量發(fā)送,吞吐量好但丟失數(shù)據(jù)風(fēng)險(xiǎn)增大
producer.type sync
#消息序列化類實(shí)現(xiàn)方式,默認(rèn)是byte[]數(shù)組形式
serializer.class kafka.serializer.DefaultEncoder
#kafka消息分區(qū)策略實(shí)現(xiàn)方式,默認(rèn)是對(duì)key進(jìn)行hash
partitioner.class kafka.producer.DefaultPartitioner
#對(duì)發(fā)送的消息采取的壓縮編碼方式,有none|gzip|snappy
compression.codec none
#指定哪些topic的message需要壓縮
compressed.topics null
#消息發(fā)送失敗的情況下,重試發(fā)送的次數(shù) 存在消息發(fā)送是成功的,只是由于網(wǎng)絡(luò)導(dǎo)致ACK沒(méi)收到的重試,會(huì)出現(xiàn)消息被重復(fù)發(fā)送的情況
message.send.max.retries 3
#在開始重新發(fā)起metadata更新操作需要等待的時(shí)間
retry.backoff.ms 100
#metadata刷新間隔時(shí)間,如果負(fù)值則失敗的時(shí)候才會(huì)刷新,如果0則每次發(fā)送后都刷新,正值則是一種周期行為
topic.metadata.refresh.interval.ms 600 * 1000
#異步發(fā)送模式下,緩存數(shù)據(jù)的最長(zhǎng)時(shí)間,之后便會(huì)被發(fā)送到broker
queue.buffering.max.ms 5000
#producer端異步模式下最多緩存的消息條數(shù)
queue.buffering.max.messages 10000
#0代表隊(duì)列沒(méi)滿的時(shí)候直接入隊(duì),滿了立即扔棄,-1代表無(wú)條件阻塞且不丟棄
queue.enqueue.timeout.ms -1
#一次批量發(fā)送需要達(dá)到的消息條數(shù),當(dāng)然如果queue.buffering.max.ms達(dá)到的時(shí)候也會(huì)被發(fā)送
batch.num.messages 200
consumer配置
#指明當(dāng)前消費(fèi)進(jìn)程所屬的消費(fèi)組,一個(gè)partition只能被同一個(gè)消費(fèi)組的一個(gè)消費(fèi)者消費(fèi)
group.id
#針對(duì)一個(gè)partition的fetch request所能拉取的最大消息字節(jié)數(shù),必須大于等于Kafka運(yùn)行的最大消息
fetch.message.max.bytes 1024 * 1024
#是否自動(dòng)周期性提交已經(jīng)拉取到消費(fèi)端的消息offset
auto.commit.enable true
#自動(dòng)提交offset到zookeeper的時(shí)間間隔
auto.commit.interval.ms 60 * 1000
#消費(fèi)均衡的重試次數(shù)
rebalance.max.retries 4
#消費(fèi)均衡兩次重試之間的時(shí)間間隔
rebalance.backoff.ms 2000
#當(dāng)重新去獲取partition的leader前需要等待的時(shí)間
refresh.leader.backoff.ms 200
#如果zookeeper上沒(méi)有offset合理的初始值情況下獲取第一條消息開始的策略smallest|largeset
auto.offset.reset largest
#如果其超時(shí),將會(huì)可能觸發(fā)rebalance并認(rèn)為已經(jīng)死去
zookeeper.session.timeout.ms 6000
#確認(rèn)zookeeper連接建立操作客戶端能等待的最長(zhǎng)時(shí)間
zookeeper.connection.timeout.ms 6000
其他kafka配置詳解
六、Kafka消息結(jié)構(gòu)解析
內(nèi)部數(shù)據(jù)結(jié)構(gòu):
- -- Topic (名字)
- -- PartitionID ( 可選)
- -- Key[( 可選 )
- -- Value
生產(chǎn)者記錄(簡(jiǎn)稱PR)的發(fā)送邏輯:
- 若指定Partition ID,則PR被發(fā)送至指定Partition
- 若未指定Partition ID,但指定了Key, PR會(huì)按照hasy(key)發(fā)送至對(duì)應(yīng)Partition
- 若既未指定Partition ID也沒(méi)指定Key,PR會(huì)按照round-robin模式發(fā)送到每個(gè)Partition
- 若同時(shí)指定了Partition ID和Key, PR只會(huì)發(fā)送到指定的Partition (Key不起作用,代碼邏輯決定)