Kafka 學(xué)習(xí)筆記

一、Kafka簡(jiǎn)介

Kafka (科技術(shù)語(yǔ))。Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),它可以處理消費(fèi)者規(guī)模的網(wǎng)站中的所有動(dòng)作流數(shù)據(jù)。

1.1 背景歷史

當(dāng)今社會(huì)各種應(yīng)用系統(tǒng)諸如商業(yè)、社交、搜索、瀏覽等像信息工廠一樣不斷的生產(chǎn)出各種信息,在大數(shù)據(jù)時(shí)代,我們面臨如下幾個(gè)挑戰(zhàn):

  • 如何收集這些巨大的信息
  • 如何分析它
  • 如何及時(shí)做到如上兩點(diǎn)

以上幾個(gè)挑戰(zhàn)形成了一個(gè)業(yè)務(wù)需求模型,即生產(chǎn)者生產(chǎn)(produce)各種信息,消費(fèi)者消費(fèi)(consume)(處理分析)這些信息,而在生產(chǎn)者與消費(fèi)者之間,需要一個(gè)溝通兩者的橋梁-消息系統(tǒng)。從一個(gè)微觀層面來(lái)說(shuō),這種需求也可理解為不同的系統(tǒng)之間如何傳遞消息。

1.2 Kafka誕生

  • Kafka由 linked-in 開源
  • kafka-即是解決上述這類問(wèn)題的一個(gè)框架,它實(shí)現(xiàn)了生產(chǎn)者和消費(fèi)者之間的無(wú)縫連接。
  • kafka-高產(chǎn)出的分布式消息系統(tǒng)(A high-throughput distributed messaging system)

1.3 Kafka現(xiàn)在

Apache kafka 是一個(gè)分布式的基于push-subscribe的消息系統(tǒng),它具備快速、可擴(kuò)展、可持久化的特點(diǎn)。它現(xiàn)在是Apache旗下的一個(gè)開源系統(tǒng),作為hadoop生態(tài)系統(tǒng)的一部分,被各種商業(yè)公司廣泛應(yīng)用。它的最大的特性就是可以實(shí)時(shí)的處理大量數(shù)據(jù)以滿足各種需求場(chǎng)景:比如基于hadoop的批處理系統(tǒng)、低延遲的實(shí)時(shí)系統(tǒng)、storm/spark流式處理引擎

二、Kafka技術(shù)概覽

2.1 Kafka的特性

  • 高吞吐量、低延遲:kafka每秒可以處理幾十萬(wàn)條消息,它的延遲最低只有幾毫秒
  • 可擴(kuò)展性:kafka集群支持熱擴(kuò)展
  • 持久性、可靠性:消息被持久化到本地磁盤,并且支持?jǐn)?shù)據(jù)備份防止數(shù)據(jù)丟失
  • 容錯(cuò)性:允許集群中節(jié)點(diǎn)失敗(若副本數(shù)量為n,則允許n-1個(gè)節(jié)點(diǎn)失?。?/li>
  • 高并發(fā):支持?jǐn)?shù)千個(gè)客戶端同時(shí)讀寫

