JDK 8 LinkedBlockingQueue 源码详解(详细注释版)

JDK 8 LinkedBlockingQueue 源码详解(详细注释版)

1. 类定义和基本属性

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    
    // 序列化版本号
    private static final long serialVersionUID = -6903933977591709194L;

    /**
     * 链表节点内部类
     * 每个节点包含元素值和指向下一个节点的引用
     * 构成单向链表结构
     */
    static class Node<E> {
        E item;           // 节点存储的元素
        Node<E> next;     // 指向下一个节点的引用

        // 构造方法
        Node(E x) { item = x; }
    }

    /**
     * 队列的最大容量
     * 如果未指定容量,默认为Integer.MAX_VALUE(无界队列)
     * 用于控制队列大小,防止内存溢出
     */
    private final int capacity;

    /**
     * 当前队列中的元素数量
     * 使用原子类保证多线程环境下的可见性和原子性
     * 避免了使用锁来更新计数
     */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * 队首节点的引用(链表头部)
     * take操作从这里开始
     * 使用transient关键字避免序列化
     */
    transient Node<E> head;

    /**
     * 队尾节点的引用(链表尾部)
     * put操作在这里结束
     * 使用transient关键字避免序列化
     */
    private transient Node<E> last;

    /**
     * 取出元素时使用的锁
     * 专门用于保护take、poll等取出操作
     * 实现生产者消费者锁分离,提高并发性能
     */
    private final ReentrantLock takeLock = new ReentrantLock();

    /**
     * 等待取出元素的条件队列
     * 当队列为空时,消费者线程在此等待
     * 有新元素入队时会被唤醒
     */
    private final Condition notEmpty = takeLock.newCondition();

    /**
     * 插入元素时使用的锁
     * 专门用于保护put、offer等插入操作
     * 实现生产者消费者锁分离,提高并发性能
     */
    private final ReentrantLock putLock = new ReentrantLock();

    /**
     * 等待插入元素的条件队列
     * 当队列已满时,生产者线程在此等待
     * 有元素出队时会被唤醒
     */
    private final Condition notFull = putLock.newCondition();

    /**
     * 用于信号控制的内部类
     * 在序列化时用于控制锁的状态
     */
    private class LBQSpliterator implements Spliterator<E> {
        // 省略实现细节
    }

