前言:本文将要介绍的高并发内存池,它的原型是Google的⼀个开源项⽬tcmalloc,全称Thread-Caching Malloc,该篇文章将以学习为目的来模拟实现精简版的高并发内存池,并对核心技术分块进行精细剖析。 项目专栏:高并发内存池_敲上瘾的博客-CSDN博客
Thread-Caching Malloc,即线程缓存的malloc,实现了⾼效的多线程内存管理,⽤于替代系统的内存分配相关的函数(malloc、free)。
在多线程环境下进行内存申请本质上属于对公共资源的访问,高并发场景下对内存分配锁的竞争会异常激烈,这将严重拖慢程序运行效率。此外,频繁的系统级内存申请不仅会产生额外开销,还可能引发内存碎片问题。传统的malloc内存分配机制由于这些固有缺陷,已难以满足现代高性能开发中对内存管理效率的需求。
而tcmalloc在高并发场景下是如何高效、安全的分配和释放内存呢?它的设计核心如下:
brk或mmap)。concurrent memory pool主要由以下3个部分构成:

thread cache:
线程缓存是每个线程独有的,⽤于⼩于256KB的内存的分配,线程从这⾥申请内存不需要加锁,每个线程独享⼀个cache,这也是该并发线程池⾼效的地⽅。
central cache:
中⼼缓存是所有线程所共享,thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象,避免⼀个线程占⽤了太多的内存,⽽其他线程的内存吃紧,达到内存分配在多个线程中均衡调度的⽬的。central cache是存在竞争的,所以从这⾥取内存对象是需要加锁,⾸先这⾥⽤的是桶锁,其次只有thread cache的没有内存对象时才会找central cache,所以这⾥竞争不会很激烈。
page cache:
⻚缓存是在central cache缓存上⾯的⼀层缓存,存储的内存是以⻚为单位存储及分配的,central cache没有内存对象时,从page cache分配出⼀定数量的page,并切割成定⻓⼤⼩的⼩块内存,分配给central cache。当⼀个span的⼏个跨度⻚的对象都回收以后,page cache会回收central cache满⾜条件的span对象,并且合并相邻的⻚,组成更⼤的⻚,缓解内存碎⽚的问题。
在做高并发内存池之前我们先完成一个定长内存池,主要是用来熟悉自由链表和为高并发内存池提供固定的空间申请。
池化技术:
所谓“池化技术”,就是程序先向系统申请过量的资源,然后⾃⼰管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较⼤的开销,不如提前申请好了,这样使⽤时就会变得⾮常快捷,⼤⼤提⾼程序运⾏效率。 在计算机中,有很多使⽤“池”这种技术的地⽅,除了内存池,还有连接池、线程池、对象池等。以服务器上的线程池为例,它的主要思想是:先启动若⼲数量的线程,让它们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程⼜进⼊睡眠状态。
内存池:
内存池是指程序预先从操作系统申请⼀块⾜够⼤的内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,⽽是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,⽽是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。
内存池主要解决的问题:
内存池主要解决的当然是效率的问题,其次如果作为系统的内存分配器的⻆度,还需要解决⼀下内存碎⽚的问题。那么什么是内存碎⽚呢? 内存碎⽚分为外碎⽚和内碎⽚,外部碎⽚是⼀些空闲的连续内存区域太⼩,这些内存空间不连续,以⾄于合计的内存⾜够,但是不能满⾜⼀些的内存分配申请需求。内部碎⽚是由于⼀些对⻬的需求,导致分配出去的空间中⼀些内存⽆法被利⽤。
malloc申请内存能适应于大多数场景,是通用的,但在某些特定场景下效率并不高,而且可能会产生大量的内存碎片。
定长内存池的原理

如上所述,我们先向系统申请一大块空间,然后当我们需要申请内存时就从这块空间上获取,我们把空间用完后不用急着释放回系统,而是把它用一个自由链表连接起来,进行二次利用。也就是说我们再次申请空间时可以在这块废弃的空间上找,如果没有再去大空间上找,如果还不够再去向系统申请。
框架
封装一个类,它的成员变量应包括:一个size_t类型储存该块空间的字节大小,一个void*指针储存自由链表的头,char*指针指向这块空间的起始地址,char*类型指针每加1只向后移动一字节,方便指针的移动。因为我们每被用户申请一小块空间后,指针往后移动到下一个为没利用的空间的起始地址。
template <typename T>
class FixedMemoryPool
{
public:
T *New();
void Delete(T *p);
private:
size_t _sumSize = 0;
char *_memory = nullptr;
void *_list_head = nullptr;
};初始化操作只需要在成员变量声明这里进行,然后使用默认的构造函数就可以,主要需要我们来实现New和Delete的逻辑。
Delete实现
Delete主要就是来维护自由链表,我们先来实现自由链表,因为在New中需要先在自由链表中查找空间。首先执行它的析构清空在它身上申请的空间。因为头插比较方便,我们直接把它头插到自由链表中。接下来就是头插操作:
我们把这块需要插入到链表的废弃空间记为p,首先把_list_head储存到p空间的前4/8字节,因为并不确定用户用的是32位系统还是64位系统,所以可以用这样一个操作来解决:
*(void **)p = _list_head;
然后_list_head = p,如下:
void Delete(T *p)
{
p->~T();
*(void **)p = _list_head;
_list_head = p;
}New实现
我们先来定义一个T* ret用来储存返回值,然后先去自由链表里找空间,如:
if (_list_head != nullptr)
{
ret = (T *)_list_head;
_list_head = *(void **)_list_head;
}大家看到 _list_head = *(void **)_list_head 这个代码可能会一点懵我来逐一讲解以下:
_list_head是一个void*指针,储存了一个地址,而这个地址空间里面又储存了下一个节点的指针,(void**)相当于告诉编译器我是指向一个void*类型的指针,然后进行解引用得到这块地址,并更新_list_head。
如果链表里没有空间向大空间里获取,但这个块大空间是需要向系统申请的,不是一开始就存在。
对于向系统申请大块内存,在windows下我们使用VirtualAlloc函数,与malloc相比VirtualAlloc直接与操作系统交互,无额外开销,效率高,通常用来申请超大块内存。
VirtualAlloc的声明
LPVOID VirtualAlloc(
LPVOID lpAddress,
SIZE_T dwSize,
DWORD flAllocationType,
DWORD flProtect
);返回值:LPVOID ,本质是void*,需强制类型转换后使用。
MEM_COMMIT:提交内存,使其可用。
MEM_RESERVE:保留地址空间,暂不分配物理内存。
MEM_RESERVE | MEM_COMMIT),同时保留并提交。
PAGE_READWRITE:可读/写。
PAGE_NOACCESS:禁止访问(触发访问违规)。
PAGE_EXECUTE_READ:可执行/读。
因为还要考虑其他系统的需求,我们在Common.h内封装一个向系统申请内存的函数,并使用条件编译来选择不同的函数调用,如下:
#ifdef _WIN32
#include <windows.h>
#else
//Linux...
#endif
// 直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
// linux下brk mmap等
#endif
if (ptr == nullptr)
throw std::bad_alloc();
return ptr;
}else
{
int objSize = max(sizeof(T), sizeof(void *));
if (_sumSize < objSize)//空间不够向系统申请
{
_sumSize = 128 * 1024;
_memory = (char*)SystemAlloc(_sumSize >> 13);
if (_memory == nullptr)
{
throw std::bad_alloc();
}
}
ret = (T *)_memory;
_memory += objSize;//移向未被利用的起始空间
_sumSize -= objSize;//更新剩余空间的大小
}objSize的作用:
因为我们要保证这块空间至少要能存放得下一个地址,要不然被弃用后无法连接到自由链表。
注意这里_sumSize不能是+=128*1024,因为_memory的初始地址已经更新了,要与_memory匹配。
最后对ret使用定位 new 后返回即可。
性能测试
源码已放在下文,如下是测试结果:

