AQS是什么?
AQS(AbstractQueuedSynchronizer)是Java中重要的同步框架。该框架实现了大部分的同步器功能,使使用者仅覆盖几个简单的方法就可以得到各种各样的同步器,包括ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock,ThreadPoolExecutor等,当然也可以很简单的实现我们自己定义的同步器。
从ReentrantLock说起。
当我们使用一个ReentrantLock时
Lock lock=new ReentrantLock();
lock.lock();
lock.unlock();
我们看下源码,是怎么实现的。
ReentrantLock.class
private final Sync sync;
public ReentrantLock() {
sync = new NonfairSync();
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
我们看到构造方法默认调用了一个叫NonfairSync的静态内部类,而NonfairSync继承自一个叫Sync静态抽象内部类,就是这个Sync继承自AQS。这样ReentrantLock和AQS的关系就有个了解了。
上锁过程。
当执行lock.lock时,很显然会调用NonfairSync中的lock方法。
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
而这个方法里的内部操作大部分都是由AQS完成的。
AbstractQueuedSynchronizer.class
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
compareAndSetState使用本地方法实现CAS。
注意:acquire()方法中,调用tryAcquire()方法,而这个方法在AQS中并没有给出实现方法,直接抛出异常,这样就要求该方法在子类中实现。
我们继续看跟这个方法
ReentrantLock.class
static final class NonfairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
final boolean nonfairTryAcquire(int acquires) {
//获取当前线程
final Thread current = Thread.currentThread();
//获取锁状态
int c = getState();
//锁未被占用
if (c == 0) {
//使用CAS获取锁
if (compareAndSetState(0, acquires)) {
//获取锁成功,将当前线程设置为占用线程
setExclusiveOwnerThread(current);
//返回获取锁成功
return true;
}
}
//如果锁已经被占用,判断占用线程是否为本线程,如果是本线程,实现可重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//改变锁的状态
setState(nextc);
//获取锁成功
return true;
}
//获取锁失败
return false;
}
}
我们看到其实调用的是Sync 内部静态类里的实现方法nonfairTryAcquire()。
通过注释,可以很清晰得看到获取锁的流程,且通过比较占用线程,实现了可重入锁。
以上是获取锁成功的情况,如果获取锁失败了呢,肯定要存入队列啊,那么是怎么存入队列的呢?在队列中待到什么时候可以出列呢?我们继续往下看。
前面我们看到这行代码
AQS类
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
获取锁失败后,就要执行acquireQueued及addWaiter方法了。这两个方法都是AQS方法。
AQS类
private Node addWaiter(Node mode) {
//生成当前node
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//如果队列中已经存在节点
if (pred != null) {
node.prev = pred;
//将本节点放到队尾
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果队列不存在,初始化队列
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
**//对队列中的节点进行获取锁的相关设置**
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//得到前置节点
final Node p = node.predecessor();
//如果前置节点是头节点,本节点可以尝试获取锁
if (p == head && tryAcquire(arg)) {
//获取锁成功,将当前节点设为头节点,并释放头节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//当前节点的前置节点不是头节点,或者获取锁失败(非公平锁时,被新来的线程抢占了锁)
//判断是否需要挂起操作,防止CPU消耗
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
是否需要挂起方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取前置节点的状态
int ws = pred.waitStatus;
//如果前置节点处于唤醒状态,则需挂起本节点
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
//前置节点状态为取消状态
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
//轮询删除取消状态的前置节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//设置前置节点为唤醒状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
通过以上代码可以看到,只有当前置节点为头节点且自己成功获取到锁时才跳出循环,当前置节点处于唤醒状态时,挂起本节点。否则删除前置节点中的取消节点。
加锁过程小结:
通过lock方法,调用AQS中的acquire方法,acquire()方法会调用tryAcquire方法,该方法由自定义子类实现。若调用失败,则将该线程放入队列,并在队列中进行循环尝试获取锁或者挂起。
加锁我们明白了,那么队列中的节点是怎么出列的呢?
我们看下解锁过程。
lock.unlock();
会调用ReentrantLock.class
public void unlock() {
sync.release(1);
}
进而调用AQS中的release方法
AQS.class
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
tryRelease 同tryAcquire一样,都有子类实现。这样就回到ReentrantLock类
ReentrantLock.class
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
主要操作就是减少state状态值,对占有线程进行置为null。
注意下tryRelease成功做执行unparkSuccessor(h);方法,用于通知后置节点的终止挂起。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
//获取当前节点的状态
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
这样后置节点就可以继续循环尝试获取锁了。
解锁过程小结:
unlock 方法调用AQS的release方法,release在调用自定义同步器子类实现的tryRelease方法。如果成功,将本节点作为做头节点,释放原头节点,通知后置节点,中止挂起。后置节点就可以继续尝试获取锁了。
总结一下AQS重点
AQS作为一个同步框架,内部存在一个volatile 实现的state数值作为真正的锁字段,提供了抽象方法tryAcquire和tryRelease供子类实现,提供了虚拟线程队列,当tryAcquire失败时将当前线程放入虚拟队列并挂起,当tryRelease成功时取消后一个节点挂起,进行tryAcquire抢锁。
ReentrantLock 的可重入能力在于设置了锁的占有线程并在获取锁时进行判断并累计占有次数。公平锁在获取锁时会先判断队列是否存在,非公平锁会直接尝试抢锁,失败后入队。
加锁解锁过程我们都说完了,下面利用AQS实现个自定义的锁试一下。
自己写个MyLock
public class MyLock {
public static class Sync extends AbstractQueuedSynchronizer{
@Override
protected boolean tryAcquire(int arg) {
return compareAndSetState(0,arg);
}
@Override
protected boolean tryRelease(int arg) {
setState(0);
return true;
}
}
private Sync sync=new Sync();
public void lock(){
sync.acquire(1);
}
public void unlock(){
sync.release(1);
}
}
测试一下,不加锁
public class LockTest {
static int count=0;
static MyLock myLock=new MyLock();
public static void main(String[] args) throws Exception{
Runnable runnable=()->{
try {
// myLock.lock();
for (int i=0;i<1000;i++){
count++;
if (i % 50 == 0){
Thread.sleep(1);
}
}
}catch (Exception e){
e.printStackTrace();
}finally {
// myLock.unlock();
}
};
Thread thread1=new Thread(runnable);
Thread thread2=new Thread(runnable);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(count);
}
}
测试结果
1994
很显然,出现了并发问题,我们希望得到的结果是2000,却只得到了1994.每次得到的结果都不同
加上我们的自定义锁试一下,打开注释代码,得到结果每次都是2000。说明我们的锁成功生效。
所以我们仅仅需要重新实现AQS中的tryAcquire 和tryRelease就可以很容易实现一个锁了。
我们看到ReentratLock使用一样的方式,实现了公平锁和非公平锁。那么接下来再看一下其他的同步组件是什么实现的,本节太长了,另开一节吧。