以下分析 moodycamel 的并发队列(ConcurrentQueue)实现。它是一个高性能的多生产者多消费者(MPMC)无锁队列。下面介绍代码中几个主要类的功能及其关系(代码均为节选,`///...` 表示省略的实现):
1. ConcurrentQueue 类
// Multi-producer, multi-consumer (MPMC) queue.
// Elements are stored per producer: explicit producers are bound to a ProducerToken,
// implicit producers serve the token-less enqueue overloads. Consumers walk the
// producer list (optionally guided by a ConsumerToken) to find elements.
// NOTE: this is an excerpt -- bodies shown as `///...` are elided.
template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
class ConcurrentQueue
{
public:
// Creates a queue with at least `capacity` element slots (default 32 * BLOCK_SIZE).
// NOTE(review): the upstream comment continues ("note that the ...") but is cut off in
// this excerpt; the sizing logic is elided as well.
explicit ConcurrentQueue(size_t capacity = 32 * BLOCK_SIZE)
: producerListTail(nullptr),
producerCount(0),
initialBlockPoolIndex(0),
nextExplicitConsumerId(0),
globalExplicitConsumerOffset(0)
{
///...
}
// Creates a queue sized from a minimum capacity and the maximum expected numbers of
// explicit and implicit producers (sizing logic elided).
ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
: producerListTail(nullptr),
producerCount(0),
initialBlockPoolIndex(0),
nextExplicitConsumerId(0),
globalExplicitConsumerOffset(0)
{
///...
}
// Releases the queue's resources (body elided).
~ConcurrentQueue()
{
///...
}
// Exchanges the entire state of two queues (delegates to swap_internal, not shown).
inline void swap(ConcurrentQueue& other) MOODYCAMEL_NOEXCEPT
{
swap_internal(other);
}
public:
// ---- Enqueue -------------------------------------------------------------------
// * Overloads without a token use the implicit-producer path; they return false
//   immediately when implicit producers are disabled
//   (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0).
// * Overloads taking a producer token enqueue through that token's producer.
// * enqueue/enqueue_bulk pass CanAlloc; try_enqueue/try_enqueue_bulk pass CannotAlloc,
//   i.e. (per the AllocationMode names) they are not allowed to allocate memory.
// * Every overload returns false if the element(s) could not be enqueued.
// * The *_bulk overloads read `count` elements starting at iterator `itemFirst`.
inline bool enqueue(T const& item)
{
MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
else return inner_enqueue<CanAlloc>(item);
}
inline bool enqueue(T&& item)
{
MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
else return inner_enqueue<CanAlloc>(std::move(item));
}
inline bool enqueue(producer_token_t const& token, T const& item)
{
return inner_enqueue<CanAlloc>(token, item);
}
inline bool enqueue(producer_token_t const& token, T&& item)
{
return inner_enqueue<CanAlloc>(token, std::move(item));
}
template<typename It>
bool enqueue_bulk(It itemFirst, size_t count)
{
MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
else return inner_enqueue_bulk<CanAlloc>(itemFirst, count);
}
template<typename It>
bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
{
return inner_enqueue_bulk<CanAlloc>(token, itemFirst, count);
}
// try_* variants: same as above, but with CannotAlloc.
inline bool try_enqueue(T const& item)
{
MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
else return inner_enqueue<CannotAlloc>(item);
}
inline bool try_enqueue(T&& item)
{
MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
else return inner_enqueue<CannotAlloc>(std::move(item));
}
inline bool try_enqueue(producer_token_t const& token, T const& item)
{
return inner_enqueue<CannotAlloc>(token, item);
}
inline bool try_enqueue(producer_token_t const& token, T&& item)
{
return inner_enqueue<CannotAlloc>(token, std::move(item));
}
template<typename It>
bool try_enqueue_bulk(It itemFirst, size_t count)
{
MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
else return inner_enqueue_bulk<CannotAlloc>(itemFirst, count);
}
template<typename It>
bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
{
return inner_enqueue_bulk<CannotAlloc>(token, itemFirst, count);
}
// ---- Dequeue -------------------------------------------------------------------
// Attempts to dequeue one element into `item`; returns true on success (body elided).
template<typename U>
bool try_dequeue(U& item)
{
///...
}
// Like try_dequeue, but always scans the producers in list order starting at
// producerListTail and returns the first element found, so producers earlier in the
// list are always tried first.
template<typename U>
bool try_dequeue_non_interleaved(U& item)
{
for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
if (ptr->dequeue(item)) {
return true;
}
}
return false;
}
// Dequeue guided by a ConsumerToken, which carries producer-rotation state between
// calls (body elided).
template<typename U>
bool try_dequeue(consumer_token_t& token, U& item)
{
///...
}
// Dequeues up to `max` elements, draining producers one after another in list order
// and stopping as soon as `max` elements have been collected. ProducerBase::dequeue_bulk
// takes the iterator by reference, so `itemFirst` keeps advancing across producers.
// Returns the number of elements written.
template<typename It>
size_t try_dequeue_bulk(It itemFirst, size_t max)
{
size_t count = 0;
for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
count += ptr->dequeue_bulk(itemFirst, max - count);
if (count == max) {
break;
}
}
return count;
}
// Bulk dequeue guided by a ConsumerToken (body elided).
template<typename It>
size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
{
///...
}
// Dequeues one element only from the producer owned by `producer`.
// NOTE(review): there is no null check -- the token must be valid() (e.g. not
// moved-from), otherwise this dereferences a null producer pointer.
template<typename U>
inline bool try_dequeue_from_producer(producer_token_t const& producer, U& item)
{
return static_cast<ExplicitProducer*>(producer.producer)->dequeue(item);
}
// Bulk variant of try_dequeue_from_producer; the same validity requirement applies.
template<typename It>
inline size_t try_dequeue_bulk_from_producer(producer_token_t const& producer, It itemFirst, size_t max)
{
return static_cast<ExplicitProducer*>(producer.producer)->dequeue_bulk(itemFirst, max);
}
// Sum of every producer's size_approx(). Producers may be changing concurrently, so the
// result is only an estimate.
size_t size_approx() const
{
size_t size = 0;
for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
size += ptr->size_approx();
}
return size;
}
// Presumably reports whether the atomics the queue relies on are lock-free.
// NOTE(review): the body is missing in this excerpt. As written, a constexpr function
// returning bool with no return statement is ill-formed; restore the return
// expression from the full source.
static constexpr bool is_lock_free()
{
}
private:
friend struct ProducerToken;
friend struct ConsumerToken;
struct ExplicitProducer;
friend struct ExplicitProducer;
struct ImplicitProducer;
friend struct ImplicitProducer;
friend class ConcurrentQueueTests;
// Template argument of inner_enqueue*: whether the call may allocate new memory.
enum AllocationMode { CanAlloc, CannotAlloc };
// Entry point of the producer list; it is walked via ProducerBase::next_prod().
std::atomic<ProducerBase*> producerListTail;
std::atomic<std::uint32_t> producerCount;
// Blocks allocated up front (Block::dynamicallyAllocated marks blocks outside this
// pool) and, presumably, the index of the next unused pool block.
std::atomic<size_t> initialBlockPoolIndex;
Block* initialBlockPool;
size_t initialBlockPoolSize;
// Recycled blocks available for reuse.
FreeList<Block> freeList;
// Lookup table from threads to implicit producers. The initial table and its entries
// are stored inline in the queue; the atomic_flag presumably guards resizing.
std::atomic<ImplicitProducerHash*> implicitProducerHash;
std::atomic<size_t> implicitProducerHashCount; // Number of slots logically used
ImplicitProducerHash initialImplicitProducerHash;
std::array<ImplicitProducerKVP, INITIAL_IMPLICIT_PRODUCER_HASH_SIZE> initialImplicitProducerHashEntries;
std::atomic_flag implicitProducerHashResizeInProgress;
// Consumer-token bookkeeping (used by the elided token-based dequeue paths).
std::atomic<std::uint32_t> nextExplicitConsumerId;
std::atomic<std::uint32_t> globalExplicitConsumerOffset;
};
- 功能:提供多生产者多消费者的并发队列,支持无锁操作。
- 关键特性:
  - 支持动态扩容。
  - 支持显式生产者(`ProducerToken`)和隐式生产者(无 token 直接操作)。
  - 支持批量操作(`enqueue_bulk`/`try_dequeue_bulk`)。
  - 支持消费者令牌(`ConsumerToken`)优化消费路径。
- 模板参数:
  - `T`:队列中存储的元素类型。
  - `Traits`:配置队列行为的特性(如块大小、内存分配策略等)。
2. 生产者令牌 ProducerToken
// Move-only handle that gives its owner a dedicated producer in a queue.
// The queue treats the held producer as an ExplicitProducer (see
// ConcurrentQueue::try_dequeue_from_producer). The producer keeps a back-pointer
// (`token`) to whichever ProducerToken currently owns it.
struct ProducerToken
{
// Construction binds the token to a producer of `queue` (definitions not shown).
template<typename T, typename Traits>
explicit ProducerToken(ConcurrentQueue<T, Traits>& queue);
template<typename T, typename Traits>
explicit ProducerToken(BlockingConcurrentQueue<T, Traits>& queue);
// Takes over `other`'s producer, leaves `other` invalid, and repoints the producer's
// back-pointer at this token.
ProducerToken(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
: producer(other.producer)
{
other.producer = nullptr;
if (producer != nullptr) {
producer->token = this;
}
}
// Implemented as a swap: this token's previous producer (if any) ends up in `other`
// and is released when `other` is destroyed.
inline ProducerToken& operator=(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
{
swap(other);
return *this;
}
// Exchanges producers, then fixes up both producers' back-pointers.
void swap(ProducerToken& other) MOODYCAMEL_NOEXCEPT
{
std::swap(producer, other.producer);
if (producer != nullptr) {
producer->token = this;
}
if (other.producer != nullptr) {
other.producer->token = &other;
}
}
// False once the token has been moved from (and presumably if construction could not
// obtain a producer -- the constructor bodies are not shown).
inline bool valid() const { return producer != nullptr; }
// Detaches from the producer. The back-pointer is cleared first, then `inactive` is
// set with a release store, so any thread that observes `inactive == true` with
// acquire ordering also sees the cleared pointer (presumably the queue, when it
// looks for a producer to reuse).
~ProducerToken()
{
if (producer != nullptr) {
producer->token = nullptr;
producer->inactive.store(true, std::memory_order_release);
}
}
// Disable copying and assignment
ProducerToken(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
ProducerToken& operator=(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
private:
template<typename T, typename Traits> friend class ConcurrentQueue;
friend class ConcurrentQueueTests;
protected:
// Type-erased producer owned by this token; nullptr when the token is invalid.
details::ConcurrentQueueProducerTypelessBase* producer;
};
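- 功能:显式标识一个生产者,用于优化生产者的并发操作。
- 配合使用的队列方法(定义在 `ConcurrentQueue` 上,接受生产者令牌参数;令牌本身只提供 `valid()`、`swap()` 和移动操作):
  - `enqueue`/`try_enqueue`:向队列中添加元素。
  - `enqueue_bulk`/`try_enqueue_bulk`:批量添加元素。
  - `try_dequeue_from_producer`/`try_dequeue_bulk_from_producer`:只从该令牌对应的生产者取出元素。
- 与 `ConcurrentQueue` 的关系:
  - 通过 `ProducerToken` 自身的构造函数(以队列为参数)创建,构造时由队列的 `recycle_or_create_producer` 复用或新建生产者。
  - 每个 `ProducerToken` 关联一个 `ExplicitProducer`(显式生产者)。令牌析构时会把该生产者标记为 inactive,使其之后可以被复用。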
3. 消费者令牌 ConsumerToken
// Move-only consumer handle. It remembers which producer it is currently draining
// and how many items it has taken from it, so token-based dequeues can rotate
// between producers instead of always starting from the same one.
struct ConsumerToken
{
    // A token is always created against a specific queue (definitions not shown).
    template<typename T, typename Traits>
    explicit ConsumerToken(ConcurrentQueue<T, Traits>& q);

    template<typename T, typename Traits>
    explicit ConsumerToken(BlockingConcurrentQueue<T, Traits>& q);

    // Copies every field from `other`; `other` itself is left unchanged (not reset).
    ConsumerToken(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
        : initialOffset(other.initialOffset),
          lastKnownGlobalOffset(other.lastKnownGlobalOffset),
          itemsConsumedFromCurrent(other.itemsConsumedFromCurrent),
          currentProducer(other.currentProducer),
          desiredProducer(other.desiredProducer)
    {
    }

    // Move assignment simply trades state with `other`.
    inline ConsumerToken& operator=(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
    {
        this->swap(other);
        return *this;
    }

    // Member-wise exchange of all rotation state (the order of the swaps is irrelevant).
    void swap(ConsumerToken& other) MOODYCAMEL_NOEXCEPT
    {
        std::swap(desiredProducer, other.desiredProducer);
        std::swap(currentProducer, other.currentProducer);
        std::swap(itemsConsumedFromCurrent, other.itemsConsumedFromCurrent);
        std::swap(lastKnownGlobalOffset, other.lastKnownGlobalOffset);
        std::swap(initialOffset, other.initialOffset);
    }

    // Disable copying and assignment
    ConsumerToken(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
    ConsumerToken& operator=(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;

private:
    template<typename T, typename Traits> friend class ConcurrentQueue;
    friend class ConcurrentQueueTests;

private: // but shared with ConcurrentQueue
    // Declaration order is kept as-is: it fixes the member layout and the order of the
    // move constructor's initializers.
    std::uint32_t initialOffset;
    std::uint32_t lastKnownGlobalOffset;
    std::uint32_t itemsConsumedFromCurrent;
    details::ConcurrentQueueProducerTypelessBase* currentProducer;
    details::ConcurrentQueueProducerTypelessBase* desiredProducer;
};
- 功能:显式标识一个消费者,用于优化消费者的并发操作。
- 配合使用的队列方法(定义在 `ConcurrentQueue` 上,接受消费者令牌参数):
  - `try_dequeue`:从队列中取出元素。
  - `try_dequeue_bulk`:批量取出元素。
- 与 `ConcurrentQueue` 的关系:
  - 通过 `ConsumerToken` 自身的构造函数(以队列为参数)创建。
  - 令牌记录当前生产者以及已从当前生产者消费的数量等状态。从同一生产者消费达到配额(`EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE`)后会轮换到其他生产者,避免长期只消费单个生产者。队列本身是无锁的,这里并不存在真正的"锁定"。
4. 生产者基类 ProducerBase
///
// Producer base
///
// State shared by every producer: the enqueue/dequeue indices and the current tail
// block. Dequeue calls are dispatched to the concrete subclass by testing `isExplicit`
// and using static_cast, rather than through virtual functions.
struct ProducerBase : public details::ConcurrentQueueProducerTypelessBase
{
ProducerBase(ConcurrentQueue* parent_, bool isExplicit_) :
tailIndex(0),
headIndex(0),
dequeueOptimisticCount(0),
dequeueOvercommit(0),
tailBlock(nullptr),
isExplicit(isExplicit_),
parent(parent_)
{
}
virtual ~ProducerBase() { }
// Dequeues one element through the concrete producer type selected by isExplicit.
template<typename U>
inline bool dequeue(U& element)
{
if (isExplicit) {
return static_cast<ExplicitProducer*>(this)->dequeue(element);
}
else {
return static_cast<ImplicitProducer*>(this)->dequeue(element);
}
}
// Bulk variant. `itemFirst` is taken by reference, so the caller's iterator advances
// past the elements written.
template<typename It>
inline size_t dequeue_bulk(It& itemFirst, size_t max)
{
if (isExplicit) {
return static_cast<ExplicitProducer*>(this)->dequeue_bulk(itemFirst, max);
}
else {
return static_cast<ImplicitProducer*>(this)->dequeue_bulk(itemFirst, max);
}
}
// `next` comes from ConcurrentQueueProducerTypelessBase (not shown); this returns it as
// the next producer in the queue's producer list.
inline ProducerBase* next_prod() const { return static_cast<ProducerBase*>(next); }
// Approximate element count computed from relaxed loads of tail and head, which may be
// changing concurrently. Returns tail - head when head is circularly less than tail
// (circular_less_than presumably accounts for index wrap-around), otherwise 0.
inline size_t size_approx() const
{
auto tail = tailIndex.load(std::memory_order_relaxed);
auto head = headIndex.load(std::memory_order_relaxed);
return details::circular_less_than(head, tail) ? static_cast<size_t>(tail - head) : 0;
}
// Relaxed read of the next enqueue position.
inline index_t getTail() const { return tailIndex.load(std::memory_order_relaxed); }
protected:
std::atomic<index_t> tailIndex; // Where to enqueue to next
std::atomic<index_t> headIndex; // Where to dequeue from next
// Bookkeeping for the dequeue protocol (used by the elided subclass dequeue bodies).
std::atomic<index_t> dequeueOptimisticCount;
std::atomic<index_t> dequeueOvercommit;
// Block the producer is currently writing into.
Block* tailBlock;
public:
// Selects ExplicitProducer vs ImplicitProducer in dequeue/dequeue_bulk.
bool isExplicit;
// Queue this producer belongs to.
ConcurrentQueue* parent;
protected:
#ifdef MCDBGQ_TRACKMEM
friend struct MemStats;
#endif
};
- 功能:所有生产者的基类(显式和隐式生产者均继承自此类)。`dequeue`/`dequeue_bulk` 根据 `isExplicit` 用 `static_cast` 分派到具体子类,而不是使用虚函数。
- 关键字段:
  - `tailIndex`/`headIndex`:分别是下一个入队位置和下一个出队位置。
  - `tailBlock`:指向当前生产块的指针。
- 派生类:
  - `ExplicitProducer`:显式生产者(与 `ProducerToken` 关联)。
  - `ImplicitProducer`:隐式生产者(无 token,直接操作队列)。
5. 显式生产者 ExplicitProducer
///
// Explicit queue
///
// Producer bound to a ProducerToken. It keeps its own block index (BlockIndexHeader /
// BlockIndexEntry) that records the base index of each of its blocks. The pr_* fields
// are the producer-side copy; consumers must go through `blockIndex` instead.
// NOTE: excerpt -- most method bodies are elided.
struct ExplicitProducer : public ProducerBase
{
// Sizing of the initial block index: pr_blockIndexSize starts at
// EXPLICIT_INITIAL_INDEX_SIZE / 2 and is raised to half of the power-of-two-rounded
// initial block pool size if that is larger. new_block_index(0) then doubles it.
explicit ExplicitProducer(ConcurrentQueue* parent_) :
ProducerBase(parent_, true),
blockIndex(nullptr),
pr_blockIndexSlotsUsed(0),
pr_blockIndexSize(EXPLICIT_INITIAL_INDEX_SIZE >> 1),
pr_blockIndexFront(0),
pr_blockIndexEntries(nullptr),
pr_blockIndexRaw(nullptr)
{
size_t poolBasedIndexSize = details::ceil_to_pow_2(parent_->initialBlockPoolSize) >> 1;
if (poolBasedIndexSize > pr_blockIndexSize) {
pr_blockIndexSize = poolBasedIndexSize;
}
new_block_index(0); // This creates an index with double the number of current entries, i.e. EXPLICIT_INITIAL_INDEX_SIZE, or the pool-based size if that was larger
}
// Destroys elements that were never dequeued (body elided).
~ExplicitProducer()
{
// Destruct any elements not yet dequeued.
///...
}
// Enqueues one element; allocMode decides whether a new block may be allocated
// (body elided).
template<AllocationMode allocMode, typename U>
inline bool enqueue(U&& element)
{
///...
}
// Dequeues one element into `element`; returns false if none was taken (body elided).
template<typename U>
bool dequeue(U& element)
{
auto tail = this->tailIndex.load(std::memory_order_relaxed);
///...
return false;
}
// Enqueues `count` elements read from `itemFirst`. The whole batch is published at once
// by a release store of the new tail index (the rest of the body is elided).
template<AllocationMode allocMode, typename It>
bool MOODYCAMEL_NO_TSAN enqueue_bulk(It itemFirst, size_t count)
{
///...
this->tailIndex.store(newTailIndex, std::memory_order_release);
return true;
}
// Dequeues up to `max` elements through `itemFirst` (advanced by reference) and returns
// how many were dequeued (body elided).
template<typename It>
size_t dequeue_bulk(It& itemFirst, size_t max)
{
auto tail = this->tailIndex.load(std::memory_order_relaxed);
auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
///...
return 0;
}
private:
// One block-index slot: the base element index of `block`.
struct BlockIndexEntry
{
index_t base;
Block* block;
};
// Block-index array plus metadata. `prev` presumably points at the index this one
// replaced.
struct BlockIndexHeader
{
size_t size;
std::atomic<size_t> front; // Current slot (not next, like pr_blockIndexFront)
BlockIndexEntry* entries;
void* prev;
};
// Allocates a replacement block index (per the constructor comment, twice the current
// size). The parameter presumably controls how many filled slots become visible to
// consumers in the new index (body elided).
bool new_block_index(size_t numberOfFilledSlotsToExpose)
{
auto prevBlockSizeMask = pr_blockIndexSize - 1;
///...
return true;
}
private:
// Index shared with consumers.
std::atomic<BlockIndexHeader*> blockIndex;
// To be used by producer only -- consumer must use the ones referenced by blockIndex
size_t pr_blockIndexSlotsUsed;
size_t pr_blockIndexSize;
size_t pr_blockIndexFront; // Next slot (not current)
BlockIndexEntry* pr_blockIndexEntries;
void* pr_blockIndexRaw;
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
public:
ExplicitProducer* nextExplicitProducer;
private:
#endif
#ifdef MCDBGQ_TRACKMEM
friend struct MemStats;
#endif
};
- 功能:与 `ProducerToken` 关联的生产者,独立管理自己的块和索引。
- 关键特性:
  - 使用块索引(`BlockIndexHeader`)管理分配的块。
  - 块索引可动态扩容(`new_block_index`)。
6. 隐式生产者 ImplicitProducer
//
// Implicit queue
//
// Producer used by the token-less enqueue overloads. Its block index stores
// (block base index -> block) pairs in atomic BlockIndexEntry slots.
// NOTE: excerpt -- most method bodies are elided.
struct ImplicitProducer : public ProducerBase
{
// NOTE(review): unlike ExplicitProducer's constructor this one is not `explicit`, so a
// ConcurrentQueue* converts implicitly to ImplicitProducer.
ImplicitProducer(ConcurrentQueue* parent_) :
ProducerBase(parent_, false),
nextBlockIndexCapacity(IMPLICIT_INITIAL_INDEX_SIZE),
blockIndex(nullptr)
{
new_block_index();
}
// Cleanup of remaining elements/blocks (body elided).
~ImplicitProducer()
{
///...
}
// Enqueues one element. After the (elided) block handling, the element is
// placement-constructed in the tail block's slot for currentTailIndex. It is then
// published by a release store of the incremented tail index.
template<AllocationMode allocMode, typename U>
inline bool enqueue(U&& element)
{
index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
index_t newTailIndex = 1 + currentTailIndex;
///...
// Enqueue
new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
this->tailIndex.store(newTailIndex, std::memory_order_release);
return true;
}
// Dequeues one element into `element`; returns false if none was taken (body elided).
template<typename U>
bool dequeue(U& element)
{
// See ExplicitProducer::dequeue for rationale and explanation
index_t tail = this->tailIndex.load(std::memory_order_relaxed);
index_t overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
///...
return false;
}
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable: 4706) // assignment within conditional expression
#endif
// Enqueues `count` elements from `itemFirst`; the batch is published by a single
// release store of the new tail index (the rest of the body is elided).
template<AllocationMode allocMode, typename It>
bool enqueue_bulk(It itemFirst, size_t count)
{
///...
this->tailIndex.store(newTailIndex, std::memory_order_release);
return true;
}
#ifdef _MSC_VER
#pragma warning(pop)
#endif
// Dequeues up to `max` elements through `itemFirst` (advanced by reference) and returns
// how many were dequeued (body elided).
template<typename It>
size_t dequeue_bulk(It& itemFirst, size_t max)
{
auto tail = this->tailIndex.load(std::memory_order_relaxed);
auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
///...
return 0;
}
private:
// The block size must be > 1, so any number with the low bit set is an invalid block base index
static const index_t INVALID_BLOCK_BASE = 1;
// One block-index slot: `key` holds a block base index (presumably INVALID_BLOCK_BASE
// while unused) and `value` the corresponding block.
struct BlockIndexEntry
{
std::atomic<index_t> key;
std::atomic<Block*> value;
};
// Block-index storage: `entries` holds the slots, `index` holds pointers to them,
// `tail` is the current slot position and `prev` links to the previous header.
struct BlockIndexHeader
{
size_t capacity;
std::atomic<size_t> tail;
BlockIndexEntry* entries;
BlockIndexEntry** index;
BlockIndexHeader* prev;
};
// Obtains a block-index entry for blockStartIndex and returns it through idxEntry;
// allocMode decides whether the index may grow (body elided).
template<AllocationMode allocMode>
inline bool insert_block_index_entry(BlockIndexEntry*& idxEntry, index_t blockStartIndex)
{
///...
}
// Moves the block-index tail back by one slot, wrapping around. The
// `& (capacity - 1)` mask assumes capacity is a power of two; only relaxed ordering
// is used (presumably this undoes a just-claimed slot on a failed enqueue).
inline void rewind_block_index_tail()
{
auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
localBlockIndex->tail.store((localBlockIndex->tail.load(std::memory_order_relaxed) - 1) & (localBlockIndex->capacity - 1), std::memory_order_relaxed);
}
// Returns the block-index entry for the block containing element `index`.
inline BlockIndexEntry* get_block_index_entry_for_index(index_t index) const
{
BlockIndexHeader* localBlockIndex;
auto idx = get_block_index_index_for_index(index, localBlockIndex);
return localBlockIndex->index[idx];
}
// Returns the slot position for `index` and reports the header it belongs to via
// localBlockIndex (body elided).
inline size_t get_block_index_index_for_index(index_t index, BlockIndexHeader*& localBlockIndex) const
{
///...
}
// Allocates a new block index chained to the current one; the new capacity presumably
// comes from nextBlockIndexCapacity (body elided).
bool new_block_index()
{
auto prev = blockIndex.load(std::memory_order_relaxed);
size_t prevCapacity = prev == nullptr ? 0 : prev->capacity;
///...
return true;
}
private:
size_t nextBlockIndexCapacity;
std::atomic<BlockIndexHeader*> blockIndex;
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
public:
// Only present when thread_local support is available.
details::ThreadExitListener threadExitListener;
private:
#endif
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
public:
ImplicitProducer* nextImplicitProducer;
private:
#endif
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
mutable debug::DebugMutex mutex;
#endif
#ifdef MCDBGQ_TRACKMEM
friend struct MemStats;
#endif
};
- 功能:无 token 的生产者,通过线程局部存储(TLS)和哈希表管理。
- 关键特性:
  - 使用哈希表(`ImplicitProducerHash`)快速查找线程对应的生产者。
  - 块索引可动态扩容(`new_block_index`)。
  - 在支持 C++11 `thread_local` 时(定义了 `MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED`),通过 `threadExitListener` 在线程退出时清理资源。
7. 内存管理类 FreeList
// A simple CAS-based lock-free free list. Not the fastest thing in the world under heavy contention, but
// simple and correct (assuming nodes are never freed until after the free list is destroyed), and fairly
// speedy under low contention.
//
// Each node's freeListRefs packs two things. The low 31 bits (REFS_MASK) are a reference
// count, and the top bit (SHOULD_BE_ON_FREELIST) marks a node that is waiting to be
// (re)inserted. While a node is on the list, the list itself holds one reference. A
// pending node is linked in by whichever thread observes its count reach zero with the
// flag set (see add, the tail of try_get, and add_knowing_refcount_is_zero).
template<typename N> // N must inherit FreeListNode or have the same fields (and initialization of them)
struct FreeList
{
FreeList() : freeListHead(nullptr) { }
// Not thread-safe: steals other's head with relaxed operations.
FreeList(FreeList&& other) : freeListHead(other.freeListHead.load(std::memory_order_relaxed)) { other.freeListHead.store(nullptr, std::memory_order_relaxed); }
void swap(FreeList& other) { details::swap_relaxed(freeListHead, other.freeListHead); }
FreeList(FreeList const&) MOODYCAMEL_DELETE_FUNCTION;
FreeList& operator=(FreeList const&) MOODYCAMEL_DELETE_FUNCTION;
// Hands `node` back to the list. If other threads still hold references, the insertion
// is deferred to whichever of them releases the last one.
inline void add(N* node)
{
#ifdef MCDBGQ_NOLOCKFREE_FREELIST
debug::DebugLock lock(mutex);
#endif
// We know that the should-be-on-freelist bit is 0 at this point, so it's safe to
// set it using a fetch_add
if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST, std::memory_order_acq_rel) == 0) {
// Oh look! We were the last ones referencing this node, and we know
// we want to add it to the free list, so let's do it!
add_knowing_refcount_is_zero(node);
}
}
// Pops some node from the list, or returns nullptr if the list is empty.
// A reference is taken on the head (CAS refs -> refs + 1) before its next pointer is
// read. Heads whose count is zero are not touched; the head is reloaded instead.
inline N* try_get()
{
#ifdef MCDBGQ_NOLOCKFREE_FREELIST
debug::DebugLock lock(mutex);
#endif
auto head = freeListHead.load(std::memory_order_acquire);
while (head != nullptr) {
auto prevHead = head;
auto refs = head->freeListRefs.load(std::memory_order_relaxed);
if ((refs & REFS_MASK) == 0 || !head->freeListRefs.compare_exchange_strong(refs, refs + 1, std::memory_order_acquire)) {
head = freeListHead.load(std::memory_order_acquire);
continue;
}
// Good, reference count has been incremented (it wasn't at zero), which means we can read the
// next and not worry about it changing between now and the time we do the CAS
auto next = head->freeListNext.load(std::memory_order_relaxed);
if (freeListHead.compare_exchange_strong(head, next, std::memory_order_acquire, std::memory_order_relaxed)) {
// Yay, got the node. This means it was on the list, which means shouldBeOnFreeList must be false no
// matter the refcount (because nobody else knows it's been taken off yet, it can't have been put back on).
assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0);
// Decrease refcount twice, once for our ref, and once for the list's ref
head->freeListRefs.fetch_sub(2, std::memory_order_release);
return head;
}
// OK, the head must have changed on us, but we still need to decrease the refcount we increased.
// Note that we don't need to release any memory effects, but we do need to ensure that the reference
// count decrement happens-after the CAS on the head.
refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel);
// If ours was the last reference and an add() is pending, finish that add here.
if (refs == SHOULD_BE_ON_FREELIST + 1) {
add_knowing_refcount_is_zero(prevHead);
}
}
return nullptr;
}
// Useful for traversing the list when there's no contention (e.g. to destroy remaining nodes)
N* head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); }
private:
// Pushes `node`, whose reference count is known to be zero. The node is published
// with a count of 1 (the list's reference). If the head CAS fails, other threads may
// meanwhile have taken references through a stale head pointer. In that case a single
// fetch_add(SHOULD_BE_ON_FREELIST - 1) drops the list's reference and sets the pending
// flag. If the count was exactly 1 (no other holders) we retry; otherwise the last
// holder to release its reference re-adds the node.
inline void add_knowing_refcount_is_zero(N* node)
{
auto head = freeListHead.load(std::memory_order_relaxed);
while (true) {
node->freeListNext.store(head, std::memory_order_relaxed);
node->freeListRefs.store(1, std::memory_order_release);
if (!freeListHead.compare_exchange_strong(head, node, std::memory_order_release, std::memory_order_relaxed)) {
// Hmm, the add failed, but we can only try again when the refcount goes back to zero
if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_acq_rel) == 1) {
continue;
}
}
return;
}
}
private:
// Implemented like a stack, but where node order doesn't matter (nodes are inserted out of order under contention)
std::atomic<N*> freeListHead;
static const std::uint32_t REFS_MASK = 0x7FFFFFFF;
static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000;
#ifdef MCDBGQ_NOLOCKFREE_FREELIST
debug::DebugMutex mutex;
#endif
};
- 功能:管理空闲块的无锁(lock-free)链表。它基于 CAS 实现,并在每个节点上维护引用计数,保证读取节点的 `next` 指针时该节点不会被重新链接。
- 关键方法:
  - `add`:将块添加到空闲链表。
  - `try_get`:从空闲链表中获取块;链表为空时返回 `nullptr`。
- 与 `ConcurrentQueue` 的关系:
  - 队列通过 `FreeList` 复用已分配的块,减少内存分配开销。
8. 块结构 Block
// Fixed-size chunk of BLOCK_SIZE element slots, plus the bookkeeping used to tell when
// every slot has been dequeued. Two tracking modes exist:
//  * flag mode (explicit context and BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD):
//    one emptyFlags entry per slot;
//  * counter mode (every other case): elementsCompletelyDequeued counts finished slots.
// BLOCK_SIZE is used as a bit mask (BLOCK_SIZE - 1), so it must be a power of two.
struct Block
{
Block()
: next(nullptr), elementsCompletelyDequeued(0), freeListRefs(0), freeListNext(nullptr), dynamicallyAllocated(true)
{
#ifdef MCDBGQ_TRACKMEM
owner = nullptr;
#endif
}
// Returns true once every slot has been dequeued. On success an acquire fence pairs
// with the consumers' release operations in set_empty / set_many_empty.
template<InnerQueueContext context>
inline bool is_empty() const
{
MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
// Check flags
for (size_t i = 0; i < BLOCK_SIZE; ++i) {
if (!emptyFlags[i].load(std::memory_order_relaxed)) {
return false;
}
}
// Aha, empty; make sure we have all other memory effects that happened before the empty flags were set
std::atomic_thread_fence(std::memory_order_acquire);
return true;
}
else {
// Check counter
if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == BLOCK_SIZE) {
std::atomic_thread_fence(std::memory_order_acquire);
return true;
}
assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <= BLOCK_SIZE);
return false;
}
}
// Marks the slot for index `i` as dequeued.
// Returns true if the block is now empty (flag mode always returns false).
// In flag mode slot k maps to emptyFlags[BLOCK_SIZE - 1 - k] (reversed order).
template<InnerQueueContext context>
inline bool set_empty(MOODYCAMEL_MAYBE_UNUSED index_t i)
{
MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
// Set flag
assert(!emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].load(std::memory_order_relaxed));
emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].store(true, std::memory_order_release);
return false;
}
else {
// Increment counter
auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_acq_rel);
assert(prevVal < BLOCK_SIZE);
return prevVal == BLOCK_SIZE - 1;
}
}
// Sets multiple contiguous item statuses to 'empty' (assumes no wrapping and count > 0).
// Returns true if the block is now empty (flag mode always returns false).
// In flag mode `i` is reused to hold the start of the reversed flag range. A single
// release fence precedes the relaxed flag stores.
template<InnerQueueContext context>
inline bool set_many_empty(MOODYCAMEL_MAYBE_UNUSED index_t i, size_t count)
{
MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
// Set flags
std::atomic_thread_fence(std::memory_order_release);
i = BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1)) - count + 1;
for (size_t j = 0; j != count; ++j) {
assert(!emptyFlags[i + j].load(std::memory_order_relaxed));
emptyFlags[i + j].store(true, std::memory_order_relaxed);
}
return false;
}
else {
// Increment counter
auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_acq_rel);
assert(prevVal + count <= BLOCK_SIZE);
return prevVal + count == BLOCK_SIZE;
}
}
// Marks the whole block as dequeued (relaxed stores only).
template<InnerQueueContext context>
inline void set_all_empty()
{
MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
// Set all flags
for (size_t i = 0; i != BLOCK_SIZE; ++i) {
emptyFlags[i].store(true, std::memory_order_relaxed);
}
}
else {
// Reset counter
elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed);
}
}
// Marks every slot as not yet dequeued (relaxed stores only; presumably used before
// the block is reused).
template<InnerQueueContext context>
inline void reset_empty()
{
MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
// Reset flags
for (size_t i = 0; i != BLOCK_SIZE; ++i) {
emptyFlags[i].store(false, std::memory_order_relaxed);
}
}
else {
// Reset counter
elementsCompletelyDequeued.store(0, std::memory_order_relaxed);
}
}
// Pointer to the slot for global element index `idx` (masked with BLOCK_SIZE - 1).
inline T* operator[](index_t idx) MOODYCAMEL_NOEXCEPT { return static_cast<T*>(static_cast<void*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }
inline T const* operator[](index_t idx) const MOODYCAMEL_NOEXCEPT { return static_cast<T const*>(static_cast<void const*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }
private:
static_assert(std::alignment_of<T>::value <= sizeof(T), "The queue does not support types with an alignment greater than their size at this time");
// Raw storage for BLOCK_SIZE elements, aligned like T; producers construct elements
// in place.
MOODYCAMEL_ALIGNED_TYPE_LIKE(char[sizeof(T) * BLOCK_SIZE], T) elements;
public:
Block* next;
// Used in counter mode.
std::atomic<size_t> elementsCompletelyDequeued;
// Used in flag mode; shrinks to a single (unused) entry otherwise.
std::atomic<bool> emptyFlags[BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? BLOCK_SIZE : 1];
public:
// Free-list linkage (see FreeList).
std::atomic<std::uint32_t> freeListRefs;
std::atomic<Block*> freeListNext;
bool dynamicallyAllocated; // Perhaps a better name for this would be 'isNotPartOfInitialBlockPool'
#ifdef MCDBGQ_TRACKMEM
void* owner;
#endif
};
static_assert(std::alignment_of<Block>::value >= std::alignment_of<T>::value, "Internal error: Blocks must be at least as aligned as the type they are wrapping");
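- 功能:存储队列元素的基本单元,每个块容纳 `BLOCK_SIZE` 个元素。
- 关键字段:
  - `elements`:存储元素的原始内存(按 `T` 对齐),元素通过 placement new 原地构造。
  - `next`:指向下一个块的指针。
  - `emptyFlags`:在显式生产者且 `BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD` 时,逐个标记元素是否已被消费。
  - `elementsCompletelyDequeued`:其他情况下使用的计数器,记录已完全出队的元素数量。
- 与生产者的关系:
  - 生产者通过 `Block` 管理元素的写入和读取。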
类关系总结
- `ConcurrentQueue` 是核心类,管理生产者和消费者的交互。
- `ProducerToken` 和 `ConsumerToken` 是外部接口,用于优化并发操作。
- `ProducerBase` 是生产者的基类,派生为 `ExplicitProducer`(显式)和 `ImplicitProducer`(隐式)。
- `FreeList` 和 `Block` 负责内存管理和数据存储。
典型工作流程
- 生产者:
  - 显式生产者通过 `ProducerToken` 操作队列。
  - 隐式生产者直接操作队列,通过哈希表查找自己的状态。
- 消费者:
  - 普通消费者直接调用 `try_dequeue`/`try_dequeue_bulk`,在各生产者的子队列之间查找元素。
  - 使用 `ConsumerToken` 的消费者按配额在生产者之间轮换消费,优化消费路径。
- 内存管理:
  - 空闲块通过 `FreeList` 复用,减少动态分配。

这个设计为每个生产者维护独立的块序列(生产者之间互不争用同一个尾部),再配合动态扩容和块复用,实现了高性能的并发队列。