admin管理员组

文章数量:1030267

使用基数树优化高并发内存池(替代加锁访问的哈希表和红黑树)

前言:

本篇旨在熟悉 基于tcmalloc的高性能并发内存池项目之后,对于最后的优化进行的笔记梳理,项目完整代码 可点击:项目代码 进行查看。

优化思想风暴:

  1. 为了方便根据页号查找到对应的span, 这里我们可以使用红黑树或者哈希表这样一种结构,来存储他们的映射关系,如下:

  1. 比如: 在下面这个地方,需要根据对齐字节数,查找对应的span,以便于回收的时候使用span里面的_objsize,或者需要根据页号查找前后页的span, 此时都需要访问这个哈希表。

  1. 所以我们在创建span的时候就应该注入此信息,如下:

不管把此span所涉及的每一个页号都映射(给central cache用),还是只映射首位页(给pagecache cache合并大页的内存用),根据不同的场景,我们根据页号都能查找到对应的span。

当然, 我们在合并完大页的span之后,对于页号已经发生了改变,我们也需要进行及时的修订。


  1. 所以自然,我们在不同线程进行修改这个span的时候,调用此函数之前,需要加锁一把锁,我们放在page cache的成员里面

这里进行修改的地方,我们都需要加上锁,以防止多线程访问,引发的线程安全问题,当然,我们在读取数据的时候,也需要加上锁,如下:

看清楚!,这里我们加的还是那把pagecache成员变量的那把锁,只不过这把锁具有局部变量的特性,出了函数作用域会自动释放。


  1. 那么至此,我们可以此代码的性能测试,根据VS下的性能探测器,我们发现MapObjectToSpan非常的消耗时间,主要原因在于unique_lock导致线程阻塞,耗费大量时间,那么我们替换掉这个锁,在读取的时候进行无锁访问,至此,我们就引入了今天的主角(基数树

简单了解一下基数树的代码实现:

下面这部分代码,对基数树进行了简化,主要有三种基数树,区别就是分别进行了一层,两层,三层的基数树实现,代码下面,博主自行画了一个草图,可以大致了解。

代码语言:javascript代码运行次数:0运行复制
#pragma once

//基数树
#include "Common.h"


//一层
template<int BITS>
class TCMalloc_PageMap1
{
private:
	static const int LENGTH = 1 << BITS; //1<<19
	void** array_;
public:
	typedef uintptr_t Number;
	//只能显示调用的构造函数
	explicit TCMalloc_PageMap1() {
		//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));
		size_t size = sizeof(void*) << BITS; 
		size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);  //1<<13 8KB
		array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT); //2^19 
		memset(array_, 0, sizeof(void*) << BITS);
	}

	// Return the current value for KEY.  Returns NULL if not yet set,
	// or if k is out of range.
	void* get(Number k) const {
		if ((k >> BITS) > 0) {
			return NULL;
		}
		return array_[k];
	}

	// REQUIRES "k" is in range "[0,2^BITS-1]".
	// REQUIRES "k" has been ensured before.
	//
	// Sets the value 'v' for key 'k'.
	void set(Number k, void* v) {
		array_[k] = v;
	}
};


// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:
	// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
	static const int ROOT_BITS = 5;
	static const int ROOT_LENGTH = 1 << ROOT_BITS;

	static const int LEAF_BITS = BITS - ROOT_BITS;
	static const int LEAF_LENGTH = 1 << LEAF_BITS;

	// Leaf node
	struct Leaf {
		void* values[LEAF_LENGTH];
	};

	Leaf* root_[ROOT_LENGTH];             // Pointers to 32 child nodes
	void* (*allocator_)(size_t);          // Memory allocator

