Java基础:java.util.concurrent.BlockingQueue

前言

在一次项目中,偶遇BlockingQueue,特意查了下用法,使我对它有了强列的兴趣,经过一段时间的学习,将其整理,用图解的方式解释,方便理解。

介绍

在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。图(1_0.png)是其继承关系,可以看出BlockingQueue是继承Queue。

我们先看下BlockQueue的图解,通过图,我们很容易理解,这是一个队列基本图,在图中,我们发现入队有put、add、offer三个方法。出队有poll、remove、take三个方法,就是因为这几个方法,使得blockQueue功能很强大。

put、add、offer 区别

  • put
    把Object加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.
  • add
    把Object加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则报异常
  • offer
    将Object加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.
    当队列满了,put阻塞,等有了再加,add直接报错,offer返回状态

poll、remove、take 区别

  • take
    取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止
  • remove
    取走BlockingQueue里排在首位的对象,若不能立即取出,则抛出异常
  • poll
    取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,
        取不到时返回null;
    当队列空了,take取不到就等,remove就抛异常,poll就返回null

阻塞模型

  1. 当队列满了,再往队列里put数据就会出现线程等待,模型如下

  2. 当队列空了,再往队列里take数据,就会出现线程等待,模型如下

 实现了BlockingQueue的类

我们常用的队列就linkedBlockingQueue和ArrayBlockingQueue。

LinkedBlockingQueue 和 ArrayBlockingQueue

区别
我们区分下LinkedBlockingQueue和ArrayBlockingQueue,凡是linked打头的都是内部维护一个链表,Array打头的是维护一个数组,实现方式不一样,功能也有所区别。

  • LinkedBlockingQueue是内部维护一个链表,当执行put方法时,程序会先获得一个重入锁,防止多线程同时操作产生数据不正确,然后后生成一个数据节点(Node)添加到链表的最后面。如果队列满,则使用java.util.concurrent.locks.Conditionawait()方法来阻塞继续添加。

这是put方法源码:

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

enqueue()的源码如下:

private void enqueue(Node<E> node) { 
  // assert 
  putLock.isHeldByCurrentThread(); 
  // assert 
  last.next == null; 
  last = last.next = node;
}

enqueue()的作用是将node添加到队列末尾,并设置node为新的尾节点!

signalNotEmpty()的源码如下:

private void signalNotEmpty() { 
  final ReentrantLock takeLock = this.takeLock; 
  takeLock.lock(); 
  try { 
    notEmpty.signal(); 
  } finally { 
    takeLock.unlock(); 
  }
}

signalNotEmpty()的作用是唤醒notEmpty上的等待线程。

take()取数据也用了await()方法,了解原理后,发现取数据时当链表为空时,就阻塞。
这是take()方法源码:

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        // 获取“取出锁”,若当前线程是中断状态,则抛出InterruptedException异常 
        takeLock.lockInterruptibly();
        try {
            // 若“队列为空”,则一直等待。 
            while (count.get() == 0) {
                notEmpty.await();
            }
            // 取出元素 
            x = dequeue();
            // 取出元素之后,将“节点数量”-1;并返回“原始的节点数量”。 
            c = count.getAndDecrement();
            if (c > 1) notEmpty.signal();
        } finally {
            // 释放“取出锁” 
            takeLock.unlock();
        }
        // 如果在“取出元素之前”,队列是满的;则在取出元素之后,唤醒notFull上的等待线程。 
        if (c == capacity)
            signalNotFull();
        return x;
    }

说明:take()的作用是取出并返回队列的头。若队列为空,则一直等待。

dequeue()的源码如下:

private E dequeue() { 
  // assert 
  takeLock.isHeldByCurrentThread(); 
  // assert 
  head.item == null; 
  Node<E> h = head; 
  Node<E> first = h.next; 
  h.next = h; 
  // help GC 
  head = first; 
  E x = first.item; 
  first.item = null; 
  return x;
}

dequeue()的作用就是删除队列的头节点,并将表头指向“原头节点的下一个节点”。

signalNotFull()的源码如下:

private void signalNotFull() { 
  final ReentrantLock putLock = this.putLock; 
  putLock.lock(); 
  try { 
    notFull.signal(); 
  } finally { 
    putLock.unlock(); 
  }
}

signalNotFull()的作用就是唤醒notFull上的等待线程。

  • ArrayBlockingQueue是内部维护一个数组,当执行put方法时,程序也会先获得一个重入锁,防止多纯种同时操作数组,然后直接插入到数组上面,这里和linkedBlockingQueue有个区别就是不会产生一个新的对象节点,开销上插入比LinkedBlockQueue节省空间。

这是put方法源码:

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
  • 总结

这两个队列的实现几乎都是相似的,我们可以理解成对应的数组和链表的实现。并且是线程安全的,在特定场景中使用特定的实现类能保证高效和空间的合理性。

作者:wuxiaowei
链接:https://www.jianshu.com/p/9a7d73064e14

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值