Kafka筆記

一、背景知識(shí)

Kafka定義

傳統(tǒng)定義:Kafka 是一個(gè)分布式的基于發(fā)布/訂閱模式的消息隊(duì)列,主要應(yīng)用于大數(shù)據(jù)實(shí)時(shí)處理領(lǐng)域。

最新定義:Kafka 是一個(gè)開源的分布式事件流平臺(tái),被數(shù)千家公司用于高性能數(shù)據(jù)管道、流分析、數(shù)據(jù)集成和關(guān)鍵任務(wù)應(yīng)用。

消息隊(duì)列

傳統(tǒng)的消息隊(duì)列的主要應(yīng)用場(chǎng)景包括: 緩存/消峰、 解耦和異步通信。目前企業(yè)中比較常見的消息隊(duì)列產(chǎn)品主要有 ActiveMQ、RabbitMQ、RocketMQ、Kafka 等。

消息隊(duì)列的兩種模式:

  • 點(diǎn)對(duì)點(diǎn)模式:一對(duì)一,消費(fèi)者主動(dòng)拉取數(shù)據(jù),消息收到后消息清除。該模式使用較少
  • 發(fā)布/ 訂閱模式:一對(duì)多,消息生產(chǎn)者將消息發(fā)布到 topic 中,同時(shí)有多個(gè)消費(fèi)者消費(fèi)該消息,消費(fèi)之后不會(huì)清除消息

二、Kafka架構(gòu)

Kafka架構(gòu)
  1. Producer:消息生產(chǎn)者,就是向 kafka broker 發(fā)消息的客戶端
  2. Consumer:消息消費(fèi)者,向 kafka broker 取消息的客戶端
  3. Consumer Group:消費(fèi)者組,由多個(gè) consumer 組成。消費(fèi)者組內(nèi)每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù),一個(gè)分區(qū)只能由一個(gè)組內(nèi)消費(fèi)者消費(fèi);消費(fèi)者組之間互不影響,所有的消費(fèi)者都屬于某個(gè)消費(fèi)者組,即消費(fèi)者組是邏輯上的一個(gè)訂閱者
  4. Broker:一臺(tái) kafka 服務(wù)器就是一個(gè) broker,一個(gè)集群由多個(gè) broker 組成。一個(gè) broker可以容納多個(gè) topic
  5. Topic:可以理解為一個(gè)隊(duì)列,生產(chǎn)者和消費(fèi)者面向的都是一個(gè) topic
  6. Partition:為了實(shí)現(xiàn)擴(kuò)展性,一個(gè)非常大的 topic 可以分布到多個(gè) broker(即服務(wù)器)上,一個(gè) topic 可以分為多個(gè) partition,每個(gè) partition 是一個(gè)有序的隊(duì)列
  7. Replica:副本,為保證集群中的某個(gè)節(jié)點(diǎn)發(fā)生故障時(shí),該節(jié)點(diǎn)上的 partition 數(shù)據(jù)不丟失,且 kafka 仍然能夠繼續(xù)工作,kafka 提供了副本機(jī)制,一個(gè) topic 的每個(gè)分區(qū)都有若干個(gè)副本,其中有一個(gè) leader 和若干個(gè) follower
  8. Leader:每個(gè)分區(qū)多個(gè)副本的主,生產(chǎn)者發(fā)送數(shù)據(jù)的對(duì)象,以及消費(fèi)者消費(fèi)數(shù)據(jù)的對(duì)象都是 leader。由 zk 記錄誰是 leader,2.8.0 版本以后也可以配置不使用 zk
  9. Follower:每個(gè)分區(qū)多個(gè)副本中的從,實(shí)時(shí)從 leader 中同步數(shù)據(jù),保持和 leader 數(shù)據(jù)的同步。leader 發(fā)生故障時(shí),某個(gè) follower 會(huì)成為新的 follower。

三、生產(chǎn)者

3.1 消息發(fā)送流程

在消息發(fā)送的過程中,涉及到了兩個(gè)線程:main 線程和 sender 線程。在 main 線程中創(chuàng)建了一個(gè)雙端隊(duì)列 RecordAccumulator。Main 線程將消息發(fā)送給 RecordAccumulator,sender 線程不斷從 RecordAccumulator 中拉取消息發(fā)送到 broker。

