三.Kafka工作流程分析

3.1 Kafka生產(chǎn)過程分析

3.1.1 寫入方式

producer采用推(push)模式將消息發(fā)布到broker,每條消息都被追加(append)到分區(qū)(patition)中,屬于順序?qū)懘疟P(順序?qū)懘疟P效率比隨機寫內(nèi)存要高,保障kafka吞吐率)。

3.1.2 分區(qū)(Partition)

Kafka集群有多個消息代理服務(wù)器(broker-server)組成,發(fā)布到Kafka集群的每條消息都有一個類別,用主題(topic)來表示。通常,不同應(yīng)用產(chǎn)生不同類型的數(shù)據(jù),可以設(shè)置不同的主題。一個主題一般會有多個消息的訂閱者,當(dāng)生產(chǎn)者發(fā)布消息到某個主題時,訂閱了這個主題的消費者都可以接收到生成者寫入的新消息。

Kafka集群為每個主題維護了分布式的分區(qū)(partition)日志文件,物理意義上可以把主題(topic)看作進(jìn)行了分區(qū)的日志文件(partition log)。主題的每個分區(qū)都是一個有序的、不可變的記錄序列,新的消息會不斷追加到日志中。分區(qū)中的每條消息都會按照時間順序分配到一個單調(diào)遞增的順序編號,叫做偏移量(offffset),這個偏移量能夠唯一地定位當(dāng)前分區(qū)中的每一條消息。

消息發(fā)送時都被發(fā)送到一個topic,其本質(zhì)就是一個目錄,而topic是由一些Partition Logs(分區(qū)日志)組成,其組織結(jié)構(gòu)如下圖所示:

下圖中的topic有3個分區(qū),每個分區(qū)的偏移量都從0開始,不同分區(qū)之間的偏移量都是獨立的,不會相互影響。

我們可以看到,每個Partition中的消息都是有序的,生產(chǎn)的消息被不斷追加到Partition log上,其中的每一個消息都被賦予了一個唯一的offffset值。

發(fā)布到Kafka主題的每條消息包括鍵值和時間戳。消息到達(dá)服務(wù)器端的指定分區(qū)后,都會分配到一個自

增的偏移量。原始的消息內(nèi)容和分配的偏移量以及其他一些元數(shù)據(jù)信息最后都會存儲到分區(qū)日志文件中。消息的鍵也可以不用設(shè)置,這種情況下消息會均衡地分布到不同的分區(qū)。

1) 分區(qū)的原因

(1)方便在集群中擴展,每個Partition可以通過調(diào)整以適應(yīng)它所在的機器,而一個topic又可以有多個Partition組成,因此整個集群就可以適應(yīng)任意大小的數(shù)據(jù)了;

(2)可以提高并發(fā),因為可以以Partition為單位讀寫了。傳統(tǒng)消息系統(tǒng)在服務(wù)端保持消息的順序,如果有多個消費者消費同一個消息隊列,服務(wù)端會以消費存儲的順序依次發(fā)送給消費者。但由于消息是異步發(fā)送給消費者的,消息到達(dá)消費者的順序可能是無序的,這就意味著在并行消費時,傳統(tǒng)消息系統(tǒng)無法很好地保證消息被順序處理。雖然我們可以設(shè)置一個專用的消費者只消費一個隊列,以此來解決消息順序的問題,但是這就使得消費處理無法真正執(zhí)行。

Kafka比傳統(tǒng)消息系統(tǒng)有更強的順序性保證,它使用主題的分區(qū)作為消息處理的并行單元。Kafka以分區(qū)作為最小的粒度,將每個分區(qū)分配給消費者組中不同的而且是唯一的消費者,并確保一個分區(qū)只屬于一個消費者,即這個消費者就是這個分區(qū)的唯一讀取線程。那么,只要分區(qū)的消息是有序的,消費者處理的消息順序就有保證。每個主題有多個分區(qū),不同的消費者處理不同的分區(qū),所以Kafka不僅保證了消息的有序性,也做到了消費者的負(fù)載均衡。

2)分區(qū)的原則

(1)指定了patition,則直接使用;

(2)未指定patition但指定key,通過對key的value進(jìn)行hash出一個patition

(3)patition和key都未指定,使用輪詢選出一個patition。

DefaultPartitioner類

public int partition(String topic, Object key, byte[] keyBytes, Object value,

byte[] valueBytes, Cluster cluster) {

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

int numPartitions = partitions.size();

if (keyBytes == null) {

int nextValue = nextValue(topic);

List<PartitionInfo> availablePartitions =

cluster.availablePartitionsForTopic(topic);

if (availablePartitions.size() > 0) {

int part = Utils.toPositive(nextValue) %

availablePartitions.size();

return availablePartitions.get(part).partition();

} else {

// no partitions are available, give a non-available partition

return Utils.toPositive(nextValue) % numPartitions;

}

} else {

// hash the keyBytes to choose a partition

return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

}

}

