对于上一篇,我们一定要联调自己的代码,理清楚申请内存的过程!!!😋
接下来,我们就完成之前剩下的回收内存的操作!!!
层层间的内存回收
当线程不再需要某个内存块时,会调用 ThreadCache::Deallocate
将内存块回收到线程缓存thead cache中。线程缓存根据内存块的大小将其放入对应的链表(桶)中。但是这样下去话就会一直在thread cache的桶下一直往后挂,这就会导致其他线程用不到内存,内存浪费的问题,所以如果某个桶的长度超过了预设的阈值,线程缓存会将多余的内存块批量返回到中央缓存。中央缓存会根据内存块的大小,将其放入全局的链表中,供其他线程使用。
//释放对象时,链表过长时,回收内存回到中心缓存
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
//只能说长度太长了,需要拿出一些批量,并不是全部拿出来!!!
void* start = nullptr;
void* end = nullptr;
//去取一些内存对象出来
list.PopRange(start, end, list.MaxSize());
//找到对应的位置的Span,还给对应的Span
CentralCache::GetInstance()->ReleaseListToSpans(start, size);//不需要end,因为一段链条的尾部是nullptr,后面走到nullptr就可以判定了
}
接下来,thread cache已经取了一串的内存块传递给下一层central cache,因为这一串链并不一定是只属于一个特定的Span,我们反正一定能让这一串内存对象够找到自己所处的桶,是几字节,就是根据对应规则的哪一个桶,这也是我们为什么 ReleaseListToSpans(start, size); 需要传size参数,是spans,不是span!!!
传参进来的链表中的小对象块是属于哪一个Span呢???这都是不确定的,那么我们这么知道哪一个小对象内存块是属于哪一个Span呢??这就是比较麻烦的了!!!
这些小块内存一定是由一个页切出来的,假设我们的这一条链表是由两个页切出来的,2000页和2001页:
- 2000页的起始地址:2000*8*1024=FA0000
- 2001页的起始地址:2001*8*1024=FA2000
那么每一个小内存块对象是这么知道自己是属于哪一个页呢?其实是很简单的,3号和4号的地址就是在FA0000~FA2000之间的,其实3号和4号的地址去整除8KB就是FA0000去整除8K的值!!!我们就可以根据这个高效的对应关系得到内存对象和对应页的映射关系!!!
虽然我们从thread cache还回来的小对象内存块的链表,每一个对象的所在Span我们不好知道,但是我们可以知道每一个内存对象所在的对应的页!!!
一个比较暴力的方式:就是遍历所在桶位的所有Span,因为Span都记录着起始页号,是第几页的,这样我们就可以知道这些内存对象是属于哪一个Span了;
这种方式有点暴力了,是一种的算法了, 我们可以做一个映射来比较高效的解决,还有不仅仅是central cache需要,后面page cache的合并也是需要的,我们就可以在page cache里面做映射---unordered_map!page cache分配给central cache的Span都记录一下!也就是在New Span的时候记录一下PAGE_ID和Span*的映射关系!
后面,我们来的问题就是这么判断一个span的全部小块内存对象都回来了呢?其实我们之前就已经给出了答案,就是根据useCount是否为0来判断这个span中的之前切分出去的小块内存对象是否回来!都回来的时候,这个span就可以继续往下一层进行归还了!
//PageCache.cpp
//获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
auto ret = _idSpanMap.find(id);
if (ret != _idSpanMap.end())
{
return ret->second;
}
else
{
assert(false);
return nullptr;
}
}
//CentralCache.cpp
//将一定数量的对象释放到spans当中,不是span!!!
//因为归还回来的
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
size_t index = SizeClass::Index(size);
//进来这一块,什么都不要想,直接将桶锁给加上,加锁---解锁
_spanLists[index]._mtx.lock();
//那么这时候,传参进来的链表中的小对象块是属于哪一个Span呢???
//这都是不确定的,那么我们这么知道哪一个小对象内存块是属于哪一个Span呢??
//这就是比较麻烦的了!!!
while (start)
{
void* next = NextObj(start);
Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
//归还:头插回去
NextObj(start) = span->_freeList;
span->_freeList = start;
span->_useCount--;
if (span->_useCount == 0)
{
//说明该span切分出去的所有小块内存对象都回来了
//说明这个span就可以再回收给page cache了,page cache可以再尝试去做前后页的合并了
_spanLists[index].Erase(span);
span->_freeList = nullptr;
span->_next = nullptr;
span->_next = nullptr;
span->_prev = nullptr;
//页号和页数是不能置空的:我们就是用页号就可以充当成Span的起始地址了,页数就是可以体现这个Span的大小了
//释放span给page cache时,使用page cache的锁就可以了
//这时把桶锁解掉,不解的话就会导致其他线程要在这个桶位申请/释放内存,就会被阻塞住,你去干你的ReleaseSpanToPageCache(span);还要来占着茅坑不拉屎吗?
_spanLists[index]._mtx.unlock();
PageCache::GetInstance()->_pageMtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMtx.unlock();
_spanLists[index]._mtx.lock();
}
start = next;
}
_spanLists[index]._mtx.unlock();
}
为了解决内存外碎片的问题,假设这个返回个page cache的Span是一页,两页的Span,假设是1页的,我们就可以直接将其挂到1page对应的哈希桶的SpanList中,不过这样就会导致这种问题:
假设现在都是零零散散的挂着,都是10页之内的,现在想要一个10页以上的内存,如果10页之内的内存块是连续的,就可以拼一下,但是不连续的话,10页的就直接要不了,就需要去堆上申请,10页之内的些span内存对象又碎片化的散在其中,所以我们应该解决一下这种外碎片问题;内碎片这种问题我们既然选择少数量的桶就代表着我们解决不了,但是内碎片问题现在已经无所谓了,第一次申请6字节,给出8字节,反正还会回来,回来可以给要8字节的用,只是暂时的!但是外碎片问题如果不解决就会一直存在!😭
所以span回收回来了第一步并不是直接挂到对应的桶下,而是对span前后的页,尝试进行合并,缓解内存碎片问题,那么怎么解决呢?
假设还回来的Span的pageid是1000,n是1,那么就可以往前往后查找相邻的页:看999页是不是空闲的,空闲的话还需要再往前走,直到不能合并;往后去查找1001页,一样的操作;
假设还回来的Span的pageid是1000,n是5,也是往前找999,依上操作往前,往后就是找1001了,而是找1005(0,1,2,3,4,5),依上操作往后合并!
我们就需要有办法找到前后页的Span进行合并,我们就需要一个id到span的映射,这不就是上面的unordered_map的作用吗!
这里还有一个问题,要小心的!查找前后相邻的span要知道能不能合并,有没有达到合并的要求呢?
- 如果找到的这个Span只要是挂在page cache里的,都可以合并!!!
- 在Central Cache的Span还在用呢?可不能拿来合并!!!
那么我们怎么识别该Span是在Central Cache里,还是在Page Cache当中呢?
我们不是有一个useCount吗,在Central Cache里的不是0,在Page Cache的是0,可是这是不行的!!!是有问题的,存在一个间隙,这个间隙就是在Central Cache在给Span切分的时候,其useCount正好是0!这个时候刚好合并找到该Span,这不就炸了吗?所以可千万不要使用useCount来判断!!!这会带来线程安全的问题!!!
很简单的解决方式---在Span类中加一个变量:_isUse:看有没有人在用!
//PageCache.cpp
//释放空间span回到page cache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
//对span前后的页,尝试进行合并,缓解内存碎片问题
//现在是我去合并被人,不着急设置其_isUse为false,防止被别人给合并了,那就很不好了
//对前页进行合并
while (1)
{
PAGE_ID prevId = span->_pageId - 1;
auto ret = _idSpanMap.find(prevId);
if (ret == _idSpanMap.end())
{
break;
}
Span* prevSpan = ret->second;
if (prevSpan->_isUse == true)
{
break;
}
if (prevSpan->_n + span->_n > NPAGES - 1)
{
break;
}
span->_pageId = prevSpan->_pageId;
span->_n += prevSpan->_n;
_spanLists[prevSpan->_n].Erase(prevSpan);//后面要delete,待会变成野指针,被别人访问到可不好!
delete prevSpan;
}
//对后页进行合并
while (1)
{
PAGE_ID nextId = span->_pageId + span->_n;
auto ret = _idSpanMap.find(nextId);
if (ret == _idSpanMap.end())
{
break;
}
Span* nextSpan = ret->second;
if (nextSpan->_isUse == true)
{
break;
}
if (nextSpan->_n + span->_n > NPAGES - 1)
{
break;
}
span->_n += nextSpan->_n;
_spanLists[nextSpan->_n].Erase(nextSpan);
delete nextSpan;
}
_spanLists[span->_n].PushFront(span);
span->_isUse = false;
_idSpanMap[span->_pageId] = span;
_idSpanMap[span->_pageId + span->_n] = span;
}
其实现在已经是OK的了,后面就是一些优化了,我们已经可以连调整体代码的申请释放逻辑了!
代码部分
Common.h
#pragma once
#include<iostream>
#include<vector>
#include<time.h>
#include<assert.h>
#include<thread>
#include<mutex>
#include<algorithm>
#include<unordered_map>
#ifdef _WIN32
#include<windows.h>
#else
//...
#endif
//方便,不使用using namespace std;是因为防止污染
using std::cout;
using std::endl;
static const size_t MAX_BYTES = 256 * 1024;
static const size_t NFREELIST = 208;//前两层的哈希桶的数量
static const size_t NPAGES = 129;//pagecache层的哈希桶的数量//数量是128,但是为了对应映射关系,因为数组的下标是从0开始的
static const size_t PAGE_SHIFT = 13;//假设一页是8K,这是进行页的转化的参数
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#else
//Linux
#endif
// 直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
// linux下brk mmap等
#endif
if (ptr == nullptr)
throw std::bad_alloc();
return ptr;
}
static void*& NextObj(void* obj)
{
return *(void**)obj;
}
// 管理切分好的小对象的自由链表
class FreeList
{
public:
void Push(void* obj)
{
assert(obj);
// 头插
//*(void**)obj = _freeList;
NextObj(obj) = _freeList;
_freeList = obj;
++_size;
}
//支持给一个范围,插入到自由链表---头插
void PushRange(void* start, void* end, size_t n)
{
NextObj(end) = _freeList;
_freeList = start;
_size += n;
}
//支持给一个范围,从自由链表中弹出(从前面取出,弹出)
void PopRange(void*& start, void*& end, size_t n)
{
assert(n <= _size);
start = _freeList;
for(size_t i = 0; i < n - 1; i++)
{
end = NextObj(end);
}
_freeList = NextObj(end);
NextObj(end) = nullptr;
_size -= n;
}
void* Pop()
{
assert(_freeList);
// 头删
void* obj = _freeList;
_freeList = NextObj(obj);
--_size;
return obj;
}
bool Empty()
{
return _freeList == nullptr;
}
size_t& MaxSize()
{
return _maxSize;
}
size_t Size()
{
return _size;
}
private:
void* _freeList = nullptr;//这个要写,因为我们没有写构造
size_t _maxSize = 1;//慢调节算法的要点之一
size_t _size = 0;
};
// 计算对象大小的对其映射规则
class SizeClass
{
public:
// 提供函数来计算给一个字节数,对应到正确的桶位置;
// 规则如下:
// 整体控制在最多10%左右的内碎片浪费!!!
// ***************************************************************************************************
// ***************************************************************************************************
// [1,128] 8byte对齐 freelist[0,16) 这个没办法>^<
// [128+1,1024] 16byte对齐 freelist[16,72) 15/(129+15)=0.10....
// [1024+1,8*1024] 128byte对齐 freelist[72,128) 127/(1025+127)=0.11....
// [8*1024+1,64*1024] 1024byte对齐 freelist[128,184) //......
// [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208) //......
// ***************************************************************************************************
// ***************************************************************************************************
//相当于RoundUp的子函数:给定当前size大小和对应规则的对齐数AlignNum,用于处理
//static inline size_t _RoundUp(size_t size, size_t alignNum)
//{
// size_t alignSize;
// if (size % alignNum != 0)
// {
// alignSize = (size / alignNum + 1) * alignNum;
// }
// else//等于0就不需要处理了,已经对齐了
// {
// alignSize = alignNum;
// }
//}
//上面的是我们普通人玩出来的,下面我们来看看高手是怎么玩的!
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{
return ((bytes + alignNum - 1) & ~(alignNum - 1));
}
//你给我一个size,就需要算出对其以后是多少 --- 比如说:8->8 7->8
static inline size_t RoundUp(size_t size)
{
if (size <= 128)
{
return _RoundUp(size, 8);
}
else if (size <= 1024)
{
return _RoundUp(size, 16);
}
else if (size <= 8 * 1024)
{
return _RoundUp(size, 128);
}
else if (size <= 64 * 1024)
{
return _RoundUp(size, 1024);
}
else if (size <= 256 * 1024)
{
return _RoundUp(size, 8 * 1024);
}
else
{
//说明大于256KB了,那么就有点问题了
assert(false);
return -1;
}
}
//一样的,我们来自己写写看,待会换成别人的刚好的方式
//size_t _Index(size_t bytes, size_t alignNum)
//{
// if (bytes % alignNum == 0)
// {
// return bytes / alignNum - 1;
// }
// else
// {
// return bytes / alignNum;
// }
//}
//高手的想法:
static inline size_t _Index(size_t bytes, size_t align_shift)
{
return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}
// 计算映射的哪一个自由链表桶
static inline size_t Index(size_t bytes)
{
assert(bytes <= MAX_BYTES);
// 每个区间有多少个链
static int group_array[4] = { 16, 56, 56, 56 };
if (bytes <= 128) {
return _Index(bytes, 3);
}
else if (bytes <= 1024) {
return _Index(bytes - 128, 4) + group_array[0];
}
else if (bytes <= 8 * 1024) {
return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
}
else if (bytes <= 64 * 1024) {
return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
}
else if (bytes <= 256 * 1024) {
return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
}
else {
assert(false);
}
return -1;
}
// 一次thread cache从中心缓存获取多少个
static size_t NumMoveSize(size_t size)
{
assert(size > 0);
// [2, 512],一次批量移动多少个对象的(慢启动)上限值
// 小对象一次批量上限高
// 小对象一次批量上限低
int num = MAX_BYTES / size;
if (num < 2)
num = 2;
if (num > 512)
num = 512;
return num;
}
// 计算一次向系统获取几个页
// 单个对象 8byte
// ...
// 单个对象 256KB
static size_t NumMovePage(size_t size)
{
size_t num = NumMoveSize(size);
size_t npage = num * size;
npage >>= PAGE_SHIFT;
if (npage == 0)
npage = 1;
return npage;
}
};
//管理多个连续页的大块内存跨度的结构
struct Span
{
PAGE_ID _pageId = 0;//页号:大块内存的起始页的页号
size_t _n = 0;//页数
//双向链表
Span* _next = nullptr;
Span* _prev = nullptr;
size_t _useCount = 0;//切好的小块内存,被分配给thread cache的计数
void* _freeList = nullptr;//用于管理切好的小块内存的自由列表
bool _isUse = false;//是否在被使用
};
//带头双向循环链表
class SpanList
{
public:
SpanList()
{
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
}
//这里我们没必要实现一个迭代器,我们利用带头双向循环链表的特性:
Span* Begin()
{
return _head->_next;
}
Span* End()
{
return _head;
}
bool Empty()
{
return _head->_next == _head;
}
//插入Span:头插
void PushFront(Span* span)
{
Insert(Begin(), span);
}
//弹出一个Span
Span* PopFront()
{
Span* front = _head->_next;
Erase(front);
return front;
}
void Insert(Span* pos, Span* newSpan)
{
assert(pos);
assert(newSpan);
Span* prev = pos->_prev;
//prev newSpan pos
prev->_next = newSpan;
newSpan->_prev = prev;
newSpan->_next = pos;
pos->_prev = newSpan;
}
void Erase(Span* pos)
{
assert(pos);
assert(pos != _head);
Span* prev = pos->_prev;
Span* next = pos->_next;
prev->_next = next;
next->_prev = prev;
}
private:
Span* _head;
public:
std::mutex _mtx;//桶锁
};
ThreadCache.h
#pragma once
#include"Common.h"
class ThreadCache
{
public:
// 申请和释放内存对象
void* Allocate(size_t size);
void Deallocate(void* ptr, size_t size);
// 从中心缓存获取对象
void* FetchFromCentralCache(size_t index, size_t size);
//释放对象时,链表过长时,回收内存回到中心缓存
void ListTooLong(FreeList& list, size_t size);
private:
FreeList _freeLists[NFREELIST];//_freeLists 是一个数组,用于存储多个 FreeList 类型的对象。
};
//线程隔离 :每个线程对 pTLSThreadCache 的访问都是独立的,一个线程对其的修改不会影响其他线程中的该变量值。
//避免锁竞争 :由于每个线程都有自己的变量副本,避免了多线程环境下因共享变量而导致的锁竞争问题,提高了程序的并发性能。
//方便线程专用数据管理 :适合存储线程专用的资源或状态信息,例如每个线程的缓存、配置等。
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
//实际中,不可能让每一个线程自己来获取ThreadCache*这个对象,我们还需要再封装一层——————》ConcurrentAlloc.h
//也就是说:一个线程起来了,并不是马上就有pTLSThreadCache了,而是需要去调用相关的函数
ThraedCache.cpp
#include"ThreadCache.h"
#include"CentralCache.h"
//声明与实现分离
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
//慢开始反馈调节算法:整体控制达到双重目的:
//1.最开始不会一次向central cache一次批量要太多,因为要太多可能用不完
//2.如果你不要size大小的需求,那么batchNum就会不断增长,知道上限(对象小上限高,对象大上限小)
size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
if (_freeLists[index].MaxSize() == batchNum)
{
_freeLists[index].MaxSize() += 1;//慢增长的过程!!!
}
void* start = nullptr;
void* end = nullptr;
size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);//并没有说我们想要多少他这个Span里就有多少,我们要先得到一个实际的!!!
assert(actualNum > 0);//你能给多少我就用多少吧,但是起码也是要有一个的吧!!!
if (actualNum == 1)
{
assert(start == end);
return start;
}
else
{
_freeLists[index].PushRange(NextObj(start), end, actualNum - 1);
return start;
}
}
void* ThreadCache::Allocate(size_t size)
{
assert(size <= MAX_BYTES);
//并不是所有要求申请的字节数都有完全对应的一个桶位,是1字节,7字节都是放在对应的8字节桶中,那么我们就需要实现一个对应的对其映射的规则:我们在Common.h中实现
size_t alignSize = SizeClass::RoundUp(size);//对齐数
size_t index = SizeClass::Index(alignSize);//计算出了对齐数,那么又是对应的哪一个桶呢?
//通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
if (!_freeLists[index].Empty())
{
return _freeLists->Pop();
}
else//这个桶下面的自由链表为空,为空只能向下一层去要了
{
return FetchFromCentralCache(index, alignSize);
}
}
void ThreadCache::Deallocate(void* ptr, size_t size)//free只需要传入对应要free空间的指针就可以了,但是为了知道放入到对应的正确的桶位置,需要size参数来定位桶的位置
{
assert(ptr);
assert(size <= MAX_BYTES);
//找出对应的自由链表的桶,插入进去
size_t index = SizeClass::Index(size);
_freeLists[index].Push(ptr);
//太长了的话,需要还给CentralCache ,CentralCache 满足某些条件有会还给下一层PageCache
//还回逻辑我们需要将所有的申请逻辑理清才好理解!!!
//我们简化一些tcmalloc的回收内存块对象的做法,他是考虑内存空间和桶的长度的两者条件
//我们自己实现的话就按照一条要求叭!
//当链表长度大于一次批量申请的内存时就开始还一段list给central cache
//tcmalloc细节比较足!!!我们这是取舍的,拿出最重要的,最核心的,我们要是以学习的态度!!!目的是学习!!!
if (_freeLists[index].Size() >= _freeLists[index].MaxSize())
{
ListTooLong(_freeLists[index], size);
}
}
//释放对象时,链表过长时,回收内存回到中心缓存
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
//只能说长度太长了,需要拿出一些批量,并不是全部拿出来!!!
void* start = nullptr;
void* end = nullptr;
//去取一些内存对象出来
list.PopRange(start, end, list.MaxSize());
//找到对应的位置的Span,还给对应的Span
CentralCache::GetInstance()->ReleaseListToSpans(start, size);//不需要end,因为一段链条的尾部是nullptr,后面走到nullptr就可以判定了
}
CentralCache.h
#pragma once
#include "Common.h"
// 单例模式
class CentralCache
{
public:
//小对象是由大对象切出来的,那么这个大对象又是多大呢?
//CentralCache里面定义了一个管理大块内存对象的结构---Span
//但是我们不设计到类内中,因为还涉及到一些其他的问题,我们实现过程中自然就会理解:因为不仅仅要给CentralCache使用,还要给下一层PageCache使用!!!
//我们需要搞定桶锁--->我们只需要将锁定义在SpanList中就可以了!!!
static CentralCache* GetInstance()
{
return &_sInst;
}
// 获取一个非空的span,因为要从中心缓存获取一定数量的对象给thread cache,我们需要找到一个非空的span
Span* GetOneSpan(SpanList& list, size_t byte_size);
// 从中心缓存获取一定数量的对象给thread cache
size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);//拿batchNum个size大小的对象,前两个为输出型参数
//将一定数量的对象释放到span当中
void ReleaseListToSpans(void* start, size_t size);
private:
SpanList _spanLists[NFREELIST];//和thread cache一样的,208个桶!
private:
CentralCache()
{
}
CentralCache(const CentralCache&) = delete;
static CentralCache _sInst;//单例模式:懒汉模式
};
CentralCacge.cpp
#include"CentralCache.h"
#include"PageCache.h"
CentralCache CentralCache::_sInst;
//获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
//需要遍历我们的SpanList,如果有就给,没有就需要向下一层了
Span* it = list.Begin();
while (it != list.End())
{
if (it->_freeList != nullptr)
{
//不为空,说明下面挂着有对象
return it;
}
else
{
//it++;我们没有封装迭代器,不要使用++,但是也是很简单的
it = it->_next;
}
}
//先把central cache的桶锁解掉,如果其他线程释放内存对象回来,不会阻塞 --- 提高效率
list._mtx.unlock();
//走到这说明已经没有空闲的Span了,只能找下一层page cache要了
PageCache::GetInstance()->_pageMtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
span->_isUse = true;
PageCache::GetInstance()->_pageMtx.unlock();
//这里加锁和解锁的锁是同一个锁,即 PageCache 单例对象的 _pageMtx 锁,所以不存在解锁解的不是上次加锁的的情况。
//因为 PageCache::GetInstance() 返回的是同一个 PageCache 类的实例,所以 _pageMtx 是同一个互斥锁对象。
//锁匹配的本质:锁匹配的核心是加锁和解锁操作必须作用于同一个锁对象。只要满足这一点,加锁和解锁就是匹配的。
//单例模式的作用:单例模式在这里的作用是确保 _pageMtx 的唯一性,即每次调用 PageCache::GetInstance() 都返回同一个 PageCache 对象,从而保证 _pageMtx 是同一个锁对象。
//因果关系:单例模式是通过确保锁对象的唯一性,间接保证了加锁和解锁操作的匹配性。但锁匹配的直接原因是加锁和解锁操作作用于同一个锁对象。
//对获取的Span进行切分是不需要加锁的:因为其他线程已经拿不到这个Span了
//我们可以通过页号,计算页的起始地址:页号<<PAGE_SHIFT(0*8K,1*8K,.......}
char* start = (char*)(span->_pageId << PAGE_SHIFT);//算到起始地址
//要插入到指定桶下,但是要先切
size_t bytes = span->_n << PAGE_SHIFT;//计算大块内存的字节数大小
//切!!!将大块内存切成自由链表链接起来
char* end = start + bytes;
//切好了进行尾插比较好:因为这样就会分割的地址是连续链接的
//1.先切一块下来去做头,方便尾插
span->_freeList = start;
start += size;
void* tail = span->_freeList;
while (start < end)
{
NextObj(tail) = start;
tail = NextObj(tail);//tail = start;
start += size;
}
//恢复加锁:切好之后,需要将Span挂到桶里面去的时候,再加锁
list._mtx.lock();
list.PushFront(span);
return span;
}
//重中心缓存获取一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)//拿batchNum个size大小的对象,前两个为输出型参数
{
size_t index = SizeClass::Index(size);//获取对应的哈希桶的位置
//因为多个线程可能同时竞争同一把桶锁,我们需要加锁
_spanLists[index]._mtx.lock();
Span* span = GetOneSpan(_spanLists[index], size);
assert(span);
assert(span->_freeList);
//从span中获取batchNum个对象
//如果不够batchNum个,有多少取多少
start = span->_freeList;
end = start;
size_t i = 0;
size_t actualNum = 1;//注意!!!这个和我们的下面的end的走的逻辑要对应上,不可以取成0!(既然申请到了非空的Span,那么该Span下也就起码有一个对象)
//这里我们需要注意我们真的可以申请这么多吗,这个非空的Span真的有这么多吗
while (i < batchNum - 1 && NextObj(end) != nullptr)
{
end = NextObj(end);
++i;
++actualNum;
}
span->_freeList = NextObj(end);
NextObj(end) = nullptr;
span->_useCount += actualNum;
_spanLists[index]._mtx.unlock();//加锁就需要解锁,那么这时候也应该考虑死锁的问题了
return actualNum;
}
//将一定数量的对象释放到spans当中,不是span!!!
//因为归还回来的
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
size_t index = SizeClass::Index(size);
//进来这一块,什么都不要想,直接将桶锁给加上,加锁---解锁
_spanLists[index]._mtx.lock();
//那么这时候,传参进来的链表中的小对象块是属于哪一个Span呢???
//这都是不确定的,那么我们这么知道哪一个小对象内存块是属于哪一个Span呢??
//这就是比较麻烦的了!!!
while (start)
{
void* next = NextObj(start);
Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
//归还:头插回去
NextObj(start) = span->_freeList;
span->_freeList = start;
span->_useCount--;
if (span->_useCount == 0)
{
//说明该span切分出去的所有小块内存对象都回来了
//说明这个span就可以再回收给page cache了,page cache可以再尝试去做前后页的合并了
_spanLists[index].Erase(span);
span->_freeList = nullptr;
span->_next = nullptr;
span->_next = nullptr;
span->_prev = nullptr;
//页号和页数是不能置空的:我们就是用页号就可以充当成Span的起始地址了,页数就是可以体现这个Span的大小了
//释放span给page cache时,使用page cache的锁就可以了
//这时把桶锁解掉,不解的话就会导致其他线程要在这个桶位申请/释放内存,就会被阻塞住,你去干你的ReleaseSpanToPageCache(span);还要来占着茅坑不拉屎吗?
_spanLists[index]._mtx.unlock();
PageCache::GetInstance()->_pageMtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMtx.unlock();
_spanLists[index]._mtx.lock();
}
start = next;
}
_spanLists[index]._mtx.unlock();
}
PageCache.h
#pragma once
#include"Common.h"
//注意这里的哈希桶的映射关系不是上两层的规则了,但是是更简单的直接定址法!!!
class PageCache
{
public:
static PageCache* GetInstance()
{
return &_sInst;
}
//获取从对象到span的映射
Span* MapObjectToSpan(void* obj);
//释放空间span回到page cache,并合并相邻的span
void ReleaseSpanToPageCache(Span* span);
//获取一个k页的Span
Span* NewSpan(size_t k);
std::mutex _pageMtx;//全局的锁,锁住整体
private:
SpanList _spanLists[NPAGES];//128个桶,每个桶下挂的是Span!!!
std::unordered_map<PAGE_ID, Span*> _idSpanMap;
PageCache()
{}
PageCache(const PageCache&) = delete;
static PageCache _sInst;//单例模式 --- 饿汉模式
};
PageCache.cpp
#include"PageCache.h"
PageCache PageCache::_sInst;
//获取一个k页的Span
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0 && k < NPAGES);
//先检查第k个桶里面有没有Span
if (!_spanLists[k].Empty())
{
//有就弹一个出去使用
return _spanLists->PopFront();
}
//说明第k个桶式空的,需要往下检查后面的桶里面有没有Span,如果有,可以进行切分使用
for (size_t i = k + 1; i < NPAGES; ++i)
{
if (!_spanLists[i].Empty())
{
//开切:拿下-切割-分配->利用+挂起(k,n-k)
//k页的Span返回给CentralCache
//n-k页的Span挂到n-k的桶的位置下
Span* nSpan = _spanLists[i].PopFront();
Span* kSpan = new Span;
//在nSpan的头部切一个k页下来
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
nSpan->_pageId += k;
nSpan->_n -= k;
_spanLists[nSpan->_n].PushFront(nSpan);//挂起
//存储(n-k)span的首位页号跟(n-k)span的映射,没必要像下面的给Central Cache那么复杂,因为我们只需要找前后页,下面主要是为了从Central Cache中回收Span
//方便page cache回收内存的时候,进行合并查找
_idSpanMap[nSpan->_pageId] = nSpan;
_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
//建立id和span的映射,方便central cache的回收小块内存时,查找对应的span
for (PAGE_ID i = 0; i < kSpan->_n; ++i)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
}
//走到这个位置就说明后面没有大页的Span了
//这时候就需要去找堆要一个128KB的Span了
Span* bigSpan = new Span;
void* ptr = SystemAlloc(NPAGES - 1);
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;
_spanLists[bigSpan->_n].PushFront(bigSpan);//到这还需要像上面一样进行切割分配,我们可以使用挂起+递归一次就很明智的解决了
//不过我们是需要加锁的,一些普通的锁就不行了,因为递归,会产生死锁,我们可以使用递归锁!!!recursive_mutex
//我们也可以分理出来一个_NewSpan来解决
//递归调用自己慢了一点,但是计算机太快了,所以这个完全不是事,我们也当然可以选择一次就直接进行切割分配
//自我认为复用更加重要
return NewSpan(k);
}
//获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
auto ret = _idSpanMap.find(id);
if (ret != _idSpanMap.end())
{
return ret->second;
}
else
{
assert(false);
return nullptr;
}
}
//释放空间span回到page cache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
//对span前后的页,尝试进行合并,缓解内存碎片问题
//现在是我去合并被人,不着急设置其_isUse为false,防止被别人给合并了,那就很不好了
//对前页进行合并
while (1)
{
PAGE_ID prevId = span->_pageId - 1;
auto ret = _idSpanMap.find(prevId);
if (ret == _idSpanMap.end())
{
break;
}
Span* prevSpan = ret->second;
if (prevSpan->_isUse == true)
{
break;
}
if (prevSpan->_n + span->_n > NPAGES - 1)
{
break;
}
span->_pageId = prevSpan->_pageId;
span->_n += prevSpan->_n;
_spanLists[prevSpan->_n].Erase(prevSpan);//后面要delete,待会变成野指针,被别人访问到可不好!
delete prevSpan;
}
//对后页进行合并
while (1)
{
PAGE_ID nextId = span->_pageId + span->_n;
auto ret = _idSpanMap.find(nextId);
if (ret == _idSpanMap.end())
{
break;
}
Span* nextSpan = ret->second;
if (nextSpan->_isUse == true)
{
break;
}
if (nextSpan->_n + span->_n > NPAGES - 1)
{
break;
}
span->_n += nextSpan->_n;
_spanLists[nextSpan->_n].Erase(nextSpan);
delete nextSpan;
}
_spanLists[span->_n].PushFront(span);
span->_isUse = false;
_idSpanMap[span->_pageId] = span;
_idSpanMap[span->_pageId + span->_n] = span;
}
ConcurrentAlloc.h
#pragma once
#include"Common.h"
#include"ThreadCache.h"
//搞成全局静态,不然包在多个.cpp当中的话,静态的保持在当前文件可见,否则全局的不加上静态,其链接属性就会冲突(因为一个.h会形成一个obj,所包含的可能最终会导致冲突!)
static void* ConcurrentAlloc(size_t size)
{
if (pTLSThreadCache == nullptr)
{
pTLSThreadCache = new ThreadCache;
}
cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;
return pTLSThreadCache->Allocate(size);
}
static void ConcurrentFree(void* ptr, size_t size)//我们传入size参数的问题更后面整体联系起来了再解决!!!
{
assert(pTLSThreadCache);
pTLSThreadCache->Deallocate(ptr, size);
}
下一篇更精彩哦😝