消息發(fā)送流程

幾個(gè)重要參數(shù):

  • buffer.memory:RecordAccumulator 緩沖區(qū)總大小,默認(rèn) 32m
  • batch.size:緩沖區(qū)一批數(shù)據(jù)最大值,默認(rèn)16k。適當(dāng)增加該值,可以提高吞吐量,但是如果該值設(shè)置太大,會(huì)導(dǎo)致數(shù)據(jù)傳輸延遲增加
  • linger.ms:如果數(shù)據(jù)遲遲未達(dá)到 batch.size, sender 等待 linger.time 之后就會(huì)發(fā)送數(shù)據(jù)。單位 ms,默認(rèn)值是 0ms, 表示沒有延遲。生產(chǎn)環(huán)境建議該值大小為 5-100ms 之間
  • acks:Kafka 提供了三種可靠性級(jí)別,用戶根據(jù)對(duì)可靠性和延遲的要求進(jìn)行權(quán)衡,選擇以下的配置:
    0:生產(chǎn)者發(fā)送過來的數(shù)據(jù),不需要等數(shù)據(jù)落盤應(yīng)答
    1:生產(chǎn)者發(fā)送過來的數(shù)據(jù),leader 收到數(shù)據(jù)后應(yīng)答
    -1(all):生產(chǎn)者發(fā)送過來的數(shù)據(jù),leader 和 ISR(和 leader 保持同步的 follower 集合) 隊(duì)列里面的所有節(jié)點(diǎn)收齊數(shù)據(jù)后應(yīng)答。 默認(rèn)值是-1,-1 和 all 是等價(jià)的
  • compression.type:生產(chǎn)者發(fā)送的所有數(shù)據(jù)的壓縮方式。默認(rèn)是 none,也就是不壓縮。支持壓縮類型:none、gzip、snappy、lz4 和 zstd
  • max.in.flight.requests.per.connection:允許最多沒有返回 ack 的次數(shù),默認(rèn)為 5,開啟冪等性要保證該值是 1-5 的數(shù)字

幾種消息發(fā)送方式:

  • 普通異步發(fā)送
  • 帶回調(diào)函數(shù)的異步 api
  • 同步 api

3.2 分區(qū)

分區(qū)的好處:

  • 方便在集群中擴(kuò)展,每個(gè) partition 可以通過調(diào)整以適應(yīng)它所在的機(jī)器,而一個(gè) topic 又可以有多個(gè) Partition 組成,因此整個(gè)集群就可以適應(yīng)任意大小的數(shù)據(jù)了
  • 可以提高并發(fā),因?yàn)榭梢砸?partition 為單位生產(chǎn)/消費(fèi)數(shù)據(jù)了

生產(chǎn)者發(fā)送消息的分區(qū)策略:

  1. 指明 partition 的情況下,直接將指明的值直接作為 partiton 值
  2. 沒有指明 partition 值但有 key 的情況下,將 key 的 hash 值與 topic 的 partition 數(shù)進(jìn)行取余得到 partition 值
  3. 既沒有 partition 值又沒有 key 值的情況下,第一次調(diào)用時(shí)隨機(jī)生成一個(gè)整數(shù)(后面每次調(diào)用在這個(gè)整數(shù)上自增),將這個(gè)值與 topic 可用的 partition 總數(shù)取余得到 partition 值,也就是常說的 round-robin 輪詢算法

3.3 生產(chǎn)經(jīng)驗(yàn)

生產(chǎn)者如何提高吞吐量

  1. 調(diào)整批次大?。喝鐚?batch.size 由16k調(diào)整為32k
  2. 調(diào)整Sender線程等待時(shí)間:如將 linger.ms 由0調(diào)整為5-100ms
  3. 壓縮策略:如將 compression.type 設(shè)為 snappy
  4. 調(diào)整緩存大?。喝鐚?buffer.memory 由32m調(diào)整為64m

數(shù)據(jù)可靠性

