RocketMQ的消息存儲(chǔ)過程非常復(fù)雜, 本文先介紹存儲(chǔ)模塊中幾個(gè)重要對(duì)象.
1. MappedFile
對(duì)MappedByteBuffer的封裝, 具有創(chuàng)建文件(使用非堆區(qū)內(nèi)存), 寫入, 提交, 讀取, 釋放, 關(guān)閉等功能, RocketMQ使用該類實(shí)現(xiàn)數(shù)據(jù)從內(nèi)存到磁盤的持久化.
關(guān)鍵字段
- fileChannel: 該類對(duì)應(yīng)的文件通道.
- mappedByteBuffer: 文件在內(nèi)存中的映射. 如前文所述RocketMQ使用內(nèi)存映射的方式來操作文件, 這種方式要比流的方式快很多.
- fileSize: 文件尺寸
- wrotePosition: 當(dāng)前寫到哪一個(gè)位置.
- committedPosition: 已經(jīng)提交(已經(jīng)持久化到磁盤)的位置.
- flushedPosition: 已經(jīng)提交(已經(jīng)持久化到磁盤)的位置.
- writeBuffer: 內(nèi)存字節(jié)緩沖區(qū), RocketMQ提供兩種數(shù)據(jù)落盤的方式: 一種是直接將數(shù)據(jù)寫到映射文件字節(jié)緩沖區(qū)(mappedByteBuffer), 映射文件字節(jié)緩沖區(qū)(mappedByteBuffer)flush; 另一種是先寫到writeBuffer, 再從內(nèi)存字節(jié)緩沖區(qū)(write buffer)提交(commit)到文件通道(fileChannel), 然后文件通道(fileChannel)flush.
- fileFromOffset: fileFromOffset: 映射的起始偏移量, 拿commitlog文件來舉例, 下面有很多個(gè)文件夾(假設(shè)為1KB, 默認(rèn)是1G大小), 第一個(gè)文件名為00000000000000000000, 第二個(gè)文件名為00000000000000001024, 那么第一個(gè)文件的fileFromOffset就是0, 第二個(gè)文件的fileFromOffset就是1024
關(guān)鍵方法
- appendMessage: 插入消息到MappedFile, 并返回插入結(jié)果.
- selectMappedBuffer: 返回指定位置的內(nèi)存映射, 用于讀取數(shù)據(jù).
(1) appendMessage
源代碼如下:
public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
assert msg != null;
assert cb != null;
int currentPos = this.wrotePosition.get(); //獲取當(dāng)前寫的位置
if (currentPos < this.fileSize) { //currentPos小于文件尺寸才能寫入
//獲取獲取需要寫入的字節(jié)緩沖區(qū), 之所以會(huì)有writeBuffer != null的判斷與使用的刷盤服務(wù)有關(guān).
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
byteBuffer.position(currentPos); //設(shè)置寫入的postion
AppendMessageResult result =
cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg); //執(zhí)行寫入
this.wrotePosition.addAndGet(result.getWroteBytes()); //更新wrotePosition
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
//返回錯(cuò)誤信息
log.error("MappedFile.appendMessage return null, wrotePosition: " + currentPos + " fileSize: "
+ this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
可以看到MappedFile調(diào)用AppendMessageCallback來執(zhí)行msg到字節(jié)緩沖區(qū)的寫入.事實(shí)上整個(gè)RocketMQ只有一個(gè)類實(shí)現(xiàn)了AppendMessageCallback接口, 就是DefaultAppendMessageCallback. doAppend方法的具體實(shí)現(xiàn)與消息格式有關(guān), 并且不屬于MappedFile的范疇, 后文再分析.
(2) selectMappedBuffer
源代碼如下:
//返回從pos到 pos + size的內(nèi)存映射
public SelectMappedBufferResult selectMappedBuffer(int pos, int size) {
int readPosition = getReadPosition(); //獲取當(dāng)前有效數(shù)據(jù)的最大位置
if ((pos + size) <= readPosition) { //內(nèi)存映射的最大位置必須小于readPosition
if (this.hold()) { //引用計(jì)數(shù)
ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); // 復(fù)制一個(gè)byteBuffer(與原byteBuffer共享數(shù)據(jù), 只是指針位置獨(dú)立)
byteBuffer.position(pos); //設(shè)置position
//獲取目標(biāo)數(shù)據(jù)
ByteBuffer byteBufferNew = byteBuffer.slice();
byteBufferNew.limit(size);
return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
} else {
log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: "
+ this.fileFromOffset);
}
} else {
log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size
+ ", fileFromOffset: " + this.fileFromOffset);
}
return null;
}
2. MappedFileQueue
顧名思義, 該類代表了MappedFile組成的隊(duì)列(由大小相同的多個(gè)文件構(gòu)成). 無論是CommitLog(消息主體以及元數(shù)據(jù)), 還是ConsumeQueue(邏輯隊(duì)列), 底層使用的組件都是MappedFileQueue.
關(guān)鍵字段
- storePath: 文件隊(duì)列的存儲(chǔ)路徑
- mappedFiles: 存儲(chǔ)MappedFile的map
- mappedFileSize: MappedFile的尺寸
- flushedWhere: 已經(jīng)刷到磁盤的位置
- committedWhere: 已經(jīng)提交的位置
關(guān)鍵方法
- getLastMappedFile: 獲取隊(duì)列中最后一個(gè)MappedFile對(duì)象
- findMappedFileByOffset: 根據(jù)offset/filesize計(jì)算該offset所在那個(gè)文件中
(1) getLastMappedFile
源代碼如下:
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
long createOffset = -1;
//獲取當(dāng)前Queue中最后一個(gè)MappedFile
MappedFile mappedFileLast = getLastMappedFile();
//一個(gè)文件都不存在時(shí), 計(jì)算起始文件的offset
if (mappedFileLast == null) {
createOffset = startOffset - (startOffset % this.mappedFileSize);
}
//計(jì)算需要新創(chuàng)建的文件的offset
if (mappedFileLast != null && mappedFileLast.isFull()) {
createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
}
//創(chuàng)建新的MappedFile
if (createOffset != -1 && needCreate) {
//計(jì)算文件名
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;
if (this.allocateMappedFileService != null) {
//使用AllocateMappedFileService創(chuàng)建文件主要是更加安全一些, 會(huì)將一些并行的操作穿行化
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
//將新創(chuàng)建的文件添加到隊(duì)列中
if (mappedFile != null) {
if (this.mappedFiles.isEmpty()) {
mappedFile.setFirstCreateInQueue(true);
}
this.mappedFiles.add(mappedFile);
}
return mappedFile;
}
return mappedFileLast;
}
從源碼中可見, 只有當(dāng)文件寫滿或者找不到文件時(shí), 才會(huì)創(chuàng)建新的文件.
(2) findMappedFileByOffset
主要是根據(jù)offset尋找對(duì)應(yīng)的MappedFile, 具體源代碼不再貼出.
為了理解findMapedFileByOffset, 我們假設(shè)每個(gè)文件的大小是1024K, 參考以下圖示:

如果現(xiàn)在想查找3021在那個(gè)文件中, 可以按如下計(jì)算:
(3021 - 0)/1024=2 即可知其在隊(duì)列下標(biāo)為2的MappedFile中
釋義如下: (offset-第一個(gè)文件的fileFromeOffset)/mappedFileSize
3. CommitLog
用于存儲(chǔ)消息的抽象封裝, 內(nèi)部采用MapedFileQueue實(shí)現(xiàn)了消息文件隊(duì)列功能.
關(guān)鍵字段
- HashMap topicQueueTable: 用于記錄某個(gè)topic在某個(gè)queueId共寫入了多少個(gè)消息, put一個(gè)消息加1.
關(guān)鍵方法
- putMessage: 存儲(chǔ)消息.
- getMessage: 讀取消息
(1) putMessage
存儲(chǔ)消息主要分3步: 查找文件(getLastMapedFile), 寫入數(shù)據(jù)(DefaultAppendMessageCallback), 刷盤(FlushRealTimeService). 最終產(chǎn)生實(shí)際存儲(chǔ)消息的隊(duì)列文件如下:
${storePathRootDir}/commitlog/消息隊(duì)列文件. (消息隊(duì)列文件名規(guī)則如MappedFileQueue).
(2)getMessage(final long offset, final int size)
offset: 絕對(duì)偏移量, 可以用其調(diào)用findMappedFileByOffset查詢MappedFile.
size: 欲查詢的數(shù)據(jù)大小.
4. ConsumeQueue
消費(fèi)隊(duì)列的實(shí)現(xiàn), 該消費(fèi)隊(duì)列主要存儲(chǔ)了消息在CommitLog的位置, 與CommitLog類似, 內(nèi)部采用MappedFileQueue實(shí)現(xiàn)了消息位置文件隊(duì)列功能.
一個(gè)topic和一個(gè)queueId對(duì)應(yīng)一個(gè)ConsumeQueue.
默認(rèn)queue存儲(chǔ)30W條消息, 每個(gè)消息大小為20個(gè)字節(jié), 詳細(xì)如下:
offset(long 8字節(jié)) + size(int 4字節(jié)) + tagsCode(long 8字節(jié))
關(guān)鍵方法
- putMessagePositionInfo: 消息位置的存儲(chǔ)
- getIndexBuffer: 該方法返回從offset之后的字節(jié)映射
(1)putMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset)
offset: 消息在commitLog中的起始位置
size: 消息長(zhǎng)度
tagsCode: 消息tag的hash code
cqOffset: 該消息在topic對(duì)應(yīng)的queue中的下標(biāo)
該方法主要實(shí)現(xiàn)了消息位置的存儲(chǔ), 并產(chǎn)生消息文件:
storePathRootDir/consumequeue/{topic}/${queueId}/消息位置隊(duì)列文件
消息數(shù)(30W)*消息位置固定大小(20字節(jié))=6000000字節(jié)
故每6000000字節(jié)一個(gè)文件, 文件名依次遞增, 前綴不夠20位補(bǔ)0, 類似如下:
00000000000000000000
00000000000006000000
00000000000012000000
(2)getIndexBuffer(final long startIndex)
該方法源代碼如下:
public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
int mappedFileSize = this.mappedFileSize;
long offset = startIndex * CQ_STORE_UNIT_SIZE;
if (offset >= this.getMinLogicOffset()) {
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
if (mappedFile != null) {
SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
return result;
}
}
return null;
}
startIndex代表了起始偏移量索引.
該方法先根據(jù)startIndex找到對(duì)應(yīng)的MappedFile, 再在該MappedFile中找到對(duì)應(yīng)的字節(jié)映射.
5. 總結(jié)
RocketMQ的消息存儲(chǔ)非常復(fù)雜, 本文介紹了消息存儲(chǔ)中使用到的基礎(chǔ)組件類以及一些重要的API. 后文會(huì)進(jìn)一步介紹消息存儲(chǔ)的詳細(xì)流程.
參考資料:
1.http://www.tuicool.com/articles/6FFR7v
2.http://blog.csdn.net/a417930422/article/details/50606732