【JAVA多线程】AQS,JAVA并发包的核心

目录

1.概述

1.1.什么是AQS

1.2.AQS和BlockQueue的区别

1.3.AQS的结构

2.源码分析

2.1.CLH队列

2.2.模板方法的实现

2.2.1.独占模式

1.获取资源

2.释放资源

2.2.2.共享模式


1.概述

1.1.什么是AQS

AQS非常非常重要,可以说是JAVA并发包(java.util.concurrent)的核心。

以下是一个常见的多线程场景:

当多个线程对同一个资源进行争抢时,没有抢到的线程怎么办?直接拒绝然后丢弃?最可行最合适的方式当然是让没有抢到资源的线程排队阻塞起来,这样即让出了CPU的资源,又方便当资源被释放、可争抢的时候唤醒线程来继续争抢。

我们发现一个信号量+一个线程安全的用来存放线程的队列,是并发编程体系的一个底座,这个底座有一个名字叫——同步器。

AQS(AbstractQueuedSynchronizer),一个JDK中提供的同步器,提供一系列线程的控制和同步机制,是JDK中诸多需要进行线程控制和同步的地方,诸如可重入锁ReentrantLock,以及诸多同步工具Semaphore、CountDownLatch、CyclicBarrier等,其底层都是使用的AQS来实现线程控制和同步。

1.2.AQS和BlockQueue的区别

阻塞队列(Blocking Queue)是一种特殊的队列,它的特殊之处在于它具有阻塞特性。当队列为空时,从队列中获取元素的线程会被阻塞,直到队列中有新的元素被加入;同样地,当队列满时,试图向队列中添加元素的线程也会被阻塞,直到队列中有空余的空间。这种特性使得阻塞队列非常适用于生产者-消费者模式中。

阻塞队列:

  • 元素的读写是线程安全的

  • 里面存的任务

  • 阻塞的是外部的生产者线程/消费者线程

AQS,同步器,是用来进行线程控制和同步的一个基础组件,用来让现场排队、阻塞、唤醒等操作。

AQS:

  • 元素的读写是安全的

  • 里面存的是线程

  • 阻塞的是内部存的线程

1.3.AQS的结构

AQS提供了什么?

一套API和一套数据结构。

可以看到AQS是一个抽象类:

既然是抽象类而不是直接是个接口就说明,其提供的不只是一个标准,而是一套资源。其底层用链表来组织其管理的线程,包含:

  • 链表的节点Node

  • 链表的头尾指针

  • 状态state,这是个volatile变量用于表示同步状态,在不同的实现类中,它的含义不同,有可能表示锁是否被持有、持有者持有锁的次数、信号量可用的许可数量等。

  • 条件变量ConditionObject

AQS提供了一套标准的API,来完成两方便的操作:

  • 状态控制

  • 资源争抢

状态控制:

  • getState():返回同步状态

  • setState(int newState):设置同步状态

  • compareAndSetState(int expect, int update):使用CAS设置同步状态

  • isHeldExclusively():当前线程是否持有资源 独占资源(不响应线程中断)

资源争抢和释放一共两种,共享模式、独占模式:

  • tryAcquire(int arg):独占式获取资源,子类实现

  • acquire(int arg):独占式获取资源模

  • tryRelease(int arg):独占式释放资源,子类实现

  • release(int arg):独占式释放资源模板 共享资源(不响应线程中断)

  • tryAcquireShared(int arg):共享式获取资源,返回值大于等于0则表示获取成功,否则获取失败,子类实现

  • acquireShared(int arg):共享式获取资源模板

  • tryReleaseShared(int arg):共享式释放资源,子类实现

  • releaseShared(int arg):共享式释放资源模板

独占模式出现在对一把锁进行争抢、释放的场景中,如ReentrantLock中;共享模式出现在对多个资源的争抢、释放的场景中,如ReadWriteLock、CountDownLatch中。

2.源码分析

2.1.CLH队列

在AQS的源码开头,就描述了其是使用CLH队列来组织没争抢到资源的线程的:

该队列是用链表实现的,没被争抢到资源的线程会被封装成Node,被挂入链表中。node的入队和出队都用的CAS(自旋锁)来保证效率和安全的兼得。

这个Node类源码很简单,主要是要注意Node是有状态的,用状态来控制该节点该不该被唤醒,用waitStatus来表示状态:

Node 类中的静态常量定义了以下状态:

  • SIGNAL (-1):表示当前节点的后续节点需要被唤醒。这是默认状态,当一个节点被添加到队列中时,它的 waitStatus 会被设置为 -1。

  • PROPAGATE (-3):类似于 SIGNAL,但在某些情况下,如释放共享模式下的锁时,可能会设置为 -3,以帮助传播唤醒信

  • CONDITION (-2):表示节点位于条件队列中,而不是同步队列中。这通常与 Condition 对象相关联。

  • 0:表示没有特别的状态。当节点被初始化时,它的 waitStatus 默认为 0。

  • CANCELLED (1):表示节点已经被取消。这通常发生在线程被中断或者尝试获取锁失败时。一旦节点被标记为 CANCELLED,它就会从队列中移除。