我们可以看到用定长内存池比new申请内存要快得多得多。
注:new的底层调用的就是malloc。
源码
ObjectPool.h
#define _CRT_SECURE_NO_WARNINGS 1
#include <iostream>
#include <string>
#include <algorithm>
#include <memory>
using namespace std;
#ifdef _WIN32
#include<windows.h>
#else
//
#endif
// 直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
// linux下brk mmap等
#endif
if (ptr == nullptr)
throw std::bad_alloc();
return ptr;
}
namespace my_MemoryPool
{
template <typename T>
class FixedMemoryPool
{
public:
T* New()
{
T* ret = nullptr;
if (_list_head != nullptr)
{
ret = (T*)_list_head;
_list_head = *(void**)_list_head;
}
else
{
int objSize = max(sizeof(T), sizeof(void*));
if (_sumSize < objSize)//空间不够向系统申请
{
_sumSize = 128 * 1024;
_memory = (char*)SystemAlloc(_sumSize >> 13);
if (_memory == nullptr)
{
throw std::bad_alloc();
}
}
ret = (T*)_memory;
_memory += objSize;
_sumSize -= objSize;
}
new (ret) T;
return ret;
}
void Delete(T* p)
{
p->~T();
*(void**)p = _list_head;
_list_head = p;
}
private:
size_t _sumSize = 0;
char* _memory = nullptr;
void* _list_head = nullptr;
};
}test.cc
#include "ObjectPool.h"
#include <vector>
using namespace my_MemoryPool;
struct TreeNode
{
int _val;
TreeNode* _left;
TreeNode* _right;
TreeNode()
:_val(0)
, _left(nullptr)
, _right(nullptr)
{}
};
void TestObjectPool()
{
// 申请释放的轮次
const size_t Rounds = 5;
// 每轮申请释放多少次
const size_t N = 100000;
std::vector<TreeNode*> v1;
v1.reserve(N);
size_t begin1 = clock();
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i)
v1.push_back(new TreeNode);
for (int i = 0; i < N; ++i)
delete v1[i];
v1.clear();
}
size_t end1 = clock();
std::vector<TreeNode*> v2;
v2.reserve(N);
FixedMemoryPool<TreeNode> TNPool;
size_t begin2 = clock();
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i)
v2.push_back(TNPool.New());
for (int i = 0; i < N; ++i)
TNPool.Delete(v2[i]);
v2.clear();
}
size_t end2 = clock();
cout << "new cost time:" << end1 - begin1 << endl;
cout << "object pool cost time:" << end2 - begin2 << endl;
}
int main()
{
TestObjectPool();
return 0;
}Thread Cache实现有些类似定长内存池,但是定长内存池有一个缺点,即它每次申请的空间只能是固定的,它的核心原理是:
先向系统申请一大块空间,然后当我们再需要申请内存时就从这块空间上获取,我们把空间用完后不用急着释放回系统,而是把它用一个自由链表连接起来,进行二次利用。也就是说我们再次申请空间时直接到自由链表上找,如果没有再去大空间上找,如果还不够再去向系统申请。
但我们希望从开辟的内存池申请的空间不是固定的,它可以是4字节、10字节、50字节,500字节等等。申请多少字节不是关键的问题,因为字节大小本来就是可以随意申请的,问题在于这些空间被弃用后,我们如何把它们管理起来并进行二次利用。
定长内存池使用一个自由链表来维护弃用的空间,那么我们用多个自由链表来维护不同的字节大小空间,需要多少字节就到指定的自由链表去找不就行了吗?答案是:是的!
为了高效的找到相应的自由链表,我们可以做一个哈希表。如下:

需要注意几个问题:
为突出该哈希表的性质,下文就称它为自由链表桶。
框架设计
首先用一个common.h文件来写一些需要用到的公共的类,比如自由链表的封装,对齐数的计算 和 哈希表下标映射的计算也封装成类放在里面。如下:

FreeList:
SizeClass:
这里把它写成静态成员函数是为方便在没有类对象情况下对函数的直接调用,因为现在并没有成员变量所以没必要实例化出对象。
可以把它做为内联函数,减少函数栈帧的开辟。
创建ConcurrentAlloc.h,在这个文件里提供一些直接能让用户使用的类和接口。我们封装一个ConcurrentAlloc类,如下:

接下来就是创建ThreadCache.h文件,来声明一些在thread cache这一层需要用到的类方法等。如下:

自由链表封装
Push实现
Push功能是把不用的空间放到自由链表中,因为头插效率比较高时间复杂度为O(1),我们直接把它头插到自由链表中。接下来就是头插操作:
把这块需要插入到链表的空间记为obj,首先把_list_head值储存到obj空间的前4/8字节,因为并不确定用户用的是32位系统还是64位系统,所以可以用这样一个操作来解决:
*(void **)obj = _list_head;
然后让obj成为新的头,即_list_head = obj,如下:
void Push(void *obj)
{
*(void **)obj = _list_head;
_list_head = obj;
}Pop实现
Pop是从自由链表中获取到内存,我们直接取头节点即可,如下:
void *Pop()
{
void *obj = _list_head;
_list_head = *(void **)obj;
return obj;
}Empty实现
直接返回 _list_head == nullptr即可
对齐数计算和下映射计算
在这里做这样一个规则把内存浪费率控制在10%左右:
[1,128] | 8byte对齐 | _freelist[0,16) |
|---|---|---|
[128+1,1024] | 16byte对齐 | _freelist[16,72) |
[1024+1,8*1024] | 128byte对齐 | _freelist[72,128) |
[8*1024+1,64*1024] | 1024byte对齐 | _freelist[128,184) |
[64*1024+1,256*1024] | 8*1024byte对齐 | _freelist[184,208) |
浪费率:
(浪费的字节数 / 对齐后的字节数)*100%
[1,128] 区间对齐数为8byte,所以最大的浪费数为7byte,而区间内的前8个字节对齐后申请的内存是8,所以浪费率 = (7/8)*100%

87.50%
同理各个区间的浪费率为:
[1,128] | (7/8)*100%87.50% |
|---|---|
[128+1,1024] | (15/16*9)*100%10.42% |
[1024+1,8*1024] | (127/128*9)*100%11.02% |
[8*1024+1,64*1024] | (1023/1024*9)*100%11.10% |
[64*1024+1,256*1024] | ( (8*1024-1) / (8*1024*9) )*100%11.10% |

