JDK 8 LinkedBlockingQueue 源码详解(详细注释版)
1. 类定义和基本属性
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// 序列化版本号
private static final long serialVersionUID = -6903933977591709194L;
/**
* 链表节点内部类
* 每个节点包含元素值和指向下一个节点的引用
* 构成单向链表结构
*/
static class Node<E> {
E item; // 节点存储的元素
Node<E> next; // 指向下一个节点的引用
// 构造方法
Node(E x) { item = x; }
}
/**
* 队列的最大容量
* 如果未指定容量,默认为Integer.MAX_VALUE(无界队列)
* 用于控制队列大小,防止内存溢出
*/
private final int capacity;
/**
* 当前队列中的元素数量
* 使用原子类保证多线程环境下的可见性和原子性
* 避免了使用锁来更新计数
*/
private final AtomicInteger count = new AtomicInteger();
/**
* 队首节点的引用(链表头部)
* take操作从这里开始
* 使用transient关键字避免序列化
*/
transient Node<E> head;
/**
* 队尾节点的引用(链表尾部)
* put操作在这里结束
* 使用transient关键字避免序列化
*/
private transient Node<E> last;
/**
* 取出元素时使用的锁
* 专门用于保护take、poll等取出操作
* 实现生产者消费者锁分离,提高并发性能
*/
private final ReentrantLock takeLock = new ReentrantLock();
/**
* 等待取出元素的条件队列
* 当队列为空时,消费者线程在此等待
* 有新元素入队时会被唤醒
*/
private final Condition notEmpty = takeLock.newCondition();
/**
* 插入元素时使用的锁
* 专门用于保护put、offer等插入操作
* 实现生产者消费者锁分离,提高并发性能
*/
private final ReentrantLock putLock = new ReentrantLock();
/**
* 等待插入元素的条件队列
* 当队列已满时,生产者线程在此等待
* 有元素出队时会被唤醒
*/
private final Condition notFull = putLock.newCondition();
/**
* 用于信号控制的内部类
* 在序列化时用于控制锁的状态
*/
private class LBQSpliterator implements Spliterator<E> {
// 省略实现细节
}
2. 构造方法(详细注释)
/**
* 默认构造方法(无界队列)
* 创建一个容量为Integer.MAX_VALUE的LinkedBlockingQueue
* 理论上可以存储任意数量的元素(受限于内存)
*
* 构造过程:
* 1. 设置容量为最大整数值
* 2. 初始化空的链表结构(哨兵节点)
* 3. head和last都指向同一个哨兵节点
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE); // 调用指定容量的构造方法
}
/**
* 指定容量的构造方法
* 创建一个指定容量的LinkedBlockingQueue
* @param capacity 队列容量,必须大于0
* @throws IllegalArgumentException 如果capacity小于等于0
*
* 容量选择说明:
* - 有界队列:capacity < Integer.MAX_VALUE
* - 无界队列:capacity = Integer.MAX_VALUE
* - 建议根据实际需求和系统资源设置合适的容量
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException(); // 验证容量
this.capacity = capacity; // 设置容量
last = head = new Node<E>(null); // 创建哨兵节点,初始化链表
}
/**
* 指定初始集合的构造方法
* 创建一个包含指定集合元素的LinkedBlockingQueue
* 容量默认为Integer.MAX_VALUE
* @param c 包含初始元素的集合
* @throws NullPointerException 如果c为null
*
* 初始化过程:
* 1. 调用默认构造方法创建队列
* 2. 将集合元素逐个添加到队列中
* 3. 使用putLock保护批量插入操作
*/
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE); // 创建无界队列
final ReentrantLock putLock = this.putLock;
putLock.lock(); // 获取插入锁
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException(); // 不允许null元素
if (n == capacity)
throw new IllegalStateException("Queue full"); // 队列满
enqueue(new Node<E>(e)); // 入队
++n;
}
count.set(n); // 设置元素计数
} finally {
putLock.unlock(); // 释放插入锁
}
}
3. 核心入队方法(详细注释)
/**
* 在队尾插入元素(非阻塞)
* 如果队列已满,直接返回false
* @param e 要插入的元素
* @return 如果成功插入返回true,否则返回false
* @throws NullPointerException 如果e为null
*
* 操作流程:
* 1. 检查元素不为null
* 2. 获取插入锁
* 3. 检查队列是否已满
* 4. 如果未满,插入元素并更新状态
* 5. 如果插入前队列为空,通知等待取元素的线程
* 6. 如果队列未满,通知等待插入元素的线程
* 7. 释放插入锁
*
* 锁分离优势:
* - 生产者和消费者使用不同的锁
* - 提高并发性能
* - 减少锁竞争
*/
public boolean offer(E e) {
if (e == null) throw new NullPointerException(); // 检查null
final AtomicInteger count = this.count;
if (count.get() == capacity) // 如果队列已满
return false; // 直接返回false
int c = -1;
Node<E> node = new Node<E>(e); // 创建新节点
final ReentrantLock putLock = this.putLock;
putLock.lock(); // 获取插入锁
try {
if (count.get() < capacity) { // 再次检查队列是否已满
enqueue(node); // 插入节点
c = count.getAndIncrement(); // 增加计数并获取旧值
}
} finally {
putLock.unlock(); // 释放插入锁
}
if (c == 0) // 如果插入前队列为空
signalNotEmpty(); // 通知等待取元素的线程
if (c + 1 < capacity) // 如果队列未满
signalNotFull(); // 通知等待插入元素的线程
return c >= 0; // 返回是否成功插入
}
/**
* 在队尾插入元素(可超时阻塞)
* 如果队列已满,等待指定时间直到有空间或超时
* @param e 要插入的元素
* @param timeout 等待时间
* @param unit 时间单位
* @return 如果成功插入返回true,否则返回false
* @throws InterruptedException 如果线程被中断
* @throws NullPointerException 如果e为null
*
* 超时机制:
* 1. 如果队列未满,直接插入
* 2. 如果队列已满,等待指定时间
* 3. 在等待期间可被中断
* 4. 超时后如果仍未插入成功,返回false
* 5. 使用nanos精确控制超时时间
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException(); // 检查null
long nanos = unit.toNanos(timeout); // 转换为纳秒
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly(); // 可中断地获取插入锁
try {
while (count.get() == capacity) { // 如果队列已满
if (nanos <= 0) { // 如果超时时间已到
return false; // 返回false
}
nanos = notFull.awaitNanos(nanos); // 在notFull条件上等待
}
enqueue(new Node<E>(e)); // 插入元素
c = count.getAndIncrement(); // 增加计数并获取旧值
} finally {
putLock.unlock(); // 释放插入锁
}
if (c == 0) // 如果插入前队列为空
signalNotEmpty(); // 通知等待取元素的线程
if (c + 1 < capacity) // 如果队列未满
signalNotFull(); // 通知等待插入元素的线程
return true; // 返回true
}
/**
* 在队尾插入元素(阻塞)
* 如果队列已满,一直等待直到有空间
* @param e 要插入的元素
* @throws InterruptedException 如果线程被中断
* @throws NullPointerException 如果e为null
*
* 阻塞特性:
* 1. 如果队列未满,直接插入
* 2. 如果队列已满,无限期等待
* 3. 在等待期间可被中断
* 4. 有空间时自动唤醒并插入
* 5. 保证不会丢失元素
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException(); // 检查null
int c = -1;
Node<E> node = new Node<E>(e); // 创建新节点
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly(); // 可中断地获取插入锁
try {
while (count.get() == capacity) { // 如果队列已满
notFull.await(); // 在notFull条件上等待
}
enqueue(node); // 插入节点
c = count.getAndIncrement(); // 增加计数并获取旧值
} finally {
putLock.unlock(); // 释放插入锁
}
if (c == 0) // 如果插入前队列为空
signalNotEmpty(); // 通知等待取元素的线程
if (c + 1 < capacity) // 如果队列未满
signalNotFull(); // 通知等待插入元素的线程
}
/**
* 实际的入队操作
* 在已获得插入锁的情况下执行插入操作
* @param node 要插入的节点
*
* 链表插入操作:
* 1. 将新节点链接到队尾
* 2. 更新last指针
* 3. 保持链表结构的完整性
* 4. 时间复杂度O(1)
*/
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node; // 链接节点并更新last指针
}
4. 核心出队方法(详细注释)
/**
* 从队首取出元素(非阻塞)
* 如果队列为空,直接返回null
* @return 队首元素,如果队列为空返回null
*
* 操作流程:
* 1. 获取取出锁
* 2. 检查队列是否为空
* 3. 如果非空,取出元素并更新状态
* 4. 如果取出前队列已满,通知等待插入元素的线程
* 5. 如果队列非空,通知等待取元素的线程
* 6. 释放取出锁
*
* 锁分离优势:
* - 生产者和消费者使用不同的锁
* - 提高并发性能
* - 减少锁竞争
*/
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0) // 如果队列为空
return null; // 直接返回null
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock(); // 获取取出锁
try {
if (count.get() > 0) { // 再次检查队列是否非空
x = dequeue(); // 取出元素
c = count.getAndDecrement(); // 减少计数并获取旧值
}
} finally {
takeLock.unlock(); // 释放取出锁
}
if (c > 1) // 如果取出后队列非空
signalNotEmpty(); // 通知等待取元素的线程
if (c == capacity) // 如果取出前队列已满
signalNotFull(); // 通知等待插入元素的线程
return x; // 返回取出的元素
}
/**
* 从队首取出元素(可超时阻塞)
* 如果队列为空,等待指定时间直到有元素或超时
* @param timeout 等待时间
* @param unit 时间单位
* @return 队首元素,如果超时返回null
* @throws InterruptedException 如果线程被中断
*
* 超时机制:
* 1. 如果队列非空,直接取出元素
* 2. 如果队列为空,等待指定时间
* 3. 在等待期间可被中断
* 4. 超时后如果仍为空,返回null
* 5. 使用nanos精确控制超时时间
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout); // 转换为纳秒
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly(); // 可中断地获取取出锁
try {
while (count.get() == 0) { // 如果队列为空
if (nanos <= 0) { // 如果超时时间已到
return null; // 返回null
}
nanos = notEmpty.awaitNanos(nanos); // 在notEmpty条件上等待
}
x = dequeue(); // 取出元素
c = count.getAndDecrement(); // 减少计数并获取旧值
} finally {
takeLock.unlock(); // 释放取出锁
}
if (c > 1) // 如果取出后队列非空
signalNotEmpty(); // 通知等待取元素的线程
if (c == capacity) // 如果取出前队列已满
signalNotFull(); // 通知等待插入元素的线程
return x; // 返回取出的元素
}
/**
* 从队首取出元素(阻塞)
* 如果队列为空,一直等待直到有元素
* @return 队首元素
* @throws InterruptedException 如果线程被中断
*
* 阻塞特性:
* 1. 如果队列非空,直接取出元素
* 2. 如果队列为空,无限期等待
* 3. 在等待期间可被中断
* 4. 有元素时自动唤醒并取出
* 5. 保证不会返回null
*/
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly(); // 可中断地获取取出锁
try {
while (count.get() == 0) { // 如果队列为空
notEmpty.await(); // 在notEmpty条件上等待
}
x = dequeue(); // 取出元素
c = count.getAndDecrement(); // 减少计数并获取旧值
} finally {
takeLock.unlock(); // 释放取出锁
}
if (c > 1) // 如果取出后队列非空
signalNotEmpty(); // 通知等待取元素的线程
if (c == capacity) // 如果取出前队列已满
signalNotFull(); // 通知等待插入元素的线程
return x; // 返回取出的元素
}
/**
* 实际的出队操作
* 在已获得取出锁的情况下执行取出操作
* @return 取出的元素
*
* 链表取出操作:
* 1. 从head节点的下一个节点取出元素
* 2. 更新head指针
* 3. 清空旧节点的引用(帮助GC)
* 4. 保持链表结构的完整性
* 5. 时间复杂度O(1)
*/
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head; // 获取当前head节点
Node<E> first = h.next; // 获取第一个有效节点
h.next = h; // help GC,断开旧head节点的链接
head = first; // 更新head指针
E x = first.item; // 获取元素
first.item = null; // 清空引用,帮助GC
return x; // 返回元素
}
/**
* 获取但不移除队首元素
* @return 队首元素,如果队列为空返回null
*
* 查看操作特点:
* 1. 不改变队列状态
* 2. 非阻塞操作
* 3. 线程安全
* 4. 可能返回null(队列为空时)
*/
public E peek() {
if (count.get() == 0) // 如果队列为空
return null; // 返回null
final ReentrantLock takeLock = this.takeLock;
takeLock.lock(); // 获取取出锁
try {
Node<E> first = head.next; // 获取第一个有效节点
if (first == null) // 如果节点为空
return null; // 返回null
else
return first.item; // 返回元素
} finally {
takeLock.unlock(); // 释放取出锁
}
}
5. 批量操作方法(详细注释)
/**
* 批量取出元素到指定集合中
* 从队列中取出最多maxElements个元素添加到指定集合
* @param c 用于存储取出元素的集合
* @param maxElements 最大取出元素数量
* @return 实际取出的元素数量
* @throws UnsupportedOperationException 如果c不支持添加操作
* @throws ClassCastException 如果元素类型不兼容
* @throws NullPointerException 如果c为null
* @throws IllegalArgumentException 如果c是当前队列
*
* 批量取出特点:
* 1. 原子性:一次性取出多个元素
* 2. 线程安全:使用锁保护
* 3. 高效性:减少锁获取次数
* 4. 可控制数量:指定最大取出数量
*/
public int drainTo(Collection<? super E> c, int maxElements) {
if (c == null)
throw new NullPointerException(); // 检查集合不为null
if (c == this)
throw new IllegalArgumentException(); // 不能添加到自身
if (maxElements <= 0)
return 0; // 如果最大数量小于等于0,返回0
boolean signalNotFull = false;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock(); // 获取取出锁
try {
int n = Math.min(maxElements, count.get()); // 实际取出数量
// count.get provides visibility to first n Nodes
Node<E> h = head;
int i = 0;
try {
while (i < n) {
Node<E> p = h.next;
c.add(p.item); // 添加到指定集合
p.item = null; // 清空引用
h.next = h; // help GC
h = p;
++i;
}
return n; // 返回实际取出数量
} finally {
// Restore invariants even if c.add() threw
if (i > 0) {
// assert h.item == null;
head = h;
signalNotFull = (count.getAndAdd(-i) == capacity);
}
}
} finally {
takeLock.unlock(); // 释放取出锁
if (signalNotFull)
signalNotFull(); // 通知等待插入的线程
}
}
/**
* 批量取出所有元素到指定集合中
* @param c 用于存储取出元素的集合
* @return 实际取出的元素数量
*/
public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE); // 取出所有元素
}
6. 查询和状态方法(详细注释)
/**
* 返回队列中元素的数量
* @return 队列中元素的数量
*
* size()操作特点:
* 1. 线程安全:使用AtomicInteger保证原子性
* 2. 快速操作:直接返回计数值
* 3. 瞬时值:反映调用时刻的状态
* 4. 无锁操作:避免锁竞争
*/
public int size() {
return count.get(); // 返回原子计数值
}
/**
* 返回队列的剩余容量
* @return 队列还能容纳的元素数量
*
* remainingCapacity()特点:
* 1. 线程安全:基于原子操作
* 2. 动态计算:总容量减去当前元素数量
* 3. 瞬时值:反映调用时刻的状态
* 4. 无界队列:返回Integer.MAX_VALUE
*/
public int remainingCapacity() {
return capacity - count.get(); // 返回剩余容量
}
/**
* 判断队列是否为空
* @return 如果队列为空返回true,否则返回false
*
* isEmpty()特点:
* 1. 线程安全:基于原子操作
* 2. 快速判断:检查计数是否为0
* 3. 瞬时判断:反映调用时刻的状态
* 4. 无锁操作:避免锁竞争
*/
public boolean isEmpty() {
return count.get() == 0; // 判断计数是否为0
}
/**
* 判断队列是否包含指定元素
* @param o 要查找的元素
* @return 如果包含返回true,否则返回false
*
* contains()特点:
* 1. 线程安全:获取取出锁保护
* 2. 全遍历:需要遍历整个队列
* 3. 时间复杂度:O(n)
* 4. 可能阻塞:如果其他线程正在操作队列
*/
public boolean contains(Object o) {
if (o == null) return false; // null元素不包含
fullyLock(); // 获取两个锁
try {
for (Node<E> p = head.next; p != null; p = p.next)
if (o.equals(p.item)) // 比较元素
return true;
return false; // 没有找到
} finally {
fullyUnlock(); // 释放两个锁
}
}
/**
* 移除队列中的指定元素
* @param o 要移除的元素
* @return 如果成功移除返回true,否则返回false
*
* remove()特点:
* 1. 线程安全:获取两个锁保护
* 2. 全遍历:需要遍历整个队列
* 3. 时间复杂度:O(n)
* 4. 只移除第一个匹配的元素
* 5. 可能阻塞:如果其他线程正在操作队列
*/
public boolean remove(Object o) {
if (o == null) return false; // null元素不能移除
fullyLock(); // 获取两个锁
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) { // 找到匹配元素
unlink(p, trail); // 断开链接
return true; // 返回true
}
}
return false; // 没有找到
} finally {
fullyUnlock(); // 释放两个锁
}
}
/**
* 断开指定节点的链接
* @param p 要断开的节点
* @param trail 前驱节点
*/
void unlink(Node<E> p, Node<E> trail) {
// assert isFullyLocked();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
if (count.getAndDecrement() == capacity)
notFull.signal();
}
7. 锁操作方法(详细注释)
/**
* 获取两个锁(全锁)
* 用于需要同时保护队列两端的操作
* @throws InterruptedException 如果线程被中断
*
* 锁获取顺序:
* 1. 先获取取出锁
* 2. 再获取插入锁
* 3. 避免死锁:统一的锁获取顺序
*/
void fullyLock() throws InterruptedException {
putLock.lock(); // 获取插入锁
takeLock.lock(); // 获取取出锁
}
/**
* 释放两个锁(全解锁)
* 与fullyLock()对应
*
* 锁释放顺序:
* 1. 先释放取出锁
* 2. 再释放插入锁
* 3. 与获取顺序相反
*/
void fullyUnlock() {
takeLock.unlock(); // 释放取出锁
putLock.unlock(); // 释放插入锁
}
/**
* 通知等待取元素的线程
* 在插入元素后调用
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock(); // 获取取出锁
try {
notEmpty.signal(); // 通知等待的消费者
} finally {
takeLock.unlock(); // 释放取出锁
}
}
/**
* 通知等待插入元素的线程
* 在取出元素后调用
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock(); // 获取插入锁
try {
notFull.signal(); // 通知等待的生产者
} finally {
putLock.unlock(); // 释放插入锁
}
}
8. 迭代器实现(详细注释)
/**
* 返回队列的迭代器
* @return 队列的迭代器
*
* 迭代器特点:
* 1. 弱一致性:反映创建时的状态
* 2. 不抛出ConcurrentModificationException
* 3. 线程安全:使用锁保护
* 4. 支持remove()操作
* 5. 按FIFO顺序遍历
*/
public Iterator<E> iterator() {
return new Itr(); // 返回自定义迭代器
}
/**
* LinkedBlockingQueue的迭代器实现
*/
private class Itr implements Iterator<E> {
/**
* 当前节点
*/
private Node<E> current;
/**
* 上次返回的节点
*/
private Node<E> lastRet;
/**
* 当前节点的元素
*/
private E currentElement;
/**
* 构造方法
*/
Itr() {
fullyLock(); // 获取两个锁
try {
current = head.next; // 从第一个有效节点开始
if (current != null)
currentElement = current.item; // 获取元素
} finally {
fullyUnlock(); // 释放两个锁
}
}
/**
* 判断是否还有下一个元素
* @return 如果还有下一个元素返回true,否则返回false
*/
public boolean hasNext() {
return current != null; // 判断当前节点是否为空
}
/**
* 获取下一个元素
* @return 下一个元素
* @throws NoSuchElementException 如果没有下一个元素
*/
public E next() {
fullyLock(); // 获取两个锁
try {
if (current == null)
throw new NoSuchElementException(); // 没有下一个元素
E x = currentElement; // 获取当前元素
lastRet = current; // 记录上次返回的节点
current = current.next; // 移动到下一个节点
currentElement = (current == null) ? null : current.item; // 获取下一个元素
return x; // 返回当前元素
} finally {
fullyUnlock(); // 释放两个锁
}
}
/**
* 移除上次返回的元素
* @throws IllegalStateException 如果没有可移除的元素
*/
public void remove() {
if (lastRet == null)
throw new IllegalStateException(); // 没有可移除的元素
fullyLock(); // 获取两个锁
try {
Node<E> node = lastRet;
lastRet = null;
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (p == node) { // 找到要移除的节点
unlink(p, trail); // 断开链接
break;
}
}
} finally {
fullyUnlock(); // 释放两个锁
}
}
}
9. 转换和克隆方法(详细注释)
/**
* 转换为数组
* @return 包含队列所有元素的数组
*
* toArray()特点:
* 1. 线程安全:获取两个锁保护
* 2. 保持顺序:按照队列的FIFO顺序
* 3. 瞬时快照:反映调用时刻的状态
* 4. 完整复制:包含所有元素
*/
public Object[] toArray() {
fullyLock(); // 获取两个锁
try {
int size = count.get(); // 获取元素数量
Object[] a = new Object[size]; // 创建数组
int k = 0;
for (Node<E> p = head.next; p != null; p = p.next)
a[k++] = p.item; // 复制元素
return a; // 返回数组
} finally {
fullyUnlock(); // 释放两个锁
}
}
/**
* 转换为指定类型的数组
* @param a 指定类型的数组
* @return 转换后的数组
* @throws ArrayStoreException 如果数组类型不兼容
* @throws NullPointerException 如果a为null
*/
@SuppressWarnings("unchecked")
public <T> T[] toArray(T[] a) {
fullyLock(); // 获取两个锁
try {
int size = count.get(); // 获取元素数量
if (a.length < size)
a = (T[])java.lang.reflect.Array.newInstance(
a.getClass().getComponentType(), size); // 创建新数组
int k = 0;
for (Node<E> p = head.next; p != null; p = p.next)
a[k++] = (T)p.item; // 复制元素
if (a.length > k)
a[k] = null; // 清空多余位置
return a; // 返回数组
} finally {
fullyUnlock(); // 释放两个锁
}
}
/**
* 克隆队列
* @return 克隆后的队列
*
* clone()特点:
* 1. 浅克隆:只克隆队列结构,不克隆元素
* 2. 独立状态:新队列有自己的锁和条件
* 3. 相同元素:包含相同的元素引用
* 4. 无界队列:克隆后的队列也是无界的
*/
public Object clone() {
try {
@SuppressWarnings("unchecked")
LinkedBlockingQueue<E> other = (LinkedBlockingQueue<E>) super.clone();
other.count = new AtomicInteger(); // 创建新的计数器
other.head = other.last = new Node<E>(null); // 创建新的哨兵节点
other.putLock = new ReentrantLock(); // 创建新的插入锁
other.notFull = other.putLock.newCondition(); // 创建新的notFull条件
other.takeLock = new ReentrantLock(); // 创建新的取出锁
other.notEmpty = other.takeLock.newCondition(); // 创建新的notEmpty条件
// 遍历原队列,将元素添加到新队列
fullyLock(); // 获取两个锁
try {
for (Node<E> p = head.next; p != null; p = p.next) {
other.put(p.item); // 添加元素
}
} finally {
fullyUnlock(); // 释放两个锁
}
return other;
} catch (CloneNotSupportedException e) {
throw new InternalError();
}
}
10. LinkedBlockingQueue 的特点分析
核心设计理念:
/**
* LinkedBlockingQueue的核心设计思想:
*
* 1. 链表实现的阻塞队列:
* - 使用单向链表存储元素
* - 支持有界和无界两种模式
* - 动态内存分配,按需扩展
*
* 2. 双锁分离机制:
* - 使用两个独立的ReentrantLock
* - putLock保护插入操作
* - takeLock保护取出操作
* - 生产者和消费者可以并行操作
*
* 3. 原子计数器:
* - 使用AtomicInteger跟踪元素数量
* - 避免锁竞争,提高性能
* - 保证计数的原子性和可见性
*
* 4. 条件队列机制:
* - notEmpty条件用于等待元素
* - notFull条件用于等待空间
* - 支持阻塞和超时操作
*
* 5. 哨兵节点设计:
* - head节点是哨兵节点,不存储实际元素
* - 简化链表操作的边界处理
* - 避免空指针异常
*
* 6. 内存管理:
* - 及时清空节点引用,帮助GC
* - 链式结构便于内存回收
* - 避免内存泄漏
*
* 适用场景:
* - 高并发的生产者-消费者模式
* - 线程池任务队列
* - 需要高性能队列操作的场景
* - 可以接受无界队列的场景
*/
性能特征分析:
/**
* LinkedBlockingQueue的性能特征:
*
* 时间复杂度:
* - offer(e): O(1) - 非阻塞插入
* - put(e): O(1) 平均,O(∞) 最坏(阻塞等待)
* - poll(): O(1) - 非阻塞取出
* - take(): O(1) 平均,O(∞) 最坏(阻塞等待)
* - size(): O(1) - 原子计数器
* - contains(o): O(n) - 需要遍历队列
* - remove(o): O(n) - 需要遍历并断开链接
*
* 空间复杂度:
* - O(n) - 链表存储,按需分配
* - 每个节点额外的next指针开销
* - 哨兵节点的固定开销
*
* 并发特性:
* - 双锁机制:生产者消费者独立锁
* - 阻塞操作:支持生产者和消费者的协调
* - 原子计数:避免锁竞争
* - 内存一致性:通过锁保证可见性
*
* 锁竞争:
* - 生产者之间可能有锁竞争
* - 消费者之间可能有锁竞争
* - 生产者和消费者可以并行操作
* - 相比单锁机制性能更好
*
* 内存使用:
* - 动态分配,按需扩展
* - 无界队列可能消耗大量内存
* - 及时GC,避免内存泄漏
*
* 吞吐量:
* - 高并发下吞吐量优于ArrayBlockingQueue
* - 双锁分离减少锁竞争
* - 适合高并发场景
*/
11. 使用示例和最佳实践
/**
* 使用示例:
*
* // 基本使用(无界队列)
* LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
*
* // 指定容量的有界队列
* LinkedBlockingQueue<String> boundedQueue = new LinkedBlockingQueue<>(100);
*
* // 生产者线程
* Thread producer = new Thread(() -> {
* try {
* for (int i = 0; i < 1000; i++) {
* String item = "item-" + i;
* queue.put(item); // 阻塞插入
* System.out.println("Produced: " + item);
* }
* } catch (InterruptedException e) {
* Thread.currentThread().interrupt();
* }
* });
*
* // 消费者线程
* Thread consumer = new Thread(() -> {
* try {
* while (true) {
* String item = queue.take(); // 阻塞取出
* System.out.println("Consumed: " + item);
* Thread.sleep(100); // 模拟处理时间
* }
* } catch (InterruptedException e) {
* Thread.currentThread().interrupt();
* }
* });
*
* producer.start();
* consumer.start();
*
* // 线程池使用示例
* ExecutorService executor = Executors.newFixedThreadPool(10);
* LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
*
* // 提交大量任务
* for (int i = 0; i < 10000; i++) {
* final int taskId = i;
* executor.submit(() -> {
* System.out.println("Task " + taskId + " executed by " +
* Thread.currentThread().getName());
* });
* }
*
* 最佳实践:
*
* 1. 合理设置队列容量:
* // 根据系统资源和业务需求设置合适容量
* int capacity = 1000;
* LinkedBlockingQueue<Task> queue = new LinkedBlockingQueue<>(capacity);
*
* 2. 使用非阻塞操作避免死锁:
* // 在某些场景下使用非阻塞操作
* if (queue.offer(task)) {
* System.out.println("Task added successfully");
* } else {
* System.out.println("Queue is full, task rejected");
* // 实现拒绝策略
* }
*
* 3. 设置合理的超时时间:
* // 避免无限期等待
* try {
* Task task = queue.poll(5, TimeUnit.SECONDS);
* if (task != null) {
* processTask(task);
* } else {
* System.out.println("No task available within timeout");
* }
* } catch (InterruptedException e) {
* Thread.currentThread().interrupt();
* }
*
* 4. 处理中断异常:
* // 正确处理InterruptedException
* try {
* Task task = queue.take();
* processTask(task);
* } catch (InterruptedException e) {
* Thread.currentThread().interrupt(); // 恢复中断状态
* System.out.println("Consumer interrupted");
* }
*
* 5. 监控队列状态:
* // 定期监控队列使用情况
* int size = queue.size();
* int remaining = queue.remainingCapacity();
* double usage = (double) size / (size + remaining);
* System.out.println("Queue usage: " + String.format("%.2f%%", usage * 100));
*
* 6. 批量操作提高效率:
* // 使用批量操作减少锁竞争
* List<Task> tasks = new ArrayList<>();
* int count = queue.drainTo(tasks, 50); // 批量取出最多50个任务
* for (Task task : tasks) {
* processTask(task);
* }
*
* 7. 避免队列过载:
* // 实现背压机制
* if (queue.size() > 800) {
* System.out.println("Queue is nearly full, slowing down production");
* Thread.sleep(1000); // 暂停生产
* }
*
* 8. 正确使用迭代器:
* // 迭代器是弱一致性的
* Iterator<String> it = queue.iterator();
* while (it.hasNext()) {
* String item = it.next();
* // 处理元素
* // 注意:迭代过程中队列可能发生变化
* }
*
* 9. 内存管理:
* // 对于大对象,注意内存使用
* if (queue.size() > 10000) {
* System.out.println("Warning: Large queue size, potential memory issue");
* }
*
* 10. 异常处理:
* // 处理可能的异常情况
* try {
* queue.put(largeObject);
* } catch (OutOfMemoryError e) {
* System.out.println("Memory exhausted, consider reducing queue size");
* // 实现降级策略
* }
*/
12. 与其他阻塞队列的比较
/**
* LinkedBlockingQueue vs ArrayBlockingQueue vs SynchronousQueue:
*
* LinkedBlockingQueue:
* - 链表实现,动态内存分配
* - 双锁机制,生产者消费者独立锁
* - 支持有界和无界模式
* - 高并发性能好
* - 内存使用动态增长
*
* ArrayBlockingQueue:
* - 数组实现,固定内存分配
* - 单锁机制,生产者消费者共享锁
* - 必须指定容量(有界队列)
* - 内存使用固定
* - 适合需要有界缓冲的场景
*
* SynchronousQueue:
* - 容量为0,直接传递
* - 不存储元素,生产者消费者直接配对
* - 最低延迟,最高吞吐量
* - 适合直接传递的场景
*
* PriorityBlockingQueue:
* - 无界优先级队列
* - 按优先级排序
* - 适合需要优先级处理的场景
*
* DelayQueue:
* - 无界延迟队列
* - 只有延迟到期的元素才能取出
* - 适合定时任务调度
*
* 选择建议:
* - 高并发场景:LinkedBlockingQueue
* - 需要有界缓冲:ArrayBlockingQueue
* - 直接传递:SynchronousQueue
* - 优先级处理:PriorityBlockingQueue
* - 延迟处理:DelayQueue
*
* 性能对比:
* - 吞吐量:LinkedBlockingQueue > ArrayBlockingQueue > SynchronousQueue
* - 内存使用:ArrayBlockingQueue < LinkedBlockingQueue < SynchronousQueue
* - 延迟:SynchronousQueue < LinkedBlockingQueue < ArrayBlockingQueue
* - 并发性:LinkedBlockingQueue > ArrayBlockingQueue > SynchronousQueue
*/
13. 总结
LinkedBlockingQueue 的核心特性:
-
链表实现:
- 使用单向链表存储元素
- 动态内存分配,按需扩展
- 支持有界和无界两种模式
-
双锁分离:
- putLock保护插入操作
- takeLock保护取出操作
- 生产者和消费者可以并行操作
- 减少锁竞争,提高并发性能
-
原子计数:
- 使用AtomicInteger跟踪元素数量
- 避免锁竞争,提高性能
- 保证计数的原子性和可见性
-
多种阻塞策略:
- put()/take():无限期阻塞
- offer()/poll():超时阻塞
- offer()/poll():非阻塞操作
-
线程安全:
- 所有操作都通过锁保护
- 保证原子性和可见性
- 支持多个生产者和消费者
适用场景:
- 高并发的生产者-消费者模式
- 线程池任务队列
- 需要高性能队列操作的场景
- 可以接受无界队列的场景
- 需要批量操作的场景
注意事项:
- 无界队列可能导致内存溢出
- 链表结构有额外的内存开销
- 不允许存储null元素
- 迭代器是弱一致性的
- 大容量时注意内存使用
性能优化建议:
- 根据业务需求合理设置队列容量
- 正确处理中断异常
- 使用批量操作减少锁竞争
- 监控队列使用情况
- 实现合适的拒绝策略
- 避免队列过载和内存泄漏
- 定期清理不需要的元素