【C++】多线程链表队列(生产者-消费者模式)

这个图表分为几个主要部分:

  1. 类定义部分

  • 展示了Node<T>结构体的成员和构造函数

  • 列出了LinkedListQueue<T>类的属性(如头指针、尾指针、互斥锁等)

  • 展示了LinkedListQueue<T>类的方法(包括构造函数、析构函数、入队、出队等操作)

  • enqueue操作流程

    • 展示了入队操作的完整流程,包括创建新节点、获取锁、更新队列指针和通知等待线程

    • 使用流程图清晰地表示了空队列和非空队列两种情况下的处理逻辑

  • dequeue操作流程

    • 展示了出队操作的完整流程,包括获取锁、条件等待、提取数据和更新队列状态

    • 通过流程图展示了出队后队列变为空和非空两种情况的不同处理

  • 线程交互部分

    • 可视化了两个生产者线程和一个消费者线程之间的交互关系

    • 展示了它们通过共享队列进行通信的方式

    • 标注了同步机制(互斥锁、条件变量和原子变量)

    • 概述了main函数中的线程创建和等待逻辑

    基于链表实现的线程安全队列(生产者–消费者模型)的完整示例。为简单起见,我们自定义一个 LinkedListQueue<T>,并用 C++11 的线程、互斥锁与条件变量来协调生产者和消费者的行为。

    #include <iostream>  // 引入输入输出流库,用于标准输入输出功能#include <thread>    // 引入线程库,用于创建和管理多个线程#include <mutex>     // 引入互斥锁库,用于保护共享数据,防止多个线程同时修改数据#include <condition_variable>  // 引入条件变量库,用于线程间的等待和通知机制#include <atomic>    // 引入原子操作库,用于提供多线程环境下的原子变量操作#include <chrono>    // 引入时间库,用于进行时间操作,如线程睡眠等
    // 定义链表节点模板,用于构建队列内部的链表结构template<typename T>struct Node {    T data;                // 存储节点数据    Node* next;            // 指向下一个节点的指针    Node(const T& d) : data(d), next(nullptr) {}  // 构造函数,初始化数据为 d,next 指针默认为空};
    // 定义基于链表实现的线程安全队列模板类template<typename T>class LinkedListQueue {public:    // 构造函数:初始化队列为空(头尾指针置空,大小为 0)    LinkedListQueue() : head_(nullptr), tail_(nullptr), size_(0) {}
        // 析构函数:在对象销毁时调用 clear() 清空队列,释放所有节点资源    ~LinkedListQueue() {        clear();    }
        // 入队操作:将新元素加入队列尾部    void enqueue(const T& value) {        Node<T>* new_node = new Node<T>(value);  // 创建一个新节点,用于存储入队的值        std::unique_lock<std::mutex> lock(mtx_);   // 加锁,确保对队列的修改操作是线程安全的        if (!tail_) {  // 如果队列为空(没有尾节点)            head_ = tail_ = new_node;  // 则同时将头和尾指针指向新节点        } else {  // 如果队列非空            tail_->next = new_node;  // 将新节点链接到当前尾节点之后            tail_ = new_node;        // 更新尾指针,使之指向新加入的节点        }        ++size_;  // 队列中元素数量加 1        // 通知一个等待的消费者线程,新数据已经加入队列        cv_.notify_one();    }
        // 出队操作(阻塞版):从队列头部移除元素并返回其值,如果队列为空则阻塞等待    T dequeue() {        std::unique_lock<std::mutex> lock(mtx_);  // 加锁,确保线程安全地访问和修改共享队列        // 等待直到队列中至少有一个元素(条件满足时解除阻塞)        cv_.wait(lock, [this]() { return size_ > 0; });
            Node<T>* node = head_;  // 保存当前头节点的地址        T value = node->data;   // 读取头节点中的数据,准备返回        head_ = head_->next;    // 将头指针移动到下一个节点        if (!head_) {  // 如果队列因此为空            tail_ = nullptr;  // 同时将尾指针置为 nullptr        }        --size_;  // 队列中元素数量减 1        lock.unlock();  // 解锁,允许其他线程访问队列        delete node;  // 释放之前保存的头节点,防止内存泄漏        return value; // 返回出队元素    }
        // 清空队列:删除队列中所有的节点,释放内存    void clear() {        std::unique_lock<std::mutex> lock(mtx_);  // 上锁,确保在清空过程中的操作是线程安全的        while (head_) {  // 当队列中还有节点时            Node<T>* tmp = head_;   // 保存当前头节点            head_ = head_->next;    // 将头指针移动到下一个节点            delete tmp;           // 删除保存的节点,释放内存        }        tail_ = nullptr;  // 重置尾指针为空        size_ = 0;        // 重置队列大小为 0    }
        // 获取当前队列中元素的数量,保证线程安全    size_t size() const {        std::lock_guard<std::mutex> lock(mtx_);  // 加锁(在函数退出时自动释放),保护对 size_ 的访问        return size_;  // 返回队列中元素的数量    }
    private:    Node<T>* head_;                      // 指向队列头部节点的指针    Node<T>* tail_;                      // 指向队列尾部节点的指针    mutable std::mutex mtx_;             // 互斥锁,用于保护队列中的数据,即使在 const 方法中也可修改    std::condition_variable cv_;         // 条件变量,用于实现当队列为空时的阻塞等待及通知机制    size_t size_;                        // 记录队列中当前元素的数量};
    // 模拟生产者函数:生成若干整数,将它们入队,并在生产完成后设置原子标记void producer(LinkedListQueue<int>& queue, int count, int id, std::atomic<bool>& done_flag) {    for (int i = 1; i <= count; ++i) { // 循环生成 count 个整数        int value = id * 100 + i;  // 生成的数值,通过生产者 id*100 + 当前计数,便于区分不同生产者产生的数据        queue.enqueue(value);      // 将生成的数值加入队列        std::cout << "[Producer " << id << "] enqueued " << value << "\n";  // 输出入队日志,显示生产者 id 及入队的数值        std::this_thread::sleep_for(std::chrono::milliseconds(100));  // 休眠 100 毫秒,模拟生产时的间隔    }    // 生产任务完成后,通过原子变量设置完成标记,确保线程间安全通信    done_flag.store(true);}
    // 模拟消费者函数:不停从队列中取数据,直到所有生产者都完成并且队列为空void consumer(LinkedListQueue<int>& queue, std::atomic<bool>& done_flag1, std::atomic<bool>& done_flag2) {    while (true) { // 进入无限循环,不断尝试消费队列中的元素        // 判断退出条件:当两个生产者都已经完成且队列为空时,退出循环        if (done_flag1.load() && done_flag2.load() && queue.size() == 0) {            break;        }        int value = queue.dequeue();  // 从队列中阻塞式取出一个元素(若队列为空则等待)        std::cout << "    [Consumer] dequeued " << value << "\n";  // 输出出队日志,显示消费者取出的数值        std::this_thread::sleep_for(std::chrono::milliseconds(150));  // 休眠 150 毫秒,模拟消费过程中的间隔    }    std::cout << "[Consumer] no more items, exiting.\n";  // 消费结束后,输出提示信息}
    int main() {    LinkedListQueue<int> queue;  // 创建一个存放 int 类型数据的线程安全队列    std::atomic<bool> prod1_done(false), prod2_done(false);  // 初始化两个原子标记,用于标识两个生产者是否完成
        // 启动两个生产者线程和一个消费者线程    std::thread p1(producer, std::ref(queue), 10, 1, std::ref(prod1_done));     // 启动第一个生产者线程,生产 10 个数据,id 为 1    std::thread p2(producer, std::ref(queue), 10, 2, std::ref(prod2_done));     // 启动第二个生产者线程,生产 10 个数据,id 为 2    std::thread c(consumer, std::ref(queue), std::ref(prod1_done), std::ref(prod2_done));  // 启动消费者线程,等待并处理入队的数据
        p1.join();  // 等待第一个生产者线程执行结束    p2.join();  // 等待第二个生产者线程执行结束    c.join();   // 等待消费者线程执行结束
        std::cout << "All threads have finished.\n";  // 输出所有线程执行完毕的提示信息    return 0;  // 主函数返回 0,表示程序正常结束}

    说明:

    1. LinkedListQueue 类

    • 内部通过 Node<T> 链表维护队列元素。

    • 使用 std::mutex 和 std::condition_variable 实现线程安全与阻塞等待。

    • enqueue 在插入元素后调用 notify_one() 唤醒等待的消费者。

    • dequeue 中,若队列为空则阻塞等待;一旦有元素,取出并返回。

  • 生产者 (producer)

    • 每个生产者线程生成 count 个整数,并将它们入队。

    • 使用简单的睡眠模拟生产延迟。

    • 完成后,通过 std::atomic<bool> 标记自身已结束。

  • 消费者 (consumer)

    • 不断从队列取数据。

    • 检测所有生产者完成标志且队列空时退出。

    • 通过睡眠模拟消费延迟。

  • 主函数 (main)

    • 创建并启动两个生产者线程和一个消费者线程,最后等待它们结束。

    这样就完成了一个基于链表队列的经典生产者—消费者示例。

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值