Netty內(nèi)存管理

概述

Netty使用ByteBuf作為其底層數(shù)據(jù)傳輸?shù)娜萜?,其?shí)現(xiàn)有兩種方式:基于堆內(nèi)存與基于直接內(nèi)存。為了減少傳輸過程中在用戶緩沖區(qū)與內(nèi)核緩沖區(qū)數(shù)據(jù)拷貝帶來的消耗,底層實(shí)現(xiàn)又分為unsafe與非unsafe兩種方式。同時(shí)為了降低內(nèi)存分配與釋放消耗,采用池化方式對(duì)ByteBuf進(jìn)行緩存。

ByteBuf數(shù)據(jù)結(jié)構(gòu)

ByteBuf底層是一個(gè)字節(jié)數(shù)組,內(nèi)部維護(hù)了兩個(gè)索引:readerIndex與writerIndex。其中0 --> readerIndex部分為可丟棄字節(jié),表示已被讀取過,readerIndex --> writerIndex部分為可讀字節(jié),writerIndex --> capacity部分為可寫字節(jié)。ByteBuf支持動(dòng)態(tài)擴(kuò)容,在實(shí)例化時(shí)會(huì)傳入maxCapacity,當(dāng)writerIndex達(dá)到capacity且capacity小于maxCapacity時(shí)會(huì)進(jìn)行自動(dòng)擴(kuò)容。

+-------------------+------------------+------------------+
| discardable bytes |  readable bytes  |  writable bytes  |
|                   |     (CONTENT)    |                  |
+-------------------+------------------+------------------+
|                   |                  |                  |
0      <=      readerIndex   <=   writerIndex    <=    capacity

ByteBuf類圖

ByteBuf子類可以按照以下三個(gè)緯度進(jìn)行分類:

  • 池化與非池化:是否對(duì)ByteBuf進(jìn)行緩存

  • unsafe與非unsafe:是否使用unsafe進(jìn)行讀寫

  • 堆內(nèi)存與直接內(nèi)存:在堆內(nèi)存或者直接內(nèi)存進(jìn)行分配

內(nèi)存管理

在進(jìn)入內(nèi)存分配核心邏輯前,我們先對(duì)Netty內(nèi)存分配相關(guān)概念做下了解。Netty內(nèi)存管理借鑒jemalloc思想,為了提高內(nèi)存利用率,根據(jù)不同內(nèi)存規(guī)格使用不同的分配策略,并且使用緩存提高內(nèi)存分配效率。

內(nèi)存分配相關(guān)概念

內(nèi)存規(guī)格

Netty有四種內(nèi)存規(guī)格,tiny表示16B ~ 512B之間的內(nèi)存塊,samll表示512B ~ 8K之間的內(nèi)存塊,normal表示8K ~ 16M的內(nèi)存塊,Huge表示大于16M的內(nèi)存塊。

Chunk是Netty向操作系統(tǒng)申請(qǐng)內(nèi)存的單位,默認(rèn)一次向操作系統(tǒng)申請(qǐng)16M內(nèi)存,Netty內(nèi)部將Chunk按照Page大小劃分為2048塊。我們申請(qǐng)內(nèi)存時(shí)如果大于16M,則Netty會(huì)直接向操作系統(tǒng)申請(qǐng)對(duì)應(yīng)大小內(nèi)存,如果申請(qǐng)內(nèi)存在8k到16M之間則會(huì)分配對(duì)應(yīng)個(gè)數(shù)Page進(jìn)行使用。如果申請(qǐng)內(nèi)存遠(yuǎn)小于8K,那么直接使用一個(gè)Page會(huì)造成內(nèi)存浪費(fèi),SubPage就是對(duì)Page進(jìn)行再次分配,減少內(nèi)存浪費(fèi)。

如果申請(qǐng)內(nèi)存小于8K,會(huì)對(duì)Page進(jìn)行再次劃分為SubPage,SubPage大小為Page大小/申請(qǐng)內(nèi)存大小。SubPage又劃分為tiny與small兩種。

PoolArena

負(fù)責(zé)管理從操作系統(tǒng)中申請(qǐng)到的內(nèi)存塊,Netty為了減少多線程競爭arena,采用多arena設(shè)計(jì),arena數(shù)量默認(rèn)為2倍CPU核心數(shù)。線程與arena關(guān)系如下:


