这个图表分为几个主要部分:
- 类定义部分
:
展示了
Node<T>
结构体的成员和构造函数列出了
LinkedListQueue<T>
类的属性(如头指针、尾指针、互斥锁等)展示了
LinkedListQueue<T>
类的方法(包括构造函数、析构函数、入队、出队等操作)
- enqueue操作流程
:
展示了入队操作的完整流程,包括创建新节点、获取锁、更新队列指针和通知等待线程
使用流程图清晰地表示了空队列和非空队列两种情况下的处理逻辑
- dequeue操作流程
:
展示了出队操作的完整流程,包括获取锁、条件等待、提取数据和更新队列状态
通过流程图展示了出队后队列变为空和非空两种情况的不同处理
- 线程交互部分
:
可视化了两个生产者线程和一个消费者线程之间的交互关系
展示了它们通过共享队列进行通信的方式
标注了同步机制(互斥锁、条件变量和原子变量)
概述了main函数中的线程创建和等待逻辑
基于链表实现的线程安全队列(生产者–消费者模型)的完整示例。为简单起见,我们自定义一个
LinkedListQueue<T>
,并用 C++11 的线程、互斥锁与条件变量来协调生产者和消费者的行为。#include <iostream> // 引入输入输出流库,用于标准输入输出功能#include <thread> // 引入线程库,用于创建和管理多个线程#include <mutex> // 引入互斥锁库,用于保护共享数据,防止多个线程同时修改数据#include <condition_variable> // 引入条件变量库,用于线程间的等待和通知机制#include <atomic> // 引入原子操作库,用于提供多线程环境下的原子变量操作#include <chrono> // 引入时间库,用于进行时间操作,如线程睡眠等 // 定义链表节点模板,用于构建队列内部的链表结构template<typename T>struct Node { T data; // 存储节点数据 Node* next; // 指向下一个节点的指针 Node(const T& d) : data(d), next(nullptr) {} // 构造函数,初始化数据为 d,next 指针默认为空}; // 定义基于链表实现的线程安全队列模板类template<typename T>class LinkedListQueue {public: // 构造函数:初始化队列为空(头尾指针置空,大小为 0) LinkedListQueue() : head_(nullptr), tail_(nullptr), size_(0) {} // 析构函数:在对象销毁时调用 clear() 清空队列,释放所有节点资源 ~LinkedListQueue() { clear(); } // 入队操作:将新元素加入队列尾部 void enqueue(const T& value) { Node<T>* new_node = new Node<T>(value); // 创建一个新节点,用于存储入队的值 std::unique_lock<std::mutex> lock(mtx_); // 加锁,确保对队列的修改操作是线程安全的 if (!tail_) { // 如果队列为空(没有尾节点) head_ = tail_ = new_node; // 则同时将头和尾指针指向新节点 } else { // 如果队列非空 tail_->next = new_node; // 将新节点链接到当前尾节点之后 tail_ = new_node; // 更新尾指针,使之指向新加入的节点 } ++size_; // 队列中元素数量加 1 // 通知一个等待的消费者线程,新数据已经加入队列 cv_.notify_one(); } // 出队操作(阻塞版):从队列头部移除元素并返回其值,如果队列为空则阻塞等待 T dequeue() { std::unique_lock<std::mutex> lock(mtx_); // 加锁,确保线程安全地访问和修改共享队列 // 等待直到队列中至少有一个元素(条件满足时解除阻塞) cv_.wait(lock, [this]() { return size_ > 0; }); Node<T>* node = head_; // 保存当前头节点的地址 T value = node->data; // 读取头节点中的数据,准备返回 head_ = head_->next; // 将头指针移动到下一个节点 if (!head_) { // 如果队列因此为空 tail_ = nullptr; // 同时将尾指针置为 nullptr } --size_; // 队列中元素数量减 1 lock.unlock(); // 解锁,允许其他线程访问队列 delete node; // 释放之前保存的头节点,防止内存泄漏 return value; // 返回出队元素 } // 清空队列:删除队列中所有的节点,释放内存 void clear() { std::unique_lock<std::mutex> lock(mtx_); // 上锁,确保在清空过程中的操作是线程安全的 while (head_) { // 当队列中还有节点时 Node<T>* tmp = head_; // 保存当前头节点 head_ = head_->next; // 将头指针移动到下一个节点 delete tmp; // 删除保存的节点,释放内存 } tail_ = nullptr; // 重置尾指针为空 size_ = 0; // 重置队列大小为 0 } // 获取当前队列中元素的数量,保证线程安全 size_t size() const { std::lock_guard<std::mutex> lock(mtx_); // 加锁(在函数退出时自动释放),保护对 size_ 的访问 return size_; // 返回队列中元素的数量 } private: Node<T>* head_; // 指向队列头部节点的指针 Node<T>* tail_; // 指向队列尾部节点的指针 mutable std::mutex mtx_; // 互斥锁,用于保护队列中的数据,即使在 const 方法中也可修改 std::condition_variable cv_; // 条件变量,用于实现当队列为空时的阻塞等待及通知机制 size_t size_; // 记录队列中当前元素的数量}; // 模拟生产者函数:生成若干整数,将它们入队,并在生产完成后设置原子标记void producer(LinkedListQueue<int>& queue, int count, int id, std::atomic<bool>& done_flag) { for (int i = 1; i <= count; ++i) { // 循环生成 count 个整数 int value = id * 100 + i; // 生成的数值,通过生产者 id*100 + 当前计数,便于区分不同生产者产生的数据 queue.enqueue(value); // 将生成的数值加入队列 std::cout << "[Producer " << id << "] enqueued " << value << "\n"; // 输出入队日志,显示生产者 id 及入队的数值 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 休眠 100 毫秒,模拟生产时的间隔 } // 生产任务完成后,通过原子变量设置完成标记,确保线程间安全通信 done_flag.store(true);} // 模拟消费者函数:不停从队列中取数据,直到所有生产者都完成并且队列为空void consumer(LinkedListQueue<int>& queue, std::atomic<bool>& done_flag1, std::atomic<bool>& done_flag2) { while (true) { // 进入无限循环,不断尝试消费队列中的元素 // 判断退出条件:当两个生产者都已经完成且队列为空时,退出循环 if (done_flag1.load() && done_flag2.load() && queue.size() == 0) { break; } int value = queue.dequeue(); // 从队列中阻塞式取出一个元素(若队列为空则等待) std::cout << " [Consumer] dequeued " << value << "\n"; // 输出出队日志,显示消费者取出的数值 std::this_thread::sleep_for(std::chrono::milliseconds(150)); // 休眠 150 毫秒,模拟消费过程中的间隔 } std::cout << "[Consumer] no more items, exiting.\n"; // 消费结束后,输出提示信息} int main() { LinkedListQueue<int> queue; // 创建一个存放 int 类型数据的线程安全队列 std::atomic<bool> prod1_done(false), prod2_done(false); // 初始化两个原子标记,用于标识两个生产者是否完成 // 启动两个生产者线程和一个消费者线程 std::thread p1(producer, std::ref(queue), 10, 1, std::ref(prod1_done)); // 启动第一个生产者线程,生产 10 个数据,id 为 1 std::thread p2(producer, std::ref(queue), 10, 2, std::ref(prod2_done)); // 启动第二个生产者线程,生产 10 个数据,id 为 2 std::thread c(consumer, std::ref(queue), std::ref(prod1_done), std::ref(prod2_done)); // 启动消费者线程,等待并处理入队的数据 p1.join(); // 等待第一个生产者线程执行结束 p2.join(); // 等待第二个生产者线程执行结束 c.join(); // 等待消费者线程执行结束 std::cout << "All threads have finished.\n"; // 输出所有线程执行完毕的提示信息 return 0; // 主函数返回 0,表示程序正常结束}
说明:
LinkedListQueue
类
内部通过
Node<T>
链表维护队列元素。使用
std::mutex
和std::condition_variable
实现线程安全与阻塞等待。enqueue
在插入元素后调用notify_one()
唤醒等待的消费者。dequeue
中,若队列为空则阻塞等待;一旦有元素,取出并返回。
生产者 (
producer
)每个生产者线程生成
count
个整数,并将它们入队。使用简单的睡眠模拟生产延迟。
完成后,通过
std::atomic<bool>
标记自身已结束。
消费者 (
consumer
)不断从队列取数据。
检测所有生产者完成标志且队列空时退出。
通过睡眠模拟消费延迟。
主函数 (
main
)创建并启动两个生产者线程和一个消费者线程,最后等待它们结束。
这样就完成了一个基于链表队列的经典生产者—消费者示例。