waitStatus 的使用:

  • 线程阻塞:当一个线程被插入到队列中时,它的 waitStatus 通常会被设置为 SIGNAL。这意味着当前节点的后续节点应该被唤醒,当当前节点释放时。

  • 线程唤醒:当一个线程释放锁或其他同步状态时,它会检查自己的 waitStatus,并根据情况唤醒后继节点。例如,在独占模式下,当线程释放锁时,它会检查其后继节点的 waitStatus,如果是 SIGNAL,则会唤醒该节点对应的线程。

  • 条件队列:当线程等待某个条件满足时,它会被移到条件队列中,并且其 waitStatus 会被设置为 CONDITION。当条件满足时,线程会被放回同步队列,并且 waitStatus 会被设置为 SIGNAL。

  • 取消节点:如果线程被中断或超时,或者由于其他原因无法继续等待,那么该节点会被标记为 CANCELLED,并且从队列中移除。

整个CLH队列可以理解为:

2.2.模板方法的实现

2.2.1.独占模式

1.获取资源

AQS的实现类很多,此处我们以ReentrantLock的FairSync(公平锁)为例。

lock()其实是去,调用AQS的acquire():

AQS的acquire()会去调用实现类的tryAcquire()来判断当前线程是否能抢到线程,如果抢不到就入队CLH,并且阻塞:

至此我们就能体会到了,AQS实现了核心的阻塞+入队以及唤醒+出队的一些列对CLH中线程的操作,但具体的争抢策略作为模板开放出去了,也就是上文我们说的一系列模板方法。

我们来看看FairSync的tryAcquire()实现:

protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();//获取锁的状态
            if (c == 0) {//如果锁没有被持有
                //就去判断CLH队列中还有没有线程,没有的话就去CAS获取锁
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //如果当前线程是锁的持有者就去增加锁的持有次数
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
2.释放资源

释放资源应该做些什么?这里我们想一下就能想出来:

  • 首先释放锁

  • 然后唤醒后面的线程。

释放锁:

可以看到ReentrantLock的unlock调用的AQS的release()去释放锁,AQS的release里会先去tryRelease()尝试释放锁,tryRelease()是交给子类重写的:

所以调用的就是reentrantLock的释放策略:

ReentrantLock的尝试释放的过程很简单,就是线程安全的去减少锁的持有次数+清除锁的持有者的信息:

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            //检查当前线程是否是锁的所有者,确保只有锁的持有者能释放锁
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                //清除锁的持有者
                setExclusiveOwnerThread(null);
            }
            //刷新状态
            setState(c);
            return free;
        }

唤醒后面的线程:

继续回到AQS的release,继续往下看,tryRelease()后确定可以释放锁后,unparkSuccessor()唤醒CLH队列中需要被唤醒的第一个线程。为什么说是第一个需要被唤醒的喃?前文已经说过了Node节点有状态,状态用来表示该节点是否需要被唤醒,有些状态是表示节点不需要被唤醒的。

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);//唤醒后面的需要被唤醒的线程
            return true;
        }
        return false;
    }

unparkSuccessor()很简单,修改当前节点状态,唤醒后面需要唤醒的状态:

private void unparkSuccessor(Node node) {
        //将当前的节点从待唤醒状态变为已经唤醒
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
​
        //找到后续第一个需要被唤醒的节点,然后唤醒它
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

2.2.2.共享模式

之所以有共享模式,是因为有时候资源不一定是唯一的,比如除了锁之外,可能要求能有10条线程同时工作,又或者读写之间不互斥等需求。在这种允许多条线程同时工作的情景中就要用到共享模式,标志位state不再表示一个意思(是否被持有),而是用来表示多个意思,有可能是线程的数量,也有可能是高低位分别表示是否允许读或者写。

共享模式下,资源的获取是通过acquireShared和tryAcquireShared来实现的,释放是通过releaseShared和tryReleaseShared来实现的。具体代码实现的话其实和独占模式大差不差,只是在对标志位state的操作上略有不同,以及可能会有一些位操作。以下是一些常见的使用共享模式的场景:

  • 读写锁 (ReadWriteLock): 读写锁是一种常用的同步工具,它允许多个线程同时读取共享资源,但在写入时需要独占资源。 读锁通常使用共享模式实现,因为多个读线程可以同时访问资源。 写锁则使用独占模式,因为写入操作通常需要独占资源以避免数据不一致。

  • 信号量 (Semaphore): 信号量用于限制可以同时访问共享资源的线程数量。 它可以通过减少或增加一个计数器来管理资源的可用性,这个计数器通常就是 AQS 的 state 字段。 当线程尝试获取信号量时,它会减少 state 的值;当线程释放信号量时,它会增加 state 的值。 因为多个线程可以同时获取信号量,所以信号量通常使用共享模式。

  • 循环屏障 (CyclicBarrier): 循环屏障允许一组线程相互等待,直到所有线程都到达了一个共同的屏障点。 当最后一个线程到达屏障点时,所有线程都会被释放继续执行。 循环屏障通常使用共享模式来管理线程的等待和释放。

  • CountDownLatch: CountDownLatch 是一种同步辅助类,它允许一个或多个线程等待其他线程完成操作。 它维护了一个计数器,每当一个操作完成时,计数器就会减少。 当计数器达到零时,所有等待的线程都会被释放。 CountDownLatch 通常使用共享模式来管理计数器的减少。

  • Exchanger: Exchanger 允许两个线程交换对象。 两个线程各自提供一个对象并等待另一个线程到达交换点。 一旦两个线程都到达交换点,它们就可以交换对象并继续执行。 Exchanger 通常使用共享模式来管理线程的交换过程。 示例代码

由于这是一个系列文章,后面几篇文章我们就将聊到CountDownLatch、Semaphore、CyclicBarrier之类的同步工具类、ReadWriteLock之类的读写锁,到时候分析源码的时候,再来看看各自的具体实现。

评论 37
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

_BugMan

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值