PoolThreadLocalCache

線程本地緩存,負(fù)責(zé)創(chuàng)建線程緩存PoolThreadCache。PoolThreadCache中會(huì)初始化三種類型MemoryRegionCache數(shù)組,用以緩存線程中不同規(guī)格的內(nèi)存塊,分別為:tiny、small、normal。tiny類型數(shù)組緩存的內(nèi)存塊大小為16B ~ 512B之間,samll類型數(shù)組緩存的內(nèi)存塊大小為512B ~ 8K之間的內(nèi)存塊,normal類型數(shù)組緩存的內(nèi)存塊大小受DEFAULT_MAX_CACHED_BUFFER_CAPACITY配置影響,默認(rèn)只緩存8K、16K、32K三種類型內(nèi)存塊。

MemoryRegionCache

內(nèi)存塊緩存容器,負(fù)責(zé)緩存tiny、small、normal三種內(nèi)存塊。其內(nèi)部維護(hù)一個(gè)隊(duì)列,用于緩存同種內(nèi)存大小的內(nèi)存塊。

PoolChunk

負(fù)責(zé)管理從操作系統(tǒng)申請(qǐng)的內(nèi)存,內(nèi)部采用伙伴算法以Page為單位進(jìn)行內(nèi)存的分配與管理。


PoolChunkList

負(fù)責(zé)管理Chunk列表,根據(jù)內(nèi)存使用率,分為:qInit、q000、q025、q050、q075、q100六種。每個(gè)PoolChunkList中存儲(chǔ)內(nèi)存使用率相同的Chunk,Chunk以雙向鏈表進(jìn)行關(guān)聯(lián),同時(shí)不同使用率的PoolChunkList也以雙向列表進(jìn)行關(guān)聯(lián)。這樣做的目的是因?yàn)殡S著內(nèi)存的分配,Chunk使用率會(huì)發(fā)生變化,以鏈表形式方便Chunk在不同使用率列表進(jìn)行移動(dòng)。


PoolSubpage

PoolSubpage負(fù)責(zé)tiny、small類型內(nèi)存的管理與分配,實(shí)現(xiàn)基于SLAB內(nèi)存分配算法。PoolArena中有兩種PoolSubpage類型數(shù)組,分別為:tinySubpagePools、smallSubpagePools。tinySubpagePools負(fù)責(zé)管理tiny類型內(nèi)存,數(shù)組大小為512/16=32種。smallSubpagePools負(fù)責(zé)管理small類型內(nèi)存,數(shù)組大小為4。

PoolSubpage數(shù)組中存儲(chǔ)不同內(nèi)存大小的PoolSubpage節(jié)點(diǎn),相同大小節(jié)點(diǎn)以鏈表進(jìn)行關(guān)聯(lián)。PoolSubpage內(nèi)部使用位圖數(shù)組記錄內(nèi)存分配情況。

內(nèi)存分配器

Netty通過ByteBufAllocator進(jìn)行內(nèi)存分配,ByteBufAllocator有兩個(gè)實(shí)現(xiàn)類:PooledByteBufAllocator與UnpooledByteBufAllocator,其中,是否在堆內(nèi)存或者直接內(nèi)存分配與是否使用unsafe進(jìn)行讀寫操作都封裝在其實(shí)現(xiàn)類中。

我們先看下ByteBufAllocator類圖:


PooledByteBufAllocator與UnpooledByteBufAllocator內(nèi)存分配類似,可以通過newHeapBuffer與newDirectBuffer進(jìn)行分配內(nèi)存,我們以PooledByteBufAllocator為例分析下內(nèi)存分配流程:

實(shí)例化內(nèi)存分配器

以PooledByteBufAllocator為例來分析下內(nèi)存分配器實(shí)例化過程。首先調(diào)用PooledByteBufAllocator#DEFAULT方法實(shí)例化PooledByteBufAllocator

PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;

#PooledByteBufAllocator
public static final PooledByteBufAllocator DEFAULT =
            new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());