虽然[1,128]区间浪费率有点高但实际浪费的并不多,无伤大雅。
对齐后字节大小计算
接下来就是计算对齐数,也就是让用户任意扔一个数字过来,需要通过对齐规则计算它实际开辟的空间。在RoundUp中实现。
我们可以用 if else 把数据分开处理,如下:
static inline int RoundUp(size_t bytes)
{
if (bytes <= 128)
return _RoundUp(bytes, 8);
else if (bytes <= 1024)
return _RoundUp(bytes, 16);
else if (bytes <= 8 * 1024)
return _RoundUp(bytes, 128);
else if (bytes <= 64 * 1024)
return _RoundUp(bytes, 1024);
else if (bytes <= 256 * 1024)
return _RoundUp(bytes, 8 * 1024);
else
{
assert(false);
return 1;
}
}_RoundUp函数实现也很简单,如果这个数本身就对齐直接返回即可,如果不是再做简单的处理,如下:
int _RoundUp(size_t size, size_t alignNum)
{
if (size % alignNum == 0)
return size;
else
return (size / alignNum + 1) * alignNum;
}但是代码中又是取模,又是除,效率还略逊一筹,可以考虑把它改成位运算,即:
static inline int _RoundUp(size_t bytes, size_t alignNum)
{
return (bytes + alignNum - 1) & ~(alignNum - 1);
}这里大家可以带进去数值来理一理。
下标映射计算
接下来是实现Index,假设我们已经知道用户要申请的空间大小(size)和对齐数是2的多少次方(align_shift),那么可以做这样的计算:
static inline int _Index(size_t bytes, size_t align_shift)
{
return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}1左移align_shift位得到对齐数,再减1,加上bytes,作用是把bytes中不足下一个alignNum倍数的部分补上。然后右移align_shift相当于除以对齐数,最后再减去1因为数组下标是从0开始的。
以上计算出来的只是这个对齐后的字节数 对于相应 对齐数 在数组上起始位置的相对位置,还需要加上前面对齐占用的数组元素个数才能得到正确的下标。即:
static inline int _Index(size_t bytes, size_t align_shift)
{
return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}
static inline size_t Index(size_t bytes)
{
assert(bytes <= MAX_BYTES);
static int group_array[4] = { 16, 56, 56, 56 };
if (bytes <= 128)
return _Index(bytes, 3);
else if (bytes <= 1024)
return _Index(bytes - 128, 4) + group_array[0];
else if (bytes <= 8 * 1024)
return _Index(bytes - 1024, 7) + group_array[0] + group_array[1];
else if (bytes <= 64 * 1024)
return _Index(bytes - 8 * 1024, 10) + group_array[0] + group_array[1] + group_array[2];
else if (bytes <= 256 * 1024)
return _Index(bytes - 64 * 1024, 13) + group_array[0] + group_array[1] + group_array[2] + group_array[3];
else
{
assert(false);
return -1;
}
}回顾上文框架,以上我们相当于完成了common.h文件,接下来处理ThreadCache和ConcurrentAlloc的封装就简单多了。
刚才我们设计了对齐数,接下来就可以确定自由链表桶的大小了,即208(从对齐规则表里得到)。我们可以把208做成一个全局的 static const size_t 类型 或者 宏定义。
单独创建一个ThreadCache.cpp文件用来做接口的实现:
#include "ThreadCache.h"
void* ThreadCache::Allocate(size_t bytes)
{
//得到对齐后的字节数
int objSize = SizeClass::RoundUp(bytes);
//得到自由链表桶的下标映射
int intex = SizeClass::Index(bytes);
//先去链表桶里面申请内存,如果没有到CentralCache中获取
if(!_freelists[intex].Empty())
return _freelists[intex].Pop();
else
return FetchFromCentralCache(intex, objSize);
}
void ThreadCache::Deallocate(void* obj,size_t bytes)
{
int intex = SizeClass::Index(bytes);
_freelists[intex].Push(obj);
}
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
//... ...
return nullptr;
}我们希望每个线程都有自己独立的Thread Cache对象,但又不想让用户在线程内自己申请。那么可以直接把它定义为全局变量 并在前面加上static thread_local关键字,让各自线程在自己的线程局部储存内都申请一份Thread Cache对象,而不是共享。
所以在ThreadCache.h文件中定义:
static thread_local ThreadCache* pTLSThreadCache = nullptr;
ConcurrentAlloc封装如下:
#pragma once
#include "Common.h"
#include "ThreadCache.h"
class ConcurrentAlloc
{
public:
static void* ConAlloc(size_t size)
{
if(pTLSThreadCache == nullptr)
pTLSThreadCache = new ThreadCache;
return pTLSThreadCache->Allocate(size);
}
static void ConFree(void* ptr,size_t size)
{
assert(pTLSThreadCache);
pTLSThreadCache->Deallocate(ptr,size);
}
};CentralCache结构
Central Cache的结构和Thread Cache类似,同样是用一个哈希桶来管理内存,只不过更复杂了些。这种类似的结构也使得在Thread Cache层封装的各种处理方法可以直接拿过来用,方便了很多。
Central Cache的内存管理图如下:

这是一个哈希桶,里面储存的是一个span类型双向链表,span这个单词的意思是:跨度、宽度、跨距...... span是以页为单位的内存,里面又被分割成了多个小的内存块,即自由链表。然后又根据不同的对齐规则得到多个不同span双向链表组成一个桶。
当然一个span里面不单只有自由链表,里面还有各种管理信息,在下文会细讲。
CentralCache与ThreadCache的区别
ThreadCache在每个线程内各自有一份,不存在锁的竞争。
CentralCache是所有线程共用唯一一份,它是临界资源,需要加锁,但它不是直接就给哈希桶加一把大锁,而是在哈希桶内每一个span链有独立的锁,把它称为桶锁,所以锁竞争并不激烈。也就是只有当两个线程同类型内存块用完了,并且同时申请span桶,并映射到同一个span链上才会有锁竞争,所以触发概率非常小,效率依旧很高。
CentralCache如何与ThreadCache联动
1.申请过程
ThreadCache内自由链表桶没有相应的内存了,会到span桶中找,通过映射关系找到对应的span链,而在span链中只在一个span块中找内存,自由链表桶向span桶申请内存实际上是只要一块的,但是这样一个一个的给效率未免有些太低。就需要多给几块,那该怎么分呢?下文会解答。
当span链中没内存了再向下一层Page Cache申请。
2.释放过程
有时某些线程(线程1)可能需要大量的内存,不断地向span桶中申请。而用完后就被放回它自己的自由链表桶了,后面有线程再需要向span桶申请内存就没得了,大部分内存都堆积在线程1自由链表桶,而线程1又暂时用不了这么多,这个时候就需要还给span桶,来提供给其他线程使用。
因此在span链中,每个span里面的自由链表的节点个数是随机的,并不是第一个span的自由链表空间就是满,也可能是nullptr。
CentralCache在等线程把拿去用的空间全部还回来后,交个下一层PageCache进行整合,缓解内存碎片问题。这也是把span链做成双链表的一个原因,方便任意的span断开给PageCache。而span是怎么知道内存是否被全部还回来呢?其实在span内通过一个_useCount计数器来判断,在下文会提到。
所以CentralCache就起到一个承上启下均衡调度的作用。
span结构与span链
span链表的创建和自由链表一样我们放在common.h这个公共的文件里,因为span的内存单位是页,在span里面需要储存起始页的地址,那么这个地址要用什么类型来存是个问题。
地址实际上就是一个数字,假设以8kb为一页,在32位机器上的最大编址就是2^32/8kb = 2^19,如果是64位机器的话,最大的编址是2^64/8kb = 2^51,所以我们通过条件编译来做不同的储存方案。如下:
#ifdef _WIN64
typedef unsigned long long PANGE_ID;
#elif _WIN32
typedef size_t PANGE_ID;
#else
//linux
#endif接下来就只用拿着PANGE_ID这个类型去储存页号就行。
注意:32位机器有_WIN32的定义,但64位机器既有_WIN64的定义又有_WIN32的定义,所以_WIN64的判断必须放在第一位。
Span结构如下:
struct Span
{
PANGE_ID _pageId = 0;//⼤块内存起始⻚的⻚号
size_t _n = 0;//页的数量
struct Span* _prev = nullptr;
struct Span* _next = nullptr;
size_t _useCount = 0;
void* _freeList = nullptr;
//... ...
};其余的成员函数在本章暂时用不到,就不列出来了。
然后封装一个类,用来做span链表的头,和提供一些操作方法:
class SpanList
{
public:
SpanList()
{}
void Insert(Span* pos, Span* newNode);
void Erase(Span* node);
private:
Span* _head;
public:
std::mutex _mtx;
};这个是一个双向链表所以在构造函数时就需要申请节点空间,并把指针头尾做连接,如下:
SpanList()
{
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
}剩下的两个接口实现就比较基础, 在这里就不再讲解,文末会给出源码。
CentralCache类
CentralCache类核心就是一个span链表桶,因为所有线程共用一份,所以把它作为单例模式(饿汉模式)防止被创建多份,创建在CentralCache.h文件里。
单例模式需要注意的事项:
class CentralCache
{
private:
CentralCache() {}
CentralCache(const CentralCache&) = delete;
CentralCache& operator=(const CentralCache&) = delete;
public:
static CentralCache* GetInstance();
//......
private:
SpanList _spanLists[FREE_LIST_SIZE];//span桶
static CentralCache _sInst;//CentralCache对象声明
};其它类方法在这里先不设,要不然挺突兀的,在下文分析如何申请内存时再根据需要添加。
CentralCache类方法
接下来讲的所有内容都是对FetchFromCentralCache接口进行实现。