3.1.3 副本(Replication)

同一個partition可能會有多個replication(對應(yīng) server.properties 配置中的default.replication.factor=N)。沒有replication的情況下,一旦broker 宕機,其上所有 patition 的數(shù)據(jù)都不可被消費,同時producer也不能再將數(shù)據(jù)存于其上的patition。引入replication之后,同一個partition可能會有多個replication,而這時需要在這些replication之間選出一個leader,producer和consumer只與這個leader交互,其它replication作為follower從leader 中復(fù)制數(shù)據(jù)。

3.1.4 寫入流程

producer寫入消息流程如下:

1)producer先從zookeeper的 "/brokers/.../state"節(jié)點找到該partition的leader

2)producer將消息發(fā)送給該leader

3)leader將消息寫入本地log

4)followers從leader pull消息,寫入本地log后向leader發(fā)送ACK

5)leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offffset)并向producer發(fā)送ACK

3.2 Broker 保存消息

3.2.1 存儲方式

物理上把topic分成一個或多個patition(對應(yīng) server.properties 中的num.partitions=3配置),每個patition物理上對應(yīng)一個文件夾(該文件夾存儲該patition的所有消息和索引文件),如下:

[itstar@bigdata11 logs]$ ll

drwxrwxr-x. 2 itstar itstar 4096 8月 6 14:37 first-0

drwxrwxr-x. 2 itstar itstar 4096 8月 6 14:35 first-1

drwxrwxr-x. 2 itstar itstar 4096 8月 6 14:37 first-2

[itstar@bigdata11 logs]$ cd first-0

[itstar@bigdata11 first-0]$ ll

-rw-rw-r--. 1 itstar itstar 10485760 8月 6 14:33 00000000000000000000.index

-rw-rw-r--. 1 itstar itstar 219 8月 6 15:07 00000000000000000000.log

-rw-rw-r--. 1 itstar itstar 10485756 8月 6 14:33 00000000000000000000.timeindex

-rw-rw-r--. 1 itstar itstar 8 8月 6 14:37 leader-epoch-checkpoint

3.2.2 存儲策略

無論消息是否被消費,kafka都會保留所有消息。有兩種策略可以刪除舊數(shù)據(jù):

1)基于時間:log.retention.hours=168

2)基于大?。簂og.retention.bytes=1073741824

需要注意的是,因為Kafka讀取特定消息的時間復(fù)雜度為O(1),即與文件大小無關(guān),所以這里刪除過期文件與提高 Kafka 性能無關(guān)。

3.2.3 Zookeeper存儲結(jié)構(gòu)

注意:producer不在zk中注冊,消費者在zk中注冊。

3.3 Kafka消費過程分析

kafka提供了兩套consumer API:高級Consumer API和低級API。

3.3.1 消費模型

消息由生產(chǎn)者發(fā)布到Kafka集群后,會被消費者消費。消息的消費模型有兩種:推送模型(push)和拉取模型(pull)。

基于推送模型(push)的消息系統(tǒng),由消息代理記錄消費者的消費狀態(tài)。消息代理在將消息推送到消費者后,標(biāo)記這條消息為已消費,但這種方式無法很好地保證消息被處理。比如,消息代理把消息發(fā)送出去后,當(dāng)消費進(jìn)程掛掉或者由于網(wǎng)絡(luò)原因沒有收到這條消息時,就有可能造成消息丟失(因為消息代理已經(jīng)把這條消息標(biāo)記為已消費了,但實際上這條消息并沒有被實際處理)。如果要保證消息被處理,消息代理發(fā)送完消息后,要設(shè)置狀態(tài)為“已發(fā)送”,只有收到消費者的確認(rèn)請求后才更新為“已消費”,這就需要消息代理中記錄所有的消費狀態(tài),這種做法顯然是不可取的。

Kafka采用拉取模型,由消費者自己記錄消費狀態(tài),每個消費者互相獨立地順序讀取每個分區(qū)的消息。如下圖所示,有兩個消費者(不同消費者組)拉取同一個主題的消息,消費者A的消費進(jìn)度是3,消費者B的消費進(jìn)度是6。消費者拉取的最大上限通過最高水位(watermark)控制,生產(chǎn)者最新寫入的消息如果還沒有達(dá)到備份數(shù)量,對消費者是不可見的。這種由消費者控制偏移量的優(yōu)點是:消費者可以按照任意的順序消費消息。比如,消費者可以重置到舊的偏移量,重新處理之前已經(jīng)消費過的消息;或者直接跳到最近的位置,從當(dāng)前的時刻開始消費。