PooledByteBufAllocator實(shí)例化時(shí)會(huì)初始化幾個(gè)比較重要的屬性:

  • DEFAULT_NUM_HEAP_ARENA:heap類型arena數(shù)量,默認(rèn)兩倍CPU核心數(shù)
  • DEFAULT_NUM_DIRECT_ARENA: Arena數(shù)組長度,默認(rèn)為兩倍CPU核心數(shù)
  • DEFAULT_PAGE_SIZE:頁大小,默認(rèn)為8K
  • DEFAULT_MAX_ORDER:樹深度,默認(rèn)為11
  • DEFAULT_TINY_CACHE_SIZE:緩存的tiny類型的內(nèi)存?zhèn)€數(shù),默認(rèn)為512
  • DEFAULT_SMALL_CACHE_SIZE:緩存的small類型的內(nèi)存?zhèn)€數(shù),默認(rèn)為256
  • DEFAULT_NORMAL_CACHE_SIZE:緩存的normal類型的內(nèi)存?zhèn)€數(shù),默認(rèn)為64
  • DEFAULT_MAX_CACHED_BUFFER_CAPACITY:最大可以被緩存的內(nèi)存大小,默認(rèn)為32K
  • DEFAULT_CACHE_TRIM_INTERVAL:緩存經(jīng)過多少次回收后被清理,默認(rèn)為8192

最終會(huì)調(diào)用PooledByteBufAllocator如下構(gòu)造方法:

public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
                                  int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
            // 調(diào)用父類構(gòu)造方法初始化directByDefault屬性(默認(rèn)使用直接內(nèi)存,默認(rèn)值為true)、初始化一個(gè)空的ByteBuf對(duì)象
        super(preferDirect);
            // 初始化threadCache屬性,用于創(chuàng)建本地線程緩存
        threadCache = new PoolThreadLocalCache();
        this.tinyCacheSize = tinyCacheSize;
        this.smallCacheSize = smallCacheSize;
        this.normalCacheSize = normalCacheSize;
        // ...

            // 初始化堆內(nèi)存類型PoolArena數(shù)組
        if (nHeapArena > 0) {
            heapArenas = newArenaArray(nHeapArena);
            List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
            for (int i = 0; i < heapArenas.length; i ++) {
                PoolArena.HeapArena arena = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize);
                heapArenas[i] = arena;
                metrics.add(arena);
            }
            heapArenaMetrics = Collections.unmodifiableList(metrics);
        } else {
            heapArenas = null;
            heapArenaMetrics = Collections.emptyList();
        }
                // 初始化直接內(nèi)存類型PoolArena數(shù)組
        if (nDirectArena > 0) {
            directArenas = newArenaArray(nDirectArena);
            List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
            for (int i = 0; i < directArenas.length; i ++) {
                PoolArena.DirectArena arena = new PoolArena.DirectArena(
                        this, pageSize, maxOrder, pageShifts, chunkSize);
                directArenas[i] = arena;
                metrics.add(arena);
            }
            directArenaMetrics = Collections.unmodifiableList(metrics);
        } else {
            directArenas = null;
            directArenaMetrics = Collections.emptyList();
        }
    }

PooledByteBufAllocator構(gòu)造方法主要做了兩件事情,一是:初始化PoolThreadLocalCache屬性,二是:初始化堆內(nèi)存與直接內(nèi)存類型PoolArena數(shù)組,我們進(jìn)入PoolArena.DirectArena構(gòu)造方法,來分析下PoolArena初始化時(shí)主要做了哪些事情:

#PoolArena.DirectArena
DirectArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
    super(parent, pageSize, maxOrder, pageShifts, chunkSize);
}