申请多少空间的问题
上文提到一次不能就只给一块空间,这样效率太低,而给多浪费,给少效率低,该怎么平衡呢?可以做这样的算法处理:
说明:
慢开始反馈调节算法:
static int const MAX_BYTES = 256 * 1024;
static inline size_t NumMoveSize(size_t size)
{
int ret = MAX_BYTES / size;
//把个数控制在[2,512]之间
if (ret < 2) ret = 2;
if (ret > 512) ret = 512;
return ret;
}因为是对申请内存相关的计算所以把这个函数放到common.h的SizeClass类里,而MAX_BYTES定义放在文件开头更为合理。
以上满足了1,2点要求,针对3,4点,在ThreadCache的自由链表桶里添加成员变量来记录这次该申请多少,然后这个计数会根据申请的次数增加而增加。

注意:为了在外部对_maxSize修改,这个把返回值设为左值引用。
int batchNum = std::min(SizeClass::NumMoveSize(size), _freelists[index].MaxSize());
if (batchNum == _freelists[index].MaxSize())
_freelists[index].MaxSize() += 1;取两种方案得到的最小值,如果这个最小值是_maxSize则_maxSize增大,增大的幅度可以是1,2,3等等,这里就设为1。
内存申请
接下来就是去CentralCache里取batchNum个大小为size的内存,实际上就是取对应的span块中的一小段自由链表,用什么来存储呢?
因为是一段自由链表,就需要获得它的头尾,所以用这样两个变量:
先把代码实现给出,然后再来解释:
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
//计算batchNum大小和一些准备工作
int batchNum = std::min(SizeClass::NumMoveSize(size), _freelists[index].MaxSize());
if (batchNum == _freelists[index].MaxSize())
_freelists[index].MaxSize() += 1;
void* start = nullptr;
void* end = nullptr;
//向CentralCache申请内存
int actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
assert(actualNum > 0);
if (actualNum == 1)
{
assert(start == end);
return start;
}
else
{
//插入到线程自己的自由链表桶中,因为start将要被使用所以不用插。
_freelists[index].PushRange(Nextobj(start), end);
return start;
}
}内存申请是在span桶内完成的所以在CentralCache中做一个类方法FetchRangeObj,这个函数我们待会来实现,现在先把当前函数逻辑走完。
注意:我们最终只在一个span节点上申请内存,所以span中自由链表节点个数可能不够batchNum个,不过没关系,我们把实际申请到的节点个数返回,这里使用actualNum记录。
现在申请到内存了,需要把它连接到线程自己的自由链表里面,但需要分两种情况:
这里封装了Nextobj接口用来取到自由链表的下一个节点,实现很简单如下:
static void*& Nextobj(void* obj)
{
return *(void**)obj;
}注意:把它做成静态函数放在common.h文件里供全局使用,返回值类型设为一个引用,这样就可以对节点进行修改。
然后在自由链表里面封装一个方法PushRange,用来实现插入一段自由链表,比较简单直接把它插到头节点后面就行,这里就不在展示,文末会附有源码。
FetchRangeObj的实现
FetchRangeObj是CentralCache的类方法,我们再做一个对应的源文件CentralCache.cpp,然后记得把CentralCache(单例模式)的定义放在这个.cpp文件里面。
在FetchRangeObj首先调用类方法得到span链表桶的下标从而确定相应的span链表,然后上锁。
此时我们只是找到了span链表,然后需要在span链表里找到一个span节点,找span节点这个过程我们再单独封装一个方法GetOneSpan,同样放CentralCache类里面,因为它比较复杂,如果没有可以用的span节点还需要向下一层PageCache申请,该函数在本文不会实现,请期待下一篇博客的讲解!
现在假设我们在GetOneSpan内已经申请到一个span节点,接下来去取内存,让start和end都指向span的自由链表头,然后end往后移并做计数,但是需要注意几个问题:
实现如下:
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
//... ...
return nullptr;
}
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
//计算Span链表桶下标
size_t index = SizeClass::Index(size);
//访问公共资源需要加锁
_spanLists[index]._mtx.lock();
//获取一个Span节点
Span* span = GetOneSpan(_spanLists[index], size);
assert(span);
assert(span->_freeList);
//从Span节点中取到小块内存,如果足够batchNum个就取batchNum个,不足则全部取出。
start = end = span->_freeList;
int ret = 1;
while (ret < batchNum && Nextobj(end) != nullptr)
{
end = Nextobj(end);
ret++;
}
span->_freeList = Nextobj(end);
Nextobj(end) = nullptr;
span->_unCount += ret;
_spanLists[index]._mtx.unlock();
return ret;
}当取完内存后需要做善后处理:
在以上内容中,我们深入探讨了内存管理机制中在 ThreadCache 和 CentralCache两个层级进行内存申请的具体实现。这两层缓存作为高效的内存分配策略,能够快速响应线程的内存需求,减少锁竞争,提升程序性能。 本期文章将继续沿着这一脉络,聚焦于PageCache层级的内存申请逻辑。作为内存分配体系的更高层级,PageCache承担着从操作系统获取大块内存,并对其进行管理和再分配的重要职责。通过对PageCache内存申请流程的剖析,我们将完整地勾勒出整个内存分配体系的工作流程。这不仅有助于我们理解内存分配的底层机制,也为后续深入探讨内存释放策略奠定了基础。 接下来,让我们一同走进PageCache的内存世界。
PageCache的概述
⻚缓存是在central cache缓存上⾯的⼀层缓存,存储的内存是以⻚为单位存储及分配的。当central cache没有内存对象时,从page cache分配出⼀定数量的page,并切割成定⻓⼤⼩的⼩块内存,分配给central cache。当⼀个span的⼏个跨度⻚的对象都回收以后,page cache会回收central cache满⾜条件的span对象,并且合并相邻的⻚,组成更⼤的⻚,缓解内存碎⽚的问题。
PageCache结构

PageCache的结构是一个哈希表,如上图所示,也就是一个储存span双链表的数组,这一点和CentralCache的结构完全类似,区别在于这里的Span是大块的页(1页~128页)没有进行分割,而CentralCache中的Span是分割好的小块内存(自由链表),这样做的好处是方便页与页之间的分割与合并,有效的缓解内存外碎片,在下文会详细讲解。
PageCache结构的哈希映射方法是直接定址法,下标为n的位置储存的Span双链表中每个节点有n个页。
这个哈希表也是所有线程共用一个,属于临界资源需要加锁,但不是桶锁,而是一整个大锁,为什么呢?同样在下文细讲。
注:通常以4KB或8KB为一页,这里我们以8KB为一页。
内存申请的核心流程

CentralCache

PageCache

