系列
- RocketMq broker 配置文件
- RocketMq broker 啟動流程
- RocketMq broker CommitLog介紹
- RocketMq broker consumeQueue介紹
- RocketMq broker 重試和死信隊列
- RocketMq broker 延遲消息
- RocketMq IndexService介紹
- RocketMq 讀寫分離機制
- RocketMq broker過期文件刪除
開篇
這個系列的主要目的是介紹RocketMq broker的原理和用法,在這個系列當中會介紹 broker 配置文件、broker 啟動流程、broker延遲消息、broker消息存儲。
這篇文章主要介紹broker IndexService,主要介紹IndexService的數據結構和對應的建索引過程。
IndexFile 介紹

IndexFile文件的存儲位置是:\store\index${fileName},文件名fileName是以創(chuàng)建時的時間戳命名的,文件大小是固定的,等于40+500W4+2000W20= 420000040個字節(jié)大小。如果消息的properties中設置了UNIQ_KEY這個屬性,就用 topic + “#” + UNIQ_KEY的value作為 key 來做寫入操作。如果消息設置了KEYS屬性(多個KEY以空格分隔),也會用 topic + “#” + KEY 來做索引。
其中的索引數據包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 這四個字段,一共20 Byte。NextIndex offset 即前面讀出來的 slotValue,如果有 hash沖突,就可以用這個字段將所有沖突的索引用鏈表的方式串起來了。Timestamp記錄的是消息storeTimestamp之間的差,并不是一個絕對的時間。整個Index File的結構如圖,40 Byte 的Header用于保存一些總的統計信息,4500W的 Slot Table并不保存真正的索引數據,而是保存每個槽位對應的單向鏈表的頭。202000W 是真正的索引數據,即一個 Index File 可以保存 2000W個索引。
IndexFile在解決hash沖突的過程中會采用頭插法,即所有的沖突數據都往鏈表的頭部進行插入,然后每個新添加的元素都會包含后一個元素的位置,hash對應的slot Table會指向第一個索引元素。在實際元素存儲的數據的順序和查詢的順序是逆向映射的,這點需要理解。
IndexFile創(chuàng)建
public class IndexService {
private static final int MAX_TRY_IDX_CREATE = 3;
private final DefaultMessageStore defaultMessageStore;
private final int hashSlotNum;
private final int indexNum;
private final String storePath;
private final ArrayList<IndexFile> indexFileList = new ArrayList<IndexFile>();
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
public IndexService(final DefaultMessageStore store) {
this.defaultMessageStore = store;
// private int maxHashSlotNum = 5000000;
// private int maxIndexNum = 5000000 * 4;
this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum();
this.indexNum = store.getMessageStoreConfig().getMaxIndexNum();
this.storePath =
StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
}
public IndexFile getAndCreateLastIndexFile() {
IndexFile indexFile = null;
IndexFile prevIndexFile = null;
long lastUpdateEndPhyOffset = 0;
long lastUpdateIndexTimestamp = 0;
{
this.readWriteLock.readLock().lock();
if (!this.indexFileList.isEmpty()) {
IndexFile tmp = this.indexFileList.get(this.indexFileList.size() - 1);
if (!tmp.isWriteFull()) {
indexFile = tmp;
} else {
lastUpdateEndPhyOffset = tmp.getEndPhyOffset();
lastUpdateIndexTimestamp = tmp.getEndTimestamp();
prevIndexFile = tmp;
}
}
this.readWriteLock.readLock().unlock();
}
if (indexFile == null) {
try {
// 創(chuàng)建的文件名以時間戳作為文件名
String fileName =
this.storePath + File.separator
+ UtilAll.timeMillisToHumanString(System.currentTimeMillis());
indexFile =
// 文件的hashSlotNum=5000000,indexNum=5000000 * 4
new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
lastUpdateIndexTimestamp);
this.readWriteLock.writeLock().lock();
this.indexFileList.add(indexFile);
} catch (Exception e) {
log.error("getLastIndexFile exception ", e);
} finally {
this.readWriteLock.writeLock().unlock();
}
if (indexFile != null) {
// 前置文件刷盤
final IndexFile flushThisFile = prevIndexFile;
Thread flushThread = new Thread(new Runnable() {
@Override
public void run() {
IndexService.this.flush(flushThisFile);
}
}, "FlushIndexFileThread");
flushThread.setDaemon(true);
flushThread.start();
}
}
return indexFile;
}
}
- IndexService在不存在或者當前文件已滿的情況下會創(chuàng)建新的indexFile文件呢。
- indexFile文件的名為當前時間戳、hashSlotNum=5000000,indexNum=5000000 * 4。
Index存儲
public class IndexFile {
private static int hashSlotSize = 4;
private static int indexSize = 20;
private static int invalidIndex = 0;
private final int hashSlotNum;
private final int indexNum;
private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");
indexFile = retryGetAndCreateIndexFile();
if (null == indexFile) {
return null;
}
ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
}
return indexFile;
}
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
// 在Index的文件沒有滿的情況下放置索引數據
if (this.indexHeader.getIndexCount() < this.indexNum) {
// 1、針對key計算hash值
int keyHash = indexKeyHashMethod(key);
// 2、記錄hash值應該保存的slot的位置
int slotPos = keyHash % this.hashSlotNum;
// 3、計算Index文件當中slotPos對應的實際物理偏移
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
FileLock fileLock = null;
try {
// 獲取absSlotPos位置記錄當前存儲的index的位移
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}
// 4、計算index實際的存儲的偏移
// 實際位置等于headIndex + slot占用的位置(slotSize * slotNum) + 已有的Index的偏移(已有index的個數*indexSize)
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;
// 5、生成Index的對象放置 keyHash、phyOffset、timeDiff、slotValue(保存的該hash值下前一個index的邏輯位移,也就是第幾個index對象)、
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
// 6、記錄SlotPos的當前的index的個數,即邏輯位移。
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
// 設置Index文件整體的index個數和slot個數、commitLog的最后物理偏移和最新的存儲時間戳。
this.indexHeader.incHashSlotCount();
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);
return true;
} catch (Exception e) {
log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
}
} else {
log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
+ "; index max num = " + this.indexNum);
}
return false;
}
}
IndexFile#putKey實現了整個index文件存儲過程,由于IndexFile實現的是類似hash的結果,所以存儲過程也跟hash的存儲流程比較相似。
1、針對key計算hash值,記錄hash值應該保存的slot的位置,計算Index文件當中slotPos對應的實際物理偏移。
2、根據slotPos對應的實際物理偏移獲取該slot下最新的index文件的邏輯位移,即index linked list的第幾個。
3、計算index實際的存儲的偏移,實際位置等于headIndex + slot占用的位置(slotSize * slotNum) + 已有的Index的物理偏移(已有index的個數*indexSize)。
4、生成當前Index的對象放置 keyHash、phyOffset(commitLog的實際偏移量)、timeDiff、slotValue(保存的該hash值下前一個index的邏輯位移,也就是第幾個index對象),slotValue起到了鏈表鏈接的作用。
5、設置Index文件整體的index個數和slot個數、commitLog的最后物理偏移和最新的存儲時間戳。