RocketMq IndexService介紹

系列

開篇

  • 這個系列的主要目的是介紹RocketMq broker的原理和用法,在這個系列當中會介紹 broker 配置文件、broker 啟動流程、broker延遲消息、broker消息存儲。

  • 這篇文章主要介紹broker IndexService,主要介紹IndexService的數據結構和對應的建索引過程。

IndexFile 介紹

  • IndexFile文件的存儲位置是:\store\index${fileName},文件名fileName是以創(chuàng)建時的時間戳命名的,文件大小是固定的,等于40+500W4+2000W20= 420000040個字節(jié)大小。如果消息的properties中設置了UNIQ_KEY這個屬性,就用 topic + “#” + UNIQ_KEY的value作為 key 來做寫入操作。如果消息設置了KEYS屬性(多個KEY以空格分隔),也會用 topic + “#” + KEY 來做索引。

  • 其中的索引數據包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 這四個字段,一共20 Byte。NextIndex offset 即前面讀出來的 slotValue,如果有 hash沖突,就可以用這個字段將所有沖突的索引用鏈表的方式串起來了。Timestamp記錄的是消息storeTimestamp之間的差,并不是一個絕對的時間。整個Index File的結構如圖,40 Byte 的Header用于保存一些總的統計信息,4500W的 Slot Table并不保存真正的索引數據,而是保存每個槽位對應的單向鏈表的頭。202000W 是真正的索引數據,即一個 Index File 可以保存 2000W個索引。

  • IndexFile在解決hash沖突的過程中會采用頭插法,即所有的沖突數據都往鏈表的頭部進行插入,然后每個新添加的元素都會包含后一個元素的位置,hash對應的slot Table會指向第一個索引元素。在實際元素存儲的數據的順序和查詢的順序是逆向映射的,這點需要理解。


IndexFile創(chuàng)建

public class IndexService {

    private static final int MAX_TRY_IDX_CREATE = 3;
    private final DefaultMessageStore defaultMessageStore;
    private final int hashSlotNum;
    private final int indexNum;
    private final String storePath;
    private final ArrayList<IndexFile> indexFileList = new ArrayList<IndexFile>();
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    public IndexService(final DefaultMessageStore store) {
        this.defaultMessageStore = store;
        // private int maxHashSlotNum = 5000000;
        // private int maxIndexNum = 5000000 * 4;
        this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum();
        this.indexNum = store.getMessageStoreConfig().getMaxIndexNum();
        this.storePath =
            StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
    }

    public IndexFile getAndCreateLastIndexFile() {
        IndexFile indexFile = null;
        IndexFile prevIndexFile = null;
        long lastUpdateEndPhyOffset = 0;
        long lastUpdateIndexTimestamp = 0;

        {
            this.readWriteLock.readLock().lock();
            if (!this.indexFileList.isEmpty()) {
                IndexFile tmp = this.indexFileList.get(this.indexFileList.size() - 1);
                if (!tmp.isWriteFull()) {
                    indexFile = tmp;
                } else {
                    lastUpdateEndPhyOffset = tmp.getEndPhyOffset();
                    lastUpdateIndexTimestamp = tmp.getEndTimestamp();
                    prevIndexFile = tmp;
                }
            }

            this.readWriteLock.readLock().unlock();
        }

        if (indexFile == null) {
            try {
                // 創(chuàng)建的文件名以時間戳作為文件名
                String fileName =
                    this.storePath + File.separator
                        + UtilAll.timeMillisToHumanString(System.currentTimeMillis());
                indexFile =
                    // 文件的hashSlotNum=5000000,indexNum=5000000 * 4
                    new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
                        lastUpdateIndexTimestamp);
                this.readWriteLock.writeLock().lock();
                this.indexFileList.add(indexFile);
            } catch (Exception e) {
                log.error("getLastIndexFile exception ", e);
            } finally {
                this.readWriteLock.writeLock().unlock();
            }

            if (indexFile != null) {
                // 前置文件刷盤
                final IndexFile flushThisFile = prevIndexFile;
                Thread flushThread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        IndexService.this.flush(flushThisFile);
                    }
                }, "FlushIndexFileThread");

                flushThread.setDaemon(true);
                flushThread.start();
            }
        }

        return indexFile;
    }
}
  • IndexService在不存在或者當前文件已滿的情況下會創(chuàng)建新的indexFile文件呢。
  • indexFile文件的名為當前時間戳、hashSlotNum=5000000,indexNum=5000000 * 4。