#PoolArena
protected PoolArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
        this.parent = parent;
        this.pageSize = pageSize;
        this.maxOrder = maxOrder;
        this.pageShifts = pageShifts;
        this.chunkSize = chunkSize;
        subpageOverflowMask = ~(pageSize - 1);
            ## 初始化tiny類型PoolSubpage數(shù)組
        tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
        for (int i = 0; i < tinySubpagePools.length; i ++) {
            tinySubpagePools[i] = newSubpagePoolHead(pageSize);
        }

        numSmallSubpagePools = pageShifts - 9;
            ## 初始化small類型PoolSubpage數(shù)組
        smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
        for (int i = 0; i < smallSubpagePools.length; i ++) {
            smallSubpagePools[i] = newSubpagePoolHead(pageSize);
        }
                
            ##初始化PoolChunkList
        q100 = new PoolChunkList<T>(null, 100, Integer.MAX_VALUE, chunkSize);
        q075 = new PoolChunkList<T>(q100, 75, 100, chunkSize);
        q050 = new PoolChunkList<T>(q075, 50, 100, chunkSize);
        q025 = new PoolChunkList<T>(q050, 25, 75, chunkSize);
        q000 = new PoolChunkList<T>(q025, 1, 50, chunkSize);
        qInit = new PoolChunkList<T>(q000, Integer.MIN_VALUE, 25, chunkSize);

        q100.prevList(q075);
        q075.prevList(q050);
        q050.prevList(q025);
        q025.prevList(q000);
        q000.prevList(null);
        qInit.prevList(qInit);

        List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6);
        metrics.add(qInit);
        metrics.add(q000);
        metrics.add(q025);
        metrics.add(q050);
        metrics.add(q075);
        metrics.add(q100);
        chunkListMetrics = Collections.unmodifiableList(metrics);
    }

DirectArena構(gòu)造方法會(huì)調(diào)用其父類PoolArena構(gòu)造方法,在PoolArena構(gòu)造方法中會(huì)初始化tiny類型與small類型PoolSubpage數(shù)組,并初始化六種不同內(nèi)存使用率的PoolChunkList,每個(gè)PoolChunkList以雙向鏈表進(jìn)行關(guān)聯(lián)。

分配內(nèi)存

以分配直接內(nèi)存為例,分析內(nèi)存分配的主要流程:

ByteBuf byteBuf = allocator.directBuffer(8);

PooledByteBufAllocator#directBuffer方法最終會(huì)調(diào)用如下構(gòu)造方法,其中maxCapacity為Integer.MAX_VALUE:

#PooledByteBufAllocator
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    ## 獲取線程緩存
    PoolThreadCache cache = (PoolThreadCache)this.threadCache.get();
    ## 從線程緩存中獲取PoolArena
    PoolArena<ByteBuffer> directArena = cache.directArena;
    Object buf;
    ## 分配內(nèi)存
    if (directArena != null) {
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        buf = PlatformDependent.hasUnsafe() ? UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }
        ## 將ByteBuf轉(zhuǎn)為具有內(nèi)存泄漏檢測功能的ByteBuf
    return toLeakAwareBuffer((ByteBuf)buf);
}

該方法主要分三步,第一步:獲取線程緩存,第二步:分配內(nèi)存,第三步:將ByteBuf轉(zhuǎn)為具有內(nèi)存泄漏檢測功能的ByteBuf,我們來分析下每一步具體做了哪些事情:

1.獲取線程緩存,從PoolThreadLocalCache中獲取PoolThreadCache,首次調(diào)用會(huì)先進(jìn)行進(jìn)行初始化,并將結(jié)果緩存下來:

#FastThreadLocal
public final V get() {
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    Object v = threadLocalMap.indexedVariable(index);
    if (v != InternalThreadLocalMap.UNSET) {
        return (V) v;
    }
        // 初始化PoolThreadCache
    return initialize(threadLocalMap);
}

初始化方法在PoolThreadLocalCache中,首先會(huì)循環(huán)找到使用最少的PoolArena,然后調(diào)用PoolThreadCache構(gòu)造方法創(chuàng)建PoolThreadCache:

#PoolThreadLocalCache
@Override
protected synchronized PoolThreadCache initialValue() {
    // 獲取使用最少的PoolArena
    final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
    final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
    /**
        *heapArena、directArena為PoolArena數(shù)組大小,默認(rèn)為2倍CPU核數(shù)
        *tinyCacheSize tiny類型緩存池大小,默認(rèn)為512
        *smallCacheSize small類型緩存池大小,默認(rèn)為256
        *normalCacheSize normal類型緩存池大小,默認(rèn)為64
        *DEFAULT_MAX_CACHED_BUFFER_CAPACITY normal類型緩存池緩存的最大內(nèi)存大小,默認(rèn)為32K
        *DEFAULT_CACHE_TRIM_INTERVAL 分配次數(shù)閾值默認(rèn)為8192,超過后釋放內(nèi)存池
      */
    return new PoolThreadCache(
            heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
            DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
}

PoolThreadCache構(gòu)造方法中會(huì)初始化tinySubPageDirectCaches、smallSubPageDirectCaches、normalDirectCaches這三種MemoryRegionCache數(shù)組:

#PoolThreadCache
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
    // ...
    this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
    this.heapArena = heapArena;
    this.directArena = directArena;
    if (directArena != null) {
        tinySubPageDirectCaches = createSubPageCaches(
                tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
        smallSubPageDirectCaches = createSubPageCaches(
                smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);

        numShiftsNormalDirect = log2(directArena.pageSize);
        normalDirectCaches = createNormalCaches(
                normalCacheSize, maxCachedBufferCapacity, directArena);

        directArena.numThreadCaches.getAndIncrement();
    } else {
        // No directArea is configured so just null out all caches
        tinySubPageDirectCaches = null;
        smallSubPageDirectCaches = null;
        normalDirectCaches = null;
        numShiftsNormalDirect = -1;
    }
    // ...
}

createSubPageCaches方法中會(huì)創(chuàng)建并初始化MemoryRegionCache數(shù)組,其中tiny類型數(shù)組大小為32,small類型數(shù)組大小為4,normal類型數(shù)組大小為3:

#PoolThreadCache
private static <T> MemoryRegionCache<T>[] createSubPageCaches(
        int cacheSize, int numCaches, SizeClass sizeClass) {
    if (cacheSize > 0) {
        @SuppressWarnings("unchecked")
        MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
        for (int i = 0; i < cache.length; i++) {
            // TODO: maybe use cacheSize / cache.length
            cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
        }
        return cache;
    } else {
        return null;
    }
}

最終會(huì)調(diào)用MemoryRegionCache構(gòu)造方法進(jìn)行創(chuàng)建,我們看下MemoryRegionCache結(jié)構(gòu):

#PoolThreadCache
private abstract static class MemoryRegionCache<T> {
    // 隊(duì)列大小,tiny類型size大小為512,small類型size大小為256,normal類型size大小為64
    private final int size;
    // 同種內(nèi)存大小緩存隊(duì)列
    private final Queue<Entry<T>> queue;
    // sizeClass分為三種:Tiny、Small、Normal
    private final SizeClass sizeClass;
    private int allocations;

    MemoryRegionCache(int size, SizeClass sizeClass) {
        this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
        queue = PlatformDependent.newFixedMpscQueue(this.size);
        this.sizeClass = sizeClass;
    }
}

2.分配內(nèi)存,首先會(huì)獲取PooledByteBuf,然后進(jìn)行內(nèi)存分配:

#PoolArena
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
    PooledByteBuf<T> buf = newByteBuf(maxCapacity);
    allocate(cache, buf, reqCapacity);
    return buf;
}

newByteBuf方法會(huì)嘗試從對(duì)象池里面獲取pooledByteBuf,如果沒有則進(jìn)行創(chuàng)建。allocate方法為內(nèi)存分配核心邏輯,主要分為兩種分配方式:page級(jí)別內(nèi)存分配(8k16M)、subPage級(jí)別內(nèi)存分配(08K)、huge級(jí)別內(nèi)存分配(>16M)。page與subPage級(jí)別內(nèi)存分配首先會(huì)嘗試從緩存上進(jìn)行內(nèi)存分配,如果分配失敗則重新申請(qǐng)內(nèi)存。huge級(jí)別內(nèi)存分配不會(huì)通過緩存進(jìn)行分配。我們看下allocate方法主要流程:

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    // 標(biāo)準(zhǔn)化容量,比如10B會(huì)標(biāo)準(zhǔn)化為16B,7K會(huì)標(biāo)準(zhǔn)化為8K
    final int normCapacity = normalizeCapacity(reqCapacity);
    // tiny或者small類型內(nèi)存分配
    if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
        int tableIdx;
        PoolSubpage<T>[] table;
        boolean tiny = isTiny(normCapacity);
        if (tiny) { // < 512
            if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            tableIdx = tinyIdx(normCapacity);
            table = tinySubpagePools;
        } else {
            if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            tableIdx = smallIdx(normCapacity);
            table = smallSubpagePools;
        }

        final PoolSubpage<T> head = table[tableIdx];

        /**
         * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
         * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
         */
        synchronized (head) {
            final PoolSubpage<T> s = head.next;
            if (s != head) {
                assert s.doNotDestroy && s.elemSize == normCapacity;
                long handle = s.allocate();
                assert handle >= 0;
                s.chunk.initBufWithSubpage(buf, handle, reqCapacity);

                if (tiny) {
                    allocationsTiny.increment();
                } else {
                    allocationsSmall.increment();
                }
                return;
            }
        }
        allocateNormal(buf, reqCapacity, normCapacity);
        return;
    }
    // normal類型內(nèi)存分配
    if (normCapacity <= chunkSize) {
        if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
            // was able to allocate out of the cache so move on
            return;
        }
        allocateNormal(buf, reqCapacity, normCapacity);
    } else {
        // huge類型內(nèi)存分配
        // Huge allocations are never served via the cache so just call allocateHuge
        allocateHuge(buf, reqCapacity);
    }
}

