C++并发编程:无锁并发队列(MoodyCamel)核心类

【链接】GitHub - cameron314/concurrentqueue: A fast multi-producer, multi-consumer lock-free concurrent queue for C++11

        本文梳理 moodycamel 的并发队列(ConcurrentQueue)实现。它是一个高性能的多生产者多消费者(MPMC)无锁队列。以下是代码中几个主要类的功能及其关系:


1. ConcurrentQueue

// Front-end class of the multi-producer, multi-consumer queue. `T` is the
// element type; `Traits` defaults to ConcurrentQueueDefaultTraits.
// This is an excerpt: bodies shown as `///...` are elided.
template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
class ConcurrentQueue
{
public:
	// Creates a queue with at least `capacity` element slots (the remainder of
	// the original comment is cut off in this excerpt).
	explicit ConcurrentQueue(size_t capacity = 32 * BLOCK_SIZE)
		: producerListTail(nullptr),
		producerCount(0),
		initialBlockPoolIndex(0),
		nextExplicitConsumerId(0),
		globalExplicitConsumerOffset(0)
	{
        ///...
	}
	
	// Overload taking a minimum capacity plus explicit/implicit producer
	// limits; it starts from the same initial member state as the constructor
	// above. How the three arguments are combined is elided here.
	ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
		: producerListTail(nullptr),
		producerCount(0),
		initialBlockPoolIndex(0),
		nextExplicitConsumerId(0),
		globalExplicitConsumerOffset(0)
	{
        ///...
	}
	
	~ConcurrentQueue()
	{
		///...
	}

	// Exchanges state with `other` by delegating to swap_internal.
	inline void swap(ConcurrentQueue& other) MOODYCAMEL_NOEXCEPT
	{
		swap_internal(other);
	}
		
public:
	// --- enqueue family: forwards to inner_enqueue / inner_enqueue_bulk with
	// allocation mode CanAlloc. The token-less overloads return false outright
	// when INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0.
	inline bool enqueue(T const& item)
	{
		MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
		else return inner_enqueue<CanAlloc>(item);
	}	
	inline bool enqueue(T&& item)
	{
		MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
		else return inner_enqueue<CanAlloc>(std::move(item));
	}	
	inline bool enqueue(producer_token_t const& token, T const& item)
	{
		return inner_enqueue<CanAlloc>(token, item);
	}	
	inline bool enqueue(producer_token_t const& token, T&& item)
	{
		return inner_enqueue<CanAlloc>(token, std::move(item));
	}
	
	template<typename It>
	bool enqueue_bulk(It itemFirst, size_t count)
	{
		MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
		else return inner_enqueue_bulk<CanAlloc>(itemFirst, count);
	}
	
	template<typename It>
	bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
	{
		return inner_enqueue_bulk<CanAlloc>(token, itemFirst, count);
	}
	
	// --- try_enqueue family: identical shape to the enqueue family above, but
	// forwards with allocation mode CannotAlloc.
	inline bool try_enqueue(T const& item)
	{
		MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
		else return inner_enqueue<CannotAlloc>(item);
	}	
	inline bool try_enqueue(T&& item)
	{
		MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
		else return inner_enqueue<CannotAlloc>(std::move(item));
	}	
	inline bool try_enqueue(producer_token_t const& token, T const& item)
	{
		return inner_enqueue<CannotAlloc>(token, item);
	}	
	inline bool try_enqueue(producer_token_t const& token, T&& item)
	{
		return inner_enqueue<CannotAlloc>(token, std::move(item));
	}
	
	template<typename It>
	bool try_enqueue_bulk(It itemFirst, size_t count)
	{
		MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
		else return inner_enqueue_bulk<CannotAlloc>(itemFirst, count);
	}
	
	template<typename It>
	bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
	{
		return inner_enqueue_bulk<CannotAlloc>(token, itemFirst, count);
	}
	
	// Token-less single-item dequeue (body elided in this excerpt).
	template<typename U>
	bool try_dequeue(U& item)
	{
		///...
	}

	// Walks the producer list starting at producerListTail and returns true as
	// soon as one producer yields an item; returns false if none does.
	template<typename U>
	bool try_dequeue_non_interleaved(U& item)
	{
		for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
			if (ptr->dequeue(item)) {
				return true;
			}
		}
		return false;
	}
	
	// Single-item dequeue using a consumer token (body elided in this excerpt).
	template<typename U>
	bool try_dequeue(consumer_token_t& token, U& item)
	{
        ///...
	}
	
	// Visits each producer in list order, asking it for the still-missing
	// `max - count` items, and stops early once `max` items were obtained.
	// Returns the number of items actually dequeued.
	template<typename It>
	size_t try_dequeue_bulk(It itemFirst, size_t max)
	{
		size_t count = 0;
		for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
			count += ptr->dequeue_bulk(itemFirst, max - count);
			if (count == max) {
				break;
			}
		}
		return count;
	}
	
	// Bulk dequeue using a consumer token (body elided in this excerpt).
	template<typename It>
	size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
	{
		///...
	}
		
	// Dequeues directly from the producer referenced by the given producer
	// token; the token's producer pointer is cast to ExplicitProducer.
	template<typename U>
	inline bool try_dequeue_from_producer(producer_token_t const& producer, U& item)
	{
		return static_cast<ExplicitProducer*>(producer.producer)->dequeue(item);
	}
	
	// Bulk counterpart of try_dequeue_from_producer.
	template<typename It>
	inline size_t try_dequeue_bulk_from_producer(producer_token_t const& producer, It itemFirst, size_t max)
	{
		return static_cast<ExplicitProducer*>(producer.producer)->dequeue_bulk(itemFirst, max);
	}

	// Sums size_approx() over every producer in the list.
	size_t size_approx() const
	{
		size_t size = 0;
		for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
			size += ptr->size_approx();
		}
		return size;
	}
	
	static constexpr bool is_lock_free()
	{
		// NOTE(review): the body is missing in this excerpt without the usual
		// `///...` marker; as written this non-void function has no return
		// statement. Restore the original return expression before compiling.
	}

private:
	friend struct ProducerToken;
	friend struct ConsumerToken;
	struct ExplicitProducer;
	friend struct ExplicitProducer;
	struct ImplicitProducer;
	friend struct ImplicitProducer;
	friend class ConcurrentQueueTests;		
	// Template argument of inner_enqueue / inner_enqueue_bulk: the enqueue
	// family passes CanAlloc, the try_enqueue family passes CannotAlloc.
	enum AllocationMode { CanAlloc, CannotAlloc };
	
	// Entry point of the producer list traversed via next_prod().
	std::atomic<ProducerBase*> producerListTail;
	std::atomic<std::uint32_t> producerCount;
	
	std::atomic<size_t> initialBlockPoolIndex;
	Block* initialBlockPool;
	size_t initialBlockPoolSize;	
	FreeList<Block> freeList;
	std::atomic<ImplicitProducerHash*> implicitProducerHash;
	std::atomic<size_t> implicitProducerHashCount;		// Number of slots logically used
	ImplicitProducerHash initialImplicitProducerHash;
	std::array<ImplicitProducerKVP, INITIAL_IMPLICIT_PRODUCER_HASH_SIZE> initialImplicitProducerHashEntries;
	std::atomic_flag implicitProducerHashResizeInProgress;	
	std::atomic<std::uint32_t> nextExplicitConsumerId;
	std::atomic<std::uint32_t> globalExplicitConsumerOffset;	
};
  • 功能:提供多生产者多消费者的并发队列,支持无锁操作。

  • 关键特性

    • 支持动态扩容。

    • 支持显式生产者(ProducerToken)和隐式生产者(无 token 直接操作)。

    • 支持批量操作(enqueue_bulk/try_enqueue_bulk/try_dequeue_bulk)。

    • 支持消费者令牌(ConsumerToken)优化消费路径。

  • 模板参数

    • T:队列中存储的元素类型。

    • Traits:配置队列行为的特性(如块大小、内存分配策略等)。

2. 生产者令牌 ProducerToken

// Move-only handle that refers to one producer of a queue through the
// type-erased `producer` pointer. The producer keeps a back-pointer (`token`)
// to whichever ProducerToken currently owns it, so every transfer of
// ownership below re-targets that back-pointer.
struct ProducerToken
{
	template<typename T, typename Traits>
	explicit ProducerToken(ConcurrentQueue<T, Traits>& queue);
	
	template<typename T, typename Traits>
	explicit ProducerToken(BlockingConcurrentQueue<T, Traits>& queue);
	
	// Steals the producer from `other`, leaving `other` invalid.
	ProducerToken(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
		: producer(other.producer)
	{
		other.producer = nullptr;
		if (valid()) {
			producer->token = this;
		}
	}
	
	// Move assignment is a swap: `other` ends up holding our previous producer.
	inline ProducerToken& operator=(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
	{
		this->swap(other);
		return *this;
	}
	
	// Exchanges the producers of the two tokens and fixes both back-pointers.
	void swap(ProducerToken& other) MOODYCAMEL_NOEXCEPT
	{
		auto* const previous = producer;
		producer = other.producer;
		other.producer = previous;

		if (valid()) {
			producer->token = this;
		}
		if (other.valid()) {
			other.producer->token = &other;
		}
	}
	

	// True while this token refers to a producer.
	inline bool valid() const { return producer != nullptr; }
	
	// Detaches from the producer and flags it inactive (release store).
	~ProducerToken()
	{
		if (!valid()) {
			return;
		}
		producer->token = nullptr;
		producer->inactive.store(true, std::memory_order_release);
	}
	
	// Copying is not allowed; tokens can only be moved or swapped.
	ProducerToken(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
	ProducerToken& operator=(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
	
private:
	template<typename T, typename Traits> friend class ConcurrentQueue;
	friend class ConcurrentQueueTests;
	
protected:
	details::ConcurrentQueueProducerTypelessBase* producer;
};

  • 功能:显式标识一个生产者,用于优化生产者的并发操作。

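  • 关键方法

    • 令牌自身只提供 valid、swap 以及移动构造/移动赋值(不可拷贝),并不直接提供入队接口。

    • enqueue/try_enqueue:ConcurrentQueue 的方法,把令牌作为第一个参数传入,向队列中添加元素。

    • enqueue_bulk/try_enqueue_bulk:同样是 ConcurrentQueue 的方法,配合令牌批量添加元素。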

  • 与 ConcurrentQueue 的关系

    • 以队列的引用为参数,通过 ProducerToken 自身的构造函数创建(如 ProducerToken tok(queue));构造时由队列内部的 recycle_or_create_producer 创建或复用一个生产者。

    • 每个 ProducerToken 关联一个 ExplicitProducer(显式生产者)。

3. 消费者令牌 ConsumerToken

// Move-only bookkeeping record for one consumer: three 32-bit counters plus
// the current and desired producer pointers. The fields are private but are
// read and written by ConcurrentQueue, which is declared a friend.
struct ConsumerToken
{
	template<typename T, typename Traits>
	explicit ConsumerToken(ConcurrentQueue<T, Traits>& q);
	
	template<typename T, typename Traits>
	explicit ConsumerToken(BlockingConcurrentQueue<T, Traits>& q);
	
	// Copies every field from `other`; `other` itself is left unchanged.
	ConsumerToken(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
		: initialOffset(other.initialOffset),
		lastKnownGlobalOffset(other.lastKnownGlobalOffset),
		itemsConsumedFromCurrent(other.itemsConsumedFromCurrent),
		currentProducer(other.currentProducer),
		desiredProducer(other.desiredProducer)
	{
	}
	
	// Move assignment is a swap: `other` receives our previous state.
	inline ConsumerToken& operator=(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
	{
		this->swap(other);
		return *this;
	}
	
	// Member-wise exchange of all five fields.
	void swap(ConsumerToken& other) MOODYCAMEL_NOEXCEPT
	{
		// Producer pointers
		std::swap(currentProducer, other.currentProducer);
		std::swap(desiredProducer, other.desiredProducer);

		// Counters
		std::swap(initialOffset, other.initialOffset);
		std::swap(lastKnownGlobalOffset, other.lastKnownGlobalOffset);
		std::swap(itemsConsumedFromCurrent, other.itemsConsumedFromCurrent);
	}
	
	// Copying is not allowed; tokens can only be moved or swapped.
	ConsumerToken(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
	ConsumerToken& operator=(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;

private:
	template<typename T, typename Traits> friend class ConcurrentQueue;
	friend class ConcurrentQueueTests;
	
private: // but shared with ConcurrentQueue
	std::uint32_t initialOffset;
	std::uint32_t lastKnownGlobalOffset;
	std::uint32_t itemsConsumedFromCurrent;
	details::ConcurrentQueueProducerTypelessBase* currentProducer;
	details::ConcurrentQueueProducerTypelessBase* desiredProducer;
};
  • 功能:显式标识一个消费者,用于优化消费者的并发操作。

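  • 关键方法

    • 令牌自身只提供 swap 以及移动构造/移动赋值(不可拷贝),并不直接提供出队接口。

    • try_dequeue:ConcurrentQueue 的方法,把令牌作为第一个参数传入,从队列中取出元素。

    • try_dequeue_bulk:同样是 ConcurrentQueue 的方法,配合令牌批量取出元素。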

  • 与 ConcurrentQueue 的关系

    • 以队列的引用为参数,通过 ConsumerToken 自身的构造函数创建(如 ConsumerToken tok(queue))。

    • 消费者令牌通过轮转机制(EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE)避免消费者长时间只从同一个生产者取数据(队列本身是无锁的,这里并不涉及加锁)。

4. 生产者基类 ProducerBase

	///
	// Producer base
	///
	
	struct ProducerBase : public details::ConcurrentQueueProducerTypelessBase
	{
		ProducerBase(ConcurrentQueue* parent_, bool isExplicit_) :
			tailIndex(0),
			headIndex(0),
			dequeueOptimisticCount(0),
			dequeueOvercommit(0),
			tailBlock(nullptr),
			isExplicit(isExplicit_),
			parent(parent_)
		{
		}
		
		virtual ~ProducerBase() { }
		
		template<typename U>
		inline bool dequeue(U& element)
		{
			if (isExplicit) {
				return static_cast<ExplicitProducer*>(this)->dequeue(element);
			}
			else {
				return static_cast<ImplicitProducer*>(this)->dequeue(element);
			}
		}
		
		template<typename It>
		inline size_t dequeue_bulk(It& itemFirst, size_t max)
		{
			if (isExplicit) {
				return static_cast<ExplicitProducer*>(this)->dequeue_bulk(itemFirst, max);
			}
			else {
				return static_cast<ImplicitProducer*>(this)->dequeue_bulk(itemFirst, max);
			}
		}
		
		inline ProducerBase* next_prod() const { return static_cast<ProducerBase*>(next); }
		
		inline size_t size_approx() const
		{
			auto tail = tailIndex.load(std::memory_order_relaxed);
			auto head = headIndex.load(std::memory_order_relaxed);
			return details::circular_less_than(head, tail) ? static_cast<size_t>(tail - head) : 0;
		}
		
		inline index_t getTail() const { return tailIndex.load(std::memory_order_relaxed); }
	protected:
		std::atomic<index_t> tailIndex;		// Where to enqueue to next
		std::atomic<index_t> headIndex;		// Where to dequeue from next
		
		std::atomic<index_t> dequeueOptimisticCount;
		std::atomic<index_t> dequeueOvercommit;
		
		Block* tailBlock;
		
	public:
		bool isExplicit;
		ConcurrentQueue* parent;
		
	protected:
#ifdef MCDBGQ_TRACKMEM
		friend struct MemStats;
#endif
	};
	
	
  • 功能:所有生产者的基类(显式和隐式生产者均继承自此类)。

  • 关键字段

    • tailIndex/headIndex:生产者的生产和消费位置。

    • tailBlock:指向当前生产块的指针。

  • 派生类

    • ExplicitProducer:显式生产者(与 ProducerToken 关联)。

    • ImplicitProducer:隐式生产者(无 token,直接操作队列)。

5. 显式生产者 ExplicitProducer

	///
	// Explicit queue
	///
		
	// Producer flavour that registers itself with ProducerBase as explicit
	// (isExplicit == true). It keeps its own block index: a published header
	// (`blockIndex`) plus producer-private `pr_*` shadow fields.
	// Excerpt: bodies shown as `///...` are elided.
	struct ExplicitProducer : public ProducerBase
	{
		// Picks the starting index size as the larger of
		// EXPLICIT_INITIAL_INDEX_SIZE >> 1 and
		// ceil_to_pow_2(initialBlockPoolSize) >> 1, then builds the first
		// block index via new_block_index(0).
		explicit ExplicitProducer(ConcurrentQueue* parent_) :
			ProducerBase(parent_, true),
			blockIndex(nullptr),
			pr_blockIndexSlotsUsed(0),
			pr_blockIndexSize(EXPLICIT_INITIAL_INDEX_SIZE >> 1),
			pr_blockIndexFront(0),
			pr_blockIndexEntries(nullptr),
			pr_blockIndexRaw(nullptr)
		{
			size_t poolBasedIndexSize = details::ceil_to_pow_2(parent_->initialBlockPoolSize) >> 1;
			if (poolBasedIndexSize > pr_blockIndexSize) {
				pr_blockIndexSize = poolBasedIndexSize;
			}
			
			new_block_index(0);		// This creates an index with double the number of current entries, i.e. EXPLICIT_INITIAL_INDEX_SIZE
		}
		
		~ExplicitProducer()
		{
			// Destruct any elements not yet dequeued.
            ///...
		}
		
		// Single-item enqueue, parameterised on the allocation mode (body elided).
		template<AllocationMode allocMode, typename U>
		inline bool enqueue(U&& element)
		{
			///...
		}
		
		// Single-item dequeue. Only the initial relaxed load of tailIndex and
		// the final `return false` path are visible in this excerpt.
		template<typename U>
		bool dequeue(U& element)
		{
			auto tail = this->tailIndex.load(std::memory_order_relaxed);

			///...
		
			return false;
		}
		
		// Bulk enqueue. The visible tail of the body publishes the new tail
		// index with a release store and reports success.
		template<AllocationMode allocMode, typename It>
		bool MOODYCAMEL_NO_TSAN enqueue_bulk(It itemFirst, size_t count)
		{
			///...

			this->tailIndex.store(newTailIndex, std::memory_order_release);
			return true;
		}
		
		// Bulk dequeue. Only the initial relaxed loads and the final
		// `return 0` path are visible in this excerpt.
		template<typename It>
		size_t dequeue_bulk(It& itemFirst, size_t max)
		{
			auto tail = this->tailIndex.load(std::memory_order_relaxed);
			auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
			

			///...

			return 0;
		}
		
	private:
		// One slot of the block index: a base index paired with a block pointer.
		struct BlockIndexEntry
		{
			index_t base;
			Block* block;
		};
		
		// Header of one block index: its size, the atomically updated current
		// slot, the entry array, and a pointer to the previous index.
		struct BlockIndexHeader
		{
			size_t size;
			std::atomic<size_t> front;		// Current slot (not next, like pr_blockIndexFront)
			BlockIndexEntry* entries;
			void* prev;
		};
		
		
		// Builds a new block index (body elided). The visible part derives a
		// mask from the current size as pr_blockIndexSize - 1.
		bool new_block_index(size_t numberOfFilledSlotsToExpose)
		{
			auto prevBlockSizeMask = pr_blockIndexSize - 1;
			
			///...
			
			return true;
		}
		
	private:
		std::atomic<BlockIndexHeader*> blockIndex;
		
		// To be used by producer only -- consumer must use the ones in referenced by blockIndex
		size_t pr_blockIndexSlotsUsed;
		size_t pr_blockIndexSize;
		size_t pr_blockIndexFront;		// Next slot (not current)
		BlockIndexEntry* pr_blockIndexEntries;
		void* pr_blockIndexRaw;
		
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
	public:
		ExplicitProducer* nextExplicitProducer;
	private:
#endif
		
#ifdef MCDBGQ_TRACKMEM
		friend struct MemStats;
#endif
	};
	
	
  • 功能:与 ProducerToken 关联的生产者,独立管理自己的块和索引。

  • 关键特性

    • 使用块索引(BlockIndexHeader)管理分配的块。

    • 支持动态扩容(new_block_index)。

6. 隐式生产者 ImplicitProducer

	//
	// Implicit queue
	//
	
	// Producer flavour that registers itself with ProducerBase as implicit
	// (isExplicit == false). Its block index maps a key to a block pointer
	// through atomic entries.
	// Excerpt: bodies shown as `///...` are elided.
	struct ImplicitProducer : public ProducerBase
	{			
		// Starts with nextBlockIndexCapacity == IMPLICIT_INITIAL_INDEX_SIZE
		// and immediately builds the first block index.
		ImplicitProducer(ConcurrentQueue* parent_) :
			ProducerBase(parent_, false),
			nextBlockIndexCapacity(IMPLICIT_INITIAL_INDEX_SIZE),
			blockIndex(nullptr)
		{
			new_block_index();
		}
		
		~ImplicitProducer()
		{
			///...
		}
		
		// Single-item enqueue. After the elided middle part, the element is
		// constructed in place in the tail block at currentTailIndex and the
		// incremented tail index is then published with a release store.
		template<AllocationMode allocMode, typename U>
		inline bool enqueue(U&& element)
		{
			index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
			index_t newTailIndex = 1 + currentTailIndex;
 
            ///...
			
			// Enqueue
			new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
			
			this->tailIndex.store(newTailIndex, std::memory_order_release);
			return true;
		}
		
		// Single-item dequeue. Only the initial relaxed loads and the final
		// `return false` path are visible in this excerpt.
		template<typename U>
		bool dequeue(U& element)
		{
			// See ExplicitProducer::dequeue for rationale and explanation
			index_t tail = this->tailIndex.load(std::memory_order_relaxed);
			index_t overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
			
			///...
		
			return false;
		}
		
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable: 4706)  // assignment within conditional expression
#endif
		// Bulk enqueue. The visible tail of the body publishes the new tail
		// index with a release store and reports success.
		template<AllocationMode allocMode, typename It>
		bool enqueue_bulk(It itemFirst, size_t count)
		{
			///...
			this->tailIndex.store(newTailIndex, std::memory_order_release);
			return true;
		}
#ifdef _MSC_VER
#pragma warning(pop)
#endif
		
		// Bulk dequeue. Only the initial relaxed loads and the final
		// `return 0` path are visible in this excerpt.
		template<typename It>
		size_t dequeue_bulk(It& itemFirst, size_t max)
		{
			auto tail = this->tailIndex.load(std::memory_order_relaxed);
			auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
			///...
			
			return 0;
		}
		
	private:
		// The block size must be > 1, so any number with the low bit set is an invalid block base index
		static const index_t INVALID_BLOCK_BASE = 1;
		
		// One slot of the block index: an atomic key paired with an atomic
		// block pointer.
		struct BlockIndexEntry
		{
			std::atomic<index_t> key;
			std::atomic<Block*> value;
		};
		
		// Header of one block index: capacity, atomic tail position, the entry
		// storage, a table of entry pointers, and the previous header.
		struct BlockIndexHeader
		{
			size_t capacity;
			std::atomic<size_t> tail;
			BlockIndexEntry* entries;
			BlockIndexEntry** index;
			BlockIndexHeader* prev;
		};
		
		// Inserts an index entry for the block starting at blockStartIndex and
		// hands the entry back through idxEntry (body elided).
		template<AllocationMode allocMode>
		inline bool insert_block_index_entry(BlockIndexEntry*& idxEntry, index_t blockStartIndex)
		{
			///...
		}
		
		// Moves the current block index's tail back by one slot, wrapping with
		// the mask `capacity - 1` (assumes capacity is a power of two -- TODO confirm).
		inline void rewind_block_index_tail()
		{
			auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
			localBlockIndex->tail.store((localBlockIndex->tail.load(std::memory_order_relaxed) - 1) & (localBlockIndex->capacity - 1), std::memory_order_relaxed);
		}
		
		// Returns the index entry for `index`, using the header and slot
		// number reported by get_block_index_index_for_index.
		inline BlockIndexEntry* get_block_index_entry_for_index(index_t index) const
		{
			BlockIndexHeader* localBlockIndex;
			auto idx = get_block_index_index_for_index(index, localBlockIndex);
			return localBlockIndex->index[idx];
		}
		
		// Computes the slot number for `index` and reports the header it used
		// through localBlockIndex (body elided).
		inline size_t get_block_index_index_for_index(index_t index, BlockIndexHeader*& localBlockIndex) const
		{
            ///...
		}
		
		// Builds a new block index (body elided). The visible part reads the
		// previous header and its capacity, using 0 when there is none yet.
		bool new_block_index()
		{
			auto prev = blockIndex.load(std::memory_order_relaxed);
			size_t prevCapacity = prev == nullptr ? 0 : prev->capacity;
			///...
			
			return true;
		}
		
	private:
		size_t nextBlockIndexCapacity;
		std::atomic<BlockIndexHeader*> blockIndex;

#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
	public:
		details::ThreadExitListener threadExitListener;
	private:
#endif
		
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
	public:
		ImplicitProducer* nextImplicitProducer;
	private:
#endif

#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
		mutable debug::DebugMutex mutex;
#endif
#ifdef MCDBGQ_TRACKMEM
		friend struct MemStats;
#endif
	};
	
	
  • 功能:无 token 的生产者,通过线程局部存储(TLS)和哈希表管理。

  • 关键特性

    • 使用哈希表(ImplicitProducerHash)快速查找线程对应的生产者。

    • 支持动态扩容(new_block_index)。

    • 通过 threadExitListener 在线程退出时清理资源。

7. 内存管理类 FreeList

	// A simple CAS-based lock-free free list. Not the fastest thing in the world under heavy contention, but
	// simple and correct (assuming nodes are never freed until after the free list is destroyed), and fairly
	// speedy under low contention.
	//
	// Each node carries a 32-bit `freeListRefs` word: the low 31 bits (REFS_MASK) are a reference
	// count and the top bit (SHOULD_BE_ON_FREELIST) marks a node that is waiting to be linked in.
	template<typename N>		// N must inherit FreeListNode or have the same fields (and initialization of them)
	struct FreeList
	{
		FreeList() : freeListHead(nullptr) { }
		// Takes over other's head pointer and leaves other empty (relaxed accesses).
		FreeList(FreeList&& other) : freeListHead(other.freeListHead.load(std::memory_order_relaxed)) { other.freeListHead.store(nullptr, std::memory_order_relaxed); }
		void swap(FreeList& other) { details::swap_relaxed(freeListHead, other.freeListHead); }
		
		FreeList(FreeList const&) MOODYCAMEL_DELETE_FUNCTION;
		FreeList& operator=(FreeList const&) MOODYCAMEL_DELETE_FUNCTION;
		
		// Hands `node` to the free list. The SHOULD_BE_ON_FREELIST bit is set first; the node is
		// linked in immediately only if the refs word was 0 beforehand. Otherwise the linking is
		// left to whoever drops the last reference (see try_get).
		inline void add(N* node)
		{
#ifdef MCDBGQ_NOLOCKFREE_FREELIST
			debug::DebugLock lock(mutex);
#endif		
			// We know that the should-be-on-freelist bit is 0 at this point, so it's safe to
			// set it using a fetch_add
			if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST, std::memory_order_acq_rel) == 0) {
				// Oh look! We were the last ones referencing this node, and we know
				// we want to add it to the free list, so let's do it!
		 		add_knowing_refcount_is_zero(node);
			}
		}
		
		// Tries to pop a node. Returns the node on success, or nullptr once the head is observed
		// to be null.
		inline N* try_get()
		{
#ifdef MCDBGQ_NOLOCKFREE_FREELIST
			debug::DebugLock lock(mutex);
#endif		
			auto head = freeListHead.load(std::memory_order_acquire);
			while (head != nullptr) {
				auto prevHead = head;
				auto refs = head->freeListRefs.load(std::memory_order_relaxed);
				// Take a reference on the head node; if its count is zero or the CAS loses a
				// race, reload the head and start over.
				if ((refs & REFS_MASK) == 0 || !head->freeListRefs.compare_exchange_strong(refs, refs + 1, std::memory_order_acquire)) {
					head = freeListHead.load(std::memory_order_acquire);
					continue;
				}
				
				// Good, reference count has been incremented (it wasn't at zero), which means we can read the
				// next and not worry about it changing between now and the time we do the CAS
				auto next = head->freeListNext.load(std::memory_order_relaxed);
				if (freeListHead.compare_exchange_strong(head, next, std::memory_order_acquire, std::memory_order_relaxed)) {
					// Yay, got the node. This means it was on the list, which means shouldBeOnFreeList must be false no
					// matter the refcount (because nobody else knows it's been taken off yet, it can't have been put back on).
					assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0);
					
					// Decrease refcount twice, once for our ref, and once for the list's ref
					head->freeListRefs.fetch_sub(2, std::memory_order_release);
					return head;
				}
				
				// OK, the head must have changed on us, but we still need to decrease the refcount we increased.
				// Note that we don't need to release any memory effects, but we do need to ensure that the reference
				// count decrement happens-after the CAS on the head.
				// (The failed CAS above has already stored the current head into `head`, so the loop
				// continues with that value while `prevHead` still names the node we referenced.)
				refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel);
				if (refs == SHOULD_BE_ON_FREELIST + 1) {
					// We held the last reference and the node is flagged for insertion: link it in now.
					add_knowing_refcount_is_zero(prevHead);
				}
			}
			
			return nullptr;
		}
		
		// Useful for traversing the list when there's no contention (e.g. to destroy remaining nodes)
		N* head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); }
		
	private:
		// Pushes `node` as the new head. Each attempt sets the node's next pointer and resets its
		// refs word to 1 before the CAS on the head. If the CAS fails, the flag is put back (and
		// the reference dropped) with a single fetch_add; the push is retried here only when that
		// fetch_add saw exactly 1, otherwise the function returns without linking the node.
		inline void add_knowing_refcount_is_zero(N* node)
		{
			auto head = freeListHead.load(std::memory_order_relaxed);
			while (true) {
				node->freeListNext.store(head, std::memory_order_relaxed);
				node->freeListRefs.store(1, std::memory_order_release);
				if (!freeListHead.compare_exchange_strong(head, node, std::memory_order_release, std::memory_order_relaxed)) {
					// Hmm, the add failed, but we can only try again when the refcount goes back to zero
					if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_acq_rel) == 1) {
						continue;
					}
				}
				return;
			}
		}
		
	private:
		// Implemented like a stack, but where node order doesn't matter (nodes are inserted out of order under contention)
		std::atomic<N*> freeListHead;
	
	// Layout of freeListRefs: low 31 bits = reference count, top bit = "should be on free list" flag.
	static const std::uint32_t REFS_MASK = 0x7FFFFFFF;
	static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000;
		
#ifdef MCDBGQ_NOLOCKFREE_FREELIST
		debug::DebugMutex mutex;
#endif
	};
	
	
  • 功能:管理空闲块的无锁(lock-free)链表。

  • 关键方法

    • add:将块添加到空闲链表。

    • try_get:从空闲链表中获取块。

  • 与 ConcurrentQueue 的关系

    • 队列通过 FreeList 复用已分配的块,减少内存分配开销。

8. 块结构 Block

	// Fixed-size storage unit holding BLOCK_SIZE elements of type T in raw, suitably aligned
	// memory. "Emptiness" is tracked in one of two ways, selected at compile time per call:
	//   - per-slot flags (`emptyFlags`) when context == explicit_context and
	//     BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD;
	//   - a single counter (`elementsCompletelyDequeued`) otherwise.
	// The block also carries the fields the FreeList needs (freeListRefs / freeListNext).
	struct Block
	{
		Block()
			: next(nullptr), elementsCompletelyDequeued(0), freeListRefs(0), freeListNext(nullptr), dynamicallyAllocated(true)
		{
#ifdef MCDBGQ_TRACKMEM
			owner = nullptr;
#endif
		}
		
		// Returns true when every slot is marked empty (flag mode) or the counter has reached
		// BLOCK_SIZE (counter mode). An acquire fence is issued before returning true.
		template<InnerQueueContext context>
		inline bool is_empty() const
		{
			MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
				// Check flags
				for (size_t i = 0; i < BLOCK_SIZE; ++i) {
					if (!emptyFlags[i].load(std::memory_order_relaxed)) {
						return false;
					}
				}
				
				// Aha, empty; make sure we have all other memory effects that happened before the empty flags were set
				std::atomic_thread_fence(std::memory_order_acquire);
				return true;
			}
			else {
				// Check counter
				if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == BLOCK_SIZE) {
					std::atomic_thread_fence(std::memory_order_acquire);
					return true;
				}
				assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <= BLOCK_SIZE);
				return false;
			}
		}
		
		// Returns true if the block is now empty (does not apply in explicit context)
		// Flag mode: the flag for index i lives at position BLOCK_SIZE - 1 - (i mod BLOCK_SIZE),
		// i.e. flags are stored in reverse slot order, and the call always returns false.
		// Counter mode: increments the counter and reports whether this was the last element.
		template<InnerQueueContext context>
		inline bool set_empty(MOODYCAMEL_MAYBE_UNUSED index_t i)
		{
			MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
				// Set flag
				assert(!emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].load(std::memory_order_relaxed));
				emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].store(true, std::memory_order_release);
				return false;
			}
			else {
				// Increment counter
				auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_acq_rel);
				assert(prevVal < BLOCK_SIZE);
				return prevVal == BLOCK_SIZE - 1;
			}
		}
		
		// Sets multiple contiguous item statuses to 'empty' (assumes no wrapping and count > 0).
		// Returns true if the block is now empty (does not apply in explicit context).
		// Flag mode: one release fence up front, then relaxed stores for each flag; `i` is first
		// remapped to the lowest flag position of the (reversed) range.
		template<InnerQueueContext context>
		inline bool set_many_empty(MOODYCAMEL_MAYBE_UNUSED index_t i, size_t count)
		{
			MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
				// Set flags
				std::atomic_thread_fence(std::memory_order_release);
				i = BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1)) - count + 1;
				for (size_t j = 0; j != count; ++j) {
					assert(!emptyFlags[i + j].load(std::memory_order_relaxed));
					emptyFlags[i + j].store(true, std::memory_order_relaxed);
				}
				return false;
			}
			else {
				// Increment counter
				auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_acq_rel);
				assert(prevVal + count <= BLOCK_SIZE);
				return prevVal + count == BLOCK_SIZE;
			}
		}
		
		// Marks the whole block as empty using relaxed stores: all flags become true (flag mode)
		// or the counter is set to BLOCK_SIZE (counter mode).
		template<InnerQueueContext context>
		inline void set_all_empty()
		{
			MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
				// Set all flags
				for (size_t i = 0; i != BLOCK_SIZE; ++i) {
					emptyFlags[i].store(true, std::memory_order_relaxed);
				}
			}
			else {
				// Reset counter
				elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed);
			}
		}
		
		// Marks the whole block as not empty using relaxed stores: all flags become false (flag
		// mode) or the counter is set back to 0 (counter mode).
		template<InnerQueueContext context>
		inline void reset_empty()
		{
			MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
				// Reset flags
				for (size_t i = 0; i != BLOCK_SIZE; ++i) {
					emptyFlags[i].store(false, std::memory_order_relaxed);
				}
			}
			else {
				// Reset counter
				elementsCompletelyDequeued.store(0, std::memory_order_relaxed);
			}
		}
		
		// Address of the slot for queue index `idx`: the raw storage viewed as T[], offset by
		// idx & (BLOCK_SIZE - 1) (the mask assumes BLOCK_SIZE is a power of two -- TODO confirm).
		inline T* operator[](index_t idx) MOODYCAMEL_NOEXCEPT { return static_cast<T*>(static_cast<void*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }
		inline T const* operator[](index_t idx) const MOODYCAMEL_NOEXCEPT { return static_cast<T const*>(static_cast<void const*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }
		
	private:
		static_assert(std::alignment_of<T>::value <= sizeof(T), "The queue does not support types with an alignment greater than their size at this time");
		// Raw byte storage for BLOCK_SIZE objects of type T, aligned like T.
		MOODYCAMEL_ALIGNED_TYPE_LIKE(char[sizeof(T) * BLOCK_SIZE], T) elements;
	public:
		Block* next;
		// Counter-mode emptiness tracking.
		std::atomic<size_t> elementsCompletelyDequeued;
		// Flag-mode emptiness tracking; shrinks to a single unused entry when BLOCK_SIZE exceeds
		// the threshold.
		std::atomic<bool> emptyFlags[BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? BLOCK_SIZE : 1];
	public:
		// Fields used by FreeList<Block>.
		std::atomic<std::uint32_t> freeListRefs;
		std::atomic<Block*> freeListNext;
		bool dynamicallyAllocated;		// Perhaps a better name for this would be 'isNotPartOfInitialBlockPool'
		
#ifdef MCDBGQ_TRACKMEM
		void* owner;
#endif
	};
	static_assert(std::alignment_of<Block>::value >= std::alignment_of<T>::value, "Internal error: Blocks must be at least as aligned as the type they are wrapping");

  • 功能:存储队列元素的基本单元。

  • 关键字段

    • elements:存储元素的数组。

    • next:指向下一个块的指针。

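    • emptyFlags:逐个标记元素是否已被消费(仅在显式生产者上下文且 BLOCK_SIZE 不超过 EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD 时使用;其余情况改用计数器 elementsCompletelyDequeued)。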

  • 与生产者的关系

    • 生产者通过 Block 管理元素的写入和读取。

类关系总结

  1. ConcurrentQueue 是核心类,管理生产者和消费者的交互。

  2. ProducerToken 和 ConsumerToken 是外部接口,用于优化并发操作。

  3. ProducerBase 是生产者的基类,派生为 ExplicitProducer(显式)和 ImplicitProducer(隐式)。

  4. FreeList 和 Block 负责内存管理和数据存储。

典型工作流程

  1. 生产者

    • 显式生产者通过 ProducerToken 操作队列。

    • 隐式生产者直接操作队列,通过哈希表查找自己的状态。

  2. 消费者

    • 普通消费者直接消费。

    • 使用 ConsumerToken 的消费者通过轮询机制优化消费路径。

  3. 内存管理

    • 空闲块通过 FreeList 复用,减少动态分配。

这个设计通过分离生产者和消费者的路径、动态扩容和块复用,实现了高性能的并发队列。

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值