public:
	typedef uintptr_t Number;

	//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {
	explicit TCMalloc_PageMap2() {
		//allocator_ = allocator;
		memset(root_, 0, sizeof(root_));

		PreallocateMoreMemory();
	}

	void* get(Number k) const {
		const Number i1 = k >> LEAF_BITS;
		const Number i2 = k & (LEAF_LENGTH - 1);
		if ((k >> BITS) > 0 || root_[i1] == NULL) {
			return NULL;
		}
		return root_[i1]->values[i2];
	}

	void set(Number k, void* v) {
		const Number i1 = k >> LEAF_BITS;
		const Number i2 = k & (LEAF_LENGTH - 1);
		ASSERT(i1 < ROOT_LENGTH);
		root_[i1]->values[i2] = v;
	}

	bool Ensure(Number start, size_t n) {
		for (Number key = start; key <= start + n - 1;) {
			const Number i1 = key >> LEAF_BITS;

			// Check for overflow
			if (i1 >= ROOT_LENGTH)
				return false;

			// Make 2nd level node if necessary
			if (root_[i1] == NULL) {
				//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
				//if (leaf == NULL) return false;
				static ObjectPool<Leaf>	leafPool;
				Leaf* leaf = (Leaf*)leafPool.New();

				memset(leaf, 0, sizeof(*leaf));
				root_[i1] = leaf;
			}

			// Advance key past whatever is covered by this leaf node
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
		}
		return true;
	}

	void PreallocateMoreMemory() {
		// Allocate enough to keep track of all possible pages
		Ensure(0, 1 << BITS);
	}
};


//三层
// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3 {
private:
	// How many bits should we consume at each interior level
	static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up
	static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;

	// How many bits should we consume at leaf level
	static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;
	static const int LEAF_LENGTH = 1 << LEAF_BITS;

	// Interior node
	struct Node {
		Node* ptrs[INTERIOR_LENGTH];
	};

	// Leaf node
	struct Leaf {
		void* values[LEAF_LENGTH];
	};

	Node* root_;                          // Root of radix tree
	void* (*allocator_)(size_t);          // Memory allocator

	Node* NewNode() {
		Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));
		if (result != NULL) {
			memset(result, 0, sizeof(*result));
		}
		return result;
	}

public:
	typedef uintptr_t Number;

	explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {
		allocator_ = allocator;
		root_ = NewNode();
	}

	void* get(Number k) const {
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
		const Number i3 = k & (LEAF_LENGTH - 1);
		if ((k >> BITS) > 0 ||
			root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {
			return NULL;
		}
		return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];
	}

	void set(Number k, void* v) {
		ASSERT(k >> BITS == 0);
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
		const Number i3 = k & (LEAF_LENGTH - 1);
		reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;
	}

	bool Ensure(Number start, size_t n) {
		for (Number key = start; key <= start + n - 1;) {
			const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);
			const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);

			// Check for overflow
			if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)
				return false;

			// Make 2nd level node if necessary
			if (root_->ptrs[i1] == NULL) {
				Node* n = NewNode();
				if (n == NULL) return false;
				root_->ptrs[i1] = n;
			}

			// Make leaf node if necessary
			if (root_->ptrs[i1]->ptrs[i2] == NULL) {
				Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
				if (leaf == NULL) return false;
				memset(leaf, 0, sizeof(*leaf));
				root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);
			}

			// Advance key past whatever is covered by this leaf node
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
		}
		return true;
	}

	void PreallocateMoreMemory() {
	}
};

对于一层的基数树,其实就是一个哈希表,也可以理解为数组指针,一共有2^19个格子,存放32位下以8KB为单位的所有页号,它是在一开始就已经开辟好了内存

对于二层的基数树,原理如下,三层类比二层,例如再增加后几位作为第三层的下标,这里需要某个范围的页号对应的span时,会2^14次方一次,这样缓慢的开辟好,需要用到的范围就开辟,缓解了系统申请大块内存的负担,当然也是提前已经根据你需要的页号映射范围提前开辟好的。


我们可以把哈希表或者红黑树替换为这样一种基数树的结构,原因如下:

首先基数树的结构是固定的,在其他线程写入其他位置的时候,不会改变它的结构,而哈希表(可能会动态的扩容),红黑树(在修改节点时会进行旋转而改变结构)在写入其他位置的时候会改变结构,那么其他线程在读取的时候,可能就会访问到NULL等其他情况。

其次,读取的时候,我们需要根据页号访问到对应的span, 此时的span一定是已经被new出来之后,把页号和span*进行映射,所以我们访问时的那个span,一定时已经存在的,不会涉及正在读取此span,又被其他线程所修改,按照本项目访问所涉及到的代码进行讲解:

第一处,访问页号对应的span*,是为了替代掉释放时还需要传递size的情况,此时的span已经被new出来了,那么它所对应的页号早都已经映射上了,所以不会对此位置进行修改,读取和写入是分离的。

第二处,此时是需要根据小块内存地址,求出页号,根据页号找到对应的span, 进行内存的回收的,那么此时的span, 他也一定是被创建出来了以后,页号和span已经经过了映射,所以访问和写入不会同时发生,此时也是分离的。


此时再看两处涉及到修改基数树的位置,第一处就是初始化span的时候,此时只能有一个线程进行写入,此span的状态还是false, 还未被初始化,当它被创建好时,才会分配给centralcache,此时才能被使用,才会涉及到调用此函数获取objectsize,或者回收小块内存的情况,所以此时刻读写分离。

第二处同理,都是发生在pagecache中,此时是不再使用了,所以也不会涉及到上述两种访问的情况,所以此刻的读写也是分离的。

综上,同一个span之间, 访问和写入是互相分离的,而在不同的span之间,写入的时候使用红黑树或者哈希表可能会影响到其他的span, 会修改结构,但是基数树可以完美解决,故此,我们可以把unique这把锁移除删除掉。


此刻我们再次运行代码:

前:

后:

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。 原始发表:2025-04-16,如有侵权请联系 cloudcommunity@tencent 删除高并发函数内存线程优化

使用基数树优化高并发内存池(替代加锁访问的哈希表和红黑树)

前言:

本篇旨在熟悉 基于tcmalloc的高性能并发内存池项目之后,对于最后的优化进行的笔记梳理,项目完整代码 可点击:项目代码 进行查看。

优化思想风暴:

  1. 为了方便根据页号查找到对应的span, 这里我们可以使用红黑树或者哈希表这样一种结构,来存储他们的映射关系,如下:

  1. 比如: 在下面这个地方,需要根据对齐字节数,查找对应的span,以便于回收的时候使用span里面的_objsize,或者需要根据页号查找前后页的span, 此时都需要访问这个哈希表。

  1. 所以我们在创建span的时候就应该注入此信息,如下:

不管把此span所涉及的每一个页号都映射(给central cache用),还是只映射首位页(给pagecache cache合并大页的内存用),根据不同的场景,我们根据页号都能查找到对应的span。

当然, 我们在合并完大页的span之后,对于页号已经发生了改变,我们也需要进行及时的修订。


  1. 所以自然,我们在不同线程进行修改这个span的时候,调用此函数之前,需要加锁一把锁,我们放在page cache的成员里面

这里进行修改的地方,我们都需要加上锁,以防止多线程访问,引发的线程安全问题,当然,我们在读取数据的时候,也需要加上锁,如下:

看清楚!,这里我们加的还是那把pagecache成员变量的那把锁,只不过这把锁具有局部变量的特性,出了函数作用域会自动释放。


  1. 那么至此,我们可以此代码的性能测试,根据VS下的性能探测器,我们发现MapObjectToSpan非常的消耗时间,主要原因在于unique_lock导致线程阻塞,耗费大量时间,那么我们替换掉这个锁,在读取的时候进行无锁访问,至此,我们就引入了今天的主角(基数树

简单了解一下基数树的代码实现:

下面这部分代码,对基数树进行了简化,主要有三种基数树,区别就是分别进行了一层,两层,三层的基数树实现,代码下面,博主自行画了一个草图,可以大致了解。

代码语言:javascript代码运行次数:0运行复制
#pragma once

//基数树
#include "Common.h"


//一层
template<int BITS>
class TCMalloc_PageMap1
{
private:
	static const int LENGTH = 1 << BITS; //1<<19
	void** array_;
public:
	typedef uintptr_t Number;
	//只能显示调用的构造函数
	explicit TCMalloc_PageMap1() {
		//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));
		size_t size = sizeof(void*) << BITS; 
		size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);  //1<<13 8KB
		array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT); //2^19 
		memset(array_, 0, sizeof(void*) << BITS);
	}

	// Return the current value for KEY.  Returns NULL if not yet set,
	// or if k is out of range.
	void* get(Number k) const {
		if ((k >> BITS) > 0) {
			return NULL;
		}
		return array_[k];
	}

	// REQUIRES "k" is in range "[0,2^BITS-1]".
	// REQUIRES "k" has been ensured before.
	//
	// Sets the value 'v' for key 'k'.
	void set(Number k, void* v) {
		array_[k] = v;
	}
};


// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:
	// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
	static const int ROOT_BITS = 5;
	static const int ROOT_LENGTH = 1 << ROOT_BITS;

	static const int LEAF_BITS = BITS - ROOT_BITS;
	static const int LEAF_LENGTH = 1 << LEAF_BITS;

	// Leaf node
	struct Leaf {
		void* values[LEAF_LENGTH];
	};

	Leaf* root_[ROOT_LENGTH];             // Pointers to 32 child nodes
	void* (*allocator_)(size_t);          // Memory allocator

public:
	typedef uintptr_t Number;

	//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {
	explicit TCMalloc_PageMap2() {
		//allocator_ = allocator;
		memset(root_, 0, sizeof(root_));

		PreallocateMoreMemory();
	}

	void* get(Number k) const {
		const Number i1 = k >> LEAF_BITS;
		const Number i2 = k & (LEAF_LENGTH - 1);
		if ((k >> BITS) > 0 || root_[i1] == NULL) {
			return NULL;
		}
		return root_[i1]->values[i2];
	}

	void set(Number k, void* v) {
		const Number i1 = k >> LEAF_BITS;
		const Number i2 = k & (LEAF_LENGTH - 1);
		ASSERT(i1 < ROOT_LENGTH);
		root_[i1]->values[i2] = v;
	}

	bool Ensure(Number start, size_t n) {
		for (Number key = start; key <= start + n - 1;) {
			const Number i1 = key >> LEAF_BITS;

			// Check for overflow
			if (i1 >= ROOT_LENGTH)
				return false;

			// Make 2nd level node if necessary
			if (root_[i1] == NULL) {
				//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
				//if (leaf == NULL) return false;
				static ObjectPool<Leaf>	leafPool;
				Leaf* leaf = (Leaf*)leafPool.New();

				memset(leaf, 0, sizeof(*leaf));
				root_[i1] = leaf;
			}

			// Advance key past whatever is covered by this leaf node
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
		}
		return true;
	}

	void PreallocateMoreMemory() {
		// Allocate enough to keep track of all possible pages
		Ensure(0, 1 << BITS);
	}
};


//三层
// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3 {
private:
	// How many bits should we consume at each interior level
	static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up
	static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;

	// How many bits should we consume at leaf level
	static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;
	static const int LEAF_LENGTH = 1 << LEAF_BITS;

	// Interior node
	struct Node {
		Node* ptrs[INTERIOR_LENGTH];
	};

	// Leaf node
	struct Leaf {
		void* values[LEAF_LENGTH];
	};

	Node* root_;                          // Root of radix tree
	void* (*allocator_)(size_t);          // Memory allocator

	Node* NewNode() {
		Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));
		if (result != NULL) {
			memset(result, 0, sizeof(*result));
		}
		return result;
	}