2.2 Kafka一些重要設(shè)計(jì)思想

  • Consumergroup:各個(gè)consumer可以組成一個(gè)組,每個(gè)消息只能被組中的一個(gè)consumer消費(fèi),如果一個(gè)消息可以被多個(gè)consumer消費(fèi)的話,那么這些consumer必須在不同的組。
  • 消息狀態(tài):在Kafka中,消息的狀態(tài)被保存在consumer中,broker不會(huì)關(guān)心哪個(gè)消息被消費(fèi)了被誰(shuí)消費(fèi)了,只記錄一個(gè)offset值(指向partition中下一個(gè)要被消費(fèi)的消息位置),這就意味著如果consumer處理不好的話,broker上的一個(gè)消息可能會(huì)被消費(fèi)多次。
  • 消息持久化:Kafka中會(huì)把消息持久化到本地文件系統(tǒng)中,并且保持極高的效率。
  • 消息有效期:Kafka會(huì)長(zhǎng)久保留其中的消息,以便consumer可以多次消費(fèi),當(dāng)然其中很多細(xì)節(jié)是可配置的。
  • 批量發(fā)送:Kafka支持以消息集合為單位進(jìn)行批量發(fā)送,以提高push效率。
  • push-and-pull : Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管從broker pull消息,兩者對(duì)消息的生產(chǎn)和消費(fèi)是異步的。
  • Kafka集群中broker之間的關(guān)系:不是主從關(guān)系,各個(gè)broker在集群中地位一樣,我們可以隨意的增加或刪除任何一個(gè)broker節(jié)點(diǎn)。
  • 負(fù)載均衡方面: Kafka提供了一個(gè) metadata API來(lái)管理broker之間的負(fù)載(對(duì)Kafka0.8.x而言,對(duì)于0.7.x主要靠zookeeper來(lái)實(shí)現(xiàn)負(fù)載均衡)。
  • 同步異步:Producer采用異步push方式,極大提高Kafka系統(tǒng)的吞吐率(可以通過(guò)參數(shù)控制是采用同步還是異步方式)。
  • 分區(qū)機(jī)制partition:Kafka的broker端支持消息分區(qū),Producer可以決定把消息發(fā)到哪個(gè)分區(qū),在一個(gè)分區(qū)中消息的順序就是Producer發(fā)送消息的順序,一個(gè)主題中可以有多個(gè)分區(qū),具體分區(qū)的數(shù)量是可配置的。分區(qū)的意義很重大,后面的內(nèi)容會(huì)逐漸體現(xiàn)。
  • 離線數(shù)據(jù)裝載:Kafka由于對(duì)可拓展的數(shù)據(jù)持久化的支持,它也非常適合向Hadoop或者數(shù)據(jù)倉(cāng)庫(kù)中進(jìn)行數(shù)據(jù)裝載。
  • 插件支持:現(xiàn)在不少活躍的社區(qū)已經(jīng)開發(fā)出不少插件來(lái)拓展Kafka的功能,如用來(lái)配合Storm、Hadoop、flume相關(guān)的插件

2.3 kafka 應(yīng)用場(chǎng)景

  • 日志收集:一個(gè)公司可以用Kafka可以收集各種服務(wù)的log,通過(guò)kafka以統(tǒng)一接口服務(wù)的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
  • 消息系統(tǒng):解耦和生產(chǎn)者和消費(fèi)者、緩存消息等。
  • 用戶活動(dòng)跟蹤:Kafka經(jīng)常被用來(lái)記錄web用戶或者app用戶的各種活動(dòng),如瀏覽網(wǎng)頁(yè)、搜索、點(diǎn)擊等活動(dòng),這些活動(dòng)信息被各個(gè)服務(wù)器發(fā)布到kafka的topic中,然后訂閱者通過(guò)訂閱這些topic來(lái)做實(shí)時(shí)的監(jiān)控分析,或者裝載到hadoop、數(shù)據(jù)倉(cāng)庫(kù)中做離線分析和挖掘。
  • 運(yùn)營(yíng)指標(biāo):Kafka也經(jīng)常用來(lái)記錄運(yùn)營(yíng)監(jiān)控?cái)?shù)據(jù)。包括收集各種分布式應(yīng)用的數(shù)據(jù),生產(chǎn)各種操作的集中反饋,比如報(bào)警和報(bào)告。
  • 流式處理:比如spark streaming和storm
  • 事件源

在我們?nèi)粘5捻?xiàng)目中可能運(yùn)用到的一些場(chǎng)景:日志記錄,分布式架構(gòu)中多服務(wù)之間數(shù)據(jù)通訊,跨平臺(tái)數(shù)據(jù)通知等。我經(jīng)歷過(guò)的一個(gè)傳統(tǒng)項(xiàng)目使用kafka改造后的一個(gè)場(chǎng)景:---