SubPage級(jí)別內(nèi)存分配

首先嘗試從緩存中進(jìn)行分配:

boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
    return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}

cacheForTiney方法先根據(jù)分配內(nèi)存大小定位到對(duì)應(yīng)的tinySubPageDirectCaches數(shù)組中MemoryRegionCache,如果沒有定位到則不能在緩存中進(jìn)行分配。如果有則從MemoryRegionCache對(duì)應(yīng)的隊(duì)列中彈出一個(gè)PooledByteBuf對(duì)象進(jìn)行初始化,同時(shí)為了復(fù)用PooledByteBuf對(duì)象,會(huì)將其緩存下來。

private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
    int idx = PoolArena.tinyIdx(normCapacity);
    if (area.isDirect()) {
        return cache(tinySubPageDirectCaches, idx);
    }
    return cache(tinySubPageHeapCaches, idx);
}

public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
    Entry<T> entry = queue.poll();
    if (entry == null) {
        return false;
    }
    // 初始化PooledByteBuf
    initBuf(entry.chunk, entry.handle, buf, reqCapacity);
    // 將PooledByteBuf對(duì)象放入棧中復(fù)用
    entry.recycle();

    // allocations is not thread-safe which is fine as this is only called from the same thread all time.
    ++ allocations;
    return true;
}

如果從緩存中分配不成功,則會(huì)從對(duì)應(yīng)的PoolSubpage數(shù)組上進(jìn)行分配,如果PoolSubpage數(shù)組對(duì)應(yīng)的內(nèi)存大小下標(biāo)中有可分配空間則進(jìn)行分配,并對(duì)PooledByteBuf進(jìn)行初始化。

如果在PoolSubpage數(shù)組上分配不成功,則表示沒有可以用來分配的SubPage,則會(huì)嘗試從Page上進(jìn)行分配。先嘗試從不同內(nèi)存使用率的ChunkList進(jìn)行分配,如果仍分配不成功,則表示沒有可以用來分配的Chunk,此時(shí)會(huì)創(chuàng)建新的Chunk進(jìn)行內(nèi)存分配。

private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
    // 從不同內(nèi)存使用率的ChunkList進(jìn)行分配
    if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
        q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
        q075.allocate(buf, reqCapacity, normCapacity)) {
        ++allocationsNormal;
        return;
    }

    // 創(chuàng)建一個(gè)新Chunk
    PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
    // 分配內(nèi)存
    long handle = c.allocate(normCapacity);
    ++allocationsNormal;
    assert handle > 0;
    // 初始化PooledByteBuf
    c.initBuf(buf, handle, reqCapacity);
    qInit.add(c);
}

進(jìn)入PoolChunk#allocate方法看下分配流程:

long allocate(int normCapacity) {
    if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
        return allocateRun(normCapacity);
    } else {
        return allocateSubpage(normCapacity);
    }
}

allocateRun方法用來分配大于等于8K的內(nèi)存,allocateSubpage用來分配小于8K的內(nèi)存,進(jìn)入allocateSubpage方法:

private long allocateSubpage(int normCapacity) {
    // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
    // This is need as we may add it back and so alter the linked-list structure.
    PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
    synchronized (head) {
        int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
        // 找到Page
        int id = allocateNode(d);
        if (id < 0) {
            return id;
        }

        final PoolSubpage<T>[] subpages = this.subpages;
        final int pageSize = this.pageSize;

        freeBytes -= pageSize;

        int subpageIdx = subpageIdx(id);
        PoolSubpage<T> subpage = subpages[subpageIdx];
        if (subpage == null) {
            // 創(chuàng)建并初始化PoolSubpage
            subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
            subpages[subpageIdx] = subpage;
        } else {
            subpage.init(head, normCapacity);
        }
        // 分配內(nèi)存
        return subpage.allocate();
    }
}

內(nèi)存分配成功后會(huì)調(diào)用initBuf方法初始化PoolByteBuf:

void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {
    int memoryMapIdx = memoryMapIdx(handle);
    int bitmapIdx = bitmapIdx(handle);
    if (bitmapIdx == 0) {
        byte val = value(memoryMapIdx);
        assert val == unusable : String.valueOf(val);
        buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx),
                 arena.parent.threadCache());
    } else {
        initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
    }
}

Page級(jí)別內(nèi)存分配

Page級(jí)別內(nèi)存分配和SubPage級(jí)別類似,同樣是先從緩存中進(jìn)行分配,分配不成功則嘗試從不同內(nèi)存使用率的ChunkList進(jìn)行分配,如果仍分配不成功,則表示沒有可以用來分配的Chunk,此時(shí)會(huì)創(chuàng)建新的Chunk進(jìn)行內(nèi)存分配,不同點(diǎn)在allocate方法中:

private long allocateRun(int normCapacity) {
    int d = maxOrder - (log2(normCapacity) - pageShifts);
    // 找到Page
    int id = allocateNode(d);
    if (id < 0) {
        return id;
    }
    freeBytes -= runLength(id);
    return id;
}

Huge級(jí)別內(nèi)存分配

因?yàn)榇笥?6M的內(nèi)存分配Netty不會(huì)進(jìn)行緩存,所以Huge級(jí)別內(nèi)存分配會(huì)直接申請(qǐng)內(nèi)存并進(jìn)行初始化:

private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
    // 創(chuàng)建Chunk
    PoolChunk<T> chunk = newUnpooledChunk(reqCapacity);
    activeBytesHuge.add(chunk.chunkSize());
    // 初始化PoolByteBuf
    buf.initUnpooled(chunk, reqCapacity);
    allocationsHuge.increment();
}

內(nèi)存回收

調(diào)用ByteBuf#release方法會(huì)進(jìn)行內(nèi)存釋放,方法中會(huì)判斷當(dāng)前byteBuf 是否被引用,如果沒有被引用, 則調(diào)用deallocate方法進(jìn)行釋放:

private boolean release0(int decrement) {
    for (;;) {
        int refCnt = this.refCnt;
        if (refCnt < decrement) {
            throw new IllegalReferenceCountException(refCnt, -decrement);
        }

        if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
            if (refCnt == decrement) {
                // 釋放內(nèi)存
                deallocate();
                return true;
            }
            return false;
        }
    }
}

進(jìn)入deallocate方法看下內(nèi)存釋放流程:

@Override
protected final void deallocate() {
    if (handle >= 0) {
        final long handle = this.handle;
        this.handle = -1;
        memory = null;
        // 釋放內(nèi)存
        chunk.arena.free(chunk, handle, maxLength, cache);
        // 將PoolByteBuf對(duì)象加入到對(duì)象池
        recycle();
    }
}

free方法會(huì)把釋放的內(nèi)存加入到緩存,如果加入緩存不成功則會(huì)標(biāo)記這段內(nèi)存為未使用:

void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
    if (chunk.unpooled) {
        int size = chunk.chunkSize();
        destroyChunk(chunk);
        activeBytesHuge.add(-size);
        deallocationsHuge.increment();
    } else {
        SizeClass sizeClass = sizeClass(normCapacity);
        // 加入到緩存
        if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
            // cached so not free it.
            return;
        }
                // 標(biāo)記內(nèi)存為未使用
        freeChunk(chunk, handle, sizeClass);
    }
}

recycle方法會(huì)將PoolByteBuf對(duì)象放入到對(duì)象池中:

#DefaultHandle
@Override
public void recycle(Object object) {
    if (object != value) {
        throw new IllegalArgumentException("object does not belong to handle");
    }
    stack.push(this);
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容