Ack應(yīng)答級(jí)別:

  • acks=0,生產(chǎn)者發(fā)送數(shù)據(jù)后就不管了,可靠性差,效率高
  • acks=1,生產(chǎn)者發(fā)送數(shù)據(jù)后 leader 應(yīng)答即可,可靠性中等,效率中等
  • acks=-1,生產(chǎn)者發(fā)送數(shù)據(jù)后 leader 和 ISR 隊(duì)列中所有 follower 應(yīng)答才行,可靠性高,效率低

生產(chǎn)環(huán)境中,acks=0 很少使用;acks=1,一般用于傳輸普通日志,允許丟失個(gè)別數(shù)據(jù);acks=-1,一般用于傳輸和交易相關(guān)等對(duì)可靠性要求較高的場(chǎng)景。

數(shù)據(jù)完全可靠條件 = ACK級(jí)別為-1 + 分區(qū)副本大于等于2 + ISR里應(yīng)答的最小副本數(shù)大于等于2

數(shù)據(jù)重復(fù)性

至少一次(At Least Once)= ACK級(jí)別為1 + 分區(qū)副本大于等于2 + ISR里應(yīng)答的最小副本數(shù)大于等于2。不能保證數(shù)據(jù)不重復(fù)。

最多一次(At Most Once)= ACK級(jí)別為0。不能保證數(shù)據(jù)不丟失。

精確一次(Exactly Once)= 冪等性 + 至少一次。冪等性默認(rèn)開啟,但只能保證在單分區(qū)單會(huì)話內(nèi)不重復(fù),如果需要全局嚴(yán)格一致,則需要開啟事務(wù)(開啟事務(wù)的前提是開啟冪等性)。

數(shù)據(jù)順序

單分區(qū)內(nèi),可以配置為有序:多分區(qū),分區(qū)與分區(qū)間無序。

單分區(qū)有序的條件:

  • 1.x 版本之前:max.in.flight.requests.per.connection = 1
  • 1.x 及之后版本:
    (1)若未開啟冪等性
    配置 max.in.flight.requests.per.connection = 1
    (2)若開啟冪等性
    配置 max.in.flight.requests.per.connection <= 5。其原理是 1.x 版本后,如果開啟冪等,kafka 服務(wù)端會(huì)緩存生產(chǎn)者發(fā)來的最近5個(gè) requests 的元數(shù)據(jù),因此可以保證最近5個(gè) requests 的數(shù)據(jù)是有序的。

四、Broker

4.1 Broker啟動(dòng)流程

Kafka 集群中有一個(gè) broker 的 controller 會(huì)被選舉為 controller leader,負(fù)責(zé)管理集群 broker 的上下線、所有 topic 的分區(qū)副本分配和 leader 選舉等工作。Controller 的信息同步工作是依賴于 zookeeper 的(2.8.0 版本以后也可以不依賴)。

Broker啟動(dòng)流程

4.2 副本與故障處理

副本

副本的作用是提高數(shù)據(jù)可靠性,Kafka 默認(rèn)副本1個(gè),生產(chǎn)環(huán)境一般配置為2個(gè),保證數(shù)據(jù)可靠性;太多副本會(huì)增加磁盤存儲(chǔ)空間,增加網(wǎng)絡(luò)上數(shù)據(jù)傳輸,降低效率。

Kafka 中副本分為:leader 和 follower。Kafka 生產(chǎn)者只會(huì)把數(shù)據(jù)發(fā)往 leader,
然后 follower 找 leader 進(jìn)行同步數(shù)據(jù)。

幾個(gè)重要概念:

  • AR:Kafka 分區(qū)中的所有副本統(tǒng)稱為(Assigned Repllicas)。AR = ISR + OSR
  • ISR:表示和 leader 保持同步的 follower集合。如果 follower 長時(shí)間未向 leader 發(fā)送通信請(qǐng)求或同步數(shù)據(jù),則該 follower 將被踢出 ISR。該時(shí)間閾值由 replica.lag.time.max.ms 參數(shù)設(shè)定,默認(rèn)30s。Leader 發(fā)生故障之后,就會(huì)從 ISR 中選舉新的 leader
  • OSR:表示 follower 與 leader 副本同步時(shí),延遲過多的副本
  • LEO:Log End Offset,每個(gè)副本的最新的 offset + 1
  • HW:High Watermart,所有副本中最小的 LEO