2.4 Kafka架構(gòu)組件

Kafka中發(fā)布訂閱的對(duì)象是topic。我們可以為每類數(shù)據(jù)創(chuàng)建一個(gè)topic,把向topic發(fā)布消息的客戶端稱作producer,從topic訂閱消息的客戶端稱作consumer。Producers和consumers可以同時(shí)從多個(gè)topic讀寫數(shù)據(jù)。一個(gè)kafka集群由一個(gè)或多個(gè)broker服務(wù)器組成,它負(fù)責(zé)持久化和備份具體的kafka消息。

  • topic:消息存放的目錄即主題
  • Producer:生產(chǎn)消息到topic的一方
  • Consumer:訂閱topic消費(fèi)消息的一方
  • Broker:Kafka的服務(wù)實(shí)例就是一個(gè)broker
image

2.5 Kafka Topic&Partition

消息發(fā)送時(shí)都被發(fā)送到一個(gè)topic,其本質(zhì)就是一個(gè)目錄,而topic由是由一些Partition Logs(分區(qū)日志)組成,其組織結(jié)構(gòu)如下圖所示:


image

我們可以看到,每個(gè)Partition中的消息都是有序的,生產(chǎn)的消息被不斷追加到Partition log上,其中的每一個(gè)消息都被賦予了一個(gè)唯一的offset值。
Kafka集群會(huì)保存所有的消息,不管消息有沒(méi)有被消費(fèi);我們可以設(shè)定消息的過(guò)期時(shí)間,只有過(guò)期的數(shù)據(jù)才會(huì)被自動(dòng)清除以釋放磁盤空間。比如我們?cè)O(shè)置消息過(guò)期時(shí)間為2天,那么這2天內(nèi)的所有消息都會(huì)被保存到集群中,數(shù)據(jù)只有超過(guò)了兩天才會(huì)被清除。
Kafka需要維持的元數(shù)據(jù)只有一個(gè)–消費(fèi)消息在Partition中的offset值,Consumer每消費(fèi)一個(gè)消息,offset就會(huì)加1。其實(shí)消息的狀態(tài)完全是由Consumer控制的,Consumer可以跟蹤和重設(shè)這個(gè)offset值,這樣的話Consumer就可以讀取任意位置的消息。
把消息日志以Partition的形式存放有多重考慮,第一,方便在集群中擴(kuò)展,每個(gè)Partition可以通過(guò)調(diào)整以適應(yīng)它所在的機(jī)器,而一個(gè)topic又可以有多個(gè)Partition組成,因此整個(gè)集群就可以適應(yīng)任意大小的數(shù)據(jù)了;第二就是可以提高并發(fā),因?yàn)榭梢砸訮artition為單位讀寫了

三、Kafka 核心組件

3.1 Replications、Partitions 和Leaders

通過(guò)上面介紹的我們可以知道,kafka中的數(shù)據(jù)是持久化的并且能夠容錯(cuò)的。Kafka允許用戶為每個(gè)topic設(shè)置副本數(shù)量,副本數(shù)量決定了有幾個(gè)broker來(lái)存放寫入的數(shù)據(jù)。如果你的副本數(shù)量設(shè)置為3,那么一份數(shù)據(jù)就會(huì)被存放在3臺(tái)不同的機(jī)器上,那么就允許有2個(gè)機(jī)器失敗。一般推薦副本數(shù)量至少為2,這樣就可以保證增減、重啟機(jī)器時(shí)不會(huì)影響到數(shù)據(jù)消費(fèi)。如果對(duì)數(shù)據(jù)持久化有更高的要求,可以把副本數(shù)量設(shè)置為3或者更多。

Kafka中的topic是以partition的形式存放的,每一個(gè)topic都可以設(shè)置它的partition數(shù)量,Partition的數(shù)量決定了組成topic的log的數(shù)量。Producer在生產(chǎn)數(shù)據(jù)時(shí),會(huì)按照一定規(guī)則(這個(gè)規(guī)則是可以自定義的)把消息發(fā)布到topic的各個(gè)partition中。上面將的副本都是以partition為單位的,不過(guò)只有一個(gè)partition的副本會(huì)被選舉成leader作為讀寫用。