系统
当ThreadCache的自由链表内没有内存时,会向CentralCache中的一个Span申请,如果没有可用的Span,就向PageCache申请一个Span大块页,如果没有Span大块页了,最后才会向系统申请。
那么CentralCache具体是如何向PageCache申请内存的呢?
同样的PageCache一次只给一个Span,然后把这个Span切割成小块连接到CentralCache的哈希桶中,要给几页的Span呢?,这需要根据要将它切割成多大为单位的小块内存来进行具体分配,如果要切较小的内存块,页数就给小一些,如果要切大内存块,页数就给多一些。这里我们就先记为n页的Span。
因为PageCache中的哈希映射是直接定址法,所以直接找到n下标位置的Span链,如果有Span则取出并切成小块连接到CentralCache的哈希桶中。
如果没有n页的Span并不是直接去系统申请,而是到更大的页单位去找Span。比如找到有(n+k)页的Span,k>0,n+k<=128,那么把(n+k)拆开为n页与k页,并连接到相应位置。这样就有了n页的Span,把它切割和连接就行。
如果直到找到128页也没有找到可用的Span那么就向系统申请128页的内存,即128*8*1024字节,然后再执行上面逻辑。这就是整个在PageCache申请内存的逻辑。
处理临界资源互斥问题
如上,PageCache的哈希桶中各个桶是互相联动的,主要体现在这三方面:
所以这里不像CentralCache结构的桶相互独立,如果用桶锁会造成频繁的锁申请和释放,效率反而变得很低。用一个大锁来管理整个哈希桶更为合理。
PageCache类的设计
把这个类的设计放在头文件PageCache.h里,它核心就两个成员变量:锁和哈希桶,然后再声明一个用来申请Span的成员函数,如下:
class PageCache
{
public:
Span* NewSpan(size_t k);
std::mutex _pageMtx;
private:
SpanList _spanLists[NPAGES];
};PageCache类和CentralCache类一样所有线程共用一份,所以把它创建为单例模式,它分为以下几步:
代码示例:
class PageCache
{
public:
static PageCache* GetInstance()
{
return &_sInst;
}
Span* NewSpan(size_t k);
std::mutex _pageMtx;
private:
PageCache() {}
PageCache(const PageCache&) = delete;
SpanList _spanLists[NPAGES];
static PageCache _sInst;
};代码实现
1.GetOneSpan的实现
回顾上一期我们只是假设通过GetOneSpan从CentralCache中取到了Span,然后继续做后面的处理。接下来我们一起来实现GetOneSpan函数。
因为要在Span链表中取一个有用的Span节点,所以需要遍历Span链表,那么可以模拟一个迭代器。我们在SpanList类中封装这两个函数:
Span* Begin()
{
return _head->_next;
}
Span* End()
{
return _head;
}注:SpanList是一个带头双向环形链表。
接下来遍历链表找到可用的Span并返回,如果没有则到PageCache中申请。
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
Span* it = list.Begin();
while (it != list.End())
{
if (it->_freeList != nullptr)
{
return it;
}
it = it->_next;
}
//走到这里说明没有可用的Span了,向PageCache中申请。
//......
}注意在PageCache中申请的是大块的Span页,还需要把它切割,然后连接到CentralCache的桶中并返回。
在PageCache类中我们准备实现的函数NewSpan就是用来实现这个功能,现在需要考虑的是要传入的参数是多少,即要申请多少页的Span。
这里我们是以8KB为一页,即 1页=8*1024字节(2^13字节)为方便后面做位运算,我们在Common.h文件定义一个这样一个变量:
static int const PAGE_SHIFT = 13;
当然了这里也可以做成宏定义。
接下来在SizeClass类里封装一个函数NumMovePage用来计算一个size需要申请几页的Span,如下:
//用来计算ThreadCache向CentralCache申请几个小内存块
static inline size_t NumMoveSize(size_t size)
{
int ret = MAX_BYTES / size;
if (ret < 2) ret = 2;
if (ret > 512) ret = 512;
return ret;
}
//用来计算CentralCache向PageCache申请几个页的Span
static inline size_t NumMovePage(size_t size)
{
int num = NumMoveSize(size);
int npage = num * size;
npage >>= PAGE_SHIFT; //除以8KB
if (npage < 1) npage = 1;
return npage;
}在此,大家无需过度纠结于该设计背后的原理。这或许是相关领域的资深专家在历经大量测试与实践后,所总结提炼出的有效策略,我们继续走下面的逻辑。
//申请Span
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
//切割Span
//......NewSpan函数我们待会再设计,现在假设已经申请到一个Span的大块页,接下来就是进行切割。
首先我们需要得到的是这个页的起始地址与终止地址,在这个范围内以size的跨度切割。
用char*的指针变量start,end分别来储存起始地址和终止地址,用char*类型是因为char*类型指针变量加多少,就移动多少字节,方便后面做切割运算。
起始页地址的获取:
取到Span的页号,用页号乘以8KB(2^13字节)就能得到页的起始地址,即:
char* start = (char*)(span->_pageId << PAGE_SHIFT);
终止页地址的获取:
取到Span的页数,用页数乘以8KB再加上起始页地址就能得到终止页地址,即:
int bytes = span->_n << PAGE_SHIFT; char* end = start + bytes;
注:页号是通过申请到的内存的虚拟地址除以8KB得到的。
接下来把start连接到Span的自由链表中,然后使用循环把[start,end]这块内存以size为跨度进行切割,如下:
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
Span* it = list.Begin();
while (it != list.End())
{
if (it->_freeList != nullptr)
{
return it;
}
it = it->_next;
}
//申请Span
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
//切割Span
char* start = (char*)((PAGE_ID)span->_pageId << PAGE_SHIFT);
size_t bytes = span->_n << PAGE_SHIFT;
char* end = start + bytes;
span->_freeList = start;
char* tmp = start;
start += size;
while (start < end)
{
Nextobj(tmp) = start;
tmp = (char*)Nextobj(tmp);
start += size;
}
Nextobj(tmp) = nullptr;
span->_objSize = size;
// 切好span以后,需要把span挂到桶里面去的时候,再加锁
list._mtx.lock();
list.PushFront(span);
return span;
}这里直接以尾插的方式切,好处在于用户在使用内存时是连在一块的,相关的数据会被一并载入寄存器中,可以增加高速缓存命中率,提高效率。
最后实现一个类方法PushFront,用来把Span连接到哈希桶里。
void PushFront(Span* node)
{
//在Begin()前插入node,即头插
Insert(Begin(), node);
}NewSpan的实现
NewSpan我们放在源文件PageCache.cpp中实现。
查找当前桶
首先可以用assert断言页数的有效性,然后直接定址找到Span链,如果不为空返回一个Span节点,如果为空到后面的大页去找,如下:
Span* PageCache::NewSpan(size_t k)
{
//判断页数的有效性
assert(k > 0 && k < NPAGES);
if (!_spanLists[k].Empty())
{
return _spanLists[k].PopFront();
}
//到后面更大块的Span页切割
//......
//向系统申请内存
//......
}到大页切割
当k号桶没有Span后,从k+1号桶开始往后找,比如找到i号桶不为空,则把Span节点取出来,记为nSpan,然后切割为k页和i-k页,切割的本质就是改变页数_n和页号_pageId。
new一个Span空间,用kSpan变量指向,然后把nSpan头k个页切出来储存到一个kSpan中,即:
这样就相当于把原来大块的nSpan切割成了两块,最后把割剩下的nSpan插入到对应的哈希桶中,把kSpan返回。
for (int i = k + 1; i < NPAGES; i++)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
Span* kSpan = new Span;
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
nSpan->_pageId += k;
nSpan->_n -= k;
_spanLists[nSpan->_n].PushFront(nSpan);
return kSpan;
}
}向系统申请
当k号桶往后的桶都是空的,那么我们就需要向系统申请内存了,直接申请一个128的页,放在128号桶,再执行上面的逻辑。
对于内存申请,在windows下我们使用VirtualAlloc函数,与malloc相比VirtualAlloc直接与操作系统交互,无额外开销,效率高,通常用来申请超大块内存。在定长内存池部分已封装。
接下来继续NewSpan的执行逻辑,向系统申请内存后,new一个Span储存相关的页信息,即计算页号(地址/8KB),填写页数,然后把Span连接到128号哈希桶,最后需要做的就是把它切割为k页的Span和(128-k)页的Span和前面的逻辑一模一样,为提高代码复用率以递归的方式返回。如下:
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0 && k < NPAGES);
if (!_spanLists[k].Empty())
{
return _spanLists[k].PopFront();
}
for (int i = k + 1; i < NPAGES; i++)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
Span* kSpan = new Span;
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
nSpan->_pageId += k;
nSpan->_n -= k;
_spanLists[nSpan->_n].PushFront(nSpan);
return kSpan;
}
}
//向系统申请内存
Span* span = new Span;
void* ptr = SystemAlloc(NPAGES - 1);
span->_pageId = (PANGE_ID)ptr>>PAGE_SHIFT;
span->_n = NPAGES - 1;
_spanLists[span->_n].PushFront(span);
return NewSpan(k);
}加锁保护
PageCache哈希桶是临界资源,需要对它进行加锁保护。上文已经讲过只用加一个大锁,而不是用桶锁。
因为NewSpan函数涉及递归,需要使用递归锁,这样比较高效。但这里也可以把锁加到NewSpan外面,也就是GetOneSpan函数里,如下:

其次有一个优化的点:线程进入PageCache这一层之前是先进入了CentralCache的,并在这一层加了桶锁,那么此时CentralCache的哈希桶它暂时不用,但可能其他线程要用,可以先把锁释放掉,等从PageCache申请完内存后再去申请锁。
虽然说线程走到PageCache这一层说明CentralCache的哈希桶已经没有内存了,其他线程来了也申请不到内存,但别忘了还有内存的释放呢。把桶锁释放了,其他线程释放内存对象回来,就不会阻塞。如下:

经过前面攻坚,我们已完整实现了内存动态申请的核心模块。接下来将进入关键阶段——内存释放机制的理解与实现,这是构建完整 高并发内存池 的最后一块技术拼图。该模块完成后,项目主体架构将基本成型(达90%),后续主要聚焦于边界测试和性能调优。


在ThreadCache层我们探讨过这样一个问题,某一时段某个线程可能需要大量内存,不断向span桶中申请。而用完后这些内存就被放回它自己的自由链表桶了,导致后续线程再需要向span桶申请内存时资源不足,大部分内存都堆积在这个线程自由链表桶。
为了解决这样的问题,我们需要将部分内存归还给CentralCache。那么,自由链表达到多长时需要放回span桶呢?
之前在设计应该从span桶取多少内存块比较合适时我们也有过类似的困扰,当时设计了一个慢增长算法,每个自由链表中存了一个_maxSize,这里我们就做简单一点,用_maxSize来判断,即当一个自由链表长度超过一个批量(_maxSize)应该申请的内存时,我们把自由链表的_maxSize个节点还回span桶。
首先在class ThreadCache中声明一个函数,用来把要释放的内存截下来送入CentralCache层。

作用是把list中自由链表的_maxSize个节点截下来,然后送入CentralCache层,因为需要找到对应的哈希桶,所以传入内存块大小。
然后在Deallocate函数(释放内存)中检查是否满足归还span桶的条件,如下:
void ThreadCache::Deallocate(void* obj, size_t bytes)
{
int index = SizeClass::Index(bytes);
_freelists[index].Push(obj);
if (_freelists[index].Size() >= _freelists[index].MaxSize())
{
size_t size = SizeClass::RoundUp(bytes);
ListTooLong(_freelists[index], size);
}
}注:Size用来获取链表长度,这里就不展示了,可以用这两种方法之一:
因为每次使用Size函数都遍历链表会影响效率,所以这里选用添加_size成员的方法。
接下来实现ListTooLong函数:
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
void* start = nullptr;
void* end = nullptr;
list.PopRange(start, end, list.MaxSize());
//交给CentralCache
CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}同样的使用start和end指针去取内存,在class FreeList里封装一个函数PopRange,如果你看懂了之前内存申请的代码,这里的实现就显得很简单,不在一一讲解。代码示例:
//取一段内存
void PopRange(void* start, void* end, int num)
{
assert(num <= _size);
start = end = _list_head;
for (int i = 0; i < num - 1; i++)
{
end = Nextobj(end);
}
_list_head = Nextobj(end);
Nextobj(end) = nullptr;
_size -= num;
}小节目标:ReleaseListToSpans(start, size)的实现

CentralCache内存回收的主要任务就是把ThreadCache送来的内存连接到span桶中,其次,同样的问题,什么时候送到下一层呢?这个问题我们探讨过,就是当span中的所有内存块都回收回来时。
记住:理解为什么要span中的所有内存块回收回来后送入下一层这一点很重要,在下文会讲到。
回收的内存插入到Span的自由链表中
现在有一个问题:内存块要插入到哪个span?我们知道的size仅能找到它所在的哈希桶,也就是一个span链,要具体到某个span节点是一个很让人头疼的问题。
注意:不能粗鲁地把内存块连接到一个随便找的span,这样的话,属于不同页的内存块就混乱在一起,后面的PageCache就没法玩了!
追溯到本质(思考内存块从“出生”到现在的这个过程):内存块连接到某个span,实际上是要找到内存块属于那个页号,然后连接到这个页号对应的span中。
页号成为了它们之间的脐带,内存块地址除以8KB就是它所在的页,而一个span的信息中也能找到它管理着哪些页,只需要做一个页号到span的映射(即哈希表)就能解决问题。页级别的内存管理是在PageCache,我们把哈希映射在PageCache层创建和维护。
在PageCache中添加成员:
_idSpanMap在哪里填写呢?答:在页分配时。在向系统申请内存和给span分配页前是不能确定页号和span的对应关系的,所以哈希映射是随程序运行的动态填写过程。
将要被连接到CentralCache层的span,需要把它管理的所有页都映射到span中,因为拆开的内存块能获取到的是一个确定的页号。放回PageCache的span只需要它管理的开始页号和尾页号映射到span,用于span合并时对相邻页的查找。
如下:
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0 && k < NPAGES);
if (!_spanLists[k].Empty())
{
Span* kSpan = _spanLists[k].PopFront();
//建立映射
for (PAGE_ID i = 0; i < kSpan->_n; i++)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
kSpan->_isUse = true;
return kSpan;
}
for (int i = k + 1; i < NPAGES; i++)
{
//切割页(即页分配)
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
Span* kSpan = new Span;
kSpan->_n = k;//页数
nSpan->_n -= k;
//填写页号
kSpan->_pageId = nSpan->_pageId;
nSpan->_pageId += k;
//建立映射
_idSpanMap[nSpan->_pageId] = nSpan;
_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
for (PAGE_ID i = 0; i < kSpan->_n; i++)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
_spanLists[nSpan->_n].PushFront(nSpan);
kSpan->_isUse = true;//表示span被CentralCache使用,在下文会讲到。
return kSpan;
}
}
//向系统申请内存
Span* span = new Span;
void* ptr = SystemAlloc(NPAGES - 1);
span->_pageId = (PAGE_ID)ptr>>PAGE_SHIFT;
span->_n = NPAGES - 1;
_spanLists[span->_n].PushFront(span);
return NewSpan(k);
}对于使用内存块查找span我们封装一个函数MapObjectToSpan,对哈希表(_idSpanMap)的查找find,可能被多个线程同时访问出现错误,需要加锁。如:
Span* PageCache::MapObjectToSpan(void* obj)
{
PAGE_ID index = ((PAGE_ID)obj >> PAGE_SHIFT);
PageCache::_pageMtx.lock();
auto ret = _idSpanMap.find(index);
PageCache::_pageMtx.unlock();
if (ret == _idSpanMap.end())
{
assert(false);
return nullptr;
}
else
{
return ret->second;
}
}接下来我们来实现ReleaseListToSpans(start, size),它的核心工作:
注意:找到哈希桶后需要加桶锁。
对于第3点,在把span送入PageCache之前,需要把span从CentralCache移除,并把span前后指针指向空,自由链表指向空。其次走到这一步意味着程序将离开CentralCache层进入PageCache层,在此之前把桶锁解了,方便让其他线程使用,然后加锁执行PageCache的内存回收函数,解锁,然后再次申请桶锁。
代码示例:
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
void* p = start;
//找到每个p所在Span并连接到自由链表
//加桶锁
int index = SizeClass::Index(size);
_spanLists[index]._mtx.lock();
while (p)
{
void* next = Nextobj(p);
Span* span = PageCache::GetInstance()->MapObjectToSpan(p);
Nextobj(p) = span->_freeList;
span->_freeList = p;
span->_useCount--;
if (span->_useCount == 0)
{
//移出Span链表
_spanLists->Erase(span);
span->_freeList = nullptr;
span->_next = span->_prev = nullptr;
//交给PageCache回收
_spanLists[index]._mtx.unlock();
PageCache::GetInstance()->_pageMtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMtx.unlock();
_spanLists[index]._mtx.lock();
}
p = next;
}
_spanLists[index]._mtx.unlock();
}内存块全部回收后送入PageCache的作用:
内存块回收到span后,它在自由链表中的排布已经不像初次从PageCache取出那样是有序的,而是混乱的。等span全部内存块回收后,就可以把它们看做一个整体,尽管内存块的排布是混乱的也没关系,这只是逻辑上的混乱,回到PageCache层后它就是一个大块页。等PageCache再次分页时,内存块次序恢复和虚拟地址一样,增加高速缓存命中率。
小节目标:ReleaseSpanToPageCache(span)的实现。