public:
	typedef uintptr_t Number;

	explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {
		allocator_ = allocator;
		root_ = NewNode();
	}

	void* get(Number k) const {
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
		const Number i3 = k & (LEAF_LENGTH - 1);
		if ((k >> BITS) > 0 ||
			root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {
			return NULL;
		}
		return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];
	}

	void set(Number k, void* v) {
		ASSERT(k >> BITS == 0);
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
		const Number i3 = k & (LEAF_LENGTH - 1);
		reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;
	}

	bool Ensure(Number start, size_t n) {
		for (Number key = start; key <= start + n - 1;) {
			const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);
			const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);

			// Check for overflow
			if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)
				return false;

			// Make 2nd level node if necessary
			if (root_->ptrs[i1] == NULL) {
				Node* n = NewNode();
				if (n == NULL) return false;
				root_->ptrs[i1] = n;
			}

			// Make leaf node if necessary
			if (root_->ptrs[i1]->ptrs[i2] == NULL) {
				Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
				if (leaf == NULL) return false;
				memset(leaf, 0, sizeof(*leaf));
				root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);
			}

			// Advance key past whatever is covered by this leaf node
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
		}
		return true;
	}

	void PreallocateMoreMemory() {
	}
};

对于一层的基数树,其实就是一个哈希表,也可以理解为数组指针,一共有2^19个格子,存放32位下以8KB为单位的所有页号,它是在一开始就已经开辟好了内存

对于二层的基数树,原理如下,三层类比二层,例如再增加后几位作为第三层的下标,这里需要某个范围的页号对应的span时,会2^14次方一次,这样缓慢的开辟好,需要用到的范围就开辟,缓解了系统申请大块内存的负担,当然也是提前已经根据你需要的页号映射范围提前开辟好的。


我们可以把哈希表或者红黑树替换为这样一种基数树的结构,原因如下:

首先基数树的结构是固定的,在其他线程写入其他位置的时候,不会改变它的结构,而哈希表(可能会动态的扩容),红黑树(在修改节点时会进行旋转而改变结构)在写入其他位置的时候会改变结构,那么其他线程在读取的时候,可能就会访问到NULL等其他情况。

其次,读取的时候,我们需要根据页号访问到对应的span, 此时的span一定是已经被new出来之后,把页号和span*进行映射,所以我们访问时的那个span,一定时已经存在的,不会涉及正在读取此span,又被其他线程所修改,按照本项目访问所涉及到的代码进行讲解:

第一处,访问页号对应的span*,是为了替代掉释放时还需要传递size的情况,此时的span已经被new出来了,那么它所对应的页号早都已经映射上了,所以不会对此位置进行修改,读取和写入是分离的。

第二处,此时是需要根据小块内存地址,求出页号,根据页号找到对应的span, 进行内存的回收的,那么此时的span, 他也一定是被创建出来了以后,页号和span已经经过了映射,所以访问和写入不会同时发生,此时也是分离的。


此时再看两处涉及到修改基数树的位置,第一处就是初始化span的时候,此时只能有一个线程进行写入,此span的状态还是false, 还未被初始化,当它被创建好时,才会分配给centralcache,此时才能被使用,才会涉及到调用此函数获取objectsize,或者回收小块内存的情况,所以此时刻读写分离。

第二处同理,都是发生在pagecache中,此时是不再使用了,所以也不会涉及到上述两种访问的情况,所以此刻的读写也是分离的。

综上,同一个span之间, 访问和写入是互相分离的,而在不同的span之间,写入的时候使用红黑树或者哈希表可能会影响到其他的span, 会修改结构,但是基数树可以完美解决,故此,我们可以把unique这把锁移除删除掉。


此刻我们再次运行代码:

前:

后:

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。 原始发表:2025-04-16,如有侵权请联系 cloudcommunity@tencent 删除高并发函数内存线程优化

本文标签: 使用基数树优化高并发内存池(替代加锁访问的哈希表和红黑树)