關(guān)于如何設(shè)置partition值需要考慮的因素。一個(gè)partition只能被一個(gè)消費(fèi)者消費(fèi)(一個(gè)消費(fèi)者可以同時(shí)消費(fèi)多個(gè)partition),因此,如果設(shè)置的partition的數(shù)量小于consumer的數(shù)量,就會(huì)有消費(fèi)者消費(fèi)不到數(shù)據(jù)。所以,推薦partition的數(shù)量一定要大于同時(shí)運(yùn)行的consumer的數(shù)量。另外一方面,建議partition的數(shù)量大于集群broker的數(shù)量,這樣leader partition就可以均勻的分布在各個(gè)broker中,最終使得集群負(fù)載均衡。在Cloudera,每個(gè)topic都有上百個(gè)partition。需要注意的是,kafka需要為每個(gè)partition分配一些內(nèi)存來(lái)緩存消息數(shù)據(jù),如果partition數(shù)量越大,就要為kafka分配更大的heap space。

(1)下面以一個(gè)Kafka集群中4個(gè)Broker舉例,創(chuàng)建1個(gè)topic包含4個(gè)Partition,2 Replication;數(shù)據(jù)Producer流動(dòng)如圖所示:


image

(2)當(dāng)集群中新增2節(jié)點(diǎn),Partition增加到6個(gè)時(shí)分布情況如下:

image

3.2 Producers

Producers直接發(fā)送消息到broker上的leader partition,不需要經(jīng)過(guò)任何中介一系列的路由轉(zhuǎn)發(fā)。為了實(shí)現(xiàn)這個(gè)特性,kafka集群中的每個(gè)broker都可以響應(yīng)producer的請(qǐng)求,并返回topic的一些元信息,這些元信息包括哪些機(jī)器是存活的,topic的leader partition都在哪,現(xiàn)階段哪些leader partition是可以直接被訪問(wèn)的。

Producer客戶端自己控制著消息被推送到哪些partition。實(shí)現(xiàn)的方式可以是隨機(jī)分配、實(shí)現(xiàn)一類隨機(jī)負(fù)載均衡算法,或者指定一些分區(qū)算法。Kafka提供了接口供用戶實(shí)現(xiàn)自定義的分區(qū),用戶可以為每個(gè)消息指定一個(gè)partitionKey,通過(guò)這個(gè)key來(lái)實(shí)現(xiàn)一些hash分區(qū)算法。比如,把userid作為partitionkey的話,相同userid的消息將會(huì)被推送到同一個(gè)分區(qū)。

以Batch的方式推送數(shù)據(jù)可以極大的提高處理效率,kafkaProducer可以將消息在內(nèi)存中累計(jì)到一定數(shù)量后作為一個(gè)batch發(fā)送請(qǐng)求。Batch的數(shù)量大小可以通過(guò)Producer的參數(shù)控制,參數(shù)值可以設(shè)置為累計(jì)的消息的數(shù)量(如500條)、累計(jì)的時(shí)間間隔(如100ms)或者累計(jì)的數(shù)據(jù)大小(64KB)。通過(guò)增加batch的大小,可以減少網(wǎng)絡(luò)請(qǐng)求和磁盤IO的次數(shù),當(dāng)然具體參數(shù)設(shè)置需要在效率和時(shí)效性方面做一個(gè)權(quán)衡。

