概述
Netty使用ByteBuf作為其底層數(shù)據(jù)傳輸?shù)娜萜?,其?shí)現(xiàn)有兩種方式:基于堆內(nèi)存與基于直接內(nèi)存。為了減少傳輸過程中在用戶緩沖區(qū)與內(nèi)核緩沖區(qū)數(shù)據(jù)拷貝帶來的消耗,底層實(shí)現(xiàn)又分為unsafe與非unsafe兩種方式。同時(shí)為了降低內(nèi)存分配與釋放消耗,采用池化方式對(duì)ByteBuf進(jìn)行緩存。
ByteBuf數(shù)據(jù)結(jié)構(gòu)
ByteBuf底層是一個(gè)字節(jié)數(shù)組,內(nèi)部維護(hù)了兩個(gè)索引:readerIndex與writerIndex。其中0 --> readerIndex部分為可丟棄字節(jié),表示已被讀取過,readerIndex --> writerIndex部分為可讀字節(jié),writerIndex --> capacity部分為可寫字節(jié)。ByteBuf支持動(dòng)態(tài)擴(kuò)容,在實(shí)例化時(shí)會(huì)傳入maxCapacity,當(dāng)writerIndex達(dá)到capacity且capacity小于maxCapacity時(shí)會(huì)進(jìn)行自動(dòng)擴(kuò)容。
+-------------------+------------------+------------------+
| discardable bytes | readable bytes | writable bytes |
| | (CONTENT) | |
+-------------------+------------------+------------------+
| | | |
0 <= readerIndex <= writerIndex <= capacity
ByteBuf類圖

ByteBuf子類可以按照以下三個(gè)緯度進(jìn)行分類:
池化與非池化:是否對(duì)ByteBuf進(jìn)行緩存
unsafe與非unsafe:是否使用unsafe進(jìn)行讀寫
堆內(nèi)存與直接內(nèi)存:在堆內(nèi)存或者直接內(nèi)存進(jìn)行分配
內(nèi)存管理
在進(jìn)入內(nèi)存分配核心邏輯前,我們先對(duì)Netty內(nèi)存分配相關(guān)概念做下了解。Netty內(nèi)存管理借鑒jemalloc思想,為了提高內(nèi)存利用率,根據(jù)不同內(nèi)存規(guī)格使用不同的分配策略,并且使用緩存提高內(nèi)存分配效率。
內(nèi)存分配相關(guān)概念
內(nèi)存規(guī)格

Netty有四種內(nèi)存規(guī)格,tiny表示16B ~ 512B之間的內(nèi)存塊,samll表示512B ~ 8K之間的內(nèi)存塊,normal表示8K ~ 16M的內(nèi)存塊,Huge表示大于16M的內(nèi)存塊。
Chunk是Netty向操作系統(tǒng)申請(qǐng)內(nèi)存的單位,默認(rèn)一次向操作系統(tǒng)申請(qǐng)16M內(nèi)存,Netty內(nèi)部將Chunk按照Page大小劃分為2048塊。我們申請(qǐng)內(nèi)存時(shí)如果大于16M,則Netty會(huì)直接向操作系統(tǒng)申請(qǐng)對(duì)應(yīng)大小內(nèi)存,如果申請(qǐng)內(nèi)存在8k到16M之間則會(huì)分配對(duì)應(yīng)個(gè)數(shù)Page進(jìn)行使用。如果申請(qǐng)內(nèi)存遠(yuǎn)小于8K,那么直接使用一個(gè)Page會(huì)造成內(nèi)存浪費(fèi),SubPage就是對(duì)Page進(jìn)行再次分配,減少內(nèi)存浪費(fèi)。
如果申請(qǐng)內(nèi)存小于8K,會(huì)對(duì)Page進(jìn)行再次劃分為SubPage,SubPage大小為Page大小/申請(qǐng)內(nèi)存大小。SubPage又劃分為tiny與small兩種。
PoolArena
負(fù)責(zé)管理從操作系統(tǒng)中申請(qǐng)到的內(nèi)存塊,Netty為了減少多線程競爭arena,采用多arena設(shè)計(jì),arena數(shù)量默認(rèn)為2倍CPU核心數(shù)。線程與arena關(guān)系如下:

PoolThreadLocalCache
線程本地緩存,負(fù)責(zé)創(chuàng)建線程緩存PoolThreadCache。PoolThreadCache中會(huì)初始化三種類型MemoryRegionCache數(shù)組,用以緩存線程中不同規(guī)格的內(nèi)存塊,分別為:tiny、small、normal。tiny類型數(shù)組緩存的內(nèi)存塊大小為16B ~ 512B之間,samll類型數(shù)組緩存的內(nèi)存塊大小為512B ~ 8K之間的內(nèi)存塊,normal類型數(shù)組緩存的內(nèi)存塊大小受DEFAULT_MAX_CACHED_BUFFER_CAPACITY配置影響,默認(rèn)只緩存8K、16K、32K三種類型內(nèi)存塊。
MemoryRegionCache
內(nèi)存塊緩存容器,負(fù)責(zé)緩存tiny、small、normal三種內(nèi)存塊。其內(nèi)部維護(hù)一個(gè)隊(duì)列,用于緩存同種內(nèi)存大小的內(nèi)存塊。
PoolChunk
負(fù)責(zé)管理從操作系統(tǒng)申請(qǐng)的內(nèi)存,內(nèi)部采用伙伴算法以Page為單位進(jìn)行內(nèi)存的分配與管理。

PoolChunkList
負(fù)責(zé)管理Chunk列表,根據(jù)內(nèi)存使用率,分為:qInit、q000、q025、q050、q075、q100六種。每個(gè)PoolChunkList中存儲(chǔ)內(nèi)存使用率相同的Chunk,Chunk以雙向鏈表進(jìn)行關(guān)聯(lián),同時(shí)不同使用率的PoolChunkList也以雙向列表進(jìn)行關(guān)聯(lián)。這樣做的目的是因?yàn)殡S著內(nèi)存的分配,Chunk使用率會(huì)發(fā)生變化,以鏈表形式方便Chunk在不同使用率列表進(jìn)行移動(dòng)。

PoolSubpage
PoolSubpage負(fù)責(zé)tiny、small類型內(nèi)存的管理與分配,實(shí)現(xiàn)基于SLAB內(nèi)存分配算法。PoolArena中有兩種PoolSubpage類型數(shù)組,分別為:tinySubpagePools、smallSubpagePools。tinySubpagePools負(fù)責(zé)管理tiny類型內(nèi)存,數(shù)組大小為512/16=32種。smallSubpagePools負(fù)責(zé)管理small類型內(nèi)存,數(shù)組大小為4。
PoolSubpage數(shù)組中存儲(chǔ)不同內(nèi)存大小的PoolSubpage節(jié)點(diǎn),相同大小節(jié)點(diǎn)以鏈表進(jìn)行關(guān)聯(lián)。PoolSubpage內(nèi)部使用位圖數(shù)組記錄內(nèi)存分配情況。
內(nèi)存分配器
Netty通過ByteBufAllocator進(jìn)行內(nèi)存分配,ByteBufAllocator有兩個(gè)實(shí)現(xiàn)類:PooledByteBufAllocator與UnpooledByteBufAllocator,其中,是否在堆內(nèi)存或者直接內(nèi)存分配與是否使用unsafe進(jìn)行讀寫操作都封裝在其實(shí)現(xiàn)類中。
我們先看下ByteBufAllocator類圖:

PooledByteBufAllocator與UnpooledByteBufAllocator內(nèi)存分配類似,可以通過newHeapBuffer與newDirectBuffer進(jìn)行分配內(nèi)存,我們以PooledByteBufAllocator為例分析下內(nèi)存分配流程:
實(shí)例化內(nèi)存分配器
以PooledByteBufAllocator為例來分析下內(nèi)存分配器實(shí)例化過程。首先調(diào)用PooledByteBufAllocator#DEFAULT方法實(shí)例化PooledByteBufAllocator
PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
#PooledByteBufAllocator
public static final PooledByteBufAllocator DEFAULT =
new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
PooledByteBufAllocator實(shí)例化時(shí)會(huì)初始化幾個(gè)比較重要的屬性:
- DEFAULT_NUM_HEAP_ARENA:heap類型arena數(shù)量,默認(rèn)兩倍CPU核心數(shù)
- DEFAULT_NUM_DIRECT_ARENA: Arena數(shù)組長度,默認(rèn)為兩倍CPU核心數(shù)
- DEFAULT_PAGE_SIZE:頁大小,默認(rèn)為8K
- DEFAULT_MAX_ORDER:樹深度,默認(rèn)為11
- DEFAULT_TINY_CACHE_SIZE:緩存的tiny類型的內(nèi)存?zhèn)€數(shù),默認(rèn)為512
- DEFAULT_SMALL_CACHE_SIZE:緩存的small類型的內(nèi)存?zhèn)€數(shù),默認(rèn)為256
- DEFAULT_NORMAL_CACHE_SIZE:緩存的normal類型的內(nèi)存?zhèn)€數(shù),默認(rèn)為64
- DEFAULT_MAX_CACHED_BUFFER_CAPACITY:最大可以被緩存的內(nèi)存大小,默認(rèn)為32K
- DEFAULT_CACHE_TRIM_INTERVAL:緩存經(jīng)過多少次回收后被清理,默認(rèn)為8192
最終會(huì)調(diào)用PooledByteBufAllocator如下構(gòu)造方法:
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
// 調(diào)用父類構(gòu)造方法初始化directByDefault屬性(默認(rèn)使用直接內(nèi)存,默認(rèn)值為true)、初始化一個(gè)空的ByteBuf對(duì)象
super(preferDirect);
// 初始化threadCache屬性,用于創(chuàng)建本地線程緩存
threadCache = new PoolThreadLocalCache();
this.tinyCacheSize = tinyCacheSize;
this.smallCacheSize = smallCacheSize;
this.normalCacheSize = normalCacheSize;
// ...
// 初始化堆內(nèi)存類型PoolArena數(shù)組
if (nHeapArena > 0) {
heapArenas = newArenaArray(nHeapArena);
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
for (int i = 0; i < heapArenas.length; i ++) {
PoolArena.HeapArena arena = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize);
heapArenas[i] = arena;
metrics.add(arena);
}
heapArenaMetrics = Collections.unmodifiableList(metrics);
} else {
heapArenas = null;
heapArenaMetrics = Collections.emptyList();
}
// 初始化直接內(nèi)存類型PoolArena數(shù)組
if (nDirectArena > 0) {
directArenas = newArenaArray(nDirectArena);
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
for (int i = 0; i < directArenas.length; i ++) {
PoolArena.DirectArena arena = new PoolArena.DirectArena(
this, pageSize, maxOrder, pageShifts, chunkSize);
directArenas[i] = arena;
metrics.add(arena);
}
directArenaMetrics = Collections.unmodifiableList(metrics);
} else {
directArenas = null;
directArenaMetrics = Collections.emptyList();
}
}
PooledByteBufAllocator構(gòu)造方法主要做了兩件事情,一是:初始化PoolThreadLocalCache屬性,二是:初始化堆內(nèi)存與直接內(nèi)存類型PoolArena數(shù)組,我們進(jìn)入PoolArena.DirectArena構(gòu)造方法,來分析下PoolArena初始化時(shí)主要做了哪些事情:
#PoolArena.DirectArena
DirectArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
super(parent, pageSize, maxOrder, pageShifts, chunkSize);
}
#PoolArena
protected PoolArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
this.parent = parent;
this.pageSize = pageSize;
this.maxOrder = maxOrder;
this.pageShifts = pageShifts;
this.chunkSize = chunkSize;
subpageOverflowMask = ~(pageSize - 1);
## 初始化tiny類型PoolSubpage數(shù)組
tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
for (int i = 0; i < tinySubpagePools.length; i ++) {
tinySubpagePools[i] = newSubpagePoolHead(pageSize);
}
numSmallSubpagePools = pageShifts - 9;
## 初始化small類型PoolSubpage數(shù)組
smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
for (int i = 0; i < smallSubpagePools.length; i ++) {
smallSubpagePools[i] = newSubpagePoolHead(pageSize);
}
##初始化PoolChunkList
q100 = new PoolChunkList<T>(null, 100, Integer.MAX_VALUE, chunkSize);
q075 = new PoolChunkList<T>(q100, 75, 100, chunkSize);
q050 = new PoolChunkList<T>(q075, 50, 100, chunkSize);
q025 = new PoolChunkList<T>(q050, 25, 75, chunkSize);
q000 = new PoolChunkList<T>(q025, 1, 50, chunkSize);
qInit = new PoolChunkList<T>(q000, Integer.MIN_VALUE, 25, chunkSize);
q100.prevList(q075);
q075.prevList(q050);
q050.prevList(q025);
q025.prevList(q000);
q000.prevList(null);
qInit.prevList(qInit);
List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6);
metrics.add(qInit);
metrics.add(q000);
metrics.add(q025);
metrics.add(q050);
metrics.add(q075);
metrics.add(q100);
chunkListMetrics = Collections.unmodifiableList(metrics);
}
DirectArena構(gòu)造方法會(huì)調(diào)用其父類PoolArena構(gòu)造方法,在PoolArena構(gòu)造方法中會(huì)初始化tiny類型與small類型PoolSubpage數(shù)組,并初始化六種不同內(nèi)存使用率的PoolChunkList,每個(gè)PoolChunkList以雙向鏈表進(jìn)行關(guān)聯(lián)。
分配內(nèi)存
以分配直接內(nèi)存為例,分析內(nèi)存分配的主要流程:
ByteBuf byteBuf = allocator.directBuffer(8);
PooledByteBufAllocator#directBuffer方法最終會(huì)調(diào)用如下構(gòu)造方法,其中maxCapacity為Integer.MAX_VALUE:
#PooledByteBufAllocator
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
## 獲取線程緩存
PoolThreadCache cache = (PoolThreadCache)this.threadCache.get();
## 從線程緩存中獲取PoolArena
PoolArena<ByteBuffer> directArena = cache.directArena;
Object buf;
## 分配內(nèi)存
if (directArena != null) {
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ? UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
## 將ByteBuf轉(zhuǎn)為具有內(nèi)存泄漏檢測功能的ByteBuf
return toLeakAwareBuffer((ByteBuf)buf);
}
該方法主要分三步,第一步:獲取線程緩存,第二步:分配內(nèi)存,第三步:將ByteBuf轉(zhuǎn)為具有內(nèi)存泄漏檢測功能的ByteBuf,我們來分析下每一步具體做了哪些事情:
1.獲取線程緩存,從PoolThreadLocalCache中獲取PoolThreadCache,首次調(diào)用會(huì)先進(jìn)行進(jìn)行初始化,并將結(jié)果緩存下來:
#FastThreadLocal
public final V get() {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
Object v = threadLocalMap.indexedVariable(index);
if (v != InternalThreadLocalMap.UNSET) {
return (V) v;
}
// 初始化PoolThreadCache
return initialize(threadLocalMap);
}
初始化方法在PoolThreadLocalCache中,首先會(huì)循環(huán)找到使用最少的PoolArena,然后調(diào)用PoolThreadCache構(gòu)造方法創(chuàng)建PoolThreadCache:
#PoolThreadLocalCache
@Override
protected synchronized PoolThreadCache initialValue() {
// 獲取使用最少的PoolArena
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
/**
*heapArena、directArena為PoolArena數(shù)組大小,默認(rèn)為2倍CPU核數(shù)
*tinyCacheSize tiny類型緩存池大小,默認(rèn)為512
*smallCacheSize small類型緩存池大小,默認(rèn)為256
*normalCacheSize normal類型緩存池大小,默認(rèn)為64
*DEFAULT_MAX_CACHED_BUFFER_CAPACITY normal類型緩存池緩存的最大內(nèi)存大小,默認(rèn)為32K
*DEFAULT_CACHE_TRIM_INTERVAL 分配次數(shù)閾值默認(rèn)為8192,超過后釋放內(nèi)存池
*/
return new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
}
PoolThreadCache構(gòu)造方法中會(huì)初始化tinySubPageDirectCaches、smallSubPageDirectCaches、normalDirectCaches這三種MemoryRegionCache數(shù)組:
#PoolThreadCache
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
// ...
this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
this.heapArena = heapArena;
this.directArena = directArena;
if (directArena != null) {
tinySubPageDirectCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageDirectCaches = createSubPageCaches(
smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
numShiftsNormalDirect = log2(directArena.pageSize);
normalDirectCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, directArena);
directArena.numThreadCaches.getAndIncrement();
} else {
// No directArea is configured so just null out all caches
tinySubPageDirectCaches = null;
smallSubPageDirectCaches = null;
normalDirectCaches = null;
numShiftsNormalDirect = -1;
}
// ...
}
createSubPageCaches方法中會(huì)創(chuàng)建并初始化MemoryRegionCache數(shù)組,其中tiny類型數(shù)組大小為32,small類型數(shù)組大小為4,normal類型數(shù)組大小為3:
#PoolThreadCache
private static <T> MemoryRegionCache<T>[] createSubPageCaches(
int cacheSize, int numCaches, SizeClass sizeClass) {
if (cacheSize > 0) {
@SuppressWarnings("unchecked")
MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
for (int i = 0; i < cache.length; i++) {
// TODO: maybe use cacheSize / cache.length
cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
}
return cache;
} else {
return null;
}
}
最終會(huì)調(diào)用MemoryRegionCache構(gòu)造方法進(jìn)行創(chuàng)建,我們看下MemoryRegionCache結(jié)構(gòu):
#PoolThreadCache
private abstract static class MemoryRegionCache<T> {
// 隊(duì)列大小,tiny類型size大小為512,small類型size大小為256,normal類型size大小為64
private final int size;
// 同種內(nèi)存大小緩存隊(duì)列
private final Queue<Entry<T>> queue;
// sizeClass分為三種:Tiny、Small、Normal
private final SizeClass sizeClass;
private int allocations;
MemoryRegionCache(int size, SizeClass sizeClass) {
this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
queue = PlatformDependent.newFixedMpscQueue(this.size);
this.sizeClass = sizeClass;
}
}
2.分配內(nèi)存,首先會(huì)獲取PooledByteBuf,然后進(jìn)行內(nèi)存分配:
#PoolArena
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
allocate(cache, buf, reqCapacity);
return buf;
}
newByteBuf方法會(huì)嘗試從對(duì)象池里面獲取pooledByteBuf,如果沒有則進(jìn)行創(chuàng)建。allocate方法為內(nèi)存分配核心邏輯,主要分為兩種分配方式:page級(jí)別內(nèi)存分配(8k16M)、subPage級(jí)別內(nèi)存分配(08K)、huge級(jí)別內(nèi)存分配(>16M)。page與subPage級(jí)別內(nèi)存分配首先會(huì)嘗試從緩存上進(jìn)行內(nèi)存分配,如果分配失敗則重新申請(qǐng)內(nèi)存。huge級(jí)別內(nèi)存分配不會(huì)通過緩存進(jìn)行分配。我們看下allocate方法主要流程:
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
// 標(biāo)準(zhǔn)化容量,比如10B會(huì)標(biāo)準(zhǔn)化為16B,7K會(huì)標(biāo)準(zhǔn)化為8K
final int normCapacity = normalizeCapacity(reqCapacity);
// tiny或者small類型內(nèi)存分配
if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
if (tiny) { // < 512
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
final PoolSubpage<T> head = table[tableIdx];
/**
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
* {@link PoolChunk#free(long)} may modify the doubly linked list as well.
*/
synchronized (head) {
final PoolSubpage<T> s = head.next;
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
if (tiny) {
allocationsTiny.increment();
} else {
allocationsSmall.increment();
}
return;
}
}
allocateNormal(buf, reqCapacity, normCapacity);
return;
}
// normal類型內(nèi)存分配
if (normCapacity <= chunkSize) {
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
allocateNormal(buf, reqCapacity, normCapacity);
} else {
// huge類型內(nèi)存分配
// Huge allocations are never served via the cache so just call allocateHuge
allocateHuge(buf, reqCapacity);
}
}
SubPage級(jí)別內(nèi)存分配
首先嘗試從緩存中進(jìn)行分配:
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}
cacheForTiney方法先根據(jù)分配內(nèi)存大小定位到對(duì)應(yīng)的tinySubPageDirectCaches數(shù)組中MemoryRegionCache,如果沒有定位到則不能在緩存中進(jìn)行分配。如果有則從MemoryRegionCache對(duì)應(yīng)的隊(duì)列中彈出一個(gè)PooledByteBuf對(duì)象進(jìn)行初始化,同時(shí)為了復(fù)用PooledByteBuf對(duì)象,會(huì)將其緩存下來。
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
int idx = PoolArena.tinyIdx(normCapacity);
if (area.isDirect()) {
return cache(tinySubPageDirectCaches, idx);
}
return cache(tinySubPageHeapCaches, idx);
}
public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
Entry<T> entry = queue.poll();
if (entry == null) {
return false;
}
// 初始化PooledByteBuf
initBuf(entry.chunk, entry.handle, buf, reqCapacity);
// 將PooledByteBuf對(duì)象放入棧中復(fù)用
entry.recycle();
// allocations is not thread-safe which is fine as this is only called from the same thread all time.
++ allocations;
return true;
}
如果從緩存中分配不成功,則會(huì)從對(duì)應(yīng)的PoolSubpage數(shù)組上進(jìn)行分配,如果PoolSubpage數(shù)組對(duì)應(yīng)的內(nèi)存大小下標(biāo)中有可分配空間則進(jìn)行分配,并對(duì)PooledByteBuf進(jìn)行初始化。
如果在PoolSubpage數(shù)組上分配不成功,則表示沒有可以用來分配的SubPage,則會(huì)嘗試從Page上進(jìn)行分配。先嘗試從不同內(nèi)存使用率的ChunkList進(jìn)行分配,如果仍分配不成功,則表示沒有可以用來分配的Chunk,此時(shí)會(huì)創(chuàng)建新的Chunk進(jìn)行內(nèi)存分配。
private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
// 從不同內(nèi)存使用率的ChunkList進(jìn)行分配
if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
q075.allocate(buf, reqCapacity, normCapacity)) {
++allocationsNormal;
return;
}
// 創(chuàng)建一個(gè)新Chunk
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
// 分配內(nèi)存
long handle = c.allocate(normCapacity);
++allocationsNormal;
assert handle > 0;
// 初始化PooledByteBuf
c.initBuf(buf, handle, reqCapacity);
qInit.add(c);
}
進(jìn)入PoolChunk#allocate方法看下分配流程:
long allocate(int normCapacity) {
if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
return allocateRun(normCapacity);
} else {
return allocateSubpage(normCapacity);
}
}
allocateRun方法用來分配大于等于8K的內(nèi)存,allocateSubpage用來分配小于8K的內(nèi)存,進(jìn)入allocateSubpage方法:
private long allocateSubpage(int normCapacity) {
// Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
// This is need as we may add it back and so alter the linked-list structure.
PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
synchronized (head) {
int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
// 找到Page
int id = allocateNode(d);
if (id < 0) {
return id;
}
final PoolSubpage<T>[] subpages = this.subpages;
final int pageSize = this.pageSize;
freeBytes -= pageSize;
int subpageIdx = subpageIdx(id);
PoolSubpage<T> subpage = subpages[subpageIdx];
if (subpage == null) {
// 創(chuàng)建并初始化PoolSubpage
subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
subpages[subpageIdx] = subpage;
} else {
subpage.init(head, normCapacity);
}
// 分配內(nèi)存
return subpage.allocate();
}
}
內(nèi)存分配成功后會(huì)調(diào)用initBuf方法初始化PoolByteBuf:
void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {
int memoryMapIdx = memoryMapIdx(handle);
int bitmapIdx = bitmapIdx(handle);
if (bitmapIdx == 0) {
byte val = value(memoryMapIdx);
assert val == unusable : String.valueOf(val);
buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx),
arena.parent.threadCache());
} else {
initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
}
}
Page級(jí)別內(nèi)存分配
Page級(jí)別內(nèi)存分配和SubPage級(jí)別類似,同樣是先從緩存中進(jìn)行分配,分配不成功則嘗試從不同內(nèi)存使用率的ChunkList進(jìn)行分配,如果仍分配不成功,則表示沒有可以用來分配的Chunk,此時(shí)會(huì)創(chuàng)建新的Chunk進(jìn)行內(nèi)存分配,不同點(diǎn)在allocate方法中:
private long allocateRun(int normCapacity) {
int d = maxOrder - (log2(normCapacity) - pageShifts);
// 找到Page
int id = allocateNode(d);
if (id < 0) {
return id;
}
freeBytes -= runLength(id);
return id;
}
Huge級(jí)別內(nèi)存分配
因?yàn)榇笥?6M的內(nèi)存分配Netty不會(huì)進(jìn)行緩存,所以Huge級(jí)別內(nèi)存分配會(huì)直接申請(qǐng)內(nèi)存并進(jìn)行初始化:
private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
// 創(chuàng)建Chunk
PoolChunk<T> chunk = newUnpooledChunk(reqCapacity);
activeBytesHuge.add(chunk.chunkSize());
// 初始化PoolByteBuf
buf.initUnpooled(chunk, reqCapacity);
allocationsHuge.increment();
}
內(nèi)存回收
調(diào)用ByteBuf#release方法會(huì)進(jìn)行內(nèi)存釋放,方法中會(huì)判斷當(dāng)前byteBuf 是否被引用,如果沒有被引用, 則調(diào)用deallocate方法進(jìn)行釋放:
private boolean release0(int decrement) {
for (;;) {
int refCnt = this.refCnt;
if (refCnt < decrement) {
throw new IllegalReferenceCountException(refCnt, -decrement);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
if (refCnt == decrement) {
// 釋放內(nèi)存
deallocate();
return true;
}
return false;
}
}
}
進(jìn)入deallocate方法看下內(nèi)存釋放流程:
@Override
protected final void deallocate() {
if (handle >= 0) {
final long handle = this.handle;
this.handle = -1;
memory = null;
// 釋放內(nèi)存
chunk.arena.free(chunk, handle, maxLength, cache);
// 將PoolByteBuf對(duì)象加入到對(duì)象池
recycle();
}
}
free方法會(huì)把釋放的內(nèi)存加入到緩存,如果加入緩存不成功則會(huì)標(biāo)記這段內(nèi)存為未使用:
void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
if (chunk.unpooled) {
int size = chunk.chunkSize();
destroyChunk(chunk);
activeBytesHuge.add(-size);
deallocationsHuge.increment();
} else {
SizeClass sizeClass = sizeClass(normCapacity);
// 加入到緩存
if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
// cached so not free it.
return;
}
// 標(biāo)記內(nèi)存為未使用
freeChunk(chunk, handle, sizeClass);
}
}
recycle方法會(huì)將PoolByteBuf對(duì)象放入到對(duì)象池中:
#DefaultHandle
@Override
public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
stack.push(this);
}