Follower 故障

  1. Follower 發(fā)生故障后會(huì)被臨時(shí)提出 ISR
  2. 這個(gè)期間 leader 和 follower 繼續(xù)接受數(shù)據(jù)
  3. 待該 follower 恢復(fù)后,follower 會(huì)讀取本地磁盤記錄的上次的 HW,并將 log 文件高于 HW 的部分截取掉,從 HW 開始向 leader 進(jìn)行同步
  4. 等該 follower 的 LEO 大于等于該分區(qū)的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了

Leader 故障

  1. Leader 發(fā)生故障之后,會(huì)從 ISR 中選出一個(gè)新的 leader
  2. 為保證多個(gè)副本之間的數(shù)據(jù)一致性,其余的 follower 會(huì)先將各自的 log 文件高于 HW 的部分截掉,然后從新的 leader 同步數(shù)據(jù)

注意: 這只能保證副本之間的數(shù)據(jù)一致性,并不能保證數(shù)據(jù)不丟失或者不重復(fù)。如何保證?見上一節(jié)數(shù)據(jù)可靠性。

4.3 文件存儲(chǔ)

Topic 是邏輯上的概念,而 partition 是物理上的概念,每個(gè) partition 對(duì)應(yīng)一個(gè) log 文件,該文件中存儲(chǔ)的就是 producer 生產(chǎn)的數(shù)據(jù)。Producer 生產(chǎn)的數(shù)據(jù)會(huì)不斷追加到該 log 文件末端。為防止 log 文件過大導(dǎo)致數(shù)據(jù)定位效率低下,kafka 采取了分片和索引機(jī)制,將每個(gè) partition 分為多個(gè) segment。每個(gè) segment 包括:.index 文件、.log 文件和 .timeindex 等文件,這些文件位于一個(gè)文件夾下,該文件夾命名規(guī)則:topic 名稱 + 分區(qū)序號(hào),例如:first-0。

文件存儲(chǔ)機(jī)制

兩個(gè)重要參數(shù):

  • log.segment.bytes:log 日志劃分成塊(即 segment)的大小,默認(rèn)值1G
  • log.index.interval.bytes:默認(rèn)4kb,每當(dāng)寫入了4kb大小的日志(.log),然后就往 index 文件里面記錄一個(gè)索引(稀疏索引)

Log 文件和 Index 文件示例

文件示例

高效讀寫數(shù)據(jù)

Kafka 如何做到高效讀寫數(shù)據(jù)?

  1. Kafka 本身是分布式集群,可以采用分區(qū)技術(shù),并行度高
  2. 讀數(shù)據(jù)采用稀疏索引,可以快速定位要消費(fèi)的數(shù)據(jù)
  3. 順序?qū)懘疟P,生產(chǎn)者數(shù)據(jù)是一直追加到 log 文件末端的順序?qū)懀樞驅(qū)?600M/s vs 隨機(jī)寫 100K/s)
  4. 零拷貝+頁緩存技術(shù)
    零拷貝:Kafka 的數(shù)據(jù)加工處理由生產(chǎn)者和消費(fèi)者處理,broker 應(yīng)用層不關(guān)心存儲(chǔ)的數(shù)據(jù),所以就不用了走應(yīng)用層,傳輸效率高。
    頁緩存:操作系統(tǒng)提供,當(dāng)上層由寫操作時(shí),操作系統(tǒng)只是將數(shù)據(jù)寫入 PageCache;讀操作時(shí)先從 PageCache 中查找,找不到再去磁盤中獲取。

關(guān)于零拷貝和頁緩存,具體可以參考:https://zhuanlan.zhihu.com/p/258513662

五、消費(fèi)者

5.1 消費(fèi)方式

Consumer 采用 pull(拉)模式從 broker 中讀取數(shù)據(jù);因?yàn)?push (推)模式很難適應(yīng)消費(fèi)速率不同的消費(fèi)者。