Producers可以異步的并行的向kafka發(fā)送消息,但是通常producer在發(fā)送完消息之后會(huì)得到一個(gè)future響應(yīng),返回的是offset值或者發(fā)送過(guò)程中遇到的錯(cuò)誤。這其中有個(gè)非常重要的參數(shù)“acks”,這個(gè)參數(shù)決定了producer要求leader partition 收到確認(rèn)的副本個(gè)數(shù),如果acks設(shè)置數(shù)量為0,表示producer不會(huì)等待broker的響應(yīng),所以,producer無(wú)法知道消息是否發(fā)送成功,這樣有可能會(huì)導(dǎo)致數(shù)據(jù)丟失,但同時(shí),acks值為0會(huì)得到最大的系統(tǒng)吞吐量。若acks設(shè)置為1,表示producer會(huì)在leader partition收到消息時(shí)得到broker的一個(gè)確認(rèn),這樣會(huì)有更好的可靠性,因?yàn)榭蛻舳藭?huì)等待直到broker確認(rèn)收到消息。若設(shè)置為-1,producer會(huì)在所有備份的partition收到消息時(shí)得到broker的確認(rèn),這個(gè)設(shè)置可以得到最高的可靠性保證。

3.3 Consumers

Kafka提供了兩套consumer api,分為high-levelapi和sample-api。

Sample-api是一個(gè)底層的API,它維持了一個(gè)和單一
broker的連接,并且這個(gè)API是完全無(wú)狀態(tài)的,每次請(qǐng)求都需要指定offset值,因此,這套API也是最靈活的。
在kafka中,當(dāng)前讀到消息的offset值是由consumer來(lái)維護(hù)的,因此,consumer可以自己決定如何讀取kafka中的數(shù)據(jù)。比如,consumer可以通過(guò)重設(shè)offset值來(lái)重新消費(fèi)已消費(fèi)過(guò)的數(shù)據(jù)。不管有沒(méi)有被消費(fèi),kafka會(huì)保存數(shù)據(jù)一段時(shí)間,這個(gè)時(shí)間周期是可配置的,只有到了過(guò)期時(shí)間,kafka才會(huì)刪除這些數(shù)據(jù)。

High-level API封裝了對(duì)集群中一系列broker的訪問(wèn),可以透明的消費(fèi)一個(gè)topic。它自己維持了已消費(fèi)消息的狀態(tài),即每次消費(fèi)的都是下一個(gè)消息。
High-level API還支持以組的形式消費(fèi)topic,如果consumers有同一個(gè)組名,那么kafka就相當(dāng)于一個(gè)隊(duì)列消息服務(wù),而各個(gè)consumer均衡的消費(fèi)相應(yīng)partition中的數(shù)據(jù)。若consumers有不同的組名,那么此時(shí)kafka就相當(dāng)與一個(gè)廣播服務(wù),會(huì)把topic中的所有消息廣播到每個(gè)consumer。

消費(fèi)結(jié)構(gòu)

image

消費(fèi)原理

image

3.4 kafka中partition和消費(fèi)者對(duì)應(yīng)關(guān)系

1個(gè)partition只能被同組的一個(gè)consumer消費(fèi),同組的consumer則起到均衡效果

  • 消費(fèi)者數(shù)量為2 ,partition數(shù)量1,此時(shí)partition和消費(fèi)者進(jìn)程對(duì)應(yīng)關(guān)系如下:
image
  • 消費(fèi)者數(shù)量為2 ,partition數(shù)量3,此時(shí)partition和消費(fèi)者進(jìn)程對(duì)應(yīng)關(guān)系如下:
image
  • 消費(fèi)者數(shù)量為4 ,partition數(shù)量3,此時(shí)partition和消費(fèi)者進(jìn)程對(duì)應(yīng)關(guān)系如下:
image

四、Kafka核心特性

4.1 壓縮