2. 构造方法(详细注释)

    /**
     * 默认构造方法(无界队列)
     * 创建一个容量为Integer.MAX_VALUE的LinkedBlockingQueue
     * 理论上可以存储任意数量的元素(受限于内存)
     * 
     * 构造过程:
     * 1. 设置容量为最大整数值
     * 2. 初始化空的链表结构(哨兵节点)
     * 3. head和last都指向同一个哨兵节点
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE); // 调用指定容量的构造方法
    }

    /**
     * 指定容量的构造方法
     * 创建一个指定容量的LinkedBlockingQueue
     * @param capacity 队列容量,必须大于0
     * @throws IllegalArgumentException 如果capacity小于等于0
     * 
     * 容量选择说明:
     * - 有界队列:capacity < Integer.MAX_VALUE
     * - 无界队列:capacity = Integer.MAX_VALUE
     * - 建议根据实际需求和系统资源设置合适的容量
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException(); // 验证容量
        this.capacity = capacity; // 设置容量
        last = head = new Node<E>(null); // 创建哨兵节点,初始化链表
    }

    /**
     * 指定初始集合的构造方法
     * 创建一个包含指定集合元素的LinkedBlockingQueue
     * 容量默认为Integer.MAX_VALUE
     * @param c 包含初始元素的集合
     * @throws NullPointerException 如果c为null
     * 
     * 初始化过程:
     * 1. 调用默认构造方法创建队列
     * 2. 将集合元素逐个添加到队列中
     * 3. 使用putLock保护批量插入操作
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE); // 创建无界队列
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // 获取插入锁
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException(); // 不允许null元素
                if (n == capacity)
                    throw new IllegalStateException("Queue full"); // 队列满
                enqueue(new Node<E>(e)); // 入队
                ++n;
            }
            count.set(n); // 设置元素计数
        } finally {
            putLock.unlock(); // 释放插入锁
        }
    }

3. 核心入队方法(详细注释)

    /**
     * 在队尾插入元素(非阻塞)
     * 如果队列已满,直接返回false
     * @param e 要插入的元素
     * @return 如果成功插入返回true,否则返回false
     * @throws NullPointerException 如果e为null
     * 
     * 操作流程:
     * 1. 检查元素不为null
     * 2. 获取插入锁
     * 3. 检查队列是否已满
     * 4. 如果未满,插入元素并更新状态
     * 5. 如果插入前队列为空,通知等待取元素的线程
     * 6. 如果队列未满,通知等待插入元素的线程
     * 7. 释放插入锁
     * 
     * 锁分离优势:
     * - 生产者和消费者使用不同的锁
     * - 提高并发性能
     * - 减少锁竞争
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException(); // 检查null
        final AtomicInteger count = this.count;
        if (count.get() == capacity) // 如果队列已满
            return false; // 直接返回false
        int c = -1;
        Node<E> node = new Node<E>(e); // 创建新节点
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // 获取插入锁
        try {
            if (count.get() < capacity) { // 再次检查队列是否已满
                enqueue(node); // 插入节点
                c = count.getAndIncrement(); // 增加计数并获取旧值
            }
        } finally {
            putLock.unlock(); // 释放插入锁
        }
        if (c == 0) // 如果插入前队列为空
            signalNotEmpty(); // 通知等待取元素的线程
        if (c + 1 < capacity) // 如果队列未满
            signalNotFull(); // 通知等待插入元素的线程
        return c >= 0; // 返回是否成功插入
    }

    /**
     * 在队尾插入元素(可超时阻塞)
     * 如果队列已满,等待指定时间直到有空间或超时
     * @param e 要插入的元素
     * @param timeout 等待时间
     * @param unit 时间单位
     * @return 如果成功插入返回true,否则返回false
     * @throws InterruptedException 如果线程被中断
     * @throws NullPointerException 如果e为null
     * 
     * 超时机制:
     * 1. 如果队列未满,直接插入
     * 2. 如果队列已满,等待指定时间
     * 3. 在等待期间可被中断
     * 4. 超时后如果仍未插入成功,返回false
     * 5. 使用nanos精确控制超时时间
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException(); // 检查null
        long nanos = unit.toNanos(timeout); // 转换为纳秒
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly(); // 可中断地获取插入锁
        try {
            while (count.get() == capacity) { // 如果队列已满
                if (nanos <= 0) { // 如果超时时间已到
                    return false; // 返回false
                }
                nanos = notFull.awaitNanos(nanos); // 在notFull条件上等待
            }
            enqueue(new Node<E>(e)); // 插入元素
            c = count.getAndIncrement(); // 增加计数并获取旧值
        } finally {
            putLock.unlock(); // 释放插入锁
        }
        if (c == 0) // 如果插入前队列为空
            signalNotEmpty(); // 通知等待取元素的线程
        if (c + 1 < capacity) // 如果队列未满
            signalNotFull(); // 通知等待插入元素的线程
        return true; // 返回true
    }

    /**
     * 在队尾插入元素(阻塞)
     * 如果队列已满,一直等待直到有空间
     * @param e 要插入的元素
     * @throws InterruptedException 如果线程被中断
     * @throws NullPointerException 如果e为null
     * 
     * 阻塞特性:
     * 1. 如果队列未满,直接插入
     * 2. 如果队列已满,无限期等待
     * 3. 在等待期间可被中断
     * 4. 有空间时自动唤醒并插入
     * 5. 保证不会丢失元素
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException(); // 检查null
        int c = -1;
        Node<E> node = new Node<E>(e); // 创建新节点
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly(); // 可中断地获取插入锁
        try {
            while (count.get() == capacity) { // 如果队列已满
                notFull.await(); // 在notFull条件上等待
            }
            enqueue(node); // 插入节点
            c = count.getAndIncrement(); // 增加计数并获取旧值
        } finally {
            putLock.unlock(); // 释放插入锁
        }
        if (c == 0) // 如果插入前队列为空
            signalNotEmpty(); // 通知等待取元素的线程
        if (c + 1 < capacity) // 如果队列未满
            signalNotFull(); // 通知等待插入元素的线程
    }

    /**
     * 实际的入队操作
     * 在已获得插入锁的情况下执行插入操作
     * @param node 要插入的节点
     * 
     * 链表插入操作:
     * 1. 将新节点链接到队尾
     * 2. 更新last指针
     * 3. 保持链表结构的完整性
     * 4. 时间复杂度O(1)
     */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node; // 链接节点并更新last指针
    }