Pull 模式不足之處是,如果 kafka 沒有數(shù)據(jù),消費(fèi)者可能會(huì)陷入循環(huán)中,一直返回空數(shù)據(jù)。針對(duì)這一點(diǎn),kafka 的消費(fèi)者在消費(fèi)數(shù)據(jù)時(shí)會(huì)傳入一個(gè)時(shí)長參數(shù) timeout,如果當(dāng)前沒有數(shù)據(jù)可供消費(fèi),consumer 會(huì)等待一段時(shí)間之后再返回,這段時(shí)長即為 timeout。

5.2 消費(fèi)者組

消費(fèi)者組(Consumer Group,CG)由多個(gè) consumer 組成。形成一個(gè)消費(fèi)者組的條件,是所有消費(fèi)者的 groupid 相同。消費(fèi)者組內(nèi)每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù),一個(gè)分區(qū)只能由一個(gè)組內(nèi)消費(fèi)者消費(fèi);消費(fèi)者組之間互不影響,所有的消費(fèi)者都屬于某個(gè)消費(fèi)者組,即消費(fèi)者組是邏輯上的一個(gè)訂閱者。

消費(fèi)者組初始化流程:


消費(fèi)者組初始化流程

消費(fèi)者組消費(fèi)流程:


消費(fèi)者組消費(fèi)流程

5.3 分區(qū)的分配與再平衡

一個(gè)消費(fèi)者組中有多個(gè) consumer,一個(gè) topic 有多個(gè) partition,所以必然會(huì)涉及到 partition 的分配問題,即確定那個(gè) partition 由哪個(gè) consumer 來消費(fèi)。當(dāng)消費(fèi)者組里面的消費(fèi)者個(gè)數(shù)發(fā)生改變的時(shí)候,也會(huì)觸發(fā)再平衡。

Kafka 有四種分配策略,可以通過參數(shù) partition.assignment.strategy 來配置,默認(rèn) Range + CooperativeSticky。

  • Range:針對(duì)每個(gè) topic。將 topic 中的分區(qū)與消費(fèi)者排序,通過分區(qū)數(shù)/消費(fèi)者數(shù)決定每個(gè)消費(fèi)者消費(fèi)幾個(gè)分區(qū),若除不盡則前面幾個(gè)消費(fèi)者會(huì)多消費(fèi)1個(gè)分區(qū)。注意,如果有N個(gè) topic,容易產(chǎn)生數(shù)據(jù)傾斜
  • RoundRobin:針對(duì)集群中的所有 topic。把所有分區(qū)和所有的消費(fèi)者都列出來,然后按照 hashcode 進(jìn)行排序,最后通過輪訓(xùn)算法來分配分區(qū)給到各個(gè)消費(fèi)者
  • Sticky:粘性分區(qū)從 0.11.x 版本開始引入,首先會(huì)盡量均衡的放置分區(qū)到消費(fèi)者上面,在出現(xiàn)同一消費(fèi)者組內(nèi)消費(fèi)者出現(xiàn)問題的時(shí)候,會(huì)盡量保持原有分配的分
    區(qū)不變化
  • CooperativeSticky:和 sticky 類似只是支持了cooperative 的 再平衡

5.4 Offset

由于 consumer 在消費(fèi)過程中可能會(huì)出現(xiàn)斷電宕機(jī)等故障,consumer 恢復(fù)后,需要從故障前的位置的繼續(xù)消費(fèi),所以 consumer 需要實(shí)時(shí)記錄自己消費(fèi)到了哪個(gè) offset,以便故障恢復(fù)后繼續(xù)消費(fèi)。

Kafka 0.9版本之前,consumer 默認(rèn)將 offset 保存在 zookeeper 中;從 0.9 版本開始,默認(rèn)將 offset 保存在 kafka 一個(gè)內(nèi)置的 topic 中,該 topic 為__consumer_offsets。__consumer_offsets 主題里面采用 key 和 value 的方式存儲(chǔ)數(shù)據(jù)。Key 是 group.id+topic+分區(qū)號(hào),value 就是當(dāng)前 offset 的值。 每隔一段時(shí)間,kafka 內(nèi)部會(huì)對(duì)這個(gè) topic 進(jìn)行 compact,也就是每個(gè) group.id+topic+分區(qū)號(hào) 就保留最新數(shù)據(jù)。

