Kafka生產(chǎn)過程分析
寫入方式
producer采用推(push)模式將消息發(fā)布到broker,每條消息都被追加(append)到分區(qū)(patition)中,屬于順序?qū)懘疟P(順序?qū)懘疟P效率比隨機寫內(nèi)存要高,保障kafka吞吐率)
分區(qū)(Partition)
消息發(fā)送時都被發(fā)送到一個topic,其本質(zhì)就是一個目錄,而topic是由一些Partition Logs(分區(qū)日志)組成,其組織結(jié)構(gòu)如下圖所示


每個Partition中的消息都是有序的,生產(chǎn)的消息被不斷追加到Partition log上,其中的每一個消息都被賦予了一個唯一的offset值
分區(qū)的原因
- 方便在集群中擴展,每個Partition可以通過調(diào)整以適應它所在的機器,而一個topic又可以有多個Partition組成,因此整個集群就可以適應任意大小的數(shù)據(jù)了;
- 可以提高并發(fā),因為可以以Partition為單位讀寫了
分區(qū)的原則
- 指定了patition,則直接使用;
- 未指定patition但指定key,通過對key的value進行hash出一個patition;
- patition和key都未指定,使用輪詢選出一個patition
副本(Replication)
同一個partition可能會有多個replication(對應 server.properties 配置中的 default.replication.factor=N)。沒有replication的情況下,一旦broker 宕機,其上所有 patition 的數(shù)據(jù)都不可被消費,同時producer也不能再將數(shù)據(jù)存于其上的patition。引入replication之后,同一個partition可能會有多個replication,而這時需要在這些replication之間選出一個leader,producer和consumer只與這個leader交互,其它replication作為follower從leader 中復制數(shù)據(jù)
寫入流程

- producer先從zookeeper的 "/brokers/.../state"節(jié)點找到該partition的leader
- producer將消息發(fā)送給該leader
- leader將消息寫入本地log
- followers從leader pull消息
- 寫入本地log后向leader發(fā)送ACK
- leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offset)并向producer發(fā)送ACK
Broker 保存消息
存儲方式
物理上把topic分成一個或多個patition(對應 server.properties 中的num.partitions=3配置),每個patition物理上對應一個文件夾(該文件夾存儲該patition的所有消息和索引文件),如下
[hadoop@hadoop-100 logs]$ kafka-topics.sh --zookeeper hadoop-101:2181 --create --topic second --partitions 3 --replication-factor 3
Created topic "second".
[hadoop@hadoop-100 logs]$ ll
總用量 312
drwxrwxr-x 2 hadoop hadoop 141 11月 13 17:42 second-0
drwxrwxr-x 2 hadoop hadoop 141 11月 13 17:42 second-1
drwxrwxr-x 2 hadoop hadoop 141 11月 13 17:42 second-2
[hadoop@hadoop-100 logs]$ cd second-0
[hadoop@hadoop-100 second-0]$ ll
總用量 0
-rw-rw-r-- 1 hadoop hadoop 10485760 11月 13 17:42 00000000000000000000.index
-rw-rw-r-- 1 hadoop hadoop 0 11月 13 17:42 00000000000000000000.log
-rw-rw-r-- 1 hadoop hadoop 10485756 11月 13 17:42 00000000000000000000.timeindex
-rw-rw-r-- 1 hadoop hadoop 0 11月 13 17:42 leader-epoch-checkpoint
存儲策略
無論消息是否被消費,kafka都會保留所有消息。有兩種策略可以刪除舊數(shù)據(jù)
- 基于時間:log.retention.hours=168
- 基于大?。簂og.retention.bytes=1073741824
需要注意的是,因為Kafka讀取特定消息的時間復雜度為O(1),即與文件大小無關(guān),所以這里刪除過期文件與提高 Kafka 性能無關(guān)
Zookeeper存儲結(jié)構(gòu)

Kafka消費過程分析
kafka提供了兩套consumer API,高級Consumer API和低級Consumer API
高級API
高級API優(yōu)點
- 高級API 寫起來簡單
- 不需要自行去管理offset,系統(tǒng)通過zookeeper自行管理
- 不需要管理分區(qū),副本等情況,.系統(tǒng)自動管理
- 消費者斷線會自動根據(jù)上一次記錄在zookeeper中的offset去接著獲取數(shù)據(jù)(默認設置1分鐘更新一下zookeeper中存的offset)
- 可以使用group來區(qū)分對同一個topic 的不同程序訪問分離開來(不同的group記錄不同的offset,這樣不同程序讀取同一個topic才不會因為offset互相影響)
高級API缺點
- 不能自行控制offset(對于某些特殊需求來說)
- 不能細化控制如分區(qū)、副本、zk等
低級API
低級 API 優(yōu)點
- 能夠讓開發(fā)者自己控制offset,想從哪里讀取就從哪里讀取。
- 自行控制連接分區(qū),對分區(qū)自定義進行負載均衡
- 對zookeeper的依賴性降低(如:offset不一定非要靠zk存儲,自行存儲offset即可,比如存在文件或者內(nèi)存中)
低級API缺點
太過復雜,需要自行控制offset,連接哪個分區(qū),找到分區(qū)leader 等
消費者組

消費者是以consumer group消費者組的方式工作,由一個或者多個消費者組成一個組,共同消費一個topic。每個分區(qū)在同一時間只能由group中的一個消費者讀取,但是多個group可以同時消費這個partition。在上圖中,有一個由三個消費者組成的group,有一個消費者讀取主題中的兩個分區(qū),另外兩個分別讀取一個分區(qū)。某個消費者讀取某個分區(qū),也可以叫做某個消費者是某個分區(qū)的擁有者。
在這種情況下,消費者可以通過水平擴展的方式同時讀取大量的消息。另外,如果一個消費者失敗了,那么其他的group成員會自動負載均衡讀取之前失敗的消費者讀取的分區(qū)
消費方式
consumer采用pull(拉)模式從broker中讀取數(shù)據(jù)。
push(推)模式很難適應消費速率不同的消費者,因為消息發(fā)送速率是由broker決定的。它的目標是盡可能以最快速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現(xiàn)就是拒絕服務以及網(wǎng)絡擁塞。而pull模式則可以根據(jù)consumer的消費能力以適當?shù)乃俾氏M消息。
對于Kafka而言,pull模式更合適,它可簡化broker的設計,consumer可自主控制消費消息的速率,同時consumer可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現(xiàn)不同的傳輸語義。
pull模式不足之處是,如果kafka沒有數(shù)據(jù),消費者可能會陷入循環(huán)中,一直等待數(shù)據(jù)到達。為了避免這種情況,我們在我們的拉請求中有參數(shù),允許消費者請求在等待數(shù)據(jù)到達的“長輪詢”中進行阻塞(并且可選地等待到給定的字節(jié)數(shù),以確保大的傳輸大?。?/p>
消費者組案例
需求:測試同一個消費者組中的消費者,同一時刻只能有一個消費者消費
步驟
- 在hadoop-100和hadoop-101上修改/opt/module/kafka/consumer.properties,配置group.id
group.id=hadoop - 啟動消費者
[hadoop@hadoop-100 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop-100:2181 --topic first --consumer.config config/consumer.properties
[hadoop@hadoop-101 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop-100:2181 --topic first --consumer.config config/consumer.properties - 啟動生產(chǎn)者
[hadoop@hadoop-102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop-102:9092 --topic first -
只有一個消費者得到消息
消費者組案例.png