4. 核心出队方法(详细注释)

    /**
     * 从队首取出元素(非阻塞)
     * 如果队列为空,直接返回null
     * @return 队首元素,如果队列为空返回null
     * 
     * 操作流程:
     * 1. 获取取出锁
     * 2. 检查队列是否为空
     * 3. 如果非空,取出元素并更新状态
     * 4. 如果取出前队列已满,通知等待插入元素的线程
     * 5. 如果队列非空,通知等待取元素的线程
     * 6. 释放取出锁
     * 
     * 锁分离优势:
     * - 生产者和消费者使用不同的锁
     * - 提高并发性能
     * - 减少锁竞争
     */
    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0) // 如果队列为空
            return null; // 直接返回null
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock(); // 获取取出锁
        try {
            if (count.get() > 0) { // 再次检查队列是否非空
                x = dequeue(); // 取出元素
                c = count.getAndDecrement(); // 减少计数并获取旧值
            }
        } finally {
            takeLock.unlock(); // 释放取出锁
        }
        if (c > 1) // 如果取出后队列非空
            signalNotEmpty(); // 通知等待取元素的线程
        if (c == capacity) // 如果取出前队列已满
            signalNotFull(); // 通知等待插入元素的线程
        return x; // 返回取出的元素
    }

    /**
     * 从队首取出元素(可超时阻塞)
     * 如果队列为空,等待指定时间直到有元素或超时
     * @param timeout 等待时间
     * @param unit 时间单位
     * @return 队首元素,如果超时返回null
     * @throws InterruptedException 如果线程被中断
     * 
     * 超时机制:
     * 1. 如果队列非空,直接取出元素
     * 2. 如果队列为空,等待指定时间
     * 3. 在等待期间可被中断
     * 4. 超时后如果仍为空,返回null
     * 5. 使用nanos精确控制超时时间
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout); // 转换为纳秒
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly(); // 可中断地获取取出锁
        try {
            while (count.get() == 0) { // 如果队列为空
                if (nanos <= 0) { // 如果超时时间已到
                    return null; // 返回null
                }
                nanos = notEmpty.awaitNanos(nanos); // 在notEmpty条件上等待
            }
            x = dequeue(); // 取出元素
            c = count.getAndDecrement(); // 减少计数并获取旧值
        } finally {
            takeLock.unlock(); // 释放取出锁
        }
        if (c > 1) // 如果取出后队列非空
            signalNotEmpty(); // 通知等待取元素的线程
        if (c == capacity) // 如果取出前队列已满
            signalNotFull(); // 通知等待插入元素的线程
        return x; // 返回取出的元素
    }

    /**
     * 从队首取出元素(阻塞)
     * 如果队列为空,一直等待直到有元素
     * @return 队首元素
     * @throws InterruptedException 如果线程被中断
     * 
     * 阻塞特性:
     * 1. 如果队列非空,直接取出元素
     * 2. 如果队列为空,无限期等待
     * 3. 在等待期间可被中断
     * 4. 有元素时自动唤醒并取出
     * 5. 保证不会返回null
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly(); // 可中断地获取取出锁
        try {
            while (count.get() == 0) { // 如果队列为空
                notEmpty.await(); // 在notEmpty条件上等待
            }
            x = dequeue(); // 取出元素
            c = count.getAndDecrement(); // 减少计数并获取旧值
        } finally {
            takeLock.unlock(); // 释放取出锁
        }
        if (c > 1) // 如果取出后队列非空
            signalNotEmpty(); // 通知等待取元素的线程
        if (c == capacity) // 如果取出前队列已满
            signalNotFull(); // 通知等待插入元素的线程
        return x; // 返回取出的元素
    }

    /**
     * 实际的出队操作
     * 在已获得取出锁的情况下执行取出操作
     * @return 取出的元素
     * 
     * 链表取出操作:
     * 1. 从head节点的下一个节点取出元素
     * 2. 更新head指针
     * 3. 清空旧节点的引用(帮助GC)
     * 4. 保持链表结构的完整性
     * 5. 时间复杂度O(1)
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head; // 获取当前head节点
        Node<E> first = h.next; // 获取第一个有效节点
        h.next = h; // help GC,断开旧head节点的链接
        head = first; // 更新head指针
        E x = first.item; // 获取元素
        first.item = null; // 清空引用,帮助GC
        return x; // 返回元素
    }

    /**
     * 获取但不移除队首元素
     * @return 队首元素,如果队列为空返回null
     * 
     * 查看操作特点:
     * 1. 不改变队列状态
     * 2. 非阻塞操作
     * 3. 线程安全
     * 4. 可能返回null(队列为空时)
     */
    public E peek() {
        if (count.get() == 0) // 如果队列为空
            return null; // 返回null
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock(); // 获取取出锁
        try {
            Node<E> first = head.next; // 获取第一个有效节点
            if (first == null) // 如果节点为空
                return null; // 返回null
            else
                return first.item; // 返回元素
        } finally {
            takeLock.unlock(); // 释放取出锁
        }
    }