当PageCache拿到span后不要直接连接到哈希桶里,而把它前后能合并的页都合并在一起组成一个大块的页,然后再连接到哈希桶,这样可以缓解内存外碎片。
注意 是把它前后的页的span合并在一起,只有连续页的span才能合并,而不是前后桶的span,前后页的span也无法依据桶来找,要找到管理它前后页的span需要用哈希映射。
但又会带来一些问题,哈希映射得到的span可能是在CentralCache中,也有可能是在PageCache中,因为我们用的是同一个哈希映射,而合并span,指的是PageCache中的span,要如何解决?
这里我们选择在span中添加成员变量_isUse来判断span是否分配给了CentralCache,这样就可以把它们区分开。先暂时这样写,后期再做优化。比如一个span的_isUse初始状态为false,分配给CentralCache后设为true,PageCache的回收处理后设为false。
合并条件:
合并过程:
合并结果处理:
代码示例:
void PageCache::ReleaseSpanToPageCache(Span* span)
{
PAGE_ID n = span->_pageId;
//向前合并
while (true)
{
PAGE_ID prev = span->_pageId - 1;
auto ret = _idSpanMap.find(prev);
if (ret != _idSpanMap.end()&&!ret->second->_isUse)
{
if (span->_n + ret->second->_n > NPAGES - 1)
break;
span->_pageId = ret->second->_pageId;
span->_n += ret->second->_n;
_spanLists[ret->second->_n].Erase(ret->second);
_spanPool.Delete(ret->second);
}
else break;
}
//向后合并
while (true)
{
PAGE_ID next = span->_pageId + span->_n;
auto nextptr = _idSpanMap.find(next);
if (nextptr != _idSpanMap.end() && !nextptr->second->_isUse)
{
if (span->_n + nextptr->second->_n > NPAGES - 1)
break;
span->_n += nextptr->second->_n;
_spanLists[nextptr->second->_n].Erase(nextptr->second);
_spanPool.Delete(nextptr->second);
}
else break;
}
span->_isUse = false;
_spanLists[span->_n].PushFront(span);
_idSpanMap[span->_pageId] = span;
_idSpanMap[span->_pageId + span->_n - 1] = span;
}我们完成了tcmalloc基础的内存管理功能,但还存在一个关键问题:未处理超过256KB的大内存申请。
下文将解决这个问题,并实现两个目标:
功能完善:
大块内存的申请
在ThreadCache中我们最大只能申请到256KB的内存,要申请大于256KB的内存,就要添加新的逻辑,不能到自由链表桶中申请。
怎么解决呢?很简单,既然在ThreadCache层解决不了,那么直接去PageCache的桶中获取Span就行,因为我们以8KB为一页,256KB也就是32页。只要在32页到128页内的空间申请都到PageCache桶中申请。
代码实现:在ConcurrentAlloc函数中添加一个size>256KB的分支,让它到PageCache中申请Span,注意这个过程需要进行内存对齐和加锁。内存对齐:因为现在是以页为单位申请,所以需要以1页为对齐数对齐。需要添加RoundUp函数逻辑,如下:

这里看上去8*1024和1<<PAGE_SHIFT(即1<<13)是一样的,但事实上8*1024字节是固定的,PAGE_SHIFT是根据需求改变的。
ConcurrentAlloc中添加的部分:
static void* ConcurrentAlloc(size_t size)
{
if (size > MAX_BYTES)
{
PAGE_ID bytes = SizeClass::RoundUp(size);
PageCache::GetInstance()->_pageMtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(bytes >> PAGE_SHIFT);
PageCache::GetInstance()->_pageMtx.unlock();
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
return ptr;
}
else
{
//正常逻辑
//......
}
}对于小于128页的内存申请NewSpan无需修改,如果用户需求是>128页呢?此时PageCache桶就无法满足,我们直接向系统申请即可,并且使用Span结构统一管理。如下NewSpan中添加的部分:
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0);
if (k > NPAGES - 1)
{
Span* kSpan = new Span;
void* ptr = SystemAlloc(k);
kSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
kSpan->_n = k;
_idSpanMap[kSpan->_pageId] = kSpan;
return kSpan;
}
//......
}大块内存的释放
大内存块不能被挂在ThreadCache桶中,需要单独写释放逻辑。即直接放到PageCache桶中。代码实现:在ConcurrentFree中添加size>256KB的分支,调用ReleaseSpanToPageCache,并且在此期间进行上锁。如下,添加的部分:
static void ConcurrentFree(void* ptr,size_t size)
{
Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
if (size > MAX_BYTES)
{
PageCache::GetInstance()->_pageMtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMtx.unlock();
}
else
{
//正常逻辑
//......
}
}同样的对于小于128页的内存释放ReleaseSpanToPageCache无需修改,如果释放的内存是>128页呢?此时无法挂到PageCache桶中,我们直接让系统释放即可,并且删除Span结构。如下ReleaseSpanToPageCache中添加的部分:
void PageCache::ReleaseSpanToPageCache(Span* span)
{
if (span->_n > NPAGES - 1)
{
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
SystemFree(ptr);
delete span;
return;
}
//......
}优化释放

