PoolArena 是Netty内存池中的一个核心容器,它的主要作用是对创建的一系列的 PoolChunk 和 PoolSubpage 进行管理,根据申请的不同内存大小将最终的申请动作委托给这两个子容器进行管理。整体上, PoolArena 管理的内存有直接内存和堆内存两种方式,其是通过子类继承的方式来实现对不同类型的内存的申请与释放的。本文首先会对 PoolArena 的整体结构进行介绍,然后会介绍其主要属性,接着会从源码的角度对 PoolArena 申请和释放内存的过程进行介绍。
在整体上,PoolArena是对内存申请和释放的一个抽象,其有两个子类,结构如下图所示:
这里 DirectArena 和 HeapArena 是 PoolArena 对不同类型的内存申请和释放进行管理的两个具体的实现,内存的处理工作主要还是在PoolArena中。从结构上来看, PoolArena 中主要包含三部分子内存池:tinySubpagePools,smallSubpagePools和一系列的PoolChunkList。tinySubpagePools和smallSubpagePools都是PoolSubpage的数组,数组长度分别为32和4;PoolChunkList则主要是一个容器,其内部可以保存一系列的PoolChunk对象,并且,Netty会根据内存使用率的不同,将PoolChunkList分为不同等级的容器。如下是 PoolArena 在初始状态时的结构示意图:
关于 PoolArena 的结构,主要有如下几点需要说明:
PoolArena中有非常多的属性值,用于对PoolSubpage、PookChunk和PoolChunkList进行控制。在阅读源码时,如果能够理解这些属性值的作用,将会极大的加深对Netty内存池的理解。我们这里对PoolArena的主要属性进行介绍:
// 该参数指定了tinySubpagePools数组的长度,由于tinySubpagePools每一个元素的内存块差值为16, // 因而数组长度是512/16,也即这里的512 >>> 4 static final int numTinySubpagePools = 512 >>> 4; // 记录了PooledByteBufAllocator的引用 final PooledByteBufAllocator parent; // PoolChunk底层是一个平衡二叉树,该参数指定了该二叉树的深度 private final int maxOrder; // 该参数指定了PoolChunk中每一个叶节点所指代的内存块的大小 final int pageSize; // 指定了叶节点大小8KB是2的多少次幂,默认为13,该字段的主要作用是,在计算目标内存属于二叉树的 // 第几层的时候,可以借助于其内存大小相对于pageShifts的差值,从而快速计算其所在层数 final int pageShifts; // 指定了PoolChunk的初始大小,默认为16M final int chunkSize; // 由于PoolSubpage的大小为8KB=8196,因而该字段的值为 // -8192=>=> 1111 1111 1111 1111 1110 0000 0000 0000 // 这样在判断目标内存是否小于8KB时,只需要将目标内存与该数字进行与操作,只要操作结果等于0, // 就说明目标内存是小于8KB的,这样就可以判断其是应该首先在tinySubpagePools或smallSubpagePools // 中进行内存申请 final int subpageOverflowMask; // 该参数指定了smallSubpagePools数组的长度,默认为4 final int numSmallSubpagePools; // 指定了直接内存缓存的校准值 final int directMemoryCacheAlignment; // 指定了直接内存缓存校准值的判断变量 final int directMemoryCacheAlignmentMask; // 存储内存块小于512byte的PoolSubpage数组,该数组是分层次的,比如其第1层只用于大小为16byte的 // 内存块的申请,第2层只用于大小为32byte的内存块的申请,……,第31层只用于大小为496byte的内存块的申请 private final PoolSubpage<T>[] tinySubpagePools; // 用于大小在512byte~8KB内存的申请,该数组长度为4,所申请的内存块大小为512byte、1024byte、 // 2048byte和4096byte。 private final PoolSubpage<T>[] smallSubpagePools; // 用户维护使用率在50~100%的PoolChunk private final PoolChunkList<T> q050; // 用户维护使用率在25~75%的PoolChunk private final PoolChunkList<T> q025; // 用户维护使用率在1~50%的PoolChunk private final PoolChunkList<T> q000; // 用户维护使用率在0~25%的PoolChunk private final PoolChunkList<T> qInit; // 用户维护使用率在75~100%的PoolChunk private final PoolChunkList<T> q075; // 用户维护使用率为100%的PoolChunk private final PoolChunkList<T> q100; // 记录了当前PoolArena已经被多少个线程使用了,在每一个线程申请新内存的时候,其会找到使用最少的那个 // PoolArena进行内存的申请,这样可以减少线程之间的竞争 final AtomicInteger numThreadCaches = new AtomicInteger();
PoolArena对内存申请的控制,主要是按照前面的描述,对其流程进行控制。关于PoolChunk和PoolSubpage对内存申请和释放的控制,读者可以阅读本人前面的文章: Netty内存池之PoolChunk原理详解 和 Netty内存池之PoolSubpage详解 。这里我们主要在PoolArena层面上对内存的申请进行讲解,如下是其allocate()方法的源码:
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
// 这里newByteBuf()方法将会创建一个PooledByteBuf对象,但是该对象是未经初始化的,
// 也就是说其内部的ByteBuffer和readerIndex,writerIndex等参数都是默认值
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
// 使用对应的方式为创建的ByteBuf初始化相关内存数据,我们这里是以DirectArena进行讲解,因而这里
// 是通过其allocate()方法申请内存
allocate(cache, buf, reqCapacity);
return buf;
}
上述方法主要是一个入口方法,首先创建一个属性都是默认值的ByteBuf对象,然后将真正的申请动作交由allocate()方法进行:
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
// 这里normalizeCapacity()方法的主要作用是对目标容量进行规整操作,主要规则如下:
// 1. 如果目标容量小于16字节,则返回16;
// 2. 如果目标容量大于16字节,小于512字节,则以16字节为单位,返回大于目标字节数的第一个16字节的倍数。
// 比如申请的100字节,那么大于100的16的倍数是112,因而返回112个字节
// 3. 如果目标容量大于512字节,则返回大于目标容量的第一个2的指数幂。
// 比如申请的1000字节,那么返回的将是1024
final int normCapacity = normalizeCapacity(reqCapacity);
// 判断目标容量是否小于8KB,小于8KB则使用tiny或small的方式申请内存
if (isTinyOrSmall(normCapacity)) {
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity); // 判断目标容量是否小于512字节,小于512字节的为tiny类型的
if (tiny) {
// 这里首先从当前线程的缓存中尝试申请内存,如果申请到了,则直接返回,该方法中会使用申请到的
// 内存对ByteBuf对象进行初始化
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
return;
}
// 如果无法从当前线程缓存中申请到内存,则尝试从tinySubpagePools中申请,这里tinyIdx()方法
// 就是计算目标内存是在tinySubpagePools数组中的第几号元素中的
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
// 如果目标内存在512byte~8KB之间,则尝试从smallSubpagePools中申请内存。这里首先从
// 当前线程的缓存中申请small级别的内存,如果申请到了,则直接返回
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
return;
}
// 如果无法从当前线程的缓存中申请到small级别的内存,则尝试从smallSubpagePools中申请。
// 这里smallIdx()方法就是计算目标内存块是在smallSubpagePools中的第几号元素中的
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
// 获取目标元素的头结点
final PoolSubpage<T> head = table[tableIdx];
// 这里需要注意的是,由于对head进行了加锁,而在同步代码块中判断了s != head,
// 也就是说PoolSubpage链表中是存在未使用的PoolSubpage的,因为如果该节点已经用完了,
// 其是会被移除当前链表的。也就是说只要s != head,那么这里的allocate()方法
// 就一定能够申请到所需要的内存块
synchronized (head) {
final PoolSubpage<T> s = head.next;
// s != head就证明当前PoolSubpage链表中存在可用的PoolSubpage,并且一定能够申请到内存,
// 因为已经耗尽的PoolSubpage是会从链表中移除的
if (s != head) {
// 从PoolSubpage中申请内存
long handle = s.allocate();
// 通过申请的内存对ByteBuf进行初始化
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
// 对tiny类型的申请数进行更新
incTinySmallAllocation(tiny);
return;
}
}
synchronized (this) {
// 走到这里,说明目标PoolSubpage链表中无法申请到目标内存块,因而就尝试从PoolChunk中申请
allocateNormal(buf, reqCapacity, normCapacity);
}
// 对tiny类型的申请数进行更新
incTinySmallAllocation(tiny);
return;
}
// 走到这里说明目标内存是大于8KB的,那么就判断目标内存是否大于16M,如果大于16M,
// 则不使用内存池对其进行管理,如果小于16M,则到PoolChunkList中进行内存申请
if (normCapacity <= chunkSize) {
// 小于16M,首先到当前线程的缓存中申请,如果申请到了则直接返回,如果没有申请到,
// 则到PoolChunkList中进行申请
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
return;
}
synchronized (this) {
// 在当前线程的缓存中无法申请到足够的内存,因而尝试到PoolChunkList中申请内存
allocateNormal(buf, reqCapacity, normCapacity);
++allocationsNormal;
}
} else {
// 对于大于16M的内存,Netty不会对其进行维护,而是直接申请,然后返回给用户使用
allocateHuge(buf, reqCapacity);
}
}
上述代码就是PoolArena申请目标内存块的主要流程,首先会判断目标内存是在哪个内存层级的,比如tiny、small或者normal,然后根据目标层级的分配方式对目标内存进行扩容。接着首先会尝试从当前线程的缓存中申请目标内存,如果能够申请到,则直接返回,如果不能申请到,则在当前层级中申请。对于tiny和small层级的内存申请,如果无法申请到,则会将申请动作交由PoolChunkList进行。这里我们主要看一下PoolArena是如何在PoolChunkList中申请内存的,如下是allocateNormal()的源码:
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
// 将申请动作按照q050->q025->q000->qInit->q075的顺序依次交由各个PoolChunkList进行处理,
// 如果在对应的PoolChunkList中申请到了内存,则直接返回
if (q050.allocate(buf, reqCapacity, normCapacity)
|| q025.allocate(buf, reqCapacity, normCapacity)
|| q000.allocate(buf, reqCapacity, normCapacity)
|| qInit.allocate(buf, reqCapacity, normCapacity)
|| q075.allocate(buf, reqCapacity, normCapacity)) {
return;
}
// 由于在目标PoolChunkList中无法申请到内存,因而这里直接创建一个PoolChunk,
// 然后在该PoolChunk中申请目标内存,最后将该PoolChunk添加到qInit中
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
boolean success = c.allocate(buf, reqCapacity, normCapacity);
qInit.add(c);
}
这里申请过程比较简单,首先是按照一定的顺序分别在各个PoolChunkList中申请内存,如果申请到了,则直接返回,如果没申请到,则创建一个PoolChunk进行申请。这里需要说明的是,在PoolChunkList中申请内存时,本质上还是将申请动作交由其内部的PoolChunk进行申请,如果申请到了,其还会判断当前PoolChunk的内存使用率是否超过了当前PoolChunkList的阈值,如果超过了,则会将其移动到下一PoolChunkList中。
对于内存的释放,PoolArena主要是分为两种情况,即池化和非池化,如果是非池化,则会直接销毁目标内存块,如果是池化的,则会将其添加到当前线程的缓存中。如下是free()方法的源码:
void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity,
PoolThreadCache cache) {
// 如果是非池化的,则直接销毁目标内存块,并且更新相关的数据
if (chunk.unpooled) {
int size = chunk.chunkSize();
destroyChunk(chunk);
activeBytesHuge.add(-size);
deallocationsHuge.increment();
} else {
// 如果是池化的,首先判断其是哪种类型的,即tiny,small或者normal,
// 然后将其交由当前线程的缓存进行处理,如果添加成功,则直接返回
SizeClass sizeClass = sizeClass(normCapacity);
if (cache != null && cache.add(this, chunk, nioBuffer, handle,
normCapacity, sizeClass)) {
return;
}
// 如果当前线程的缓存已满,则将目标内存块返还给公共内存块进行处理
freeChunk(chunk, handle, sizeClass, nioBuffer);
}
}
本文首先对PoolArena的整体结构进行了讲解,并且讲解了PoolArena是如何控制内存申请流转的,然后介绍了PoolArena中各个属性的作用,最后从源码的角度讲解了PoolArena是如何控制内存的申请的。