5. 批量操作方法(详细注释)

    /**
     * 批量取出元素到指定集合中
     * 从队列中取出最多maxElements个元素添加到指定集合
     * @param c 用于存储取出元素的集合
     * @param maxElements 最大取出元素数量
     * @return 实际取出的元素数量
     * @throws UnsupportedOperationException 如果c不支持添加操作
     * @throws ClassCastException 如果元素类型不兼容
     * @throws NullPointerException 如果c为null
     * @throws IllegalArgumentException 如果c是当前队列
     * 
     * 批量取出特点:
     * 1. 原子性:一次性取出多个元素
     * 2. 线程安全:使用锁保护
     * 3. 高效性:减少锁获取次数
     * 4. 可控制数量:指定最大取出数量
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException(); // 检查集合不为null
        if (c == this)
            throw new IllegalArgumentException(); // 不能添加到自身
        if (maxElements <= 0)
            return 0; // 如果最大数量小于等于0,返回0
        boolean signalNotFull = false;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock(); // 获取取出锁
        try {
            int n = Math.min(maxElements, count.get()); // 实际取出数量
            // count.get provides visibility to first n Nodes
            Node<E> h = head;
            int i = 0;
            try {
                while (i < n) {
                    Node<E> p = h.next;
                    c.add(p.item); // 添加到指定集合
                    p.item = null; // 清空引用
                    h.next = h; // help GC
                    h = p;
                    ++i;
                }
                return n; // 返回实际取出数量
            } finally {
                // Restore invariants even if c.add() threw
                if (i > 0) {
                    // assert h.item == null;
                    head = h;
                    signalNotFull = (count.getAndAdd(-i) == capacity);
                }
            }
        } finally {
            takeLock.unlock(); // 释放取出锁
            if (signalNotFull)
                signalNotFull(); // 通知等待插入的线程
        }
    }

    /**
     * 批量取出所有元素到指定集合中
     * @param c 用于存储取出元素的集合
     * @return 实际取出的元素数量
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE); // 取出所有元素
    }

6. 查询和状态方法(详细注释)

    /**
     * 返回队列中元素的数量
     * @return 队列中元素的数量
     * 
     * size()操作特点:
     * 1. 线程安全:使用AtomicInteger保证原子性
     * 2. 快速操作:直接返回计数值
     * 3. 瞬时值:反映调用时刻的状态
     * 4. 无锁操作:避免锁竞争
     */
    public int size() {
        return count.get(); // 返回原子计数值
    }

    /**
     * 返回队列的剩余容量
     * @return 队列还能容纳的元素数量
     * 
     * remainingCapacity()特点:
     * 1. 线程安全:基于原子操作
     * 2. 动态计算:总容量减去当前元素数量
     * 3. 瞬时值:反映调用时刻的状态
     * 4. 无界队列:返回Integer.MAX_VALUE
     */
    public int remainingCapacity() {
        return capacity - count.get(); // 返回剩余容量
    }

    /**
     * 判断队列是否为空
     * @return 如果队列为空返回true,否则返回false
     * 
     * isEmpty()特点:
     * 1. 线程安全:基于原子操作
     * 2. 快速判断:检查计数是否为0
     * 3. 瞬时判断:反映调用时刻的状态
     * 4. 无锁操作:避免锁竞争
     */
    public boolean isEmpty() {
        return count.get() == 0; // 判断计数是否为0
    }

    /**
     * 判断队列是否包含指定元素
     * @param o 要查找的元素
     * @return 如果包含返回true,否则返回false
     * 
     * contains()特点:
     * 1. 线程安全:获取取出锁保护
     * 2. 全遍历:需要遍历整个队列
     * 3. 时间复杂度:O(n)
     * 4. 可能阻塞:如果其他线程正在操作队列
     */
    public boolean contains(Object o) {
        if (o == null) return false; // null元素不包含
        fullyLock(); // 获取两个锁
        try {
            for (Node<E> p = head.next; p != null; p = p.next)
                if (o.equals(p.item)) // 比较元素
                    return true;
            return false; // 没有找到
        } finally {
            fullyUnlock(); // 释放两个锁
        }
    }

    /**
     * 移除队列中的指定元素
     * @param o 要移除的元素
     * @return 如果成功移除返回true,否则返回false
     * 
     * remove()特点:
     * 1. 线程安全:获取两个锁保护
     * 2. 全遍历:需要遍历整个队列
     * 3. 时间复杂度:O(n)
     * 4. 只移除第一个匹配的元素
     * 5. 可能阻塞:如果其他线程正在操作队列
     */
    public boolean remove(Object o) {
        if (o == null) return false; // null元素不能移除
        fullyLock(); // 获取两个锁
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) { // 找到匹配元素
                    unlink(p, trail); // 断开链接
                    return true; // 返回true
                }
            }
            return false; // 没有找到
        } finally {
            fullyUnlock(); // 释放两个锁
        }
    }

    /**
     * 断开指定节点的链接
     * @param p 要断开的节点
     * @param trail 前驱节点
     */
    void unlink(Node<E> p, Node<E> trail) {
        // assert isFullyLocked();
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee.
        p.item = null;
        trail.next = p.next;
        if (last == p)
            last = trail;
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }

7. 锁操作方法(详细注释)

    /**
     * 获取两个锁(全锁)
     * 用于需要同时保护队列两端的操作
     * @throws InterruptedException 如果线程被中断
     * 
     * 锁获取顺序:
     * 1. 先获取取出锁
     * 2. 再获取插入锁
     * 3. 避免死锁:统一的锁获取顺序
     */
    void fullyLock() throws InterruptedException {
        putLock.lock(); // 获取插入锁
        takeLock.lock(); // 获取取出锁
    }

    /**
     * 释放两个锁(全解锁)
     * 与fullyLock()对应
     * 
     * 锁释放顺序:
     * 1. 先释放取出锁
     * 2. 再释放插入锁
     * 3. 与获取顺序相反
     */
    void fullyUnlock() {
        takeLock.unlock(); // 释放取出锁
        putLock.unlock(); // 释放插入锁
    }

    /**
     * 通知等待取元素的线程
     * 在插入元素后调用
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock(); // 获取取出锁
        try {
            notEmpty.signal(); // 通知等待的消费者
        } finally {
            takeLock.unlock(); // 释放取出锁
        }
    }

    /**
     * 通知等待插入元素的线程
     * 在取出元素后调用
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // 获取插入锁
        try {
            notFull.signal(); // 通知等待的生产者
        } finally {
            putLock.unlock(); // 释放插入锁
        }
    }

8. 迭代器实现(详细注释)

    /**
     * 返回队列的迭代器
     * @return 队列的迭代器
     * 
     * 迭代器特点:
     * 1. 弱一致性:反映创建时的状态
     * 2. 不抛出ConcurrentModificationException
     * 3. 线程安全:使用锁保护
     * 4. 支持remove()操作
     * 5. 按FIFO顺序遍历
     */
    public Iterator<E> iterator() {
        return new Itr(); // 返回自定义迭代器
    }

    /**
     * LinkedBlockingQueue的迭代器实现
     */
    private class Itr implements Iterator<E> {
        /**
         * 当前节点
         */
        private Node<E> current;

        /**
         * 上次返回的节点
         */
        private Node<E> lastRet;

        /**
         * 当前节点的元素
         */
        private E currentElement;

        /**
         * 构造方法
         */
        Itr() {
            fullyLock(); // 获取两个锁
            try {
                current = head.next; // 从第一个有效节点开始
                if (current != null)
                    currentElement = current.item; // 获取元素
            } finally {
                fullyUnlock(); // 释放两个锁
            }
        }

        /**
         * 判断是否还有下一个元素
         * @return 如果还有下一个元素返回true,否则返回false
         */
        public boolean hasNext() {
            return current != null; // 判断当前节点是否为空
        }

        /**
         * 获取下一个元素
         * @return 下一个元素
         * @throws NoSuchElementException 如果没有下一个元素
         */
        public E next() {
            fullyLock(); // 获取两个锁
            try {
                if (current == null)
                    throw new NoSuchElementException(); // 没有下一个元素
                E x = currentElement; // 获取当前元素
                lastRet = current; // 记录上次返回的节点
                current = current.next; // 移动到下一个节点
                currentElement = (current == null) ? null : current.item; // 获取下一个元素
                return x; // 返回当前元素
            } finally {
                fullyUnlock(); // 释放两个锁
            }
        }

        /**
         * 移除上次返回的元素
         * @throws IllegalStateException 如果没有可移除的元素
         */
        public void remove() {
            if (lastRet == null)
                throw new IllegalStateException(); // 没有可移除的元素
            fullyLock(); // 获取两个锁
            try {
                Node<E> node = lastRet;
                lastRet = null;
                for (Node<E> trail = head, p = trail.next;
                     p != null;
                     trail = p, p = p.next) {
                    if (p == node) { // 找到要移除的节点
                        unlink(p, trail); // 断开链接
                        break;
                    }
                }
            } finally {
                fullyUnlock(); // 释放两个锁
            }
        }
    }