如上实现,用户在调用释放内存的接口时是需要手动传入内存大小size,这样很繁琐,而且容易出错。我们希望不需要用户传这个参数也能完成工作。
该如何做?可以从这个方向思考?
第1点基本上不用考虑了,因为我们的代码中已经充斥着大量的size的使用,要用其他方式来替代成本太大了。
对于第2点是有可行性的,因为在Span切割时已经确定了一个Span中切割的所有小块内存的大小都是一样的。而在上一期我们已经建立了页号与Span的映射表,一个地址(ptr)除以8KB就能得到它所在的页号,也就能得到它对于的Span。
也就是在Span中记录它切割的内存块大小即可。所以添加成员变量_objSize,在Span分配和切割时进行填写,这里就不展示。
接下来可以把size参数去掉,通过ptr找到Span然后找到_objSize。如下:
static void ConcurrentFree(void* ptr)
{
Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
if (span->_objSize > MAX_BYTES)
{
PageCache::GetInstance()->_pageMtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMtx.unlock();
}
else
{
assert(pTLSThreadCache);
pTLSThreadCache->Deallocate(ptr, span->_objSize);
}
}测试代码:
void BigAlloc()
{
void* p1 = ConcurrentAlloc(257 * 1024);
cout << p1 << endl;
ConcurrentFree(p1);
void* p2 = ConcurrentAlloc(129 * 8 * 1024);
cout << p2 << endl;
ConcurrentFree(p2);
}测试结果:

定长内存池替代new
在程序中我们还有一些地方使用了malloc,比如申请Span时,申请ThreadCache时。
注:new的底层就是malloc。
我们希望把这些地方也脱离malloc,为什么呢?
因为Span和ThreadCache都是固定的内存大小,我们可以使用定长内存池来替代。在Common.h中添加头文件,并定义一个模板参数为Span的定长内存池。
#include "ObjectPool.h"
using namespace my_MemoryPool;
struct Span;
static FixedMemoryPool<Span> _spanPool;如果是VS2022可以通过Ctrl+f查找所有的new和delete申请释放的Span,把它们替换成 _spanPool.New()和_spanPool.Delete()。
在ThreadCache的申请中,定义一个模板参数为ThreadCache的定长内存池静态变量。然后替换new。如下:
static void* ConcurrentAlloc(size_t size)
{
if (size > MAX_BYTES)
{
//......
}
else
{
if (pTLSThreadCache == nullptr)
{
static FixedMemoryPool<ThreadCache> tcPool;
pTLSThreadCache = tcPool.New();
}
return pTLSThreadCache->Allocate(size);
}
}性能测试
主要是与malloc做对比,这里这样设计,每轮申请和释放ntimes次,nworks个线程,执行rounds轮,然后计算它们要花费的时间。以下给出测试代码,不过这里大家大可不必纠结代码,我们直接来看结果。
性能测试代码:
#include"ConcurrentAlloc.h"
#include<cstdio>
// ntimes 一轮申请和释放内存的次数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&, k]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
v.push_back(malloc(16));
//v.push_back(malloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
free(v[i]);
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
printf("%zu个线程并发执行%zu轮次,每轮次malloc %zu次: 花费:%.2f ms\n",
nworks, rounds, ntimes, (double)malloc_costtime);
printf("%zu个线程并发执行%zu轮次,每轮次free %zu次: 花费:%.2f ms\n",
nworks, rounds, ntimes, (double)free_costtime);
printf("%zu个线程并发malloc&free %zu次,总计花费:%.2f ms\n",
nworks, nworks * rounds * ntimes, (double)(malloc_costtime + free_costtime));
}
// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
v.push_back(ConcurrentAlloc(16));
//v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
ConcurrentFree(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
t.join();
printf("%zu个线程并发执行%zu轮次,每轮次concurrent alloc %zu次: 花费:%.2f ms\n",
nworks, rounds, ntimes, (double)malloc_costtime);
printf("%zu个线程并发执行%zu轮次,每轮次concurrent dealloc %zu次: 花费:%.2f ms\n",
nworks, rounds, ntimes, (double)free_costtime);
printf("%zu个线程并发concurrent alloc&dealloc %zu次,总计花费:%.2f ms\n",
nworks, nworks * rounds * ntimes, (double)(malloc_costtime + free_costtime));
}
int main()
{
size_t n = 10000;
cout << "==========================================================" << endl;
BenchmarkConcurrentMalloc(n, 4, 10);
cout << endl << endl;
BenchmarkMalloc(n, 4, 10);
cout << "==========================================================" << endl;
return 0;
}Debug下测试结果:
固定内存申请:

随机内存申请:

我们观察发现,tcmalloc总在释放的过程慢于malloc,要突破性能瓶颈我们就需要对tcmalloc的释放过程做优化。
性能分析
这里做性能分析主要是找到哪个函数消耗的时间长,然后针对性的进行修改。
在做性能分析时要保证是在debug下,并把向malloc申请内存的部分注释掉。然后做如下操作:
右键项目打开属性,然后做如下配置:

打开调试性能探查器

打开检测

打开详细信息

打开火焰图


我们可以观察到几乎60%以上的时间消耗都是来自函数MapObjectToSpan,而主要是来自内部锁竞争的消耗。在map中查找越慢,锁竞争越激烈。
接下来我们对MapObjectToSpan进行替代。
MapObjectToSpan是用来查找页号与Span的映射关系的,使用哈希桶unordered_map,STL中并未对数据结构并发访问问题进行处理,不是原子的。所以在find()查找过程会出问题,需要加锁来保护临界资源。
这里我们考虑换一种结构,使用基数树,基数树是一种通过压缩公共前缀路径优化存储的前缀树结构,支持高效的前缀匹配和键值查询。
这里我们设定三棵基数树,分别是一层,两层,三层。
只有一层的基数树本质就是数组(直接定址法的哈希表),页号到Span的映射,如下:
template <int BITS>
class TCMalloc_PageMap1 {
private:
static const int LENGTH = 1 << BITS;
void** array_;
public:
typedef uintptr_t Number;
//构造函数,给数组开空间
explicit TCMalloc_PageMap1() {
size_t size = sizeof(void*) << BITS;
size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);
array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);
memset(array_, 0, sizeof(void*) << BITS);
}
//通过页号得到Span
void* get(Number k) const {
if ((k >> BITS) > 0) {
return NULL;
}
return array_[k];
}
//添加页号与Span的映射
void set(Number k, void* v) {
array_[k] = v;
}
};
BITS给的是32-PAGE_SHIFT(32位下)或64-PAGE_SHIFT(64位下),表示存储页号的位数。
比如32位机器下8KB为一页,需要(2^32)/(2^13)=2^19个页号,即2^(32-PAGE_SHIFT)。
如果内存比较大,就需要开辟更大的数组,又因为很难一次性开出。所以把它拆除多块。即2层或3层
对于两层这样设定:

以32位为例,如上需要19位空间来储存页号,对于第一层给5位,也就是2^5=32个空间。然后把后14位给第二层。合计开辟(2^5)*(2^14)。与一层基数树开辟的空间相同。
拿到传来的页号,它是4字节的即32位,用前5位去匹配第一层并找到第二层的映射表,再用后面的位去匹配映射表找到对应的Span。

三层基数树的设定同理,不做讲解。
注意对于64位机器,(2^64)/(2^13)=2^53个页,要一次性开辟这么大的空间几乎是不可能的,所以必须使用三层,相当于把空间拆成小块开辟。而且等需要用到时再开辟,要不然会造成很多浪费。
以32位机器为例,把原来unordered_map结构改为TCMalloc_PageMap1,如下:

然后用Ctrl+f找到所有使用_idSpanMap的地方,进行修改。
再次进行性能测试查看火焰图:

它们的时间消耗的是差不多的,没有针对性的优化意义不是很大。
测试结果
Debug模式
固定内存申请

随机内存申请:

Release模式
固定内存申请:

随机内存申请:

当下 tcmalloc 的性能堪称一骑绝尘,对 malloc 形成了直接碾压,优势一目了然!
tcmalloc/Code · 敲上瘾/ConcurrentMemoryPool - 码云 - 开源中国
非常感谢您能耐心读完这篇文章。倘若您从中有所收获,还望多多支持呀!