我們上面已經(jīng)知道了Kafka支持以集合(batch)為單位發(fā)送消息,在此基礎(chǔ)上,Kafka還支持對(duì)消息集合進(jìn)行壓縮,Producer端可以通過(guò)GZIP或Snappy格式對(duì)消息集合進(jìn)行壓縮。Producer端進(jìn)行壓縮之后,在Consumer端需進(jìn)行解壓。壓縮的好處就是減少傳輸?shù)臄?shù)據(jù)量,減輕對(duì)網(wǎng)絡(luò)傳輸?shù)膲毫?,在?duì)大數(shù)據(jù)處理上,瓶頸往往體現(xiàn)在網(wǎng)絡(luò)上而不是CPU(壓縮和解壓會(huì)耗掉部分CPU資源)。
那么如何區(qū)分消息是壓縮的還是未壓縮的呢,Kafka在消息頭部添加了一個(gè)描述壓縮屬性字節(jié),這個(gè)字節(jié)的后兩位表示消息的壓縮采用的編碼,如果后兩位為0,則表示消息未被壓縮。

4.2消息可靠性

在消息系統(tǒng)中,保證消息在生產(chǎn)和消費(fèi)過(guò)程中的可靠性是十分重要的,在實(shí)際消息傳遞過(guò)程中,可能會(huì)出現(xiàn)如下三中情況:

  • 一個(gè)消息發(fā)送失敗
  • 一個(gè)消息被發(fā)送多次
  • 最理想的情況:exactly-once ,一個(gè)消息發(fā)送成功且僅發(fā)送了一次

有許多系統(tǒng)聲稱它們實(shí)現(xiàn)了exactly-once,但是它們其實(shí)忽略了生產(chǎn)者或消費(fèi)者在生產(chǎn)和消費(fèi)過(guò)程中有可能失敗的情況。比如雖然一個(gè)Producer成功發(fā)送一個(gè)消息,但是消息在發(fā)送途中丟失,或者成功發(fā)送到broker,也被consumer成功取走,但是這個(gè)consumer在處理取過(guò)來(lái)的消息時(shí)失敗了。

從Producer端看:Kafka是這么處理的,當(dāng)一個(gè)消息被發(fā)送后,Producer會(huì)等待broker成功接收到消息的反饋(可通過(guò)參數(shù)控制等待時(shí)間),如果消息在途中丟失或是其中一個(gè)broker掛掉,Producer會(huì)重新發(fā)送(我們知道Kafka有備份機(jī)制,可以通過(guò)參數(shù)控制是否等待所有備份節(jié)點(diǎn)都收到消息)。

從Consumer端看:前面講到過(guò)partition,broker端記錄了partition中的一個(gè)offset值,這個(gè)值指向Consumer下一個(gè)即將消費(fèi)message。當(dāng)Consumer收到了消息,但卻在處理過(guò)程中掛掉,此時(shí)Consumer可以通過(guò)這個(gè)offset值重新找到上一個(gè)消息再進(jìn)行處理。Consumer還有權(quán)限控制這個(gè)offset值,對(duì)持久化到broker端的消息做任意處理。

4.3 備份機(jī)制

備份機(jī)制是Kafka0.8版本的新特性,備份機(jī)制的出現(xiàn)大大提高了Kafka集群的可靠性、穩(wěn)定性。有了備份機(jī)制后,Kafka允許集群中的節(jié)點(diǎn)掛掉后而不影響整個(gè)集群工作。一個(gè)備份數(shù)量為n的集群允許n-1個(gè)節(jié)點(diǎn)失敗。在所有備份節(jié)點(diǎn)中,有一個(gè)節(jié)點(diǎn)作為lead節(jié)點(diǎn),這個(gè)節(jié)點(diǎn)保存了其它備份節(jié)點(diǎn)列表,并維持各個(gè)備份間的狀體同步。下面這幅圖解釋了Kafka的備份機(jī)制:

image

4.4 Kafka高效性相關(guān)設(shè)計(jì)

  • 4.4.1 消息的持久化
  • 4.4.2 常數(shù)時(shí)間性能保證
  • 4.4.3 進(jìn)一步提高效率

五、Kafka部署

5.1 單機(jī)部署

zookeeper zoo.cfg 配置

# The number of milliseconds of each tick
#是Zookeeper獨(dú)立的工作時(shí)間單元
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
#
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/tmp/zookeeper/zookeeper0
# the port at which the clients will connect
clientPort=2180
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