9. 转换和克隆方法(详细注释)

    /**
     * 转换为数组
     * @return 包含队列所有元素的数组
     * 
     * toArray()特点:
     * 1. 线程安全:获取两个锁保护
     * 2. 保持顺序:按照队列的FIFO顺序
     * 3. 瞬时快照:反映调用时刻的状态
     * 4. 完整复制:包含所有元素
     */
    public Object[] toArray() {
        fullyLock(); // 获取两个锁
        try {
            int size = count.get(); // 获取元素数量
            Object[] a = new Object[size]; // 创建数组
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item; // 复制元素
            return a; // 返回数组
        } finally {
            fullyUnlock(); // 释放两个锁
        }
    }

    /**
     * 转换为指定类型的数组
     * @param a 指定类型的数组
     * @return 转换后的数组
     * @throws ArrayStoreException 如果数组类型不兼容
     * @throws NullPointerException 如果a为null
     */
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        fullyLock(); // 获取两个锁
        try {
            int size = count.get(); // 获取元素数量
            if (a.length < size)
                a = (T[])java.lang.reflect.Array.newInstance(
                    a.getClass().getComponentType(), size); // 创建新数组
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item; // 复制元素
            if (a.length > k)
                a[k] = null; // 清空多余位置
            return a; // 返回数组
        } finally {
            fullyUnlock(); // 释放两个锁
        }
    }

    /**
     * 克隆队列
     * @return 克隆后的队列
     * 
     * clone()特点:
     * 1. 浅克隆:只克隆队列结构,不克隆元素
     * 2. 独立状态:新队列有自己的锁和条件
     * 3. 相同元素:包含相同的元素引用
     * 4. 无界队列:克隆后的队列也是无界的
     */
    public Object clone() {
        try {
            @SuppressWarnings("unchecked")
            LinkedBlockingQueue<E> other = (LinkedBlockingQueue<E>) super.clone();
            other.count = new AtomicInteger(); // 创建新的计数器
            other.head = other.last = new Node<E>(null); // 创建新的哨兵节点
            other.putLock = new ReentrantLock(); // 创建新的插入锁
            other.notFull = other.putLock.newCondition(); // 创建新的notFull条件
            other.takeLock = new ReentrantLock(); // 创建新的取出锁
            other.notEmpty = other.takeLock.newCondition(); // 创建新的notEmpty条件
            // 遍历原队列,将元素添加到新队列
            fullyLock(); // 获取两个锁
            try {
                for (Node<E> p = head.next; p != null; p = p.next) {
                    other.put(p.item); // 添加元素
                }
            } finally {
                fullyUnlock(); // 释放两个锁
            }
            return other;
        } catch (CloneNotSupportedException e) {
            throw new InternalError();
        }
    }

10. LinkedBlockingQueue 的特点分析

核心设计理念:

/**
 * LinkedBlockingQueue的核心设计思想:
 * 
 * 1. 链表实现的阻塞队列:
 *    - 使用单向链表存储元素
 *    - 支持有界和无界两种模式
 *    - 动态内存分配,按需扩展
 * 
 * 2. 双锁分离机制:
 *    - 使用两个独立的ReentrantLock
 *    - putLock保护插入操作
 *    - takeLock保护取出操作
 *    - 生产者和消费者可以并行操作
 * 
 * 3. 原子计数器:
 *    - 使用AtomicInteger跟踪元素数量
 *    - 避免锁竞争,提高性能
 *    - 保证计数的原子性和可见性
 * 
 * 4. 条件队列机制:
 *    - notEmpty条件用于等待元素
 *    - notFull条件用于等待空间
 *    - 支持阻塞和超时操作
 * 
 * 5. 哨兵节点设计:
 *    - head节点是哨兵节点,不存储实际元素
 *    - 简化链表操作的边界处理
 *    - 避免空指针异常
 * 
 * 6. 内存管理:
 *    - 及时清空节点引用,帮助GC
 *    - 链式结构便于内存回收
 *    - 避免内存泄漏
 * 
 * 适用场景:
 * - 高并发的生产者-消费者模式
 * - 线程池任务队列
 * - 需要高性能队列操作的场景
 * - 可以接受无界队列的场景
 */

性能特征分析:

/**
 * LinkedBlockingQueue的性能特征:
 * 
 * 时间复杂度:
 * - offer(e): O(1) - 非阻塞插入
 * - put(e): O(1) 平均,O(∞) 最坏(阻塞等待)
 * - poll(): O(1) - 非阻塞取出
 * - take(): O(1) 平均,O(∞) 最坏(阻塞等待)
 * - size(): O(1) - 原子计数器
 * - contains(o): O(n) - 需要遍历队列
 * - remove(o): O(n) - 需要遍历并断开链接
 * 
 * 空间复杂度:
 * - O(n) - 链表存储,按需分配
 * - 每个节点额外的next指针开销
 * - 哨兵节点的固定开销
 * 
 * 并发特性:
 * - 双锁机制:生产者消费者独立锁
 * - 阻塞操作:支持生产者和消费者的协调
 * - 原子计数:避免锁竞争
 * - 内存一致性:通过锁保证可见性
 * 
 * 锁竞争:
 * - 生产者之间可能有锁竞争
 * - 消费者之间可能有锁竞争
 * - 生产者和消费者可以并行操作
 * - 相比单锁机制性能更好
 * 
 * 内存使用:
 * - 动态分配,按需扩展
 * - 无界队列可能消耗大量内存
 * - 及时GC,避免内存泄漏
 * 
 * 吞吐量:
 * - 高并发下吞吐量优于ArrayBlockingQueue
 * - 双锁分离减少锁竞争
 * - 适合高并发场景
 */

11. 使用示例和最佳实践

/**
 * 使用示例:
 * 
 * // 基本使用(无界队列)
 * LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
 * 
 * // 指定容量的有界队列
 * LinkedBlockingQueue<String> boundedQueue = new LinkedBlockingQueue<>(100);
 * 
 * // 生产者线程
 * Thread producer = new Thread(() -> {
 *     try {
 *         for (int i = 0; i < 1000; i++) {
 *             String item = "item-" + i;
 *             queue.put(item); // 阻塞插入
 *             System.out.println("Produced: " + item);
 *         }
 *     } catch (InterruptedException e) {
 *         Thread.currentThread().interrupt();
 *     }
 * });
 * 
 * // 消费者线程
 * Thread consumer = new Thread(() -> {
 *     try {
 *         while (true) {
 *             String item = queue.take(); // 阻塞取出
 *             System.out.println("Consumed: " + item);
 *             Thread.sleep(100); // 模拟处理时间
 *         }
 *     } catch (InterruptedException e) {
 *         Thread.currentThread().interrupt();
 *     }
 * });
 * 
 * producer.start();
 * consumer.start();
 * 
 * // 线程池使用示例
 * ExecutorService executor = Executors.newFixedThreadPool(10);
 * LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
 * 
 * // 提交大量任务
 * for (int i = 0; i < 10000; i++) {
 *     final int taskId = i;
 *     executor.submit(() -> {
 *         System.out.println("Task " + taskId + " executed by " + 
 *                           Thread.currentThread().getName());
 *     });
 * }
 * 
 * 最佳实践:
 * 
 * 1. 合理设置队列容量:
 *    // 根据系统资源和业务需求设置合适容量
 *    int capacity = 1000;
 *    LinkedBlockingQueue<Task> queue = new LinkedBlockingQueue<>(capacity);
 * 
 * 2. 使用非阻塞操作避免死锁:
 *    // 在某些场景下使用非阻塞操作
 *    if (queue.offer(task)) {
 *        System.out.println("Task added successfully");
 *    } else {
 *        System.out.println("Queue is full, task rejected");
 *        // 实现拒绝策略
 *    }
 * 
 * 3. 设置合理的超时时间:
 *    // 避免无限期等待
 *    try {
 *        Task task = queue.poll(5, TimeUnit.SECONDS);
 *        if (task != null) {
 *            processTask(task);
 *        } else {
 *            System.out.println("No task available within timeout");
 *        }
 *    } catch (InterruptedException e) {
 *        Thread.currentThread().interrupt();
 *    }
 * 
 * 4. 处理中断异常:
 *    // 正确处理InterruptedException
 *    try {
 *        Task task = queue.take();
 *        processTask(task);
 *    } catch (InterruptedException e) {
 *        Thread.currentThread().interrupt(); // 恢复中断状态
 *        System.out.println("Consumer interrupted");
 *    }
 * 
 * 5. 监控队列状态:
 *    // 定期监控队列使用情况
 *    int size = queue.size();
 *    int remaining = queue.remainingCapacity();
 *    double usage = (double) size / (size + remaining);
 *    System.out.println("Queue usage: " + String.format("%.2f%%", usage * 100));
 * 
 * 6. 批量操作提高效率:
 *    // 使用批量操作减少锁竞争
 *    List<Task> tasks = new ArrayList<>();
 *    int count = queue.drainTo(tasks, 50); // 批量取出最多50个任务
 *    for (Task task : tasks) {
 *        processTask(task);
 *    }
 * 
 * 7. 避免队列过载:
 *    // 实现背压机制
 *    if (queue.size() > 800) {
 *        System.out.println("Queue is nearly full, slowing down production");
 *        Thread.sleep(1000); // 暂停生产
 *    }
 * 
 * 8. 正确使用迭代器:
 *    // 迭代器是弱一致性的
 *    Iterator<String> it = queue.iterator();
 *    while (it.hasNext()) {
 *        String item = it.next();
 *        // 处理元素
 *        // 注意:迭代过程中队列可能发生变化
 *    }
 * 
 * 9. 内存管理:
 *    // 对于大对象,注意内存使用
 *    if (queue.size() > 10000) {
 *        System.out.println("Warning: Large queue size, potential memory issue");
 *    }
 * 
 * 10. 异常处理:
 *    // 处理可能的异常情况
 *    try {
 *        queue.put(largeObject);
 *    } catch (OutOfMemoryError e) {
 *        System.out.println("Memory exhausted, consider reducing queue size");
 *        // 实现降级策略
 *    }
 */