Index存儲

public class IndexFile {

    private static int hashSlotSize = 4;
    private static int indexSize = 20;
    private static int invalidIndex = 0;
    private final int hashSlotNum;
    private final int indexNum;

    private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
        for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
            log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");

            indexFile = retryGetAndCreateIndexFile();
            if (null == indexFile) {
                return null;
            }

            ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
        }

        return indexFile;
    }


    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
        // 在Index的文件沒有滿的情況下放置索引數據
        if (this.indexHeader.getIndexCount() < this.indexNum) {

            // 1、針對key計算hash值
            int keyHash = indexKeyHashMethod(key);
            // 2、記錄hash值應該保存的slot的位置
            int slotPos = keyHash % this.hashSlotNum;
            // 3、計算Index文件當中slotPos對應的實際物理偏移
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

            FileLock fileLock = null;

            try {
                // 獲取absSlotPos位置記錄當前存儲的index的位移
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                    slotValue = invalidIndex;
                }

                long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

                timeDiff = timeDiff / 1000;

                if (this.indexHeader.getBeginTimestamp() <= 0) {
                    timeDiff = 0;
                } else if (timeDiff > Integer.MAX_VALUE) {
                    timeDiff = Integer.MAX_VALUE;
                } else if (timeDiff < 0) {
                    timeDiff = 0;
                }
                // 4、計算index實際的存儲的偏移
                // 實際位置等于headIndex + slot占用的位置(slotSize * slotNum) + 已有的Index的偏移(已有index的個數*indexSize)
                int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                        + this.indexHeader.getIndexCount() * indexSize;
                // 5、生成Index的對象放置 keyHash、phyOffset、timeDiff、slotValue(保存的該hash值下前一個index的邏輯位移,也就是第幾個index對象)、

                this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
                // 6、記錄SlotPos的當前的index的個數,即邏輯位移。
                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

                if (this.indexHeader.getIndexCount() <= 1) {
                    this.indexHeader.setBeginPhyOffset(phyOffset);
                    this.indexHeader.setBeginTimestamp(storeTimestamp);
                }
                // 設置Index文件整體的index個數和slot個數、commitLog的最后物理偏移和最新的存儲時間戳。
                this.indexHeader.incHashSlotCount();
                this.indexHeader.incIndexCount();
                this.indexHeader.setEndPhyOffset(phyOffset);
                this.indexHeader.setEndTimestamp(storeTimestamp);

                return true;
            } catch (Exception e) {
                log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }
            }
        } else {
            log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                + "; index max num = " + this.indexNum);
        }

        return false;
    }
}
  • IndexFile#putKey實現了整個index文件存儲過程,由于IndexFile實現的是類似hash的結果,所以存儲過程也跟hash的存儲流程比較相似。

  • 1、針對key計算hash值,記錄hash值應該保存的slot的位置,計算Index文件當中slotPos對應的實際物理偏移。

  • 2、根據slotPos對應的實際物理偏移獲取該slot下最新的index文件的邏輯位移,即index linked list的第幾個。

  • 3、計算index實際的存儲的偏移,實際位置等于headIndex + slot占用的位置(slotSize * slotNum) + 已有的Index的物理偏移(已有index的個數*indexSize)。

  • 4、生成當前Index的對象放置 keyHash、phyOffset(commitLog的實際偏移量)、timeDiff、slotValue(保存的該hash值下前一個index的邏輯位移,也就是第幾個index對象),slotValue起到了鏈表鏈接的作用。

  • 5、設置Index文件整體的index個數和slot個數、commitLog的最后物理偏移和最新的存儲時間戳。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容