kafka server.properties 配置

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = security_protocol://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://10.60.96.143:9091

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
#server用來(lái)處理請(qǐng)求的I/O線程的數(shù)目;這個(gè)線程數(shù)目至少要等于硬盤的個(gè)數(shù)。
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
#如果創(chuàng)建topic時(shí)沒(méi)有給出劃分partitions個(gè)數(shù),這個(gè)數(shù)字將是topic下partitions數(shù)目的默認(rèn)數(shù)值。
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
#每個(gè)數(shù)據(jù)目錄用來(lái)日志恢復(fù)的線程數(shù)目
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
#日志有效時(shí)間
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log文件尺寸
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
#檢查日志分段文件的間隔時(shí)間,以確定是否文件屬性是否到達(dá)刪除要求。
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=10.60.96.143:2180

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

#enable delete topic
delete.topic.enable=true

常用的命令工具

topi命令

  • bin/kafka-topics.sh --list --zookeeper 10.60.96.142:2182
  • bin/kafka-topics.sh --create --zookeeper 10.60.96.142:2182--replication-factor 1 --partitions 1 --topic test
  • bin/kafka-topics.sh --zookeeper 10.60.96.142:2182 --delete --topic "test"
  • bin/kafka-topics.sh --zookeeper 10.60.96.142:2182 --topic "test" --describe

啟動(dòng)kafka 服務(wù)

  • bin/kafka-server-start.sh -daemon config/server3.properties

查看log信息

  • bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/topic1-0/00000000000000000000.log --print-data-log

測(cè)試

#創(chuàng)建Topic
./kafka-topics.sh --create --zookeeper 10.60.96.143:2181 --replication-factor 2 --partitions 1 --topic shuaige
#解釋
--replication-factor 2   #復(fù)制兩份
--partitions 1 #創(chuàng)建1個(gè)分區(qū)
--topic #主題為shuaige

'''在一臺(tái)服務(wù)器上創(chuàng)建一個(gè)發(fā)布者'''
#創(chuàng)建一個(gè)broker,發(fā)布者
./kafka-console-producer.sh --broker-list 10.60.96.143:9092 --topic test

'''在一臺(tái)服務(wù)器上創(chuàng)建一個(gè)訂閱者'''
./kafka-console-consumer.sh --zookeeper 10.60.96.143:2181 --topic test --from-beginning

5.2 集群部署

5.2 配置詳解

zookeeper配置 zoo.ccfg

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/tmp/zookeeper/zookeeper1
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.0=127.0.0.1:8880:7770  
server.1=127.0.0.1:8881:7771  
server.2=127.0.0.1:8882:7772 

kafka server.properties 配置

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = security_protocol://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://10.60.96.143:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1


# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=10.60.96.143:2181,10.60.96.143:2182,10.60.96.143:2183

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

#enable delete topic
delete.topic.enable=true

******producer配置******

#參與消息確認(rèn)的broker數(shù)量控制,0代表不需要任何確認(rèn) 1代表需要leader replica確認(rèn) -1代表需要ISR中所有進(jìn)行確認(rèn)
request.required.acks 0

#從發(fā)送請(qǐng)求到收到ACK確認(rèn)等待的最長(zhǎng)時(shí)間(超時(shí)時(shí)間)
request.timeout.ms  10000

#設(shè)置消息發(fā)送模式,默認(rèn)是同步方式, async異步模式下允許消息累計(jì)到一定量或一段時(shí)間又另外線程批量發(fā)送,吞吐量好但丟失數(shù)據(jù)風(fēng)險(xiǎn)增大
producer.type sync

#消息序列化類實(shí)現(xiàn)方式,默認(rèn)是byte[]數(shù)組形式
serializer.class kafka.serializer.DefaultEncoder

#kafka消息分區(qū)策略實(shí)現(xiàn)方式,默認(rèn)是對(duì)key進(jìn)行hash
partitioner.class kafka.producer.DefaultPartitioner