在一些消息系統(tǒng)中,消息代理會在消息被消費之后立即刪除消息。如果有不同類型的消費者訂閱同一個主題,消息代理可能需要冗余地存儲同一消息;或者等所有消費者都消費完才刪除,這就需要消息代理跟蹤每個消費者的消費狀態(tài),這種設(shè)計很大程度上限制了消息系統(tǒng)的整體吞吐量和處理延遲。Kafka的做法是生產(chǎn)者發(fā)布的所有消息會一致保存在Kafka集群中,不管消息有沒有被消費。用戶可以通過設(shè)置保留時間來清理過期的數(shù)據(jù),比如,設(shè)置保留策略為兩天。那么,在消息發(fā)布之后,它可以被不同的消費者消費,在兩天之后,過期的消息就會自動清理掉。

3.3.2 高級API

1)高級API優(yōu)點

高級API 寫起來簡單

不需要自行去管理offffset,系統(tǒng)通過zookeeper自行管理。

不需要管理分區(qū),副本等情況,.系統(tǒng)自動管理。

消費者斷線會自動根據(jù)上一次記錄在zookeeper中的offffset去接著獲取數(shù)據(jù)(默認(rèn)設(shè)置1分鐘更新一下

zookeeper中存的offffset)

可以使用group來區(qū)分對同一個topic 的不同程序訪問分離開來(不同的group記錄不同的offffset,這樣不同程序讀取同一個topic才不會因為offffset互相影響)

2)高級API缺點

不能自行控制offffset(對于某些特殊需求來說)

不能細(xì)化控制如分區(qū)、副本、zk等

3.3.3 低級API

1)低級 API 優(yōu)點

能夠讓開發(fā)者自己控制offffset,想從哪里讀取就從哪里讀取

自行控制連接分區(qū),對分區(qū)自定義進(jìn)行負(fù)載均衡

對zookeeper的依賴性降低(如:offffset不一定非要靠zk存儲,自行存儲offffset即可,比如存在文件或者內(nèi)存中)

2)低級API缺點

太過復(fù)雜,需要自行控制offffset,連接哪個分區(qū),找到分區(qū)leader 等。

3.3.4 消費者組

消費者是以consumer group消費者組的方式工作,由一個或者多個消費者組成一個組,共同消費一個topic。每個分區(qū)在同一時間只能由group中的一個消費者讀取,但是多個group可以同時消費這個partition。在圖中,有一個由三個消費者組成的group,有一個消費者讀取主題中的兩個分區(qū),另外兩個分別讀取一個分區(qū)。某個消費者讀取某個分區(qū),也可以叫做某個消費者是某個分區(qū)的擁有者。

在這種情況下,消費者可以通過水平擴展的方式同時讀取大量的消息。另外,如果一個消費者失敗了,那么其他的group成員會自動負(fù)載均衡讀取之前失敗的消費者讀取的分區(qū)。

3.3.5 消費方式

consumer采用pull(拉)模式從broker中讀取數(shù)據(jù)。

push(推)模式很難適應(yīng)消費速率不同的消費者,因為消息發(fā)送速率是由broker決定的。它的目標(biāo)是盡可能以最快速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。而pull模式則可以根據(jù)consumer的消費能力以適當(dāng)?shù)乃俾氏M消息。

對于Kafka而言,pull模式更合適,它可簡化broker的設(shè)計,consumer可自主控制消費消息的速率,同時consumer可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現(xiàn)不同的傳輸語義。

pull模式不足之處是,如果kafka沒有數(shù)據(jù),消費者可能會陷入循環(huán)中,一直等待數(shù)據(jù)到達(dá)。為了避免這種情況,我們在我們的拉請求中有參數(shù),允許消費者請求在等待數(shù)據(jù)到達(dá)的“長輪詢”中進(jìn)行阻塞(并且可選地等待到給定的字節(jié)數(shù),以確保大的傳輸大?。?。

3.3.6 消費者組案例

1)需求:測試同一個消費者組中的消費者,同一時刻只能有一個消費者消費。

2)案例實操

(1)在bigdata11、bigdata12上修改/opt/module/kafka/confifig/consumer.properties配置文件中的group.id屬性為任意組名。

(2)在bigdata11、bigdata12上分別啟動消費者

(3)在bigdata13上啟動生產(chǎn)者

[itstar@bigdata12 config]$ vi consumer.properties

group.id=itstar

[itstar@bigdata11 kafka]$ ./bin/kafka-console-consumer.sh --bootstrap-server

node3:9092 --topic second --consumer.config config/consumer.properties

[itstar@bigdata12 kafka]$ ./bin/kafka-console-consumer.sh --bootstrap-server

node3:9092 --topic second --consumer.con

[itstar@bigdata13 kafka]$ bin/kafka-console-producer.sh --broker-list

bigdata11:9092 --topic first

>hello world

(4)查看bigdata11和bigdata12的接收者。

同一時刻只有一個消費者接收到消息。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容