提交 offset

  • 自動(dòng)提交:為了使用戶專注自己的業(yè)務(wù)邏輯,kafka 提供了自動(dòng)提交 offset 的功能,相關(guān)參數(shù):
    enable.auto.commit:是否開啟自動(dòng)提交,默認(rèn) true
    auto.commit.inteval.ms:自動(dòng)提交的時(shí)間間隔,默認(rèn)5s
  • 手動(dòng)提交:包括兩種方式,同步提交(commitSync)和異步提交(commitAsync)

重復(fù)消費(fèi): 已經(jīng)消費(fèi)了數(shù)據(jù),但是 offset 沒提交。
漏消費(fèi): 先提交 offset 后消費(fèi),有可能會(huì)造成數(shù)據(jù)的漏消費(fèi)。

如何避免漏消費(fèi)和重復(fù)消費(fèi),做到精準(zhǔn)一次消費(fèi)呢?這依賴于消費(fèi)者事務(wù),要求消費(fèi)端將消費(fèi)過程和提交 offset 過程做原子綁定,也就是說需要將 offset 保存到支持事務(wù)的自定義介質(zhì)(如 Mysql)。

指定 offset 消費(fèi)

當(dāng) kafka 中沒有初始偏移量(消費(fèi)者組第一次消費(fèi))或服務(wù)器上不再存在當(dāng)前偏移量時(shí)(例如該數(shù)據(jù)已被刪除),該怎么辦?有以下幾種配置:

  • earliest:自動(dòng)將偏移量重置為最早的偏移量
  • latest(默認(rèn)值):自動(dòng)將偏移量重置為最新偏移量
  • none:如果未找到消費(fèi)者組的先前偏移量,則向消費(fèi)者拋出異常
  • 任意指定 offset 位移開始消費(fèi)

5.5 生產(chǎn)經(jīng)驗(yàn)

如何提高吞吐量(避免數(shù)據(jù)積壓)

  • 如果是消費(fèi)能力不足,可以考慮增加 topic 的分區(qū)數(shù),并提升消費(fèi)者組的消費(fèi)者數(shù)量,使消費(fèi)者數(shù) = 分區(qū)數(shù)
  • 如果是下游的數(shù)據(jù)處理不及時(shí),可以提高每批次拉取的數(shù)量。如果拉取數(shù)據(jù)/處理時(shí)間 < 生產(chǎn)速度,即處理的數(shù)據(jù)小于生產(chǎn)的數(shù)據(jù),也會(huì)造成數(shù)據(jù)積壓

六、Kafka-Kraft 模式

kafka架構(gòu)

左圖為 kafka 原有架構(gòu),元數(shù)據(jù)在 zookeeper 中,運(yùn)行時(shí)動(dòng)態(tài)選舉 controller,由 controller 進(jìn)行 kafka 集群管理。右圖為 kraft 模式架構(gòu)(實(shí)驗(yàn)性),不再依賴 zookeeper 集群,而是用三臺(tái) controller 節(jié)點(diǎn)代替 zookeeper,元數(shù)據(jù)保存在 controller 中,由 controller 直接進(jìn)行 kafka 集群管理。這樣做的好處有以下幾個(gè):

  • Kafka 不再依賴外部框架,而是能夠獨(dú)立運(yùn)行
  • Controller 管理集群時(shí),不再需要從 zookeeper 中先讀取數(shù)據(jù),集群性能上升
  • 由于不依賴 zookeeper,集群擴(kuò)展時(shí)不再受到 zookeeper 讀寫能力限制
  • Controller 不再動(dòng)態(tài)選舉,而是由配置文件規(guī)定。這樣我們可以有針對(duì)性的加強(qiáng)
    controller 節(jié)點(diǎn)的配置,而不是像以前一樣對(duì)隨機(jī) controller 節(jié)點(diǎn)的高負(fù)載束手無策
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容