#對(duì)發(fā)送的消息采取的壓縮編碼方式,有none|gzip|snappy
compression.codec none

#指定哪些topic的message需要壓縮
compressed.topics  null

#消息發(fā)送失敗的情況下,重試發(fā)送的次數(shù) 存在消息發(fā)送是成功的,只是由于網(wǎng)絡(luò)導(dǎo)致ACK沒(méi)收到的重試,會(huì)出現(xiàn)消息被重復(fù)發(fā)送的情況
message.send.max.retries 3

#在開始重新發(fā)起metadata更新操作需要等待的時(shí)間
retry.backoff.ms 100

#metadata刷新間隔時(shí)間,如果負(fù)值則失敗的時(shí)候才會(huì)刷新,如果0則每次發(fā)送后都刷新,正值則是一種周期行為
topic.metadata.refresh.interval.ms 600 * 1000

#異步發(fā)送模式下,緩存數(shù)據(jù)的最長(zhǎng)時(shí)間,之后便會(huì)被發(fā)送到broker
queue.buffering.max.ms 5000

#producer端異步模式下最多緩存的消息條數(shù)
queue.buffering.max.messages 10000

#0代表隊(duì)列沒(méi)滿的時(shí)候直接入隊(duì),滿了立即扔棄,-1代表無(wú)條件阻塞且不丟棄
queue.enqueue.timeout.ms -1

#一次批量發(fā)送需要達(dá)到的消息條數(shù),當(dāng)然如果queue.buffering.max.ms達(dá)到的時(shí)候也會(huì)被發(fā)送
batch.num.messages 200

consumer配置

#指明當(dāng)前消費(fèi)進(jìn)程所屬的消費(fèi)組,一個(gè)partition只能被同一個(gè)消費(fèi)組的一個(gè)消費(fèi)者消費(fèi)
group.id

#針對(duì)一個(gè)partition的fetch request所能拉取的最大消息字節(jié)數(shù),必須大于等于Kafka運(yùn)行的最大消息
fetch.message.max.bytes  1024 * 1024

#是否自動(dòng)周期性提交已經(jīng)拉取到消費(fèi)端的消息offset
auto.commit.enable true

#自動(dòng)提交offset到zookeeper的時(shí)間間隔
auto.commit.interval.ms  60 * 1000

#消費(fèi)均衡的重試次數(shù)
rebalance.max.retries  4

#消費(fèi)均衡兩次重試之間的時(shí)間間隔
rebalance.backoff.ms 2000

#當(dāng)重新去獲取partition的leader前需要等待的時(shí)間
refresh.leader.backoff.ms   200

#如果zookeeper上沒(méi)有offset合理的初始值情況下獲取第一條消息開始的策略smallest|largeset
auto.offset.reset largest

#如果其超時(shí),將會(huì)可能觸發(fā)rebalance并認(rèn)為已經(jīng)死去
zookeeper.session.timeout.ms  6000

#確認(rèn)zookeeper連接建立操作客戶端能等待的最長(zhǎng)時(shí)間
zookeeper.connection.timeout.ms 6000

其他kafka配置詳解

Kafka配置說(shuō)明

六、Kafka消息結(jié)構(gòu)解析

內(nèi)部數(shù)據(jù)結(jié)構(gòu):

  • -- Topic (名字)
  • -- PartitionID ( 可選)
  • -- Key[( 可選 )
  • -- Value

生產(chǎn)者記錄(簡(jiǎn)稱PR)的發(fā)送邏輯:

  • 若指定Partition ID,則PR被發(fā)送至指定Partition
  • 若未指定Partition ID,但指定了Key, PR會(huì)按照hasy(key)發(fā)送至對(duì)應(yīng)Partition
  • 若既未指定Partition ID也沒(méi)指定Key,PR會(huì)按照round-robin模式發(fā)送到每個(gè)Partition
  • 若同時(shí)指定了Partition ID和Key, PR只會(huì)發(fā)送到指定的Partition (Key不起作用,代碼邏輯決定)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容