12. 与其他阻塞队列的比较

/**
 * LinkedBlockingQueue vs ArrayBlockingQueue vs SynchronousQueue:
 * 
 * LinkedBlockingQueue:
 * - 链表实现,动态内存分配
 * - 双锁机制,生产者消费者独立锁
 * - 支持有界和无界模式
 * - 高并发性能好
 * - 内存使用动态增长
 * 
 * ArrayBlockingQueue:
 * - 数组实现,固定内存分配
 * - 单锁机制,生产者消费者共享锁
 * - 必须指定容量(有界队列)
 * - 内存使用固定
 * - 适合需要有界缓冲的场景
 * 
 * SynchronousQueue:
 * - 容量为0,直接传递
 * - 不存储元素,生产者消费者直接配对
 * - 最低延迟,最高吞吐量
 * - 适合直接传递的场景
 * 
 * PriorityBlockingQueue:
 * - 无界优先级队列
 * - 按优先级排序
 * - 适合需要优先级处理的场景
 * 
 * DelayQueue:
 * - 无界延迟队列
 * - 只有延迟到期的元素才能取出
 * - 适合定时任务调度
 * 
 * 选择建议:
 * - 高并发场景:LinkedBlockingQueue
 * - 需要有界缓冲:ArrayBlockingQueue
 * - 直接传递:SynchronousQueue
 * - 优先级处理:PriorityBlockingQueue
 * - 延迟处理:DelayQueue
 * 
 * 性能对比:
 * - 吞吐量:LinkedBlockingQueue > ArrayBlockingQueue > SynchronousQueue
 * - 内存使用:ArrayBlockingQueue < LinkedBlockingQueue < SynchronousQueue
 * - 延迟:SynchronousQueue < LinkedBlockingQueue < ArrayBlockingQueue
 * - 并发性:LinkedBlockingQueue > ArrayBlockingQueue > SynchronousQueue
 */

13. 总结

LinkedBlockingQueue 的核心特性:

  1. 链表实现

    • 使用单向链表存储元素
    • 动态内存分配,按需扩展
    • 支持有界和无界两种模式
  2. 双锁分离

    • putLock保护插入操作
    • takeLock保护取出操作
    • 生产者和消费者可以并行操作
    • 减少锁竞争,提高并发性能
  3. 原子计数

    • 使用AtomicInteger跟踪元素数量
    • 避免锁竞争,提高性能
    • 保证计数的原子性和可见性
  4. 多种阻塞策略

    • put()/take():无限期阻塞
    • offer()/poll():超时阻塞
    • offer()/poll():非阻塞操作
  5. 线程安全

    • 所有操作都通过锁保护
    • 保证原子性和可见性
    • 支持多个生产者和消费者

适用场景:

  • 高并发的生产者-消费者模式
  • 线程池任务队列
  • 需要高性能队列操作的场景
  • 可以接受无界队列的场景
  • 需要批量操作的场景

注意事项:

  • 无界队列可能导致内存溢出
  • 链表结构有额外的内存开销
  • 不允许存储null元素
  • 迭代器是弱一致性的
  • 大容量时注意内存使用

性能优化建议:

  1. 根据业务需求合理设置队列容量
  2. 正确处理中断异常
  3. 使用批量操作减少锁竞争
  4. 监控队列使用情况
  5. 实现合适的拒绝策略
  6. 避免队列过载和内存泄漏
  7. 定期清理不需要的元素
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值