双buffer实现无锁队列
时间: 2025-01-10 07:30:15 浏览: 47
### 双缓冲技术实现无锁队列
#### 背景介绍
双缓冲是一种常见的优化技术,在图形处理和其他领域广泛应用。在并发编程中,通过引入两个独立的数据区(即两份缓存),可以有效减少竞争并提高性能。对于无锁队列而言,这意味着可以在不阻塞其他线程的情况下完成入队和出队操作。
#### 设计思路
为了构建一个基于双缓冲机制的无锁队列,通常会采用如下策略:
- 使用两个指针分别指向当前正在使用的生产者缓冲区和消费者缓冲区;
- 当前生产者的写入只发生在其中一个缓冲区内,而消费者的读取则针对另一个已完成填充或部分填充的状态良好的缓冲区进行;
- 生产者一旦填满其负责的那个缓冲区,则尝试交换这两个角色所对应的物理地址;
- 如果此时消费速度跟不上生产速率,可能会导致临时性的等待现象;反之亦然。
这种设计允许高吞吐量的同时保持较低的竞争开销,因为大多数时间里只有一个活跃的操作集处于修改状态之中[^1]。
#### C++代码示例
下面是一个简单的C++版本的双缓冲无锁队列实现:
```cpp
#include <atomic>
#include <memory>
template<typename T>
class DoubleBufferedQueue {
private:
struct BufferNode;
public:
using value_type = T;
class Iterator {
friend class DoubleBufferedQueue<T>;
explicit Iterator(BufferNode* node) : current_(node) {}
bool operator!=(const Iterator& other) const noexcept { return this->current_ != other.current_; }
void operator++() { ++this->current_->index; }
reference operator*() const { return *reinterpret_cast<value_type*>(this->current_->data + sizeof(value_type)*this->current_->index.load()); }
private:
BufferNode* current_;
};
DoubleBufferedQueue(size_t capacity);
~DoubleBufferedQueue();
std::pair<bool, size_t> enqueue(const value_type&);
std::optional<value_type> dequeue();
std::pair<Iterator, Iterator> getRangeForRead();
protected:
static constexpr auto kMaxCapacity = SIZE_MAX / 2uL;
private:
struct BufferNode final {
char data[kMaxCapacity];
std::atomic<size_t> index{0};
BufferNode* next{nullptr};
BufferNode() = default;
~BufferNode() = default;
};
std::atomic<BufferNode*> head_{new BufferNode()};
std::atomic<BufferNode*> tail_{head_.load()};
size_t capacity_;
};
// 构造函数初始化成员变量
template<typename T>
inline DoubleBufferedQueue<T>::DoubleBufferedQueue(size_t capacity)
:capacity_(std::min(capacity,kMaxCapacity))
{}
// 销毁对象时释放资源
template<typename T>
inline DoubleBufferedQueue<T>::~DoubleBufferedQueue()
{
while(auto ptr = head_.exchange(nullptr)){
delete[] reinterpret_cast<char*>(ptr);
}
}
// 将新元素加入到队尾
template<typename T>
inline std::pair<bool,size_t> DoubleBufferedQueue<T>::enqueue(const value_type& item){
auto old_tail = tail_.load(std::memory_order_relaxed);
if(old_tail->next || !tail_.compare_exchange_strong(old_tail,old_tail->next)){
// 已经有新的节点被创建出来作为下一个buffer了
return {};
}
new (static_cast<void*>(&old_tail->data[old_tail->index.fetch_add(sizeof(T),std::memory_order_release)]))T(item);
if((old_tail->index.load()>=(capacity_-sizeof(T)))&&!(old_tail->next)){
// 创建一个新的空闲buffer用于后续存储更多item
auto pNewBuff=new BufferNode{};
do{}while(!tail_.compare_exchange_weak(old_tail,pNewBuff));
old_tail->next=pNewBuff;
}
return {{true},old_tail->index.load()-1};
}
// 移除并返回队首元素
template<typename T>
inline std::optional<value_type> DoubleBufferedQueue<T>::dequeue(){
auto front=head_.load(std::memory_order_acquire);
if(front==tail_.load()){
// 队列为空的情况
return {};
}
auto ret=*reinterpret_cast<const value_type*>(&front->data[(front->index++)%capacity_]);
(*ret).~T(); // 手动调用析构函数
if(head_.load()->index>=capacity_ && head_.load()!=tail_.load()){
// 完整遍历了一个buffer之后更新头结点位置
auto prev_head=head_.exchange(head_.load()->next,std::memory_order_acq_rel);
delete []prev_head;
}
return ret;
}
```
此段代码展示了如何利用模板类`DoubleBufferedQueue`来封装一个支持泛型类型的双缓冲无锁队列。其中包含了基本的功能如插入(`enqueue`)、删除(`dequeue`)以及获取可迭代范围的方法(`getRangeForRead`)。注意这里简化了一些细节以便更好地理解核心逻辑[^3]。
阅读全文
相关推荐


















