有兩個分區(qū)的名為“my_topic”的主題的日志包含兩個目錄(即my_topic_0和my_topic_1),其中填充了包含該主題的消息的數(shù)據(jù)文件。日志文件的格式是一系列“日志條目”;每個日志條目是一個4字節(jié)整型變量N,存儲消息長度,后跟N個消息字節(jié)。每條消息由64位整數(shù)偏移量給出消息在這個分去中所有發(fā)送到這個主題的消息的流中開始的字節(jié)位置。每個日志文件都以其包含的第一條消息的便宜量命名,因此創(chuàng)建的第一個文件都將是00000000000.kafka,并且每個附加文件將具有一個整數(shù)名稱,大約是從前一個文件中的S個字節(jié),其中S是配置中給出的最大日志文件的大小。
record的確切二進(jìn)制格式被版本化并維護(hù)為標(biāo)準(zhǔn)接口,因此record批次可以在生產(chǎn)者,broker,和客戶端之間傳輸,而無需進(jìn)行重新復(fù)制或轉(zhuǎn)換。上一節(jié)包含了有關(guān)磁盤上對record進(jìn)行格式化的詳細(xì)信息。
使用消息偏移量作為消息ID是不常見的。我們最初的想法是使用生產(chǎn)者生成的GUID,并在每個broker上維護(hù)從GUID到偏移的映射。但由于消費者必須為每個服務(wù)器維護(hù)一個ID,因此GUID的全局唯一性不提供任何價值。此外,保持從隨機id到偏移的映射的復(fù)雜性需要heavy的索引結(jié)構(gòu),其必須與磁盤同步,基本上需要完全持久的隨機訪問數(shù)據(jù)結(jié)構(gòu)。 因此,為了簡化查找結(jié)構(gòu),我們決定使用一個簡單的per-partition原子計數(shù)器,它可以由分區(qū)id和節(jié)點id組成來唯一的標(biāo)識消息;這使得查找的結(jié)構(gòu)更加簡單,盡管仍然可能針對每個消費者請求進(jìn)行多次搜索。但是,一旦我們確定了一個計數(shù)器,直接使用偏移的跳轉(zhuǎn)看起來就很自然了——畢竟這之后都是在分區(qū)中單調(diào)地增加整型。由于偏移量是從消費者API隱藏的,因此這個決定最終是一個實現(xiàn)細(xì)節(jié),我們采用了更加有效的方法。

寫入
日志允許串行追加始終去到最后一個文件。當(dāng)文件達(dá)到可配置的大小(例如1GB)時,改文件將轉(zhuǎn)移到一個新文件中。該日志有兩個配置參數(shù):M,它給出了在強制操作系統(tǒng)把文件flush到硬盤之前寫入的消息數(shù),以及S,它給出了強制刷新的秒數(shù)。這提供了在系統(tǒng)崩潰時最多丟失M個消息或S秒數(shù)據(jù)的持久性保證。
讀取
通過給出消息的64位邏輯偏移量和S字節(jié)的最大塊大小來完成讀取。這將返回包含著唉S字節(jié)緩沖區(qū)中的消息的迭代器。S旨在比任何單個消息都大,但是如果消息異常的大,則可以多次重試讀取,每次將緩沖區(qū)大小加倍,直到消息被成功讀取??梢灾付ㄗ畲笙⒑途彌_區(qū)大小,以使服務(wù)器拒絕大于某個大小的消息,并在需要讀取的最大值上為客戶端提供綁定以獲得完整的消息。讀緩沖區(qū)很可能以部分消息結(jié)束,這很容易通過大小分隔來實現(xiàn)。
從偏移量讀取數(shù)據(jù)的實際過程需要首先定位存儲數(shù)據(jù)的日志段文件,從全局偏移量計算文件特定的偏移量,然后從該文件偏移量中讀取。搜索值針對每個文件維護(hù)的內(nèi)存范圍的簡單二進(jìn)制搜索變體來完成的。
日志提供了獲取最近編寫信息的功能,以允許客戶端“立即”開始訂閱。在消費者未能在其SLA-specified的天數(shù)內(nèi)使用其數(shù)據(jù)的情況下,這很有用。在這種情況下,當(dāng)客戶端嘗試使用不存在的偏移量時,會給出OutOfRangeException,并且可以自行重置或根據(jù)用例進(jìn)行失敗。
以下是發(fā)送給消費者的結(jié)果格式:
MessageSetSend (fetch result)
total length : 4 bytes
error code : 2 bytes
message 1 : x bytes
...
message n : x bytes
1
2
3
4
5
6
7
MultiMessageSetSend (multiFetch result)
total length : 4 bytes
error code : 2 bytes
messageSetSend 1
...
messageSetSend n